[java] How can I send large messages with Kafka (over 15MB)?

I send String-messages to Kafka V. 0.8 with the Java Producer API. If the message size is about 15 MB I get a MessageSizeTooLargeException. I have tried to set message.max.bytesto 40 MB, but I still get the exception. Small messages worked without problems.

(The exception appear in the producer, I don't have a consumer in this application.)

What can I do to get rid of this exception?

My example producer config

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Error-Log:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

This question is related to java apache-kafka

The answer is


You need to adjust three (or four) properties:

  • Consumer side:fetch.message.max.bytes - this will determine the largest size of a message that can be fetched by the consumer.
  • Broker side: replica.fetch.max.bytes - this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
  • Broker side: message.max.bytes - this is the largest size of the message that can be received by the broker from a producer.
  • Broker side (per topic): max.message.bytes - this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker's message.max.bytes.)

I found out the hard way about number 2 - you don't get ANY exceptions, messages, or warnings from Kafka, so be sure to consider this when you are sending large messages.


Minor changes required for Kafka 0.10 and the new consumer compared to laughing_man's answer:

  • Broker: No changes, you still need to increase properties message.max.bytes and replica.fetch.max.bytes. message.max.bytes has to be equal or smaller(*) than replica.fetch.max.bytes.
  • Producer: Increase max.request.size to send the larger message.
  • Consumer: Increase max.partition.fetch.bytes to receive larger messages.

(*) Read the comments to learn more about message.max.bytes<=replica.fetch.max.bytes


The answer from @laughing_man is quite accurate. But still, I wanted to give a recommendation which I learned from Kafka expert Stephane Maarek.

Kafka isn’t meant to handle large messages.

Your API should use cloud storage (Ex AWS S3), and just push to Kafka or any message broker a reference of S3. You must find somewhere to persist your data, maybe it’s a network drive, maybe it’s whatever, but it shouldn't be message broker.

Now, if you don’t want to go with the above solution

The message max size is 1MB (the setting in your brokers is called message.max.bytes) Apache Kafka. If you really needed it badly, you could increase that size and make sure to increase the network buffers for your producers and consumers.

And if you really care about splitting your message, make sure each message split has the exact same key so that it gets pushed to the same partition, and your message content should report a “part id” so that your consumer can fully reconstruct the message.

You can also explore compression, if your message is text-based (gzip, snappy, lz4 compression) which may reduce the data size, but not magically.

Again, you have to use an external system to store that data and just push an external reference to Kafka. That is a very common architecture and one you should go with and widely accepted.

Keep that in mind Kafka works best only if the messages are huge in amount but not in size.

Source: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka


For people using landoop kafka: You can pass the config values in the environment variables like:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

And if you're usind rdkafka then pass the message.max.bytes in the producer config like:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

Similarly, for the consumer,

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      

The idea is to have equal size of message being sent from Kafka Producer to Kafka Broker and then received by Kafka Consumer i.e.

Kafka producer --> Kafka Broker --> Kafka Consumer

Suppose if the requirement is to send 15MB of message, then the Producer, the Broker and the Consumer, all three, needs to be in sync.

Kafka Producer sends 15 MB --> Kafka Broker Allows/Stores 15 MB --> Kafka Consumer receives 15 MB

The setting therefore should be:

a) on Broker:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) on Consumer:

fetch.message.max.bytes=15728640

One key thing to remember that message.max.bytes attribute must be in sync with the consumer's fetch.message.max.bytes property. the fetch size must be at least as large as the maximum message size otherwise there could be situation where producers can send messages larger than the consumer can consume/fetch. It might worth taking a look at it.
Which version of Kafka you are using? Also provide some more details trace that you are getting. is there some thing like ... payload size of xxxx larger than 1000000 coming up in the log?


You need to override the following properties:

Broker Configs($KAFKA_HOME/config/server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Consumer Configs($KAFKA_HOME/config/consumer.properties)
This step didn't work for me. I add it to the consumer app and it was working fine

  • fetch.message.max.bytes

Restart the server.

look at this documentation for more info: http://kafka.apache.org/08/configuration.html