rabbitmq.go 7.0 KB


  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package mq
  4. import (
  5. "adm-ads/common.in/jsonrpc2"
  6. "encoding/json"
  7. "fmt"
  8. "log"
  9. //"github.com/go-kit/kit/transport/amqp"
  10. //"github.com/go-kit/kit/transport/amqp"
  11. "github.com/streadway/amqp"
  12. "os"
  13. "time"
  14. )
  15. type RabbitmqClient struct {
  16. Addr string
  17. Username string
  18. Passwrod string
  19. Vhost string
  20. ExchangeName string
  21. QueueName string
  22. RouteBindKey string
  23. Connection *amqp.Connection
  24. PubChannel *amqp.Channel
  25. CallFunc func([]byte)error
  26. IsPub bool
  27. ConsumerChannelCount int
  28. }
  29. var AdsMq *RabbitmqClient
  30. func SetAdsMq(client *RabbitmqClient){
  31. AdsMq = client
  32. }
  33. func InitRabbitmq(addr, username, passwrod, vhost, exchangeName, queueName, routeBindKey string,callFunc func([]byte)error,isPub bool,consumerChannelCount int) *RabbitmqClient {
  34. // 创建连接
  35. rabbitmqClient := &RabbitmqClient{Addr: addr,
  36. Username: username, Passwrod: passwrod,
  37. Vhost: vhost, ExchangeName: exchangeName,
  38. RouteBindKey: routeBindKey, QueueName: queueName,
  39. CallFunc:callFunc,IsPub:isPub,
  40. ConsumerChannelCount:consumerChannelCount}
  41. var err error
  42. rabbitmqClient.Connection ,err = rabbitmqClient.Connect()
  43. if err != nil{
  44. fmt.Println("dial err :", err)
  45. os.Exit(1)
  46. }
  47. // 启动发送channel
  48. if isPub{
  49. err = rabbitmqClient.InitPubChannel()
  50. if err != nil {
  51. os.Exit(2)
  52. }
  53. }
  54. return rabbitmqClient
  55. }
  56. func (r *RabbitmqClient) StartConsumer(){
  57. // 启动消费channel
  58. if r.CallFunc!= nil{
  59. for i:=0;i<r.ConsumerChannelCount;i++{
  60. err := r.InitConsumerChannel(i)
  61. if err != nil{
  62. os.Exit(3)
  63. }
  64. }
  65. }
  66. }
  67. func (r *RabbitmqClient) Connect() (*amqp.Connection,error) {
  68. // 创建连接
  69. connectAddr := fmt.Sprintf("amqp://%s:%s@%s/%s", r.Username, r.Passwrod, r.Addr, r.Vhost)
  70. connect, err := amqp.Dial(connectAddr)
  71. if err != nil {
  72. return nil,err
  73. }
  74. go func() {
  75. log.Printf("Closing: %s", <-connect.NotifyClose(make(chan *amqp.Error)))
  76. log.Printf("Trying to reconnect")
  77. for {
  78. err := r.Reconnect()
  79. if err != nil {
  80. log.Println("reconnect:", err)
  81. time.Sleep(3 * time.Second)
  82. } else {
  83. break
  84. }
  85. }
  86. }()
  87. return connect,nil
  88. }
  89. func (r *RabbitmqClient) InitPubChannel() error {
  90. // 获取通道
  91. var err error
  92. r.PubChannel, err = r.Connection.Channel()
  93. if err != nil {
  94. return err
  95. }
  96. // 创建交换机
  97. err = r.PubChannel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
  98. if err != nil {
  99. return err
  100. }
  101. // 创建队列
  102. _, err = r.PubChannel.QueueDeclare(r.QueueName, true, false, false, true, nil)
  103. if err != nil {
  104. return err
  105. }
  106. // 绑定交换机和队列
  107. err = r.PubChannel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
  108. if err != nil {
  109. return err
  110. }
  111. return nil
  112. }
  113. func (r *RabbitmqClient) InitConsumerChannel(index int) error {
  114. // 获取通道
  115. channel, err := r.Connection.Channel()
  116. if err != nil {
  117. return err
  118. }
  119. // 创建交换机
  120. err = channel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
  121. if err != nil {
  122. return err
  123. }
  124. // 创建队列
  125. _, err = channel.QueueDeclare(r.QueueName, true, false, false, true, nil)
  126. if err != nil {
  127. return err
  128. }
  129. // 绑定交换机和队列
  130. err =channel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
  131. if err != nil {
  132. return err
  133. }
  134. channel.Qos(1,0,false)
  135. go ConsumeMsg(r.CallFunc,index,r.QueueName,channel)
  136. return nil
  137. }
  138. func ConsumeMsg(callFunc func([]byte) error,index int,queueName string,channel *amqp.Channel) {
  139. defer func() {
  140. if r := recover(); r != nil {
  141. err := fmt.Errorf("%+v", r)
  142. e := &jsonrpc2.Error{}
  143. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  144. fmt.Println("panic:", err.Error())
  145. }
  146. }
  147. }()
  148. name := fmt.Sprintf("consumer%d",index)
  149. deliveries, err := channel.Consume(queueName, name, false, false, false, true, nil)
  150. if err != nil {
  151. fmt.Println("consume fail")
  152. return
  153. }
  154. //defer channel.Close()
  155. closeChan := make(chan *amqp.Error, 1)
  156. notifyClose := channel.NotifyClose(closeChan)
  157. for {
  158. select {
  159. case e := <-notifyClose:
  160. fmt.Printf("chan通道错误,e:%s", e.Error())
  161. close(closeChan)
  162. return
  163. case delivery := <-deliveries:
  164. // 手动ack确认
  165. // 注意: 这里只要调用了ack就是手动确认模式,
  166. // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
  167. // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
  168. // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
  169. //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
  170. //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
  171. // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
  172. err := callFunc(delivery.Body)
  173. if err != nil {
  174. fmt.Println("callFunc:",err.Error())
  175. }else{
  176. if err := delivery.Ack(false); err != nil {
  177. fmt.Println(err.Error())
  178. }
  179. }
  180. }
  181. /*v, ok := <-deliveries
  182. if ok {
  183. // 手动ack确认
  184. // 注意: 这里只要调用了ack就是手动确认模式,
  185. // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
  186. // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
  187. // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
  188. //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
  189. //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
  190. // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
  191. err := callFunc(v.Body)
  192. if err != nil {
  193. fmt.Println("callFunc:",err.Error())
  194. }else{
  195. if err := v.Ack(false); err != nil {
  196. fmt.Println(err.Error())
  197. }
  198. }
  199. } else {
  200. fmt.Println("Channel close")
  201. return
  202. }*/
  203. }
  204. }
  205. func (r *RabbitmqClient) Reconnect() error {
  206. // 创建连接
  207. var err error
  208. r.Connection ,err = r.Connect()
  209. if err != nil{
  210. fmt.Println("dial err :", err)
  211. return err
  212. }
  213. // 启动发送channel
  214. if r.IsPub{
  215. err = r.InitPubChannel()
  216. if err != nil {
  217. return err
  218. }
  219. }
  220. // 启动消费channel
  221. if r.CallFunc!= nil{
  222. for i:=0;i<r.ConsumerChannelCount;i++{
  223. err = r.InitConsumerChannel(i)
  224. if err != nil{
  225. return err
  226. }
  227. }
  228. }
  229. return nil
  230. }
  231. func (r *RabbitmqClient) PublishMsg(data []byte) (err error) {
  232. defer func() {
  233. if r := recover(); r != nil {
  234. err = fmt.Errorf("%+v", r)
  235. e := &jsonrpc2.Error{}
  236. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  237. fmt.Println("panic:", err.Error())
  238. }
  239. }
  240. }()
  241. err = r.PubChannel.Publish(r.ExchangeName, r.RouteBindKey, true, false, amqp.Publishing{
  242. DeliveryMode: amqp.Persistent, // 消息持久化
  243. Timestamp: time.Now(),
  244. ContentType: "text/plain",
  245. Body: data,
  246. })
  247. if err != nil {
  248. fmt.Println("publish msg err:", err)
  249. } else {
  250. fmt.Println("publish success!!!!")
  251. }
  252. return err
  253. }