kafka如何实现延时消息
rabbitmq可以通过死信队列实现延时消费
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
@SpringBootTest
public class DelayQueueTest {
private KafkaConsumer<String, String> consumer;
private KafkaProducer<String, String> producer;
private volatile Boolean exit = false;
private final Object lock = new Object();
private final String servers = "";
@BeforeEach
void initConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
}
@BeforeEach
void initProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
}
@Test
void testDelayQueue() throws JsonProcessingException, InterruptedException {
String topic = "delay-minutes-1";
List<String> topics = Collections.singletonList(topic);
consumer.subscribe(topics);
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (lock) {
consumer.resume(consumer.paused());
lock.notify();
}
}
}, 0, 1000);
do {
synchronized (lock) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));
if (consumerRecords.isEmpty()) {
lock.wait();
continue;
}
boolean timed = false;
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
long timestamp = consumerRecord.timestamp();//
TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
if (timestamp + 60 * 1000 < System.currentTimeMillis()) {
String value = consumerRecord.value();
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(value);
JsonNode jsonNodeTopic = jsonNode.get("topic");
String appTopic = null, appKey = null, appValue = null;
if (jsonNodeTopic != null) {
appTopic = jsonNodeTopic.asText();
}
if (appTopic == null) {
continue;
}
JsonNode jsonNodeKey = jsonNode.get("key");
if (jsonNodeKey != null) {
appKey = jsonNode.asText();
}
JsonNode jsonNodeValue = jsonNode.get("value");
if (jsonNodeValue != null) {
appValue = jsonNodeValue.asText();
}
// send to application topic
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue);
try {
producer.send(producerRecord).get();
// success. commit message
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1);
HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
metadataHashMap.put(topicPartition, offsetAndMetadata);
consumer.commitSync(metadataHashMap);
} catch (ExecutionException e) {
consumer.pause(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, consumerRecord.offset());
timed = true;
break;
}
} else {
consumer.pause(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, consumerRecord.offset());
timed = true;
break;
}
}
if (timed) {
lock.wait();
}
}
} while (!exit);
}
}
- kafka是通过一个提交日志记录的方式来存储消息记录,采用拉模式,而RabbitMQ则采用队列的方式,属于推模式
- consumerRecord.timestamp()发送消息的时候,如果指定了timestamp则使用用户指定的,如果未指定则使用producer端的当前时间
- 在通过poll获取到consumerRecord数据时,校验timestamp是否达到了延时条件,如果达到了条件,则发送至另一topic
- 如果未达到延时条件,则通过seek重置消费位移
- 上述代码在单分区下没有问题,如果是多分区,需要对seek方法特殊处理,因为上殊操作当拉取到多个分区消息时对一个分区seek后即break了
- 处理方式,保存一个seek的集合,如果下一条消息的分区在该集合中,则不再处理,如果不在,则继续执行上述逻辑
当延时时长过长时,通过定时任务替换kafka的方式解决
如何保证消息顺序消费
- 如果是自生产自消费,可通过指定分区策略的方式实现按生产顺序消费,因为一个分区只会被一个消费者节点消费
- 如果不能指定分区策略,则存在两种情况
- 第一针对状态类的修改
- 因为都是单向的,所以通过状态流转校验消费消息
- 有可能两个状态均会到达某一节点,针对该情况,可校验消息发送时间戳,如果小于之前的时间戳,则根据业务只补充数据,不再修改状态
- 针对描述性修改
- 如果时间戳小于之前处理的时间戳或版本,则不再修改
- 第一针对状态类的修改