
    How Kafka commits messages

    Every message a producer sends to a Kafka partition has an offset, a sequential index number that uniquely identifies the message within that partition. To keep track of which messages have already been processed, your consumer needs to commit the offsets of the messages it has processed.

    A commit can happen in several different ways, and each way has its own pros and cons. Let's look at them in detail:

    Auto-committing messages

    The consumer's default configuration is to auto-commit offsets. The consumer automatically commits the offsets of the most recently read messages at a configured time interval. If we set enable.auto.commit = true and auto.commit.interval.ms = 2000, the consumer will commit offsets every two seconds. There are certain risks associated with this option. For example, suppose you set the interval to 10 seconds and the consumer starts consuming data. At the seventh second your consumer fails; what happens? The consumer hasn't yet committed the offsets it has read, so when it restarts it resumes from the last committed offset and rereads those messages, which leads to duplicate processing.
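    As a minimal sketch, the two settings above are supplied through the consumer's configuration Properties. The broker address, group id, and topic below are placeholder values, and the deserializer settings and actual poll loop are omitted for brevity:

    ```java
    import java.util.Properties;

    public class AutoCommitConfig {
        // Builds consumer properties with auto-commit enabled every two seconds.
        static Properties autoCommitProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "demo-group");              // assumed consumer group
            // Auto-commit the latest polled offsets every 2000 ms
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "2000");
            return props;
        }

        public static void main(String[] args) {
            Properties props = autoCommitProps();
            System.out.println(props.getProperty("enable.auto.commit"));
            System.out.println(props.getProperty("auto.commit.interval.ms"));
        }
    }
    ```

    These properties would then be passed to new KafkaConsumer<>(props); key.deserializer and value.deserializer must also be set before the consumer can be created.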

    Committing the current offset

    Most of the time, we want control over when an offset is committed. Kafka provides an API for this. We first set enable.auto.commit = false and then call the commitSync() method from the consumer thread. This commits the latest offsets returned by the last poll. It is best to call this method only after we have processed all instances of ConsumerRecord; otherwise there is a risk of losing records if the consumer fails in between.

    while (true) {
    	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    	for (ConsumerRecord<String, String> record : records)
    		System.out.printf("offset = %d, key = %s, value = %s%n",
    			record.offset(), record.key(), record.value());
    	try {
    		consumer.commitSync(); // commit the latest offsets returned by poll()
    	} catch (CommitFailedException ex) {
    		// Logger or code to handle the failed commit
    	}
    }

    Asynchronous commit

    The problem with synchronous commit is that the consumer is blocked until it receives an acknowledgment for the commit request from the Kafka server, which lowers throughput. Throughput can be improved by making the commit happen asynchronously. However, asynchronous commit has a problem of its own: it may lead to duplicate message processing in cases where commit requests complete out of order. For example, if the offset of message 10 is committed before the offset of message 5, the latest committed offset 10 is overridden by 5, and Kafka will serve messages 5-10 to the consumer again.

    while (true) {
    	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    	for (ConsumerRecord<String, String> record : records)
    		System.out.printf("offset = %d, key = %s, value = %s%n",
    			record.offset(), record.key(), record.value());
    	consumer.commitAsync(new OffsetCommitCallback() {
    		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
    			if (e != null) { /* Logger or code to handle the failed commit */ }
    		}
    	});
    }