1. 本实例使用的RocketMq版本为4.3.2, JDK>=1.8
2. 启动rocketmq, 在安装目录的bin目录下
启动 NAMESERVER start mqnamesrv.cmd
启动 BROKER start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true


3. rocketmq jar包
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency>
4. application.properties
# 消费者组名 apache.rocketmq.consumer.PushConsumer=PushConsumer2 # 生产者的组名 apache.rocketmq.producer.producerGroup=Producer4 # NameServer的地址 apache.rocketmq.namesrvAddr=127.0.0.1:9876
Topic 默认对应四个队列 queue, 队列中有生产者和消费者, 生产者组成生产者组, 消费者组成消费组
有相同名称的是属于同一个组的, 一个queue对应一个消费者, 但一个消费者对应多个queue
setConsumeMessageBatchMaxSize ---- 设置批量消费消息的大小 (1: 一次一条 n: 一条多条)
消息消费完成后, 如果消费者返回CONSUME_SUCCESS则消息成功被消费
如果消费者返回RECONSUME_LATER 则消息在一段时间后会被再次消费
produce0 -------> queue0 --------> consume0
ProduceGroup produce1 -------> queue1 --------> consume1 ConsumeGroup
produce2 -------> queue2 --------> consume2
produce3 -------> queue3 --------> consume3
5. 生产者代码
@Component
public class RocketMQProduce {
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
//@PostConstruct
public void defaultMQProducer() {
try {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
//producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
producer.setNamesrvAddr(namesrvAddr);
producer.start();
// String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
// "TagE" };
for (int i = 1; i <= 1; i++) {
Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, Integer.valueOf(i));
System.out.println(sendResult);
}
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}6. 消费者代码 MessageListenerOrderly
对于同一个队列, 队列中的消息是同步消费的
如果是多个阶列, 多个队列中的消息是同步的
@Component
public class RocketMQConsumer {
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
consumer.setNamesrvAddr(namesrvAddr);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//一次只处理一条消息, 处理成功后再提交, 失败可以针对单条进行重试
//如果这里设置多条,失败需要对多条进行重试
consumer.setConsumeMessageBatchMaxSize(1);
consumer.subscribe("TopicOrderTest", "order_1");
/**
* 注意: 只有被放在只一个queue中的消息才能被有序消费
* 这里的顺序消费的维度是 同一个topic对应的一个queue, 一个queue只会被一个线程处理
* ---- queue1 ----> thread1
* ---- queue2 ----> thread2
* ---- queue3 ----> thread3
* ---- queue4 ----> thread4
*/
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
System.out.println("msgs.size = " + msgs.size());
for (MessageExt msg : msgs) {
System.out.println("内容:" + new String(msg.getBody()) + " 线程名称:" + Thread.currentThread().getName() + " " + msg);
int reconsumeTimes = msg.getReconsumeTimes();
//第四次成功
if(reconsumeTimes >= 10){
//表明消息被消费
return ConsumeOrderlyStatus.SUCCESS;
}else{
//消息未被消费. 需要重试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
;
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}
下载Demo