dial.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. // Copyright 2015 Google LLC.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package grpc supports network connections to GRPC servers.
  5. // This package is not intended for use by end developers. Use the
  6. // google.golang.org/api/option package to configure API clients.
  7. package grpc
  8. import (
  9. "context"
  10. "errors"
  11. "log"
  12. "os"
  13. "strings"
  14. "go.opencensus.io/plugin/ocgrpc"
  15. "golang.org/x/oauth2"
  16. "google.golang.org/api/internal"
  17. "google.golang.org/api/option"
  18. "google.golang.org/grpc"
  19. "google.golang.org/grpc/credentials"
  20. grpcgoogle "google.golang.org/grpc/credentials/google"
  21. "google.golang.org/grpc/credentials/oauth"
  22. // Install grpclb, which is required for direct path.
  23. _ "google.golang.org/grpc/balancer/grpclb"
  24. )
  25. // Set at init time by dial_appengine.go. If nil, we're not on App Engine.
  26. var appengineDialerHook func(context.Context) grpc.DialOption
  27. // Set at init time by dial_socketopt.go. If nil, socketopt is not supported.
  28. var timeoutDialerOption grpc.DialOption
  29. // Dial returns a GRPC connection for use communicating with a Google cloud
  30. // service, configured with the given ClientOptions.
  31. func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
  32. return dial(ctx, false, opts)
  33. }
  34. // DialInsecure returns an insecure GRPC connection for use communicating
  35. // with fake or mock Google cloud service implementations, such as emulators.
  36. // The connection is configured with the given ClientOptions.
  37. func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
  38. return dial(ctx, true, opts)
  39. }
  40. func dial(ctx context.Context, insecure bool, opts []option.ClientOption) (*grpc.ClientConn, error) {
  41. var o internal.DialSettings
  42. for _, opt := range opts {
  43. opt.Apply(&o)
  44. }
  45. if err := o.Validate(); err != nil {
  46. return nil, err
  47. }
  48. if o.HTTPClient != nil {
  49. return nil, errors.New("unsupported HTTP client specified")
  50. }
  51. if o.GRPCConn != nil {
  52. return o.GRPCConn, nil
  53. }
  54. var grpcOpts []grpc.DialOption
  55. if insecure {
  56. grpcOpts = []grpc.DialOption{grpc.WithInsecure()}
  57. } else if !o.NoAuth {
  58. if o.APIKey != "" {
  59. log.Print("API keys are not supported for gRPC APIs. Remove the WithAPIKey option from your client-creating call.")
  60. }
  61. creds, err := internal.Creds(ctx, &o)
  62. if err != nil {
  63. return nil, err
  64. }
  65. // Attempt Direct Path only if:
  66. // * The endpoint is a host:port (or dns:///host:port).
  67. // * Credentials are obtained via GCE metadata server, using the default
  68. // service account.
  69. // * Opted in via GOOGLE_CLOUD_ENABLE_DIRECT_PATH environment variable.
  70. // For example, GOOGLE_CLOUD_ENABLE_DIRECT_PATH=spanner,pubsub
  71. if isDirectPathEnabled(o.Endpoint) && isTokenSourceDirectPathCompatible(creds.TokenSource) {
  72. if !strings.HasPrefix(o.Endpoint, "dns:///") {
  73. o.Endpoint = "dns:///" + o.Endpoint
  74. }
  75. grpcOpts = []grpc.DialOption{
  76. grpc.WithCredentialsBundle(
  77. grpcgoogle.NewComputeEngineCredentials(),
  78. ),
  79. }
  80. // TODO(cbro): add support for system parameters (quota project, request reason) via chained interceptor.
  81. } else {
  82. grpcOpts = []grpc.DialOption{
  83. grpc.WithPerRPCCredentials(grpcTokenSource{
  84. TokenSource: oauth.TokenSource{creds.TokenSource},
  85. quotaProject: o.QuotaProject,
  86. requestReason: o.RequestReason,
  87. }),
  88. grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
  89. }
  90. }
  91. }
  92. if appengineDialerHook != nil {
  93. // Use the Socket API on App Engine.
  94. // appengine dialer will override socketopt dialer
  95. grpcOpts = append(grpcOpts, appengineDialerHook(ctx))
  96. }
  97. // Add tracing, but before the other options, so that clients can override the
  98. // gRPC stats handler.
  99. // This assumes that gRPC options are processed in order, left to right.
  100. grpcOpts = addOCStatsHandler(grpcOpts, o)
  101. grpcOpts = append(grpcOpts, o.GRPCDialOpts...)
  102. if o.UserAgent != "" {
  103. grpcOpts = append(grpcOpts, grpc.WithUserAgent(o.UserAgent))
  104. }
  105. // TODO(weiranf): This socketopt dialer will be used by default at some
  106. // point when isDirectPathEnabled will default to true, we guard it by
  107. // the Directpath env var for now once we can introspect user defined
  108. // dialer (https://github.com/grpc/grpc-go/issues/2795).
  109. if timeoutDialerOption != nil && isDirectPathEnabled(o.Endpoint) {
  110. grpcOpts = append(grpcOpts, timeoutDialerOption)
  111. }
  112. return grpc.DialContext(ctx, o.Endpoint, grpcOpts...)
  113. }
  114. func addOCStatsHandler(opts []grpc.DialOption, settings internal.DialSettings) []grpc.DialOption {
  115. if settings.TelemetryDisabled {
  116. return opts
  117. }
  118. return append(opts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
  119. }
  120. // grpcTokenSource supplies PerRPCCredentials from an oauth.TokenSource.
  121. type grpcTokenSource struct {
  122. oauth.TokenSource
  123. // Additional metadata attached as headers.
  124. quotaProject string
  125. requestReason string
  126. }
  127. // GetRequestMetadata gets the request metadata as a map from a grpcTokenSource.
  128. func (ts grpcTokenSource) GetRequestMetadata(ctx context.Context, uri ...string) (
  129. map[string]string, error) {
  130. metadata, err := ts.TokenSource.GetRequestMetadata(ctx, uri...)
  131. if err != nil {
  132. return nil, err
  133. }
  134. // Attach system parameter
  135. if ts.quotaProject != "" {
  136. metadata["X-goog-user-project"] = ts.quotaProject
  137. }
  138. if ts.requestReason != "" {
  139. metadata["X-goog-request-reason"] = ts.requestReason
  140. }
  141. return metadata, nil
  142. }
  143. func isTokenSourceDirectPathCompatible(ts oauth2.TokenSource) bool {
  144. if ts == nil {
  145. return false
  146. }
  147. tok, err := ts.Token()
  148. if err != nil {
  149. return false
  150. }
  151. if tok == nil {
  152. return false
  153. }
  154. if source, _ := tok.Extra("oauth2.google.tokenSource").(string); source != "compute-metadata" {
  155. return false
  156. }
  157. if acct, _ := tok.Extra("oauth2.google.serviceAccount").(string); acct != "default" {
  158. return false
  159. }
  160. return true
  161. }
  162. func isDirectPathEnabled(endpoint string) bool {
  163. // Only host:port is supported, not other schemes (e.g., "tcp://" or "unix://").
  164. // Also don't try direct path if the user has chosen an alternate name resolver
  165. // (i.e., via ":///" prefix).
  166. //
  167. // TODO(cbro): once gRPC has introspectible options, check the user hasn't
  168. // provided a custom dialer in gRPC options.
  169. if strings.Contains(endpoint, "://") && !strings.HasPrefix(endpoint, "dns:///") {
  170. return false
  171. }
  172. // Only try direct path if the user has opted in via the environment variable.
  173. whitelist := strings.Split(os.Getenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH"), ",")
  174. for _, api := range whitelist {
  175. // Ignore empty string since an empty env variable splits into [""]
  176. if api != "" && strings.Contains(endpoint, api) {
  177. return true
  178. }
  179. }
  180. return false
  181. }