rabbitmq.go 7.5 KB


  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package parser
  4. import (
  5. "adm-management/config"
  6. "encoding/json"
  7. "fmt"
  8. "log"
  9. "os"
  10. "time"
  11. "github.com/streadway/amqp"
  12. "git.getensh.com/common/gopkgsv2/logger"
  13. "go.uber.org/zap"
  14. "google.golang.org/grpc/status"
  15. )
  16. type RabbitmqClient struct {
  17. Addr string
  18. Username string
  19. Passwrod string
  20. Vhost string
  21. ExchangeName string
  22. QueueName string
  23. RouteBindKey string
  24. Connection *amqp.Connection
  25. PubChannel *amqp.Channel
  26. CallFunc func([]byte) error
  27. IsPub bool
  28. ConsumerChannelCount int
  29. }
  30. var OdsMq *RabbitmqClient
  31. func RabbitmqHandler(conf *config.Configure) {
  32. OdsMq = InitRabbitmq(
  33. conf.OdsRabbitmq.Addr,
  34. conf.OdsRabbitmq.Username,
  35. conf.OdsRabbitmq.Passwrod,
  36. conf.OdsRabbitmq.Vhost,
  37. conf.OdsRabbitmq.ExchangeName,
  38. conf.OdsRabbitmq.QueueName,
  39. conf.OdsRabbitmq.RouteBindKey,
  40. nil,
  41. true,
  42. 0,
  43. )
  44. }
  45. func InitRabbitmq(addr, username, passwrod, vhost, exchangeName, queueName, routeBindKey string, callFunc func([]byte) error, isPub bool, consumerChannelCount int) *RabbitmqClient {
  46. // 创建连接
  47. rabbitmqClient := &RabbitmqClient{Addr: addr,
  48. Username: username, Passwrod: passwrod,
  49. Vhost: vhost, ExchangeName: exchangeName,
  50. RouteBindKey: routeBindKey, QueueName: queueName,
  51. CallFunc: callFunc, IsPub: isPub,
  52. ConsumerChannelCount: consumerChannelCount}
  53. var err error
  54. rabbitmqClient.Connection, err = rabbitmqClient.Connect()
  55. if err != nil {
  56. fmt.Println("dial err :", err)
  57. os.Exit(1)
  58. }
  59. // 启动发送channel
  60. if isPub {
  61. err = rabbitmqClient.InitPubChannel()
  62. if err != nil {
  63. os.Exit(2)
  64. }
  65. }
  66. return rabbitmqClient
  67. }
  68. func (r *RabbitmqClient) StartConsumer() {
  69. // 启动消费channel
  70. if r.CallFunc != nil {
  71. for i := 0; i < r.ConsumerChannelCount; i++ {
  72. err := r.InitConsumerChannel(i)
  73. if err != nil {
  74. os.Exit(3)
  75. }
  76. }
  77. }
  78. }
  79. func (r *RabbitmqClient) Connect() (*amqp.Connection, error) {
  80. // 创建连接
  81. connectAddr := fmt.Sprintf("amqp://%s:%s@%s/%s", r.Username, r.Passwrod, r.Addr, r.Vhost)
  82. connect, err := amqp.Dial(connectAddr)
  83. if err != nil {
  84. return nil, err
  85. }
  86. go func() {
  87. log.Printf("Closing: %s", <-connect.NotifyClose(make(chan *amqp.Error)))
  88. log.Printf("Trying to reconnect")
  89. for {
  90. err := r.Reconnect()
  91. if err != nil {
  92. log.Println("reconnect:", err)
  93. time.Sleep(3 * time.Second)
  94. } else {
  95. break
  96. }
  97. }
  98. }()
  99. return connect, nil
  100. }
  101. func (r *RabbitmqClient) InitPubChannel() error {
  102. // 获取通道
  103. var err error
  104. r.PubChannel, err = r.Connection.Channel()
  105. if err != nil {
  106. return err
  107. }
  108. // 创建交换机
  109. err = r.PubChannel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
  110. if err != nil {
  111. return err
  112. }
  113. // 创建队列
  114. _, err = r.PubChannel.QueueDeclare(r.QueueName, true, false, false, true, nil)
  115. if err != nil {
  116. return err
  117. }
  118. // 绑定交换机和队列
  119. err = r.PubChannel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
  120. if err != nil {
  121. return err
  122. }
  123. return nil
  124. }
  125. func (r *RabbitmqClient) InitConsumerChannel(index int) error {
  126. // 获取通道
  127. channel, err := r.Connection.Channel()
  128. if err != nil {
  129. return err
  130. }
  131. // 创建交换机
  132. err = channel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
  133. if err != nil {
  134. return err
  135. }
  136. // 创建队列
  137. _, err = channel.QueueDeclare(r.QueueName, true, false, false, true, nil)
  138. if err != nil {
  139. return err
  140. }
  141. // 绑定交换机和队列
  142. err = channel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
  143. if err != nil {
  144. return err
  145. }
  146. go ConsumeMsg(r.CallFunc, index, r.QueueName, channel)
  147. return nil
  148. }
  149. func ConsumeMsg(callFunc func([]byte) error, index int, queueName string, channel *amqp.Channel) {
  150. defer func() {
  151. if r := recover(); r != nil {
  152. err := fmt.Errorf("%+v", r)
  153. e := &status.Status{}
  154. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  155. logger.Error("err",
  156. zap.String("system_err", err.Error()),
  157. zap.Stack("stacktrace"))
  158. }
  159. }
  160. }()
  161. name := fmt.Sprintf("consumer%d", index)
  162. deliveries, err := channel.Consume(queueName, name, false, false, false, true, nil)
  163. if err != nil {
  164. fmt.Println("consume fail")
  165. return
  166. }
  167. //defer channel.Close()
  168. closeChan := make(chan *amqp.Error, 1)
  169. notifyClose := channel.NotifyClose(closeChan)
  170. for {
  171. select {
  172. case e := <-notifyClose:
  173. fmt.Printf("chan通道错误,e:%s", e.Error())
  174. close(closeChan)
  175. return
  176. case delivery := <-deliveries:
  177. // 手动ack确认
  178. // 注意: 这里只要调用了ack就是手动确认模式,
  179. // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
  180. // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
  181. // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
  182. //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
  183. //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
  184. // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
  185. err := callFunc(delivery.Body)
  186. if err != nil {
  187. fmt.Println("callFunc:", err.Error())
  188. } else {
  189. if err := delivery.Ack(false); err != nil {
  190. fmt.Println(err.Error())
  191. }
  192. }
  193. }
  194. /*v, ok := <-deliveries
  195. if ok {
  196. // 手动ack确认
  197. // 注意: 这里只要调用了ack就是手动确认模式,
  198. // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
  199. // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
  200. // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
  201. //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
  202. //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
  203. // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
  204. err := callFunc(v.Body)
  205. if err != nil {
  206. fmt.Println("callFunc:",err.Error())
  207. }else{
  208. if err := v.Ack(false); err != nil {
  209. fmt.Println(err.Error())
  210. }
  211. }
  212. } else {
  213. fmt.Println("Channel close")
  214. return
  215. }*/
  216. }
  217. }
  218. func (r *RabbitmqClient) Reconnect() error {
  219. // 创建连接
  220. var err error
  221. r.Connection, err = r.Connect()
  222. if err != nil {
  223. fmt.Println("dial err :", err)
  224. return err
  225. }
  226. // 启动发送channel
  227. if r.IsPub {
  228. err = r.InitPubChannel()
  229. if err != nil {
  230. return err
  231. }
  232. }
  233. // 启动消费channel
  234. if r.CallFunc != nil {
  235. for i := 0; i < r.ConsumerChannelCount; i++ {
  236. err = r.InitConsumerChannel(i)
  237. if err != nil {
  238. return err
  239. }
  240. }
  241. }
  242. return nil
  243. }
  244. func (r *RabbitmqClient) PublishMsg(data []byte) (err error) {
  245. defer func() {
  246. if r := recover(); r != nil {
  247. err := fmt.Errorf("%+v", r)
  248. e := &status.Status{}
  249. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  250. logger.Error("err",
  251. zap.String("system_err", err.Error()),
  252. zap.Stack("stacktrace"))
  253. }
  254. }
  255. }()
  256. fmt.Println("publish msg:", string(data))
  257. err = r.PubChannel.Publish(r.ExchangeName, r.RouteBindKey, true, false, amqp.Publishing{
  258. DeliveryMode: amqp.Persistent, // 消息持久化
  259. Timestamp: time.Now(),
  260. ContentType: "text/plain",
  261. Body: data,
  262. })
  263. if err != nil {
  264. fmt.Println("publish msg err:", err)
  265. } else {
  266. fmt.Println("publish success!!!!")
  267. }
  268. return err
  269. }