server.go 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "golang.org/x/net/trace"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/encoding"
  37. "google.golang.org/grpc/encoding/proto"
  38. "google.golang.org/grpc/grpclog"
  39. "google.golang.org/grpc/internal/binarylog"
  40. "google.golang.org/grpc/internal/channelz"
  41. "google.golang.org/grpc/internal/grpcrand"
  42. "google.golang.org/grpc/internal/grpcsync"
  43. "google.golang.org/grpc/internal/transport"
  44. "google.golang.org/grpc/keepalive"
  45. "google.golang.org/grpc/metadata"
  46. "google.golang.org/grpc/peer"
  47. "google.golang.org/grpc/stats"
  48. "google.golang.org/grpc/status"
  49. "google.golang.org/grpc/tap"
  50. )
  51. const (
  52. defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
  53. defaultServerMaxSendMessageSize = math.MaxInt32
  54. )
  55. var statusOK = status.New(codes.OK, "")
  56. var logger = grpclog.Component("core")
  57. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  58. // MethodDesc represents an RPC service's method specification.
  59. type MethodDesc struct {
  60. MethodName string
  61. Handler methodHandler
  62. }
  63. // ServiceDesc represents an RPC service's specification.
  64. type ServiceDesc struct {
  65. ServiceName string
  66. // The pointer to the service interface. Used to check whether the user
  67. // provided implementation satisfies the interface requirements.
  68. HandlerType interface{}
  69. Methods []MethodDesc
  70. Streams []StreamDesc
  71. Metadata interface{}
  72. }
  73. // service consists of the information of the server serving this service and
  74. // the methods in this service.
  75. type service struct {
  76. server interface{} // the server for service methods
  77. md map[string]*MethodDesc
  78. sd map[string]*StreamDesc
  79. mdata interface{}
  80. }
  81. type serverWorkerData struct {
  82. st transport.ServerTransport
  83. wg *sync.WaitGroup
  84. stream *transport.Stream
  85. }
  86. // Server is a gRPC server to serve RPC requests.
  87. type Server struct {
  88. opts serverOptions
  89. mu sync.Mutex // guards following
  90. lis map[net.Listener]bool
  91. conns map[transport.ServerTransport]bool
  92. serve bool
  93. drain bool
  94. cv *sync.Cond // signaled when connections close for GracefulStop
  95. m map[string]*service // service name -> service info
  96. events trace.EventLog
  97. quit *grpcsync.Event
  98. done *grpcsync.Event
  99. channelzRemoveOnce sync.Once
  100. serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
  101. channelzID int64 // channelz unique identification number
  102. czData *channelzData
  103. serverWorkerChannels []chan *serverWorkerData
  104. }
  105. type serverOptions struct {
  106. creds credentials.TransportCredentials
  107. codec baseCodec
  108. cp Compressor
  109. dc Decompressor
  110. unaryInt UnaryServerInterceptor
  111. streamInt StreamServerInterceptor
  112. chainUnaryInts []UnaryServerInterceptor
  113. chainStreamInts []StreamServerInterceptor
  114. inTapHandle tap.ServerInHandle
  115. statsHandler stats.Handler
  116. maxConcurrentStreams uint32
  117. maxReceiveMessageSize int
  118. maxSendMessageSize int
  119. unknownStreamDesc *StreamDesc
  120. keepaliveParams keepalive.ServerParameters
  121. keepalivePolicy keepalive.EnforcementPolicy
  122. initialWindowSize int32
  123. initialConnWindowSize int32
  124. writeBufferSize int
  125. readBufferSize int
  126. connectionTimeout time.Duration
  127. maxHeaderListSize *uint32
  128. headerTableSize *uint32
  129. numServerWorkers uint32
  130. }
  131. var defaultServerOptions = serverOptions{
  132. maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
  133. maxSendMessageSize: defaultServerMaxSendMessageSize,
  134. connectionTimeout: 120 * time.Second,
  135. writeBufferSize: defaultWriteBufSize,
  136. readBufferSize: defaultReadBufSize,
  137. }
  138. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  139. type ServerOption interface {
  140. apply(*serverOptions)
  141. }
  142. // EmptyServerOption does not alter the server configuration. It can be embedded
  143. // in another structure to build custom server options.
  144. //
  145. // This API is EXPERIMENTAL.
  146. type EmptyServerOption struct{}
  147. func (EmptyServerOption) apply(*serverOptions) {}
  148. // funcServerOption wraps a function that modifies serverOptions into an
  149. // implementation of the ServerOption interface.
  150. type funcServerOption struct {
  151. f func(*serverOptions)
  152. }
  153. func (fdo *funcServerOption) apply(do *serverOptions) {
  154. fdo.f(do)
  155. }
  156. func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
  157. return &funcServerOption{
  158. f: f,
  159. }
  160. }
  161. // WriteBufferSize determines how much data can be batched before doing a write on the wire.
  162. // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
  163. // The default value for this buffer is 32KB.
  164. // Zero will disable the write buffer such that each write will be on underlying connection.
  165. // Note: A Send call may not directly translate to a write.
  166. func WriteBufferSize(s int) ServerOption {
  167. return newFuncServerOption(func(o *serverOptions) {
  168. o.writeBufferSize = s
  169. })
  170. }
  171. // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
  172. // for one read syscall.
  173. // The default value for this buffer is 32KB.
  174. // Zero will disable read buffer for a connection so data framer can access the underlying
  175. // conn directly.
  176. func ReadBufferSize(s int) ServerOption {
  177. return newFuncServerOption(func(o *serverOptions) {
  178. o.readBufferSize = s
  179. })
  180. }
  181. // InitialWindowSize returns a ServerOption that sets window size for stream.
  182. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  183. func InitialWindowSize(s int32) ServerOption {
  184. return newFuncServerOption(func(o *serverOptions) {
  185. o.initialWindowSize = s
  186. })
  187. }
  188. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  189. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  190. func InitialConnWindowSize(s int32) ServerOption {
  191. return newFuncServerOption(func(o *serverOptions) {
  192. o.initialConnWindowSize = s
  193. })
  194. }
  195. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  196. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  197. if kp.Time > 0 && kp.Time < time.Second {
  198. logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
  199. kp.Time = time.Second
  200. }
  201. return newFuncServerOption(func(o *serverOptions) {
  202. o.keepaliveParams = kp
  203. })
  204. }
  205. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  206. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  207. return newFuncServerOption(func(o *serverOptions) {
  208. o.keepalivePolicy = kep
  209. })
  210. }
  211. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  212. //
  213. // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
  214. //
  215. // Deprecated: register codecs using encoding.RegisterCodec. The server will
  216. // automatically use registered codecs based on the incoming requests' headers.
  217. // See also
  218. // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
  219. // Will be supported throughout 1.x.
  220. func CustomCodec(codec Codec) ServerOption {
  221. return newFuncServerOption(func(o *serverOptions) {
  222. o.codec = codec
  223. })
  224. }
  225. // RPCCompressor returns a ServerOption that sets a compressor for outbound
  226. // messages. For backward compatibility, all outbound messages will be sent
  227. // using this compressor, regardless of incoming message compression. By
  228. // default, server messages will be sent using the same compressor with which
  229. // request messages were sent.
  230. //
  231. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  232. // throughout 1.x.
  233. func RPCCompressor(cp Compressor) ServerOption {
  234. return newFuncServerOption(func(o *serverOptions) {
  235. o.cp = cp
  236. })
  237. }
  238. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
  239. // messages. It has higher priority than decompressors registered via
  240. // encoding.RegisterCompressor.
  241. //
  242. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  243. // throughout 1.x.
  244. func RPCDecompressor(dc Decompressor) ServerOption {
  245. return newFuncServerOption(func(o *serverOptions) {
  246. o.dc = dc
  247. })
  248. }
  249. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  250. // If this is not set, gRPC uses the default limit.
  251. //
  252. // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
  253. func MaxMsgSize(m int) ServerOption {
  254. return MaxRecvMsgSize(m)
  255. }
  256. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  257. // If this is not set, gRPC uses the default 4MB.
  258. func MaxRecvMsgSize(m int) ServerOption {
  259. return newFuncServerOption(func(o *serverOptions) {
  260. o.maxReceiveMessageSize = m
  261. })
  262. }
  263. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  264. // If this is not set, gRPC uses the default `math.MaxInt32`.
  265. func MaxSendMsgSize(m int) ServerOption {
  266. return newFuncServerOption(func(o *serverOptions) {
  267. o.maxSendMessageSize = m
  268. })
  269. }
  270. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  271. // of concurrent streams to each ServerTransport.
  272. func MaxConcurrentStreams(n uint32) ServerOption {
  273. return newFuncServerOption(func(o *serverOptions) {
  274. o.maxConcurrentStreams = n
  275. })
  276. }
  277. // Creds returns a ServerOption that sets credentials for server connections.
  278. func Creds(c credentials.TransportCredentials) ServerOption {
  279. return newFuncServerOption(func(o *serverOptions) {
  280. o.creds = c
  281. })
  282. }
  283. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  284. // server. Only one unary interceptor can be installed. The construction of multiple
  285. // interceptors (e.g., chaining) can be implemented at the caller.
  286. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  287. return newFuncServerOption(func(o *serverOptions) {
  288. if o.unaryInt != nil {
  289. panic("The unary server interceptor was already set and may not be reset.")
  290. }
  291. o.unaryInt = i
  292. })
  293. }
  294. // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
  295. // for unary RPCs. The first interceptor will be the outer most,
  296. // while the last interceptor will be the inner most wrapper around the real call.
  297. // All unary interceptors added by this method will be chained.
  298. func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
  299. return newFuncServerOption(func(o *serverOptions) {
  300. o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
  301. })
  302. }
  303. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  304. // server. Only one stream interceptor can be installed.
  305. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  306. return newFuncServerOption(func(o *serverOptions) {
  307. if o.streamInt != nil {
  308. panic("The stream server interceptor was already set and may not be reset.")
  309. }
  310. o.streamInt = i
  311. })
  312. }
  313. // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
  314. // for streaming RPCs. The first interceptor will be the outer most,
  315. // while the last interceptor will be the inner most wrapper around the real call.
  316. // All stream interceptors added by this method will be chained.
  317. func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
  318. return newFuncServerOption(func(o *serverOptions) {
  319. o.chainStreamInts = append(o.chainStreamInts, interceptors...)
  320. })
  321. }
  322. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  323. // transport to be created. Only one can be installed.
  324. func InTapHandle(h tap.ServerInHandle) ServerOption {
  325. return newFuncServerOption(func(o *serverOptions) {
  326. if o.inTapHandle != nil {
  327. panic("The tap handle was already set and may not be reset.")
  328. }
  329. o.inTapHandle = h
  330. })
  331. }
  332. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  333. func StatsHandler(h stats.Handler) ServerOption {
  334. return newFuncServerOption(func(o *serverOptions) {
  335. o.statsHandler = h
  336. })
  337. }
  338. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  339. // unknown service handler. The provided method is a bidi-streaming RPC service
  340. // handler that will be invoked instead of returning the "unimplemented" gRPC
  341. // error whenever a request is received for an unregistered service or method.
  342. // The handling function and stream interceptor (if set) have full access to
  343. // the ServerStream, including its Context.
  344. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  345. return newFuncServerOption(func(o *serverOptions) {
  346. o.unknownStreamDesc = &StreamDesc{
  347. StreamName: "unknown_service_handler",
  348. Handler: streamHandler,
  349. // We need to assume that the users of the streamHandler will want to use both.
  350. ClientStreams: true,
  351. ServerStreams: true,
  352. }
  353. })
  354. }
  355. // ConnectionTimeout returns a ServerOption that sets the timeout for
  356. // connection establishment (up to and including HTTP/2 handshaking) for all
  357. // new connections. If this is not set, the default is 120 seconds. A zero or
  358. // negative value will result in an immediate timeout.
  359. //
  360. // This API is EXPERIMENTAL.
  361. func ConnectionTimeout(d time.Duration) ServerOption {
  362. return newFuncServerOption(func(o *serverOptions) {
  363. o.connectionTimeout = d
  364. })
  365. }
  366. // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
  367. // of header list that the server is prepared to accept.
  368. func MaxHeaderListSize(s uint32) ServerOption {
  369. return newFuncServerOption(func(o *serverOptions) {
  370. o.maxHeaderListSize = &s
  371. })
  372. }
  373. // HeaderTableSize returns a ServerOption that sets the size of dynamic
  374. // header table for stream.
  375. //
  376. // This API is EXPERIMENTAL.
  377. func HeaderTableSize(s uint32) ServerOption {
  378. return newFuncServerOption(func(o *serverOptions) {
  379. o.headerTableSize = &s
  380. })
  381. }
  382. // NumStreamWorkers returns a ServerOption that sets the number of worker
  383. // goroutines that should be used to process incoming streams. Setting this to
  384. // zero (default) will disable workers and spawn a new goroutine for each
  385. // stream.
  386. //
  387. // This API is EXPERIMENTAL.
  388. func NumStreamWorkers(numServerWorkers uint32) ServerOption {
  389. // TODO: If/when this API gets stabilized (i.e. stream workers become the
  390. // only way streams are processed), change the behavior of the zero value to
  391. // a sane default. Preliminary experiments suggest that a value equal to the
  392. // number of CPUs available is most performant; requires thorough testing.
  393. return newFuncServerOption(func(o *serverOptions) {
  394. o.numServerWorkers = numServerWorkers
  395. })
  396. }
  397. // serverWorkerResetThreshold defines how often the stack must be reset. Every
  398. // N requests, by spawning a new goroutine in its place, a worker can reset its
  399. // stack so that large stacks don't live in memory forever. 2^16 should allow
  400. // each goroutine stack to live for at least a few seconds in a typical
  401. // workload (assuming a QPS of a few thousand requests/sec).
  402. const serverWorkerResetThreshold = 1 << 16
  403. // serverWorkers blocks on a *transport.Stream channel forever and waits for
  404. // data to be fed by serveStreams. This allows different requests to be
  405. // processed by the same goroutine, removing the need for expensive stack
  406. // re-allocations (see the runtime.morestack problem [1]).
  407. //
  408. // [1] https://github.com/golang/go/issues/18138
  409. func (s *Server) serverWorker(ch chan *serverWorkerData) {
  410. // To make sure all server workers don't reset at the same time, choose a
  411. // random number of iterations before resetting.
  412. threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
  413. for completed := 0; completed < threshold; completed++ {
  414. data, ok := <-ch
  415. if !ok {
  416. return
  417. }
  418. s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
  419. data.wg.Done()
  420. }
  421. go s.serverWorker(ch)
  422. }
  423. // initServerWorkers creates worker goroutines and channels to process incoming
  424. // connections to reduce the time spent overall on runtime.morestack.
  425. func (s *Server) initServerWorkers() {
  426. s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
  427. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  428. s.serverWorkerChannels[i] = make(chan *serverWorkerData)
  429. go s.serverWorker(s.serverWorkerChannels[i])
  430. }
  431. }
  432. func (s *Server) stopServerWorkers() {
  433. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  434. close(s.serverWorkerChannels[i])
  435. }
  436. }
  437. // NewServer creates a gRPC server which has no service registered and has not
  438. // started to accept requests yet.
  439. func NewServer(opt ...ServerOption) *Server {
  440. opts := defaultServerOptions
  441. for _, o := range opt {
  442. o.apply(&opts)
  443. }
  444. s := &Server{
  445. lis: make(map[net.Listener]bool),
  446. opts: opts,
  447. conns: make(map[transport.ServerTransport]bool),
  448. m: make(map[string]*service),
  449. quit: grpcsync.NewEvent(),
  450. done: grpcsync.NewEvent(),
  451. czData: new(channelzData),
  452. }
  453. chainUnaryServerInterceptors(s)
  454. chainStreamServerInterceptors(s)
  455. s.cv = sync.NewCond(&s.mu)
  456. if EnableTracing {
  457. _, file, line, _ := runtime.Caller(1)
  458. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  459. }
  460. if s.opts.numServerWorkers > 0 {
  461. s.initServerWorkers()
  462. }
  463. if channelz.IsOn() {
  464. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
  465. }
  466. return s
  467. }
  468. // printf records an event in s's event log, unless s has been stopped.
  469. // REQUIRES s.mu is held.
  470. func (s *Server) printf(format string, a ...interface{}) {
  471. if s.events != nil {
  472. s.events.Printf(format, a...)
  473. }
  474. }
  475. // errorf records an error in s's event log, unless s has been stopped.
  476. // REQUIRES s.mu is held.
  477. func (s *Server) errorf(format string, a ...interface{}) {
  478. if s.events != nil {
  479. s.events.Errorf(format, a...)
  480. }
  481. }
  482. // RegisterService registers a service and its implementation to the gRPC
  483. // server. It is called from the IDL generated code. This must be called before
  484. // invoking Serve.
  485. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  486. ht := reflect.TypeOf(sd.HandlerType).Elem()
  487. st := reflect.TypeOf(ss)
  488. if !st.Implements(ht) {
  489. logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  490. }
  491. s.register(sd, ss)
  492. }
  493. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  494. s.mu.Lock()
  495. defer s.mu.Unlock()
  496. s.printf("RegisterService(%q)", sd.ServiceName)
  497. if s.serve {
  498. logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  499. }
  500. if _, ok := s.m[sd.ServiceName]; ok {
  501. logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  502. }
  503. srv := &service{
  504. server: ss,
  505. md: make(map[string]*MethodDesc),
  506. sd: make(map[string]*StreamDesc),
  507. mdata: sd.Metadata,
  508. }
  509. for i := range sd.Methods {
  510. d := &sd.Methods[i]
  511. srv.md[d.MethodName] = d
  512. }
  513. for i := range sd.Streams {
  514. d := &sd.Streams[i]
  515. srv.sd[d.StreamName] = d
  516. }
  517. s.m[sd.ServiceName] = srv
  518. }
  519. // MethodInfo contains the information of an RPC including its method name and type.
  520. type MethodInfo struct {
  521. // Name is the method name only, without the service name or package name.
  522. Name string
  523. // IsClientStream indicates whether the RPC is a client streaming RPC.
  524. IsClientStream bool
  525. // IsServerStream indicates whether the RPC is a server streaming RPC.
  526. IsServerStream bool
  527. }
  528. // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
  529. type ServiceInfo struct {
  530. Methods []MethodInfo
  531. // Metadata is the metadata specified in ServiceDesc when registering service.
  532. Metadata interface{}
  533. }
  534. // GetServiceInfo returns a map from service names to ServiceInfo.
  535. // Service names include the package names, in the form of <package>.<service>.
  536. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  537. ret := make(map[string]ServiceInfo)
  538. for n, srv := range s.m {
  539. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  540. for m := range srv.md {
  541. methods = append(methods, MethodInfo{
  542. Name: m,
  543. IsClientStream: false,
  544. IsServerStream: false,
  545. })
  546. }
  547. for m, d := range srv.sd {
  548. methods = append(methods, MethodInfo{
  549. Name: m,
  550. IsClientStream: d.ClientStreams,
  551. IsServerStream: d.ServerStreams,
  552. })
  553. }
  554. ret[n] = ServiceInfo{
  555. Methods: methods,
  556. Metadata: srv.mdata,
  557. }
  558. }
  559. return ret
  560. }
  561. // ErrServerStopped indicates that the operation is now illegal because of
  562. // the server being stopped.
  563. var ErrServerStopped = errors.New("grpc: the server has been stopped")
  564. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  565. if s.opts.creds == nil {
  566. return rawConn, nil, nil
  567. }
  568. return s.opts.creds.ServerHandshake(rawConn)
  569. }
  570. type listenSocket struct {
  571. net.Listener
  572. channelzID int64
  573. }
  574. func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
  575. return &channelz.SocketInternalMetric{
  576. SocketOptions: channelz.GetSocketOption(l.Listener),
  577. LocalAddr: l.Listener.Addr(),
  578. }
  579. }
  580. func (l *listenSocket) Close() error {
  581. err := l.Listener.Close()
  582. if channelz.IsOn() {
  583. channelz.RemoveEntry(l.channelzID)
  584. }
  585. return err
  586. }
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// If the server's listener set has already been cleared, lis is closed and
// ErrServerStopped is returned without accepting anything.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Account for this Serve call in serveWG while still holding s.mu.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	// Wrap the listener so it can carry a channelz ID, and record it in the
	// server's listener set.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// Registered after the serveWG defer above, so this cleanup runs first on
	// return: close and forget the listener if it is still in the set.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Errors that report themselves as temporary are retried with a
			// backoff that starts at 5ms, doubles on each consecutive failure
			// and is capped at 1s.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Sleep for the backoff, but give up immediately if the server
				// is told to quit in the meantime.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure observed after quit has fired is reported as a
			// clean shutdown rather than as an error.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
//
// The security handshake and the HTTP/2 transport setup run synchronously in
// the caller's goroutine; only the stream-serving loop is moved to a new
// goroutine once the transport has been added to the server.
func (s *Server) handleRawConn(rawConn net.Conn) {
	// Do not start any work on connections accepted after quit has fired.
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	// Bound the handshakes below by the configured connection timeout.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		// Clear the deadline set above. This also runs after the Close in the
		// branch above; its result is ignored either way.
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		// newHTTP2Transport has already logged the failure and closed conn.
		return
	}

	// Handshakes are done; remove the deadline before serving streams.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		// addConn closes st itself when it refuses the transport.
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}
  709. // newHTTP2Transport sets up a http/2 transport (using the
  710. // gRPC http2 server transport in transport/http2_server.go).
  711. func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
  712. config := &transport.ServerConfig{
  713. MaxStreams: s.opts.maxConcurrentStreams,
  714. AuthInfo: authInfo,
  715. InTapHandle: s.opts.inTapHandle,
  716. StatsHandler: s.opts.statsHandler,
  717. KeepaliveParams: s.opts.keepaliveParams,
  718. KeepalivePolicy: s.opts.keepalivePolicy,
  719. InitialWindowSize: s.opts.initialWindowSize,
  720. InitialConnWindowSize: s.opts.initialConnWindowSize,
  721. WriteBufferSize: s.opts.writeBufferSize,
  722. ReadBufferSize: s.opts.readBufferSize,
  723. ChannelzParentID: s.channelzID,
  724. MaxHeaderListSize: s.opts.maxHeaderListSize,
  725. HeaderTableSize: s.opts.headerTableSize,
  726. }
  727. st, err := transport.NewServerTransport("http2", c, config)
  728. if err != nil {
  729. s.mu.Lock()
  730. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  731. s.mu.Unlock()
  732. c.Close()
  733. channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
  734. return nil
  735. }
  736. return st
  737. }
  738. func (s *Server) serveStreams(st transport.ServerTransport) {
  739. defer st.Close()
  740. var wg sync.WaitGroup
  741. var roundRobinCounter uint32
  742. st.HandleStreams(func(stream *transport.Stream) {
  743. wg.Add(1)
  744. if s.opts.numServerWorkers > 0 {
  745. data := &serverWorkerData{st: st, wg: &wg, stream: stream}
  746. select {
  747. case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
  748. default:
  749. // If all stream workers are busy, fallback to the default code path.
  750. go func() {
  751. s.handleStream(st, stream, s.traceInfo(st, stream))
  752. wg.Done()
  753. }()
  754. }
  755. } else {
  756. go func() {
  757. defer wg.Done()
  758. s.handleStream(st, stream, s.traceInfo(st, stream))
  759. }()
  760. }
  761. }, func(ctx context.Context, method string) context.Context {
  762. if !EnableTracing {
  763. return ctx
  764. }
  765. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  766. return trace.NewContext(ctx, tr)
  767. })
  768. wg.Wait()
  769. }
// Compile-time assertion that *Server satisfies http.Handler.
var _ http.Handler = (*Server)(nil)
  771. // ServeHTTP implements the Go standard library's http.Handler
  772. // interface by responding to the gRPC request r, by looking up
  773. // the requested gRPC method in the gRPC server s.
  774. //
  775. // The provided HTTP request must have arrived on an HTTP/2
  776. // connection. When using the Go standard library's server,
  777. // practically this means that the Request must also have arrived
  778. // over TLS.
  779. //
  780. // To share one port (such as 443 for https) between gRPC and an
  781. // existing http.Handler, use a root http.Handler such as:
  782. //
  783. // if r.ProtoMajor == 2 && strings.HasPrefix(
  784. // r.Header.Get("Content-Type"), "application/grpc") {
  785. // grpcServer.ServeHTTP(w, r)
  786. // } else {
  787. // yourMux.ServeHTTP(w, r)
  788. // }
  789. //
  790. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  791. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  792. // between the two paths. ServeHTTP does not support some gRPC features
  793. // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
  794. // and subject to change.
  795. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  796. st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
  797. if err != nil {
  798. http.Error(w, err.Error(), http.StatusInternalServerError)
  799. return
  800. }
  801. if !s.addConn(st) {
  802. return
  803. }
  804. defer s.removeConn(st)
  805. s.serveStreams(st)
  806. }
  807. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  808. // If tracing is not enabled, it returns nil.
  809. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  810. if !EnableTracing {
  811. return nil
  812. }
  813. tr, ok := trace.FromContext(stream.Context())
  814. if !ok {
  815. return nil
  816. }
  817. trInfo = &traceInfo{
  818. tr: tr,
  819. firstLine: firstLine{
  820. client: false,
  821. remoteAddr: st.RemoteAddr(),
  822. },
  823. }
  824. if dl, ok := stream.Context().Deadline(); ok {
  825. trInfo.firstLine.deadline = time.Until(dl)
  826. }
  827. return trInfo
  828. }
  829. func (s *Server) addConn(st transport.ServerTransport) bool {
  830. s.mu.Lock()
  831. defer s.mu.Unlock()
  832. if s.conns == nil {
  833. st.Close()
  834. return false
  835. }
  836. if s.drain {
  837. // Transport added after we drained our existing conns: drain it
  838. // immediately.
  839. st.Drain()
  840. }
  841. s.conns[st] = true
  842. return true
  843. }
  844. func (s *Server) removeConn(st transport.ServerTransport) {
  845. s.mu.Lock()
  846. defer s.mu.Unlock()
  847. if s.conns != nil {
  848. delete(s.conns, st)
  849. s.cv.Broadcast()
  850. }
  851. }
  852. func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
  853. return &channelz.ServerInternalMetric{
  854. CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
  855. CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
  856. CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
  857. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
  858. }
  859. }
// incrCallsStarted atomically bumps the started-calls counter and records the
// current time, in Unix nanoseconds, as the time the last call started.
func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}
// incrCallsSucceeded atomically bumps the succeeded-calls counter.
func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}
// incrCallsFailed atomically bumps the failed-calls counter.
func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}
  870. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  871. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  872. if err != nil {
  873. channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
  874. return err
  875. }
  876. compData, err := compress(data, cp, comp)
  877. if err != nil {
  878. channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
  879. return err
  880. }
  881. hdr, payload := msgHeader(data, compData)
  882. // TODO(dfawley): should we be checking len(data) instead?
  883. if len(payload) > s.opts.maxSendMessageSize {
  884. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
  885. }
  886. err = t.Write(stream, hdr, payload, opts)
  887. if err == nil && s.opts.statsHandler != nil {
  888. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
  889. }
  890. return err
  891. }
  892. // chainUnaryServerInterceptors chains all unary server interceptors into one.
  893. func chainUnaryServerInterceptors(s *Server) {
  894. // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
  895. // be executed before any other chained interceptors.
  896. interceptors := s.opts.chainUnaryInts
  897. if s.opts.unaryInt != nil {
  898. interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
  899. }
  900. var chainedInt UnaryServerInterceptor
  901. if len(interceptors) == 0 {
  902. chainedInt = nil
  903. } else if len(interceptors) == 1 {
  904. chainedInt = interceptors[0]
  905. } else {
  906. chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
  907. return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
  908. }
  909. }
  910. s.opts.unaryInt = chainedInt
  911. }
  912. // getChainUnaryHandler recursively generate the chained UnaryHandler
  913. func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
  914. if curr == len(interceptors)-1 {
  915. return finalHandler
  916. }
  917. return func(ctx context.Context, req interface{}) (interface{}, error) {
  918. return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
  919. }
  920. }
// processUnaryRPC serves one unary RPC on stream: it receives and
// decompresses the single request message, invokes md.Handler (through the
// configured unary interceptor), sends the reply and finally writes the RPC
// status.
//
// Along the way it reports to the stats handler, the trace in trInfo, the
// channelz call counters and the binary logger, each only when present or
// enabled. The named result err is read by the deferred bookkeeping function
// to decide whether the call counts as failed; io.EOF is not treated as a
// failure there.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Binary logging: record the client header (metadata, method, timeout,
	// authority, peer address) before anything else is read from the stream.
	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		// NOTE: md here is the incoming metadata and shadows the *MethodDesc
		// parameter for the remainder of this if block only.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			// No decompressor for the request's encoding: reject the RPC as
			// Unimplemented.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	// payInfo is only allocated when a stats handler or binary logger will
	// consume it.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	// err on the next line is the named result (it is declared in the same
	// scope), so the deferred function above observes failures from here on.
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		// Only errors that carry a status are written back to the client.
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is the decode function handed to the method handler: it unmarshals
	// the received bytes into v and then reports the payload to the stats
	// handler, binary logger and trace.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	// Make the stream reachable from the handler's context.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	// The reply is the last message of a unary RPC.
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			// Any error that is neither a status error nor a
			// transport.ConnectionError is considered a programming error.
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			// NOTE(review): appErr is always nil on this path, so the logged
			// trailer does not carry the sendResponse error — confirm intended.
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
  1161. // chainStreamServerInterceptors chains all stream server interceptors into one.
  1162. func chainStreamServerInterceptors(s *Server) {
  1163. // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
  1164. // be executed before any other chained interceptors.
  1165. interceptors := s.opts.chainStreamInts
  1166. if s.opts.streamInt != nil {
  1167. interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
  1168. }
  1169. var chainedInt StreamServerInterceptor
  1170. if len(interceptors) == 0 {
  1171. chainedInt = nil
  1172. } else if len(interceptors) == 1 {
  1173. chainedInt = interceptors[0]
  1174. } else {
  1175. chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
  1176. return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
  1177. }
  1178. }
  1179. s.opts.streamInt = chainedInt
  1180. }
  1181. // getChainStreamHandler recursively generate the chained StreamHandler
  1182. func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
  1183. if curr == len(interceptors)-1 {
  1184. return finalHandler
  1185. }
  1186. return func(srv interface{}, ss ServerStream) error {
  1187. return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
  1188. }
  1189. }
// processStreamingRPC serves one streaming RPC on stream: it builds the
// serverStream handed to the application, selects the compression and
// decompression to use, invokes sd.Handler (through the configured stream
// interceptor, if any) and writes the final RPC status.
//
// srv may be nil, in which case the handler receives a nil server value.
// The named result err is read by the deferred bookkeeping function to decide
// whether the call counts as failed; io.EOF is not treated as a failure
// there.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	// Make the stream reachable from the handler's context.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				// The trace is finished and detached from ss while holding
				// ss.mu.
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Binary logging: record the client header (metadata, method, timeout,
	// authority, peer address) before the handler runs.
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			// No decompressor for the request's encoding: reject the RPC as
			// Unimplemented.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	// server stays nil when no service was supplied by the caller.
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	// Call the handler directly, or through the stream interceptor when one
	// is configured.
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Errors that carry no status are reported to the client as
			// Unknown.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
  1349. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  1350. sm := stream.Method()
  1351. if sm != "" && sm[0] == '/' {
  1352. sm = sm[1:]
  1353. }
  1354. pos := strings.LastIndex(sm, "/")
  1355. if pos == -1 {
  1356. if trInfo != nil {
  1357. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  1358. trInfo.tr.SetError()
  1359. }
  1360. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1361. if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
  1362. if trInfo != nil {
  1363. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1364. trInfo.tr.SetError()
  1365. }
  1366. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1367. }
  1368. if trInfo != nil {
  1369. trInfo.tr.Finish()
  1370. }
  1371. return
  1372. }
  1373. service := sm[:pos]
  1374. method := sm[pos+1:]
  1375. srv, knownService := s.m[service]
  1376. if knownService {
  1377. if md, ok := srv.md[method]; ok {
  1378. s.processUnaryRPC(t, stream, srv, md, trInfo)
  1379. return
  1380. }
  1381. if sd, ok := srv.sd[method]; ok {
  1382. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  1383. return
  1384. }
  1385. }
  1386. // Unknown service, or known server unknown method.
  1387. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1388. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1389. return
  1390. }
  1391. var errDesc string
  1392. if !knownService {
  1393. errDesc = fmt.Sprintf("unknown service %v", service)
  1394. } else {
  1395. errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
  1396. }
  1397. if trInfo != nil {
  1398. trInfo.tr.LazyPrintf("%s", errDesc)
  1399. trInfo.tr.SetError()
  1400. }
  1401. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1402. if trInfo != nil {
  1403. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1404. trInfo.tr.SetError()
  1405. }
  1406. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1407. }
  1408. if trInfo != nil {
  1409. trInfo.tr.Finish()
  1410. }
  1411. }
// streamKey is the context key under which the ServerTransportStream of an
// RPC is stored. Being an unexported empty struct type, it cannot collide
// with context keys defined by other packages.
type streamKey struct{}
// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it. The stream can be read back with
// ServerTransportStreamFromContext.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method presumably reports the RPC's method name, as the transport
	// stream's Method does — confirm against implementations.
	Method() string
	// SetHeader, SendHeader and SetTrailer take the header or trailer
	// metadata to set or send on the stream; their exact semantics are
	// defined by the implementation.
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}
  1435. // ServerTransportStreamFromContext returns the ServerTransportStream saved in
  1436. // ctx. Returns nil if the given context has no stream associated with it
  1437. // (which implies it is not an RPC invocation context).
  1438. //
  1439. // This API is EXPERIMENTAL.
  1440. func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1441. s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1442. return s
  1443. }
// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	// Signal shutdown first so that Serve and handleRawConn stop taking on
	// new work.
	s.quit.Fire()

	// On the way out, wait for everything tracked by serveWG and only then
	// fire done, which releases Serve calls blocked on it.
	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	// The channelz entry is removed at most once, whichever of Stop and
	// GracefulStop gets here first.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	// Take ownership of the listener and connection sets under the lock and
	// nil them out on the server; the actual closing happens after unlocking.
	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	// Signal shutdown first; done is fired when this method returns.
	s.quit.Fire()
	defer s.done.Fire()

	// The channelz entry is removed at most once, whichever of Stop and
	// GracefulStop gets here first.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	// The connection set has already been cleared; there is nothing left to
	// wait for.
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	// Drain existing transports once; s.drain also makes addConn drain any
	// transport that is added later.
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// Block until the connection set is empty. removeConn broadcasts on s.cv
	// whenever a connection goes away; Stop broadcasts after setting s.conns
	// to nil, which also ends this loop.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
  1525. // contentSubtype must be lowercase
  1526. // cannot return nil
  1527. func (s *Server) getCodec(contentSubtype string) baseCodec {
  1528. if s.opts.codec != nil {
  1529. return s.opts.codec
  1530. }
  1531. if contentSubtype == "" {
  1532. return encoding.GetCodec(proto.Name)
  1533. }
  1534. codec := encoding.GetCodec(contentSubtype)
  1535. if codec == nil {
  1536. return encoding.GetCodec(proto.Name)
  1537. }
  1538. return codec
  1539. }
  1540. // SetHeader sets the header metadata.
  1541. // When called multiple times, all the provided metadata will be merged.
  1542. // All the metadata will be sent out when one of the following happens:
  1543. // - grpc.SendHeader() is called;
  1544. // - The first response is sent out;
  1545. // - An RPC status is sent out (error or success).
  1546. func SetHeader(ctx context.Context, md metadata.MD) error {
  1547. if md.Len() == 0 {
  1548. return nil
  1549. }
  1550. stream := ServerTransportStreamFromContext(ctx)
  1551. if stream == nil {
  1552. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1553. }
  1554. return stream.SetHeader(md)
  1555. }
  1556. // SendHeader sends header metadata. It may be called at most once.
  1557. // The provided md and headers set by SetHeader() will be sent.
  1558. func SendHeader(ctx context.Context, md metadata.MD) error {
  1559. stream := ServerTransportStreamFromContext(ctx)
  1560. if stream == nil {
  1561. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1562. }
  1563. if err := stream.SendHeader(md); err != nil {
  1564. return toRPCErr(err)
  1565. }
  1566. return nil
  1567. }
  1568. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1569. // When called more than once, all the provided metadata will be merged.
  1570. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1571. if md.Len() == 0 {
  1572. return nil
  1573. }
  1574. stream := ServerTransportStreamFromContext(ctx)
  1575. if stream == nil {
  1576. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1577. }
  1578. return stream.SetTrailer(md)
  1579. }
  1580. // Method returns the method string for the server context. The returned
  1581. // string is in the format of "/service/method".
  1582. func Method(ctx context.Context) (string, bool) {
  1583. s := ServerTransportStreamFromContext(ctx)
  1584. if s == nil {
  1585. return "", false
  1586. }
  1587. return s.Method(), true
  1588. }
  1589. type channelzServer struct {
  1590. s *Server
  1591. }
  1592. func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
  1593. return c.s.channelzMetric()
  1594. }