Is there a way to delete all the data from a topic or delete the topic before every run?

Can I modify the KafkaConfig.scala file to change the logRetentionHours property? Is there a way to have messages deleted as soon as the consumer reads them?

I am using producers to fetch data from an external source and send it to a particular topic, which a consumer reads. Can I delete all the data from that topic on every run? I want only new data in the topic each time. Is there a way to reinitialize the topic somehow?

Below are scripts for emptying and deleting a Kafka topic, assuming ZooKeeper runs on localhost and Kafka_Home is set to the install directory:

The script below will empty a topic by setting its retention time to 1 second and then removing the configuration:

echo "Enter name of topic to empty:"
read -r topicName
# Temporarily shrink retention to 1 second so existing messages expire
"$Kafka_Home/bin/kafka-configs" --zookeeper localhost:2181 --alter --entity-type topics --entity-name "$topicName" --add-config retention.ms=1000
sleep 5
# Remove the override so the topic falls back to the broker default
"$Kafka_Home/bin/kafka-configs" --zookeeper localhost:2181 --alter --entity-type topics --entity-name "$topicName" --delete-config retention.ms
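Since the question asks about doing this before every run, here is a minimal non-interactive sketch of the same retention trick. It takes the topic name as an argument instead of prompting. The variables KAFKA_CONFIGS, ZK_CONNECT and PURGE_WAIT_SECONDS are names I made up for this sketch; the CLI flags are the same ones used in the script above.

```shell
# Hypothetical knobs for this sketch; override them in the environment as needed.
KAFKA_CONFIGS=${KAFKA_CONFIGS:-kafka-configs}
ZK_CONNECT=${ZK_CONNECT:-localhost:2181}
PURGE_WAIT_SECONDS=${PURGE_WAIT_SECONDS:-5}

# purge_topic <topic>: set retention.ms to 1 second, wait, then remove the override.
purge_topic() {
    topic=$1
    [ -n "$topic" ] || { echo "usage: purge_topic <topic>" >&2; return 1; }
    "$KAFKA_CONFIGS" --zookeeper "$ZK_CONNECT" --alter --entity-type topics \
        --entity-name "$topic" --add-config retention.ms=1000 || return 1
    sleep "$PURGE_WAIT_SECONDS"
    "$KAFKA_CONFIGS" --zookeeper "$ZK_CONNECT" --alter --entity-type topics \
        --entity-name "$topic" --delete-config retention.ms
}
```

Call it as purge_topic my_topic at the start of a run. The wait may need to be longer than 5 seconds: the broker only checks for expired segments every log.retention.check.interval.ms (5 minutes by default), so treat the wait as something to tune.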

To fully delete a topic, you must stop the relevant Kafka broker(s), remove the topic's directories from the Kafka log dir (default: /tmp/kafka-logs), and then run this script to remove the topic from ZooKeeper. To verify that it has been deleted from ZooKeeper, check that the output of ls /brokers/topics no longer includes the topic:

echo "Enter name of topic to delete from zookeeper:"
read -r topicName
# Feed the commands to zookeeper-shell through a here-document
"$Kafka_Home/bin/zookeeper-shell" localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
EOF

For brew users

If you're using brew like me and wasted a lot of time searching for the infamous kafka-logs folder, fear no more. (and please do let me know if that works for you and multiple different versions of Homebrew, Kafka etc :) )

You're probably going to find it under:



How to actually find that path

(this is also helpful for basically every app you install through brew)

1) brew services list

kafka started matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist

2) Open and read that plist you found above

3) Find the line defining the server.properties location and open that file, in my case:

  • /usr/local/etc/kafka/server.properties

4) Look for the log.dirs line:


5) Go to that location and delete the logs for the topics you want to remove

6) Restart Kafka with brew services restart kafka
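Steps 3 to 5 can be sketched as a small helper that prints the log.dirs value from a given server.properties file. The function name kafka_log_dirs is my own, and the path you pass in is whatever your plist points to. It assumes a plain key=value line and ignores commented-out entries.

```shell
# Print the value of log.dirs from a Kafka server.properties file.
# Lines starting with '#' do not match; if the key appears more than once,
# the last active definition is printed.
kafka_log_dirs() {
    props_file=$1
    sed -n 's/^[[:space:]]*log\.dirs[[:space:]]*=[[:space:]]*//p' "$props_file" | tail -n 1
}
```

For example, kafka_log_dirs /usr/local/etc/kafka/server.properties would print the directory to look in, assuming that is where your server.properties lives.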

All data about topics and their partitions is stored in /tmp/kafka-logs/ by default. The directories are named in the format topic-partitionNumber, so if you want to delete a topic newTopic, you can:

  • stop kafka
  • delete the files rm -rf /tmp/kafka-logs/newTopic-*
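One caveat with the wildcard: topic names may contain hyphens, so newTopic-* would also match the directories of a topic called newTopic-other. A slightly more careful sketch, assuming the broker is already stopped, removes only directories whose suffix is a plain partition number. The function name delete_topic_dirs is hypothetical.

```shell
# delete_topic_dirs <log_dir> <topic>
# Remove only directories named <topic>-<digits> inside <log_dir>.
delete_topic_dirs() {
    log_dir=$1
    topic=$2
    for dir in "$log_dir/$topic"-*; do
        [ -d "$dir" ] || continue
        name=${dir##*/}              # strip the leading path
        suffix=${name#"$topic"-}     # what follows "<topic>-"
        case $suffix in
            ''|*[!0-9]*) continue ;; # not a partition number, leave it alone
        esac
        rm -rf -- "$dir"
        echo "removed $dir"
    done
}
```

Usage would be delete_topic_dirs /tmp/kafka-logs newTopic, followed by starting Kafka again.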

As a dirty workaround, you can adjust the per-topic runtime retention settings, e.g. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1 (retention.bytes=0 might also work).

After a short while Kafka should free the space. I'm not sure whether this has any implications compared to re-creating the topic.

P.S. It's best to restore the retention settings once Kafka is done cleaning.

You can also use retention.ms to control how long historical data is kept.

I use this script:

topics=$(kafka-topics --list --zookeeper zookeeper:2181)
# Shrink retention and segment settings on every topic so data is purged
for t in $topics; do
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic "$t" --config "${p}=100"
    done
done
sleep 60
# Remove the overrides so the topics fall back to the defaults
for t in $topics; do
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic "$t" --delete-config "${p}"
    done
done

  1. Stop ZooKeeper and Kafka.
  2. In server.properties, change the log.retention.hours value. You can comment out log.retention.hours and add log.retention.ms=1000, which keeps records in the Kafka topic for only one second.
  3. Start ZooKeeper and Kafka.
  4. Check the consumer console. When I opened the console for the first time, the record was there; when I opened it again, the record had been removed.
  5. Later on, you can set log.retention.hours back to your desired value.
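Step 2 can be scripted roughly as follows. This is only a sketch: the function name set_short_retention is made up, it edits the file you pass in (so keep a backup), and it assumes simple key=value lines. It comments out any active log.retention.hours or log.retention.ms line and appends log.retention.ms=1000.

```shell
# set_short_retention <server.properties>
# Comment out active log.retention.hours / log.retention.ms lines and
# append log.retention.ms=1000. Run it while Kafka is stopped.
set_short_retention() {
    props_file=$1
    tmp_file=$(mktemp) || return 1
    sed -e 's/^[[:space:]]*log\.retention\.hours[[:space:]]*=/#&/' \
        -e 's/^[[:space:]]*log\.retention\.ms[[:space:]]*=/#&/' \
        "$props_file" > "$tmp_file" || return 1
    # Leading newline guards against a file that lacks a trailing newline.
    printf '\nlog.retention.ms=1000\n' >> "$tmp_file"
    # Write back in place so the original file's permissions are kept.
    cat "$tmp_file" > "$props_file"
    rm -f "$tmp_file"
}
```

Afterwards, restart ZooKeeper and Kafka as in step 3, and undo the change by hand once the topic has been emptied.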

As I mentioned in Purge Kafka Queue:

Tested in Kafka 0.8.2 with the quick-start example. First, add one line to the server.properties file under the config folder:


Then you can run this command:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

Tested with kafka 0.10

1. Stop the ZooKeeper and Kafka servers.
2. Go to the 'kafka-logs' folder, where you will see a list of Kafka topic folders, and delete the folder with the topic name.
3. Go to the 'zookeeper-data' folder and delete the data inside it.
4. Start the ZooKeeper and Kafka servers again.

Note: if you delete the topic folder(s) inside kafka-logs but not the data in the zookeeper-data folder, the topics will still be listed. Be aware that step 3 wipes all ZooKeeper state, not just the metadata of this one topic.

When manually deleting a topic from a Kafka cluster, you might want to check this out: https://github.com/darrenfu/bigdata/issues/6. A vital step that most solutions miss is deleting /config/topics/<topic_name> in ZK.

We tried pretty much what the other answers describe, with a moderate level of success. What really worked for us (Apache Kafka 0.8.1) is the following class command:

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost:2181

As of Kafka 2.3.0, there is an alternative way to soft-delete a Kafka topic's data (the older approaches are deprecated).

Update retention.ms to 1 second (1000 ms), then after a minute set it back to the default of 7 days (168 hours, 604,800,000 ms).

Soft deletion (retention.ms=1000), using kafka-configs.sh:

bin/kafka-configs.sh --zookeeper <zookeeper-host:port> --alter --entity-name kafka_topic3p3r --entity-type topics --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.

Setting it back to the default of 7 days (168 hours, retention.ms=604800000):

bin/kafka-configs.sh --zookeeper <zookeeper-host:port> --alter --entity-name kafka_topic3p3r --entity-type topics --add-config retention.ms=604800000
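If you'd rather not hard-code the millisecond figure, the arithmetic is easy to do in the shell. A tiny sketch (days_to_ms is just a helper name I picked):

```shell
# Convert a whole number of days to milliseconds for retention.ms.
days_to_ms() {
    echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 7   # prints 604800000
```

You could then pass --add-config retention.ms=$(days_to_ms 7) to the command above.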

I use the utility below to clean up after my integration test runs.

It uses the newer AdminZkClient API; the older API has been deprecated.

import java.util.Properties
import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

// AppConfig is the application's own configuration class (not shown here).
class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  // Assumption: a few seconds is enough for the broker to act on the
  // temporary settings; tune this for your environment.
  val purgeWaitMs = 5000L

  def cleanupTopic(config: AppConfig): Unit = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    try {
      val zkUtils = new AdminZkClient(zkClient)

      // Temporarily apply very short settings so the topic gets purged
      val pp = new Properties()
      pp.setProperty("delete.retention.ms", "10")
      pp.setProperty("file.delete.delay.ms", "1000")
      zkUtils.changeTopicConfig(testTopic, pp)
      //    zkUtils.deleteTopic(testTopic)

      println("Waiting for topic to be purged. Then reset to retain records for the run")
      Thread.sleep(purgeWaitMs)

      // Restore longer settings so records are retained during the run
      val resetProps = new Properties()
      resetProps.setProperty("delete.retention.ms", "3000000")
      resetProps.setProperty("file.delete.delay.ms", "4000000")
      zkUtils.changeTopicConfig(testTopic, resetProps)
    } finally {
      // Always release the ZooKeeper connection
      zkClient.close()
    }
  }
}


There is also a delete-topic option, but it only marks the topic for deletion; the actual deletion happens later. Since that can take unpredictably long, I prefer the retention.ms approach.