rabbitmq.go 7.2 KB


  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package mq
  4. import (
  5. "adm-dws/common.in/jsonrpc2"
  6. "encoding/json"
  7. "fmt"
  8. "log"
  9. //"github.com/go-kit/kit/transport/amqp"
  10. //"github.com/go-kit/kit/transport/amqp"
  11. "github.com/streadway/amqp"
  12. "os"
  13. "time"
  14. )
  15. type RabbitmqClient struct {
  16. Addr string
  17. Username string
  18. Passwrod string
  19. Vhost string
  20. ExchangeName string
  21. QueueName string
  22. RouteBindKey string
  23. Connection *amqp.Connection
  24. PubChannel *amqp.Channel
  25. CallFunc func([]byte)error
  26. IsPub bool
  27. ConsumerChannelCount int
  28. }
  29. var AdsMq *RabbitmqClient
  30. var DwsMq *RabbitmqClient
  31. var Md5Mq *RabbitmqClient
  32. func SetAdsMq(client *RabbitmqClient){
  33. AdsMq = client
  34. }
  35. func SetDwsMq(client *RabbitmqClient){
  36. DwsMq = client
  37. }
  38. func SetMd5Mq(client *RabbitmqClient){
  39. Md5Mq = client
  40. }
  41. func InitRabbitmq(addr, username, passwrod, vhost, exchangeName, queueName, routeBindKey string,callFunc func([]byte)error,isPub bool,consumerChannelCount int) *RabbitmqClient {
  42. // 创建连接
  43. rabbitmqClient := &RabbitmqClient{Addr: addr,
  44. Username: username, Passwrod: passwrod,
  45. Vhost: vhost, ExchangeName: exchangeName,
  46. RouteBindKey: routeBindKey, QueueName: queueName,
  47. CallFunc:callFunc,IsPub:isPub,
  48. ConsumerChannelCount:consumerChannelCount}
  49. var err error
  50. rabbitmqClient.Connection ,err = rabbitmqClient.Connect()
  51. if err != nil{
  52. fmt.Println("dial err :", err)
  53. os.Exit(1)
  54. }
  55. // 启动发送channel
  56. if isPub{
  57. err = rabbitmqClient.InitPubChannel()
  58. if err != nil {
  59. os.Exit(2)
  60. }
  61. }
  62. return rabbitmqClient
  63. }
  64. func (r *RabbitmqClient) StartConsumer(){
  65. // 启动消费channel
  66. if r.CallFunc!= nil{
  67. for i:=0;i<r.ConsumerChannelCount;i++{
  68. err := r.InitConsumerChannel(i)
  69. if err != nil{
  70. os.Exit(3)
  71. }
  72. }
  73. }
  74. }
  75. func (r *RabbitmqClient) Connect() (*amqp.Connection,error) {
  76. // 创建连接
  77. connectAddr := fmt.Sprintf("amqp://%s:%s@%s/%s", r.Username, r.Passwrod, r.Addr, r.Vhost)
  78. connect, err := amqp.Dial(connectAddr)
  79. if err != nil {
  80. return nil,err
  81. }
  82. go func() {
  83. log.Printf("Closing: %s", <-connect.NotifyClose(make(chan *amqp.Error)))
  84. fmt.Println("退出系统")
  85. os.Exit(1)
  86. log.Printf("Trying to reconnect")
  87. for {
  88. err := r.Reconnect()
  89. if err != nil {
  90. log.Println("reconnect:", err)
  91. time.Sleep(3 * time.Second)
  92. } else {
  93. break
  94. }
  95. }
  96. }()
  97. return connect,nil
  98. }
  99. func (r *RabbitmqClient) InitPubChannel() error {
  100. // 获取通道
  101. var err error
  102. r.PubChannel, err = r.Connection.Channel()
  103. if err != nil {
  104. return err
  105. }
  106. // 创建交换机
  107. err = r.PubChannel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
  108. if err != nil {
  109. return err
  110. }
  111. // 创建队列
  112. _, err = r.PubChannel.QueueDeclare(r.QueueName, true, false, false, true, nil)
  113. if err != nil {
  114. return err
  115. }
  116. // 绑定交换机和队列
  117. err = r.PubChannel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
  118. if err != nil {
  119. return err
  120. }
  121. return nil
  122. }
  123. func (r *RabbitmqClient) InitConsumerChannel(index int) error {
  124. // 获取通道
  125. channel, err := r.Connection.Channel()
  126. if err != nil {
  127. return err
  128. }
  129. // 创建交换机
  130. err = channel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
  131. if err != nil {
  132. return err
  133. }
  134. // 创建队列
  135. _, err = channel.QueueDeclare(r.QueueName, true, false, false, true, nil)
  136. if err != nil {
  137. return err
  138. }
  139. // 绑定交换机和队列
  140. err =channel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
  141. if err != nil {
  142. return err
  143. }
  144. channel.Qos(1,0,false)
  145. go ConsumeMsg(r.CallFunc,index,r.QueueName,channel)
  146. return nil
  147. }
  148. func ConsumeMsg(callFunc func([]byte) error,index int,queueName string,channel *amqp.Channel) {
  149. defer func() {
  150. if r := recover(); r != nil {
  151. err := fmt.Errorf("%+v", r)
  152. e := &jsonrpc2.Error{}
  153. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  154. fmt.Println("panic:", err.Error())
  155. }
  156. }
  157. }()
  158. name := fmt.Sprintf("consumer%d",index)
  159. deliveries, err := channel.Consume(queueName, name, false, false, false, true, nil)
  160. if err != nil {
  161. fmt.Println("consume fail")
  162. return
  163. }
  164. //defer channel.Close()
  165. closeChan := make(chan *amqp.Error, 1)
  166. notifyClose := channel.NotifyClose(closeChan)
  167. for {
  168. select {
  169. case e := <-notifyClose:
  170. fmt.Printf("chan通道错误,e:%s", e.Error())
  171. close(closeChan)
  172. return
  173. case delivery := <-deliveries:
  174. // 手动ack确认
  175. // 注意: 这里只要调用了ack就是手动确认模式,
  176. // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
  177. // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
  178. // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
  179. //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
  180. //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
  181. // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
  182. err := callFunc(delivery.Body)
  183. if err != nil {
  184. fmt.Println("callFunc:",err.Error())
  185. }else{
  186. if err := delivery.Ack(false); err != nil {
  187. fmt.Println(err.Error())
  188. }
  189. }
  190. }
  191. /*v, ok := <-deliveries
  192. if ok {
  193. // 手动ack确认
  194. // 注意: 这里只要调用了ack就是手动确认模式,
  195. // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
  196. // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
  197. // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
  198. //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
  199. //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
  200. // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
  201. err := callFunc(v.Body)
  202. if err != nil {
  203. fmt.Println("callFunc:",err.Error())
  204. }else{
  205. if err := v.Ack(false); err != nil {
  206. fmt.Println(err.Error())
  207. }
  208. }
  209. } else {
  210. fmt.Println("Channel close")
  211. return
  212. }*/
  213. }
  214. }
  215. func (r *RabbitmqClient) Reconnect() error {
  216. // 创建连接
  217. var err error
  218. r.Connection ,err = r.Connect()
  219. if err != nil{
  220. fmt.Println("dial err :", err)
  221. return err
  222. }
  223. // 启动发送channel
  224. if r.IsPub{
  225. err = r.InitPubChannel()
  226. if err != nil {
  227. return err
  228. }
  229. }
  230. // 启动消费channel
  231. if r.CallFunc!= nil{
  232. for i:=0;i<r.ConsumerChannelCount;i++{
  233. err = r.InitConsumerChannel(i)
  234. if err != nil{
  235. return err
  236. }
  237. }
  238. }
  239. return nil
  240. }
  241. func (r *RabbitmqClient) PublishMsg(data []byte) (err error) {
  242. defer func() {
  243. if r := recover(); r != nil {
  244. err = fmt.Errorf("%+v", r)
  245. e := &jsonrpc2.Error{}
  246. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  247. fmt.Println("panic:", err.Error())
  248. }
  249. }
  250. }()
  251. err = r.PubChannel.Publish(r.ExchangeName, r.RouteBindKey, true, false, amqp.Publishing{
  252. DeliveryMode: amqp.Persistent, // 消息持久化
  253. Timestamp: time.Now(),
  254. ContentType: "text/plain",
  255. Body: data,
  256. })
  257. if err != nil {
  258. fmt.Println("publish msg err:", err)
  259. } else {
  260. fmt.Println("publish success!!!!")
  261. }
  262. return err
  263. }