[java] Java: how to get the number of messages in a topic in Apache Kafka

I am using Apache Kafka for messaging. I have implemented the producer and consumer in Java. How can I get the number of messages in a topic?


The answer is


You can use Kafka Tool. You can download it here: http://www.kafkatool.com/download.html

Kafka Tool is a GUI application for managing and using Apache Kafka clusters. It provides an intuitive UI that allows one to quickly view objects within a Kafka cluster as well as the messages stored in the topics of the cluster.


Using the Java client of Kafka 1.0.0 (the kafka_2.11-1.0.0 build), you can do the following:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while (true) {
        // poll(long) matches the 1.0.0 client; clients from 2.0 on prefer poll(Duration)
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the end offset (the offset of the next message
            // to be written) of every partition assigned to this consumer
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for (TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d%n", partition.topic(), offsets.get(partition));
            }
        }
    }

The output looks something like this:

    offset = 10, key = null, value = un
    partition test is at 13
    offset = 11, key = null, value = deux
    partition test is at 13
    offset = 12, key = null, value = trois
    partition test is at 13

Excerpts from Kafka docs

Deprecations in 0.9.0.0

The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality.

I am running a Kafka broker with SSL enabled for both server and client. These are the commands I use:

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

where /tmp/ssl_config is as follows:

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
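Since the file passed with --command-config uses java.util.Properties syntax, a Java client can probably reuse the very same file. A minimal sketch, assuming you only want to load the SSL settings and add the bootstrap servers yourself; the class and method names are hypothetical, and deserializers or group.id would still need to be added before handing the result to a KafkaConsumer:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper: parses a --command-config style file (Properties syntax)
// and adds the bootstrap servers that the CLI receives via --bootstrap-server.
public class SslClientConfig {

    public static Properties fromText(String text, String bootstrapServers) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: SslClientConfig <host:port>");
            return;
        }
        String text = Files.readString(Path.of("/tmp/ssl_config")); // Java 11+
        Properties props = fromText(text, args[0]);
        // props can now be extended (deserializers, group.id, ...) and passed to new KafkaConsumer<>(props)
        System.out.println(props.getProperty("security.protocol"));
    }
}
```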

I had this same question and this is how I am doing it, from a KafkaConsumer, in Kotlin:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()

Very rough code, as I just got this to work, but basically you want to subtract the topic's beginning offset from its end offset; the result is the current message count for the topic.

You can't just rely on the end offset, because other configurations (cleanup policy, retention.ms, etc.) may cause old messages to be deleted from your topic. Offsets only "move" forward, so it is the beginning offset that moves forward, closer to the end offset (or eventually to the same value, if the topic currently contains no messages).

Basically, the end offset represents the overall number of messages that went through the topic, and the difference between the two (summed over all partitions) represents the number of messages that the topic contains right now.
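A minimal sketch of that subtraction in plain Java, assuming the beginning and end offsets have already been fetched. With a real client they would come from consumer.beginningOffsets(partitions) and consumer.endOffsets(partitions), both keyed by TopicPartition; to keep the example free of the Kafka dependency, the hypothetical maps here are keyed by partition number instead:

```java
import java.util.HashMap;
import java.util.Map;

// Sums (end offset - beginning offset) over all partitions.
// Maps are keyed by partition number (a simplification of Map<TopicPartition, Long>).
public class TopicMessageCount {

    public static long count(Map<Integer, Long> beginning, Map<Integer, Long> end) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : end.entrySet()) {
            Long start = beginning.get(e.getKey());
            if (start == null) {
                throw new IllegalArgumentException("no beginning offset for partition " + e.getKey());
            }
            total += e.getValue() - start;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical offsets: partition 0 received 100 messages, 40 of which retention already deleted.
        Map<Integer, Long> beginning = new HashMap<>();
        Map<Integer, Long> end = new HashMap<>();
        beginning.put(0, 40L);
        end.put(0, 100L);
        beginning.put(1, 0L);
        end.put(1, 25L);
        System.out.println(count(beginning, end)); // 60 + 25 = 85
    }
}
```

On compacted topics, or topics written by transactional producers, offsets can have gaps, so the difference is an upper bound rather than an exact count.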


Apache Kafka command to get the unhandled messages on all partitions of a topic:

kafka-run-class kafka.tools.ConsumerOffsetChecker \
    --topic test --zookeeper localhost:2181 \
    --group test_group

Prints:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

Column 6 (Lag) is the number of unhandled messages. Add them up like this:

kafka-run-class kafka.tools.ConsumerOffsetChecker \
    --topic test --zookeeper localhost:2181 \
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}'

awk reads the rows, skips the header line, adds up the 6th column, and prints the sum at the end.

Prints

5
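If you would rather do the summing from Java (for example after capturing the command's output with a Process), the awk logic translates directly. A minimal sketch, assuming the output is already available as a list of lines; the class name is hypothetical and List.of needs Java 9+:

```java
import java.util.List;

// Java version of the awk one-liner: skip the header row (NR>1) and
// add up the 6th whitespace-separated column (Lag).
public class OffsetCheckerLag {

    public static long sumLag(List<String> lines) {
        long sum = 0;
        for (int i = 1; i < lines.size(); i++) {          // i = 1 skips the header
            String[] cols = lines.get(i).trim().split("\\s+");
            if (cols.length >= 6) {
                sum += Long.parseLong(cols[5]);            // awk's $6 is index 5
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> output = List.of(
            "Group      Topic        Pid Offset          logSize         Lag             Owner",
            "test_group test         0   11051           11053           2               none",
            "test_group test         1   10810           10812           2               none",
            "test_group test         2   11027           11028           1               none");
        System.out.println(sumLag(output)); // prints 5
    }
}
```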

Sometimes the interest is in knowing the number of messages in each partition, for example, when testing a custom partitioner. The following steps have been tested to work with Kafka 0.10.2.1-2 from Confluent 3.2. Given a Kafka topic kt and the following command line:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

That prints the following sample output, showing the latest offset of each of the three partitions (equal to the message count as long as no messages have been deleted):

kt:2:6138
kt:1:6123
kt:0:6137

The number of lines could be more or less depending on the number of partitions for the topic.
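The topic:partition:offset lines are easy to consume from Java as well. A minimal sketch, assuming the lines have already been read from the command's output; the class name is hypothetical. It relies on Kafka topic names not containing ":", and skips blank or malformed lines:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Parses GetOffsetShell lines ("topic:partition:offset") into partition -> offset,
// and sums the offsets (what awk -F ":" '{sum += $3}' does).
public class GetOffsetShellParser {

    public static Map<Integer, Long> parse(List<String> lines) {
        Map<Integer, Long> offsets = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split(":");
            if (parts.length != 3) {
                continue; // blank or unexpected line
            }
            offsets.put(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }
        return offsets;
    }

    public static long sum(Map<Integer, Long> offsets) {
        return offsets.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = parse(List.of("kt:2:6138", "kt:1:6123", "kt:0:6137"));
        System.out.println(offsets);      // {0=6137, 1=6123, 2=6138}
        System.out.println(sum(offsets)); // 18398
    }
}
```

If retention may have deleted messages, running GetOffsetShell again with --time -2 gives the earliest offsets; subtracting those per partition gives the current count.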


Since ConsumerOffsetChecker is no longer supported, you can use this command to check the messages in a topic:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

Here LAG is, for each topic partition, the number of messages that the group has not consumed yet.

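The exact set of columns printed by --describe differs between Kafka versions, so when summing LAG from Java it is safer to locate the column by its header name than by position. A minimal sketch, assuming the output has been captured as lines; the class name and sample rows are hypothetical. Rows whose LAG is "-" (e.g. no committed offset yet) are skipped:

```java
import java.util.Arrays;
import java.util.List;

// Sums the LAG column of `--describe` output, finding the column by its header.
public class ConsumerGroupLag {

    public static long totalLag(List<String> lines) {
        int lagIndex = -1;
        long total = 0;
        for (String line : lines) {
            String[] cols = line.trim().split("\\s+");
            if (lagIndex < 0) {
                lagIndex = Arrays.asList(cols).indexOf("LAG"); // still looking for the header row
                continue;
            }
            if (cols.length > lagIndex && cols[lagIndex].matches("\\d+")) {
                total += Long.parseLong(cols[lagIndex]);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical sample output
        List<String> output = List.of(
            "",
            "GROUP     TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID",
            "my-group  test   0          95              100             5    -            -     -",
            "my-group  test   1          40              42              2    -            -     -",
            "my-group  test   2          -               10              -    -            -     -");
        System.out.println(totalLag(output)); // prints 7
    }
}
```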

You can also try kafkacat. This is an open-source tool that reads messages from a topic and partition and prints them to stdout. Here is a sample that reads the last 10 messages from the sample-kafka-topic topic and then exits:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e

The simplest way I've found is to use the Kafdrop REST API endpoint /topic/topicName and send an "Accept: application/json" header in order to get back a JSON response.

This is documented here.


To count all the messages stored in the topic, you can seek the consumer to the beginning and to the end of each partition and sum the differences:

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
consumer.assign(partitions);
// an empty collection means "all currently assigned partitions"
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());

It is not Java, but it may be useful:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> --time -1 --offsets 1 \
  | awk -F ":" '{sum += $3} END {print sum}'

I actually use this for benchmarking my POC. The tool you want is ConsumerOffsetChecker. You can run it from a bash script like this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

In the result, the highlighted value, 999, is the number of messages currently in the topic.

Update: ConsumerOffsetChecker has been deprecated since 0.9.0.0; you may want to start using ConsumerGroupCommand.


In recent versions of Kafka Manager, there is a column titled "Summed Recent Offsets".



Use https://prestodb.io/docs/current/connector/kafka-tutorial.html

A SQL engine, developed at Facebook, that connects to several data sources (Cassandra, Kafka, JMX, Redis, ...).

PrestoDB runs as a server with optional workers (there is a standalone mode without extra workers); you then use a small executable JAR (the Presto CLI) to run queries.

Once the Presto server is configured properly, you can use traditional SQL:

SELECT count(*) FROM TOPIC_NAME;

Run the following (assuming kafka-console-consumer.sh is on the path):

kafka-console-consumer.sh --from-beginning \
  --bootstrap-server yourbroker:9092 --property print.key=true \
  --property print.value=false --property print.partition=true \
  --topic yourtopic --timeout-ms 5000 2>&1 | tail -n 10 | grep "Processed a total of"

The console consumer prints the "Processed a total of N messages" line to stderr when it exits, hence the 2>&1.

If you have access to the broker's JMX interface, the start and end offsets are exposed at:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(Replace TOPICNAME and PARTITIONNUMBER.) Bear in mind that you need to check each replica of a given partition, or find out which broker is the leader for that partition (and this can change over time).
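From Java, those MBeans can be read with the standard javax.management API. A minimal sketch, assuming remote JMX is enabled on the broker (e.g. via JMX_PORT), that you connect to the broker currently leading the partition, and that the gauges publish their current value through an attribute named "Value" (how Yammer-style gauges are usually exposed); the class name and argument layout are hypothetical:

```java
import java.io.IOException;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxPartitionCount {

    /** Builds e.g. kafka.log:type=Log,name=LogEndOffset,topic=test,partition=0 */
    public static ObjectName logOffsetName(String metric, String topic, int partition) {
        try {
            return new ObjectName("kafka.log:type=Log,name=" + metric
                    + ",topic=" + topic + ",partition=" + partition);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("cannot build ObjectName for topic " + topic, e);
        }
    }

    /** LogEndOffset - LogStartOffset for one partition, read from the broker behind conn. */
    public static long messageCount(MBeanServerConnection conn, String topic, int partition)
            throws JMException, IOException {
        // Assumption: each gauge exposes its current value as the "Value" attribute.
        Number start = (Number) conn.getAttribute(logOffsetName("LogStartOffset", topic, partition), "Value");
        Number end = (Number) conn.getAttribute(logOffsetName("LogEndOffset", topic, partition), "Value");
        return end.longValue() - start.longValue();
    }

    public static void main(String[] args) throws JMException, IOException {
        if (args.length < 3) {
            System.err.println("usage: JmxPartitionCount <jmx-host:port> <topic> <partition>");
            return;
        }
        // Standard RMI-based JMX service URL
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + args[0] + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            System.out.println(messageCount(connector.getMBeanServerConnection(), args[1], Integer.parseInt(args[2])));
        }
    }
}
```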

Alternatively, you can use the KafkaConsumer methods beginningOffsets and endOffsets.


I haven't tried this myself, but it seems to make sense.

You can also use kafka.tools.ConsumerOffsetChecker (source).