setup.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package pb
  4. import (
  5. fmt "fmt"
  6. "property-device/parser"
  7. "time"
  8. "go.etcd.io/etcd/client/v3/naming/resolver"
  9. grpc "google.golang.org/grpc"
  10. "google.golang.org/grpc/keepalive"
  11. )
  12. // 客户端集合
  13. var System SystemClient
  14. var Garden GardenClient
  15. var Mqtt MqttClient
  16. var Common CommonClient
  17. func setupMqttClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  18. // 根据是否为k8s来组装targets
  19. var serviceName string
  20. if parser.Conf.K8s {
  21. serviceName = parser.Conf.Rpc.Mqtt.ServiceName
  22. } else {
  23. serviceName = parser.Conf.Rpc.Mqtt.ServiceIp
  24. }
  25. // 发起一个连接并记录连接conn,后期释放
  26. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  27. parser.Conf.Rpc.Mqtt.ServicePort),
  28. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  29. Mqtt = NewMqttClient(conn)
  30. conns = append(conns, conn)
  31. } else {
  32. fmt.Println("[rpc] dial mqtt conn err", err)
  33. }
  34. return
  35. }
  36. func setupCommonClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  37. // 根据是否为k8s来组装targets
  38. var serviceName string
  39. if parser.Conf.K8s {
  40. serviceName = parser.Conf.Rpc.Common.ServiceName
  41. } else {
  42. serviceName = parser.Conf.Rpc.Common.ServiceIp
  43. }
  44. // 发起一个连接并记录连接conn,后期释放
  45. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  46. parser.Conf.Rpc.Common.ServicePort),
  47. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  48. Common = NewCommonClient(conn)
  49. conns = append(conns, conn)
  50. } else {
  51. fmt.Println("[rpc] dial Common conn err", err)
  52. }
  53. return
  54. }
  55. func setupSystemClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  56. // 根据是否为k8s来组装targets
  57. var serviceName string
  58. if parser.Conf.K8s {
  59. serviceName = parser.Conf.Rpc.System.ServiceName
  60. } else {
  61. serviceName = parser.Conf.Rpc.System.ServiceIp
  62. }
  63. // 发起一个连接并记录连接conn,后期释放
  64. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  65. parser.Conf.Rpc.System.ServicePort),
  66. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  67. System = NewSystemClient(conn)
  68. conns = append(conns, conn)
  69. } else {
  70. fmt.Println("[rpc] dial system conn err", err)
  71. }
  72. return
  73. }
  74. func setupGardenClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  75. // 根据是否为k8s来组装targets
  76. var serviceName string
  77. if parser.Conf.K8s {
  78. serviceName = parser.Conf.Rpc.Garden.ServiceName
  79. } else {
  80. serviceName = parser.Conf.Rpc.Garden.ServiceIp
  81. }
  82. // 发起一个连接并记录连接conn,后期释放
  83. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  84. parser.Conf.Rpc.Garden.ServicePort),
  85. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  86. Garden = NewGardenClient(conn)
  87. conns = append(conns, conn)
  88. } else {
  89. fmt.Println("[rpc] dial garden conn err", err)
  90. }
  91. return
  92. }
  93. func setupSystemClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  94. // 根据是否为k8s来组装targets
  95. var serviceName string
  96. serviceName = parser.Conf.Rpc.System.ServiceName
  97. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  98. if err != nil {
  99. panic(err)
  100. }
  101. // 发起一个连接并记录连接conn,后期释放
  102. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  103. grpc.WithResolvers(builder),
  104. grpc.WithBalancerName("round_robin"),
  105. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  106. System = NewSystemClient(conn)
  107. conns = append(conns, conn)
  108. } else {
  109. fmt.Println("[rpc] dial system conn err", err)
  110. }
  111. return
  112. }
  113. func setupCommonClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  114. // 根据是否为k8s来组装targets
  115. var serviceName string
  116. serviceName = parser.Conf.Rpc.Common.ServiceName
  117. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  118. if err != nil {
  119. panic(err)
  120. }
  121. // 发起一个连接并记录连接conn,后期释放
  122. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  123. grpc.WithResolvers(builder),
  124. grpc.WithBalancerName("round_robin"),
  125. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  126. Common = NewCommonClient(conn)
  127. conns = append(conns, conn)
  128. } else {
  129. fmt.Println("[rpc] dial Common conn err", err)
  130. }
  131. return
  132. }
  133. func setupMqttClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  134. // 根据是否为k8s来组装targets
  135. var serviceName string
  136. serviceName = parser.Conf.Rpc.Mqtt.ServiceName
  137. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  138. if err != nil {
  139. panic(err)
  140. }
  141. // 发起一个连接并记录连接conn,后期释放
  142. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  143. grpc.WithResolvers(builder),
  144. grpc.WithBalancerName("round_robin"),
  145. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  146. Mqtt = NewMqttClient(conn)
  147. conns = append(conns, conn)
  148. } else {
  149. fmt.Println("[rpc] dial mqtt conn err", err)
  150. }
  151. return
  152. }
  153. func setupGardenClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  154. // 根据是否为k8s来组装targets
  155. var serviceName string
  156. serviceName = parser.Conf.Rpc.Garden.ServiceName
  157. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  158. if err != nil {
  159. panic(err)
  160. }
  161. // 发起一个连接并记录连接conn,后期释放
  162. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  163. grpc.WithResolvers(builder),
  164. grpc.WithBalancerName("round_robin"),
  165. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  166. Garden = NewGardenClient(conn)
  167. conns = append(conns, conn)
  168. } else {
  169. fmt.Println("[rpc] dial Garden conn err", err)
  170. }
  171. return
  172. }
  173. // SetupClients 创建客户端
  174. func SetupClients() (conns []*grpc.ClientConn) {
  175. // 客户端配置参数
  176. var kacp = keepalive.ClientParameters{
  177. // send pings every n seconds if there is no activity
  178. Time: time.Duration(parser.Conf.Rpc.Keepalive.ClientTime) * time.Second,
  179. // wait n second for ping ack before considering the connection dead
  180. Timeout: time.Duration(parser.Conf.Rpc.Keepalive.ClientTimeout) * time.Second,
  181. // send pings even without active streams
  182. PermitWithoutStream: true,
  183. }
  184. if len(parser.Conf.EtcdAddrs) == 0 {
  185. setupSystemClient(kacp, conns)
  186. setupGardenClient(kacp, conns)
  187. setupMqttClient(kacp, conns)
  188. setupCommonClient(kacp, conns)
  189. return
  190. }
  191. setupSystemClient2(kacp, conns)
  192. setupGardenClient2(kacp, conns)
  193. setupMqttClient2(kacp, conns)
  194. setupCommonClient2(kacp, conns)
  195. return
  196. }