setup.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. // Copyright 2019 github.com. All rights reserved.
  2. // Use of this source code is governed by github.com.
  3. package pb
  4. import (
  5. fmt "fmt"
  6. "property-system-gateway/parser"
  7. "time"
  8. "go.etcd.io/etcd/client/v3/naming/resolver"
  9. grpc "google.golang.org/grpc"
  10. "google.golang.org/grpc/keepalive"
  11. )
  12. // 客户端集合
  13. var System SystemClient
  14. var Common CommonClient
  15. var Garden GardenClient
  16. var Thirdparty PropertyThirdpartyClient
  17. var Household HouseholdClient
  18. var PropertyLog PropertyLogClient
  19. var Device DeviceClient
  20. func setupDeviceClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  21. // 根据是否为k8s来组装targets
  22. var serviceName string
  23. serviceName = parser.Conf.Rpc.Device.ServiceName
  24. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  25. if err != nil {
  26. panic(err)
  27. }
  28. // 发起一个连接并记录连接conn,后期释放
  29. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  30. grpc.WithResolvers(builder),
  31. grpc.WithBalancerName("round_robin"),
  32. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  33. Device = NewDeviceClient(conn)
  34. conns = append(conns, conn)
  35. } else {
  36. fmt.Println("[rpc] dial Device conn err", err)
  37. }
  38. return
  39. }
  40. func setupSystemClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  41. // 根据是否为k8s来组装targets
  42. var serviceName string
  43. serviceName = parser.Conf.Rpc.System.ServiceName
  44. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  45. if err != nil {
  46. panic(err)
  47. }
  48. // 发起一个连接并记录连接conn,后期释放
  49. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  50. grpc.WithResolvers(builder),
  51. grpc.WithBalancerName("round_robin"),
  52. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  53. System = NewSystemClient(conn)
  54. conns = append(conns, conn)
  55. } else {
  56. fmt.Println("[rpc] dial system conn err", err)
  57. }
  58. return
  59. }
  60. func setupPropertyLogClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  61. // 根据是否为k8s来组装targets
  62. var serviceName string
  63. serviceName = parser.Conf.Rpc.PropertyLog.ServiceName
  64. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  65. if err != nil {
  66. panic(err)
  67. }
  68. // 发起一个连接并记录连接conn,后期释放
  69. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  70. grpc.WithResolvers(builder),
  71. grpc.WithBalancerName("round_robin"),
  72. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  73. PropertyLog = NewPropertyLogClient(conn)
  74. conns = append(conns, conn)
  75. } else {
  76. fmt.Println("[rpc] dial PropertyLog conn err", err)
  77. }
  78. return
  79. }
  80. func setupHouseholdClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  81. // 根据是否为k8s来组装targets
  82. var serviceName string
  83. serviceName = parser.Conf.Rpc.Household.ServiceName
  84. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  85. if err != nil {
  86. panic(err)
  87. }
  88. // 发起一个连接并记录连接conn,后期释放
  89. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  90. grpc.WithResolvers(builder),
  91. grpc.WithBalancerName("round_robin"),
  92. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  93. Household = NewHouseholdClient(conn)
  94. conns = append(conns, conn)
  95. } else {
  96. fmt.Println("[rpc] dial Household conn err", err)
  97. }
  98. return
  99. }
  100. func setupGardenClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  101. // 根据是否为k8s来组装targets
  102. var serviceName string
  103. serviceName = parser.Conf.Rpc.Garden.ServiceName
  104. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  105. if err != nil {
  106. panic(err)
  107. }
  108. // 发起一个连接并记录连接conn,后期释放
  109. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  110. grpc.WithResolvers(builder),
  111. grpc.WithBalancerName("round_robin"),
  112. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  113. Garden = NewGardenClient(conn)
  114. conns = append(conns, conn)
  115. } else {
  116. fmt.Println("[rpc] dial Garden conn err", err)
  117. }
  118. return
  119. }
  120. func setupCommonClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  121. // 根据是否为k8s来组装targets
  122. var serviceName string
  123. serviceName = parser.Conf.Rpc.Common.ServiceName
  124. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  125. if err != nil {
  126. panic(err)
  127. }
  128. // 发起一个连接并记录连接conn,后期释放
  129. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  130. grpc.WithResolvers(builder),
  131. grpc.WithBalancerName("round_robin"),
  132. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  133. Common = NewCommonClient(conn)
  134. conns = append(conns, conn)
  135. } else {
  136. fmt.Println("[rpc] dial Common conn err", err)
  137. }
  138. return
  139. }
  140. func setupThirdpartyClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  141. // 根据是否为k8s来组装targets
  142. var serviceName string
  143. serviceName = parser.Conf.Rpc.Thirdparty.ServiceName
  144. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  145. if err != nil {
  146. panic(err)
  147. }
  148. // 发起一个连接并记录连接conn,后期释放
  149. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  150. grpc.WithResolvers(builder),
  151. grpc.WithBalancerName("round_robin"),
  152. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  153. Thirdparty = NewPropertyThirdpartyClient(conn)
  154. conns = append(conns, conn)
  155. } else {
  156. fmt.Println("[rpc] dial Thirdparty conn err", err)
  157. }
  158. return
  159. }
  160. func setupDeviceClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  161. // 根据是否为k8s来组装targets
  162. var serviceName string
  163. if parser.Conf.K8s {
  164. serviceName = parser.Conf.Rpc.Device.ServiceName
  165. } else {
  166. serviceName = parser.Conf.Rpc.Device.ServiceIp
  167. }
  168. // 发起一个连接并记录连接conn,后期释放
  169. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  170. parser.Conf.Rpc.Device.ServicePort),
  171. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  172. Device = NewDeviceClient(conn)
  173. conns = append(conns, conn)
  174. } else {
  175. fmt.Println("[rpc] dial Device conn err", err)
  176. }
  177. return
  178. }
  179. func setupSystemClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  180. // 根据是否为k8s来组装targets
  181. var serviceName string
  182. if parser.Conf.K8s {
  183. serviceName = parser.Conf.Rpc.System.ServiceName
  184. } else {
  185. serviceName = parser.Conf.Rpc.System.ServiceIp
  186. }
  187. // 发起一个连接并记录连接conn,后期释放
  188. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  189. parser.Conf.Rpc.System.ServicePort),
  190. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  191. System = NewSystemClient(conn)
  192. conns = append(conns, conn)
  193. } else {
  194. fmt.Println("[rpc] dial system conn err", err)
  195. }
  196. return
  197. }
  198. func setupPropertyLogClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  199. // 根据是否为k8s来组装targets
  200. var serviceName string
  201. if parser.Conf.K8s {
  202. serviceName = parser.Conf.Rpc.PropertyLog.ServiceName
  203. } else {
  204. serviceName = parser.Conf.Rpc.PropertyLog.ServiceIp
  205. }
  206. // 发起一个连接并记录连接conn,后期释放
  207. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  208. parser.Conf.Rpc.PropertyLog.ServicePort),
  209. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  210. PropertyLog = NewPropertyLogClient(conn)
  211. conns = append(conns, conn)
  212. } else {
  213. fmt.Println("[rpc] dial property log conn err", err)
  214. }
  215. return
  216. }
  217. func setupHouseholdClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  218. // 根据是否为k8s来组装targets
  219. var serviceName string
  220. if parser.Conf.K8s {
  221. serviceName = parser.Conf.Rpc.Household.ServiceName
  222. } else {
  223. serviceName = parser.Conf.Rpc.Household.ServiceIp
  224. }
  225. // 发起一个连接并记录连接conn,后期释放
  226. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  227. parser.Conf.Rpc.Household.ServicePort),
  228. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  229. Household = NewHouseholdClient(conn)
  230. conns = append(conns, conn)
  231. } else {
  232. fmt.Println("[rpc] dial household conn err", err)
  233. }
  234. return
  235. }
  236. func setupGardenClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  237. // 根据是否为k8s来组装targets
  238. var serviceName string
  239. if parser.Conf.K8s {
  240. serviceName = parser.Conf.Rpc.Garden.ServiceName
  241. } else {
  242. serviceName = parser.Conf.Rpc.Garden.ServiceIp
  243. }
  244. // 发起一个连接并记录连接conn,后期释放
  245. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  246. parser.Conf.Rpc.Garden.ServicePort),
  247. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  248. Garden = NewGardenClient(conn)
  249. conns = append(conns, conn)
  250. } else {
  251. fmt.Println("[rpc] dial Garden conn err", err)
  252. }
  253. return
  254. }
  255. func setupCommonClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  256. // 根据是否为k8s来组装targets
  257. var serviceName string
  258. if parser.Conf.K8s {
  259. serviceName = parser.Conf.Rpc.Common.ServiceName
  260. } else {
  261. serviceName = parser.Conf.Rpc.Common.ServiceIp
  262. }
  263. // 发起一个连接并记录连接conn,后期释放
  264. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  265. parser.Conf.Rpc.Common.ServicePort),
  266. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  267. Common = NewCommonClient(conn)
  268. conns = append(conns, conn)
  269. } else {
  270. fmt.Println("[rpc] dial common conn err", err)
  271. }
  272. return
  273. }
  274. func setupThirdpartyClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  275. // 根据是否为k8s来组装targets
  276. var serviceName string
  277. if parser.Conf.K8s {
  278. serviceName = parser.Conf.Rpc.Thirdparty.ServiceName
  279. } else {
  280. serviceName = parser.Conf.Rpc.Thirdparty.ServiceIp
  281. }
  282. // 发起一个连接并记录连接conn,后期释放
  283. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  284. parser.Conf.Rpc.Thirdparty.ServicePort),
  285. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  286. Thirdparty = NewPropertyThirdpartyClient(conn)
  287. conns = append(conns, conn)
  288. } else {
  289. fmt.Println("[rpc] dial thirdparty conn err", err)
  290. }
  291. return
  292. }
  293. // SetupClients 创建客户端
  294. func SetupClients() (conns []*grpc.ClientConn) {
  295. // 客户端配置参数
  296. var kacp = keepalive.ClientParameters{
  297. // send pings every n seconds if there is no activity
  298. Time: time.Duration(parser.Conf.Rpc.Keepalive.ClientTime) * time.Second,
  299. // wait n second for ping ack before considering the connection dead
  300. Timeout: time.Duration(parser.Conf.Rpc.Keepalive.ClientTimeout) * time.Second,
  301. // send pings even without active streams
  302. PermitWithoutStream: true,
  303. }
  304. if len(parser.Conf.EtcdAddrs) == 0 {
  305. setupSystemClient(kacp, conns)
  306. setupCommonClient(kacp, conns)
  307. setupThirdpartyClient(kacp, conns)
  308. setupGardenClient(kacp, conns)
  309. setupHouseholdClient(kacp, conns)
  310. setupPropertyLogClient(kacp, conns)
  311. setupDeviceClient(kacp, conns)
  312. return
  313. }
  314. setupSystemClient2(kacp, conns)
  315. setupCommonClient2(kacp, conns)
  316. setupThirdpartyClient2(kacp, conns)
  317. setupGardenClient2(kacp, conns)
  318. setupHouseholdClient2(kacp, conns)
  319. setupPropertyLogClient2(kacp, conns)
  320. setupDeviceClient2(kacp, conns)
  321. return
  322. }