test_utils.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
  19. package interop
  20. import (
  21. "context"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "strings"
  26. "time"
  27. "github.com/golang/protobuf/proto"
  28. "golang.org/x/oauth2"
  29. "golang.org/x/oauth2/google"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/grpclog"
  33. testpb "google.golang.org/grpc/interop/grpc_testing"
  34. "google.golang.org/grpc/metadata"
  35. "google.golang.org/grpc/status"
  36. )
  37. var (
  38. reqSizes = []int{27182, 8, 1828, 45904}
  39. respSizes = []int{31415, 9, 2653, 58979}
  40. largeReqSize = 271828
  41. largeRespSize = 314159
  42. initialMetadataKey = "x-grpc-test-echo-initial"
  43. trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
  44. )
  45. // ClientNewPayload returns a payload of the given type and size.
  46. func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
  47. if size < 0 {
  48. grpclog.Fatalf("Requested a response with invalid length %d", size)
  49. }
  50. body := make([]byte, size)
  51. switch t {
  52. case testpb.PayloadType_COMPRESSABLE:
  53. case testpb.PayloadType_UNCOMPRESSABLE:
  54. grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
  55. default:
  56. grpclog.Fatalf("Unsupported payload type: %d", t)
  57. }
  58. return &testpb.Payload{
  59. Type: t,
  60. Body: body,
  61. }
  62. }
  63. // DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
  64. func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  65. reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
  66. if err != nil {
  67. grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err)
  68. }
  69. if !proto.Equal(&testpb.Empty{}, reply) {
  70. grpclog.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
  71. }
  72. }
  73. // DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
  74. func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  75. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  76. req := &testpb.SimpleRequest{
  77. ResponseType: testpb.PayloadType_COMPRESSABLE,
  78. ResponseSize: int32(largeRespSize),
  79. Payload: pl,
  80. }
  81. reply, err := tc.UnaryCall(context.Background(), req, args...)
  82. if err != nil {
  83. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  84. }
  85. t := reply.GetPayload().GetType()
  86. s := len(reply.GetPayload().GetBody())
  87. if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
  88. grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
  89. }
  90. }
  91. // DoClientStreaming performs a client streaming RPC.
  92. func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  93. stream, err := tc.StreamingInputCall(context.Background(), args...)
  94. if err != nil {
  95. grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
  96. }
  97. var sum int
  98. for _, s := range reqSizes {
  99. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
  100. req := &testpb.StreamingInputCallRequest{
  101. Payload: pl,
  102. }
  103. if err := stream.Send(req); err != nil {
  104. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  105. }
  106. sum += s
  107. }
  108. reply, err := stream.CloseAndRecv()
  109. if err != nil {
  110. grpclog.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
  111. }
  112. if reply.GetAggregatedPayloadSize() != int32(sum) {
  113. grpclog.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
  114. }
  115. }
  116. // DoServerStreaming performs a server streaming RPC.
  117. func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  118. respParam := make([]*testpb.ResponseParameters, len(respSizes))
  119. for i, s := range respSizes {
  120. respParam[i] = &testpb.ResponseParameters{
  121. Size: int32(s),
  122. }
  123. }
  124. req := &testpb.StreamingOutputCallRequest{
  125. ResponseType: testpb.PayloadType_COMPRESSABLE,
  126. ResponseParameters: respParam,
  127. }
  128. stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
  129. if err != nil {
  130. grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
  131. }
  132. var rpcStatus error
  133. var respCnt int
  134. var index int
  135. for {
  136. reply, err := stream.Recv()
  137. if err != nil {
  138. rpcStatus = err
  139. break
  140. }
  141. t := reply.GetPayload().GetType()
  142. if t != testpb.PayloadType_COMPRESSABLE {
  143. grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
  144. }
  145. size := len(reply.GetPayload().GetBody())
  146. if size != respSizes[index] {
  147. grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  148. }
  149. index++
  150. respCnt++
  151. }
  152. if rpcStatus != io.EOF {
  153. grpclog.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
  154. }
  155. if respCnt != len(respSizes) {
  156. grpclog.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
  157. }
  158. }
  159. // DoPingPong performs ping-pong style bi-directional streaming RPC.
  160. func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  161. stream, err := tc.FullDuplexCall(context.Background(), args...)
  162. if err != nil {
  163. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  164. }
  165. var index int
  166. for index < len(reqSizes) {
  167. respParam := []*testpb.ResponseParameters{
  168. {
  169. Size: int32(respSizes[index]),
  170. },
  171. }
  172. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
  173. req := &testpb.StreamingOutputCallRequest{
  174. ResponseType: testpb.PayloadType_COMPRESSABLE,
  175. ResponseParameters: respParam,
  176. Payload: pl,
  177. }
  178. if err := stream.Send(req); err != nil {
  179. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  180. }
  181. reply, err := stream.Recv()
  182. if err != nil {
  183. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  184. }
  185. t := reply.GetPayload().GetType()
  186. if t != testpb.PayloadType_COMPRESSABLE {
  187. grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
  188. }
  189. size := len(reply.GetPayload().GetBody())
  190. if size != respSizes[index] {
  191. grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  192. }
  193. index++
  194. }
  195. if err := stream.CloseSend(); err != nil {
  196. grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  197. }
  198. if _, err := stream.Recv(); err != io.EOF {
  199. grpclog.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
  200. }
  201. }
  202. // DoEmptyStream sets up a bi-directional streaming with zero message.
  203. func DoEmptyStream(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  204. stream, err := tc.FullDuplexCall(context.Background(), args...)
  205. if err != nil {
  206. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  207. }
  208. if err := stream.CloseSend(); err != nil {
  209. grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  210. }
  211. if _, err := stream.Recv(); err != io.EOF {
  212. grpclog.Fatalf("%v failed to complete the empty stream test: %v", stream, err)
  213. }
  214. }
  215. // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
  216. func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  217. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
  218. defer cancel()
  219. stream, err := tc.FullDuplexCall(ctx, args...)
  220. if err != nil {
  221. if status.Code(err) == codes.DeadlineExceeded {
  222. return
  223. }
  224. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  225. }
  226. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
  227. req := &testpb.StreamingOutputCallRequest{
  228. ResponseType: testpb.PayloadType_COMPRESSABLE,
  229. Payload: pl,
  230. }
  231. if err := stream.Send(req); err != nil && err != io.EOF {
  232. grpclog.Fatalf("%v.Send(_) = %v", stream, err)
  233. }
  234. if _, err := stream.Recv(); status.Code(err) != codes.DeadlineExceeded {
  235. grpclog.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded)
  236. }
  237. }
  238. // DoComputeEngineCreds performs a unary RPC with compute engine auth.
  239. func DoComputeEngineCreds(tc testpb.TestServiceClient, serviceAccount, oauthScope string) {
  240. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  241. req := &testpb.SimpleRequest{
  242. ResponseType: testpb.PayloadType_COMPRESSABLE,
  243. ResponseSize: int32(largeRespSize),
  244. Payload: pl,
  245. FillUsername: true,
  246. FillOauthScope: true,
  247. }
  248. reply, err := tc.UnaryCall(context.Background(), req)
  249. if err != nil {
  250. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  251. }
  252. user := reply.GetUsername()
  253. scope := reply.GetOauthScope()
  254. if user != serviceAccount {
  255. grpclog.Fatalf("Got user name %q, want %q.", user, serviceAccount)
  256. }
  257. if !strings.Contains(oauthScope, scope) {
  258. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  259. }
  260. }
  261. func getServiceAccountJSONKey(keyFile string) []byte {
  262. jsonKey, err := ioutil.ReadFile(keyFile)
  263. if err != nil {
  264. grpclog.Fatalf("Failed to read the service account key file: %v", err)
  265. }
  266. return jsonKey
  267. }
  268. // DoServiceAccountCreds performs a unary RPC with service account auth.
  269. func DoServiceAccountCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  270. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  271. req := &testpb.SimpleRequest{
  272. ResponseType: testpb.PayloadType_COMPRESSABLE,
  273. ResponseSize: int32(largeRespSize),
  274. Payload: pl,
  275. FillUsername: true,
  276. FillOauthScope: true,
  277. }
  278. reply, err := tc.UnaryCall(context.Background(), req)
  279. if err != nil {
  280. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  281. }
  282. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  283. user := reply.GetUsername()
  284. scope := reply.GetOauthScope()
  285. if !strings.Contains(string(jsonKey), user) {
  286. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  287. }
  288. if !strings.Contains(oauthScope, scope) {
  289. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  290. }
  291. }
  292. // DoJWTTokenCreds performs a unary RPC with JWT token auth.
  293. func DoJWTTokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile string) {
  294. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  295. req := &testpb.SimpleRequest{
  296. ResponseType: testpb.PayloadType_COMPRESSABLE,
  297. ResponseSize: int32(largeRespSize),
  298. Payload: pl,
  299. FillUsername: true,
  300. }
  301. reply, err := tc.UnaryCall(context.Background(), req)
  302. if err != nil {
  303. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  304. }
  305. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  306. user := reply.GetUsername()
  307. if !strings.Contains(string(jsonKey), user) {
  308. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  309. }
  310. }
  311. // GetToken obtains an OAUTH token from the input.
  312. func GetToken(serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
  313. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  314. config, err := google.JWTConfigFromJSON(jsonKey, oauthScope)
  315. if err != nil {
  316. grpclog.Fatalf("Failed to get the config: %v", err)
  317. }
  318. token, err := config.TokenSource(context.Background()).Token()
  319. if err != nil {
  320. grpclog.Fatalf("Failed to get the token: %v", err)
  321. }
  322. return token
  323. }
  324. // DoOauth2TokenCreds performs a unary RPC with OAUTH2 token auth.
  325. func DoOauth2TokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  326. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  327. req := &testpb.SimpleRequest{
  328. ResponseType: testpb.PayloadType_COMPRESSABLE,
  329. ResponseSize: int32(largeRespSize),
  330. Payload: pl,
  331. FillUsername: true,
  332. FillOauthScope: true,
  333. }
  334. reply, err := tc.UnaryCall(context.Background(), req)
  335. if err != nil {
  336. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  337. }
  338. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  339. user := reply.GetUsername()
  340. scope := reply.GetOauthScope()
  341. if !strings.Contains(string(jsonKey), user) {
  342. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  343. }
  344. if !strings.Contains(oauthScope, scope) {
  345. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  346. }
  347. }
  348. // DoPerRPCCreds performs a unary RPC with per RPC OAUTH2 token.
  349. func DoPerRPCCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  350. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  351. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  352. req := &testpb.SimpleRequest{
  353. ResponseType: testpb.PayloadType_COMPRESSABLE,
  354. ResponseSize: int32(largeRespSize),
  355. Payload: pl,
  356. FillUsername: true,
  357. FillOauthScope: true,
  358. }
  359. token := GetToken(serviceAccountKeyFile, oauthScope)
  360. kv := map[string]string{"authorization": token.Type() + " " + token.AccessToken}
  361. ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{"authorization": []string{kv["authorization"]}})
  362. reply, err := tc.UnaryCall(ctx, req)
  363. if err != nil {
  364. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  365. }
  366. user := reply.GetUsername()
  367. scope := reply.GetOauthScope()
  368. if !strings.Contains(string(jsonKey), user) {
  369. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  370. }
  371. if !strings.Contains(oauthScope, scope) {
  372. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  373. }
  374. }
  375. // DoGoogleDefaultCredentials performs an unary RPC with google default credentials
  376. func DoGoogleDefaultCredentials(tc testpb.TestServiceClient, defaultServiceAccount string) {
  377. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  378. req := &testpb.SimpleRequest{
  379. ResponseType: testpb.PayloadType_COMPRESSABLE,
  380. ResponseSize: int32(largeRespSize),
  381. Payload: pl,
  382. FillUsername: true,
  383. FillOauthScope: true,
  384. }
  385. reply, err := tc.UnaryCall(context.Background(), req)
  386. if err != nil {
  387. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  388. }
  389. if reply.GetUsername() != defaultServiceAccount {
  390. grpclog.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
  391. }
  392. }
  393. // DoComputeEngineChannelCredentials performs an unary RPC with compute engine channel credentials
  394. func DoComputeEngineChannelCredentials(tc testpb.TestServiceClient, defaultServiceAccount string) {
  395. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  396. req := &testpb.SimpleRequest{
  397. ResponseType: testpb.PayloadType_COMPRESSABLE,
  398. ResponseSize: int32(largeRespSize),
  399. Payload: pl,
  400. FillUsername: true,
  401. FillOauthScope: true,
  402. }
  403. reply, err := tc.UnaryCall(context.Background(), req)
  404. if err != nil {
  405. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  406. }
  407. if reply.GetUsername() != defaultServiceAccount {
  408. grpclog.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
  409. }
  410. }
  411. var testMetadata = metadata.MD{
  412. "key1": []string{"value1"},
  413. "key2": []string{"value2"},
  414. }
  415. // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
  416. func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  417. ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(context.Background(), testMetadata))
  418. stream, err := tc.StreamingInputCall(ctx, args...)
  419. if err != nil {
  420. grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
  421. }
  422. cancel()
  423. _, err = stream.CloseAndRecv()
  424. if status.Code(err) != codes.Canceled {
  425. grpclog.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, status.Code(err), codes.Canceled)
  426. }
  427. }
  428. // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
  429. func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  430. ctx, cancel := context.WithCancel(context.Background())
  431. stream, err := tc.FullDuplexCall(ctx, args...)
  432. if err != nil {
  433. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  434. }
  435. respParam := []*testpb.ResponseParameters{
  436. {
  437. Size: 31415,
  438. },
  439. }
  440. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
  441. req := &testpb.StreamingOutputCallRequest{
  442. ResponseType: testpb.PayloadType_COMPRESSABLE,
  443. ResponseParameters: respParam,
  444. Payload: pl,
  445. }
  446. if err := stream.Send(req); err != nil {
  447. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  448. }
  449. if _, err := stream.Recv(); err != nil {
  450. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  451. }
  452. cancel()
  453. if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
  454. grpclog.Fatalf("%v compleled with error code %d, want %d", stream, status.Code(err), codes.Canceled)
  455. }
  456. }
  457. var (
  458. initialMetadataValue = "test_initial_metadata_value"
  459. trailingMetadataValue = "\x0a\x0b\x0a\x0b\x0a\x0b"
  460. customMetadata = metadata.Pairs(
  461. initialMetadataKey, initialMetadataValue,
  462. trailingMetadataKey, trailingMetadataValue,
  463. )
  464. )
  465. func validateMetadata(header, trailer metadata.MD) {
  466. if len(header[initialMetadataKey]) != 1 {
  467. grpclog.Fatalf("Expected exactly one header from server. Received %d", len(header[initialMetadataKey]))
  468. }
  469. if header[initialMetadataKey][0] != initialMetadataValue {
  470. grpclog.Fatalf("Got header %s; want %s", header[initialMetadataKey][0], initialMetadataValue)
  471. }
  472. if len(trailer[trailingMetadataKey]) != 1 {
  473. grpclog.Fatalf("Expected exactly one trailer from server. Received %d", len(trailer[trailingMetadataKey]))
  474. }
  475. if trailer[trailingMetadataKey][0] != trailingMetadataValue {
  476. grpclog.Fatalf("Got trailer %s; want %s", trailer[trailingMetadataKey][0], trailingMetadataValue)
  477. }
  478. }
  479. // DoCustomMetadata checks that metadata is echoed back to the client.
  480. func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  481. // Testing with UnaryCall.
  482. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
  483. req := &testpb.SimpleRequest{
  484. ResponseType: testpb.PayloadType_COMPRESSABLE,
  485. ResponseSize: int32(1),
  486. Payload: pl,
  487. }
  488. ctx := metadata.NewOutgoingContext(context.Background(), customMetadata)
  489. var header, trailer metadata.MD
  490. args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
  491. reply, err := tc.UnaryCall(
  492. ctx,
  493. req,
  494. args...,
  495. )
  496. if err != nil {
  497. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  498. }
  499. t := reply.GetPayload().GetType()
  500. s := len(reply.GetPayload().GetBody())
  501. if t != testpb.PayloadType_COMPRESSABLE || s != 1 {
  502. grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, 1)
  503. }
  504. validateMetadata(header, trailer)
  505. // Testing with FullDuplex.
  506. stream, err := tc.FullDuplexCall(ctx, args...)
  507. if err != nil {
  508. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  509. }
  510. respParam := []*testpb.ResponseParameters{
  511. {
  512. Size: 1,
  513. },
  514. }
  515. streamReq := &testpb.StreamingOutputCallRequest{
  516. ResponseType: testpb.PayloadType_COMPRESSABLE,
  517. ResponseParameters: respParam,
  518. Payload: pl,
  519. }
  520. if err := stream.Send(streamReq); err != nil {
  521. grpclog.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
  522. }
  523. streamHeader, err := stream.Header()
  524. if err != nil {
  525. grpclog.Fatalf("%v.Header() = %v", stream, err)
  526. }
  527. if _, err := stream.Recv(); err != nil {
  528. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  529. }
  530. if err := stream.CloseSend(); err != nil {
  531. grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  532. }
  533. if _, err := stream.Recv(); err != io.EOF {
  534. grpclog.Fatalf("%v failed to complete the custom metadata test: %v", stream, err)
  535. }
  536. streamTrailer := stream.Trailer()
  537. validateMetadata(streamHeader, streamTrailer)
  538. }
  539. // DoStatusCodeAndMessage checks that the status code is propagated back to the client.
  540. func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  541. var code int32 = 2
  542. msg := "test status message"
  543. expectedErr := status.Error(codes.Code(code), msg)
  544. respStatus := &testpb.EchoStatus{
  545. Code: code,
  546. Message: msg,
  547. }
  548. // Test UnaryCall.
  549. req := &testpb.SimpleRequest{
  550. ResponseStatus: respStatus,
  551. }
  552. if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
  553. grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
  554. }
  555. // Test FullDuplexCall.
  556. stream, err := tc.FullDuplexCall(context.Background(), args...)
  557. if err != nil {
  558. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  559. }
  560. streamReq := &testpb.StreamingOutputCallRequest{
  561. ResponseStatus: respStatus,
  562. }
  563. if err := stream.Send(streamReq); err != nil {
  564. grpclog.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
  565. }
  566. if err := stream.CloseSend(); err != nil {
  567. grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  568. }
  569. if _, err = stream.Recv(); err.Error() != expectedErr.Error() {
  570. grpclog.Fatalf("%v.Recv() returned error %v, want %v", stream, err, expectedErr)
  571. }
  572. }
  573. // DoSpecialStatusMessage verifies Unicode and whitespace is correctly processed
  574. // in status message.
  575. func DoSpecialStatusMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  576. const (
  577. code int32 = 2
  578. msg string = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n"
  579. )
  580. expectedErr := status.Error(codes.Code(code), msg)
  581. req := &testpb.SimpleRequest{
  582. ResponseStatus: &testpb.EchoStatus{
  583. Code: code,
  584. Message: msg,
  585. },
  586. }
  587. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  588. defer cancel()
  589. if _, err := tc.UnaryCall(ctx, req, args...); err == nil || err.Error() != expectedErr.Error() {
  590. grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
  591. }
  592. }
  593. // DoUnimplementedService attempts to call a method from an unimplemented service.
  594. func DoUnimplementedService(tc testpb.UnimplementedServiceClient) {
  595. _, err := tc.UnimplementedCall(context.Background(), &testpb.Empty{})
  596. if status.Code(err) != codes.Unimplemented {
  597. grpclog.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, status.Code(err), codes.Unimplemented)
  598. }
  599. }
  600. // DoUnimplementedMethod attempts to call an unimplemented method.
  601. func DoUnimplementedMethod(cc *grpc.ClientConn) {
  602. var req, reply proto.Message
  603. if err := cc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply); err == nil || status.Code(err) != codes.Unimplemented {
  604. grpclog.Fatalf("ClientConn.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented)
  605. }
  606. }
  607. // DoPickFirstUnary runs multiple RPCs (rpcCount) and checks that all requests
  608. // are sent to the same backend.
  609. func DoPickFirstUnary(tc testpb.TestServiceClient) {
  610. const rpcCount = 100
  611. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
  612. req := &testpb.SimpleRequest{
  613. ResponseType: testpb.PayloadType_COMPRESSABLE,
  614. ResponseSize: int32(1),
  615. Payload: pl,
  616. FillServerId: true,
  617. }
  618. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  619. defer cancel()
  620. var serverID string
  621. for i := 0; i < rpcCount; i++ {
  622. resp, err := tc.UnaryCall(ctx, req)
  623. if err != nil {
  624. grpclog.Fatalf("iteration %d, failed to do UnaryCall: %v", i, err)
  625. }
  626. id := resp.ServerId
  627. if id == "" {
  628. grpclog.Fatalf("iteration %d, got empty server ID", i)
  629. }
  630. if i == 0 {
  631. serverID = id
  632. continue
  633. }
  634. if serverID != id {
  635. grpclog.Fatalf("iteration %d, got different server ids: %q vs %q", i, serverID, id)
  636. }
  637. }
  638. }
  639. type testServer struct {
  640. }
  641. // NewTestServer creates a test server for test service.
  642. func NewTestServer() testpb.TestServiceServer {
  643. return &testServer{}
  644. }
  645. func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  646. return new(testpb.Empty), nil
  647. }
  648. func serverNewPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
  649. if size < 0 {
  650. return nil, fmt.Errorf("requested a response with invalid length %d", size)
  651. }
  652. body := make([]byte, size)
  653. switch t {
  654. case testpb.PayloadType_COMPRESSABLE:
  655. case testpb.PayloadType_UNCOMPRESSABLE:
  656. return nil, fmt.Errorf("payloadType UNCOMPRESSABLE is not supported")
  657. default:
  658. return nil, fmt.Errorf("unsupported payload type: %d", t)
  659. }
  660. return &testpb.Payload{
  661. Type: t,
  662. Body: body,
  663. }, nil
  664. }
  665. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  666. st := in.GetResponseStatus()
  667. if md, ok := metadata.FromIncomingContext(ctx); ok {
  668. if initialMetadata, ok := md[initialMetadataKey]; ok {
  669. header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
  670. grpc.SendHeader(ctx, header)
  671. }
  672. if trailingMetadata, ok := md[trailingMetadataKey]; ok {
  673. trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
  674. grpc.SetTrailer(ctx, trailer)
  675. }
  676. }
  677. if st != nil && st.Code != 0 {
  678. return nil, status.Error(codes.Code(st.Code), st.Message)
  679. }
  680. pl, err := serverNewPayload(in.GetResponseType(), in.GetResponseSize())
  681. if err != nil {
  682. return nil, err
  683. }
  684. return &testpb.SimpleResponse{
  685. Payload: pl,
  686. }, nil
  687. }
  688. func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
  689. cs := args.GetResponseParameters()
  690. for _, c := range cs {
  691. if us := c.GetIntervalUs(); us > 0 {
  692. time.Sleep(time.Duration(us) * time.Microsecond)
  693. }
  694. pl, err := serverNewPayload(args.GetResponseType(), c.GetSize())
  695. if err != nil {
  696. return err
  697. }
  698. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  699. Payload: pl,
  700. }); err != nil {
  701. return err
  702. }
  703. }
  704. return nil
  705. }
  706. func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
  707. var sum int
  708. for {
  709. in, err := stream.Recv()
  710. if err == io.EOF {
  711. return stream.SendAndClose(&testpb.StreamingInputCallResponse{
  712. AggregatedPayloadSize: int32(sum),
  713. })
  714. }
  715. if err != nil {
  716. return err
  717. }
  718. p := in.GetPayload().GetBody()
  719. sum += len(p)
  720. }
  721. }
  722. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  723. if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
  724. if initialMetadata, ok := md[initialMetadataKey]; ok {
  725. header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
  726. stream.SendHeader(header)
  727. }
  728. if trailingMetadata, ok := md[trailingMetadataKey]; ok {
  729. trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
  730. stream.SetTrailer(trailer)
  731. }
  732. }
  733. for {
  734. in, err := stream.Recv()
  735. if err == io.EOF {
  736. // read done.
  737. return nil
  738. }
  739. if err != nil {
  740. return err
  741. }
  742. st := in.GetResponseStatus()
  743. if st != nil && st.Code != 0 {
  744. return status.Error(codes.Code(st.Code), st.Message)
  745. }
  746. cs := in.GetResponseParameters()
  747. for _, c := range cs {
  748. if us := c.GetIntervalUs(); us > 0 {
  749. time.Sleep(time.Duration(us) * time.Microsecond)
  750. }
  751. pl, err := serverNewPayload(in.GetResponseType(), c.GetSize())
  752. if err != nil {
  753. return err
  754. }
  755. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  756. Payload: pl,
  757. }); err != nil {
  758. return err
  759. }
  760. }
  761. }
  762. }
  763. func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
  764. var msgBuf []*testpb.StreamingOutputCallRequest
  765. for {
  766. in, err := stream.Recv()
  767. if err == io.EOF {
  768. // read done.
  769. break
  770. }
  771. if err != nil {
  772. return err
  773. }
  774. msgBuf = append(msgBuf, in)
  775. }
  776. for _, m := range msgBuf {
  777. cs := m.GetResponseParameters()
  778. for _, c := range cs {
  779. if us := c.GetIntervalUs(); us > 0 {
  780. time.Sleep(time.Duration(us) * time.Microsecond)
  781. }
  782. pl, err := serverNewPayload(m.GetResponseType(), c.GetSize())
  783. if err != nil {
  784. return err
  785. }
  786. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  787. Payload: pl,
  788. }); err != nil {
  789. return err
  790. }
  791. }
  792. }
  793. return nil
  794. }