Below is a step-by-step procedure in Java for fetching messages from a Kafka topic within a specified time range.
We can achieve this in four steps:
- Assign all the topic partitions to your consumer.
- Get the offset in each partition for the specified start time.
- Feed (seek) those offsets to the consumer.
- Poll the topic and consume messages until the end time is reached.
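Let's begin coding.
First, read the necessary inputs: the Kafka server address, the topic name, the number of partitions, and the start and end times as epoch timestamps in milliseconds. The snippets assume they run inside a `main(String[] args)` method with the `kafka-clients` library on the classpath.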
```java
String bootstrapServers = args[0];
String topic = args[1];
int numberOfPartitions = Integer.parseInt(args[2]);
long start = Long.parseLong(args[3]);
long end = Long.parseLong(args[4]);
```
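The start and end arguments are epoch timestamps in milliseconds, the same unit that `offsetsForTimes` and `ConsumerRecord.timestamp()` use. If you would rather pass human-readable times, a small helper could convert ISO-8601 UTC strings first. This is only a sketch built on `java.time`; the class name `TimeRangeArgs` and the ISO input format are assumptions, not part of the original program.

```java
import java.time.Instant;

public class TimeRangeArgs {

    // Hypothetical helper: converts an ISO-8601 instant such as "2020-11-20T06:55:12.434Z"
    // into epoch milliseconds, the unit Kafka record timestamps use.
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        // Assumed usage: args[0] and args[1] hold the start and end times in ISO-8601 UTC form.
        String startArg = args.length > 0 ? args[0] : "2020-11-20T06:55:00Z";
        String endArg = args.length > 1 ? args[1] : "2020-11-20T07:00:00Z";
        long start = toEpochMillis(startArg);
        long end = toEpochMillis(endArg);
        if (start >= end) {
            throw new IllegalArgumentException("start must be before end");
        }
        System.out.println("start=" + start + " end=" + end);
    }
}
```

The converted values could then stand in for `Long.parseLong(args[3])` and `Long.parseLong(args[4])`.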
Next, create the consumer, build a TopicPartition object for each partition, and assign all of them to the consumer.
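```java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// One TopicPartition per partition index; assign() bypasses consumer-group partition assignment
TopicPartition[] topicPartitions = new TopicPartition[numberOfPartitions];
for (int i = 0; i < numberOfPartitions; i++) {
    topicPartitions[i] = new TopicPartition(topic, i);
}
consumer.assign(Arrays.asList(topicPartitions));
```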
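Now fetch the offsets for the specified start time. For each partition, `offsetsForTimes` returns the earliest offset whose timestamp is greater than or equal to the given timestamp, or `null` if the partition has no such message.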
```java
Map<TopicPartition, Long> timeStamps = new HashMap<>();
for (int i = 0; i < numberOfPartitions; i++) {
    timeStamps.put(topicPartitions[i], start);
}
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(timeStamps);
```
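To make the lookup concrete, here is a small in-memory model of the documented `offsetsForTimes` contract: for one partition, return the earliest offset whose timestamp is at or after the target, or nothing if there is none. It is an illustration only. The real lookup runs on the broker against its time index, and the class `OffsetLookupModel` and its sample data are made up for this example.

```java
import java.util.List;
import java.util.Optional;

public class OffsetLookupModel {

    // A partition modelled as record timestamps indexed by offset (offset == list index).
    // Hypothetical stand-in for a real Kafka partition log.
    static Optional<Long> earliestOffsetAtOrAfter(List<Long> timestampsByOffset, long target) {
        for (int offset = 0; offset < timestampsByOffset.size(); offset++) {
            if (timestampsByOffset.get(offset) >= target) {
                return Optional.of((long) offset); // first match wins
            }
        }
        return Optional.empty(); // plays the role of the null entry in the real result map
    }

    public static void main(String[] args) {
        List<Long> partition0 = List.of(1000L, 2000L, 3000L, 4000L);
        System.out.println(earliestOffsetAtOrAfter(partition0, 2500L));
        System.out.println(earliestOffsetAtOrAfter(partition0, 5000L));
    }
}
```

A target of 2500 lands on offset 2 (timestamp 3000), while a target later than every record yields an empty result, which is the case the `null` check below guards against.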
Feed the obtained offsets to the consumer by seeking each partition to its offset. Partitions whose lookup returned `null` are skipped.
```java
for (int i = 0; i < numberOfPartitions; i++) {
    OffsetAndTimestamp offsetAndTimestamp = partitionOffsetMap.get(topicPartitions[i]);
    if (offsetAndTimestamp != null) {
        long offset = offsetAndTimestamp.offset();
        consumer.seek(topicPartitions[i], offset);
    }
}
```
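The seek loop makes one decision per partition: seek when the lookup found an offset, skip when it returned `null`. Pulling that decision out of the Kafka client lets it be checked in isolation. In this sketch, partitions are plain integers and the names `SeekPlan` and `plan` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SeekPlan {

    final Map<Integer, Long> seekTo = new HashMap<>(); // partition -> offset to seek to
    final Set<Integer> skipped = new TreeSet<>();      // partitions with no record at/after start

    // 'lookup' mirrors the offsetsForTimes result: a null value means "no such record".
    static SeekPlan plan(Map<Integer, Long> lookup) {
        SeekPlan p = new SeekPlan();
        for (Map.Entry<Integer, Long> e : lookup.entrySet()) {
            if (e.getValue() == null) {
                p.skipped.add(e.getKey());
            } else {
                p.seekTo.put(e.getKey(), e.getValue());
            }
        }
        return p;
    }

    public static void main(String[] args) {
        Map<Integer, Long> lookup = new HashMap<>();
        lookup.put(0, 11L);
        lookup.put(1, null);
        SeekPlan p = plan(lookup);
        System.out.println("seek " + p.seekTo + ", skip " + p.skipped);
    }
}
```

Keeping the skipped set around can be handy later, for example to treat those partitions as already finished when deciding when to stop polling.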
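Now poll the consumer. The loop keeps polling until it reaches a record whose timestamp is at or past the end time, or until a poll returns no records.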
```java
boolean flag = true;
long count = 0L;
while (flag) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    count += records.count();
    System.out.println("Record Fetch Count : " + count);
    if (records.isEmpty()) {
        break; // nothing more arrived within the poll timeout
    }
    for (ConsumerRecord<String, String> record : records) {
        if (record.timestamp() >= end) {
            flag = false; // reached the end of the requested time range
            break;
        }
        System.out.println(record);
    }
}
consumer.close();
```
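The loop above stops as soon as any record reaches the end time. With more than one partition, that can cut off in-range records that other partitions have not delivered yet, because `poll` does not return records from different partitions in global time order. One way around this is to track each partition separately and stop only once every partition has passed `end` (or a poll comes back empty). The sketch below models that bookkeeping in plain Java over a simulated batch. `Rec`, `collectInRange` and the sample data are hypothetical stand-ins. With the real consumer you might also call `consumer.pause(...)` on finished partitions so they stop fetching.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PerPartitionRange {

    // Hypothetical stand-in for ConsumerRecord: just partition, offset and timestamp.
    static final class Rec {
        final int partition;
        final long offset;
        final long timestamp;

        Rec(int partition, long offset, long timestamp) {
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Keeps records with start <= timestamp < end. A partition is marked finished at its
    // first record with timestamp >= end; its later records are ignored, while the other
    // partitions keep going.
    static List<Rec> collectInRange(List<Rec> polled, long start, long end, Set<Integer> finished) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : polled) {
            if (finished.contains(r.partition)) {
                continue;
            }
            if (r.timestamp >= end) {
                finished.add(r.partition);
            } else if (r.timestamp >= start) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
                new Rec(0, 11, 1000), new Rec(0, 12, 2500), new Rec(0, 13, 1500),
                new Rec(1, 5, 1200), new Rec(1, 6, 1800), new Rec(1, 7, 2100));
        Set<Integer> finished = new HashSet<>();
        List<Rec> kept = collectInRange(batch, 1000, 2000, finished);
        System.out.println(kept.size() + " records kept; finished partitions: " + finished);
        int assignedPartitions = 2;
        if (finished.size() == assignedPartitions) {
            System.out.println("every partition has passed the end time; stop polling");
        }
    }
}
```

In a real program, you would call something like `collectInRange` on each poll's records and keep the `finished` set across polls. The extra `>= start` check is a defensive choice, because producer-assigned `CreateTime` values are not guaranteed to increase with offset.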
Done! If you run this code, you will get output like this:
```
ConsumerRecord(topic = test-events, partition = 0, leaderEpoch = null, offset = 11, CreateTime = 1605855312434, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message twelve)
ConsumerRecord(topic = test-events, partition = 0, leaderEpoch = null, offset = 12, CreateTime = 1605855317735, serialized key size = -1, serialized value size = 16, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message thirteen)
ConsumerRecord(topic = test-events, partition = 0, leaderEpoch = null, offset = 13, CreateTime = 1605855322106, serialized key size = -1, serialized value size = 16, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message fourteen)
```
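The `CreateTime` values in this output are epoch milliseconds. To confirm by eye that the records fall inside the requested range, you could convert them with `java.time.Instant`. This small sketch prints the three timestamps above in ISO-8601 UTC form. The class name `ShowCreateTime` is made up, and in real code you would pass `record.timestamp()` instead of the hard-coded values.

```java
import java.time.Instant;

public class ShowCreateTime {

    // Converts a Kafka record timestamp (epoch milliseconds) to an ISO-8601 UTC string.
    static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long[] createTimes = {1605855312434L, 1605855317735L, 1605855322106L};
        for (long t : createTimes) {
            System.out.println(t + " -> " + toIso(t));
        }
    }
}
```

For example, the first record above was created at 2020-11-20T06:55:12.434Z.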
Happy Learning :)