This is the second tutorial about creating a Java Producer and Consumer with Apache Kafka. In the first tutorial (Kafka Tutorial: Creating a Java Producer and Consumer) we learned how to set up a Maven project to run a Kafka Java Consumer and Producer. Now we will code a more advanced use case, in which custom Java types are used in messaging.

This project is composed of the following Classes:

  • SampleKafkaProducer: A standalone Java class which sends messages to a Kafka Topic.
  • SampleKafkaConsumer: A standalone Java class which consumes Kafka messages from a Topic.
  • StockQuote: A Java Bean which is sent as a message.
  • StockQuoteSerializer: A Java class used by Kafka to serialize a StockQuote into a stream of bytes.
  • StockQuoteDeserializer: A Java class used by Kafka to deserialize the stream of bytes back into a StockQuote object.

It is required that you start a single-node Kafka cluster as discussed in this tutorial: Kafka tutorial #2: Getting started with Kafka.
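
If you need a refresher, a single-node cluster is typically started with the scripts bundled in the Kafka distribution (assuming the default layout of the Kafka installation directory):

# terminal 1: start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# terminal 2: start the Kafka broker
bin/kafka-server-start.sh config/server.properties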

Let's start from the SampleKafkaProducer class:

package com.masteringintegration.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Example class sending text Messages to Kafka cluster

public class SampleKafkaProducer extends Thread {
    private final KafkaProducer<String, StockQuote> producer;
    private final String topic;
    private final Boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";
    public static final String TOPIC = "testTopic";
    public static final int MESSAGES = 100;

    public SampleKafkaProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.masteringintegration.kafka.StockQuoteSerializer");
        producer = new KafkaProducer<>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public static void main(String[] args) {
        boolean isAsync = false;
        SampleKafkaProducer producer = new SampleKafkaProducer(TOPIC, isAsync);
        // start the producer
        producer.start();
    }

    public void run() {
        int messageNo = 1;
        while (messageNo <= MESSAGES) {
            String key = String.valueOf(messageNo);
            StockQuote quote = new StockQuote();
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        key,
                        quote), new MessageCallBack(startTime, key, quote));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            key,
                            quote)).get();
                    System.out.println("Sent Quote: (" + key + ", " + quote + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    // handle the exception
                }
            }
            ++messageNo;
        }
        // flush any pending records and release resources before the thread exits
        producer.close();
    }
}

class MessageCallBack implements Callback {

    private final long startTime;
    private final String key;
    private final StockQuote quote;

    public MessageCallBack(long startTime, String key, StockQuote quote) {
        this.startTime = startTime;
        this.key = key;
        this.quote = quote;
    }


    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "quote(" + key + ", " + quote + ") sent to partition(" + metadata.partition() +
                            "), " +
                            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

This class extends Thread so that the producer runs in its own thread. Depending on the isAsync flag, each message is sent to the Topic named "testTopic" either synchronously (blocking on the Future returned by send()) or asynchronously (with MessageCallBack reporting the result).
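
If you want to try the asynchronous path, pass true to the constructor; send() then returns immediately and MessageCallBack.onCompletion() logs the partition, offset and elapsed time for each acknowledged record:

// in main(): enable the asynchronous path
boolean isAsync = true;
SampleKafkaProducer producer = new SampleKafkaProducer(TOPIC, isAsync);
producer.start();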

Let's check now the Kafka Consumer class:

package com.masteringintegration.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class SampleKafkaConsumer {

    public static void main(String[] args) {
        String server = "127.0.0.1:9092";
        String groupId = "SampleKafkaConsumer";
        String topic = "testTopic";

        new SampleKafkaConsumer(server, groupId, topic).run();
    }

    // Variables

    private final Logger mLogger = LoggerFactory.getLogger(SampleKafkaConsumer.class.getName());
    private final String mBootstrapServer;
    private final String mGroupId;
    private final String mTopic;

    // Constructor

    SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
        mBootstrapServer = bootstrapServer;
        mGroupId = groupId;
        mTopic = topic;
    }

    // Public

    void run() {
        mLogger.info("Creating consumer thread");

        CountDownLatch latch = new CountDownLatch(1);

        ConsumerRunnable consumerRunnable = new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
        Thread thread = new Thread(consumerRunnable);
        thread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            mLogger.info("Caught shutdown hook");
            consumerRunnable.shutdown();
            await(latch);

            mLogger.info("Application has exited");
        }));

        await(latch);
    }

    // Private

    void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            mLogger.error("Application got interrupted", e);
        } finally {
            mLogger.info("Application is closing");
        }
    }

    // Inner classes

    private class ConsumerRunnable implements Runnable {

        private CountDownLatch mLatch;
        private KafkaConsumer<String, StockQuote> mConsumer;

        ConsumerRunnable(String bootstrapServer, String groupId, String topic, CountDownLatch latch) {
            mLatch = latch;

            Properties props = consumerProps(bootstrapServer, groupId);
            mConsumer = new KafkaConsumer<>(props);
            mConsumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, StockQuote> records = mConsumer.poll(Duration.ofMillis(100));

                    for (ConsumerRecord<String, StockQuote> record : records) {
                        mLogger.info("Key: " + record.key() + ", Value: " + record.value());
                        mLogger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                    }
                }
            } catch (WakeupException e) {
                mLogger.info("Received shutdown signal!");
            } finally {
                mConsumer.close();
                mLatch.countDown();
            }
        }

        void shutdown() {
            mConsumer.wakeup();
        }

        private Properties consumerProps(String bootstrapServer, String groupId) {
            String deserializer = "com.masteringintegration.kafka.StockQuoteDeserializer";
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            return properties;
        }
    }
}

The inner ConsumerRunnable class implements Runnable so that messages are consumed asynchronously on a separate thread. A JVM shutdown hook invokes wakeup() on the consumer, which makes the blocking poll() throw a WakeupException, so the loop terminates and the consumer is closed cleanly.
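
As a side note, since the consumer joins the consumer group SampleKafkaConsumer, you can scale consumption by starting additional instances with the same groupId: Kafka rebalances the Topic's partitions across the group (this only helps if the Topic has more than one partition). Reusing the constructor above:

// a second instance in the same group shares the Topic's partitions
new SampleKafkaConsumer("127.0.0.1:9092", "SampleKafkaConsumer", "testTopic").run();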

The Java Bean class which is sent over the wire is StockQuote, which simply generates a random quote for a random company:

package com.masteringintegration.kafka;

import java.util.Random;

public class StockQuote {

    private int quote;
    private String name;

    public StockQuote() {
        String[] company = new String[] { "Microsoft", "IBM", "Oracle", "Accenture", "HPE" };
        Random rand = new Random();

        quote = rand.nextInt(1000);
        name = company[rand.nextInt(5)];
    }
    public int getQuote() {
        return quote;
    }

    public void setQuote(int quote) {
        this.quote = quote;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "StockQuote{" +
                "quote=" + quote +
                ", name='" + name + '\'' +
                '}';
    }
}
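
Since the custom serializer and deserializer below delegate to Jackson, every StockQuote travels as a small JSON document built from the bean's getters and setters. A quick standalone sketch to inspect the wire format (the ShowWireFormat class is only for illustration):

package com.masteringintegration.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ShowWireFormat {
    public static void main(String[] args) throws Exception {
        StockQuote quote = new StockQuote();
        // prints something like: {"quote":281,"name":"Accenture"}
        System.out.println(new ObjectMapper().writeValueAsString(quote));
    }
}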

In order to be sent over the wire, this class needs a custom Serializer and Deserializer. Here is the StockQuoteDeserializer, which implements Kafka's Deserializer interface:

package com.masteringintegration.kafka;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class StockQuoteDeserializer implements Deserializer<StockQuote> {

    // a single ObjectMapper is reused: it is thread-safe and expensive to create
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public StockQuote deserialize(String topic, byte[] data) {
        StockQuote quote = null;
        try {
            quote = objectMapper.readValue(data, StockQuote.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return quote;
    }

    @Override
    public void close() {
    }
}

And the corresponding StockQuoteSerializer class:

package com.masteringintegration.kafka;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StockQuoteSerializer implements Serializer<StockQuote> {
    private ObjectMapper objectMapper;
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.objectMapper = new ObjectMapper();
    }
    @Override
    public byte[] serialize(String topic, StockQuote quote) {
        try {
            return objectMapper.writeValueAsBytes(quote);
        } catch (Exception e) {
            throw new SerializationException("Error serializing object", e);
        }
    }
    @Override
    public void close() {
    }
}
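
Before plugging them into Kafka, you can sanity-check the pair by round-tripping a quote through both classes; a minimal sketch (the RoundTripTest class is only for illustration):

package com.masteringintegration.kafka;

import java.util.Collections;

public class RoundTripTest {
    public static void main(String[] args) {
        StockQuoteSerializer serializer = new StockQuoteSerializer();
        StockQuoteDeserializer deserializer = new StockQuoteDeserializer();
        // Kafka normally calls configure() for us; here we do it by hand
        serializer.configure(Collections.emptyMap(), false);
        deserializer.configure(Collections.emptyMap(), false);

        StockQuote original = new StockQuote();
        byte[] bytes = serializer.serialize("testTopic", original);
        StockQuote copy = deserializer.deserialize("testTopic", bytes);

        System.out.println("original: " + original);
        System.out.println("copy:     " + copy);
    }
}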

In addition to the Kafka client classes, we have also added the Jackson Databind dependency in the pom.xml file:

<dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.10.3</version>
    </dependency>
 
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
          <execution>
            <id>consumer</id>
            <configuration>
              <mainClass>com.masteringintegration.kafka.SampleKafkaConsumer</mainClass>
            </configuration>
          </execution>
          <execution>
            <id>producer</id>
            <configuration>
              <mainClass>com.masteringintegration.kafka.SampleKafkaProducer</mainClass>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

As you can see, we have included two different executions so that you can run both standalone Java applications. You can run the Producer as follows:

mvn exec:java@producer

The messages will be sent to the Topic:

Sent Quote: (1, StockQuote{quote=281, name='Accenture'})
Sent Quote: (2, StockQuote{quote=458, name='IBM'})
Sent Quote: (3, StockQuote{quote=209, name='Microsoft'})
Sent Quote: (4, StockQuote{quote=764, name='IBM'})
Sent Quote: (5, StockQuote{quote=489, name='IBM'})
Sent Quote: (6, StockQuote{quote=518, name='HPE'})
Sent Quote: (7, StockQuote{quote=708, name='Oracle'})
Sent Quote: (8, StockQuote{quote=921, name='Accenture'})
Sent Quote: (9, StockQuote{quote=964, name='Oracle'})
Sent Quote: (10, StockQuote{quote=706, name='Accenture'})
 . . . . .

Now start the Consumer application:

mvn exec:java@consumer

You will see that messages are consumed from the Topic:

[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=890, name='Microsoft'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24772
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=908, name='HPE'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24773
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=763, name='HPE'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24774
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=841, name='Accenture'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24775
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=971, name='HPE'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24776
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=176, name='Microsoft'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24777
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key:, Value: StockQuote{quote=945, name='Microsoft'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24778
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: 	, Value: StockQuote{quote=201, name='Oracle'}

 . . . .

Congratulations! You have just connected to a Kafka cluster with a Java Producer and Consumer exchanging custom Java objects.

Source code for this tutorial: https://github.com/fmarchioni/masteringintegration/tree/master/kafka/java-custom-objects