3 客户端开发向导

felix.shao2025-02-18

3 客户端开发向导

TIP

 本小节主要介绍以下知识:

  • 连接
  • 交换器/队列的创建与绑定
  • 发送消息
  • 消费消息
  • 消费消息的确认和关闭连接

概述

 代码相对比较简单,有些就省略了。

连接 RabbitMQ

 连接指定的参数为:IP 地址、端口号、用户名、密码等:

TIP

 Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每一个线程开辟一个 Channel 。某些情况下 Channel 的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通行帧交错,同时也会影响发送方确认机制的运行,所以多线程间共享 Channel 实例是非线程安全的。

 Channel 或者 Connection 中有个 isOpen 方法可以用来检测其是否已处于开启状态。但并不推荐在生产环境下的代码上使用,有可能会产生竞争。如下源码:

public boolean isOpen(){
    synchronized(this.monitor){
        return this.shutdownCause == null;
    }
}

 正确使用是通过捕获 com.rabbitmq.client.ShutdownSignalException 异常来处理。

使用交换器和队列

 交换器和队列是 AMQP 中 high-level 层面的构建模块,我们主要看下声明和绑定的方法。

exchangeDeclare 方法详解

 Channel 类的声明交换器相关方法介绍。

/**
 * 声明一个交换器
 * @param exchange 交换器名称
 * @param type 交换器类型,常见的如 fanout、direct、topic
 * @param durable 设置是否持久化, true 表示持久化, 反之是非持久化, 持久化可以将交换器存盘, 在服务器重启的时候不会丢失相关信息
 * @param autoDelete 设置是否自动删除, true 表示自动删除, 自动删除的前提是至少有一个队列或者交换器与这个交换器绑定, 之后所有与这个交换器绑定的队列或交换器都于此解绑, 注意不能错误的把这个参数理解为"当与此交换器连接的客户端都断开时, RabbitMQ 会自动删除本交换器"
 * @param internal 设置是否内置的, true 表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
 * @param arguments 其他一些结构化参数, 比如 alternate-exchange
 * @return 需要等待服务器的返回(服务器会返回Exchange.Declare-Ok这个AMQP命令)
 */
Exchange.DeclareOk exchangeDeclare(String exchange,
                                        String type,
                                        boolean durable,
                                        boolean autoDelete,
                                        boolean internal,
                                        Map<String, Object> arguments) throws IOException;

/**
 * 基本同 exchangeDeclare,无返回值。不推荐使用,因为声明完交换器后,但是服务器还并未完成交换器的创建,此时紧接着使用就会发生异常。
 */
... exchangeDeclareNoWait...

/**
 * 检测响应的交换器是否存在,存在则正常返回;如果不存在则抛出异常:404 channel exception,同时 Channel 也会被关闭
 * @param name 交换器名称
 */
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

/**
 * 删除交换器
 * @param exchange 交换器名称
 * @param ifUnused true 设置是否在交换器没有被使用的情况下删除
 */
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;

void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;

queueDeclare 方法详解

 Channel 类的声明队列相关方法介绍。

/**
 * Declare a queue
 * @param queue 队列的名称
 * @param durable 设置是否持久化, true 表示队列为持久化, 持久化的队列会存盘, 在服务器重启的时候会保证不丢失相关信息
 * @param exclusive 设置是否排他, true 表示队列为排他的, 如果一个队列被设置为排他队列, 该队列仅对首次声明它的连接可见, 并在连接断开时自动删除, (这里需要注意三点:1.排他队列是基于连接Connection可见的, 同一个连接的不同信道Channel是可以同时访问同一连接创建的排他队列;"首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景)
 * @param autoDelete 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是, 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
 * @param arguments 设置队列的其他一些参数, 如 x-message-ttl 等
 * @return 需要等待服务器的返回(服务器会返回Exchange.Declare-Ok这个AMQP命令)
 */
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;

/**
 * 基本同 exchangeDeclare ,无返回值。不推荐使用,因为声明完交换器后,但是服务器还并未完成交换器的创建,此时紧接着使用就会发生异常。
 */
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;

/**
 * 检测响应的队列是否存在,存在则正常返回;如果不存在则抛出异常:404 channel exception,同时 Channel 也会被关闭
 * @param queue 队列的名称
 */
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;    

/**
 * 删除队列
 * @param queue 队列的名称
 * @param ifUnused true 设置是否在交换器没有被使用的情况下删除
 * @param ifEmpty true 表示队列为空的情况下才能删除
 */
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

/**
 * 清空队列中的内容
 * @param queue 队列的名称
 */
Queue.PurgeOk queuePurge(String queue) throws IOException;

TIP

 生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。

queueBind 方法详解

 Channel 类的队列和交换器绑定和解绑方法介绍。

/**
 * 绑定队列到交换器
 * @param queue 队列名
 * @param exchange 交换器名称
 * @param routingKey 路由 key 或者绑定 key
 * @param arguments 一些参数
 */
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

/**
 * 解除绑定队列到交换器
 * @param queue 队列名
 * @param exchange 交换器名称
 * @param routingKey 路由 key 或者绑定 key
 */
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

exchangeBind 方法详解

 Channel 类的交换器和交换器绑定和解绑方法介绍。
 此时消息流转流程为:生产者发送消息到交换器 source中,交换器source根据路由键找到与其匹配的另一个交换器 destination,并将消息转发到 destination 中,进而存储在 destination 绑定的队列 queue中。

/**
 * 绑定交换器到交换器
 * @param destination 目标交换器
 * @param source 源交换器
 * @param routingKey 路由 key 或者绑定 key
 * @param arguments 一些参数
 */
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

/**
 * 解除绑定交换器到交换器
 * @param destination 目标交换器
 * @param source 源交换器
 * @param routingKey 路由 key 或者绑定 key
 */
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;

何时创建

 RabbitMQ 的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。

  • 如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也可以完全在业务程序中声明队列。
  • 至于是使用预先分配创建资源的静态方式还是动态的创建方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑。

发送消息

 发送消息方法介绍。

/**
 * 发送消息
 * @param exchange 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。
 * @param routingKey 路由键,交换器根据路由键将消息存储到相应的队列之中。
 * @param mandatory true if the 'mandatory' flag is to be set。
 * @param immediate true if the 'immediate' flag is to be。
 * @param props 消息的基本属性集,其包含14个属性成员,分别有 contentType、contentEncoding、headers、deliveryMode、priority、correlationId、replyTo、expiration、messageId、private Date timestamp、type、userId、appId、clusterId。
 * @param body 消息体(payload),真正需要发送的消息。
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
    throws IOException;

消费消息

 RabbitMQ 的消息模式分两种:推(Push)模式和拉(Pull)模式。推模式采用 Basic.consume 进行消费,而拉模式则是调用 Basic.Get 进行消费。

推模式

 在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有com.rabbitmq.client.Consumercom.rabbitmq.client.DefaultConsumer
 相关源码如下:

public interface Consumer {

    void handleConsumeOk(String consumerTag);

    void handleCancelOk(String consumerTag);

    void handleCancel(String consumerTag) throws IOException;

    /**
     * 当 Channel 或者 Connection 关闭的时候会调用。
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    void handleRecoverOk(String consumerTag);

    void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException;
}
public interface Channel extends ShutdownNotifier, AutoCloseable {

    /**
     * 启动一个消费者,并返回服务端生成的消费者标识
     * @param queue 队列名
     * @param autoAck 设置是否自动确认。建议设成 false,即不自动确认。
     * @param consumerTag 消费者标签,用来区分多个消费者
     * @param noLocal 设置为true则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 的消费者
     * @param exclusive 设置是否排他
     * @param arguments 设置消费者的其他参数
     * @param deliverCallback 当一个消息发送过来后的回调接口
     * @param cancelCallback 当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
     * @param shutdownSignalCallback 当 channel/connection 关闭后回调
     */
    String basicConsume(String queue, boolean autoAck, String consumerTag,
     boolean noLocal, boolean exclusive, Map<String, Object> arguments, 
     DeliverCallback deliverCallback, CancelCallback cancelCallback, 
     ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;

}

 接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现。当调用与 Consumer 相关的API方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分。关键消费代码清单如下:

boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(QUEUE_NAME, autoAck, "myCunsumerTag", new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("recv message: " + new String(body));
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

 注意,上面代码中显示地设置 autoAck 为 false,然后在接收到消息之后进行显示 ack 操作(channel.basicAck),对于消费者来说这个设置是非常必要的,可以防止消息不必要地丢失。

TIP

channel.basicCancel(String consumerTag);注意这行代码会首先触发 handleConsumerOk 方法,之后触发 handleDelivery 方法,最后才触发 handleCancelOk 方法。

 和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些 callback 会被分配到与 Channel 不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclare、channel.basicCancel 等。
 每个 Channel 都拥有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者,也就是意味着消费者彼此之间没有任何关联。当然也可以在一个 Channel 中维持多个消费者,但是要注意一个问题,如果 Channel 中的一个消费者一直在运行,那么其他消费者的 callback 会被“耽搁”。

拉模式

 通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetResponse。对应源码如下:

/**
 * 拉消息
 * @param queue 队列名
 * @param 设置是否自动确认。建议设成 false,即不自动确认。
 */
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

 拉模式的关键代码如下所示:

GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);

TIP

 注意要点:Basic.Consume 将信道置为投递模式,直到取消队列的订阅为止。在投递模式期间,RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受 Basic.Qos 的限制。如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费。但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume,这样做会严重影响 RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。

消费端的确认和拒绝

消费端的确认

 即RabbitMQ 的消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当其为 true 时,RabbitMQ 会等待消费者显示地回复确认信号后才从内存(或者磁盘)中移去消息(实质上时先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
 采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的事件处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显示调用Basic.Ack 命令为止。RabbitMQ 不会为未确认的消息设置过期时间。
 RabbitMQ 的 Web 管理平台上可以看到当前队列中的“Ready”状态和“Unacknowledged”状态的消息数,分别对应上文中的等待投递给消费者的消息数和以及投递给消费者但是未收到确认信号的消息数。也可以通过相应的命令来查看上述信息:$ rabbitmqctl list_queues name messages_ready

消费端的拒绝

 通过 channel.basicReject 方法可以拒绝消息。对应源码如下:

/**
 * 拒绝消息 
 * @param deliveryTag 消息的编号
 * @param requeue true: RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者,false:立即把消息从队列中移除,而不会把它发送给新的消费者。
 */
void basicReject(long deliveryTag, boolean requeue) throws IOException;

 Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack命令。消费者客户端可以调用channel.basicNack方法来实现,方法定义如下:

/**
 * 批量拒绝消息
 * @param deliveryTag 消息的编号
 * @param multiple true: 表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息;false: 和 basicReject 一样,拒绝编号为 deliveryTag 的一条消息
 * @param requeue true: RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者,false:立即把消息从队列中移除,而不会把它发送给新的消费者。
 */
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

TIP

 将channel.basicReject 或者 channel.basicNack 中的 requeue 设置为 false,可以启用“死信队列”的功能。它可以通过检测被拒绝或者未送达的消息来追踪问题。

 对于requeue,AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性。其对应的客户端方法为:

Basic.RecoverOk basicRecover() throws IOException;

/**
 * 用来请求 RabbitMQ 重新发送还未被确认的消息。
 * @param requeue true:未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者;false:那么消息会被分配给予之前相同的消费者。默认是 true。
 */
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

关闭连接

 在应用程序使用完之后,需要关闭连接,释放资源:

channel.close();
conn.close();

 显示地关闭 Channel 是个好习惯,但这不是必须的,在 Connection 关闭的时候,Channel 也会自动关闭。
 AMQP 协议中的 Connection 和 Channel 采用同样的方式来管理网络失败、内部错误和显示地关闭连接。Connection 和 Channel 所具备的生命周期如下所述。

  • Open:开启状态,代表当前对象可以使用。
  • Closing:正在关闭的状态。当前对象被显示地通知调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成。
  • closed:已经关闭状态。当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且也关闭了自身。

 在 Connection 和 Channel 中,与关闭相关的方法有 addShutdownListener (ShutdownListener listener) 和 removeShutdownLister (ShutdownListener listener)。当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用 ShutdownListener。而且如果将一个 ShutdownListener注册到一个已经处于 Closed状态的对象(这里特指Connection和Channel)时,会立即调用 ShutdownListener。
 getCloseReason 方法可以让你直到对象关闭的原因;isOpen 方法检测对象当前是否处于开启状态;close(int closeCode, String closeMessage) 方法显示地通知当前对象执行关闭操作。

 示例代码如下:

// addShutdownListener
conn.addShutdownListener(new ShutdownListener() {
    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        System.out.println("断开连接");
        if(cause.isHardError()){
            Connection c1 = (Connection) cause.getReference();
            if(!cause.isInitiatedByApplication()){
                Method method = cause.getReason();
            }
        } else {
            Channel c2 = (Channel) cause.getReference();
        }
    }
});

参考文献

  • [RabbitMQ实战指南]
Last Updated 2/18/2025, 5:05:12 PM