Fetch Kafka Messages in a Specified Time Range

JCR
Nov 20, 2020

Below is a step-by-step procedure in Java for fetching messages from a Kafka topic within a specified time range.

This can be done in four steps:

  1. Assign all the topic partitions to your consumer.
  2. Get the offset in each partition for the specified start time.
  3. Feed those offsets to the consumer.
  4. Poll the topic and consume messages up to the specified end time.

Let's begin coding.

First, read the necessary inputs: the Kafka server address, the topic name, the number of partitions, and the start and end times (as epoch timestamps in milliseconds).

String bootstrapServers = args[0];
String topic = args[1];
int numberOfPartitions = Integer.parseInt(args[2]);
long start = Long.parseLong(args[3]);
long end = Long.parseLong(args[4]);
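The start and end values are read as epoch timestamps in milliseconds, which is the unit that offsetsForTimes() and ConsumerRecord.timestamp() work in. If you would rather pass human-readable times on the command line, a small helper like the one below could convert ISO-8601 UTC strings first. This is a minimal sketch using only java.time; the class and method names (TimeRangeArgs, toEpochMillis, parseRange) are hypothetical and not part of Kafka.

```java
import java.time.Instant;

// Hypothetical helper (not part of Kafka): converts ISO-8601 UTC timestamps
// such as "2020-11-20T06:55:12.434Z" into epoch milliseconds.
class TimeRangeArgs {

    // Epoch millis is the unit used by KafkaConsumer.offsetsForTimes()
    // and ConsumerRecord.timestamp().
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Parses both ends of the range and rejects an empty or inverted range.
    static long[] parseRange(String startIso, String endIso) {
        long start = toEpochMillis(startIso);
        long end = toEpochMillis(endIso);
        if (start >= end) {
            throw new IllegalArgumentException("start must be earlier than end");
        }
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        long[] range = parseRange("2020-11-20T06:55:00Z", "2020-11-20T06:56:00Z");
        System.out.println("start=" + range[0] + " end=" + range[1]);
    }
}
```

With a helper like this, args[3] and args[4] could be given as, for example, 2020-11-20T06:55:00Z instead of raw millisecond values.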

Next, create the consumer, build a TopicPartition object for each partition, and assign those partitions to the consumer.

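Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // read-only scan, no offset commits
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// one TopicPartition per partition number 0 .. numberOfPartitions - 1
TopicPartition[] topicPartitions = new TopicPartition[numberOfPartitions];
for (int i = 0; i < numberOfPartitions; i++) {
    topicPartitions[i] = new TopicPartition(topic, i);
}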
consumer.assign(Arrays.asList(topicPartitions));

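Now fetch the offsets for the specified start time. For each partition, offsetsForTimes() returns the earliest offset whose timestamp is greater than or equal to the given timestamp, or null if the partition has no such message.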

Map<TopicPartition, Long> timeStamps = new HashMap<>();
for (int i = 0; i < numberOfPartitions; i++) {
    timeStamps.put(topicPartitions[i], start);
}
Map<TopicPartition,OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(timeStamps);
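To make the lookup semantics concrete, here is a small in-memory model of what offsetsForTimes() returns for a single partition, following its documented contract: the earliest offset whose timestamp is at or after the target, or null when there is none. It is a sketch for illustration only; it does not talk to a broker, it uses a linear scan for clarity, and StoredRecord and earliestOffsetAtOrAfter are hypothetical names. The sample timestamps are the CreateTime values from the output shown further down. It assumes Java 16+ for the record type.

```java
import java.util.List;

// In-memory model (not the Kafka client) of offsetsForTimes() for ONE partition.
class OffsetLookupModel {

    // Hypothetical stand-in for a stored message: its offset and timestamp.
    record StoredRecord(long offset, long timestamp) {}

    // Scans in offset order, so the first match is the earliest qualifying offset.
    static Long earliestOffsetAtOrAfter(List<StoredRecord> partitionLog, long targetTs) {
        for (StoredRecord r : partitionLog) {
            if (r.timestamp() >= targetTs) {
                return r.offset();
            }
        }
        return null; // mirrors the null entry offsetsForTimes() returns for this partition
    }

    public static void main(String[] args) {
        List<StoredRecord> log = List.of(
                new StoredRecord(11, 1605855312434L),
                new StoredRecord(12, 1605855317735L),
                new StoredRecord(13, 1605855322106L));
        System.out.println(earliestOffsetAtOrAfter(log, 1605855315000L)); // 12
        System.out.println(earliestOffsetAtOrAfter(log, 1605855400000L)); // null
    }
}
```

A target that falls between two messages resolves to the later one, which is why seeking to the returned offset never skips a message inside the range.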

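Then feed the obtained offsets to the consumer by seeking each partition to its offset. A partition with no message at or after the start time gets a null entry; seeking it to its end makes sure no older messages are read from it, regardless of committed offsets or the auto.offset.reset setting.

for (int i = 0; i < numberOfPartitions; i++) {
    OffsetAndTimestamp offsetAndTimestamp = partitionOffsetMap.get(topicPartitions[i]);
    if (offsetAndTimestamp != null) {
        consumer.seek(topicPartitions[i], offsetAndTimestamp.offset());
    } else {
        // no message at or after the start time in this partition
        consumer.seekToEnd(Collections.singletonList(topicPartitions[i]));
    }
}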
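A null entry in partitionOffsetMap means that partition has no message at or after the start time, so it needs some other starting position. The sketch below models one reasonable choice, the partition's log-end offset (the position KafkaConsumer.seekToEnd() moves to), using plain maps keyed by partition number. It is an illustration under that assumption, not Kafka code; seekTargets and the sample offsets are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Model (not the Kafka client) of choosing a starting offset per partition.
class SeekPlanModel {

    // offsetsForStart: lookup result per partition (null = nothing at or after start)
    // logEndOffsets:   current end offset per partition
    static Map<Integer, Long> seekTargets(Map<Integer, Long> offsetsForStart,
                                          Map<Integer, Long> logEndOffsets) {
        Map<Integer, Long> targets = new HashMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            Long found = offsetsForStart.get(e.getKey());
            // Falling back to the end means only messages produced later are read.
            targets.put(e.getKey(), found != null ? found : e.getValue());
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<Integer, Long> found = new HashMap<>();
        found.put(0, 11L);  // partition 0 has a message at or after the start time
        found.put(1, null); // partition 1 does not
        Map<Integer, Long> logEnd = Map.of(0, 14L, 1, 7L);
        System.out.println(seekTargets(found, logEnd));
    }
}
```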

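Finally, poll the consumer and print records until one with a timestamp at or after the end time arrives, or until a poll returns no records.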

boolean flag = true;
long count = 0L;

while (flag) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    if (records.isEmpty()) {
        break; // nothing arrived within the poll timeout
    }
    count += records.count();
    System.out.println("Record Fetch Count : " + count);
    for (ConsumerRecord<String, String> record : records) {
        // check before printing so records at or past the end time are not output
        if (record.timestamp() >= end) {
            flag = false;
            break;
        }
        System.out.println(record);
    }
}
consumer.close();
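One thing to watch in the loop above: it stops polling altogether at the first record whose timestamp is at or after end, from whichever partition that record comes. A single poll() can return records from several partitions, so with more than one partition, in-range records on the other partitions can be cut off. The model below, a sketch in plain Java rather than consumer code, tracks the end condition per partition instead: a partition is marked finished at its first record at or past end, while records from the other partitions keep being collected. In a real consumer one could call consumer.pause() on a finished partition and stop once every assigned partition is finished; Rec and collectUntil are hypothetical names, and Java 16+ is assumed for the record type.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Model (not the Kafka client) of a poll loop that stops each partition independently.
class PerPartitionStopModel {

    // Hypothetical stand-in for a consumed record.
    record Rec(int partition, long offset, long timestamp) {}

    // Keeps records with timestamp < end; a partition stops at its first record >= end.
    static List<Rec> collectUntil(List<Rec> polled, long end) {
        Set<Integer> finished = new HashSet<>();
        List<Rec> kept = new ArrayList<>();
        for (Rec r : polled) {
            if (finished.contains(r.partition())) {
                continue; // this partition already reached the end time
            }
            if (r.timestamp() >= end) {
                finished.add(r.partition()); // real consumer: pause this partition
                continue;
            }
            kept.add(r);
        }
        return kept;
    }

    public static void main(String[] args) {
        // Records from partitions 0 and 1 interleaved, as one poll() batch may return them.
        List<Rec> polled = List.of(
                new Rec(0, 11, 100), new Rec(1, 5, 150), new Rec(0, 12, 250),
                new Rec(1, 6, 190), new Rec(1, 7, 210), new Rec(0, 13, 300));
        collectUntil(polled, 200).forEach(System.out::println);
    }
}
```

Here the stop-everything loop would end at offset 12 of partition 0 and miss offset 6 of partition 1, which the per-partition version keeps. Both approaches assume timestamps within a partition increase with offset, which is usually the case but is not guaranteed when producers set the timestamps (CreateTime).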

That's it. Running this code prints records like these:

ConsumerRecord(topic = test-events, partition = 0, leaderEpoch = null, offset = 11, CreateTime = 1605855312434, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message twelve)

ConsumerRecord(topic = test-events, partition = 0, leaderEpoch = null, offset = 12, CreateTime = 1605855317735, serialized key size = -1, serialized value size = 16, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message thirteen)

ConsumerRecord(topic = test-events, partition = 0, leaderEpoch = null, offset = 13, CreateTime = 1605855322106, serialized key size = -1, serialized value size = 16, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message fourteen)
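The CreateTime values in these records are epoch milliseconds, the same unit as start and end. To check by eye that the printed records fall inside the requested range, they can be turned back into readable UTC timestamps with java.time; CreateTimeFormat and toIso below are hypothetical names used for illustration.

```java
import java.time.Instant;

// Converts CreateTime values (epoch millis) into ISO-8601 UTC strings.
class CreateTimeFormat {

    static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // CreateTime values taken from the sample output above
        long[] createTimes = {1605855312434L, 1605855317735L, 1605855322106L};
        for (long ts : createTimes) {
            System.out.println(ts + " -> " + toIso(ts));
        }
    }
}
```

For the first record, 1605855312434 corresponds to 2020-11-20T06:55:12.434Z.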

Here is the complete code for your reference

Happy Learning :)
