// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package mq import ( "adm-ods/common.in/jsonrpc2" "encoding/json" "fmt" "log" //"github.com/go-kit/kit/transport/amqp" //"github.com/go-kit/kit/transport/amqp" "github.com/streadway/amqp" "os" "time" ) type RabbitmqClient struct { Addr string Username string Passwrod string Vhost string ExchangeName string QueueName string RouteBindKey string Connection *amqp.Connection PubChannel *amqp.Channel CallFunc func([]byte) error IsPub bool ConsumerChannelCount int } var DwsMq *RabbitmqClient var OdsMq *RabbitmqClient func SetDwsMq(client *RabbitmqClient) { DwsMq = client } func SetOdsMq(client *RabbitmqClient) { OdsMq = client } func InitRabbitmq(addr, username, passwrod, vhost, exchangeName, queueName, routeBindKey string, callFunc func([]byte) error, isPub bool, consumerChannelCount int) *RabbitmqClient { // 创建连接 rabbitmqClient := &RabbitmqClient{Addr: addr, Username: username, Passwrod: passwrod, Vhost: vhost, ExchangeName: exchangeName, RouteBindKey: routeBindKey, QueueName: queueName, CallFunc: callFunc, IsPub: isPub, ConsumerChannelCount: consumerChannelCount} var err error rabbitmqClient.Connection, err = rabbitmqClient.Connect() if err != nil { fmt.Println("dial err :", err) os.Exit(1) } // 启动发送channel if isPub { err = rabbitmqClient.InitPubChannel() if err != nil { os.Exit(2) } } return rabbitmqClient } func (r *RabbitmqClient) StartConsumer() { // 启动消费channel if r.CallFunc != nil { for i := 0; i < r.ConsumerChannelCount; i++ { err := r.InitConsumerChannel(i) if err != nil { os.Exit(3) } } } } func (r *RabbitmqClient) Connect() (*amqp.Connection, error) { // 创建连接 connectAddr := fmt.Sprintf("amqp://%s:%s@%s/%s", r.Username, r.Passwrod, r.Addr, r.Vhost) connect, err := amqp.Dial(connectAddr) if err != nil { return nil, err } go func() { log.Printf("Closing: %s", <-connect.NotifyClose(make(chan *amqp.Error))) log.Printf("Trying to reconnect") for { err := r.Reconnect() if err != nil { log.Println("reconnect:", err) time.Sleep(3 * time.Second) } else { break } } }() return connect, nil } func (r *RabbitmqClient) InitPubChannel() error { // 获取通道 var err error r.PubChannel, err = r.Connection.Channel() if err != nil { return err } // 创建交换机 err = r.PubChannel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil) if err != nil { return err } // 创建队列 _, err = r.PubChannel.QueueDeclare(r.QueueName, true, false, false, true, nil) if err != nil { return err } // 绑定交换机和队列 err = r.PubChannel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil) if err != nil { return err } return nil } func (r *RabbitmqClient) InitConsumerChannel(index int) error { // 获取通道 channel, err := r.Connection.Channel() if err != nil { return err } // 创建交换机 err = channel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil) if err != nil { return err } // 创建队列 _, err = channel.QueueDeclare(r.QueueName, true, false, false, true, nil) if err != nil { return err } // 绑定交换机和队列 err = channel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil) if err != nil { return err } channel.Qos(1, 0, false) go ConsumeMsg(r.CallFunc, index, r.QueueName, channel) return nil } func ConsumeMsg(callFunc func([]byte) error, index int, queueName string, channel *amqp.Channel) { defer func() { if r := recover(); r != nil { err := fmt.Errorf("%+v", r) e := &jsonrpc2.Error{} if er := json.Unmarshal([]byte(err.Error()), e); er != nil { fmt.Println("panic:", err.Error()) } } }() name := fmt.Sprintf("consumer%d", index) deliveries, err := channel.Consume(queueName, name, false, false, false, true, nil) if err != nil { fmt.Println("consume fail") return } //defer channel.Close() closeChan := make(chan *amqp.Error, 1) notifyClose := channel.NotifyClose(closeChan) for { select { case e := <-notifyClose: fmt.Printf("chan通道错误,e:%s", e.Error()) close(closeChan) return case delivery := <-deliveries: // 手动ack确认 // 注意: 这里只要调用了ack就是手动确认模式, // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认 // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认; // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认; //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认; //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认; // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者 err := callFunc(delivery.Body) if err != nil { fmt.Println("callFunc:", err.Error()) } else { if err := delivery.Ack(false); err != nil { fmt.Println(err.Error()) } } } /*v, ok := <-deliveries if ok { // 手动ack确认 // 注意: 这里只要调用了ack就是手动确认模式, // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认 // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认; // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认; //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认; //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认; // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者 err := callFunc(v.Body) if err != nil { fmt.Println("callFunc:",err.Error()) }else{ if err := v.Ack(false); err != nil { fmt.Println(err.Error()) } } } else { fmt.Println("Channel close") return }*/ } } func (r *RabbitmqClient) Reconnect() error { // 创建连接 var err error r.Connection, err = r.Connect() if err != nil { fmt.Println("dial err :", err) return err } // 启动发送channel if r.IsPub { err = r.InitPubChannel() if err != nil { return err } } // 启动消费channel if r.CallFunc != nil { for i := 0; i < r.ConsumerChannelCount; i++ { err = r.InitConsumerChannel(i) if err != nil { return err } } } return nil } func (r *RabbitmqClient) PublishMsg(data []byte) (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("%+v", r) e := &jsonrpc2.Error{} if er := json.Unmarshal([]byte(err.Error()), e); er != nil { fmt.Println("panic:", err.Error()) } } }() err = r.PubChannel.Publish(r.ExchangeName, r.RouteBindKey, true, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, // 消息持久化 Timestamp: time.Now(), ContentType: "text/plain", Body: data, }) if err != nil { fmt.Println("publish msg err:", err) } else { fmt.Println("publish success!!!!") } return err }