1. 本示例使用的RocketMQ版本为4.3.2, JDK>=1.8
2. 启动rocketmq, 在安装目录的bin目录下
启动 NAMESERVER start mqnamesrv.cmd
启动 BROKER start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true


3. rocketmq jar包
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency>
4. application.properties
# 消费者组名
apache.rocketmq.consumer.PushConsumer=PushConsumer2
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer4
# NameServer的地址
apache.rocketmq.namesrvAddr=127.0.0.1:9876
一个Topic默认对应四个队列(queue)。生产者向队列发送消息, 消费者从队列消费消息; 多个生产者组成生产者组, 多个消费者组成消费者组。
组名相同的实例属于同一个组。在同一个消费者组内, 一个queue只会分配给一个消费者, 但一个消费者可以对应多个queue。
setConsumeMessageBatchMaxSize ---- 设置批量消费消息的大小 (1: 一次一条; n: 一次最多n条)
消息消费完成后, 如果消费者返回CONSUME_SUCCESS则消息成功被消费
如果消费者返回RECONSUME_LATER 则消息在一段时间后会被再次消费
produce0 -------> queue0 --------> consume0
ProduceGroup produce1 -------> queue1 --------> consume1 ConsumeGroup
produce2 -------> queue2 --------> consume2
produce3 -------> queue3 --------> consume3
5. 事务生产者代码
@Component
public class TransactionProducer {
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
/**
 * Starts a transactional producer, sends two demo messages to topic
 * {@code "TopicTest1234"} (tags {@code transaction1} and {@code transaction2},
 * keys {@code KEY1} and {@code KEY2}) and shuts the producer down again.
 *
 * <p>Each message is sent with {@code sendMessageInTransaction}, passing the
 * integer {@code 1} as the opaque argument handed to the
 * {@link TransactionListenerImpl} registered on the producer.
 *
 * <p>NOTE(review): the producer is shut down right after sending, so it is
 * presumably no longer reachable for later broker check-backs; keep it alive
 * longer if check-back behaviour is what you want to observe.
 *
 * @throws MQClientException    if the producer cannot be started
 * @throws InterruptedException if the thread is interrupted while pausing between sends
 */
public void transactionProduce() throws MQClientException, InterruptedException {
    TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
    producer.setNamesrvAddr(namesrvAddr);

    // Bounded pool handed to the producer; the thread name indicates it serves
    // transaction check requests.
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
    producer.setExecutorService(executorService);
    producer.setTransactionListener(new TransactionListenerImpl());

    try {
        producer.start();
        for (int i = 1; i <= 2; i++) {
            try {
                Message msg = new Message("TopicTest1234", "transaction" + i, "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, 1);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                // Best effort: report the failed send and continue with the next message.
                e.printStackTrace();
            }
        }
    } finally {
        // Always release client resources, including when start() fails or the
        // sleep above is interrupted. ExecutorService.shutdown() is a no-op if
        // the pool has already been shut down.
        producer.shutdown();
        executorService.shutdown();
    }
}
}
6. 消息事务回调提交
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
 * Runs the demo "local transaction" for a half message and reports its outcome.
 *
 * <p>A message tagged {@code transaction2} simulates a failure and is rolled
 * back; every other message is committed. The outcome is also stored in
 * {@code localTrans} under the message's transaction id, using the status
 * codes that {@code checkLocalTransaction} understands (1 = commit,
 * 2 = rollback), so that a later check-back returns the same decision.
 *
 * @param msg the message being sent in the transaction
 * @param arg the opaque argument supplied by the sender; only printed here
 * @return {@code ROLLBACK_MESSAGE} for tag {@code transaction2}, otherwise {@code COMMIT_MESSAGE}
 */
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    // Decode explicitly as UTF-8 rather than with the platform default charset;
    // the demo producer encodes with RemotingHelper.DEFAULT_CHARSET ("UTF-8").
    String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    System.out.println("执行本地事务msg = " + body);
    System.out.println("执行本地事务arg = " + arg);

    // Constant-first equals: a message without tags is committed instead of causing an NPE.
    boolean failed = "transaction2".equals(msg.getTags());
    if (failed) {
        System.out.println("======我的操作============,失败了 -进行ROLLBACK");
    }

    // Record the outcome for check-backs. ConcurrentHashMap rejects null keys,
    // so skip recording when no transaction id is set.
    String transactionId = msg.getTransactionId();
    if (transactionId != null) {
        localTrans.put(transactionId, failed ? 2 : 1);
    }

    return failed ? LocalTransactionState.ROLLBACK_MESSAGE : LocalTransactionState.COMMIT_MESSAGE;
}
/**
 * Answers a transaction check for the given message.
 *
 * <p>Looks up the status stored in {@code localTrans} under the message's
 * transaction id: 0 maps to {@code UNKNOW}, 2 maps to {@code ROLLBACK_MESSAGE},
 * and everything else (1, any other code, or no entry at all) maps to
 * {@code COMMIT_MESSAGE}.
 *
 * @param msg the message whose transaction state is being checked
 * @return the transaction state derived from the stored status code
 */
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    final Integer recorded = localTrans.get(msg.getTransactionId());
    if (recorded == null) {
        // Nothing stored for this transaction id: commit by default.
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    final int code = recorded;
    if (code == 0) {
        return LocalTransactionState.UNKNOW;
    }
    if (code == 2) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    // Code 1 and any unrecognised code are treated as commit.
    return LocalTransactionState.COMMIT_MESSAGE;
}
}
7. 消费者代码
@Component
public class RocketMQConsumer {
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//设置最多一次消费多少条
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setNamesrvAddr(namesrvAddr);
// consumer.setMessageModel(MessageModel.CLUSTERING);
try {
consumer.subscribe("TopicTestA", "push");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);// 输出消息内容
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
/*System.out.println("----消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody
+ " " + Thread.currentThread().getName());*/
int reconsumeTimes = messageExt.getReconsumeTimes();
//消费次数为2时,将消息消费掉
if(reconsumeTimes >= 2){
System.out.println("----消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody
+ " " + Thread.currentThread().getName()
+ " 消费次数:" + reconsumeTimes);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}else{
System.out.println("++++消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody
+ " " + Thread.currentThread().getName()
+ " 消费次数:" + reconsumeTimes);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费成功
});
consumer.start();
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
下载Demo