config.go 8.2 KB


  1. package config
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "time"
  8. "github.com/fsnotify/fsnotify"
  9. )
  10. var Conf *Configure
  11. var ConfigPath = "conf/common.json"
  12. type KeepaliveConfig struct {
  13. ClientTime json.Number
  14. ClientTimeout json.Number
  15. ServerTime json.Number
  16. ServerTimeout json.Number
  17. ServerMiniTime json.Number
  18. }
  19. type OssConfig struct {
  20. AccessKey string `json:"access_key"`
  21. AccessSecret string `json:"access_secret"`
  22. Bucket string `json:"bucket"`
  23. EndPoint string `json:"end_point"`
  24. DefaultBrandImage string `json:"default_brand_image"`
  25. BrandImage string `json:"brand_image"`
  26. DefaultSeriesImage string `json:"default_series_image"`
  27. SeriesImage string `json:"series_image"`
  28. }
  29. // mysql 配置
  30. type MysqlConfig struct {
  31. User string `json:"user"`
  32. Password string `json:"password"`
  33. Addr string `json:"addr"`
  34. Db string `json:"db"`
  35. Charset string `json:"charset"`
  36. MaxIdle json.Number `json:"max_idle"`
  37. MaxConn json.Number `json:"max_conn"`
  38. }
  39. type MongoConfig struct {
  40. User string `json:"user"`
  41. Password string `json:"password"`
  42. Addr string `json:"addr"`
  43. }
  44. // redis 配置
  45. type RedisConfig struct {
  46. Addrs string `json:"addrs"`
  47. Password string `json:"password"`
  48. Db json.Number `json:"db"`
  49. PoolSize json.Number `json:"pool_size"`
  50. MinIdleConns json.Number `json:"min_idle_conns"`
  51. MaxRetries json.Number `json:"max_retries"`
  52. IsCluster string `json:"is_cluster"`
  53. }
  54. type ElasticConfig struct {
  55. Addr string `json:"addr"`
  56. Sniff string `json:"sniff"`
  57. }
  58. type LogConfig struct {
  59. MaxSize json.Number `json:"max_size"`
  60. MaxBackups json.Number `json:"max_backups"`
  61. MaxAge json.Number `json:"max_age"`
  62. Level string `json:"level"`
  63. Stacktrace string `json:"stacktrace"`
  64. Path string `json:"path"`
  65. }
  66. type RPCNode struct {
  67. ServiceName string `json:"service_name"`
  68. ServicePort string `json:"service_port"`
  69. ServiceIp string `json:"service_ip"`
  70. MysqlDb string `json:"mysql_db"`
  71. RedisDb json.Number `json:"redis_db"`
  72. LogLevel string `json:"log_level"`
  73. LogStacktrace string `json:"log_stacktrace"`
  74. }
  75. type RPCConfig struct {
  76. Prefix string
  77. Keepalive KeepaliveConfig
  78. AdmDws RPCNode `json:"adm_dws"`
  79. AdmOds RPCNode `json:"adm_ods"`
  80. // AdmAds RPCNode `json:"adm_ads"`
  81. AdmTask RPCNode `json:"adm_task"`
  82. }
  83. type RabbitmqConfig struct {
  84. Addr string `json:"addr"`
  85. Username string `json:"username"`
  86. Passwrod string `json:"passwrod"`
  87. Vhost string `json:"vhost"`
  88. ExchangeName string `json:"exchange_name"`
  89. QueueName string `json:"queue_name"`
  90. RouteBindKey string `json:"route_bind_key"`
  91. ConsumerCount json.Number `json:"consumer_count"`
  92. }
  93. type Configure struct {
  94. RunMode string `json:"run_mode"`
  95. Log LogConfig `json:"log"`
  96. Mysql MysqlConfig `json:"mysql"`
  97. Mongo MongoConfig `json:"mongo"`
  98. Redis RedisConfig `json:"redis"`
  99. Elastic ElasticConfig `json:"elastic"`
  100. Oss OssConfig `json:"oss"`
  101. Rpc RPCConfig `json:"rpc"`
  102. OdsRabbitmq RabbitmqConfig `json:"ods_rabbitmq"`
  103. DwsRabbitmq RabbitmqConfig `json:"dws_rabbitmq"`
  104. AdsRabbitmq RabbitmqConfig `json:"ads_rabbitmq"`
  105. Md5Rabbitmq RabbitmqConfig `json:"md5_rabbitmq"`
  106. }
  107. /*
  108. func watchEtcd(keysAPI client.KeysAPI, index uint64, runmode, key string, cli client.Client) {
  109. basePath := fmt.Sprintf("/%s/config", runmode)
  110. watcherOptions := &client.WatcherOptions{
  111. AfterIndex: index,
  112. Recursive: true,
  113. }
  114. watcher := keysAPI.Watcher(basePath, watcherOptions)
  115. for {
  116. r, err := watcher.Next(context.Background())
  117. if err != nil || r == nil {
  118. break
  119. }
  120. if r.Node != nil && r.PrevNode != nil {
  121. if r.Node.Key == r.PrevNode.Key && (r.Node.Value != r.PrevNode.Value) {
  122. reloadEtcd(runmode, key, cli)
  123. }
  124. }
  125. if r.Node != nil && r.PrevNode == nil {
  126. reloadEtcd(runmode, key, cli)
  127. }
  128. }
  129. }
  130. func reloadEtcd(runmode, key string, cli client.Client) {
  131. keysAPI := client.NewKeysAPI(cli)
  132. basePath := fmt.Sprintf("/%s/config", runmode)
  133. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  134. Recursive: true,
  135. }); err == nil && resp != nil && resp.Node != nil {
  136. conf := &Configure{}
  137. value := getNodeData(key, resp.Node)
  138. if jsonStr, err := json.Marshal(value); err == nil {
  139. if err := json.Unmarshal(jsonStr, &conf); err == nil {
  140. *Conf = *conf
  141. return
  142. } else {
  143. fmt.Printf("json Unmarshal failed. error:%s", err)
  144. }
  145. } else {
  146. fmt.Printf("json Marshal failed. error:%s", err)
  147. }
  148. } else {
  149. fmt.Printf("get %s failed. error:%s", basePath, err)
  150. }
  151. }
  152. // key 为加密密钥
  153. func GetConfig(runmode, key string, cli client.Client) *Configure {
  154. keysAPI := client.NewKeysAPI(cli)
  155. basePath := fmt.Sprintf("/%s/config", runmode)
  156. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  157. Recursive: true,
  158. }); err == nil && resp != nil && resp.Node != nil {
  159. Conf = &Configure{}
  160. value := getNodeData(key, resp.Node)
  161. if jsonStr, err := json.Marshal(value); err == nil {
  162. if err := json.Unmarshal(jsonStr, &Conf); err == nil {
  163. go watchEtcd(keysAPI, resp.Index, runmode, key, cli)
  164. return Conf
  165. } else {
  166. fmt.Printf("json Unmarshal failed. error:%s", err)
  167. }
  168. } else {
  169. fmt.Printf("json Marshal failed. error:%s", err)
  170. }
  171. } else {
  172. fmt.Printf("get %s failed. error:%s", basePath, err)
  173. os.Exit(1)
  174. }
  175. return nil
  176. }
  177. // 递归取出node的叶子节点值
  178. func getNodeData(key string, head *client.Node) (value interface{}) {
  179. s0 := strings.Split(head.Key, "/")
  180. len0 := len(s0)
  181. if len0 == 0 {
  182. return
  183. }
  184. if head.Dir {
  185. mapData := map[string]interface{}{}
  186. for _, node := range head.Nodes {
  187. s1 := strings.Split(node.Key, "/")
  188. len1 := len(s1)
  189. if len1 == 0 {
  190. break
  191. }
  192. mapData[s1[len1-1]] = getNodeData(key, node)
  193. }
  194. value = mapData
  195. } else {
  196. if key != "" && head.Value != "" {
  197. if bytesData, err := Base64URLDecode(head.Value); err != nil {
  198. fmt.Printf("Base64URLDecode(%s) failed. error:%s", head.Value, err)
  199. os.Exit(1)
  200. } else {
  201. if data, err := AesDecrypt(bytesData, []byte(key)); err != nil {
  202. fmt.Printf("AesDecrypt failed. error:%s", err)
  203. os.Exit(1)
  204. } else {
  205. value = string(data)
  206. }
  207. }
  208. } else {
  209. // 无加密,直接取值
  210. value = head.Value
  211. }
  212. }
  213. return
  214. }
  215. */
  216. // 适配k8s 方式
  217. // 公共配置会以configmap的方式映射到容器的conf/common.json中
  218. func SetConfigFile(configPath string) {
  219. ConfigPath = configPath
  220. }
  221. func GetConfigForK8s() *Configure {
  222. buffer, err := ioutil.ReadFile(ConfigPath)
  223. if err != nil {
  224. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  225. return nil
  226. }
  227. Conf = &Configure{}
  228. if err := json.Unmarshal(buffer, Conf); err != nil {
  229. fmt.Printf("json Unmarshal failed. error:%s", err)
  230. return nil
  231. }
  232. go watchConfigFileForK8s()
  233. return Conf
  234. }
  235. func ReloadConfigForK8s() {
  236. buffer, err := ioutil.ReadFile(ConfigPath)
  237. if err != nil {
  238. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  239. }
  240. confTmp := &Configure{}
  241. if err := json.Unmarshal(buffer, confTmp); err != nil {
  242. fmt.Printf("json Unmarshal failed. error:%s", err)
  243. }
  244. Conf = confTmp
  245. }
  246. // 判断路径文件/文件夹是否存在
  247. func Exists(path string) bool {
  248. _, err := os.Stat(path)
  249. if err != nil {
  250. if os.IsExist(err) {
  251. return true
  252. }
  253. return false
  254. }
  255. return true
  256. }
  257. func watchConfigFileForK8s() {
  258. fileExist := true
  259. watch, err := fsnotify.NewWatcher()
  260. if err != nil {
  261. fmt.Printf("new file watcher failed\n\n")
  262. os.Exit(1)
  263. }
  264. defer watch.Close()
  265. /*err = watch.Add(ConfigPath)
  266. if err != nil {
  267. fmt.Printf("add file watcher failed\n\n")
  268. os.Exit(1)
  269. }*/
  270. for {
  271. // 判断文件是否存在
  272. if !Exists(ConfigPath) {
  273. time.Sleep(10 * time.Second)
  274. fileExist = false
  275. continue
  276. } else {
  277. watch.Remove(ConfigPath)
  278. watch.Add(ConfigPath)
  279. if !fileExist { // 文件重新创建
  280. ReloadConfigForK8s()
  281. }
  282. fileExist = true
  283. }
  284. select {
  285. case ev := <-watch.Events:
  286. {
  287. fmt.Println("op : ", ev.Op)
  288. ReloadConfigForK8s()
  289. }
  290. case err := <-watch.Errors:
  291. {
  292. fmt.Println("error : ", err)
  293. continue
  294. }
  295. }
  296. }
  297. }