博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[官网翻译]RabbitMQ基本消息队列使用
阅读量:6894 次
发布时间:2019-06-27

本文共 26455 字,大约阅读时间需要 88 分钟。

hot3.png

RabbitMQ 更像是一个邮件服务器,用户发送邮件(消息),到邮箱服务器(exchange),其他用户能够保证收到消息发送者的邮件(消息).

AMQP 服务器类似与邮件服务器, 每个交换器(exchange)都扮演了消息传送代理,每个消息队列(queue)都作为邮箱,而绑定(binding)则定义了每个传送代理中的路由表.发布者(producer)发送消息给独立的传送代理,然后传送代理(exchange)再路由(binding)消息到邮箱(queue)中.消费者(customer)从邮箱(queue)中收取消息.

RabbitMQ 有三个主要概念,生产者,队列,消费者

生产者 单纯的发送消息

队列 依赖主机的内存和磁盘,(这个可以通过配置文件修改参数)可以理解为一个缓存.

消费者 一直等待接收消息

rabbitmq默认配置virtual host 为 "/", exchange默认AMQP default,没有默认queue. 如果exchange不指定,则exchange为默认exchange.使用virtualHost和exchange可以方便实现"分区"的概念. 比如我想有交易和会员两个系统,我可以创建两个virtualhost,分别表示两个系统,这两个系统是相互隔离的. 跟activemq不同的是,rabbitmq更加灵活一点.

官方文档 1 hello world

创建rabbitmq的流程跟一般的模板一样,通过ConnectionFactory--Connection--Channel--获取客户端通道. 拿到channel之后,就可以进行发布或消费.对应的jms中的点对点队列.

在编码的时候,我们可以制定一端创建信息的规则,比如消费端进行声明exchange或者queue

下面是消费端代码,这个代码中没有exchange,默认为类型为direct的exchange是AMQP default,这里绑定了一个队列.

//1 工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//2 连接Connection connection = factory.newConnection();//3 渠道Channel channel = connection.createChannel();//4 创建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//5 设置消费channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){    [@Override](https://my.oschina.net/u/1162528)    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {        String msg = new String(body,"UTF-8");        System.out.println(name + " 接收到消息 msg = " + msg);    }});System.out.println("客户端启动.");latch.countDown();} catch (TimeoutException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}

下面是生产者服务端代码,注意,这里的basicPublish()的第二个参数,routingkey是消费端的queue的名称. 这一点,其实让人有点迷惑的.

//1 工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//2 连接try (Connection connection = factory.newConnection()) {    //3 渠道    Channel channel = connection.createChannel();    //4 发布消息    String msg = "hello rabbitmq";    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());    System.out.println(name + " 发送消息 msg = " + msg);    channel.close();} catch (TimeoutException e) {    e.printStackTrace();} catch (IOException e) {    e.printStackTrace();}

如果按照生产者/消费者--exchange--routingkey绑定--queue 的标准模式, 在下面的例子中exchange为hello.world,routingkey为key-hello,队列为QUEUE_NAME

:

//1 工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//2 连接try (Connection connection = factory.newConnection()) {    //3 渠道    Channel channel = connection.createChannel();    //4 发布消息    String msg = "hello rabbitmq";    channel.basicPublish("hello.world","key-hello",null,msg.getBytes());    System.out.println(name + " 发送消息 msg = " + msg);    channel.close();} catch (TimeoutException e) {    e.printStackTrace();} catch (IOException e) {    e.printStackTrace();}

:

try {    //1 工厂    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    //2 连接    Connection connection = factory.newConnection();    //3 渠道    Channel channel = connection.createChannel();    channel.exchangeDeclare("hello.world","direct");    //4 创建队列    channel.queueDeclare(QUEUE_NAME,false,false,false,null);    channel.queueBind(QUEUE_NAME,"hello.world","key-hello");    //5 设置消费    channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){        [@Override](https://my.oschina.net/u/1162528)        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {            String msg = new String(body,"UTF-8");            System.out.println(name + " 接收到消息 msg = " + msg);        }    });    System.out.println("客户端启动.");    latch.countDown();

官方文档 2 扇出模式(订阅模式)

用过jms的都知道topic订阅队列,在rabbitmq的对应的是exchange的"fanout". 实现原理,直接使用exchange,队列自动创建,不在通过routingkey绑定exchange和queue,从而实现在exchange下的queue都可以接收到消息.

下面是,创建一个exchange,类型为fanout,然后获取一个系统的queue,然后将queue和exchange绑定在一起.

Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_LOG, ExchangeTypes.FANOUT);String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_LOG,"");Consumer consumer = new DefaultConsumer(channel){    [@Override](https://my.oschina.net/u/1162528)    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {        String log = new String(body);        System.out.println(name+"<<<<  " + log);    }};channel.basicConsume(queueName,true,consumer);System.out.println(name +" 客户端等待中....." );latch.countDown();

下面是

Channel channel = connection.createChannel();//channel.exchangeDeclare(EXCHANGE_LOG, ExchangeTypes.FANOUT);for (int i = 0; i < MSG_NUM; i++) {    String logMsg =name +">>>> 日志.... "+i;    channel.basicPublish(EXCHANGE_LOG,"",null,logMsg.getBytes());}channel.close();

官方文档 3 主题模式

首先区别正则表达式,只有2种通配符,*表示一个字(一个英文word,不是字母),#表示多个英文字. 这个实现是通过带有通配符的绑定关系,通过绑定关系,将不同的消息分发到不同的queue.

,创建一个exchange,类型为topic,使用通配符绑定关系绑定exchange和自身的队列.

try {    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(TOPIC,ExchangeTypes.TOPIC);    String queueName = channel.queueDeclare().getQueue();    for (int i = 0; i < routingKeys.length; i++) {        String routingKey = routingKeys[i];        channel.queueBind(queueName, TOPIC, routingKey);    }    channel.basicConsume(queueName,new DefaultConsumer(channel){        [@Override](https://my.oschina.net/u/1162528)        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {            System.out.println(name+ " : " +new String(body) );        }    });    System.out.println(name+" 等待中.....");    latch.countDown();} catch (IOException e) {    e.printStackTrace();} catch (TimeoutException e) {    e.printStackTrace();}

try (Connection connection = factory.newConnection()){    Channel channel = connection.createChannel();    channel.exchangeDeclare(TOPIC, ExchangeTypes.TOPIC);    for (String routingKey:routingKeys) {        channel.basicPublish(TOPIC,routingKey,null,routingKey.getBytes());    }    channel.close();} catch (IOException e) {    e.printStackTrace();} catch (TimeoutException e) {    e.printStackTrace();}

官方文档 4 自定义路由

看了上面的简单,订阅,主题,你会更加理解exchange,routingkey,queue之间的. 上面的都是一些比较特殊场景的应用.

,创建一个exchange,默认类型为direct,然后通过routingkey绑定不同的queue,值得注意的是消费端可以将exchange绑定不同的queue.

try {    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(ROUNTING, ExchangeTypes.DIRECT);    String queueNm = channel.queueDeclare().getQueue();    channel.queueBind(queueNm,ROUNTING,routingKey);    if(routingKey.contains("info")){        channel.queueBind(queueNm,ROUNTING,"error");    }    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){        [@Override](https://my.oschina.net/u/1162528)        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {            String msg = new String(body);            System.out.println(routingKey+" : " + msg);        }    };    channel.basicConsume(queueNm,true,defaultConsumer);    System.out.println(routingKey+"客户端等待中....");    latch.countDown();} catch (IOException e) {    e.printStackTrace();} catch (TimeoutException e) {    e.printStackTrace();}

try (Connection connection = factory.newConnection()){    Channel channel = connection.createChannel();//                channel.exchangeDeclare(ROUNTING, ExchangeTypes.DIRECT);    if (name.contains("1")){        channel.basicPublish(ROUNTING,"error",null,(name+">>error").getBytes());        channel.basicPublish(ROUNTING,"warning",null,(name+">>warning").getBytes());    }else {        channel.basicPublish(ROUNTING,"info",null,(name+">>infoinfo").getBytes());    }    channel.close();} catch (IOException e) {    e.printStackTrace();} catch (TimeoutException e) {    e.printStackTrace();}

官方文档 5 使用mq来实现rpc

在分布式环境中,远程调用rpc有很多实现方式,比较流行的,非跨语言速度极快的java RMI, google的基于protobuf/http2的GRPC ,facebook的IO多路复用/tcp的Thrift,使用WSDL的Web Service等. MQ同样也可以做RPC实现,这源于MQ的天然负载均衡,以及rpc的非实时性要求. 使用rabbitmq实现rpc,用到了三点,第一是connection属性的BasicProperties,需要设置一个 应答队列replyTo,这个是在publish时带入的;第二 使用默认exchange,不需要设定exchange; 第三,应答队列的属性应当是排他自动删除的,这个使用默认无数方法生成的队列就可以,默认为排他, 自动删除,非持久队列.关于这点,可以看源码:

AutorecoveringChannel.java

@Overridepublic AMQP.Queue.DeclareOk queueDeclare() throws IOException {    return queueDeclare("", false, true, true, null);}

下面是rpc服务端代码

try {    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.basicQos(1);    channel.queueDeclare(REQUEST_QUEUE,false,false,true,null);    System.out.println("RPC 服务器等待....");    channel.basicConsume(REQUEST_QUEUE,false,new DefaultConsumer(channel){        @Override        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {            String replyQueue = properties.getReplyTo();            AMQP.BasicProperties replyProp = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();            String message = new String(body);            int n = Integer.parseInt(message);            String responseBody =String.valueOf(fibonacci(n));            channel.basicPublish("",replyQueue,replyProp,responseBody.getBytes());            channel.basicAck(envelope.getDeliveryTag(),false);            System.out.println("计算 Fibonacci ["+message+"] = "+responseBody);        }    });    latch.countDown();} catch (IOException e) {    e.printStackTrace();} catch (TimeoutException e) {    e.printStackTrace();}private int fibonacci(int value){    if(value == 0 || value == 1){        return value;    }else {        return fibonacci(value-1)+fibonacci(value-2);    }}

下面是rpc客户端代码,注意看没有设置exchange,队列也是使用默认的queueDeclare()

try {    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    //声明应答队列,默认是排他,自动删除,非持久队列,也就是说,当客户端停止了,队列就好消失    String queueName = channel.queueDeclare().getQueue();    String correlationId = UUID.randomUUID().toString();    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(queueName).build();    channel.basicPublish("",REQUEST_QUEUE,properties,message.getBytes());    BlockingQueue
response = new ArrayBlockingQueue
(1); channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (correlationId.equalsIgnoreCase(properties.getCorrelationId())){ response.offer(new String(body)); } } }); System.out.println("接收到消息:"+response.take());} catch (IOException e) { e.printStackTrace();} catch (TimeoutException e) { e.printStackTrace();} catch (InterruptedException e) { e.printStackTrace();}

非官方 6 被抛弃冷落的direct同胞兄弟headers 类似主题+订阅模式结合

上面的例子都是使用routingkey来进行绑定关系,在一些情况下,可能还是不能满足业务场景, 比如我想要"张三",电话"123456789"的所有消息,转到一个特殊处理(仅举例,无意义).

,同样是创建一个exchange,类型headers,然后构建一个map,通过BasicProperties, 传递参数.注意这里的map的value可以为java的一些基本类型(可以查阅Frame.fieldValueSize()), 但是不能是用户自定义的类型.rabbitmq对于不存在queue,发送的消息会丢失,所以从消息持久化的角度, 服务端和客户端都应当declare,但是只有消费端declare,并不会报错,如果消息比客户端启动更早到达,则会丢失消息.

我做了一个测试,any可以有多个,正常接收消息,类似订阅模式fanout,但是注意all只能有一个接收.

try {    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(HEADERS_QUEUE,ExchangeTypes.HEADERS);    String queueName = channel.queueDeclare().getQueue();//                channel.queueDeclare(HEADERS_QUEUE,false,false,true,null);    Map
headers = new HashMap<>(); if(name.endsWith("all")){ headers.put("x-match","all"); headers.put("name","张三"); headers.put("phone","123456789"); }else if (name.endsWith("any1")){ headers.put("x-match","any"); headers.put("name","张三"); headers.put("phone","0000"); }else{ headers.put("x-match","any"); headers.put("name","张三1"); headers.put("phone","123456789"); } channel.queueBind(queueName,HEADERS,"",headers); channel.basicConsume(queueName,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收消息>>"+new String(body)); } }); System.out.println("客户启动."); latch.countDown();; } catch (IOException e) { e.printStackTrace();} catch (TimeoutException e) { e.printStackTrace();}

,服务端很简单,只需要将过滤条件添加即可.

try {    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();//  channel.exchangeDeclare(HEADERS, ExchangeTypes.HEADERS);    Map
headers = new HashMap<>(); headers.put("name","张三"); headers.put("phone","123456789"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(headers).build(); channel.basicPublish(HEADERS,"",properties,"hello rabbitmq ".getBytes()); channel.close(); connection.close();} catch (IOException e) { e.printStackTrace();} catch (TimeoutException e) { e.printStackTrace();}System.out.println("服务端启动.");

非官方 7 事务

事务几乎无处不在,而现在谈及事务绝不是简单的事务,而是分布式事务.遗憾的是这里的事务跟分布式事务没有必然联系. 这里单纯的谈及rabbitmq的事务.首先说一下,rabbitmq是基于tcp协议的,tcp三次握手四次挥手,这里就涉及到消息的确认 机制.而rabbitmq的事务也是依赖这个确认机制的.再来说一下确认机制,我们在使用rabbitmq或者jms默认都是 有确认机制的,只不过是默认实现,我们可以通过一些ack的参数或接口设置.一般都是默认批量自动ack, 什么时候ack呢?rabbitmq中没有消息过期的概念,只有消息被正常处理了,客户端发送了ack,才会删除. 批量ack,则是在ack到一定数量之后才一块发送ack,减少带宽,但是失败则影响较大.传统的事务,是先 开启事务,进行操作,事务提交,事务回滚,速度将减慢到原来的2倍(经过本地测试,差不多这个数). rabbitmq提供了一个高级的Publisher Confirm机制,跟传统不太一样,实际上是将事务的提交拆分了, 等所有事务提交完毕,在最终确认.速度介于并接近非事务速度(可能测试用例问题,跟传统tx差不多?!). 当开启publisher confirm时,该信道上会为每一个消息分配一个id,当消息被发送到消费端时,rabbitmq就 会发确认到生产端,消息的发送和确认是异步.

static class NoPublisher implements Runnable {    /**     * When an object implementing interface Runnable is used     * to create a thread, starting the thread causes the object's     * run method to be called in that separately executing     * thread.     * 

* The general contract of the method run is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); channel.queueDeclare(NO_TRANSACTION, false, false, false, null); long start = System.currentTimeMillis(); try { for (int i = 0; i < MSG_NUM; i++) { String msg = "rabbitmq msg!"; channel.basicPublish("", NO_TRANSACTION, null, msg.getBytes()); } channel.basicPublish("", NO_TRANSACTION, null, "end".getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { channel.close(); } long end = System.currentTimeMillis(); System.out.println("[发送方]发送方耗时:" + (end - start)); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}static class NoConsumer implements Runnable { /** * When an object implementing interface Runnable is used * to create a thread, starting the thread causes the object's * run method to be called in that separately executing * thread. *

* The general contract of the method run is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(NO_TRANSACTION, false, false, false, null); //每次1条 channel.basicQos(1); long start = System.currentTimeMillis(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); channel.basicAck(envelope.getDeliveryTag(),false); if (msg.equalsIgnoreCase("end")){ long end = System.currentTimeMillis(); System.out.println("[接收方]接收完毕"+(end-start)); try { channel.close(); connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } } } }; //手动ack channel.basicConsume(NO_TRANSACTION, false, consumer); System.out.println("[接收方]客户端等待中......"); latch.countDown(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}

输出:

[接收方]客户端等待中......[发送方]发送方耗时:4080[接收方]接收完毕16904

static class TranPublisher implements Runnable {    /**     * When an object implementing interface Runnable is used     * to create a thread, starting the thread causes the object's     * run method to be called in that separately executing     * thread.     * 

* The general contract of the method run is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); channel.queueDeclare(TRANSACTION, false, false, false, null); long start = System.currentTimeMillis(); try { for (int i = 0; i < MSG_NUM;) { if (i%BATCH ==0){ //开启事务 channel.txSelect(); for (int j = 0; j < BATCH; j++) { String msg = "rabbitmq msg!"; if(i + j != MSG_NUM -1 ){ channel.basicPublish("", TRANSACTION, null, msg.getBytes()); }else{ channel.basicPublish("", TRANSACTION, null, "end".getBytes()); } } i += BATCH; //commit channel.txCommit(); } } } catch (Exception e) { //回滚事务 channel.txRollback(); e.printStackTrace(); } finally { channel.close(); } long end = System.currentTimeMillis(); System.out.println("[tx发送方]发送方耗时:" + (end - start)+" 批量大小="+BATCH); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}static class TranConsumer implements Runnable { /** * When an object implementing interface Runnable is used * to create a thread, starting the thread causes the object's * run method to be called in that separately executing * thread. *

* The general contract of the method run is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TRANSACTION, false, false, false, null); //每次1条 channel.basicQos(1); long start = System.currentTimeMillis(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); //发送ack channel.basicAck(envelope.getDeliveryTag(), false); if (msg.equalsIgnoreCase("end")){ long end = System.currentTimeMillis(); System.out.println("[tx接收方]接收完毕"+(end-start)); try { channel.close(); connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } } } }; //手动ack channel.basicConsume(TRANSACTION, false, consumer); System.out.println("[tx接收方]客户端等待中......"); latch.countDown(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}

输出:

[tx接收方]客户端等待中......[tx发送方]发送方耗时:8703 批量大小=100[tx接收方]接收完毕22160

static class ConfirmPublisher implements Runnable {    /**     * When an object implementing interface Runnable is used     * to create a thread, starting the thread causes the object's     * run method to be called in that separately executing     * thread.     * 

* The general contract of the method run is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); long start = System.currentTimeMillis(); try { for (int i = 0; i < MSG_NUM; ) { if (i%BATCH ==0){ //开启confirm3 channel.confirmSelect(); for (int j = 0; j < BATCH; j++) { String msg = "rabbitmq msg!"; if(i + j != MSG_NUM -1){ channel.basicPublish("", CONFIRM, null, msg.getBytes()); }else{ channel.basicPublish("", CONFIRM, null, "end".getBytes()); } } i += BATCH; //confirm// waitForConfirmsOrDie 相对于 waitForConfirms 来说,只要有nack就好抛出异常,同时也是一种阻塞式 channel.waitForConfirmsOrDie(); //channel.addConfirmListener(new ConfirmListener() {// @Override// public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("ack deliveryTag = " + deliveryTag);// }//// @Override// public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("nack deliveryTag = " + deliveryTag);// }// }); } } } catch (Exception e) { e.printStackTrace(); } finally { channel.close(); } long end = System.currentTimeMillis(); System.out.println("[confirm发送方]发送方耗时:" + (end - start)+" 批量大小="+BATCH); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}static class ConfirmConsumer implements Runnable { /** * When an object implementing interface Runnable is used * to create a thread, starting the thread causes the object's * run method to be called in that separately executing * thread. *

* The general contract of the method run is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(CONFIRM, false, false, false, null); //每次1条 channel.basicQos(1); long start = System.currentTimeMillis(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); //发送ack channel.basicAck(envelope.getDeliveryTag(), false);// System.out.println("确认"+msg); if (msg.equals("end")){ long end = System.currentTimeMillis(); System.out.println("[confirm接收方]接收完毕"+(end-start)); try { channel.close(); connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } } } }; //手动ack channel.basicConsume(CONFIRM, false, consumer); System.out.println("[confirm接收方]客户端等待中......"); latch.countDown(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}

输出:

[confirm接收方]客户端等待中......[confirm发送方]发送方耗时:5358 批量大小=100[confirm接收方]接收完毕22502

10w简单消息发送时间

无事务:15s左右

tx事务:20s左右

confirm事务:20s左右

本地测试,所以这里没有网络的延迟.

这里有个疑问,confirm事务没有像官方说明的一样,接近无事务的效率.

由于个人水平有限,如有问题请指出。 https://fansinzhao.github.io/page/image/weixin.png

转载于:https://my.oschina.net/u/3774406/blog/1648846

你可能感兴趣的文章
判断字符是否为中文
查看>>
23.CP命令
查看>>
LINUX中JDK环境变量配置
查看>>
linux 切换用户之后变成-bash-3.2$的解决方法
查看>>
我的友情链接
查看>>
curl wget lynx
查看>>
[PHP]函数级分层性能分析工具-Xhprof的安装与使用
查看>>
使用list
查看>>
Ubuntu 12.04 安装 gcc-4.8 及 gdb 7.6
查看>>
DOM设置表格隔行变色js代码及鼠标悬停在哪行,哪行字体就加粗效果
查看>>
GII 和 DEBUG 模块出现 403 解决
查看>>
shell历史命令记录功能
查看>>
kali linux软件源
查看>>
centos6设置静态IP
查看>>
cocos2d_x在windows环境下的方向键支持
查看>>
Mysql数据库恢复,Ibdata1文件删除数据恢复成功
查看>>
开篇博文
查看>>
Maven学习总结(11)——Maven Tomcat7自动部署
查看>>
Shell 中常用的sqlplus 代码段
查看>>
Maven学习总结(1)——Maven入门
查看>>