server.go 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "golang.org/x/net/trace"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/encoding"
  37. "google.golang.org/grpc/encoding/proto"
  38. "google.golang.org/grpc/grpclog"
  39. "google.golang.org/grpc/internal/binarylog"
  40. "google.golang.org/grpc/internal/channelz"
  41. "google.golang.org/grpc/internal/grpcsync"
  42. "google.golang.org/grpc/internal/transport"
  43. "google.golang.org/grpc/keepalive"
  44. "google.golang.org/grpc/metadata"
  45. "google.golang.org/grpc/peer"
  46. "google.golang.org/grpc/stats"
  47. "google.golang.org/grpc/status"
  48. "google.golang.org/grpc/tap"
  49. )
  50. const (
  51. defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
  52. defaultServerMaxSendMessageSize = math.MaxInt32
  53. )
  54. var statusOK = status.New(codes.OK, "")
  55. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  56. // MethodDesc represents an RPC service's method specification.
  57. type MethodDesc struct {
  58. MethodName string
  59. Handler methodHandler
  60. }
  61. // ServiceDesc represents an RPC service's specification.
  62. type ServiceDesc struct {
  63. ServiceName string
  64. // The pointer to the service interface. Used to check whether the user
  65. // provided implementation satisfies the interface requirements.
  66. HandlerType interface{}
  67. Methods []MethodDesc
  68. Streams []StreamDesc
  69. Metadata interface{}
  70. }
  71. // service consists of the information of the server serving this service and
  72. // the methods in this service.
  73. type service struct {
  74. server interface{} // the server for service methods
  75. md map[string]*MethodDesc
  76. sd map[string]*StreamDesc
  77. mdata interface{}
  78. }
  79. // Server is a gRPC server to serve RPC requests.
  80. type Server struct {
  81. opts serverOptions
  82. mu sync.Mutex // guards following
  83. lis map[net.Listener]bool
  84. conns map[transport.ServerTransport]bool
  85. serve bool
  86. drain bool
  87. cv *sync.Cond // signaled when connections close for GracefulStop
  88. m map[string]*service // service name -> service info
  89. events trace.EventLog
  90. quit *grpcsync.Event
  91. done *grpcsync.Event
  92. channelzRemoveOnce sync.Once
  93. serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
  94. channelzID int64 // channelz unique identification number
  95. czData *channelzData
  96. }
  97. type serverOptions struct {
  98. creds credentials.TransportCredentials
  99. codec baseCodec
  100. cp Compressor
  101. dc Decompressor
  102. unaryInt UnaryServerInterceptor
  103. streamInt StreamServerInterceptor
  104. chainUnaryInts []UnaryServerInterceptor
  105. chainStreamInts []StreamServerInterceptor
  106. inTapHandle tap.ServerInHandle
  107. statsHandler stats.Handler
  108. maxConcurrentStreams uint32
  109. maxReceiveMessageSize int
  110. maxSendMessageSize int
  111. unknownStreamDesc *StreamDesc
  112. keepaliveParams keepalive.ServerParameters
  113. keepalivePolicy keepalive.EnforcementPolicy
  114. initialWindowSize int32
  115. initialConnWindowSize int32
  116. writeBufferSize int
  117. readBufferSize int
  118. connectionTimeout time.Duration
  119. maxHeaderListSize *uint32
  120. headerTableSize *uint32
  121. }
  122. var defaultServerOptions = serverOptions{
  123. maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
  124. maxSendMessageSize: defaultServerMaxSendMessageSize,
  125. connectionTimeout: 120 * time.Second,
  126. writeBufferSize: defaultWriteBufSize,
  127. readBufferSize: defaultReadBufSize,
  128. }
  129. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  130. type ServerOption interface {
  131. apply(*serverOptions)
  132. }
  133. // EmptyServerOption does not alter the server configuration. It can be embedded
  134. // in another structure to build custom server options.
  135. //
  136. // This API is EXPERIMENTAL.
  137. type EmptyServerOption struct{}
  138. func (EmptyServerOption) apply(*serverOptions) {}
  139. // funcServerOption wraps a function that modifies serverOptions into an
  140. // implementation of the ServerOption interface.
  141. type funcServerOption struct {
  142. f func(*serverOptions)
  143. }
  144. func (fdo *funcServerOption) apply(do *serverOptions) {
  145. fdo.f(do)
  146. }
  147. func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
  148. return &funcServerOption{
  149. f: f,
  150. }
  151. }
  152. // WriteBufferSize determines how much data can be batched before doing a write on the wire.
  153. // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
  154. // The default value for this buffer is 32KB.
  155. // Zero will disable the write buffer such that each write will be on underlying connection.
  156. // Note: A Send call may not directly translate to a write.
  157. func WriteBufferSize(s int) ServerOption {
  158. return newFuncServerOption(func(o *serverOptions) {
  159. o.writeBufferSize = s
  160. })
  161. }
  162. // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
  163. // for one read syscall.
  164. // The default value for this buffer is 32KB.
  165. // Zero will disable read buffer for a connection so data framer can access the underlying
  166. // conn directly.
  167. func ReadBufferSize(s int) ServerOption {
  168. return newFuncServerOption(func(o *serverOptions) {
  169. o.readBufferSize = s
  170. })
  171. }
  172. // InitialWindowSize returns a ServerOption that sets window size for stream.
  173. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  174. func InitialWindowSize(s int32) ServerOption {
  175. return newFuncServerOption(func(o *serverOptions) {
  176. o.initialWindowSize = s
  177. })
  178. }
  179. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  180. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  181. func InitialConnWindowSize(s int32) ServerOption {
  182. return newFuncServerOption(func(o *serverOptions) {
  183. o.initialConnWindowSize = s
  184. })
  185. }
  186. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  187. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  188. if kp.Time > 0 && kp.Time < time.Second {
  189. grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
  190. kp.Time = time.Second
  191. }
  192. return newFuncServerOption(func(o *serverOptions) {
  193. o.keepaliveParams = kp
  194. })
  195. }
  196. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  197. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  198. return newFuncServerOption(func(o *serverOptions) {
  199. o.keepalivePolicy = kep
  200. })
  201. }
  202. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  203. //
  204. // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
  205. func CustomCodec(codec Codec) ServerOption {
  206. return newFuncServerOption(func(o *serverOptions) {
  207. o.codec = codec
  208. })
  209. }
  210. // RPCCompressor returns a ServerOption that sets a compressor for outbound
  211. // messages. For backward compatibility, all outbound messages will be sent
  212. // using this compressor, regardless of incoming message compression. By
  213. // default, server messages will be sent using the same compressor with which
  214. // request messages were sent.
  215. //
  216. // Deprecated: use encoding.RegisterCompressor instead.
  217. func RPCCompressor(cp Compressor) ServerOption {
  218. return newFuncServerOption(func(o *serverOptions) {
  219. o.cp = cp
  220. })
  221. }
  222. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
  223. // messages. It has higher priority than decompressors registered via
  224. // encoding.RegisterCompressor.
  225. //
  226. // Deprecated: use encoding.RegisterCompressor instead.
  227. func RPCDecompressor(dc Decompressor) ServerOption {
  228. return newFuncServerOption(func(o *serverOptions) {
  229. o.dc = dc
  230. })
  231. }
  232. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  233. // If this is not set, gRPC uses the default limit.
  234. //
  235. // Deprecated: use MaxRecvMsgSize instead.
  236. func MaxMsgSize(m int) ServerOption {
  237. return MaxRecvMsgSize(m)
  238. }
  239. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  240. // If this is not set, gRPC uses the default 4MB.
  241. func MaxRecvMsgSize(m int) ServerOption {
  242. return newFuncServerOption(func(o *serverOptions) {
  243. o.maxReceiveMessageSize = m
  244. })
  245. }
  246. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  247. // If this is not set, gRPC uses the default `math.MaxInt32`.
  248. func MaxSendMsgSize(m int) ServerOption {
  249. return newFuncServerOption(func(o *serverOptions) {
  250. o.maxSendMessageSize = m
  251. })
  252. }
  253. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  254. // of concurrent streams to each ServerTransport.
  255. func MaxConcurrentStreams(n uint32) ServerOption {
  256. return newFuncServerOption(func(o *serverOptions) {
  257. o.maxConcurrentStreams = n
  258. })
  259. }
  260. // Creds returns a ServerOption that sets credentials for server connections.
  261. func Creds(c credentials.TransportCredentials) ServerOption {
  262. return newFuncServerOption(func(o *serverOptions) {
  263. o.creds = c
  264. })
  265. }
  266. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  267. // server. Only one unary interceptor can be installed. The construction of multiple
  268. // interceptors (e.g., chaining) can be implemented at the caller.
  269. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  270. return newFuncServerOption(func(o *serverOptions) {
  271. if o.unaryInt != nil {
  272. panic("The unary server interceptor was already set and may not be reset.")
  273. }
  274. o.unaryInt = i
  275. })
  276. }
  277. // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
  278. // for unary RPCs. The first interceptor will be the outer most,
  279. // while the last interceptor will be the inner most wrapper around the real call.
  280. // All unary interceptors added by this method will be chained.
  281. func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
  282. return newFuncServerOption(func(o *serverOptions) {
  283. o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
  284. })
  285. }
  286. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  287. // server. Only one stream interceptor can be installed.
  288. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  289. return newFuncServerOption(func(o *serverOptions) {
  290. if o.streamInt != nil {
  291. panic("The stream server interceptor was already set and may not be reset.")
  292. }
  293. o.streamInt = i
  294. })
  295. }
  296. // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
  297. // for stream RPCs. The first interceptor will be the outer most,
  298. // while the last interceptor will be the inner most wrapper around the real call.
  299. // All stream interceptors added by this method will be chained.
  300. func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
  301. return newFuncServerOption(func(o *serverOptions) {
  302. o.chainStreamInts = append(o.chainStreamInts, interceptors...)
  303. })
  304. }
  305. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  306. // transport to be created. Only one can be installed.
  307. func InTapHandle(h tap.ServerInHandle) ServerOption {
  308. return newFuncServerOption(func(o *serverOptions) {
  309. if o.inTapHandle != nil {
  310. panic("The tap handle was already set and may not be reset.")
  311. }
  312. o.inTapHandle = h
  313. })
  314. }
  315. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  316. func StatsHandler(h stats.Handler) ServerOption {
  317. return newFuncServerOption(func(o *serverOptions) {
  318. o.statsHandler = h
  319. })
  320. }
  321. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  322. // unknown service handler. The provided method is a bidi-streaming RPC service
  323. // handler that will be invoked instead of returning the "unimplemented" gRPC
  324. // error whenever a request is received for an unregistered service or method.
  325. // The handling function and stream interceptor (if set) have full access to
  326. // the ServerStream, including its Context.
  327. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  328. return newFuncServerOption(func(o *serverOptions) {
  329. o.unknownStreamDesc = &StreamDesc{
  330. StreamName: "unknown_service_handler",
  331. Handler: streamHandler,
  332. // We need to assume that the users of the streamHandler will want to use both.
  333. ClientStreams: true,
  334. ServerStreams: true,
  335. }
  336. })
  337. }
  338. // ConnectionTimeout returns a ServerOption that sets the timeout for
  339. // connection establishment (up to and including HTTP/2 handshaking) for all
  340. // new connections. If this is not set, the default is 120 seconds. A zero or
  341. // negative value will result in an immediate timeout.
  342. //
  343. // This API is EXPERIMENTAL.
  344. func ConnectionTimeout(d time.Duration) ServerOption {
  345. return newFuncServerOption(func(o *serverOptions) {
  346. o.connectionTimeout = d
  347. })
  348. }
  349. // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
  350. // of header list that the server is prepared to accept.
  351. func MaxHeaderListSize(s uint32) ServerOption {
  352. return newFuncServerOption(func(o *serverOptions) {
  353. o.maxHeaderListSize = &s
  354. })
  355. }
  356. // HeaderTableSize returns a ServerOption that sets the size of dynamic
  357. // header table for stream.
  358. //
  359. // This API is EXPERIMENTAL.
  360. func HeaderTableSize(s uint32) ServerOption {
  361. return newFuncServerOption(func(o *serverOptions) {
  362. o.headerTableSize = &s
  363. })
  364. }
  365. // NewServer creates a gRPC server which has no service registered and has not
  366. // started to accept requests yet.
  367. func NewServer(opt ...ServerOption) *Server {
  368. opts := defaultServerOptions
  369. for _, o := range opt {
  370. o.apply(&opts)
  371. }
  372. s := &Server{
  373. lis: make(map[net.Listener]bool),
  374. opts: opts,
  375. conns: make(map[transport.ServerTransport]bool),
  376. m: make(map[string]*service),
  377. quit: grpcsync.NewEvent(),
  378. done: grpcsync.NewEvent(),
  379. czData: new(channelzData),
  380. }
  381. chainUnaryServerInterceptors(s)
  382. chainStreamServerInterceptors(s)
  383. s.cv = sync.NewCond(&s.mu)
  384. if EnableTracing {
  385. _, file, line, _ := runtime.Caller(1)
  386. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  387. }
  388. if channelz.IsOn() {
  389. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
  390. }
  391. return s
  392. }
  393. // printf records an event in s's event log, unless s has been stopped.
  394. // REQUIRES s.mu is held.
  395. func (s *Server) printf(format string, a ...interface{}) {
  396. if s.events != nil {
  397. s.events.Printf(format, a...)
  398. }
  399. }
  400. // errorf records an error in s's event log, unless s has been stopped.
  401. // REQUIRES s.mu is held.
  402. func (s *Server) errorf(format string, a ...interface{}) {
  403. if s.events != nil {
  404. s.events.Errorf(format, a...)
  405. }
  406. }
  407. // RegisterService registers a service and its implementation to the gRPC
  408. // server. It is called from the IDL generated code. This must be called before
  409. // invoking Serve.
  410. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  411. ht := reflect.TypeOf(sd.HandlerType).Elem()
  412. st := reflect.TypeOf(ss)
  413. if !st.Implements(ht) {
  414. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  415. }
  416. s.register(sd, ss)
  417. }
  418. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  419. s.mu.Lock()
  420. defer s.mu.Unlock()
  421. s.printf("RegisterService(%q)", sd.ServiceName)
  422. if s.serve {
  423. grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  424. }
  425. if _, ok := s.m[sd.ServiceName]; ok {
  426. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  427. }
  428. srv := &service{
  429. server: ss,
  430. md: make(map[string]*MethodDesc),
  431. sd: make(map[string]*StreamDesc),
  432. mdata: sd.Metadata,
  433. }
  434. for i := range sd.Methods {
  435. d := &sd.Methods[i]
  436. srv.md[d.MethodName] = d
  437. }
  438. for i := range sd.Streams {
  439. d := &sd.Streams[i]
  440. srv.sd[d.StreamName] = d
  441. }
  442. s.m[sd.ServiceName] = srv
  443. }
  444. // MethodInfo contains the information of an RPC including its method name and type.
  445. type MethodInfo struct {
  446. // Name is the method name only, without the service name or package name.
  447. Name string
  448. // IsClientStream indicates whether the RPC is a client streaming RPC.
  449. IsClientStream bool
  450. // IsServerStream indicates whether the RPC is a server streaming RPC.
  451. IsServerStream bool
  452. }
  453. // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
  454. type ServiceInfo struct {
  455. Methods []MethodInfo
  456. // Metadata is the metadata specified in ServiceDesc when registering service.
  457. Metadata interface{}
  458. }
  459. // GetServiceInfo returns a map from service names to ServiceInfo.
  460. // Service names include the package names, in the form of <package>.<service>.
  461. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  462. ret := make(map[string]ServiceInfo)
  463. for n, srv := range s.m {
  464. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  465. for m := range srv.md {
  466. methods = append(methods, MethodInfo{
  467. Name: m,
  468. IsClientStream: false,
  469. IsServerStream: false,
  470. })
  471. }
  472. for m, d := range srv.sd {
  473. methods = append(methods, MethodInfo{
  474. Name: m,
  475. IsClientStream: d.ClientStreams,
  476. IsServerStream: d.ServerStreams,
  477. })
  478. }
  479. ret[n] = ServiceInfo{
  480. Methods: methods,
  481. Metadata: srv.mdata,
  482. }
  483. }
  484. return ret
  485. }
  486. // ErrServerStopped indicates that the operation is now illegal because of
  487. // the server being stopped.
  488. var ErrServerStopped = errors.New("grpc: the server has been stopped")
  489. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  490. if s.opts.creds == nil {
  491. return rawConn, nil, nil
  492. }
  493. return s.opts.creds.ServerHandshake(rawConn)
  494. }
  495. type listenSocket struct {
  496. net.Listener
  497. channelzID int64
  498. }
  499. func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
  500. return &channelz.SocketInternalMetric{
  501. SocketOptions: channelz.GetSocketOption(l.Listener),
  502. LocalAddr: l.Listener.Addr(),
  503. }
  504. }
  505. func (l *listenSocket) Close() error {
  506. err := l.Listener.Close()
  507. if channelz.IsOn() {
  508. channelz.RemoveEntry(l.channelzID)
  509. }
  510. return err
  511. }
  512. // Serve accepts incoming connections on the listener lis, creating a new
  513. // ServerTransport and service goroutine for each. The service goroutines
  514. // read gRPC requests and then call the registered handlers to reply to them.
  515. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when
  516. // this method returns.
  517. // Serve will return a non-nil error unless Stop or GracefulStop is called.
  518. func (s *Server) Serve(lis net.Listener) error {
  519. s.mu.Lock()
  520. s.printf("serving")
  521. s.serve = true
  522. if s.lis == nil {
  523. // Serve called after Stop or GracefulStop.
  524. s.mu.Unlock()
  525. lis.Close()
  526. return ErrServerStopped
  527. }
  528. s.serveWG.Add(1)
  529. defer func() {
  530. s.serveWG.Done()
  531. if s.quit.HasFired() {
  532. // Stop or GracefulStop called; block until done and return nil.
  533. <-s.done.Done()
  534. }
  535. }()
  536. ls := &listenSocket{Listener: lis}
  537. s.lis[ls] = true
  538. if channelz.IsOn() {
  539. ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
  540. }
  541. s.mu.Unlock()
  542. defer func() {
  543. s.mu.Lock()
  544. if s.lis != nil && s.lis[ls] {
  545. ls.Close()
  546. delete(s.lis, ls)
  547. }
  548. s.mu.Unlock()
  549. }()
  550. var tempDelay time.Duration // how long to sleep on accept failure
  551. for {
  552. rawConn, err := lis.Accept()
  553. if err != nil {
  554. if ne, ok := err.(interface {
  555. Temporary() bool
  556. }); ok && ne.Temporary() {
  557. if tempDelay == 0 {
  558. tempDelay = 5 * time.Millisecond
  559. } else {
  560. tempDelay *= 2
  561. }
  562. if max := 1 * time.Second; tempDelay > max {
  563. tempDelay = max
  564. }
  565. s.mu.Lock()
  566. s.printf("Accept error: %v; retrying in %v", err, tempDelay)
  567. s.mu.Unlock()
  568. timer := time.NewTimer(tempDelay)
  569. select {
  570. case <-timer.C:
  571. case <-s.quit.Done():
  572. timer.Stop()
  573. return nil
  574. }
  575. continue
  576. }
  577. s.mu.Lock()
  578. s.printf("done serving; Accept = %v", err)
  579. s.mu.Unlock()
  580. if s.quit.HasFired() {
  581. return nil
  582. }
  583. return err
  584. }
  585. tempDelay = 0
  586. // Start a new goroutine to deal with rawConn so we don't stall this Accept
  587. // loop goroutine.
  588. //
  589. // Make sure we account for the goroutine so GracefulStop doesn't nil out
  590. // s.conns before this conn can be added.
  591. s.serveWG.Add(1)
  592. go func() {
  593. s.handleRawConn(rawConn)
  594. s.serveWG.Done()
  595. }()
  596. }
  597. }
  598. // handleRawConn forks a goroutine to handle a just-accepted connection that
  599. // has not had any I/O performed on it yet.
  600. func (s *Server) handleRawConn(rawConn net.Conn) {
  601. if s.quit.HasFired() {
  602. rawConn.Close()
  603. return
  604. }
  605. rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
  606. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  607. if err != nil {
  608. // ErrConnDispatched means that the connection was dispatched away from
  609. // gRPC; those connections should be left open.
  610. if err != credentials.ErrConnDispatched {
  611. s.mu.Lock()
  612. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  613. s.mu.Unlock()
  614. channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  615. rawConn.Close()
  616. }
  617. rawConn.SetDeadline(time.Time{})
  618. return
  619. }
  620. // Finish handshaking (HTTP2)
  621. st := s.newHTTP2Transport(conn, authInfo)
  622. if st == nil {
  623. return
  624. }
  625. rawConn.SetDeadline(time.Time{})
  626. if !s.addConn(st) {
  627. return
  628. }
  629. go func() {
  630. s.serveStreams(st)
  631. s.removeConn(st)
  632. }()
  633. }
  634. // newHTTP2Transport sets up a http/2 transport (using the
  635. // gRPC http2 server transport in transport/http2_server.go).
  636. func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
  637. config := &transport.ServerConfig{
  638. MaxStreams: s.opts.maxConcurrentStreams,
  639. AuthInfo: authInfo,
  640. InTapHandle: s.opts.inTapHandle,
  641. StatsHandler: s.opts.statsHandler,
  642. KeepaliveParams: s.opts.keepaliveParams,
  643. KeepalivePolicy: s.opts.keepalivePolicy,
  644. InitialWindowSize: s.opts.initialWindowSize,
  645. InitialConnWindowSize: s.opts.initialConnWindowSize,
  646. WriteBufferSize: s.opts.writeBufferSize,
  647. ReadBufferSize: s.opts.readBufferSize,
  648. ChannelzParentID: s.channelzID,
  649. MaxHeaderListSize: s.opts.maxHeaderListSize,
  650. HeaderTableSize: s.opts.headerTableSize,
  651. }
  652. st, err := transport.NewServerTransport("http2", c, config)
  653. if err != nil {
  654. s.mu.Lock()
  655. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  656. s.mu.Unlock()
  657. c.Close()
  658. channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
  659. return nil
  660. }
  661. return st
  662. }
  663. func (s *Server) serveStreams(st transport.ServerTransport) {
  664. defer st.Close()
  665. var wg sync.WaitGroup
  666. st.HandleStreams(func(stream *transport.Stream) {
  667. wg.Add(1)
  668. go func() {
  669. defer wg.Done()
  670. s.handleStream(st, stream, s.traceInfo(st, stream))
  671. }()
  672. }, func(ctx context.Context, method string) context.Context {
  673. if !EnableTracing {
  674. return ctx
  675. }
  676. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  677. return trace.NewContext(ctx, tr)
  678. })
  679. wg.Wait()
  680. }
  681. var _ http.Handler = (*Server)(nil)
  682. // ServeHTTP implements the Go standard library's http.Handler
  683. // interface by responding to the gRPC request r, by looking up
  684. // the requested gRPC method in the gRPC server s.
  685. //
  686. // The provided HTTP request must have arrived on an HTTP/2
  687. // connection. When using the Go standard library's server,
  688. // practically this means that the Request must also have arrived
  689. // over TLS.
  690. //
  691. // To share one port (such as 443 for https) between gRPC and an
  692. // existing http.Handler, use a root http.Handler such as:
  693. //
  694. // if r.ProtoMajor == 2 && strings.HasPrefix(
  695. // r.Header.Get("Content-Type"), "application/grpc") {
  696. // grpcServer.ServeHTTP(w, r)
  697. // } else {
  698. // yourMux.ServeHTTP(w, r)
  699. // }
  700. //
  701. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  702. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  703. // between the two paths. ServeHTTP does not support some gRPC features
  704. // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
  705. // and subject to change.
  706. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  707. st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
  708. if err != nil {
  709. http.Error(w, err.Error(), http.StatusInternalServerError)
  710. return
  711. }
  712. if !s.addConn(st) {
  713. return
  714. }
  715. defer s.removeConn(st)
  716. s.serveStreams(st)
  717. }
  718. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  719. // If tracing is not enabled, it returns nil.
  720. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  721. if !EnableTracing {
  722. return nil
  723. }
  724. tr, ok := trace.FromContext(stream.Context())
  725. if !ok {
  726. return nil
  727. }
  728. trInfo = &traceInfo{
  729. tr: tr,
  730. firstLine: firstLine{
  731. client: false,
  732. remoteAddr: st.RemoteAddr(),
  733. },
  734. }
  735. if dl, ok := stream.Context().Deadline(); ok {
  736. trInfo.firstLine.deadline = time.Until(dl)
  737. }
  738. return trInfo
  739. }
  740. func (s *Server) addConn(st transport.ServerTransport) bool {
  741. s.mu.Lock()
  742. defer s.mu.Unlock()
  743. if s.conns == nil {
  744. st.Close()
  745. return false
  746. }
  747. if s.drain {
  748. // Transport added after we drained our existing conns: drain it
  749. // immediately.
  750. st.Drain()
  751. }
  752. s.conns[st] = true
  753. return true
  754. }
  755. func (s *Server) removeConn(st transport.ServerTransport) {
  756. s.mu.Lock()
  757. defer s.mu.Unlock()
  758. if s.conns != nil {
  759. delete(s.conns, st)
  760. s.cv.Broadcast()
  761. }
  762. }
  763. func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
  764. return &channelz.ServerInternalMetric{
  765. CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
  766. CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
  767. CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
  768. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
  769. }
  770. }
  771. func (s *Server) incrCallsStarted() {
  772. atomic.AddInt64(&s.czData.callsStarted, 1)
  773. atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
  774. }
  775. func (s *Server) incrCallsSucceeded() {
  776. atomic.AddInt64(&s.czData.callsSucceeded, 1)
  777. }
  778. func (s *Server) incrCallsFailed() {
  779. atomic.AddInt64(&s.czData.callsFailed, 1)
  780. }
  781. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  782. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  783. if err != nil {
  784. channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
  785. return err
  786. }
  787. compData, err := compress(data, cp, comp)
  788. if err != nil {
  789. channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
  790. return err
  791. }
  792. hdr, payload := msgHeader(data, compData)
  793. // TODO(dfawley): should we be checking len(data) instead?
  794. if len(payload) > s.opts.maxSendMessageSize {
  795. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
  796. }
  797. err = t.Write(stream, hdr, payload, opts)
  798. if err == nil && s.opts.statsHandler != nil {
  799. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
  800. }
  801. return err
  802. }
  803. // chainUnaryServerInterceptors chains all unary server interceptors into one.
  804. func chainUnaryServerInterceptors(s *Server) {
  805. // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
  806. // be executed before any other chained interceptors.
  807. interceptors := s.opts.chainUnaryInts
  808. if s.opts.unaryInt != nil {
  809. interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
  810. }
  811. var chainedInt UnaryServerInterceptor
  812. if len(interceptors) == 0 {
  813. chainedInt = nil
  814. } else if len(interceptors) == 1 {
  815. chainedInt = interceptors[0]
  816. } else {
  817. chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
  818. return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
  819. }
  820. }
  821. s.opts.unaryInt = chainedInt
  822. }
  823. // getChainUnaryHandler recursively generate the chained UnaryHandler
  824. func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
  825. if curr == len(interceptors)-1 {
  826. return finalHandler
  827. }
  828. return func(ctx context.Context, req interface{}) (interface{}, error) {
  829. return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
  830. }
  831. }
  832. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  833. sh := s.opts.statsHandler
  834. if sh != nil || trInfo != nil || channelz.IsOn() {
  835. if channelz.IsOn() {
  836. s.incrCallsStarted()
  837. }
  838. var statsBegin *stats.Begin
  839. if sh != nil {
  840. beginTime := time.Now()
  841. statsBegin = &stats.Begin{
  842. BeginTime: beginTime,
  843. }
  844. sh.HandleRPC(stream.Context(), statsBegin)
  845. }
  846. if trInfo != nil {
  847. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  848. }
  849. // The deferred error handling for tracing, stats handler and channelz are
  850. // combined into one function to reduce stack usage -- a defer takes ~56-64
  851. // bytes on the stack, so overflowing the stack will require a stack
  852. // re-allocation, which is expensive.
  853. //
  854. // To maintain behavior similar to separate deferred statements, statements
  855. // should be executed in the reverse order. That is, tracing first, stats
  856. // handler second, and channelz last. Note that panics *within* defers will
  857. // lead to different behavior, but that's an acceptable compromise; that
  858. // would be undefined behavior territory anyway.
  859. defer func() {
  860. if trInfo != nil {
  861. if err != nil && err != io.EOF {
  862. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  863. trInfo.tr.SetError()
  864. }
  865. trInfo.tr.Finish()
  866. }
  867. if sh != nil {
  868. end := &stats.End{
  869. BeginTime: statsBegin.BeginTime,
  870. EndTime: time.Now(),
  871. }
  872. if err != nil && err != io.EOF {
  873. end.Error = toRPCErr(err)
  874. }
  875. sh.HandleRPC(stream.Context(), end)
  876. }
  877. if channelz.IsOn() {
  878. if err != nil && err != io.EOF {
  879. s.incrCallsFailed()
  880. } else {
  881. s.incrCallsSucceeded()
  882. }
  883. }
  884. }()
  885. }
  886. binlog := binarylog.GetMethodLogger(stream.Method())
  887. if binlog != nil {
  888. ctx := stream.Context()
  889. md, _ := metadata.FromIncomingContext(ctx)
  890. logEntry := &binarylog.ClientHeader{
  891. Header: md,
  892. MethodName: stream.Method(),
  893. PeerAddr: nil,
  894. }
  895. if deadline, ok := ctx.Deadline(); ok {
  896. logEntry.Timeout = time.Until(deadline)
  897. if logEntry.Timeout < 0 {
  898. logEntry.Timeout = 0
  899. }
  900. }
  901. if a := md[":authority"]; len(a) > 0 {
  902. logEntry.Authority = a[0]
  903. }
  904. if peer, ok := peer.FromContext(ctx); ok {
  905. logEntry.PeerAddr = peer.Addr
  906. }
  907. binlog.Log(logEntry)
  908. }
  909. // comp and cp are used for compression. decomp and dc are used for
  910. // decompression. If comp and decomp are both set, they are the same;
  911. // however they are kept separate to ensure that at most one of the
  912. // compressor/decompressor variable pairs are set for use later.
  913. var comp, decomp encoding.Compressor
  914. var cp Compressor
  915. var dc Decompressor
  916. // If dc is set and matches the stream's compression, use it. Otherwise, try
  917. // to find a matching registered compressor for decomp.
  918. if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  919. dc = s.opts.dc
  920. } else if rc != "" && rc != encoding.Identity {
  921. decomp = encoding.GetCompressor(rc)
  922. if decomp == nil {
  923. st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  924. t.WriteStatus(stream, st)
  925. return st.Err()
  926. }
  927. }
  928. // If cp is set, use it. Otherwise, attempt to compress the response using
  929. // the incoming message compression method.
  930. //
  931. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  932. if s.opts.cp != nil {
  933. cp = s.opts.cp
  934. stream.SetSendCompress(cp.Type())
  935. } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  936. // Legacy compressor not specified; attempt to respond with same encoding.
  937. comp = encoding.GetCompressor(rc)
  938. if comp != nil {
  939. stream.SetSendCompress(rc)
  940. }
  941. }
  942. var payInfo *payloadInfo
  943. if sh != nil || binlog != nil {
  944. payInfo = &payloadInfo{}
  945. }
  946. d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
  947. if err != nil {
  948. if st, ok := status.FromError(err); ok {
  949. if e := t.WriteStatus(stream, st); e != nil {
  950. channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
  951. }
  952. }
  953. return err
  954. }
  955. if channelz.IsOn() {
  956. t.IncrMsgRecv()
  957. }
  958. df := func(v interface{}) error {
  959. if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
  960. return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
  961. }
  962. if sh != nil {
  963. sh.HandleRPC(stream.Context(), &stats.InPayload{
  964. RecvTime: time.Now(),
  965. Payload: v,
  966. WireLength: payInfo.wireLength,
  967. Data: d,
  968. Length: len(d),
  969. })
  970. }
  971. if binlog != nil {
  972. binlog.Log(&binarylog.ClientMessage{
  973. Message: d,
  974. })
  975. }
  976. if trInfo != nil {
  977. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  978. }
  979. return nil
  980. }
  981. ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  982. reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
  983. if appErr != nil {
  984. appStatus, ok := status.FromError(appErr)
  985. if !ok {
  986. // Convert appErr if it is not a grpc status error.
  987. appErr = status.Error(codes.Unknown, appErr.Error())
  988. appStatus, _ = status.FromError(appErr)
  989. }
  990. if trInfo != nil {
  991. trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  992. trInfo.tr.SetError()
  993. }
  994. if e := t.WriteStatus(stream, appStatus); e != nil {
  995. channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
  996. }
  997. if binlog != nil {
  998. if h, _ := stream.Header(); h.Len() > 0 {
  999. // Only log serverHeader if there was header. Otherwise it can
  1000. // be trailer only.
  1001. binlog.Log(&binarylog.ServerHeader{
  1002. Header: h,
  1003. })
  1004. }
  1005. binlog.Log(&binarylog.ServerTrailer{
  1006. Trailer: stream.Trailer(),
  1007. Err: appErr,
  1008. })
  1009. }
  1010. return appErr
  1011. }
  1012. if trInfo != nil {
  1013. trInfo.tr.LazyLog(stringer("OK"), false)
  1014. }
  1015. opts := &transport.Options{Last: true}
  1016. if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
  1017. if err == io.EOF {
  1018. // The entire stream is done (for unary RPC only).
  1019. return err
  1020. }
  1021. if sts, ok := status.FromError(err); ok {
  1022. if e := t.WriteStatus(stream, sts); e != nil {
  1023. channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
  1024. }
  1025. } else {
  1026. switch st := err.(type) {
  1027. case transport.ConnectionError:
  1028. // Nothing to do here.
  1029. default:
  1030. panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
  1031. }
  1032. }
  1033. if binlog != nil {
  1034. h, _ := stream.Header()
  1035. binlog.Log(&binarylog.ServerHeader{
  1036. Header: h,
  1037. })
  1038. binlog.Log(&binarylog.ServerTrailer{
  1039. Trailer: stream.Trailer(),
  1040. Err: appErr,
  1041. })
  1042. }
  1043. return err
  1044. }
  1045. if binlog != nil {
  1046. h, _ := stream.Header()
  1047. binlog.Log(&binarylog.ServerHeader{
  1048. Header: h,
  1049. })
  1050. binlog.Log(&binarylog.ServerMessage{
  1051. Message: reply,
  1052. })
  1053. }
  1054. if channelz.IsOn() {
  1055. t.IncrMsgSent()
  1056. }
  1057. if trInfo != nil {
  1058. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  1059. }
  1060. // TODO: Should we be logging if writing status failed here, like above?
  1061. // Should the logging be in WriteStatus? Should we ignore the WriteStatus
  1062. // error or allow the stats handler to see it?
  1063. err = t.WriteStatus(stream, statusOK)
  1064. if binlog != nil {
  1065. binlog.Log(&binarylog.ServerTrailer{
  1066. Trailer: stream.Trailer(),
  1067. Err: appErr,
  1068. })
  1069. }
  1070. return err
  1071. }
  1072. // chainStreamServerInterceptors chains all stream server interceptors into one.
  1073. func chainStreamServerInterceptors(s *Server) {
  1074. // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
  1075. // be executed before any other chained interceptors.
  1076. interceptors := s.opts.chainStreamInts
  1077. if s.opts.streamInt != nil {
  1078. interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
  1079. }
  1080. var chainedInt StreamServerInterceptor
  1081. if len(interceptors) == 0 {
  1082. chainedInt = nil
  1083. } else if len(interceptors) == 1 {
  1084. chainedInt = interceptors[0]
  1085. } else {
  1086. chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
  1087. return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
  1088. }
  1089. }
  1090. s.opts.streamInt = chainedInt
  1091. }
  1092. // getChainStreamHandler recursively generate the chained StreamHandler
  1093. func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
  1094. if curr == len(interceptors)-1 {
  1095. return finalHandler
  1096. }
  1097. return func(srv interface{}, ss ServerStream) error {
  1098. return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
  1099. }
  1100. }
  1101. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  1102. if channelz.IsOn() {
  1103. s.incrCallsStarted()
  1104. }
  1105. sh := s.opts.statsHandler
  1106. var statsBegin *stats.Begin
  1107. if sh != nil {
  1108. beginTime := time.Now()
  1109. statsBegin = &stats.Begin{
  1110. BeginTime: beginTime,
  1111. }
  1112. sh.HandleRPC(stream.Context(), statsBegin)
  1113. }
  1114. ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  1115. ss := &serverStream{
  1116. ctx: ctx,
  1117. t: t,
  1118. s: stream,
  1119. p: &parser{r: stream},
  1120. codec: s.getCodec(stream.ContentSubtype()),
  1121. maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
  1122. maxSendMessageSize: s.opts.maxSendMessageSize,
  1123. trInfo: trInfo,
  1124. statsHandler: sh,
  1125. }
  1126. if sh != nil || trInfo != nil || channelz.IsOn() {
  1127. // See comment in processUnaryRPC on defers.
  1128. defer func() {
  1129. if trInfo != nil {
  1130. ss.mu.Lock()
  1131. if err != nil && err != io.EOF {
  1132. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1133. ss.trInfo.tr.SetError()
  1134. }
  1135. ss.trInfo.tr.Finish()
  1136. ss.trInfo.tr = nil
  1137. ss.mu.Unlock()
  1138. }
  1139. if sh != nil {
  1140. end := &stats.End{
  1141. BeginTime: statsBegin.BeginTime,
  1142. EndTime: time.Now(),
  1143. }
  1144. if err != nil && err != io.EOF {
  1145. end.Error = toRPCErr(err)
  1146. }
  1147. sh.HandleRPC(stream.Context(), end)
  1148. }
  1149. if channelz.IsOn() {
  1150. if err != nil && err != io.EOF {
  1151. s.incrCallsFailed()
  1152. } else {
  1153. s.incrCallsSucceeded()
  1154. }
  1155. }
  1156. }()
  1157. }
  1158. ss.binlog = binarylog.GetMethodLogger(stream.Method())
  1159. if ss.binlog != nil {
  1160. md, _ := metadata.FromIncomingContext(ctx)
  1161. logEntry := &binarylog.ClientHeader{
  1162. Header: md,
  1163. MethodName: stream.Method(),
  1164. PeerAddr: nil,
  1165. }
  1166. if deadline, ok := ctx.Deadline(); ok {
  1167. logEntry.Timeout = time.Until(deadline)
  1168. if logEntry.Timeout < 0 {
  1169. logEntry.Timeout = 0
  1170. }
  1171. }
  1172. if a := md[":authority"]; len(a) > 0 {
  1173. logEntry.Authority = a[0]
  1174. }
  1175. if peer, ok := peer.FromContext(ss.Context()); ok {
  1176. logEntry.PeerAddr = peer.Addr
  1177. }
  1178. ss.binlog.Log(logEntry)
  1179. }
  1180. // If dc is set and matches the stream's compression, use it. Otherwise, try
  1181. // to find a matching registered compressor for decomp.
  1182. if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  1183. ss.dc = s.opts.dc
  1184. } else if rc != "" && rc != encoding.Identity {
  1185. ss.decomp = encoding.GetCompressor(rc)
  1186. if ss.decomp == nil {
  1187. st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  1188. t.WriteStatus(ss.s, st)
  1189. return st.Err()
  1190. }
  1191. }
  1192. // If cp is set, use it. Otherwise, attempt to compress the response using
  1193. // the incoming message compression method.
  1194. //
  1195. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  1196. if s.opts.cp != nil {
  1197. ss.cp = s.opts.cp
  1198. stream.SetSendCompress(s.opts.cp.Type())
  1199. } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  1200. // Legacy compressor not specified; attempt to respond with same encoding.
  1201. ss.comp = encoding.GetCompressor(rc)
  1202. if ss.comp != nil {
  1203. stream.SetSendCompress(rc)
  1204. }
  1205. }
  1206. if trInfo != nil {
  1207. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  1208. }
  1209. var appErr error
  1210. var server interface{}
  1211. if srv != nil {
  1212. server = srv.server
  1213. }
  1214. if s.opts.streamInt == nil {
  1215. appErr = sd.Handler(server, ss)
  1216. } else {
  1217. info := &StreamServerInfo{
  1218. FullMethod: stream.Method(),
  1219. IsClientStream: sd.ClientStreams,
  1220. IsServerStream: sd.ServerStreams,
  1221. }
  1222. appErr = s.opts.streamInt(server, ss, info, sd.Handler)
  1223. }
  1224. if appErr != nil {
  1225. appStatus, ok := status.FromError(appErr)
  1226. if !ok {
  1227. appStatus = status.New(codes.Unknown, appErr.Error())
  1228. appErr = appStatus.Err()
  1229. }
  1230. if trInfo != nil {
  1231. ss.mu.Lock()
  1232. ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  1233. ss.trInfo.tr.SetError()
  1234. ss.mu.Unlock()
  1235. }
  1236. t.WriteStatus(ss.s, appStatus)
  1237. if ss.binlog != nil {
  1238. ss.binlog.Log(&binarylog.ServerTrailer{
  1239. Trailer: ss.s.Trailer(),
  1240. Err: appErr,
  1241. })
  1242. }
  1243. // TODO: Should we log an error from WriteStatus here and below?
  1244. return appErr
  1245. }
  1246. if trInfo != nil {
  1247. ss.mu.Lock()
  1248. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  1249. ss.mu.Unlock()
  1250. }
  1251. err = t.WriteStatus(ss.s, statusOK)
  1252. if ss.binlog != nil {
  1253. ss.binlog.Log(&binarylog.ServerTrailer{
  1254. Trailer: ss.s.Trailer(),
  1255. Err: appErr,
  1256. })
  1257. }
  1258. return err
  1259. }
  1260. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  1261. sm := stream.Method()
  1262. if sm != "" && sm[0] == '/' {
  1263. sm = sm[1:]
  1264. }
  1265. pos := strings.LastIndex(sm, "/")
  1266. if pos == -1 {
  1267. if trInfo != nil {
  1268. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  1269. trInfo.tr.SetError()
  1270. }
  1271. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1272. if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
  1273. if trInfo != nil {
  1274. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1275. trInfo.tr.SetError()
  1276. }
  1277. channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1278. }
  1279. if trInfo != nil {
  1280. trInfo.tr.Finish()
  1281. }
  1282. return
  1283. }
  1284. service := sm[:pos]
  1285. method := sm[pos+1:]
  1286. srv, knownService := s.m[service]
  1287. if knownService {
  1288. if md, ok := srv.md[method]; ok {
  1289. s.processUnaryRPC(t, stream, srv, md, trInfo)
  1290. return
  1291. }
  1292. if sd, ok := srv.sd[method]; ok {
  1293. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  1294. return
  1295. }
  1296. }
  1297. // Unknown service, or known server unknown method.
  1298. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1299. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1300. return
  1301. }
  1302. var errDesc string
  1303. if !knownService {
  1304. errDesc = fmt.Sprintf("unknown service %v", service)
  1305. } else {
  1306. errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
  1307. }
  1308. if trInfo != nil {
  1309. trInfo.tr.LazyPrintf("%s", errDesc)
  1310. trInfo.tr.SetError()
  1311. }
  1312. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1313. if trInfo != nil {
  1314. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1315. trInfo.tr.SetError()
  1316. }
  1317. channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1318. }
  1319. if trInfo != nil {
  1320. trInfo.tr.Finish()
  1321. }
  1322. }
  1323. // The key to save ServerTransportStream in the context.
  1324. type streamKey struct{}
  1325. // NewContextWithServerTransportStream creates a new context from ctx and
  1326. // attaches stream to it.
  1327. //
  1328. // This API is EXPERIMENTAL.
  1329. func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
  1330. return context.WithValue(ctx, streamKey{}, stream)
  1331. }
  1332. // ServerTransportStream is a minimal interface that a transport stream must
  1333. // implement. This can be used to mock an actual transport stream for tests of
  1334. // handler code that use, for example, grpc.SetHeader (which requires some
  1335. // stream to be in context).
  1336. //
  1337. // See also NewContextWithServerTransportStream.
  1338. //
  1339. // This API is EXPERIMENTAL.
  1340. type ServerTransportStream interface {
  1341. Method() string
  1342. SetHeader(md metadata.MD) error
  1343. SendHeader(md metadata.MD) error
  1344. SetTrailer(md metadata.MD) error
  1345. }
  1346. // ServerTransportStreamFromContext returns the ServerTransportStream saved in
  1347. // ctx. Returns nil if the given context has no stream associated with it
  1348. // (which implies it is not an RPC invocation context).
  1349. //
  1350. // This API is EXPERIMENTAL.
  1351. func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1352. s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1353. return s
  1354. }
  1355. // Stop stops the gRPC server. It immediately closes all open
  1356. // connections and listeners.
  1357. // It cancels all active RPCs on the server side and the corresponding
  1358. // pending RPCs on the client side will get notified by connection
  1359. // errors.
  1360. func (s *Server) Stop() {
  1361. s.quit.Fire()
  1362. defer func() {
  1363. s.serveWG.Wait()
  1364. s.done.Fire()
  1365. }()
  1366. s.channelzRemoveOnce.Do(func() {
  1367. if channelz.IsOn() {
  1368. channelz.RemoveEntry(s.channelzID)
  1369. }
  1370. })
  1371. s.mu.Lock()
  1372. listeners := s.lis
  1373. s.lis = nil
  1374. st := s.conns
  1375. s.conns = nil
  1376. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  1377. s.cv.Broadcast()
  1378. s.mu.Unlock()
  1379. for lis := range listeners {
  1380. lis.Close()
  1381. }
  1382. for c := range st {
  1383. c.Close()
  1384. }
  1385. s.mu.Lock()
  1386. if s.events != nil {
  1387. s.events.Finish()
  1388. s.events = nil
  1389. }
  1390. s.mu.Unlock()
  1391. }
  1392. // GracefulStop stops the gRPC server gracefully. It stops the server from
  1393. // accepting new connections and RPCs and blocks until all the pending RPCs are
  1394. // finished.
  1395. func (s *Server) GracefulStop() {
  1396. s.quit.Fire()
  1397. defer s.done.Fire()
  1398. s.channelzRemoveOnce.Do(func() {
  1399. if channelz.IsOn() {
  1400. channelz.RemoveEntry(s.channelzID)
  1401. }
  1402. })
  1403. s.mu.Lock()
  1404. if s.conns == nil {
  1405. s.mu.Unlock()
  1406. return
  1407. }
  1408. for lis := range s.lis {
  1409. lis.Close()
  1410. }
  1411. s.lis = nil
  1412. if !s.drain {
  1413. for st := range s.conns {
  1414. st.Drain()
  1415. }
  1416. s.drain = true
  1417. }
  1418. // Wait for serving threads to be ready to exit. Only then can we be sure no
  1419. // new conns will be created.
  1420. s.mu.Unlock()
  1421. s.serveWG.Wait()
  1422. s.mu.Lock()
  1423. for len(s.conns) != 0 {
  1424. s.cv.Wait()
  1425. }
  1426. s.conns = nil
  1427. if s.events != nil {
  1428. s.events.Finish()
  1429. s.events = nil
  1430. }
  1431. s.mu.Unlock()
  1432. }
  1433. // contentSubtype must be lowercase
  1434. // cannot return nil
  1435. func (s *Server) getCodec(contentSubtype string) baseCodec {
  1436. if s.opts.codec != nil {
  1437. return s.opts.codec
  1438. }
  1439. if contentSubtype == "" {
  1440. return encoding.GetCodec(proto.Name)
  1441. }
  1442. codec := encoding.GetCodec(contentSubtype)
  1443. if codec == nil {
  1444. return encoding.GetCodec(proto.Name)
  1445. }
  1446. return codec
  1447. }
  1448. // SetHeader sets the header metadata.
  1449. // When called multiple times, all the provided metadata will be merged.
  1450. // All the metadata will be sent out when one of the following happens:
  1451. // - grpc.SendHeader() is called;
  1452. // - The first response is sent out;
  1453. // - An RPC status is sent out (error or success).
  1454. func SetHeader(ctx context.Context, md metadata.MD) error {
  1455. if md.Len() == 0 {
  1456. return nil
  1457. }
  1458. stream := ServerTransportStreamFromContext(ctx)
  1459. if stream == nil {
  1460. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1461. }
  1462. return stream.SetHeader(md)
  1463. }
  1464. // SendHeader sends header metadata. It may be called at most once.
  1465. // The provided md and headers set by SetHeader() will be sent.
  1466. func SendHeader(ctx context.Context, md metadata.MD) error {
  1467. stream := ServerTransportStreamFromContext(ctx)
  1468. if stream == nil {
  1469. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1470. }
  1471. if err := stream.SendHeader(md); err != nil {
  1472. return toRPCErr(err)
  1473. }
  1474. return nil
  1475. }
  1476. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1477. // When called more than once, all the provided metadata will be merged.
  1478. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1479. if md.Len() == 0 {
  1480. return nil
  1481. }
  1482. stream := ServerTransportStreamFromContext(ctx)
  1483. if stream == nil {
  1484. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1485. }
  1486. return stream.SetTrailer(md)
  1487. }
  1488. // Method returns the method string for the server context. The returned
  1489. // string is in the format of "/service/method".
  1490. func Method(ctx context.Context) (string, bool) {
  1491. s := ServerTransportStreamFromContext(ctx)
  1492. if s == nil {
  1493. return "", false
  1494. }
  1495. return s.Method(), true
  1496. }
  1497. type channelzServer struct {
  1498. s *Server
  1499. }
  1500. func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
  1501. return c.s.channelzMetric()
  1502. }