main.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Binary server is an example server.
  19. package main
  20. import (
  21. "context"
  22. "flag"
  23. "fmt"
  24. "io"
  25. "log"
  26. "net"
  27. "strings"
  28. "time"
  29. "google.golang.org/grpc"
  30. "google.golang.org/grpc/codes"
  31. "google.golang.org/grpc/credentials"
  32. ecpb "google.golang.org/grpc/examples/features/proto/echo"
  33. "google.golang.org/grpc/metadata"
  34. "google.golang.org/grpc/status"
  35. "google.golang.org/grpc/testdata"
  36. )
  37. var (
  38. port = flag.Int("port", 50051, "the port to serve on")
  39. errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
  40. errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
  41. )
  42. // logger is to mock a sophisticated logging system. To simplify the example, we just print out the content.
  43. func logger(format string, a ...interface{}) {
  44. fmt.Printf("LOG:\t"+format+"\n", a...)
  45. }
  46. type server struct{}
  47. func (s *server) UnaryEcho(ctx context.Context, in *ecpb.EchoRequest) (*ecpb.EchoResponse, error) {
  48. fmt.Printf("unary echoing message %q\n", in.Message)
  49. return &ecpb.EchoResponse{Message: in.Message}, nil
  50. }
  51. func (s *server) ServerStreamingEcho(in *ecpb.EchoRequest, stream ecpb.Echo_ServerStreamingEchoServer) error {
  52. return status.Error(codes.Unimplemented, "not implemented")
  53. }
  54. func (s *server) ClientStreamingEcho(stream ecpb.Echo_ClientStreamingEchoServer) error {
  55. return status.Error(codes.Unimplemented, "not implemented")
  56. }
  57. func (s *server) BidirectionalStreamingEcho(stream ecpb.Echo_BidirectionalStreamingEchoServer) error {
  58. for {
  59. in, err := stream.Recv()
  60. if err != nil {
  61. if err == io.EOF {
  62. return nil
  63. }
  64. fmt.Printf("server: error receiving from stream: %v\n", err)
  65. return err
  66. }
  67. fmt.Printf("bidi echoing message %q\n", in.Message)
  68. stream.Send(&ecpb.EchoResponse{Message: in.Message})
  69. }
  70. }
  71. // valid validates the authorization.
  72. func valid(authorization []string) bool {
  73. if len(authorization) < 1 {
  74. return false
  75. }
  76. token := strings.TrimPrefix(authorization[0], "Bearer ")
  77. // Perform the token validation here. For the sake of this example, the code
  78. // here forgoes any of the usual OAuth2 token validation and instead checks
  79. // for a token matching an arbitrary string.
  80. return token == "some-secret-token"
  81. }
  82. func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  83. // authentication (token verification)
  84. md, ok := metadata.FromIncomingContext(ctx)
  85. if !ok {
  86. return nil, errMissingMetadata
  87. }
  88. if !valid(md["authorization"]) {
  89. return nil, errInvalidToken
  90. }
  91. m, err := handler(ctx, req)
  92. if err != nil {
  93. logger("RPC failed with error %v", err)
  94. }
  95. return m, err
  96. }
  97. // wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
  98. // SendMsg method call.
  99. type wrappedStream struct {
  100. grpc.ServerStream
  101. }
  102. func (w *wrappedStream) RecvMsg(m interface{}) error {
  103. logger("Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
  104. return w.ServerStream.RecvMsg(m)
  105. }
  106. func (w *wrappedStream) SendMsg(m interface{}) error {
  107. logger("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
  108. return w.ServerStream.SendMsg(m)
  109. }
  110. func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
  111. return &wrappedStream{s}
  112. }
  113. func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  114. // authentication (token verification)
  115. md, ok := metadata.FromIncomingContext(ss.Context())
  116. if !ok {
  117. return errMissingMetadata
  118. }
  119. if !valid(md["authorization"]) {
  120. return errInvalidToken
  121. }
  122. err := handler(srv, newWrappedStream(ss))
  123. if err != nil {
  124. logger("RPC failed with error %v", err)
  125. }
  126. return err
  127. }
  128. func main() {
  129. flag.Parse()
  130. lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  131. if err != nil {
  132. log.Fatalf("failed to listen: %v", err)
  133. }
  134. // Create tls based credential.
  135. creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
  136. if err != nil {
  137. log.Fatalf("failed to create credentials: %v", err)
  138. }
  139. s := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(unaryInterceptor), grpc.StreamInterceptor(streamInterceptor))
  140. // Register EchoServer on the server.
  141. ecpb.RegisterEchoServer(s, &server{})
  142. if err := s.Serve(lis); err != nil {
  143. log.Fatalf("failed to serve: %v", err)
  144. }
  145. }