中间件 - 消息队列 - RocketMQ-程序员宅基地

技术标签: 中间件  消息队列  RocketMQ  

0、RocketMQ 简介

RocketMQ 是由阿里捐赠给Apache 的一款分布式、队列模型的开源消息中间件,经过过淘宝双十一的洗礼。

RocketMQ 的特性有如下几个方面:

  • 原生分布式
  • 两种消息拉取
  • 严格消息顺序
  • 特有的分布式协调器
  • 亿级消息堆积
  • 消息组

1、RocketMQ 的基础概念

RocketMQ 由以下几个概念组成:

Producer:消息生产者

Consumer:消息消费者

Broker:MQ 服务,负责接收、存储和发送消息

NameServer:负责 MQ 服务之间的协调

Topic:消息发送的目的地

Tag:用来将消息做区分发送的标记

2、RocketMQ 的服务架构

首先Rocket 集群服务启动之前需要先启动 NameServer 集群,NameServer 是用来提供 MQ 服务器的信息注册和路由信息(生产者 -> topic -> broker -> 消费者)存储的;

接着启动 Rocket 集群服务,每个服务器都会将本机信息注册到 NameServer 集群中的每台机器上;

消息发送之前首先需要创建 Topic (因为生产者最终要操作的对象),所以告诉 NameServer 要创建 Topic,NameServer 就会创建对应集群上的多台Broker上的 Topic 分区 partition

生产者准备发送消息之前需要先咨询 NameServer 当前 发送的 Topic 应该连接到哪台 Broker 上面,NameServer 根据之前服务器的注册信息以及 Topic 分区信息分配一台 Broker 供生产者进行连接并发送数据,生产者就会将数据放到对应 Broker 上面的 Topic 分区中的某个队列上(队列数量可以配置);

消费者消费消息也是同样通过 NameServer 根据 Topic 以及 Topic 分区信息来分配一台 Broker 去进行消费。一个消费者能同时消费者多个队列,但是一个队列同时只能被一个消费者消费,因此如果所有分区的队列数总共 8个,那么部署更多的消费者也并不能提高执行效率;

消息发送到 Broker 之后暂时存放在内存,但是为了持久化考虑所以要将消息写入到磁盘中,这个写入的过程分为同步写入和异步写入。同步写入会立马写入磁盘并且知道写入成功才会返回消息发送成功给生产者;而异步写入则会立即返回给生产者消息发送成功,但是这种情况存在数据丢失的问题

消息存放在 Broker 之后,Broker 可能会宕机,因此又产生出主从备份的架构。消息在发送到主服务器之后,会再次往从服务器进行消息的同步,这里服务器间的写入操作也分为同步写入和异步写入,同样的同步写入会响应较慢,但是数据不会丢失,而异步写入响应快但存在数据丢失的风险。

 

3、RocketMQ 简单示例

3.1 同步消息发送

发送同步消息,可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;


public class SyncProducer {

	public static final String NAME_SERVER_ADDR="";
	
	public static void main(String[] args) throws Exception{
	
		DefaultMQProducer producer=new DefaultMQProducer("GROUP_TEST");
		
		producer.setNamesrvAddr(NAME_SERVER_ADDR);
		
		producer.start();
		
		for(int i=0;i<10;i++){
                        //创建消息的时候指定消息发送到的 Topic 以及用来区分消息的标记 Tag
			Message message=new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
			
			SendResult result=producer.send(message);
			
			System.out.printf("发送结果:%s%n", result);
			
			
		}
		
		producer.shutdown();
	}
}
    

 3.2 异步消息发送

异步消息发送一般用来对方法调用响应时间有较严格要求的情况下,异步调用,立即返回。不同于同步的唯一在于: send方法调用的时候多携带一个回调接口参数 SendCallBack,用来异步处理消息发送结果;另外还可以使用 setRetryTimesWhenSendAsyncFailed 方法来设置异步消息发送失败重试次数。
 

public class AsyncProducer {

	public static final String NAME_SERVER_ADDR="";
	
	public static void main(String[] args) throws Exception{
		//1.创建生产者对象,并指定组名
		DefaultMQProducer producer=new DefaultMQProducer("GROUP_TEST");
		//2.指定 NameServer 的地址
		producer.setNamesrvAddr(NAME_SERVER_ADDR);
		//3.启动生产者
		producer.start();
		//4.设置异步发送失败重试次数,默认为 2
		producer.setRetryTimesWhenSendAsyncFailed(0);
		
		int count=10;
		CountDownLatch cd=new CountDownLatch(count);
		for(int i=0;i<count;i++){
			final int index=i;
			//5.创建一个消息,指定发送到主题 TopicTest 上,并且设定 TAGS 属性为 TagB,KEYS 属性为 ID110,消息内容为 Hello World 1...
			Message msg=new Message("TopicTest", "TagB", "ID110", ("Hello world"+index).getBytes(RemotingHelper.DEFAULT_CHARSET));
			//6.发送异步消息并且在消息发送完成后回调 onSuccess 或 onException 方法
			producer.send(msg, new SendCallback() {
				
				@Override
				public void onSuccess(SendResult arg0) {
					// TODO Auto-generated method stub
					System.out.printf("%-10d OK MSG_ID:%s %n", index,arg0.getMsgId());
					cd.countDown();
				}
				
				@Override
				public void onException(Throwable arg0) {
					// TODO Auto-generated method stub
					System.out.printf("%-10d Exception %s %n", index,arg0);
					arg0.printStackTrace();
					cd.countDown();
				}
			});
		}
		
		cd.await();
		producer.shutdown();
	}
}

3.3 单向消息

 单向模式一般用来对可靠性有一定要求的消息发送,例如日志系统。不同于同步的唯一之处在于:调用的是sendOneway方法,且方法不返回任何值,即调用者不需要关心成功或失败。

public class OnewayProducer {
	
	public static final String NAME_SERVER_ADDR="";

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		DefaultMQProducer producer=new DefaultMQProducer("GROUP_TEST");
		
		producer.setNamesrvAddr(NAME_SERVER_ADDR);
		
		producer.start();
		
		for(int i=0;i<10;i++){
			Message msg=new Message("TopicTest", "TagC", ("Hello World"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
			producer.sendOneway(msg);
		}
		System.out.println("消息已发送");
		
		producer.shutdown();
	}

}

 3.4 Push模式的消费者

消费者有两种消费模式,一种是 Push,即服务器端将消息推送到客户端;另外一种是 Pull,即客户端主动去服务器端拉取消息。但其实在 RocketMQ 中,Push 模式的实现也是通过 Pull 模式来实现的,不过是帮我们屏蔽了对MQ上的消息队列 offset 的操作。

Push 模式下的消费者需要注册一个消息监听器来接受服务器发送过来的消息,同时也包括对消息的消费确认,即通过 MessageListenerConcurrently 接口的方法 consumeMessage 返回值 ConsumeConcurrentlyStatus。

/**
 * 普通消息消费者
 */
public class Consumer {

	public static final String NAME_SERVER_ADDR="";
	
	public static void main(String[] args) throws Exception{
		
		//1.创建消费者对象,基于服务器主动 push 请求
		DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("GROUP_TEST");
		
		//2.设置 NameServer 的地址,也可以通过设置环境变量 NAMESRV_ADDR的值
		consumer.setNamesrvAddr(NAME_SERVER_ADDR);
		
		//设置消费重试次数,-1 代表 16 次
		consumer.setMaxReconsumeTimes(-1);
		
		//3.订阅对应的主题和Tag
		consumer.subscribe("TopicTest", "*");
		
		//4.注册消息接收监听器,即接收到 Broker 的消息后如何处理
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				// TODO Auto-generated method stub
				try {
					MessageExt messageExt=msgs.get(0);
					System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
				} catch (Exception e) {
					// TODO: handle exception
				}
				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
			}
		});
		
		consumer.start();
		
		System.out.println("已启动消费者");
	}
}

3.5 Pull 模式的消费者

 消费者主动从服务器上拉取数据的方式。这种方式下面需要消费者主动的更新队列中的 offset 来使得下一次读取的位置得以更新。

public class PullConsumer {

	public static final String NAME_SERVER_ADDR="";
	
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		DefaultMQPullConsumer consumer=new DefaultMQPullConsumer("GROUP_TEST");
		consumer.setNamesrvAddr(NAME_SERVER_ADDR);
		consumer.start();
		
		//获取到订阅的 Topic 上面所有的 Queue
		Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("TopicTest");
		Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(()->{
			try {
				//遍历所有的 MessageQueue
				for(MessageQueue messageQueue:messageQueues){
					//抓取到当前 MessageQueue 的偏移量
					long offset=consumer.fetchConsumeOffset(messageQueue, true);
					//从当前的偏移量开始取 10 条数据
					PullResult result=consumer.pullBlockIfNotFound(messageQueue, null, offset, 10);
					System.out.println(result.getNextBeginOffset());
					//取完数据后更新该 MessageQueue 的偏移量,以便于下次取消息
					consumer.updateConsumeOffset(messageQueue, result.getNextBeginOffset());
					//如果可以从 MessageQueue 取到消息
					if(result.getPullStatus()==PullStatus.FOUND){
						List<MessageExt> messageExts=result.getMsgFoundList();
						for(MessageExt messageExt:messageExts){
							System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
						}
					}
				}
			} catch (Exception e) {
				// TODO: handle exception
			}
		},1000L,1000L,TimeUnit.MILLISECONDS);
		
	}

}

 

4、高级特性

4.1 有序消息

(1)为什么需要有序消息呢?

我们试想一下这样的场景:你去银行存钱取钱的时候,银行的通知系统都会给你发短信告诉你本次操作的结果,假设你先是取了1000块钱,又取了1000块钱,但是你收到的短信通知是取了1000块钱,之后又收到通知是存了1000 块钱,虽然说结果上是没有区别的,但是用户体验却很差。

再比如我们平时购物的过程中,涉及到下订单、支付、发货、收货等,如果商家还没有处理完支付的消息就把发货的消息处理了,那么就会出现问题。除了这两个之外,还有很多的场景都需要保证消息的有序处理。

(2)那怎么样来实现有序消息处理呢?

首先,生产者在发送消息给 MQ 服务器的时候就需要是有序的;其次,消息在 MQ 上的存储也必须是有序的;最后,消息在消费者端的消费也要保证是有序的;只有以上三点都保证了,消息的有序处理才能得以保证。

(3)RocketMQ 如何来保证消息的有序处理呢?

RocketMQ 中提供了两种机制来保证消息的顺序存储:一种是分区顺序,一种是全局顺序。其实原理上都是一样,都是将需要保证有序的消息存放在同一个队列上,但是分区顺序是当消息发送过来之前,生产者需要指定消息要放到哪个队列上去,即具有同一业务逻辑的消息放在同一个队列上,RocketMQ 提供了 MessageQueueSelector 来实现如上功能;而全局顺序则是将所有消息统一的放到同一个队列上去。可想而知,如果所有的消息都发送到同一个队列上去处理,而一个队列同时只能被一个消费者消费,那么消费的性能就不高,但是这种情况却能保证所有消息都可以顺序的被消费。

虽然消息是顺序存储在队列上,但是并不能确保消息的消费时有序的,所以就需要消费者端自己去实现消息的有序执行。当然 RocketMQ 也提供了相应的消息监听接口 MessageListenerOrderly 来使得客户端可以轻松的实现消息的有序消费。

(4)RocketMQ 中的有序消息代码示例:

下面是有序消息发送者的代码实例中的关键部分,其中定义了 MessageQueueSelector 来决定消息被发送到哪个队列;还调用了与之前不同的 send 方法,在 send 方法中指明了要使用 MessageQueueSelector 以及传递给 MessageQueueSelector 中 select 方法的 arg 参数值。

                //设置异步发送失败重试次数,默认为2
		producer.setRetryTimesWhenSendAsyncFailed(0);
		
		MessageQueueSelector messageQueueSelector=new MessageQueueSelector() {
			
			/**
             * 消息队列选择器,保证同一条业务数据的消息在同一个队列
             * @param mqs topic中所有队列的集合
             * @param msg 发送的消息
             * @param arg 此参数是本示例中producer.send的第三个参数
             * @return
             */
			@Override
			public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
				// TODO Auto-generated method stub
				Integer id=(Integer)arg;
				
				int index=id%mqs.size();
				
				return mqs.get(index);
				
				//return mqs.get(mqs.size()-1);
			}
		};
		
		String[] tags = new String[]{"TagA", "TagB", "TagC"};
		//获取到要发送的数据
		List<Map> bizDatas=getBizDatas();
		
		for (int i = 0; i < bizDatas.size(); i++) {
			Map bizData=bizDatas.get(i);
			// keys:业务数据的ID,比如用户ID、订单编号等等
            // 这里面的 msgType 就是用来区分发送到哪个队列的依据
			Message msg=new Message("TopicTest", tags[i%tags.length], ""+bizData.get("msgType"), bizData.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
			// 发送有序消息
			SendResult result = producer.send(msg, messageQueueSelector, bizData.get("msgType"));
			System.out.printf("%s, body:%s%n", result, bizData);
		}

下面是有序消息消费者的关键代码部分,使用的是基于服务器端 Push 模式的消费者,通过在消费者上面注册消息监听端口并传入 MessageListenerOrderly 接口的匿名实现类来实现消息的接收以及有序消费(有序消费是相对于并发消费 MessageListenerConcurrently 来说的,因为并发消费并不能保证消息被顺序处理)

                //设置消息接收监听器,注意此时使用的是 MessageListenerOrderly 监听器,平常情况下我们使用的是 MessageListenerConcurrently 监听器
		consumer.setMessageListener(new MessageListenerOrderly() {
			
			@Override
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
				// TODO Auto-generated method stub
				context.setAutoCommit(true);
				doBiz(msgs.get(0));
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});

(5)顺序消费的实现

首先消费者启动之后,会尝试通过 PullMessageService 从服务器端拉取消息,然后将消息放到本地的一个 processQueue 队列中

然后通过 ConsumeMessageOrderlyService 来消费消息,消费消息队列的过程中会对消费队列加锁以避免其他线程执行读取操作。 

(6)无序消息、分区顺序消息,全局顺序消息的对比

(7)有序消息的缺点

  • 发送有序消息无法利用集群的Failover即故障转移特性,因为不能更换 MessageQueue 进行重试。这里的意思其实是虽然有主从备份,但是如果主 MQ 服务器宕机了,在从服务器上不知道从 MessageQueue 的何处进行消费。
  • 因为发送的路由策略导致的热点问题,可能某一些 MessageQueue 的数据量特别大
  • 消费的并行读取依赖于 queue 数量,因此一个队列同时只能被一个消费者消费
  • 消费失败时无法跳过当前消息继续消费

4.2 发布订阅模式

RocketMQ 本身并没有提供发布订阅机制,但是可以在定义消费者的程序中通过设置消息接收模式来使得多个消费者都可以拿到同一个队列的同一份数据,默认情况下消费者的消费模式是 CLUSTERING 即集群消费模式,而修改成 BROADCASTING 模式后即可达到发布订阅的效果。

//设置 consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//MessageModel.BROADCASTING 广播消费模式,所有订阅了同一个主题的消费者都会收到相同的消息
//MessageModel.CLUSTERING   集群消费模式,此集群不是指集群环境中的集群,但概念相同,即可实现消息的负载均衡。当有多个消费者订阅了相同的主题时,同一条消息只会有一个消费者消费
consumer.setMessageModel(MessageModel.BROADCASTING);

4.3 定时消息

 通过给要发送的消息设置延迟级别即:message.setDelayTimeLevel(3) 来完成延时的效果,延时时间是根据 broker 服务器端的设置的 messageDelayLevel 来判断的。例如 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,那么上面消息设置的 3 等级就是延迟 10s 。定时消息的实现原理是 RocketMQ 中内置了针对各种延时等级的队列,设置了不同等级的消息在进入 MQ 服务器后被最后放到这些内置的延时等级队列中,这个队列中的消息保留了原 Topic 的信息从而可以保证消息被正确的消费者消费,另外 MQ 也会定时的去读取各个等级延时队列中的消息是否达到了指定的延时时长,如果到达了就发送;否则进入下一次遍历等待循环中。

Message message=new Message("TopicTest", content.getBytes(RemotingHelper.DEFAULT_CHARSET));
//设置消息延时等级,此消息将在 10 秒后传递给消费者
//可以在 broker 服务器端自行配置  messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(3);

4.4 批量发送和消费

批量发送和消费消息是为了追求高性能的常见手段,主要是为了减少客户端和服务器之间的通信次数。

在使用批量消息的过程中要注意如下几点:

  • 同一批的消息应该具有相同的主题、相同的消息配置
  • 不支持批量发送定时消息
  • 建议一个批量消息的大小最好不要超过 1MB,避免影响其他消息的处理

批量发送消息主要是通过 RocketMQ 的 Producer 提供的 send 方法来实现的,即send 方法支持发送一条消息,也同时支持发送多条消息:

List<Message> messages=new ArrayList<>();
for(int i=0;i<32;i++){
	String content="Hello batch message "+i;
	Message message=new Message("TopicTest", content.getBytes(RemotingHelper.DEFAULT_CHARSET));
	messages.add(message);
}

批量消费消息是通过设置消费者一次抓取队列上消息的个数来实现的,默认抓取的消息个数为 1,同时 Push 模式的消费者注册的 MessageListenerConcurrently 类型监听器后,里面的 consumeMessage 方法的第一个参数本来就是一个集合类型,因此它本身就是为了批量接收消息而编写的接口。

                //设置批量消息处理数量,即每次最多获取多少消息,默认是1
		consumer.setConsumeMessageBatchMaxSize(10);
		
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				// TODO Auto-generated method stub
				try {
					for(MessageExt messageExt:msgs){
						System.out.printf("线程:%-25s 接收到新消息 --- %s %n", Thread.currentThread().getName(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
					}
				} catch (Exception e) {
					// TODO: handle exception
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});

 

4.5 事务消息

所谓的事务消息就是指 Producer 端消息发送事务和本地事务事件同时成功或者同时失败。

下面是RocketMQ 的逻辑实现示意图:

首先,MQ 发送方会将消息发送到 MQ Server 的半数队列中

然后,MQ Server 返回发送成功的回执给 MQ 发送方

MQ 发送方开始处理本地事务并告知 MQ Server 提交或者回滚之前发送的消息,提交则表示消息可以进行投递,而回滚则表示消息被删除

如果 MQ Server 迟迟没有收到 MQ 发送方的确认,在间隔一段时间之后会再次将消息发送回给 MQ 发送方并希望其检查本地事务的状态,MQ 发送方根据本地事务状态来最终确认消息的提交或者回滚

在使用 RocketMQ 的事务消息时要注意:

  • 事务消息不支持定时和批量发送
  • 为了避免一个消息被多次检查,从而导致半数队列消息堆积,RocketMQ 会限制单个消息的默认检查次数为 15 次,这个参数可以通过修改 broker 配置文件中的 transactionCheckMax 进行修改
  • 特定的时间段之后来回查 MQ 发送方的事务,可以通过 broker 配置文件中的 transactionTimeout 参数后者用户配置 CHECK_IMMUNITY_TIME_IN_SENCODS 来调整回查的时间间隔。
  • 一个事务消息可能会被检查或者消费多次
  • 提交过的消息重新放到用户目标主题时可能会失败
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共用,即用来生产事务消息的生产者就只能生产事务消息

下面是 RocketMQ 的事务消息使用示例:首先消息发送者实例是 TransactionMQProducer 以此来支持事务消息的发送,其次需要给事务消息发送者实例设置一个事务监听器,该监听器提供了如下两个功能:一、用来在收到 MQ Server 确认之后执行本地事务的方法 executeLocalTransaction ;二、当 MQ Server 长时间无法确认事务状态后发送回查事务状态的时候执行的 checkLocalTransaction 方法

private static final TransactionListener transactionListenerImpl=new TransactionListener() {
		
		/**
		 * 在发送消息成功时执行本地事务
		 * @param msg
		 * @param arg
		 * @return
		 */
		@Override
		public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
			// TODO 开启本地事务(实际就是我们的jdbc操作)

                        // TODO 执行业务代码(插入订单数据库表)
                        // int i = orderDatabaseService.insert(....)
                        // TODO 提交或回滚本地事务(如果用spring事务注解,这些都不需要我们手工去操作)

                        // 模拟一个处理结果
			int index=8;
			
			/**
			 * 模拟返回事务的状态
			 */
			switch(index){
				case 3:
					System.out.printf("本地事务回滚,回滚消息,id:%s%n", msg.getKeys());
					return LocalTransactionState.ROLLBACK_MESSAGE;
				case 5:
				case 8:
					return LocalTransactionState.UNKNOW;
				default:
					System.out.println("事务提交,消息正常处理");
					return LocalTransactionState.COMMIT_MESSAGE;
					
			}
		}
		
		/**
		 * broker 端对未确定状态的消息发起回查,将消息发送到对应的 Producer 端(同一个 Group 的 Producer)
		 * 由 Producer 根据消息来检查本地事务的状态,进而执行 Commit 或者 Rollback
		 * @param msg
		 * @return
		 */
		@Override
		public LocalTransactionState checkLocalTransaction(MessageExt msg) {
			// 根据业务,正确处理: 订单场景,只要数据库有了这条记录,消息应该被commit
			String transactionId=msg.getTransactionId();
			String key=msg.getKeys();
			System.out.printf("回查事务状态 key:%-5s msgId:%-10s transactionId:%-10s %n", key,msg.getMsgId(),transactionId);
			
			//测试,只有 id 为 5 的消息才能正确被提交,从而到达消费者端
			if("id_5".equals(key)){
				System.out.printf("回查到本地事务已提交,提交消息,id:%s%n", msg.getKeys());
				return LocalTransactionState.COMMIT_MESSAGE;
			}else{
				System.out.printf("未查到本地事务状态,回滚消息,id:%s%n",msg.getKeys());
				return LocalTransactionState.ROLLBACK_MESSAGE;
			}
		}
	};

public static void main(String[] args){ 
                //使用 TransactionMQProducer 来指定这是一个事务消息生产者实例
                TransactionMQProducer producer=new TransactionMQProducer("GROUP_TEST");
		producer.setNamesrvAddr(NAME_SERVER_ADDR);
		//设置事务的监听器
		producer.setTransactionListener(transactionListenerImpl);


        ....

}  

4.6 可视化界面 

rocketmq默认不带可视化控制台,需要去单独编译一个工具 https://github.com/apache/rocketmq-externals

(1)首先将代码从 git 上面下载下来

(2)将下载下来的代码编译成 jar 包

(3)启动

# jar包在target目录下面,你可以放在一台服务器上面运行
java -jar rocketmq-console-ng-1.0.0.jar --server.port=8081--rocketmq.config.namesrvAddr=192.168.100.242:9876
# --server.port springboot内置tomcat的端口号,默认8080;
# --rocketmq.config.namesrvAddr  nameserver的地址

5、最佳实践建议

5.1 Producer 的最佳实践

(1)一个应用应该尽可能用同一个 Topic,消息子类型用 Tags 来标识,tags 可以由应用自由设置

只有消息在发送时设置了 tags,消费方才可以利用 tags 在 Broker 上做消息过滤

message.setTags("TagA");

(2)每个消息在业务处层面的唯一标志码,应该要设置到 keys 字段,方便将来定位消息丢失问题

服务器会为每个消息创建索引,应用可以通过 topic,key 来查询这条消息的内容以及该消息被谁消费

由于索引是哈希索引,所以请务必保证 key 尽可能的唯一,这样可以避免潜在的哈希冲突

String orderId="1250689524981";
message.setKeys(orderId);

(3)如有可靠性需要,消息发送成功或者失败后要打印消息日志(保留 sendResult 和 key 信息)

(4)如果相同性质的消息量大,可以使用批量消息来提升性能

(5)建议消息大小不超过 512KB 防止造成网络阻塞

(6)send(msg) 会阻塞生产者,所以如果有性能要求,可以使用异步的方式,即添加一个回调接口:send(msg,callBack) 

(7)如果在一个 JVM 中,有多个生产者进行大量数据处理,建议:少数生产者使用异步发送的方式(3~5个就够);通过 setInstanceName 方法,给每个生产者设置一个实例名

(8)send 消息方法只要不抛异常,就代表发送成功,但是发送成功会有多个状态,在 sendResult 中定义:

  • SEND_OK:消息发送成功
  • FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器将内存数据刷到磁盘超时,此时如果服务器宕机的话,消息会丢失
  • FLUSH_SLAVE_TIMEOUT:消息发送成功,但是主服务器在同步到 Slave 服务器时超时,此时如果服务器宕机,消息就会丢失
  • SLAVE_NO_AVAILABLE:消息发送成功,但是此时 slave 服务器不可用

如果状态是 FLUSH_DISK_TIMEOUT 或者 FLUSH_SLAVE_TIMEOUT,可以丢弃这条消息或者重发,但建议重发消息,由消费者去重。

Producer 向 Broker 发送请求会等待响应,但如果达到最大等待时间,未得到响应,则客户端会抛出 RemotingTimeoutException

默认等待时间是 3 秒,如果使用 send(msg,timeout) 则可以自己设定超时时间,但超时时间不能设置太小,因为 Broker 需要一些时间来将内存中的数据刷到磁盘或者与从服务器进行同步

如果设置超时时间超过 syncFlushTimeout,那么该值可能影响不大,因为 Broker 可能会在超时之前返回 FLUSH_DISK_TIMEOUT 或者FLUSH_SLAVE_TIMEOUT

(9)对于消息不可丢失的应用,务必要有消息重发机制。

Producer 的 send 方法本身支持内部重发:至少重发 3 次;如果发送失败,则轮转发送到下一个 Broker;这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。所以如果本身向 Broker 发送消息产生超时异常,就不会再做重发。

以上策略仍然不能保证消息一定发送成功,建议将消息存储到本地 DB,由后台线程定时重试,保证消息一定发送到 Broker

5.2 Consumer 的最佳实践

(1)消费者组和订阅:不同的消费者组可以独立地消费同样的主题,并且每个消费者都有自己的消费偏移量(offset)。同时要确保同一个组中的每个消费者订阅的是相同的主题

(2)顺序消费:消费者在顺序消费消息时会锁定每个 MessageQueue,以确保每个消息每个都按顺序被消费,但是这将导致性能损耗。另外在消费过程中,不建议直接抛出异常,而是以返回 ConsumerOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 代替,即表示挂起当前队列并且稍后再试。同样的,在并发消息消费过程中,如果出现了异常,可以给服务器返回稍后重试:ConsumeConcurrentlyStatus.RECONSUME_LATER。如果 MQ Server 收到消费者稍后重试的请求,那么MQ Server 会自动地为每个消费者组创建一个 %Retry%consumerGroupName 的 topic 并且将原先 topic 中的数据放到这个 topic 中,等待一定的延时时间后将消息再次放到原先的 topic 中。延时的时间跟之前定时消息的等级一样,从 Level 3  开始不断的重试,下次重试间隔时间会随着重试次数的增加,时间等级也会逐渐增加。

(3)阻塞:消费者在消费过程中,通过在 MessageListener 中执行业务代码来完成消费过程,但是不建议在这个线程中同步执行耗时很长的业务,因为它会阻塞 consumer 的线程池,最终可能会停止消费者程序。如果真的有耗时业务,可以另外启动一个线程池异步处理。

(4)线程数:针对 (3) 中提到的消费者线程池,可以通过设置 setConsumeThreadMin 或者 setConsumeThreadMax 来更改它。

(5)从何处开始消费:当建立一个新的 Consumer Group 时,需要决定是否需要消费 Broker 中已经存在的历史消息。使用 CONSUME_FROM_LAST_OFFSET 将忽略历史消息,并消费此后生成的任何内容;使用 CONSUME_FROM_FIRST_OFFSET 将消费 Broker 中存在的所有消息,还可以使用 CONSUME_FROM_TIMESTAMP 来消费指定的时间戳之后的消息

(6)消息去重:RocketMQ 无法避免消息重复,如果业务对重复消费非常敏感,务必在业务层做去重:一、可以通过记录消息唯一键进行去重;二、可以使用业务层面的状态机制去重

5.3 NameServer 的最佳实践

(1)在 Apache RocketMQ 中,NameServer 用于协调分布式系统的每个组件,主要通过管理 Topic 路由信息来实现协调。管理操作包括两部分:一、Brokers 定期更新保存在每个 NameServer 中的元数据;二、NameServer 是为客户端提供最新的路由信息服务的,包括生产者、消费者和命令行客户端。因此在启动 Brokers 和 Clients 之前,我们需要给他们提供 NameServer 的地址列表来访问 NameServer,在 RocketMQ 中提供了以下几种方式来声明要连接的 NameServer:

第一种:对于 Brokers,我们可以在 broker 的配置文件中指定

 

namesrvAddr=name-server-ip1:port;name-server-ip2:port

 第二种:对于生产者和消费者,我们可以在代码中给他们提供 NameServer 地址列表

DefaultMQProducer producer=new DefaultMQProducer("consumer_group_name");
producer.setNamesrvAddr("name-server-ip1:port;name-server-ip2:port");

DefaultMQConsumer consumer =new DefaultMQConsumer("consumer_group_name");
consumer.setNamesrvAddr("name-server-ip1:port;name-server-ip2:port");

第三种:对于从 shell 中使用管理命令时也可以指定 NameServer:

sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X other-options
#例如如下的命令:用来向 NameServer 查询集群中服务器的列表信息
sh mqadmin -n localhost:9876 clusterList

 第四种:对于将管理工具集成到自己的项目中的情况,可以使用如下方式指定:

DefaultMQAdminExt defaultMQAdminExt=new DefaultMQAdminExt("consumer_group_name");
defaultMQAdminExt.setNamesrvAddr("name-server-ip1:port;name-server-ip2:port");

第五种:在 broker 的启动脚本中,配置 rocketmq.namesrv.addr 参数来指定 NameServer 的地址列表

第六种:可以设置 NAMESRV_ADDR 环境变量。如果设置了,Broker 和 Client 将检查并使用环境变量的值

第七种:HTTP 接口方式:如果没有使用前面提到的方法指定 NameServer 地址列表,RocketMQ 将每2分钟发送一次 HTTP 请求以获取和更新 NameServer 地址列表,初始延迟 10 秒。默认情况下,访问的 HTTP 地址为:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

可以通过 JVM 参数 rocketmq.namesrv.domain 修改默认的 jmenv.tbsite.net,

可以通过 JVM 参数 rocketmq.namesrv.domain.subgroup 修改默认的 nsaddr

 

以上设置方式的优先级为:编程方式 > JVM 参数 > 环境变量 > HTTP 方式

 

5.4 JVM 的最佳实践

(1)首先推荐使用最新发布的 JDK 1.8 版本,另外设置 RocketMQ 占用的内存大小时,默认情况下 Broker 启动时会占用 8G 的内存,可以通过设置相同的 Xms 和 Xmx 值来防止 JVM 调整堆大小以获得更好的性能:

-server -Xms8g -Xmx8g -Xmn4g

如果不关心 Broker 的启动时间,可以设置预先分配堆内存的方式来避免 Broker 启动后到处寻找内存的情况。

(2)在存在大量锁对象的创建并高度并发的环境下禁用偏向锁能够带来一定的性能优化:

-XX:-UseBiasedLocking

(3)垃圾回收策略:建议使用 G1 收集器,因为当程序的堆内存占用超过 6G 并且程序需要低延迟、高吞吐,那么建议使用 G1 收集器,并且 G1 收集器可以在老年代和新生代中使用

-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

另外 GC 的停止时间:-XX:MaxGCPauseMillis 不要设置太小的值,否则 JVM 将使用一个小的新生代来保证GC时间短的设置,但是这将会导致新生代的频繁 GC

(4)推荐使用滚动 GC 日志文件:

-XX:UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m

如果写入 GC 文件会增加服务器的延迟,请考略将GC 日志文件重定向到Linux 操作系统的内存文件系统中

-Xloggc:/dev/shm/mq_gc_%p.log

 

6、集群部署 

准备两主两从以及两个 NameServer 共六台机器(当然也可以是通过多个端口启动多个服务的伪集群方式),两主两从通过下面的配置文件来决定主从关系、内存写入磁盘的方式、主从之间同步的方式、注册到 NameServer 的地址等。

#所属集群名字
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0表示Master,>0表示Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=10.10.1.31:9876;10.10.1.32:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/alibaba-rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/alibaba-rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/alibaba-rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/alibaba-rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/alibaba-rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/alibaba-rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#abort 文件存储路径
abortFile=/usr/local/alibaba-rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER

# 定时任务级别
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
# 事务检查时间,默认一分钟
transactionCheckInterval =60000

 

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/championzgj/article/details/90726080

智能推荐

c# 调用c++ lib静态库_c#调用lib-程序员宅基地

文章浏览阅读2w次,点赞7次,收藏51次。四个步骤1.创建C++ Win32项目动态库dll 2.在Win32项目动态库中添加 外部依赖项 lib头文件和lib库3.导出C接口4.c#调用c++动态库开始你的表演...①创建一个空白的解决方案,在解决方案中添加 Visual C++ , Win32 项目空白解决方案的创建:添加Visual C++ , Win32 项目这......_c#调用lib

deepin/ubuntu安装苹方字体-程序员宅基地

文章浏览阅读4.6k次。苹方字体是苹果系统上的黑体,挺好看的。注重颜值的网站都会使用,例如知乎:font-family: -apple-system, BlinkMacSystemFont, Helvetica Neue, PingFang SC, Microsoft YaHei, Source Han Sans SC, Noto Sans CJK SC, W..._ubuntu pingfang

html表单常见操作汇总_html表单的处理程序有那些-程序员宅基地

文章浏览阅读159次。表单表单概述表单标签表单域按钮控件demo表单标签表单标签基本语法结构<form action="处理数据程序的url地址“ method=”get|post“ name="表单名称”></form><!--action,当提交表单时,向何处发送表单中的数据,地址可以是相对地址也可以是绝对地址--><!--method将表单中的数据传送给服务器处理,get方式直接显示在url地址中,数据可以被缓存,且长度有限制;而post方式数据隐藏传输,_html表单的处理程序有那些

PHP设置谷歌验证器(Google Authenticator)实现操作二步验证_php otp 验证器-程序员宅基地

文章浏览阅读1.2k次。使用说明:开启Google的登陆二步验证(即Google Authenticator服务)后用户登陆时需要输入额外由手机客户端生成的一次性密码。实现Google Authenticator功能需要服务器端和客户端的支持。服务器端负责密钥的生成、验证一次性密码是否正确。客户端记录密钥后生成一次性密码。下载谷歌验证类库文件放到项目合适位置(我这边放在项目Vender下面)https://github.com/PHPGangsta/GoogleAuthenticatorPHP代码示例://引入谷_php otp 验证器

【Python】matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距-程序员宅基地

文章浏览阅读4.3k次,点赞5次,收藏11次。matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距

docker — 容器存储_docker 保存容器-程序员宅基地

文章浏览阅读2.2k次。①Storage driver 处理各镜像层及容器层的处理细节,实现了多层数据的堆叠,为用户 提供了多层数据合并后的统一视图②所有 Storage driver 都使用可堆叠图像层和写时复制(CoW)策略③docker info 命令可查看当系统上的 storage driver主要用于测试目的,不建议用于生成环境。_docker 保存容器

随便推点

网络拓扑结构_网络拓扑csdn-程序员宅基地

文章浏览阅读834次,点赞27次,收藏13次。网络拓扑结构是指计算机网络中各组件(如计算机、服务器、打印机、路由器、交换机等设备)及其连接线路在物理布局或逻辑构型上的排列形式。这种布局不仅描述了设备间的实际物理连接方式,也决定了数据在网络中流动的路径和方式。不同的网络拓扑结构影响着网络的性能、可靠性、可扩展性及管理维护的难易程度。_网络拓扑csdn

JS重写Date函数,兼容IOS系统_date.prototype 将所有 ios-程序员宅基地

文章浏览阅读1.8k次,点赞5次,收藏8次。IOS系统Date的坑要创建一个指定时间的new Date对象时,通常的做法是:new Date("2020-09-21 11:11:00")这行代码在 PC 端和安卓端都是正常的,而在 iOS 端则会提示 Invalid Date 无效日期。在IOS年月日中间的横岗许换成斜杠,也就是new Date("2020/09/21 11:11:00")通常为了兼容IOS的这个坑,需要做一些额外的特殊处理,笔者在开发的时候经常会忘了兼容IOS系统。所以就想试着重写Date函数,一劳永逸,避免每次ne_date.prototype 将所有 ios

如何将EXCEL表导入plsql数据库中-程序员宅基地

文章浏览阅读5.3k次。方法一:用PLSQL Developer工具。 1 在PLSQL Developer的sql window里输入select * from test for update; 2 按F8执行 3 打开锁, 再按一下加号. 鼠标点到第一列的列头,使全列成选中状态,然后粘贴,最后commit提交即可。(前提..._excel导入pl/sql

Git常用命令速查手册-程序员宅基地

文章浏览阅读83次。Git常用命令速查手册1、初始化仓库git init2、将文件添加到仓库git add 文件名 # 将工作区的某个文件添加到暂存区 git add -u # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,不处理untracked的文件git add -A # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,包括untracked的文件...

分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120-程序员宅基地

文章浏览阅读202次。分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120

【C++缺省函数】 空类默认产生的6个类成员函数_空类默认产生哪些类成员函数-程序员宅基地

文章浏览阅读1.8k次。版权声明:转载请注明出处 http://blog.csdn.net/irean_lau。目录(?)[+]1、缺省构造函数。2、缺省拷贝构造函数。3、 缺省析构函数。4、缺省赋值运算符。5、缺省取址运算符。6、 缺省取址运算符 const。[cpp] view plain copy_空类默认产生哪些类成员函数

推荐文章

热门文章

相关标签