Consuming data streams using Apache Kafka
Prerequisites:
Learn how to deploy the Apache Kafka platform to implement real-time data streaming between systems
What you’ll learn
You will learn about Apache Kafka, a platform for handling real-time data streams that lets systems communicate more efficiently. You will then implement a Kafka solution: first, you will create a producer that generates records and pushes them to a Kafka server; then you will create a consumer that pulls those records from the server and processes them.
What is Kafka
Apache Kafka is a stream-processing platform that manages communication in distributed systems. Communication is message-oriented, and follows the publish-subscribe model. Kafka allows for real-time stream processing and distributed, replicated storage of streams and messages.
A stream of records is called a topic, and each topic is stored as a partitioned log. Producers append records, each with a timestamp, to the end of a partition. Consumers read records from these logs in order, tracking their position in each partition with an offset. Records are not deleted when they are consumed. Instead, Kafka keeps every record until a configured retention period has passed (7 days by default) and then removes the oldest records first. Simply put, a topic is a collection of append-only logs of records.
Consumers are organized into consumer groups. Within a group, each partition is assigned to a single consumer, so each record is processed by only one member of the group, while different groups can read the same partition independently, each with its own offset. With several consumers active on the same topic, you may wonder whether this system is error prone. Kafka guards against this by storing each group's committed offsets, so a consumer that restarts resumes where its group left off instead of reprocessing records from the beginning. Older Kafka versions stored these offsets in Apache ZooKeeper; current versions store them in an internal Kafka topic, and ZooKeeper, where it is still used, holds cluster metadata such as broker and partition information.
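To make the idea of a partitioned log concrete, here is a minimal in-memory sketch. It is a toy model, not Kafka's implementation: the `TopicSketch` class and its methods are hypothetical names, and the caller picks the partition explicitly, whereas a real producer normally lets Kafka choose it. It shows that offsets are per partition, that reading does not remove records, and that two readers can hold different offsets in the same partition.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a topic as a set of append-only partition logs (not Kafka code).
class TopicSketch {
    private final List<List<String>> partitions = new ArrayList<>();

    TopicSketch(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Producer side: append to the end of one partition and return the record's offset.
    int append(int partition, String record) {
        List<String> log = partitions.get(partition);
        log.add(record);
        return log.size() - 1;
    }

    // Consumer side: return up to max records starting at offset; nothing is deleted.
    List<String> read(int partition, int offset, int max) {
        List<String> log = partitions.get(partition);
        int to = Math.min(log.size(), offset + max);
        return new ArrayList<>(log.subList(Math.min(offset, to), to));
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch(2);
        topic.append(0, "a");
        topic.append(0, "b");
        topic.append(1, "x");
        System.out.println(topic.read(0, 0, 10)); // [a, b]  reader at offset 0
        System.out.println(topic.read(0, 1, 10)); // [b]     another reader at offset 1
        System.out.println(topic.read(0, 0, 10)); // [a, b]  records are still there
    }
}
```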
Topics are hosted on servers called Kafka brokers, and a group of brokers forms a cluster. The brokers act as the contact points for producers and consumers: producers push records to a broker in batches, and consumers pull records from it. In your application, you will create a producer and a consumer that interact with each other through the Kafka server.
Application
The application you will be working with is a job manager that maintains an inventory of available systems.
It consists of four microservices: gateway, job, system, and inventory.
The job microservice allows you to dispatch jobs that will be run by the system microservice.
The job is a sleep operation used to represent a slow task that lasts for a duration of 5 to 10 seconds. When it completes, the system microservice
reports the sleep time as the result of this job. In addition to running jobs, the system microservice also registers
itself on startup with the inventory microservice that keeps track of all instances of the system microservice. Finally,
the gateway microservice is a backend-for-frontend service.
It communicates with the backend job and inventory microservices on the caller’s behalf.
The two microservices you will modify are the system and inventory services. The inventory service monitors the status of instances of the system service.
The implementations of the application and its services are provided for you in the start/src directory.
Getting started
The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:
git clone https://github.com/openliberty/guide-kafka-intro.git
cd guide-kafka-intro
The start directory contains the starting project that you will build upon.
The finish directory contains the finished project that you will build.
Before you begin, make sure you have all the necessary prerequisites.
Configuring the producer
To begin, we have to set up our producer so that we can generate data for our system. The producer has to generate records and push them to the Kafka broker.
Replace the `SystemProducer` class.
`system/src/main/java/io/openliberty/guides/system/SystemProducer.java`
First, we have to configure the producer's properties, of which at least three are required. The first is the bootstrap server property, set with ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, which lists the broker addresses that the producer uses to establish its initial connection to the cluster. The next two are ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, which tell the producer which serializers to use to convert the keys and values of your records into bytes. In this case, we use StringSerializer because the keys and values of our records are of type String. These properties are then passed to the KafkaProducer constructor to create the actual producer.
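As a rough sketch of these three settings, the following builds them with plain string keys so that it runs without the Kafka client library. The keys are the string values behind the ProducerConfig constants; the class name `ProducerPropsSketch` and the broker address `localhost:9092` are assumptions for illustration and are not taken from the guide's code.

```java
import java.util.Properties;

// Minimal producer settings as plain strings (no Kafka dependency needed to run this).
class ProducerPropsSketch {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Same key as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Same key as ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed broker address for a local setup.
        Properties props = build("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers")); // localhost:9092
    }
}
```

With the Kafka client library on the classpath, a Properties object like this one would typically be passed to `new KafkaProducer<String, String>(props)`.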
Finally, we create the sendMessage() method, which builds a ProducerRecord that is serialized and sent to the broker. Notice that the actual sending is performed by the producer's built-in send() method and not by our sendMessage() method. send() is asynchronous and returns a Future whose result, a RecordMetadata object, contains the partition the record was sent to, its offset within that partition, and its timestamp.
Producers can batch records before they are actually sent. A call to send() adds the record to a buffer of pending records and returns immediately, so the application can keep producing more records. This means that multiple records can be grouped together and sent in one request instead of one at a time, which lets Kafka achieve better overall throughput at the cost of a small latency penalty.
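The effect of batching can be illustrated with a small toy model. This is only a sketch under simplifying assumptions: `BatchingSketch` is a hypothetical class that counts records, whereas the real KafkaProducer sizes batches in bytes, can also flush after a time limit, and sends from a background thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of producer-side batching (not the KafkaProducer implementation).
class BatchingSketch {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private int requestsSent = 0;

    BatchingSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers the record and returns at once; a request is only simulated
    // when the buffer holds batchSize records.
    void send(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends whatever is currently buffered as a single simulated request.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        requestsSent++;
        buffer.clear();
    }

    int requestsSent() {
        return requestsSent;
    }

    public static void main(String[] args) {
        BatchingSketch producer = new BatchingSketch(3);
        for (int i = 0; i < 7; i++) {
            producer.send("record-" + i);
        }
        producer.flush(); // push out the last, partially filled batch
        System.out.println(producer.requestsSent()); // 3 requests for 7 records
    }
}
```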
Producers support much more functionality than this example shows. We can set the acks configuration so that a send is considered complete only once the broker has acknowledged it, which trades speed for durability. We can also configure the batch and buffer sizes, which reduces the total number of requests to the broker and increases efficiency. You can read about these configurations and more in the KafkaProducer documentation.
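A hedged sketch of how such tuning might look is shown below, again with plain string keys so that it runs without the Kafka client library. `acks`, `batch.size`, and `linger.ms` are real producer configuration names; the class name `ProducerTuningSketch` and the specific values are illustrative assumptions, not recommendations from this guide.

```java
import java.util.Properties;

// Optional producer tuning settings layered on top of a base configuration.
class ProducerTuningSketch {
    static Properties tune(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Wait until all in-sync replicas have acknowledged each record.
        props.put("acks", "all");
        // Upper bound, in bytes, for one batch of records (illustrative value).
        props.put("batch.size", "32768");
        // Wait up to 10 ms for more records before sending a batch (illustrative value).
        props.put("linger.ms", "10");
        return props;
    }

    public static void main(String[] args) {
        Properties tuned = tune(new Properties());
        System.out.println(tuned.getProperty("acks")); // all
    }
}
```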
Configuring the consumer
Similarly to the producer, we must first set some basic configurations before actually creating an instance of the KafkaConsumer class.
Replace the `SystemConsumer` class.
`inventory/src/main/java/io/openliberty/guides/inventory/SystemConsumer.java`
The consumer has three equivalent minimal configurations: ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG. Note that a consumer uses deserializers, which convert the received bytes back into keys and values. On top of these three, we need some additional configurations. One required configuration is ConsumerConfig.GROUP_ID_CONFIG, which names the consumer group that this consumer belongs to; consumers that share a group ID divide the partitions of a topic among themselves. The final configuration in this consumer is ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, which defines the behaviour of the consumer when it has no valid committed offset (its index in a partition), for example when the consumer group is first initialized or when its offset is out of range. In this case, we reset the offset to the earliest valid offset. You can find additional configurations to suit your needs in the KafkaConsumer documentation.
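The consumer settings described above might be sketched as follows, using plain string keys so that the example runs without the Kafka client library. The keys are the string values behind the ConsumerConfig constants; the class name `ConsumerPropsSketch`, the broker address `localhost:9092`, and the group ID `inventory-group` are assumptions made for illustration.

```java
import java.util.Properties;

// Minimal consumer settings as plain strings (no Kafka dependency needed to run this).
class ConsumerPropsSketch {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Same key as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as ConsumerConfig.GROUP_ID_CONFIG
        props.put("group.id", groupId);
        // Consumers need deserializers, the counterpart of the producer's serializers.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Same key as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG: start from the
        // earliest available record when no valid committed offset exists.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Both arguments are assumed values for a local setup.
        Properties props = build("localhost:9092", "inventory-group");
        System.out.println(props.getProperty("auto.offset.reset")); // earliest
    }
}
```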
After the properties have been set and the consumer created, the consumer is connected to a topic via the subscribe() method. The consumer subscribes to the topic that the producer writes to, so that it can retrieve records after they are published. Subscribers are not explicitly informed of new records; instead, they must poll the Kafka server for them. This is done using the poll() method of the org.apache.kafka.clients.consumer.Consumer interface. In this case, the method is called in the consumeMessages() method and takes an argument of Duration.ofSeconds(3), which tells the consumer to wait at most 3 seconds for records to become available before returning. The records are returned as a ConsumerRecords<K, V> object and can then be manipulated however we want. The consumer then commits its offset to the broker to indicate which records it has consumed. This is done using the commitAsync() method. Kafka also supports automatic commits with the enable.auto.commit = true property as well as synchronous manual commits with the commitSync() method. Without a commit, the consumer's committed offset in each partition is never updated, so a restarted consumer would read the same records again.
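The relationship between polling and committing can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the Kafka client API: `CommitSketch` and its methods are hypothetical, a single list stands in for one partition, and poll() here takes a record count rather than a timeout. It shows that a restart resumes from the last committed offset, so records polled but never committed are delivered again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of a consumer's read position versus its committed offset (not Kafka code).
class CommitSketch {
    private final List<String> log; // stands in for one partition on the broker
    private int committed = 0;      // offset remembered on the broker side
    private int position = 0;       // consumer's in-memory read position

    CommitSketch(List<String> log) {
        this.log = log;
    }

    // Returns up to max records after the current position and advances the position.
    List<String> poll(int max) {
        int to = Math.min(log.size(), position + max);
        List<String> batch = new ArrayList<>(log.subList(position, to));
        position = to;
        return batch;
    }

    // Records everything polled so far as consumed.
    void commit() {
        committed = position;
    }

    // Simulates a restart: the in-memory position is lost and reading
    // resumes from the last committed offset.
    void restart() {
        position = committed;
    }

    public static void main(String[] args) {
        CommitSketch consumer = new CommitSketch(Arrays.asList("r0", "r1", "r2", "r3"));
        System.out.println(consumer.poll(2)); // [r0, r1]
        consumer.commit();
        System.out.println(consumer.poll(2)); // [r2, r3]  polled but never committed
        consumer.restart();
        System.out.println(consumer.poll(2)); // [r2, r3]  delivered again after restart
    }
}
```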
The retrieved records are then parsed from JSON into a Properties object, and the system status is updated according to the contents of each record.
Big Picture
What you have made is a simple connection that moves a service status from one service to another. The overall application contains more Kafka connections that we have not explicitly shown you: there are two more producer and consumer pairs between the Job and System services. The Job service has a producer (JobProducer) that publishes new jobs to a Kafka broker, and SystemRunnable contains the System service’s consumer, which takes those jobs and processes them. SystemProducer (the same producer as before) then pushes the results to the Kafka broker. Finally, the SystemConsumer and JobConsumer classes consume their respective data. So in total, there are three Kafka connections in this application, of which you created one.
Running the application
Testing the application
A test has been provided for you to test the basic functionality of the consumer. If the test fails, then you may have introduced a bug into the code.
Create the `InventoryEndpointIT` class.
`inventory/src/test/java/it/io/openliberty/guides/inventory/InventoryEndpointIT.java`
Great work! You're done!