rabbitmq.go 7.2 KB


  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package mq
  4. import (
  5. "adm-ods/common.in/jsonrpc2"
  6. "encoding/json"
  7. "fmt"
  8. "log"
  9. //"github.com/go-kit/kit/transport/amqp"
  10. //"github.com/go-kit/kit/transport/amqp"
  11. "github.com/streadway/amqp"
  12. "os"
  13. "time"
  14. )
  15. type RabbitmqClient struct {
  16. Addr string
  17. Username string
  18. Passwrod string
  19. Vhost string
  20. ExchangeName string
  21. QueueName string
  22. RouteBindKey string
  23. Connection *amqp.Connection
  24. PubChannel *amqp.Channel
  25. CallFunc func([]byte) error
  26. IsPub bool
  27. ConsumerChannelCount int
  28. }
  29. var DwsMq *RabbitmqClient
  30. var OdsMq *RabbitmqClient
  31. func SetDwsMq(client *RabbitmqClient) {
  32. DwsMq = client
  33. }
  34. func SetOdsMq(client *RabbitmqClient) {
  35. OdsMq = client
  36. }
  37. func InitRabbitmq(addr, username, passwrod, vhost, exchangeName, queueName, routeBindKey string, callFunc func([]byte) error, isPub bool, consumerChannelCount int) *RabbitmqClient {
  38. // 创建连接
  39. rabbitmqClient := &RabbitmqClient{Addr: addr,
  40. Username: username, Passwrod: passwrod,
  41. Vhost: vhost, ExchangeName: exchangeName,
  42. RouteBindKey: routeBindKey, QueueName: queueName,
  43. CallFunc: callFunc, IsPub: isPub,
  44. ConsumerChannelCount: consumerChannelCount}
  45. var err error
  46. rabbitmqClient.Connection, err = rabbitmqClient.Connect()
  47. if err != nil {
  48. fmt.Println("dial err :", err)
  49. os.Exit(1)
  50. }
  51. // 启动发送channel
  52. if isPub {
  53. err = rabbitmqClient.InitPubChannel()
  54. if err != nil {
  55. os.Exit(2)
  56. }
  57. }
  58. return rabbitmqClient
  59. }
  60. func (r *RabbitmqClient) StartConsumer() {
  61. // 启动消费channel
  62. if r.CallFunc != nil {
  63. for i := 0; i < r.ConsumerChannelCount; i++ {
  64. err := r.InitConsumerChannel(i)
  65. if err != nil {
  66. os.Exit(3)
  67. }
  68. }
  69. }
  70. }
  71. func (r *RabbitmqClient) Connect() (*amqp.Connection, error) {
  72. // 创建连接
  73. connectAddr := fmt.Sprintf("amqp://%s:%s@%s/%s", r.Username, r.Passwrod, r.Addr, r.Vhost)
  74. connect, err := amqp.Dial(connectAddr)
  75. if err != nil {
  76. return nil, err
  77. }
  78. go func() {
  79. log.Printf("Closing: %s", <-connect.NotifyClose(make(chan *amqp.Error)))
  80. log.Printf("Trying to reconnect")
  81. for {
  82. err := r.Reconnect()
  83. if err != nil {
  84. log.Println("reconnect:", err)
  85. time.Sleep(3 * time.Second)
  86. } else {
  87. break
  88. }
  89. }
  90. }()
  91. return connect, nil
  92. }
  93. func (r *RabbitmqClient) InitPubChannel() error {
  94. // 获取通道
  95. var err error
  96. r.PubChannel, err = r.Connection.Channel()
  97. if err != nil {
  98. return err
  99. }
  100. // 创建交换机
  101. err = r.PubChannel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
  102. if err != nil {
  103. return err
  104. }
  105. // 创建队列
  106. _, err = r.PubChannel.QueueDeclare(r.QueueName, true, false, false, true, nil)
  107. if err != nil {
  108. return err
  109. }
  110. // 绑定交换机和队列
  111. err = r.PubChannel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
  112. if err != nil {
  113. return err
  114. }
  115. return nil
  116. }
  117. func (r *RabbitmqClient) InitConsumerChannel(index int) error {
  118. // 获取通道
  119. channel, err := r.Connection.Channel()
  120. if err != nil {
  121. return err
  122. }
  123. // 创建交换机
  124. err = channel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
  125. if err != nil {
  126. return err
  127. }
  128. // 创建队列
  129. _, err = channel.QueueDeclare(r.QueueName, true, false, false, true, nil)
  130. if err != nil {
  131. return err
  132. }
  133. // 绑定交换机和队列
  134. err = channel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
  135. if err != nil {
  136. return err
  137. }
  138. channel.Qos(1, 0, false)
  139. go ConsumeMsg(r.CallFunc, index, r.QueueName, channel)
  140. return nil
  141. }
  142. func ConsumeMsg(callFunc func([]byte) error, index int, queueName string, channel *amqp.Channel) {
  143. defer func() {
  144. if r := recover(); r != nil {
  145. err := fmt.Errorf("%+v", r)
  146. e := &jsonrpc2.Error{}
  147. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  148. fmt.Println("panic:", err.Error())
  149. }
  150. }
  151. }()
  152. name := fmt.Sprintf("consumer%d", index)
  153. deliveries, err := channel.Consume(queueName, name, false, false, false, true, nil)
  154. if err != nil {
  155. fmt.Println("consume fail")
  156. return
  157. }
  158. //defer channel.Close()
  159. closeChan := make(chan *amqp.Error, 1)
  160. notifyClose := channel.NotifyClose(closeChan)
  161. for {
  162. select {
  163. case e := <-notifyClose:
  164. fmt.Printf("chan通道错误,e:%s", e.Error())
  165. close(closeChan)
  166. return
  167. case delivery := <-deliveries:
  168. // 手动ack确认
  169. // 注意: 这里只要调用了ack就是手动确认模式,
  170. // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
  171. // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
  172. // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
  173. //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
  174. //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
  175. // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
  176. err := callFunc(delivery.Body)
  177. if err != nil {
  178. fmt.Println("callFunc:", err.Error())
  179. } else {
  180. if err := delivery.Ack(false); err != nil {
  181. fmt.Println(err.Error())
  182. }
  183. }
  184. }
  185. /*v, ok := <-deliveries
  186. if ok {
  187. // 手动ack确认
  188. // 注意: 这里只要调用了ack就是手动确认模式,
  189. // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
  190. // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
  191. // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
  192. //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
  193. //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
  194. // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
  195. err := callFunc(v.Body)
  196. if err != nil {
  197. fmt.Println("callFunc:",err.Error())
  198. }else{
  199. if err := v.Ack(false); err != nil {
  200. fmt.Println(err.Error())
  201. }
  202. }
  203. } else {
  204. fmt.Println("Channel close")
  205. return
  206. }*/
  207. }
  208. }
  209. func (r *RabbitmqClient) Reconnect() error {
  210. // 创建连接
  211. var err error
  212. r.Connection, err = r.Connect()
  213. if err != nil {
  214. fmt.Println("dial err :", err)
  215. return err
  216. }
  217. // 启动发送channel
  218. if r.IsPub {
  219. err = r.InitPubChannel()
  220. if err != nil {
  221. return err
  222. }
  223. }
  224. // 启动消费channel
  225. if r.CallFunc != nil {
  226. for i := 0; i < r.ConsumerChannelCount; i++ {
  227. err = r.InitConsumerChannel(i)
  228. if err != nil {
  229. return err
  230. }
  231. }
  232. }
  233. return nil
  234. }
  235. func (r *RabbitmqClient) PublishMsg(data []byte) (err error) {
  236. defer func() {
  237. if r := recover(); r != nil {
  238. err = fmt.Errorf("%+v", r)
  239. e := &jsonrpc2.Error{}
  240. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  241. fmt.Println("panic:", err.Error())
  242. }
  243. }
  244. }()
  245. err = r.PubChannel.Publish(r.ExchangeName, r.RouteBindKey, true, false, amqp.Publishing{
  246. DeliveryMode: amqp.Persistent, // 消息持久化
  247. Timestamp: time.Now(),
  248. ContentType: "text/plain",
  249. Body: data,
  250. })
  251. if err != nil {
  252. fmt.Println("publish msg err:", err)
  253. } else {
  254. fmt.Println("publish success!!!!")
  255. }
  256. return err
  257. }