http2_server.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "context"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "strconv"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/golang/protobuf/proto"
  32. "golang.org/x/net/http2"
  33. "golang.org/x/net/http2/hpack"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/grpclog"
  37. "google.golang.org/grpc/internal/channelz"
  38. "google.golang.org/grpc/internal/grpcrand"
  39. "google.golang.org/grpc/keepalive"
  40. "google.golang.org/grpc/metadata"
  41. "google.golang.org/grpc/peer"
  42. "google.golang.org/grpc/stats"
  43. "google.golang.org/grpc/status"
  44. "google.golang.org/grpc/tap"
  45. )
// Sentinel errors returned by the server transport's header-writing paths
// (WriteHeader / WriteStatus); callers compare against them directly.
var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
)
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	ctx         context.Context
	ctxDone     <-chan struct{} // Cache the context.Done() chan
	cancel      context.CancelFunc
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following
	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
//
// It writes the server's SETTINGS (and any connection window update) to the
// peer, validates the client's connection preface and initial SETTINGS frame,
// then starts the loopy writer and keepalive goroutines.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	var isettings []http2.Setting
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		// Treat 0 as "unlimited" internally; no SETTINGS entry is sent.
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// BDP-based dynamic windows are used only when the caller did not pin
	// either window size explicitly.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Fill in defaults for any keepalive parameters left unset.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	ctx, cancel := context.WithCancel(context.Background())
	t := &http2Server{
		ctx:               ctx,
		cancel:            cancel,
		ctxDone:           ctx.Done(),
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}
	t.framer.writer.Flush()

	// From here on, any failure must tear down the partially-built transport.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}
	// The first frame from the client must be SETTINGS.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	// Record read activity for the keepalive loop.
	atomic.StoreUint32(&t.activity, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Start the writer goroutine; loopy owns all frame writes from now on
	// and closes the connection when it exits.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
// operateHeaders takes action on the decoded headers.
//
// It returns true only for a connection-fatal condition (an illegal stream
// ID); the caller then closes the whole transport. All other failures are
// handled by resetting just the offending stream and returning false.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
	streamID := frame.Header().StreamID
	state := &decodeState{
		serverSide: true,
	}
	// A header-decode failure that maps to a gRPC status is answered with a
	// RST_STREAM for this stream only; the connection stays up.
	if err := state.decodeHeader(frame); err != nil {
		if se, ok := status.FromError(err); ok {
			t.controlBuf.put(&cleanupStream{
				streamID: streamID,
				rst:      true,
				rstCode:  statusCodeConvTab[se.Code()],
				onWrite:  func() {},
			})
		}
		return false
	}

	buf := newRecvBuffer()
	s := &Stream{
		id:             streamID,
		st:             t,
		buf:            buf,
		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
		recvCompress:   state.data.encoding,
		method:         state.data.method,
		contentSubtype: state.data.contentSubtype,
	}
	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	// Derive the stream context from the transport context, honoring a
	// client-supplied timeout if one was decoded from the headers.
	if state.data.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	// Attach the received metadata to the context.
	if len(state.data.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
	}
	if state.data.statsTags != nil {
		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
	}
	if state.data.statsTrace != nil {
		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
	}
	// Give the tap handle (if configured) a chance to refuse the RPC before
	// the stream is registered with the transport.
	if t.inTapHandle != nil {
		var err error
		info := &tap.Info{
			FullMethodName: state.data.method,
		}
		s.ctx, err = t.inTapHandle(s.ctx, info)
		if err != nil {
			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			t.controlBuf.put(&cleanupStream{
				streamID: s.id,
				rst:      true,
				rstCode:  http2.ErrCodeRefusedStream,
				onWrite:  func() {},
			})
			return false
		}
	}
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		return false
	}
	// Enforce our advertised MAX_CONCURRENT_STREAMS by refusing the stream.
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		return false
	}
	// Client-initiated stream IDs must be odd and strictly increasing; a
	// violation is fatal for the connection.
	if streamID%2 != 1 || streamID <= t.maxStreamID {
		t.mu.Unlock()
		// illegal gRPC stream id.
		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
		return true
	}
	t.maxStreamID = streamID
	t.activeStreams[streamID] = s
	if len(t.activeStreams) == 1 {
		// First active stream: the connection is no longer idle.
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	// requestRead lets the application ask for a larger receive window when
	// it expects a message bigger than the current window.
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
		}
		t.stats.HandleRPC(s.ctx, inHeader)
	}
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:     s.ctx,
			ctxDone: s.ctxDone,
			recv:    s.buf,
		},
		// windowHandler returns read bytes to flow control as the app
		// consumes them.
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return false
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
//
// This is the transport's read loop: it dispatches every frame read from the
// connection until a connection-level error occurs, then closes the transport.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	defer close(t.readerDone)
	for {
		frame, err := t.framer.fr.ReadFrame()
		// Record read activity for the keepalive loop.
		atomic.StoreUint32(&t.activity, 1)
		if err != nil {
			// A stream-level error resets only the affected stream; the
			// connection stays usable.
			if se, ok := err.(http2.StreamError); ok {
				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					// Stream already gone; still tell loopy to forget it.
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				// Peer closed the connection.
				t.Close()
				return
			}
			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
			t.Close()
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if t.operateHeaders(frame, handle, traceCtx) {
				// Fatal connection error while processing headers. Note that
				// break only exits the switch; the next ReadFrame on the
				// closed transport is expected to fail and end the loop.
				t.Close()
				break
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
		}
	}
}
  464. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  465. t.mu.Lock()
  466. defer t.mu.Unlock()
  467. if t.activeStreams == nil {
  468. // The transport is closing.
  469. return nil, false
  470. }
  471. s, ok := t.activeStreams[f.Header().StreamID]
  472. if !ok {
  473. // The stream is already done.
  474. return nil, false
  475. }
  476. return s, true
  477. }
  478. // adjustWindow sends out extra window update over the initial window size
  479. // of stream if the application is requesting data larger in size than
  480. // the window.
  481. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  482. if w := s.fc.maybeAdjust(n); w > 0 {
  483. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  484. }
  485. }
  486. // updateWindow adjusts the inbound quota for the stream and the transport.
  487. // Window updates will deliver to the controller for sending when
  488. // the cumulative quota exceeds the corresponding threshold.
  489. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  490. if w := s.fc.onRead(n); w > 0 {
  491. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
  492. increment: w,
  493. })
  494. }
  495. }
// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Server) updateFlowControl(n uint32) {
	// Raise the limit on every active stream and remember the new default
	// for streams created later.
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	// Grow the connection-level window by the delta returned from the
	// transport flow-control accounting.
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	// Advertise the new per-stream window to the client via SETTINGS.
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}
// handleData processes an incoming DATA frame: it runs connection-level
// flow-control accounting, optionally triggers a BDP ping, and delivers the
// payload (and EOF, when the frame ends the stream) to the target stream.
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if size > 0 {
		// Stream-level flow-control violation: reset just this stream.
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			// Padding bytes count against flow control but carry no data;
			// return them to the window immediately.
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			data := make([]byte, len(f.Data()))
			copy(data, f.Data())
			s.write(recvMsg{data: data})
		}
	}
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}
  580. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  581. // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
  582. if s, ok := t.getStream(f); ok {
  583. t.closeStream(s, false, 0, false)
  584. return
  585. }
  586. // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
  587. t.controlBuf.put(&cleanupStream{
  588. streamID: f.Header().StreamID,
  589. rst: false,
  590. rstCode: 0,
  591. onWrite: func() {},
  592. })
  593. }
// handleSettings processes a SETTINGS frame from the client. Most settings
// are forwarded to loopy via controlBuf; SETTINGS_MAX_HEADER_LIST_SIZE is
// applied to the transport itself, atomically with the controlBuf insertion
// so header-size checks and the outgoing SETTINGS ack stay consistent.
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		// Acks carry no settings; nothing to do.
		return
	}
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxHeaderListSize:
			// Applied on the transport (not loopy); deferred into
			// executeAndPut so it happens under controlBuf's lock.
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, &incomingSettings{
		ss: ss,
	})
}
const (
	// maxPingStrikes is the number of keepalive-policy violations tolerated;
	// once pingStrikes exceeds it, handlePing sends GOAWAY(ENHANCE_YOUR_CALM)
	// and closes the connection.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum interval expected between client
	// pings when there are no active streams and the policy does not permit
	// pings without streams.
	defaultPingTimeout = 2 * time.Hour
)
// handlePing processes a PING frame. Acks are matched against the drain
// GoAway ping or fed to the BDP estimator; non-acks are answered and checked
// against the server's keepalive enforcement policy, accumulating "strikes"
// for clients that ping too often.
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// Ack for the ping sent as part of graceful drain: proceed with the
		// second GoAway.
		if f.Data == goAwayPing.data && t.drainChan != nil {
			close(t.drainChan)
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	// Echo the ping back as an ack.
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset ping strikes means that we don't need to check for policy
	// violation for this ping and the pingStrikes counter should be set
	// to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active thus, this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		errorf("transport: Got too many pings from the client, closing the connection.")
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
	}
}
  672. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  673. t.controlBuf.put(&incomingWindowUpdate{
  674. streamID: f.Header().StreamID,
  675. increment: f.Increment,
  676. })
  677. }
  678. func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
  679. for k, vv := range md {
  680. if isReservedHeader(k) {
  681. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  682. continue
  683. }
  684. for _, v := range vv {
  685. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  686. }
  687. }
  688. return headerFields
  689. }
  690. func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
  691. if t.maxSendHeaderListSize == nil {
  692. return true
  693. }
  694. hdrFrame := it.(*headerFrame)
  695. var sz int64
  696. for _, f := range hdrFrame.hf {
  697. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  698. errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
  699. return false
  700. }
  701. }
  702. return true
  703. }
  704. // WriteHeader sends the header metedata md back to the client.
  705. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  706. if s.updateHeaderSent() || s.getState() == streamDone {
  707. return ErrIllegalHeaderWrite
  708. }
  709. s.hdrMu.Lock()
  710. if md.Len() > 0 {
  711. if s.header.Len() > 0 {
  712. s.header = metadata.Join(s.header, md)
  713. } else {
  714. s.header = md
  715. }
  716. }
  717. if err := t.writeHeaderLocked(s); err != nil {
  718. s.hdrMu.Unlock()
  719. return err
  720. }
  721. s.hdrMu.Unlock()
  722. return nil
  723. }
// writeHeaderLocked assembles and sends the response headers (:status 200,
// content-type, optional grpc-encoding, plus user metadata) for s. The
// caller must hold s.hdrMu. If the header list exceeds the client's limit,
// the stream is closed and ErrHeaderListSizeLimitViolation is returned.
func (t *http2Server) writeHeaderLocked(s *Stream) error {
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
	if s.sendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
	// Size check and enqueue happen atomically inside controlBuf.
	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: false,
		onWrite: func() {
			// Sending headers counts as activity for keepalive policing.
			atomic.StoreUint32(&t.resetPingStrikes, 1)
		},
	})
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	if t.stats != nil {
		// Note: WireLength is not set in outHeader.
		// TODO(mmukhi): Revisit this later, if needed.
		outHeader := &stats.OutHeader{}
		t.stats.HandleRPC(s.Context(), outHeader)
	}
	return nil
}
// WriteStatus sends stream status to the client and terminates the stream.
// There is no further I/O operations being able to perform on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	if s.getState() == streamDone {
		return nil
	}
	s.hdrMu.Lock()
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() { // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				s.hdrMu.Unlock()
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	// Attach status details (if any) as a binary-encoded trailer.
	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite: func() {
			// Sending trailers counts as activity for keepalive policing.
			atomic.StoreUint32(&t.resetPingStrikes, 1)
		},
	}
	s.hdrMu.Unlock()
	// Only the size check runs here; finishStream enqueues the trailer frame.
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	if t.stats != nil {
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
	}
	return nil
}
// Write converts the data into an HTTP2 data frame and sends it out. A non-nil
// error is returned if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if !s.isHeaderSent() { // Headers haven't been written yet.
		if err := t.WriteHeader(s, nil); err != nil {
			if _, ok := err.(ConnectionError); ok {
				return err
			}
			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
			return status.Errorf(codes.Internal, "transport: %v", err)
		}
	} else {
		// Writing headers checks for this condition.
		if s.getState() == streamDone {
			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
			s.cancel()
			// Distinguish a dying transport from a dead stream so the
			// caller gets the more specific error.
			select {
			case <-t.ctx.Done():
				return ErrConnClosing
			default:
			}
			return ContextErr(s.ctx.Err())
		}
	}
	// Add some data to header frame so that we can equally distribute bytes across frames.
	emptyLen := http2MaxFrameLen - len(hdr)
	if emptyLen > len(data) {
		emptyLen = len(data)
	}
	hdr = append(hdr, data[:emptyLen]...)
	data = data[emptyLen:]
	df := &dataFrame{
		streamID: s.id,
		h:        hdr,
		d:        data,
		// Each data frame written counts as activity, so stop counting
		// ping strikes.
		onEachWrite: func() {
			atomic.StoreUint32(&t.resetPingStrikes, 1)
		},
	}
	// Acquire write quota for the whole message (header + payload). On
	// failure, report transport closure or stream context error, whichever
	// applies.
	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
		select {
		case <-t.ctx.Done():
			return ErrConnClosing
		default:
		}
		return ContextErr(s.ctx.Err())
	}
	return t.controlBuf.put(df)
}
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
	p := &ping{}
	// pingSent records whether a keepalive ping is outstanding; a second
	// keepalive.Time expiry without activity while it is set closes the
	// connection.
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			// Not idle long enough yet; re-arm for the remaining time.
			maxIdle.Reset(val)
		case <-maxAge.C:
			// The connection has reached its maximum age: drain it, then
			// wait out the grace period before closing it forcibly.
			t.drain(http2.ErrCodeNo, []byte{})
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.ctx.Done():
			}
			return
		case <-keepalive.C:
			// Activity since the last check clears any outstanding ping.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			if pingSent {
				// No activity even after a ping: the peer is unresponsive.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			pingSent = true
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.kpCount, 1)
			}
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.ctx.Done():
			return
		}
	}
}
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() error {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return errors.New("transport: Close() was already called")
	}
	t.state = closing
	// Snapshot and detach the active streams under the lock so they can be
	// cancelled after it is released.
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	if t.stats != nil {
		connEnd := &stats.ConnEnd{}
		t.stats.HandleConn(t.ctx, connEnd)
	}
	return err
}
// deleteStream deletes the stream s from transport's active streams and
// returns the stream's state prior to the deletion. A return value of
// streamDone indicates another caller already tore the stream down.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState streamState) {
	// swapState is the single point of serialization: only the caller that
	// observes a non-done old state performs the clean-up below.
	oldState = s.swapState(streamDone)
	if oldState == streamDone {
		// If the stream was already done, return.
		return oldState
	}
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()
	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			// Record when the connection went idle for MaxConnectionIdle
			// tracking in keepalive().
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		if eosReceived {
			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
		} else {
			atomic.AddInt64(&t.czData.streamsFailed, 1)
		}
	}
	return oldState
}
  1006. // finishStream closes the stream and puts the trailing headerFrame into controlbuf.
  1007. func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
  1008. oldState := t.deleteStream(s, eosReceived)
  1009. // If the stream is already closed, then don't put trailing header to controlbuf.
  1010. if oldState == streamDone {
  1011. return
  1012. }
  1013. hdr.cleanup = &cleanupStream{
  1014. streamID: s.id,
  1015. rst: rst,
  1016. rstCode: rstCode,
  1017. onWrite: func() {},
  1018. }
  1019. t.controlBuf.put(hdr)
  1020. }
  1021. // closeStream clears the footprint of a stream when the stream is not needed any more.
  1022. func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
  1023. t.deleteStream(s, eosReceived)
  1024. t.controlBuf.put(&cleanupStream{
  1025. streamID: s.id,
  1026. rst: rst,
  1027. rstCode: rstCode,
  1028. onWrite: func() {},
  1029. })
  1030. }
// RemoteAddr returns the network address of the peer this transport is
// connected to.
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}
// Drain initiates a graceful shutdown of the transport by enqueueing a
// "heads-up" GOAWAY with no error code (see drain).
func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}
  1037. func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
  1038. t.mu.Lock()
  1039. defer t.mu.Unlock()
  1040. if t.drainChan != nil {
  1041. return
  1042. }
  1043. t.drainChan = make(chan struct{})
  1044. t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
  1045. }
// goAwayPing is the fixed payload sent with the ping that accompanies a
// graceful GOAWAY; its ack signals that the first GOAWAY reached the client.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	sid := t.maxStreamID
	if !g.headsUp {
		// Second (final) GOAWAY of the handshake, or an abrupt one.
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter). But flush out what's inside the buffer first.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.ctx.Done():
			return
		}
		// Enqueue the second, non-heads-up GOAWAY carrying the real max
		// stream ID; it is handled by the !g.headsUp branch above.
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
  1101. func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
  1102. s := channelz.SocketInternalMetric{
  1103. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1104. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1105. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1106. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1107. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1108. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1109. LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1110. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1111. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1112. LocalFlowControlWindow: int64(t.fc.getSize()),
  1113. SocketOptions: channelz.GetSocketOption(t.conn),
  1114. LocalAddr: t.localAddr,
  1115. RemoteAddr: t.remoteAddr,
  1116. // RemoteName :
  1117. }
  1118. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1119. s.Security = au.GetSecurityValue()
  1120. }
  1121. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1122. return &s
  1123. }
  1124. func (t *http2Server) IncrMsgSent() {
  1125. atomic.AddInt64(&t.czData.msgSent, 1)
  1126. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1127. }
  1128. func (t *http2Server) IncrMsgRecv() {
  1129. atomic.AddInt64(&t.czData.msgRecv, 1)
  1130. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1131. }
  1132. func (t *http2Server) getOutFlowWindow() int64 {
  1133. resp := make(chan uint32, 1)
  1134. timer := time.NewTimer(time.Second)
  1135. defer timer.Stop()
  1136. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1137. select {
  1138. case sz := <-resp:
  1139. return int64(sz)
  1140. case <-t.ctxDone:
  1141. return -1
  1142. case <-timer.C:
  1143. return -2
  1144. }
  1145. }
  1146. func getJitter(v time.Duration) time.Duration {
  1147. if v == infinity {
  1148. return 0
  1149. }
  1150. // Generate a jitter between +/- 10% of the value.
  1151. r := int64(v / 10)
  1152. j := grpcrand.Int63n(2*r) - r
  1153. return time.Duration(j)
  1154. }