config.go 7.9 KB


  1. package config
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "time"
  8. "github.com/fsnotify/fsnotify"
  9. )
  10. var Conf *Configure
  11. var ConfigPath = "conf/common.json"
  12. type KeepaliveConfig struct {
  13. ClientTime json.Number `json:"client_time"`
  14. ClientTimeout json.Number `json:"client_timeout"`
  15. ServerTime json.Number `json:"server_time"`
  16. ServerTimeout json.Number `json:"server_timeout"`
  17. ServerMiniTime json.Number `json:"server_mini_time"`
  18. }
  19. type OssConfig struct {
  20. Endpoint string `json:"endpoint"`
  21. Id string `json:"id"`
  22. Secret string `json:"secret"`
  23. Bucket string `json:"bucket"`
  24. }
  25. // mysql 配置
  26. type MysqlConfig struct {
  27. User string `json:"user"`
  28. Password string `json:"password"`
  29. Addr string `json:"addr"`
  30. Db string `json:"db"`
  31. Charset string `json:"charset"`
  32. MaxIdle json.Number `json:"max_idle"`
  33. MaxConn json.Number `json:"max_conn"`
  34. }
  35. type MongoConfig struct {
  36. User string `json:"user"`
  37. Password string `json:"password"`
  38. Addr string `json:"addr"`
  39. }
  40. // redis 配置
  41. type RedisConfig struct {
  42. Addrs string `json:"addrs"`
  43. Password string `json:"password"`
  44. Db json.Number `json:"db"`
  45. PoolSize json.Number `json:"pool_size"`
  46. MinIdleConns json.Number `json:"min_idle_conns"`
  47. MaxRetries json.Number `json:"max_retries"`
  48. IsCluster string `json:"is_cluster"`
  49. }
  50. type ElasticConfig struct {
  51. Addr string `json:"addr"`
  52. Sniff string `json:"sniff"`
  53. }
  54. type LogConfig struct {
  55. MaxSize json.Number `json:"max_size"`
  56. MaxBackups json.Number `json:"max_backups"`
  57. MaxAge json.Number `json:"max_age"`
  58. Level string `json:"level"`
  59. Stacktrace string `json:"stacktrace"`
  60. Path string `json:"path"`
  61. }
  62. type RPCNode struct {
  63. ServiceName string `json:"service_name"`
  64. ServicePort string `json:"service_port"`
  65. ServiceIp string `json:"service_ip"`
  66. MysqlDb string `json:"mysql_db"`
  67. RedisDb json.Number `json:"redis_db"`
  68. LogLevel string `json:"log_level"`
  69. LogStacktrace string `json:"log_stacktrace"`
  70. }
  71. type RPCConfig struct {
  72. Prefix string
  73. Keepalive KeepaliveConfig `json:"keepalive"`
  74. AdmDws RPCNode `json:"adm_dws"`
  75. AdmOds RPCNode `json:"adm_ods"`
  76. AdmAds RPCNode `json:"adm_ads"`
  77. AdmTask RPCNode `json:"adm_task"`
  78. }
  79. type RabbitmqConfig struct {
  80. Addr string `json:"addr"`
  81. Username string `json:"username"`
  82. Passwrod string `json:"passwrod"`
  83. Vhost string `json:"vhost"`
  84. ExchangeName string `json:"exchange_name"`
  85. QueueName string `json:"queue_name"`
  86. RouteBindKey string `json:"route_bind_key"`
  87. ConsumerCount json.Number `json:"consumer_count"`
  88. }
  89. type Configure struct {
  90. RunMode string `json:"run_mode"`
  91. Log LogConfig `json:"log"`
  92. Mysql MysqlConfig `json:"mysql"`
  93. Mongo MongoConfig `json:"mongo"`
  94. Redis RedisConfig `json:"redis"`
  95. Elastic ElasticConfig `json:"elastic"`
  96. Oss OssConfig `json:"oss"`
  97. Rpc RPCConfig `json:"rpc"`
  98. OdsRabbitmq RabbitmqConfig `json:"ods_rabbitmq"`
  99. DwsRabbitmq RabbitmqConfig `json:"dws_rabbitmq"`
  100. AdsRabbitmq RabbitmqConfig `json:"ads_rabbitmq"`
  101. }
  102. /*
  103. func watchEtcd(keysAPI client.KeysAPI, index uint64, runmode, key string, cli client.Client) {
  104. basePath := fmt.Sprintf("/%s/config", runmode)
  105. watcherOptions := &client.WatcherOptions{
  106. AfterIndex: index,
  107. Recursive: true,
  108. }
  109. watcher := keysAPI.Watcher(basePath, watcherOptions)
  110. for {
  111. r, err := watcher.Next(context.Background())
  112. if err != nil || r == nil {
  113. break
  114. }
  115. if r.Node != nil && r.PrevNode != nil {
  116. if r.Node.Key == r.PrevNode.Key && (r.Node.Value != r.PrevNode.Value) {
  117. reloadEtcd(runmode, key, cli)
  118. }
  119. }
  120. if r.Node != nil && r.PrevNode == nil {
  121. reloadEtcd(runmode, key, cli)
  122. }
  123. }
  124. }
  125. func reloadEtcd(runmode, key string, cli client.Client) {
  126. keysAPI := client.NewKeysAPI(cli)
  127. basePath := fmt.Sprintf("/%s/config", runmode)
  128. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  129. Recursive: true,
  130. }); err == nil && resp != nil && resp.Node != nil {
  131. conf := &Configure{}
  132. value := getNodeData(key, resp.Node)
  133. if jsonStr, err := json.Marshal(value); err == nil {
  134. if err := json.Unmarshal(jsonStr, &conf); err == nil {
  135. *Conf = *conf
  136. return
  137. } else {
  138. fmt.Printf("json Unmarshal failed. error:%s", err)
  139. }
  140. } else {
  141. fmt.Printf("json Marshal failed. error:%s", err)
  142. }
  143. } else {
  144. fmt.Printf("get %s failed. error:%s", basePath, err)
  145. }
  146. }
  147. // key 为加密密钥
  148. func GetConfig(runmode, key string, cli client.Client) *Configure {
  149. keysAPI := client.NewKeysAPI(cli)
  150. basePath := fmt.Sprintf("/%s/config", runmode)
  151. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  152. Recursive: true,
  153. }); err == nil && resp != nil && resp.Node != nil {
  154. Conf = &Configure{}
  155. value := getNodeData(key, resp.Node)
  156. if jsonStr, err := json.Marshal(value); err == nil {
  157. if err := json.Unmarshal(jsonStr, &Conf); err == nil {
  158. go watchEtcd(keysAPI, resp.Index, runmode, key, cli)
  159. return Conf
  160. } else {
  161. fmt.Printf("json Unmarshal failed. error:%s", err)
  162. }
  163. } else {
  164. fmt.Printf("json Marshal failed. error:%s", err)
  165. }
  166. } else {
  167. fmt.Printf("get %s failed. error:%s", basePath, err)
  168. os.Exit(1)
  169. }
  170. return nil
  171. }
  172. // 递归取出node的叶子节点值
  173. func getNodeData(key string, head *client.Node) (value interface{}) {
  174. s0 := strings.Split(head.Key, "/")
  175. len0 := len(s0)
  176. if len0 == 0 {
  177. return
  178. }
  179. if head.Dir {
  180. mapData := map[string]interface{}{}
  181. for _, node := range head.Nodes {
  182. s1 := strings.Split(node.Key, "/")
  183. len1 := len(s1)
  184. if len1 == 0 {
  185. break
  186. }
  187. mapData[s1[len1-1]] = getNodeData(key, node)
  188. }
  189. value = mapData
  190. } else {
  191. if key != "" && head.Value != "" {
  192. if bytesData, err := Base64URLDecode(head.Value); err != nil {
  193. fmt.Printf("Base64URLDecode(%s) failed. error:%s", head.Value, err)
  194. os.Exit(1)
  195. } else {
  196. if data, err := AesDecrypt(bytesData, []byte(key)); err != nil {
  197. fmt.Printf("AesDecrypt failed. error:%s", err)
  198. os.Exit(1)
  199. } else {
  200. value = string(data)
  201. }
  202. }
  203. } else {
  204. // 无加密,直接取值
  205. value = head.Value
  206. }
  207. }
  208. return
  209. }
  210. */
  211. // 适配k8s 方式
  212. // 公共配置会以configmap的方式映射到容器的conf/common.json中
  213. func SetConfigFile(configPath string) {
  214. ConfigPath = configPath
  215. }
  216. func GetConfigForK8s() *Configure {
  217. buffer, err := ioutil.ReadFile(ConfigPath)
  218. if err != nil {
  219. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  220. return nil
  221. }
  222. Conf = &Configure{}
  223. if err := json.Unmarshal(buffer, Conf); err != nil {
  224. fmt.Printf("json Unmarshal failed. error:%s", err)
  225. return nil
  226. }
  227. go watchConfigFileForK8s()
  228. return Conf
  229. }
  230. func ReloadConfigForK8s() {
  231. buffer, err := ioutil.ReadFile(ConfigPath)
  232. if err != nil {
  233. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  234. }
  235. confTmp := &Configure{}
  236. if err := json.Unmarshal(buffer, confTmp); err != nil {
  237. fmt.Printf("json Unmarshal failed. error:%s", err)
  238. }
  239. Conf = confTmp
  240. }
  241. // 判断路径文件/文件夹是否存在
  242. func Exists(path string) bool {
  243. _, err := os.Stat(path)
  244. if err != nil {
  245. if os.IsExist(err) {
  246. return true
  247. }
  248. return false
  249. }
  250. return true
  251. }
  252. func watchConfigFileForK8s() {
  253. fileExist := true
  254. watch, err := fsnotify.NewWatcher()
  255. if err != nil {
  256. fmt.Printf("new file watcher failed\n\n")
  257. os.Exit(1)
  258. }
  259. defer watch.Close()
  260. /*err = watch.Add(ConfigPath)
  261. if err != nil {
  262. fmt.Printf("add file watcher failed\n\n")
  263. os.Exit(1)
  264. }*/
  265. for {
  266. // 判断文件是否存在
  267. if !Exists(ConfigPath) {
  268. time.Sleep(10 * time.Second)
  269. fileExist = false
  270. continue
  271. } else {
  272. watch.Remove(ConfigPath)
  273. watch.Add(ConfigPath)
  274. if !fileExist { // 文件重新创建
  275. ReloadConfigForK8s()
  276. }
  277. fileExist = true
  278. }
  279. select {
  280. case ev := <-watch.Events:
  281. {
  282. fmt.Println("op : ", ev.Op)
  283. ReloadConfigForK8s()
  284. }
  285. case err := <-watch.Errors:
  286. {
  287. fmt.Println("error : ", err)
  288. continue
  289. }
  290. }
  291. }
  292. }