grpclb_remote_balancer.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpclb
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "net"
  24. "reflect"
  25. "time"
  26. timestamppb "github.com/golang/protobuf/ptypes/timestamp"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/balancer"
  29. lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  30. "google.golang.org/grpc/connectivity"
  31. "google.golang.org/grpc/grpclog"
  32. "google.golang.org/grpc/internal"
  33. "google.golang.org/grpc/internal/channelz"
  34. "google.golang.org/grpc/metadata"
  35. "google.golang.org/grpc/resolver"
  36. )
  37. // processServerList updates balaner's internal state, create/remove SubConns
  38. // and regenerates picker using the received serverList.
  39. func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
  40. if grpclog.V(2) {
  41. grpclog.Infof("lbBalancer: processing server list: %+v", l)
  42. }
  43. lb.mu.Lock()
  44. defer lb.mu.Unlock()
  45. // Set serverListReceived to true so fallback will not take effect if it has
  46. // not hit timeout.
  47. lb.serverListReceived = true
  48. // If the new server list == old server list, do nothing.
  49. if reflect.DeepEqual(lb.fullServerList, l.Servers) {
  50. if grpclog.V(2) {
  51. grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
  52. }
  53. return
  54. }
  55. lb.fullServerList = l.Servers
  56. var backendAddrs []resolver.Address
  57. for i, s := range l.Servers {
  58. if s.Drop {
  59. continue
  60. }
  61. md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
  62. ip := net.IP(s.IpAddress)
  63. ipStr := ip.String()
  64. if ip.To4() == nil {
  65. // Add square brackets to ipv6 addresses, otherwise net.Dial() and
  66. // net.SplitHostPort() will return too many colons error.
  67. ipStr = fmt.Sprintf("[%s]", ipStr)
  68. }
  69. addr := resolver.Address{
  70. Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
  71. Metadata: &md,
  72. }
  73. if grpclog.V(2) {
  74. grpclog.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
  75. i, ipStr, s.Port, s.LoadBalanceToken)
  76. }
  77. backendAddrs = append(backendAddrs, addr)
  78. }
  79. // Call refreshSubConns to create/remove SubConns. If we are in fallback,
  80. // this is also exiting fallback.
  81. lb.refreshSubConns(backendAddrs, false, lb.usePickFirst)
  82. }
  83. // refreshSubConns creates/removes SubConns with backendAddrs, and refreshes
  84. // balancer state and picker.
  85. //
  86. // Caller must hold lb.mu.
  87. func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback bool, pickFirst bool) {
  88. defer func() {
  89. // Regenerate and update picker after refreshing subconns because with
  90. // cache, even if SubConn was newed/removed, there might be no state
  91. // changes (the subconn will be kept in cache, not actually
  92. // newed/removed).
  93. lb.updateStateAndPicker(true, true)
  94. }()
  95. lb.inFallback = fallback
  96. opts := balancer.NewSubConnOptions{}
  97. if !fallback {
  98. opts.CredsBundle = lb.grpclbBackendCreds
  99. }
  100. lb.backendAddrs = backendAddrs
  101. lb.backendAddrsWithoutMetadata = nil
  102. if lb.usePickFirst != pickFirst {
  103. // Remove all SubConns when switching modes.
  104. for a, sc := range lb.subConns {
  105. if lb.usePickFirst {
  106. lb.cc.cc.RemoveSubConn(sc)
  107. } else {
  108. lb.cc.RemoveSubConn(sc)
  109. }
  110. delete(lb.subConns, a)
  111. }
  112. lb.usePickFirst = pickFirst
  113. }
  114. if lb.usePickFirst {
  115. var sc balancer.SubConn
  116. for _, sc = range lb.subConns {
  117. break
  118. }
  119. if sc != nil {
  120. sc.UpdateAddresses(backendAddrs)
  121. sc.Connect()
  122. return
  123. }
  124. // This bypasses the cc wrapper with SubConn cache.
  125. sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
  126. if err != nil {
  127. grpclog.Warningf("grpclb: failed to create new SubConn: %v", err)
  128. return
  129. }
  130. sc.Connect()
  131. lb.subConns[backendAddrs[0]] = sc
  132. lb.scStates[sc] = connectivity.Idle
  133. return
  134. }
  135. // addrsSet is the set converted from backendAddrsWithoutMetadata, it's used to quick
  136. // lookup for an address.
  137. addrsSet := make(map[resolver.Address]struct{})
  138. // Create new SubConns.
  139. for _, addr := range backendAddrs {
  140. addrWithoutMD := addr
  141. addrWithoutMD.Metadata = nil
  142. addrsSet[addrWithoutMD] = struct{}{}
  143. lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutMD)
  144. if _, ok := lb.subConns[addrWithoutMD]; !ok {
  145. // Use addrWithMD to create the SubConn.
  146. sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
  147. if err != nil {
  148. grpclog.Warningf("grpclb: failed to create new SubConn: %v", err)
  149. continue
  150. }
  151. lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
  152. if _, ok := lb.scStates[sc]; !ok {
  153. // Only set state of new sc to IDLE. The state could already be
  154. // READY for cached SubConns.
  155. lb.scStates[sc] = connectivity.Idle
  156. }
  157. sc.Connect()
  158. }
  159. }
  160. for a, sc := range lb.subConns {
  161. // a was removed by resolver.
  162. if _, ok := addrsSet[a]; !ok {
  163. lb.cc.RemoveSubConn(sc)
  164. delete(lb.subConns, a)
  165. // Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
  166. // The entry will be deleted in HandleSubConnStateChange.
  167. }
  168. }
  169. }
  170. func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
  171. for {
  172. reply, err := s.Recv()
  173. if err != nil {
  174. if err == io.EOF {
  175. return errServerTerminatedConnection
  176. }
  177. return fmt.Errorf("grpclb: failed to recv server list: %v", err)
  178. }
  179. if serverList := reply.GetServerList(); serverList != nil {
  180. lb.processServerList(serverList)
  181. }
  182. }
  183. }
  184. func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
  185. ticker := time.NewTicker(interval)
  186. defer ticker.Stop()
  187. for {
  188. select {
  189. case <-ticker.C:
  190. case <-s.Context().Done():
  191. return
  192. }
  193. stats := lb.clientStats.toClientStats()
  194. t := time.Now()
  195. stats.Timestamp = &timestamppb.Timestamp{
  196. Seconds: t.Unix(),
  197. Nanos: int32(t.Nanosecond()),
  198. }
  199. if err := s.Send(&lbpb.LoadBalanceRequest{
  200. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
  201. ClientStats: stats,
  202. },
  203. }); err != nil {
  204. return
  205. }
  206. }
  207. }
  208. func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
  209. lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
  210. ctx, cancel := context.WithCancel(context.Background())
  211. defer cancel()
  212. stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true))
  213. if err != nil {
  214. return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
  215. }
  216. lb.mu.Lock()
  217. lb.remoteBalancerConnected = true
  218. lb.mu.Unlock()
  219. // grpclb handshake on the stream.
  220. initReq := &lbpb.LoadBalanceRequest{
  221. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
  222. InitialRequest: &lbpb.InitialLoadBalanceRequest{
  223. Name: lb.target,
  224. },
  225. },
  226. }
  227. if err := stream.Send(initReq); err != nil {
  228. return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
  229. }
  230. reply, err := stream.Recv()
  231. if err != nil {
  232. return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
  233. }
  234. initResp := reply.GetInitialResponse()
  235. if initResp == nil {
  236. return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
  237. }
  238. if initResp.LoadBalancerDelegate != "" {
  239. return true, fmt.Errorf("grpclb: Delegation is not supported")
  240. }
  241. go func() {
  242. if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
  243. lb.sendLoadReport(stream, d)
  244. }
  245. }()
  246. // No backoff if init req/resp handshake was successful.
  247. return false, lb.readServerList(stream)
  248. }
  249. func (lb *lbBalancer) watchRemoteBalancer() {
  250. var retryCount int
  251. for {
  252. doBackoff, err := lb.callRemoteBalancer()
  253. select {
  254. case <-lb.doneCh:
  255. return
  256. default:
  257. if err != nil {
  258. if err == errServerTerminatedConnection {
  259. grpclog.Info(err)
  260. } else {
  261. grpclog.Warning(err)
  262. }
  263. }
  264. }
  265. // Trigger a re-resolve when the stream errors.
  266. lb.cc.cc.ResolveNow(resolver.ResolveNowOption{})
  267. lb.mu.Lock()
  268. lb.remoteBalancerConnected = false
  269. lb.fullServerList = nil
  270. // Enter fallback when connection to remote balancer is lost, and the
  271. // aggregated state is not Ready.
  272. if !lb.inFallback && lb.state != connectivity.Ready {
  273. // Entering fallback.
  274. lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
  275. }
  276. lb.mu.Unlock()
  277. if !doBackoff {
  278. retryCount = 0
  279. continue
  280. }
  281. timer := time.NewTimer(lb.backoff.Backoff(retryCount))
  282. select {
  283. case <-timer.C:
  284. case <-lb.doneCh:
  285. timer.Stop()
  286. return
  287. }
  288. retryCount++
  289. }
  290. }
  291. func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
  292. var dopts []grpc.DialOption
  293. if creds := lb.opt.DialCreds; creds != nil {
  294. if err := creds.OverrideServerName(remoteLBName); err == nil {
  295. dopts = append(dopts, grpc.WithTransportCredentials(creds))
  296. } else {
  297. grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
  298. dopts = append(dopts, grpc.WithInsecure())
  299. }
  300. } else if bundle := lb.grpclbClientConnCreds; bundle != nil {
  301. dopts = append(dopts, grpc.WithCredentialsBundle(bundle))
  302. } else {
  303. dopts = append(dopts, grpc.WithInsecure())
  304. }
  305. if lb.opt.Dialer != nil {
  306. dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer))
  307. }
  308. // Explicitly set pickfirst as the balancer.
  309. dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
  310. wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption)
  311. dopts = append(dopts, wrb(lb.manualResolver))
  312. if channelz.IsOn() {
  313. dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
  314. }
  315. // DialContext using manualResolver.Scheme, which is a random scheme
  316. // generated when init grpclb. The target scheme here is not important.
  317. //
  318. // The grpc dial target will be used by the creds (ALTS) as the authority,
  319. // so it has to be set to remoteLBName that comes from resolver.
  320. cc, err := grpc.DialContext(context.Background(), remoteLBName, dopts...)
  321. if err != nil {
  322. grpclog.Fatalf("failed to dial: %v", err)
  323. }
  324. lb.ccRemoteLB = cc
  325. go lb.watchRemoteBalancer()
  326. }