图图
发布于 2024-02-22 / 41 阅读 / 0 评论 / 3 点赞

rabbitMq的使用和代码示例

使用场景

在消息队列模式中,消息生产者发送到一个中间件中的消息队列,而消息消费者则从该队列中接收和处理消息。这种方式使得发送者和接收者可以独立地进行处理,而无需直接交互,从而实现解耦。发送者和接收者只需要知道如何与消息队列进行通信,而不需要知道彼此的存在。它通过在发送者和接收者之间建立一个消息队列来实现异步通信解耦,在高并发的业务中,mq可以有效的缓冲大量涌入的数据,也常用于服务之间的削峰

横向对比

市面上有许多成熟的消息队列非常多,例如:

RabbitMQ:简单,社区活跃,更新快,资料丰富,支持大部分的主流语言,中小型项目首选。

Kafka:高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用,大量用于大数据领域

RocketMQ:阿里开源,具有高性能、高可靠、高实时、分布式特点

ActiveMQ:高可用、高性能、可伸缩的企业级面向消息服务的系统

安装部署

docker pull rabbitmq:management
docker run -id --name=rabbitmq --privileged=true --restart=always -v ./data/rabbitmq:/var/lib/rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:management

通过客户端登陆管理面板:127.0.0.1:15672

Exchange(交换机)

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换机类型定义 。

Exchange 的类型有下面四种:

direct(直连交换机):将队列绑定到交换机,消息的 routeKey 需要与队列绑定的 routeKey 相同。

fanout (扇形交换机):不处理 routeKey ,直接把消息转发到与其绑定的所有队列中。

topic(主题交换机):根据一定的规则,根据 routeKey 把消息转发到符合规则的队列中,其中 # 用于匹配符合一个或者多个词(范围更广), * 用于匹配一个词。

headers (头部交换机):根据消息的 headers 转发消息而不是根据 routeKey 来转发消息, 其中 header 是一个 Map,也就意味着不仅可以匹配字符串类型,也可以匹配其他类型数据。 规则可以分为所有键值对匹配或者单一键值对匹配。

六种消息模式

在的 RabbitMQ 中,出现了六种消息传播模式: RabbitMQ 官网说明的六种模式

简单工作队列

Simple Work Queue,点对点模式,一条消息由一个消费者进行消费。(当有多个消费者时,默认使用轮训机制把消息分配给消费者)。

工作队列

Work Queues,也叫公平队列,能者多劳的消息队列模型。队列必须接收到来自消费者的 手动ack 才可以继续往消费者发送消息。

发布订阅模式

Publish/Subscribe,一条消息被多个消费者消费。

路由模式

Routing,有选择的接收消息。

主题模式

Topics,通过一定的规则来选择性的接收消息

RPC 模式

发布者发布消息,并且通过 RPC 方式等待结果。

注意:官网最后有 Publisher Confirms 为消息确认机制。指的是生产者如何发送可靠的消息。

用户角色类型

None

  • 不能访问 management plugin

Management

  • 查看自己相关节点信息

  • 列出自己可以通过AMQP登入的虚拟机

  • 查看自己的虚拟机节点virtual hosts的queues,exchanges和bindings信息

  • 查看和关闭自己的channels和connections

  • 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息

Policymaker

  • 包含management所有权跟

  • 查看和创建和删除自己的virtual hosts所属的policies和parameters信息

Monitoring

  • 包含management所有权限

  • 罗列出所有的virtual hosts,包括不能登录的virtual hosts

  • 查看其他用户的connections和channels信息

  • 查看节点级别的数据如clustering和memory使用情况

  • 查看所有的virtual hosts的全局统计信息。

Administrator

  • 最高权限

  • 可以创建和删除 virtual hosts

  • 可以查看,创建和删除users

  • 查看创建permissions

  • 关闭所有用户的connections

死信队列

死信交换机(Dead Letter Exchange,DLX)用于处理那些无法被正常消费的消息。当消息成为死信时,RabbitMQ 会将其重新发布到一个指定的交换机(即死信交换机)中,然后可以由与该交换机绑定的队列进行进一步处理,即为死信队列。进入死信消息的产生有以下几种条件:

  1. 消息被消费者拒绝(并且设置了 requeue=false)。

  2. 消息的 TTL(Time To Live,生存时间)过期。

  3. 队列达到最大长度而无法再容纳更多消息。

使用死信队列的消息过期时间应该是均匀的,因为普通队列里的消息是有顺序的,后入的消息必须等待先入的消息出队才可以,即使后入消息存活时间要比先入的消息长,也会导致取出的数据达不到预期的取出时间。所以如果想实现延迟消息,可以使用另一种更推荐的方式来实现。

延迟队列

延迟队列是rabbitMq一种比较特殊的队列,需要额外安装插件rabbitmq_delayed_message_exchange来实现,当消息投递到这种交换机时,它能够暂存一段时间,直到达到设定的延迟时间后再将消息投递到相应的队列。

rabbitMq社区插件连接传送门

制作一个含有延迟交换机插件的镜像

FROM rabbitmq:management
COPY ["./plugins/rabbitmq_delayed_message_exchange-3.13.0.ez" , "/plugins/"]
RUN chmod 777 /plugins/rabbitmq_delayed_message_exchange-3.13.0.ez
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

用新镜像启动容器

docker image build --file Dockerfile --tag rabbitmq:management_delayed_exc .
docker run -id --name=rabbitmq --privileged=true --restart=always -v ./data/rabbitmq:/var/lib/rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:management_delayed_exc

常见问题

消息丢失

默认情况下,Consume进行消费的时候autoAck默认设置为true,也就是生产者消息投递后,默认收到,收到后队列的消息就会删除。这里容易出现的问题是假设队列有10000条数据,4个消费者,每个消费者会平均分到2500条数据,如果其中某一个消费者挂了,会导致这个消费者的2000多条数据丢失,这里设置autoAck为false,手动进行消费确认(消息确认Ack机制),等到消费完成以后进行确认,如果不进行确认,消费者就会重复的消息投递到其他队列。

重复消费

特殊情况下,消费者接收到消息但没有ack,导致消息重回队列,等待消费。

乱序消费

消息队列本身是有序的,但是用户在多个消费者情况下,在没有阻塞情况下因为会平均分配消息到消费者队列,就会出现消息3提前与消息1完成处理的情况,在某些特定场景对顺序消费有要求,可以限制消费者队列数量或进行队列阻塞。

消息堆积

生产速度高于消费速度,导致消息积压在消费队列,可以考虑对生产者客户端进行限流,或者增加消费者数量。同时可以增加消费者队列prefetch设置数量让消费队列每次接收更多的消息

演示代码

mq.go

package rabbitMq

import (
    "log"

    "github.com/streadway/amqp"
) //导入mq包

// MQURL 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost (默认是5672端口)
// 端口可在 /etc/rabbitmq/rabbitmq-env.conf 配置文件设置,也可以启动后通过netstat -tlnp查看
const MQURL = "amqp://admin:[email protected]:5672/"

// RabbitMQ  结构体
type RabbitMQ struct {
    Conn    *amqp.Connection
    Channel *amqp.Channel
    // 队列名称
    QueueName string
    // 交换机
    Exchange string
    // routing Key
    RoutingKey string
    //MQ链接字符串
    Mqurl string
}

// 创建结构体实例
func NewRabbitMQ(queueName, exchange, routingKey string) *RabbitMQ {
    rabbitMQ := RabbitMQ{
       QueueName:  queueName,
       Exchange:   exchange,
       RoutingKey: routingKey,
       Mqurl:      MQURL,
    }
    var err error
    //创建rabbitmq连接
    rabbitMQ.Conn, err = amqp.Dial(rabbitMQ.Mqurl)
    checkErr(err, "创建连接失败")

    //创建Channel
    rabbitMQ.Channel, err = rabbitMQ.Conn.Channel()
    checkErr(err, "创建channel失败")

    return &rabbitMQ

}

// 释放资源,建议NewRabbitMQ获取实例后 配合defer使用
func (mq *RabbitMQ) ReleaseRes() {
    mq.Conn.Close()
    mq.Channel.Close()
}

// 错误处理
func checkErr(err error, meg string) {
    if err != nil {
       log.Fatalf("%s:%s\n", meg, err)
    }
}

rabbitMq/producer/main.go

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"rabbitmq/rabbitMq"
	"math/rand"
	"sync"
	"time"
)

// 生产者发布流程
func main() {
	// 初始化mq
	mq := rabbitMq.NewRabbitMQ("queue_test", "exchange_test", "key_test")
	defer mq.ReleaseRes() // 完成任务释放资源

	// 1.声明队列
	/*
	  如果只有一方声明队列,可能会导致下面的情况:
	   a)消费者是无法订阅或者获取不存在的MessageQueue中信息
	   b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃
	  为了避免上面的问题,所以最好选择两方一起声明
	  ps:如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的
	*/
	_, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
		mq.QueueName, // 队列名
		true,         // 是否持久化
		false,        // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
		false,        // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
		false,        // 是否阻塞
		nil,          //额外属性
	)
	if err != nil {
		fmt.Println("声明队列失败", err)
		return
	}

	// 2.声明交换器
	err = mq.Channel.ExchangeDeclare(
		mq.Exchange, //交换器名
		"direct",    //exchange type:一般用fanout、direct、topic
		true,        // 是否持久化
		false,       //是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)
		false,       //设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
		false,       // 是否阻塞
		nil,         // 额外属性
	)
	if err != nil {
		fmt.Println("声明交换器失败", err)
		return
	}

	// 3.建立Binding(可随心所欲建立多个绑定关系)
	err = mq.Channel.QueueBind(
		mq.QueueName,  // 绑定的队列名称
		mq.RoutingKey, // bindkey 用于消息路由分发的key
		mq.Exchange,   // 绑定的exchange名
		false,         // 是否阻塞
		nil,           // 额外属性
	)
	if err != nil {
		fmt.Println("绑定队列和交换器失败", err)
		return
	}

	type Index struct {
		Num int64
		sync.Mutex
	}

	var i Index
    forever := make(chan bool)

	for {

		for j := 0; j < 10; j++ {
			go func() {

				// 生成0.1-1秒的随机休眠时间
				sleepTime := time.Duration(rand.Intn(900)+100) * time.Millisecond
				time.Sleep(sleepTime)

				i.Lock() // 加锁

				mq.Channel.Publish(
					mq.Exchange,   // 交换器名
					mq.RoutingKey, // routing key
					false,         // 是否返回消息(匹配队列),如果为true, 会根据binding规则匹配queue,如未匹配queue,则把发送的消息返回给发送者
					false,         // 是否返回消息(匹配消费者),如果为true, 消息发送到queue后发现没有绑定消费者,则把发送的消息返回给发送者
					amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
						Body:        []byte(fmt.Sprintf("msg:%d", i.Num)), // 消息内容
					},
				)

				i.Num++    // 自增
				i.Unlock() // 解锁

			}()
		}

		fmt.Println("消息投递完成")
	    <-forever
	}
} 

rabbitMq/consumer/main.go

package main

import (
	"fmt"
	"rabbitmq/rabbitMq"
	"time"
)

// 消费者1
func main() {
	// 初始化mq
	mq := rabbitMq.NewRabbitMQ("queue_normal_test", "", "key_dead_test")
	defer mq.ReleaseRes() // 完成任务释放资源

	// 从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息
	msgChanl, err := mq.Channel.Consume(
		mq.QueueName, // 队列名
		"",           // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略
		false,        // 是否自动应答,设置自动应答后从队列获取消息就会自动应答,队列消息删除,程序崩溃或退出会造成消息丢失
		false,        // 是否排他,true只允许连接一个消费者
		false,        // 是否接收只同一个连接中的消息,若为true,则只能接收别的conn中发送的消息
		false,        // 队列消费是否阻塞,阻塞以后同一时间只会把消息发送给一个队列,等待队列退出以后其他队列才会获取
		nil,          // 额外属性
	)
	if err != nil {
		fmt.Println("获取消息失败", err)
		return
	}

	var (
		i   int
		txt string
	)
	for msg := range msgChanl {

		now := time.Now().Format(time.DateTime)
		txt = fmt.Sprintf("消息获取时间:%s", now)

		if i%3 == 1 {
			txt += " reject"
			msg.Reject(false) // 拒绝消费
		} else {
			msg.Ack(true) // 主动应答
		}

		fmt.Println(txt)
		fmt.Println(string(msg.Body))
		fmt.Println("-----")
		i++
	}

}

rabbitMq/consumer2/main.go

package main

import (
    "fmt"
    "rabbitmq/rabbitMq"
    "time"
)

// 消费者2
func main() {
    // 初始化mq
    mq := rabbitMq.NewRabbitMQ("queue_dead_test", "exchange_dead_test", "key_dead_test")
    defer mq.ReleaseRes() // 完成任务释放资源

    // 1.声明队列(两端都要声明,原因在生产者处已经说明)
    _, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
       mq.QueueName, // 队列名
       true,         // 是否持久化
       false,        // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
       false,        // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
       false,        // 是否阻塞
       nil,          // 额外属性
    )
    if err != nil {
       fmt.Println("声明队列失败", err)
       return
    }

    // 2.从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息
    msgChanl, err := mq.Channel.Consume(
       mq.QueueName, // 队列名
       "",           // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略
       false,        // 是否自动应答,设置自动应答后从队列获取消息就会自动应答,队列消息删除,程序崩溃或退出会造成消息丢失
       false,        // 是否排他,true只允许连接一个消费者
       false,        // 是否接收只同一个连接中的消息,若为true,则只能接收别的conn中发送的消息
       false,        // 队列消费是否阻塞,阻塞以后同一时间只会把消息发送给一个队列,等待队列退出以后其他队列才会获取
       nil,          // 额外属性
    )
    if err != nil {
       fmt.Println("获取消息失败", err)
       return
    }

    var (
       txt string
    )
    for msg := range msgChanl {

       now := time.Now().Format(time.DateTime)
       txt = fmt.Sprintf("死信消息获取时间:%s", now)

       msg.Ack(true) // 主动应答

       fmt.Println(txt)
       fmt.Println(string(msg.Body))
       fmt.Println("-----")
    }

}

rabbitMq/delayed/main.go

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "rabbitmq/rabbitMq"
    "math/rand"
    "sync"
    "time"
)

// 延迟队列生产者
func main() {
    // 初始化mq
    mq := rabbitMq.NewRabbitMQ("queue_delayed_test", "exchange_delayed_test", "key_delayed_test")
    defer mq.ReleaseRes() // 完成任务释放资源

    _, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
       mq.QueueName, // 队列名
       true,         // 是否持久化
       false,        // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
       false,        // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
       false,        // 是否阻塞
       nil,          //额外属性
    )
    if err != nil {
       fmt.Println("声明队列失败", err)
       return
    }

    // 创建延时交换器
    err = mq.Channel.ExchangeDeclare(
       mq.Exchange,         // 交换器名称
       "x-delayed-message", // 交换器类型
       true,                // 是否持久化
       false,               // 是否自动删除
       false,               // 是否内部使用
       false,               // 是否等待服务器确认
       amqp.Table{
          "x-delayed-type": "direct", // 基础类型
       },
    )
    if err != nil {
       fmt.Println("声明交换器失败", err)
       return
    }

    // 绑定队列到交换器
    err = mq.Channel.QueueBind(
       mq.QueueName,  // 绑定的队列名称
       mq.RoutingKey, // bindkey 用于消息路由分发的key
       mq.Exchange,   // 绑定的exchange名
       false,         // 是否阻塞
       nil,           // 额外属性
    )
    if err != nil {
       fmt.Println("绑定队列和交换器失败", err)
       return
    }

    type Index struct {
       Num int64
       sync.Mutex
    }

    var (
       i       Index
       headers amqp.Table
       msg     string
       randExp int
    )

    for j := 0; j < 100; j++ {

       // 生成0.1-1秒的随机休眠时间
       sleepTime := time.Duration(rand.Intn(900)+100) * time.Millisecond
       time.Sleep(sleepTime)

       now := time.Now().Format(time.DateTime)

       // 随机1-10s延时
       randExp = (rand.Intn(9) * 100) + 1000
       // 发送延时消息
       headers = amqp.Table{
          "x-delay": int64(randExp), // 延时时间(毫秒)
       }
       msg = fmt.Sprintf("msg:%d send-time:%s", i.Num, now)

       mq.Channel.Publish(
          mq.Exchange,   // 交换器名
          mq.RoutingKey, // routing key
          false,         // 是否强制
          false,         // 是否立即发送
          amqp.Publishing{
             Headers: headers,
             Body:    []byte(msg), // 消息内容
          },
       )

       i.Num++ // 自增
    }

    fmt.Println("消息投递完成")
}

rabbitMq/dead_queue/main.go

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "rabbitmq/rabbitMq"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

// 死信队列生产者模拟
func main() {
    // 初始化mq
    mq := rabbitMq.NewRabbitMQ("queue_dead_test", "exchange_dead_test", "key_dead_test")
    defer mq.ReleaseRes() // 完成任务释放资源

    // 创建死信交换机
    mq.Channel.ExchangeDeclare(
       mq.Exchange, // 死信交换机名称
       "direct",    // 交换机类型
       true,        // 是否持久化
       false,       // 是否自动删除
       false,       // 是否内部使用
       false,       // 是否等待服务器确认
       nil,         // 额外的参数
    )

    // 创建普通队列,并指定死信交换机参数
    mq.Channel.QueueDeclare(
       "queue_normal_test", // 普通队列名称
       true,                // 是否持久化
       false,               // 是否自动删除
       false,               // 是否排他
       false,               // 是否等待服务器确认
       amqp.Table{
          "x-dead-letter-exchange":    mq.Exchange,   // 死信交换机名称
          "x-dead-letter-routing-key": mq.RoutingKey, // 死信路由键
       },
    )

    // 创建死信队列
    mq.Channel.QueueDeclare(
       mq.QueueName, // 死信队列名称
       true,         // 是否持久化
       false,        // 是否自动删除
       false,        // 是否排他
       false,        // 是否等待服务器确认
       nil,          // 额外的参数
    )

    mq.Channel.QueueBind(
       mq.QueueName,  // 死信队列名称
       mq.RoutingKey, // 路由键(与普通队列中指定的死信路由键一致)
       mq.Exchange,   // 死信交换机名称
       false,         // 是否等待服务器确认
       nil,           // 额外的参数
    )

    // 发送消息到普通队列,队列消费去执行拒绝,消息转移到死信交换机,监听死信队列获取
    type Index struct {
       Num int64
       sync.Mutex
    }

    var i Index
    forever := make(chan bool)
    for j := 0; j < 10; j++ {
       go func() {

          for {
             // 生成0.1-1秒的随机休眠时间
             sleepTime := time.Duration(rand.Intn(900)+100) * time.Millisecond
             time.Sleep(sleepTime)

             i.Lock() // 加锁

             mq.Channel.Publish(
                "",                  // 交换器名
                "queue_normal_test", // 队列名称
                false,               // 是否返回消息(匹配队列),如果为true, 会根据binding规则匹配queue,如未匹配queue,则把发送的消息返回给发送者
                false,               // 是否返回消息(匹配消费者),如果为true, 消息发送到queue后发现没有绑定消费者,则把发送的消息返回给发送者
                amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
                   Body:      []byte(fmt.Sprintf("msg:%d", i.Num)), // 消息内容
                   MessageId: strconv.Itoa(int(i.Num)),
                },
             )

             i.Num++    // 自增
             i.Unlock() // 解锁
          }
       }()
    }

    fmt.Println("消息投递完成")
    <-forever
}


RPC模式应用

例如项目中有一个需求,通过api下单购买实例,购买后运营商需要创建实例,等待时间较长,这里就可以使用远程调用模式,发起请求创建实例,投递消息,服务端收到消息处理请求创建实例,实例创建完成或者失败均需要通知客户端创建情况。以下是示例代码

package mqact

import (
	"auto_agent/common"
	"auto_agent/conf"
	"auto_agent/tools/linode"
	rabbitMq "auto_agent/tools/rabbitmq"
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
)

// 下单队列
func BuyQueue(c *conf.SvcConfig) {
	// 初始化mq
	mq := rabbitMq.NewRabbitMQ(c.MQ.DSN, c.MQ.QueueName, "", c.MQ.RoutingKey)
	defer mq.ReleaseRes()

	q, err := mq.Channel.QueueDeclare(mq.QueueName, true, false, false, false, nil)
	if err != nil {
		log.Printf("MQ => 声明队列失败:%s", err.Error())
		return
	}

	msgChanl, err := mq.Channel.Consume(q.Name, "", false, false, false, false, nil)
	if err != nil {
		log.Fatalf("MQ => 获取消息失败:%s", err.Error())
	}

	for msg := range msgChanl {
		// 下单
		log.Printf("MQ => 收到消息:%s", string(msg.Body))

		res, err := createInstances(msg)
		if err != nil {
			log.Printf("MQ => 处理消息失败:%s", err.Error())
			msg.Nack(false, true)
			continue
		}

		// 将响应发送到回调队列
		err = mq.Channel.Publish("", msg.ReplyTo, false, false,
			amqp.Publishing{
				ContentType:   "text/plain",
				CorrelationId: msg.CorrelationId,
				Body:          []byte(res),
			})
		if err != nil {
			log.Fatalf("MQ => 回调消息投递失败: %s", err.Error())
		}

		msg.Ack(false)
	}
}

type MqCreateInstances struct {
	Region string `json:"region"`
	Plan   string `json:"plan"`
	OsID   string `json:"os_id"`
	Label  string `json:"label"`
	Pwd    string `json:"pwd"`
	SshKey string `json:"ssh_key"`
}

func createInstances(msg amqp.Delivery) ([]byte, error) {

	var data MqCreateInstances
	err := json.Unmarshal(msg.Body, &data)
	if err != nil {
		return nil, err
	}

	agent := linode.NewLinodeApi(conf.GetConfig())

	var res common.Response
	if res, err = agent.CreateInstances(data.Region, data.Plan, data.OsID, data.Label, data.Pwd, data.SshKey); err != nil {
		return nil, err
	}

	return json.Marshal(res)
}

测试生产者代码

package main

import (
    "encoding/json"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "math/rand"
)

type MqCreateInstances struct {
    Region string `json:"region"`
    Plan   string `json:"plan"`
    OsID   string `json:"os_id"`
    Label  string `json:"label"`
    Pwd    string `json:"pwd"`
    SshKey string `json:"ssh_key"`
}

func main() {
    // 连接到 RabbitMQ 服务器
    conn, err := amqp.Dial("amqp://admin:[email protected]:5672/")
    if err != nil {
       log.Fatalf("MQ => 服务链接失败: %s", err.Error())
    }
    defer conn.Close()

    // 创建一个通道
    ch, err := conn.Channel()
    if err != nil {
       log.Fatalf("MQ => 队列通道创建失败:%s", err.Error())
    }
    defer ch.Close()

    // 创建一个回调队列
    q, err := ch.QueueDeclare(
       "", false, false, true, false, nil)
    if err != nil {
       log.Fatalf("MQ => 回调队列创建失败:%s", err.Error())
    }

    data := MqCreateInstances{
       Region: "ca-central",
       Plan:   "g6-nanode-1",
       OsID:   "linode/centos7",
       Label:  "Yz-test666",
       Pwd:    "!Qq321852089",
       SshKey: "",
    }
    body, err := json.Marshal(data)
    if err != nil {
       log.Fatalf("MQ => Failed to marshal JSON: %s", err)
    }

    // 消息唯一id
    corrId := randomString(32)
    // 发送请求 
    // 注意:rpc模式key应该是server的队列名
    err = ch.Publish("", "linode", false, false,
       amqp.Publishing{
          ContentType:   "application/json",
          CorrelationId: corrId,
          ReplyTo:       q.Name,
          Body:          body,
          DeliveryMode:  amqp.Persistent, // 持久化消息
       })
    if err != nil {
       log.Fatalf("MQ => 消息推送失败: %s", err)
    } else {
       log.Printf("消息已发送成功!")
    }

    // 开始消费回调队列中的消息
    msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
    if err != nil {
       log.Printf("MQ => 回调消息异常:%s", err.Error())
    }

    // 等待响应
    for d := range msgs {
       if corrId == d.CorrelationId {

          fmt.Printf("Received a response: %s", d.Body)
          d.Ack(false)
          break
       }
    }

    fmt.Println("the end!")
}

// 生成一个随机字符串
func randomString(l int) string {
    bytes := make([]byte, l)
    for i := 0; i < l; i++ {
       bytes[i] = byte(rand.Intn(26) + 97)
    }
    return string(bytes)
}