×

    Message

    EU e-Privacy Directive

    This website uses cookies to manage authentication, navigation, and other functions. By using our website, you agree that we can place these types of cookies on your device.

    View e-Privacy Directive Documents

    View GDPR Documents

    You have declined cookies. This decision can be reversed.

    Kafka simple Consumer and Consumer Group

    Kafka topics can be consumed using a single-threaded Consumer or using a multi-threaded Consumer. In this tutorial we will learn the differences between them.

    Let's start from the simplest use case of Consumer, which consumes messages serially in a single thread:

    import java.util.Properties;
    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    public class SimpleConsumer {
       public static void main(String[] args) throws Exception {
          if(args.length == 0){
             System.out.println("Enter topic name");
             return;
          }
          //Kafka consumer configuration settings
          String topicName = args[0].toString();
          Properties props = new Properties();
          
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "test");
          props.put("enable.auto.commit", "true");
          props.put("auto.commit.interval.ms", "1000");
          props.put("session.timeout.ms", "30000");
          props.put("key.deserializer", 
             "org.apache.kafka.common.serializa-tion.StringDeserializer");
          props.put("value.deserializer", 
             "org.apache.kafka.common.serializa-tion.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer
             <String, String>(props);
          
          //Kafka Consumer subscribes list of topics here.
          consumer.subscribe(Arrays.asList(topicName))
          
          //print the topic name
          System.out.println("Subscribed to topic " + topicName);
          int i = 0;
          
          while (true) {
             ConsumerRecords<String, String> records = con-sumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
             
             // print the offset,key and value for the consumer records.
             System.out.printf("offset = %d, key = %s, value = %s\n", 
                record.offset(), record.key(), record.value());
          }
       }
    }
    

    A Consumer group, on the other hand, is a multi-threaded or multi-machine consumption from Kafka topics.

    • Consumers can join a group by using the same"group.id."
    • The maximum parallelism of a group can be achieved when the number of consumers in the group equals to the number of partitions.
    • Kafka assigns the partitions of a topic to the consumer in a group, so that each partition is consumed by exactly one consumer in the group.
    • Kafka also guarantees that a message is consumed by a single consumer in the group.
    • Kafka Consumers can see the message in the order they were stored in the log.

    Here is an example of Consumer Group:

    package com.masteringintegration.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    
    public class SampleKafkaConsumer {
    
        public static void main(String[] args) {
            String server = "127.0.0.1:9092";
            String groupId = "SampleKafkaConsumer";
            String topic = "testTopic";
    
            new SampleKafkaConsumer(server, groupId, topic).run();
        }
    
        // Variables
    
        private final Logger mLogger = LoggerFactory.getLogger(SampleKafkaConsumer.class.getName());
        private final String mBootstrapServer;
        private final String mGroupId;
        private final String mTopic;
    
        // Constructor
    
        SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
            mBootstrapServer = bootstrapServer;
            mGroupId = groupId;
            mTopic = topic;
        }
    
        // Public
    
        void run() {
            mLogger.info("Creating consumer thread");
    
            CountDownLatch latch = new CountDownLatch(1);
    
            ConsumerRunnable consumerRunnable = new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
            Thread thread = new Thread(consumerRunnable);
            thread.start();
    
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                mLogger.info("Caught shutdown hook");
                consumerRunnable.shutdown();
                await(latch);
    
                mLogger.info("Application has exited");
            }));
    
            await(latch);
        }
    
        // Private
    
        void await(CountDownLatch latch) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                mLogger.error("Application got interrupted", e);
            } finally {
                mLogger.info("Application is closing");
            }
        }
    
        // Inner classes
    
        private class ConsumerRunnable implements Runnable {
    
            private CountDownLatch mLatch;
            private KafkaConsumer<String, String> mConsumer;
    
            ConsumerRunnable(String bootstrapServer, String groupId, String topic, CountDownLatch latch) {
                mLatch = latch;
    
                Properties props = consumerProps(bootstrapServer, groupId);
                mConsumer = new KafkaConsumer<>(props);
                mConsumer.subscribe(Collections.singletonList(topic));
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        ConsumerRecords<String, String> records = mConsumer.poll(Duration.ofMillis(100));
    
                        for (ConsumerRecord<String, String> record : records) {
                            mLogger.info("Key: " + record.key() + ", Value: " + record.value());
                            mLogger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                        }
                    }
                } catch (WakeupException e) {
                    mLogger.info("Received shutdown signal!");
                } finally {
                    mConsumer.close();
                    mLatch.countDown();
                }
            }
    
            void shutdown() {
                mConsumer.wakeup();
            }
    
            private Properties consumerProps(String bootstrapServer, String groupId) {
                String deserializer = StringDeserializer.class.getName();
                Properties properties = new Properties();
                properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
                properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
                properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
                properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
                return properties;
            }
        }
    }
    

    By adding more processes/threads will let Kafka to re-balance. That is, uf any consumer or broker fails to send heartbeat to ZooKeeper, then it can be re-configured via the Kafka cluster. During this re-balance, Kafka will assign available partitions to the available threads, possibly moving a partition to another process.

    FREE WildFly Application Server - JBoss - Quarkus - Drools Tutorials
    © 2020 masteringintegration.com. All Rights Reserved.

    Please publish modules in offcanvas position.