|
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- package parser
- import (
- "adm-management/config"
- "encoding/json"
- "fmt"
- "log"
- "os"
- "time"
- "github.com/streadway/amqp"
- "git.getensh.com/common/gopkgsv2/logger"
- "go.uber.org/zap"
- "google.golang.org/grpc/status"
- )
- type RabbitmqClient struct {
- Addr string
- Username string
- Passwrod string
- Vhost string
- ExchangeName string
- QueueName string
- RouteBindKey string
- Connection *amqp.Connection
- PubChannel *amqp.Channel
- CallFunc func([]byte) error
- IsPub bool
- ConsumerChannelCount int
- }
- var OdsMq *RabbitmqClient
- func RabbitmqHandler(conf *config.Configure) {
- OdsMq = InitRabbitmq(
- conf.OdsRabbitmq.Addr,
- conf.OdsRabbitmq.Username,
- conf.OdsRabbitmq.Passwrod,
- conf.OdsRabbitmq.Vhost,
- conf.OdsRabbitmq.ExchangeName,
- conf.OdsRabbitmq.QueueName,
- conf.OdsRabbitmq.RouteBindKey,
- nil,
- true,
- 0,
- )
- }
- func InitRabbitmq(addr, username, passwrod, vhost, exchangeName, queueName, routeBindKey string, callFunc func([]byte) error, isPub bool, consumerChannelCount int) *RabbitmqClient {
- // 创建连接
- rabbitmqClient := &RabbitmqClient{Addr: addr,
- Username: username, Passwrod: passwrod,
- Vhost: vhost, ExchangeName: exchangeName,
- RouteBindKey: routeBindKey, QueueName: queueName,
- CallFunc: callFunc, IsPub: isPub,
- ConsumerChannelCount: consumerChannelCount}
- var err error
- rabbitmqClient.Connection, err = rabbitmqClient.Connect()
- if err != nil {
- fmt.Println("dial err :", err)
- os.Exit(1)
- }
- // 启动发送channel
- if isPub {
- err = rabbitmqClient.InitPubChannel()
- if err != nil {
- os.Exit(2)
- }
- }
- return rabbitmqClient
- }
- func (r *RabbitmqClient) StartConsumer() {
- // 启动消费channel
- if r.CallFunc != nil {
- for i := 0; i < r.ConsumerChannelCount; i++ {
- err := r.InitConsumerChannel(i)
- if err != nil {
- os.Exit(3)
- }
- }
- }
- }
- func (r *RabbitmqClient) Connect() (*amqp.Connection, error) {
- // 创建连接
- connectAddr := fmt.Sprintf("amqp://%s:%s@%s/%s", r.Username, r.Passwrod, r.Addr, r.Vhost)
- connect, err := amqp.Dial(connectAddr)
- if err != nil {
- return nil, err
- }
- go func() {
- log.Printf("Closing: %s", <-connect.NotifyClose(make(chan *amqp.Error)))
- log.Printf("Trying to reconnect")
- for {
- err := r.Reconnect()
- if err != nil {
- log.Println("reconnect:", err)
- time.Sleep(3 * time.Second)
- } else {
- break
- }
- }
- }()
- return connect, nil
- }
- func (r *RabbitmqClient) InitPubChannel() error {
- // 获取通道
- var err error
- r.PubChannel, err = r.Connection.Channel()
- if err != nil {
- return err
- }
- // 创建交换机
- err = r.PubChannel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
- if err != nil {
- return err
- }
- // 创建队列
- _, err = r.PubChannel.QueueDeclare(r.QueueName, true, false, false, true, nil)
- if err != nil {
- return err
- }
- // 绑定交换机和队列
- err = r.PubChannel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
- if err != nil {
- return err
- }
- return nil
- }
- func (r *RabbitmqClient) InitConsumerChannel(index int) error {
- // 获取通道
- channel, err := r.Connection.Channel()
- if err != nil {
- return err
- }
- // 创建交换机
- err = channel.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, true, nil)
- if err != nil {
- return err
- }
- // 创建队列
- _, err = channel.QueueDeclare(r.QueueName, true, false, false, true, nil)
- if err != nil {
- return err
- }
- // 绑定交换机和队列
- err = channel.QueueBind(r.QueueName, r.RouteBindKey, r.ExchangeName, true, nil)
- if err != nil {
- return err
- }
- go ConsumeMsg(r.CallFunc, index, r.QueueName, channel)
- return nil
- }
- func ConsumeMsg(callFunc func([]byte) error, index int, queueName string, channel *amqp.Channel) {
- defer func() {
- if r := recover(); r != nil {
- err := fmt.Errorf("%+v", r)
- e := &status.Status{}
- if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
- logger.Error("err",
- zap.String("system_err", err.Error()),
- zap.Stack("stacktrace"))
- }
- }
- }()
- name := fmt.Sprintf("consumer%d", index)
- deliveries, err := channel.Consume(queueName, name, false, false, false, true, nil)
- if err != nil {
- fmt.Println("consume fail")
- return
- }
- //defer channel.Close()
- closeChan := make(chan *amqp.Error, 1)
- notifyClose := channel.NotifyClose(closeChan)
- for {
- select {
- case e := <-notifyClose:
- fmt.Printf("chan通道错误,e:%s", e.Error())
- close(closeChan)
- return
- case delivery := <-deliveries:
- // 手动ack确认
- // 注意: 这里只要调用了ack就是手动确认模式,
- // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
- // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
- // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
- //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
- //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
- // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
- err := callFunc(delivery.Body)
- if err != nil {
- fmt.Println("callFunc:", err.Error())
- } else {
- if err := delivery.Ack(false); err != nil {
- fmt.Println(err.Error())
- }
- }
- }
- /*v, ok := <-deliveries
- if ok {
- // 手动ack确认
- // 注意: 这里只要调用了ack就是手动确认模式,
- // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
- // 并不是表示设置为false就不进行当前ack确认,multiple=true: 用于设置批量消息确认;
- // 假设:在Channel(ch)上有5,6,7,8这4个delivery(ack) tags未确认;
- //情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
- //情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;
- // 不做ack的话,在该消费者连接中断后,会重新投递给其他消费者
- err := callFunc(v.Body)
- if err != nil {
- fmt.Println("callFunc:",err.Error())
- }else{
- if err := v.Ack(false); err != nil {
- fmt.Println(err.Error())
- }
- }
- } else {
- fmt.Println("Channel close")
- return
- }*/
- }
- }
- func (r *RabbitmqClient) Reconnect() error {
- // 创建连接
- var err error
- r.Connection, err = r.Connect()
- if err != nil {
- fmt.Println("dial err :", err)
- return err
- }
- // 启动发送channel
- if r.IsPub {
- err = r.InitPubChannel()
- if err != nil {
- return err
- }
- }
- // 启动消费channel
- if r.CallFunc != nil {
- for i := 0; i < r.ConsumerChannelCount; i++ {
- err = r.InitConsumerChannel(i)
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
- func (r *RabbitmqClient) PublishMsg(data []byte) (err error) {
- defer func() {
- if r := recover(); r != nil {
- err := fmt.Errorf("%+v", r)
- e := &status.Status{}
- if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
- logger.Error("err",
- zap.String("system_err", err.Error()),
- zap.Stack("stacktrace"))
- }
- }
- }()
- fmt.Println("publish msg:", string(data))
- err = r.PubChannel.Publish(r.ExchangeName, r.RouteBindKey, true, false, amqp.Publishing{
- DeliveryMode: amqp.Persistent, // 消息持久化
- Timestamp: time.Now(),
- ContentType: "text/plain",
- Body: data,
- })
- if err != nil {
- fmt.Println("publish msg err:", err)
- } else {
- fmt.Println("publish success!!!!")
- }
- return err
- }
|