Direct(路由选择)订阅消息模型

fanout消息模型中,一条消息会被所有已订阅的消费者消费。而在某些情况下,我们希望这个消息被特定的消费者消费。此时就可以用exchange中的Direct类型来实现,在fanout中,消息的转发会直接忽略Routing Key,而在Derict中,则会将消息自带的Routing KeyexchangeRouting Key相比较,相等则发送到该交换机绑定的队列。

在direct模型下

  • 消息发送方发送到exchange时需要给消息携带一个Routing key
  • exchange交换机不能将消息分发给所有的消费者了,exchange需要比较消息中的Routing key与自己路由中Ruoting key是否相等,相等的话则将消息发送给绑定的队列进行消费
  • 队列与交换机绑定不再是任意绑定,而需要指定Routing key

Direct

  • P:Provider(消息生产者),将消息发送给exchange时给消息设置一个Routing key
  • X:exchange(交换机),收到消息后将消息的routing key与自己的虚拟表去比对,相等则将消息转发给对应绑定的队列
  • C1:只接受Routing Keyerror的消息
  • C2:只接受Routing Keyinfo或着error或着warning的消息

生产者

该生产者发送routingkeywarning的消息

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        //通道声明交换机
        channel.exchangeDeclare("logs_direct","direct");
        String routingkey = "warning";
        //生产消息
        channel.basicPublish("logs_direct",routingkey,null,"基于direct模型的路由选择消息发送".getBytes());
        RabbitMqUtil.close(channel,connection);
    }
}

多消费者

消费者1只消费routingkeyerror的消息

customer1:

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs_direct","direct");
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //声明route key
        String routingkey = "error";
        //队列绑定交换机,指明只接受消费route key为error的消息
        channel.queueBind(queue,"logs_direct",routingkey);
        //消费
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者一消费route key为:"+routingkey+"|"+"消息为:"+new String(body));
            }
        });
    }
}

消费者2只消费routingkeyerrorwarninginfo的消息

customer2:

public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs_direct","direct");
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //队列绑定交换机,指明只接受消费route key为error、warning、info的消息
        channel.queueBind(queue,"logs_direct","error");
        channel.queueBind(queue,"logs_direct","warning");
        channel.queueBind(queue,"logs_direct","info");
        //消费
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者二消费:"+new String(body));
            }
        });
    }
}

消费结果:

customer1
customer1
customer2
customer2

Topic(动态路由模型)

Topic
该模型在Direct模型上增加了两个通配符,在消费者端指定接收的routekey时可直接模糊匹配即可。

'*' (star) can substitute for exactly one word.
'#' (hash) can substitute for zero or more words.

channel.queueBind(queue,"topics","kexing.site.#");

指定kexing.site后面可以跟一个或着多个单词,比如kexing.site.happy.day,也可以为空

最后修改:2021 年 11 月 15 日 05 : 15 PM
如果觉得我的文章对你有用,请随意赞赏