main.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package main
  2. import (
  3. "gd_access_log/impl"
  4. "gd_access_log/impl/access_log"
  5. "gd_access_log/impl/h5_access_log"
  6. "gd_access_log/rpc_apis"
  7. "gd_access_log/rpc_apis/gd_management"
  8. "flag"
  9. "fmt"
  10. "log"
  11. "os"
  12. "os/signal"
  13. "strings"
  14. "syscall"
  15. "time"
  16. "gd_access_log/common.in/mq"
  17. "gd_access_log/common.in/cache"
  18. "gd_access_log/common.in/clinit"
  19. "gd_access_log/common.in/config"
  20. "gd_access_log/common.in/logger"
  21. "gd_access_log/common.in/utils"
  22. "github.com/astaxie/beego/orm"
  23. _ "github.com/go-sql-driver/mysql"
  24. //"github.com/gomodule/redigo/redis"
  25. "github.com/smallnest/rpcx/server"
  26. "github.com/smallnest/rpcx/serverplugin"
  27. gconfig "gd_access_log/common.in/config"
  28. "github.com/rcrowley/go-metrics"
  29. "gopkg.in/ini.v1"
  30. )
  31. var (
  32. // 这里可以改默认值
  33. configFile = flag.String("config", "/etc/gd_access_log/app.conf", "config file location")
  34. version = flag.Bool("version", false, "config file location")
  35. GitCommit = "library-import"
  36. Version = "library-import"
  37. )
  38. func showVersion() {
  39. fmt.Println("Version: ", Version)
  40. fmt.Println("GitCommit:", GitCommit)
  41. }
  42. func prepare( projectName, runmode, key, logDir string, etcdAddrs []string, discoveryType string) {
  43. var conf *config.Configure
  44. if discoveryType == "k8s" {
  45. conf = config.GetConfigForK8s()
  46. if conf == nil {
  47. fmt.Printf("get conf failed\n\n")
  48. os.Exit(1)
  49. }
  50. rpc_apis.InitForK8s(conf)
  51. } else {
  52. // 先行于读配置
  53. clinit.InitEtcd(etcdAddrs)
  54. conf = config.GetConfig(projectName+"/"+runmode, key, clinit.GetEtcdClient())
  55. if conf == nil {
  56. fmt.Printf("get conf failed\n\n")
  57. os.Exit(1)
  58. }
  59. // 初始化所调用的微服务
  60. rpc_apis.Init(etcdAddrs, conf)
  61. }
  62. appname := conf.Rpc.AccessLog.Name
  63. fmt.Println("init rabbitmq")
  64. // 初始化rabbitmq
  65. odsMq := mq.InitRabbitmq(
  66. conf.OdsRabbitmq.Addr,
  67. conf.OdsRabbitmq.Username,
  68. conf.OdsRabbitmq.Passwrod,
  69. conf.OdsRabbitmq.Vhost,
  70. conf.OdsRabbitmq.ExchangeName,
  71. conf.OdsRabbitmq.QueueName,
  72. conf.OdsRabbitmq.RouteBindKey,
  73. nil,
  74. true,
  75. 0,
  76. )
  77. mq.SetOdsMq(odsMq)
  78. // 指定mysql数据库,若无则使用默认数据库
  79. mysqldb := conf.Rpc.AccessLog.MysqlDB
  80. if mysqldb == "" {
  81. mysqldb = conf.LogMysql.DefaultDB
  82. }
  83. // 指定redis数据库,若无则使用默认数据库
  84. redisdb := conf.Rpc.AccessLog.RedisDB
  85. if redisdb == "" {
  86. redisdb = conf.Redis.DefaultDB
  87. }
  88. // 连接数据库服务器
  89. clinit.InitMySQL(&clinit.MysqlConfig{
  90. User: conf.LogMysql.User,
  91. Password: conf.LogMysql.Password,
  92. Addr: conf.LogMysql.Addr,
  93. DB: mysqldb,
  94. Charset: conf.LogMysql.Charset,
  95. MaxIdle: conf.LogMysql.MaxIdle,
  96. MaxConn: conf.LogMysql.MaxConn,
  97. })
  98. // 连接redis服务器
  99. cache.InitRedis(&cache.RedisConfig{
  100. Addrs: strings.Split(conf.Redis.Addrs, ","),
  101. Password: conf.Redis.Password,
  102. DB: redisdb,
  103. PoolSize: conf.Redis.PoolSize,
  104. MinIdleConns: conf.Redis.MinIdleConns,
  105. MaxRetries: conf.Redis.MaxRetries,
  106. IsCluster: conf.Redis.IsCluster,
  107. })
  108. // 初始化logger
  109. /*ms, _ := conf.Rpc.AccessLog.Log.MaxSize.Int64()
  110. mb, _ := conf.Rpc.AccessLog.Log.MaxBackups.Int64()
  111. ma, _ := conf.Rpc.AccessLog.Log.MaxAge.Int64()
  112. maxSize := int(ms)
  113. maxBackups := int(mb)
  114. maxAge := int(ma)
  115. disableStacktrace := (conf.Rpc.AccessLog.Log.DisableStacktrace == "true")
  116. commonLogger := logger.InitLogger(runmode, fmt.Sprintf("%s/%s.log", logDir, appname),
  117. maxSize, maxBackups, maxAge, disableStacktrace)
  118. accessLogger := logger.NewInfoLogger(runmode, fmt.Sprintf("%s/%s-access.log", logDir, appname),
  119. maxSize, maxBackups, maxAge)
  120. thirdpartyLogger := logger.NewInfoLogger(runmode, fmt.Sprintf("%s/%s-thirdparty.log", logDir, appname),
  121. maxSize, maxBackups, maxAge)
  122. */
  123. // 设置需要使用logger的地方
  124. /*if err := mongo.MgoInit(conf.Mongo.Addr, conf.Mongo.User, conf.Mongo.Password); err != nil {
  125. fmt.Printf("init mongo failed:%v\n", err)
  126. os.Exit(-1)
  127. }*/
  128. ms, _ := conf.Rpc.AccessLog.Log.MaxSize.Int64()
  129. mb, _ := conf.Rpc.AccessLog.Log.MaxBackups.Int64()
  130. ma, _ := conf.Rpc.AccessLog.Log.MaxAge.Int64()
  131. maxSize := int(ms)
  132. maxBackups := int(mb)
  133. maxAge := int(ma)
  134. disableStacktrace := (conf.Rpc.AccessLog.Log.DisableStacktrace == "true")
  135. // 通用logger
  136. commonLogger := logger.InitLogger(runmode, fmt.Sprintf("%s/%s.log", logDir, appname), appname, conf.Rpc.AccessLog.Log.Level,
  137. maxSize, maxBackups, maxAge, disableStacktrace)
  138. access_log.SetLogger(commonLogger)
  139. h5_access_log.SetLogger(commonLogger)
  140. impl.RegisterOrmModel()
  141. // common日志
  142. if runmode != "prod" {
  143. orm.Debug = true
  144. //redis.Debug = false
  145. }
  146. }
  147. func start(conf *config.Configure, serveAddr string, etcdAddrs []string) {
  148. fmt.Printf("xxxx:%v\n", etcdAddrs)
  149. s := server.NewServer()
  150. r := &serverplugin.EtcdRegisterPlugin{
  151. ServiceAddress: fmt.Sprintf("tcp@%s", serveAddr),
  152. EtcdServers: etcdAddrs,
  153. BasePath: conf.Rpc.BasePath,
  154. Metrics: metrics.NewRegistry(),
  155. UpdateInterval: time.Minute,
  156. }
  157. if err := r.Start(); err != nil {
  158. fmt.Printf("start failed. error:%s\n\n", err)
  159. os.Exit(1)
  160. }
  161. s.Plugins.Add(r)
  162. s.RegisterName(conf.Rpc.AccessLog.Name, new(impl.Rcvr), "")
  163. go func() {
  164. if err := s.Serve("tcp", serveAddr); err != nil {
  165. log.Fatalf("HTTP server listen failed. err: %s\n", err.Error())
  166. }
  167. }()
  168. // 捕获信号
  169. sigChan := make(chan os.Signal, 1)
  170. signal.Notify(sigChan, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
  171. sigValue := <-sigChan
  172. log.Printf("Got a signal:%v", sigValue)
  173. r.Stop()
  174. time.Sleep(5 * time.Second)
  175. log.Println("Shutdown server finished.")
  176. }
  177. func startPeerToPeer(conf *config.Configure, serveAddr string) {
  178. s := server.NewServer()
  179. s.RegisterName(conf.Rpc.AccessLog.Name, new(impl.Rcvr), "")
  180. s.Serve("tcp", serveAddr)
  181. }
  182. func main() {
  183. flag.Parse()
  184. if *version {
  185. showVersion()
  186. }
  187. cfg, err := ini.Load(*configFile)
  188. if err != nil {
  189. fmt.Printf("Fail to read file: %v\n\n", err)
  190. os.Exit(1)
  191. }
  192. //appname := cfg.Section("").Key("appname").String()
  193. logDir := cfg.Section("").Key("log_dir").String()
  194. runmode := cfg.Section("").Key("runmode").String()
  195. //serveAddr := cfg.Section("").Key("serve_addr").String()
  196. etcdAddrs := strings.Split(cfg.Section("").Key("etcd_addrs").String(), ",")
  197. encryptKey := cfg.Section("").Key("encrypt_key").String()
  198. discoveryType := cfg.Section("").Key("discovery_type").String()
  199. projectName := cfg.Section("").Key("project_name").String()
  200. prepare( projectName, runmode, encryptKey, logDir, etcdAddrs, discoveryType)
  201. go utils.Free()
  202. go impl.CleanLocalCache()
  203. //utils.HandleExitSignal()
  204. gd_management.Watch()
  205. serveAddr := fmt.Sprintf("%s:%s",gconfig.Conf.Rpc.AccessLog.ServiceName,gconfig.Conf.Rpc.AccessLog.ServicePort.String())
  206. if discoveryType == "etcd" {
  207. fmt.Println(gconfig.Conf, serveAddr, etcdAddrs)
  208. start(gconfig.Conf, serveAddr, etcdAddrs)
  209. } else {
  210. startPeerToPeer(gconfig.Conf, serveAddr)
  211. }
  212. return
  213. }