<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/"
    xmlns:atom="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" version="2.0">
    <channel>
        
        <title>
            <![CDATA[ Apache Kafka - freeCodeCamp.org ]]>
        </title>
        <description>
            <![CDATA[ Browse thousands of programming tutorials written by experts. Learn Web Development, Data Science, DevOps, Security, and get developer career advice. ]]>
        </description>
        <link>https://www.freecodecamp.org/news/</link>
        <image>
            <url>https://cdn.freecodecamp.org/universal/favicons/favicon.png</url>
            <title>
                <![CDATA[ Apache Kafka - freeCodeCamp.org ]]>
            </title>
            <link>https://www.freecodecamp.org/news/</link>
        </image>
        <generator>Eleventy</generator>
        <lastBuildDate>Wed, 27 May 2026 16:22:05 +0000</lastBuildDate>
        <atom:link href="https://www.freecodecamp.org/news/tag/apache-kafka/rss.xml" rel="self" type="application/rss+xml" />
        <ttl>60</ttl>
        
            <item>
                <title>
                    <![CDATA[ How to Build a Real-Time Notification System with Go and Kafka ]]>
                </title>
                <description>
                    <![CDATA[ By Hermann Rösch These days, applications need to have instant data access and processing capabilities. Whether it's updating real-time stock trades for financial institutions or navigating through live traffic data, the ability to process and react ... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/build-a-real-time-notification-system-with-go-and-kafka/</link>
                <guid isPermaLink="false">66d45f313dce891ac3a96802</guid>
                
                    <category>
                        <![CDATA[ Apache Kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ golang ]]>
                    </category>
                
                    <category>
                        <![CDATA[ message broker ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ freeCodeCamp ]]>
                </dc:creator>
                <pubDate>Fri, 25 Aug 2023 21:41:27 +0000</pubDate>
                <media:content url="https://www.freecodecamp.org/news/content/images/2023/08/Go_Kafka_Vert_Rev2.svg" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>By Hermann Rösch</p>
<p>These days, applications need to have instant data access and processing capabilities. Whether it's updating real-time stock trades for financial institutions or navigating through live traffic data, the ability to process and react to data in real time is crucial.</p>
<p>In this tutorial, you’ll delve into the mechanics of <strong>Kafka</strong> and then integrate it with <strong>Go</strong> to develop a real-time notification system.</p>
<p>In order to understand this article fully, you should have prior knowledge of <a target="_blank" href="https://go.dev/tour/concurrency/1#:~:text=A%20goroutine%20is%20a%20lightweight,happens%20in%20the%20new%20goroutine."><strong>Goroutines</strong></a>, the <a target="_blank" href="https://gin-gonic.com/docs/introduction/"><strong>Gin</strong></a> framework, and containerization tools like <a target="_blank" href="https://www.docker.com/"><strong>Docker</strong></a>.</p>
<h2 id="heading-table-of-contents">Table of Contents</h2>
<ol>
<li><a class="post-section-overview" href="#heading-what-is-kafka">What is Kafka?</a></li>
<li><a class="post-section-overview" href="#heading-how-to-set-up-the-project-workspace">How to Set Up the Project Workspace</a></li>
<li><a class="post-section-overview" href="#heading-how-to-create-the-user-and-notification-models">How to Create the User and Notification Models</a></li>
<li><a class="post-section-overview" href="#heading-how-to-set-up-the-kafka-producer">How to Set Up the Kafka Producer</a></li>
<li><a class="post-section-overview" href="#heading-how-to-set-up-the-kafka-consumer">How to Set Up the Kafka Consumer</a></li>
<li><a class="post-section-overview" href="#heading-lets-test-the-real-time-notification-system">Let's Test the Real-Time Notification System</a></li>
<li><a class="post-section-overview" href="#heading-wrapping-up">Wrapping Up</a></li>
</ol>
<h2 id="heading-what-is-kafka">What is Kafka? 🤔</h2>
<p><a target="_blank" href="https://kafka.apache.org/intro">Kafka</a> is a distributed event streaming platform. Originally developed at LinkedIn, it was later open-sourced and donated to the Apache Software Foundation, and it has since become a widely used platform for real-time data streaming.</p>
<p>More than a simple communication tool, Kafka is an “event broker”: a system that receives, stores, and routes events or messages between applications or services. As a distributed platform, it can handle very large daily event volumes and lets data be transported and processed in real time.</p>
<p>Apart from its fundamental role as an event broker, Kafka offers durability, scalability, and fault-tolerance features. It also helps ensure that large-scale data streams are managed efficiently and reliably with very low latency.</p>
<h3 id="heading-kafkas-core-components">Kafka’s core components ⚙️</h3>
<p>Now that you’re acquainted with Kafka, let’s dive into the main elements that make up its architecture:</p>
<h4 id="heading-events">Events</h4>
<p>An event records the fact that “something happened”. It can be thought of as a message or a piece of data representing a change or an action. In the context of our real-time notification system, you could consider an event as follows:</p>
<ul>
<li>Event key: <code>“1”</code>   (representing the user <strong>ID</strong> for <strong>Emma</strong>)</li>
<li>Event value: <code>“Bruno started following you.”</code></li>
</ul>
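<p>To make the key/value shape concrete, here is a minimal sketch of that event in Go. The <code>Event</code> type and the <code>newFollowEvent</code> helper are hypothetical names used only for illustration; they are not part of Kafka or of any client library.</p>

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a Kafka record: a key plus a value.
type Event struct {
	Key   string
	Value string
}

// newFollowEvent builds the example event from the text: the key is the
// recipient's user ID and the value is the human-readable message.
func newFollowEvent(recipientID int, follower string) Event {
	return Event{
		Key:   fmt.Sprint(recipientID),
		Value: follower + " started following you.",
	}
}

func main() {
	e := newFollowEvent(1, "Bruno")
	// Prints: key="1" value="Bruno started following you."
	fmt.Printf("key=%q value=%q\n", e.Key, e.Value)
}
```

<p>Later in the tutorial, the same idea appears with the recipient’s ID as the key and a JSON document as the value.</p>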
<h4 id="heading-brokers">Brokers</h4>
<p>A Kafka broker is a server that runs the Kafka software and stores data. While large-scale production setups often involve multiple brokers across several machines, you’ll use a single broker setup for this tutorial.</p>
<h4 id="heading-topics">Topics</h4>
<p>Topics in Kafka are similar to folders in a filesystem. They represent categories under which data or events are stored. For instance, an example topic name could be <code>"notifications"</code>.</p>
<h4 id="heading-producers">Producers</h4>
<p>Producers are entities that publish (write) or send messages to Kafka, such as a Go program or a service. When a producer has an event to send, it chooses a topic to address the event to.</p>
<h4 id="heading-consumers">Consumers</h4>
<p>Consumers read and process events or messages from Kafka. After producers send messages to topics, consumers can subscribe to one or more topics to receive the messages.</p>
<h4 id="heading-partitions">Partitions</h4>
<p>Each topic in Kafka can be further divided into partitions. Think of partitions as segments within a topic that enable Kafka to manage data more efficiently, especially in setups with multiple brokers. </p>
<p>You’ll stick with a basic configuration without delving into multiple partitions, but you should understand their role in larger Kafka deployments.</p>
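<p>One common way to decide which partition an event lands in is to hash its key, so that events with the same key always go to the same partition. The sketch below illustrates that idea with Go’s standard <code>hash/fnv</code> package. The <code>partitionFor</code> function is a hypothetical helper, and the hash used here is an assumption chosen for illustration; real Kafka clients ship their own partitioners.</p>

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to one of numPartitions partitions by hashing it.
// Illustrative only: real clients use their own hash functions.
// numPartitions must be greater than zero.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % numPartitions
}

func main() {
	// The same key always maps to the same partition.
	for _, key := range []string{"1", "2", "3", "4"} {
		fmt.Printf("key %s -> partition %d\n", key, partitionFor(key, 3))
	}
}
```

<p>With a single partition, as in this tutorial, every key maps to partition <code>0</code>.</p>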
<h4 id="heading-consumer-groups">Consumer groups</h4>
<p>While individual consumers handle messages from specific partitions, consumer groups manage coordination across multiple consumers. </p>
<p>A consumer group consists of multiple consumers collaboratively processing messages from different partitions of a topic. This ensures that each message from a partition is processed by just one consumer in the group, allowing for efficient and scalable consumption. </p>
<p>Think of it as a team of consumers working together, with each member responsible for messages from specific partitions, ensuring no message is overlooked.</p>
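<p>The “one owner per partition” rule can be sketched as a simple round-robin assignment. This is a simplified illustration under stated assumptions, not Kafka’s actual rebalancing protocol: the <code>assign</code> function and the consumer names are hypothetical.</p>

```go
package main

import "fmt"

// assign spreads partitions 0..numPartitions-1 across consumers round-robin,
// so each partition has exactly one owner in the group.
func assign(consumers []string, numPartitions int) map[string][]int {
	owned := make(map[string][]int, len(consumers))
	if len(consumers) == 0 {
		return owned
	}
	for p := 0; p < numPartitions; p++ {
		c := consumers[p%len(consumers)]
		owned[c] = append(owned[c], p)
	}
	return owned
}

func main() {
	owned := assign([]string{"consumer-a", "consumer-b"}, 3)
	// Prints: [0 2] [1]
	fmt.Println(owned["consumer-a"], owned["consumer-b"])
}
```

<p>Because every partition appears in exactly one consumer’s list, no message is processed twice within the group and none is left unassigned.</p>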
<h4 id="heading-replicas">Replicas</h4>
<p>Replication ensures data safety. In larger Kafka deployments, storing multiple data replicas is common to help recover from unexpected failures. </p>
<p>You won’t be using replicas in this tutorial, but it’s beneficial to understand their significance in ensuring data durability in Kafka.</p>
<h4 id="heading-kraft">KRaft</h4>
<p><a target="_blank" href="https://developer.confluent.io/learn/kraft/">KRaft</a> is Kafka’s own consensus protocol introduced to eliminate the need for <a target="_blank" href="https://kafka.apache.org/documentation/#zk_depr">ZooKeeper</a>. In short, KRaft manages metadata directly within Kafka, providing scalability, simplicity, and improved failover, among other benefits.</p>
<p>To tie all these components together, here’s a visual representation of Kafka’s core architecture, illustrating a broker, topics, partitions, and consumer groups:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/08/Kafka_Architecture_Transparent_V2.png" alt="Image" width="600" height="400" loading="lazy">
<em>Visualizing Kafka’s event streaming Architecture</em></p>
<h2 id="heading-how-to-set-up-the-project-workspace">How to Set Up the Project Workspace 👨‍💻👩‍💻</h2>
<p>Enough theory for now! Let’s get our hands dirty with the actual project.</p>
<p>Assuming that you have Docker and Go installed on your machine, let’s create a directory for the project named <code>kafka-notify</code>. Then, you will download the Docker Compose file for <a target="_blank" href="https://hub.docker.com/r/bitnami/kafka/"><strong>Bitnami’s Kafka Docker image</strong></a>, which provides a hassle-free Kafka setup:</p>
<pre><code class="lang-bash">mkdir kafka-notify &amp;&amp; <span class="hljs-built_in">cd</span> kafka-notify

curl -sSL \
https://raw.githubusercontent.com/bitnami/containers/main/bitnami/kafka/docker-compose.yml &gt; docker-compose.yml
</code></pre>
<p>Before starting the Kafka broker, there’s a slight modification required in the <code>docker-compose.yml</code> file. Find the following string:</p>
<pre><code class="lang-yaml"><span class="hljs-string">KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092</span>
</code></pre>
<p>And replace it with:</p>
<pre><code class="lang-yaml"><span class="hljs-string">KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092</span>
</code></pre>
<p>The above change ensures Kafka advertises its listener on <code>localhost</code>, which allows our local Go application to connect seamlessly. Now, you can start the Kafka broker via the following Docker command:</p>
<pre><code class="lang-bash">docker-compose up -d
</code></pre>
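<p>Once the container is up, you may want to confirm that something is listening on the advertised address before writing any Kafka code. The following is a minimal sketch using only Go’s standard library. The <code>brokerReachable</code> helper and the two-second timeout are assumptions made for illustration, and a successful TCP connection only shows that the port is open, not that the broker is fully ready.</p>

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// dialTimeout is an arbitrary value chosen for this sketch.
const dialTimeout = 2 * time.Second

// brokerReachable reports whether a TCP connection to addr can be opened.
// It does not speak the Kafka protocol; it only checks that the port accepts
// connections.
func brokerReachable(addr string) bool {
	conn, err := net.DialTimeout("tcp", addr, dialTimeout)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func main() {
	addr := "localhost:9092" // the address advertised in docker-compose.yml
	if brokerReachable(addr) {
		fmt.Println("broker port is reachable at", addr)
	} else {
		fmt.Println("nothing is listening at", addr)
	}
}
```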
<p>Next, you’ll need to create a few directories to organize the project files. The <code>cmd/producer</code> and <code>cmd/consumer</code> directories will contain the main application files, and the <code>pkg/models</code> directory will store the model declarations:</p>
<pre><code class="lang-bash">mkdir -p cmd/producer cmd/consumer pkg/models
</code></pre>
<p>The last step is to initialize Go modules and install external packages. You’ll be using <code>sarama</code> to establish a connection with the Kafka broker and <code>gin</code> to handle the API endpoints for the notification system:</p>
<pre><code class="lang-bash">go mod init kafka-notify
go get github.com/IBM/sarama github.com/gin-gonic/gin
</code></pre>
<h2 id="heading-how-to-create-the-user-and-notification-models">How to Create the User and Notification Models</h2>
<p>With the workspace set up, the first step is to create the <code>User</code> and <code>Notification</code> structs. Move to the <code>pkg/models</code> directory, then create a new file named <code>models.go</code> and declare these structs within it:</p>
<pre><code class="lang-go"><span class="hljs-keyword">package</span> models

<span class="hljs-keyword">type</span> User <span class="hljs-keyword">struct</span> {
    ID   <span class="hljs-keyword">int</span>    <span class="hljs-string">`json:"id"`</span>
    Name <span class="hljs-keyword">string</span> <span class="hljs-string">`json:"name"`</span>
}

<span class="hljs-keyword">type</span> Notification <span class="hljs-keyword">struct</span> {
    From    User   <span class="hljs-string">`json:"from"`</span>
    To      User   <span class="hljs-string">`json:"to"`</span>
    Message <span class="hljs-keyword">string</span> <span class="hljs-string">`json:"message"`</span>
}
</code></pre>
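<p>The struct tags control how these models look once they are serialized, which is the form in which they will travel through Kafka. The sketch below repeats the two structs so that it runs on its own; the <code>encode</code> helper is a hypothetical name introduced for illustration.</p>

```go
package main

import (
	"encoding/json"
	"fmt"
)

type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

type Notification struct {
	From    User   `json:"from"`
	To      User   `json:"to"`
	Message string `json:"message"`
}

// encode returns the JSON form of a notification as a string.
func encode(n Notification) (string, error) {
	b, err := json.Marshal(n)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	n := Notification{
		From:    User{ID: 2, Name: "Bruno"},
		To:      User{ID: 1, Name: "Emma"},
		Message: "Bruno started following you.",
	}
	out, err := encode(n)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// Prints:
	// {"from":{"id":2,"name":"Bruno"},"to":{"id":1,"name":"Emma"},"message":"Bruno started following you."}
	fmt.Println(out)
}
```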
<h2 id="heading-how-to-set-up-the-kafka-producer">How to Set Up the Kafka Producer 📤</h2>
<p>The next step is to write the code for the producer. You’ll create a simple Gin web API where a user can send a notification to another user via an HTTP POST request. This request will then “produce” (send) a message to a Kafka topic named <code>"notifications"</code>.</p>
<p>Let’s navigate to the <code>cmd/producer</code> directory and create a new file named <code>producer.go</code>. Within it, you’ll set up the producer logic:</p>
<pre><code class="lang-go"><span class="hljs-keyword">package</span> main

<span class="hljs-keyword">import</span> (
    <span class="hljs-string">"encoding/json"</span>
    <span class="hljs-string">"errors"</span>
    <span class="hljs-string">"fmt"</span>
    <span class="hljs-string">"log"</span>
    <span class="hljs-string">"net/http"</span>
    <span class="hljs-string">"strconv"</span>

    <span class="hljs-string">"kafka-notify/pkg/models"</span>

    <span class="hljs-string">"github.com/IBM/sarama"</span>
    <span class="hljs-string">"github.com/gin-gonic/gin"</span>
)

<span class="hljs-keyword">const</span> (
    ProducerPort       = <span class="hljs-string">":8080"</span>
    KafkaServerAddress = <span class="hljs-string">"localhost:9092"</span>
    KafkaTopic         = <span class="hljs-string">"notifications"</span>
)

<span class="hljs-comment">// ============== HELPER FUNCTIONS ==============</span>
<span class="hljs-keyword">var</span> ErrUserNotFoundInProducer = errors.New(<span class="hljs-string">"user not found"</span>)

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">findUserByID</span><span class="hljs-params">(id <span class="hljs-keyword">int</span>, users []models.User)</span> <span class="hljs-params">(models.User, error)</span></span> {
    <span class="hljs-keyword">for</span> _, user := <span class="hljs-keyword">range</span> users {
        <span class="hljs-keyword">if</span> user.ID == id {
            <span class="hljs-keyword">return</span> user, <span class="hljs-literal">nil</span>
        }
    }
    <span class="hljs-keyword">return</span> models.User{}, ErrUserNotFoundInProducer
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">getIDFromRequest</span><span class="hljs-params">(formValue <span class="hljs-keyword">string</span>, ctx *gin.Context)</span> <span class="hljs-params">(<span class="hljs-keyword">int</span>, error)</span></span> {
    id, err := strconv.Atoi(ctx.PostForm(formValue))
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        <span class="hljs-keyword">return</span> <span class="hljs-number">0</span>, fmt.Errorf(
            <span class="hljs-string">"failed to parse ID from form value %s: %w"</span>, formValue, err)
    }
    <span class="hljs-keyword">return</span> id, <span class="hljs-literal">nil</span>
}

<span class="hljs-comment">// ============== KAFKA RELATED FUNCTIONS ==============</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">sendKafkaMessage</span><span class="hljs-params">(producer sarama.SyncProducer,
    users []models.User, ctx *gin.Context, fromID, toID <span class="hljs-keyword">int</span>)</span> <span class="hljs-title">error</span></span> {
    message := ctx.PostForm(<span class="hljs-string">"message"</span>)

    fromUser, err := findUserByID(fromID, users)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        <span class="hljs-keyword">return</span> err
    }

    toUser, err := findUserByID(toID, users)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        <span class="hljs-keyword">return</span> err
    }

    notification := models.Notification{
        From:    fromUser,
        To:      toUser,
        Message: message,
    }

    notificationJSON, err := json.Marshal(notification)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        <span class="hljs-keyword">return</span> fmt.Errorf(<span class="hljs-string">"failed to marshal notification: %w"</span>, err)
    }

    msg := &amp;sarama.ProducerMessage{
        Topic: KafkaTopic,
        Key:   sarama.StringEncoder(strconv.Itoa(toUser.ID)),
        Value: sarama.StringEncoder(notificationJSON),
    }

    _, _, err = producer.SendMessage(msg)
    <span class="hljs-keyword">return</span> err
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">sendMessageHandler</span><span class="hljs-params">(producer sarama.SyncProducer,
    users []models.User)</span> <span class="hljs-title">gin</span>.<span class="hljs-title">HandlerFunc</span></span> {
    <span class="hljs-keyword">return</span> <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">(ctx *gin.Context)</span></span> {
        fromID, err := getIDFromRequest(<span class="hljs-string">"fromID"</span>, ctx)
        <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
            ctx.JSON(http.StatusBadRequest, gin.H{<span class="hljs-string">"message"</span>: err.Error()})
            <span class="hljs-keyword">return</span>
        }

        toID, err := getIDFromRequest(<span class="hljs-string">"toID"</span>, ctx)
        <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
            ctx.JSON(http.StatusBadRequest, gin.H{<span class="hljs-string">"message"</span>: err.Error()})
            <span class="hljs-keyword">return</span>
        }

        err = sendKafkaMessage(producer, users, ctx, fromID, toID)
        <span class="hljs-keyword">if</span> errors.Is(err, ErrUserNotFoundInProducer) {
            ctx.JSON(http.StatusNotFound, gin.H{<span class="hljs-string">"message"</span>: <span class="hljs-string">"User not found"</span>})
            <span class="hljs-keyword">return</span>
        }
        <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
            ctx.JSON(http.StatusInternalServerError, gin.H{
                <span class="hljs-string">"message"</span>: err.Error(),
            })
            <span class="hljs-keyword">return</span>
        }

        ctx.JSON(http.StatusOK, gin.H{
            <span class="hljs-string">"message"</span>: <span class="hljs-string">"Notification sent successfully!"</span>,
        })
    }
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">setupProducer</span><span class="hljs-params">()</span> <span class="hljs-params">(sarama.SyncProducer, error)</span></span> {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = <span class="hljs-literal">true</span>
    producer, err := sarama.NewSyncProducer([]<span class="hljs-keyword">string</span>{KafkaServerAddress},
        config)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>, fmt.Errorf(<span class="hljs-string">"failed to setup producer: %w"</span>, err)
    }
    <span class="hljs-keyword">return</span> producer, <span class="hljs-literal">nil</span>
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">main</span><span class="hljs-params">()</span></span> {
    users := []models.User{
        {ID: <span class="hljs-number">1</span>, Name: <span class="hljs-string">"Emma"</span>},
        {ID: <span class="hljs-number">2</span>, Name: <span class="hljs-string">"Bruno"</span>},
        {ID: <span class="hljs-number">3</span>, Name: <span class="hljs-string">"Rick"</span>},
        {ID: <span class="hljs-number">4</span>, Name: <span class="hljs-string">"Lena"</span>},
    }

    producer, err := setupProducer()
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatalf(<span class="hljs-string">"failed to initialize producer: %v"</span>, err)
    }
    <span class="hljs-keyword">defer</span> producer.Close()

    gin.SetMode(gin.ReleaseMode)
    router := gin.Default()
    router.POST(<span class="hljs-string">"/send"</span>, sendMessageHandler(producer, users))

    fmt.Printf(<span class="hljs-string">"Kafka PRODUCER 📨 started at http://localhost%s\n"</span>,
        ProducerPort)

    <span class="hljs-keyword">if</span> err := router.Run(ProducerPort); err != <span class="hljs-literal">nil</span> {
        log.Printf(<span class="hljs-string">"failed to run the server: %v"</span>, err)
    }
}
</code></pre>
<p>Let’s break down the Kafka-related components within <code>producer.go</code>:</p>
<p>Within the <code>setupProducer()</code> function:</p>
<ul>
<li><code>config := sarama.NewConfig()</code>: Initializes a new default configuration for Kafka. Think of it as setting up the parameters before connecting to the broker.</li>
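<li><code>config.Producer.Return.Successes = true</code>: Tells Sarama to report successful deliveries back to the producer. A synchronous producer requires this setting, because <code>SendMessage()</code> waits for that confirmation and only returns once the message has been written to the <code>"notifications"</code> topic or an error has occurred.</li>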
<li><code>producer, err := sarama.NewSyncProducer(…)</code>: Initializes a synchronous Kafka producer that connects to the Kafka broker running at <code>localhost:9092</code>.</li>
</ul>
<p>Inside the <code>sendKafkaMessage()</code> function:</p>
<ul>
<li>This function starts by retrieving the <code>message</code> from the context and then attempts to find both the sender and the recipient using their IDs.</li>
<li><code>notification := models.Notification{…}</code>: Initializes a <code>Notification</code> struct that encapsulates information about the sender, the recipient, and the actual message.</li>
<li><code>msg := &amp;sarama.ProducerMessage{…}</code>: Constructs a <code>ProducerMessage</code> for the <code>"notifications"</code> topic. The recipient’s ID is set as the <code>Key</code>, and the JSON-serialized <code>Notification</code> is set as the <code>Value</code>.</li>
<li><code>producer.SendMessage(msg)</code>: Sends the constructed message to the <code>"notifications"</code> topic.</li>
</ul>
<p>In the <code>sendMessageHandler()</code> function:</p>
<ul>
<li>This function serves as an endpoint handler for the <code>/send</code> POST request. It processes the incoming request to ensure valid sender and recipient IDs are provided.</li>
<li>After fetching the IDs, it invokes the <code>sendKafkaMessage()</code> function to send the Kafka message. Depending on the result, it dispatches appropriate HTTP responses: a <code>404 Not Found</code> for nonexistent users, a <code>400 Bad Request</code> for invalid IDs, and a <code>500 Internal Server Error</code> for other failures, along with a specific error message.</li>
</ul>
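<p>The status-code decisions described above can be sketched as a single function built on <code>errors.Is</code>, which also matches errors that were wrapped with <code>%w</code>. This is a simplified illustration, not the handler’s actual code: <code>statusFor</code>, <code>errUserNotFound</code>, and <code>errInvalidID</code> are hypothetical names standing in for the handler’s inline checks.</p>

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Hypothetical sentinel errors standing in for the producer's error cases.
var (
	errUserNotFound = errors.New("user not found")
	errInvalidID    = errors.New("invalid ID")
)

// statusFor maps an error to the HTTP status the handler would return.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errInvalidID):
		return http.StatusBadRequest
	case errors.Is(err, errUserNotFound):
		return http.StatusNotFound
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	wrapped := fmt.Errorf("lookup for ID 9: %w", errUserNotFound)
	// Prints: 200 404 500
	fmt.Println(statusFor(nil), statusFor(wrapped), statusFor(errors.New("broker down")))
}
```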
<p>Finally, within the <code>main()</code> function:</p>
<ul>
<li>You initialize a Kafka producer via the <code>setupProducer()</code> function.</li>
<li>Then, you create a Gin <code>router</code> via <code>gin.Default()</code>, setting up a web server. Next, you define a POST endpoint <code>/send</code> to handle notifications. This endpoint expects the sender and recipient’s IDs and the message content.</li>
<li>The notification is processed upon receiving a POST request via the <code>sendMessageHandler()</code> function, and an appropriate HTTP response is dispatched.</li>
</ul>
<p>The above setup provides a simple way to simulate users sending notifications to each other and showcases how these notifications are produced to the <code>"notifications"</code> topic.</p>
<h2 id="heading-how-to-set-up-the-kafka-consumer">How to Set Up the Kafka Consumer 📥</h2>
<p>After creating the producer, the next step is to set up a consumer that listens to the <code>"notifications"</code> topic and provides an endpoint to list notifications for a specific user.</p>
<p>Let’s move to the <code>cmd/consumer</code> directory and create a new file named <code>consumer.go</code>. Within it, you’ll set up the consumer logic and the Gin-based API:</p>
<pre><code class="lang-go"><span class="hljs-keyword">package</span> main

<span class="hljs-keyword">import</span> (
    <span class="hljs-string">"context"</span>
    <span class="hljs-string">"encoding/json"</span>
    <span class="hljs-string">"errors"</span>
    <span class="hljs-string">"fmt"</span>
    <span class="hljs-string">"log"</span>
    <span class="hljs-string">"net/http"</span>
    <span class="hljs-string">"sync"</span>

    <span class="hljs-string">"kafka-notify/pkg/models"</span>

    <span class="hljs-string">"github.com/IBM/sarama"</span>
    <span class="hljs-string">"github.com/gin-gonic/gin"</span>
)

<span class="hljs-keyword">const</span> (
    ConsumerGroup      = <span class="hljs-string">"notifications-group"</span>
    ConsumerTopic      = <span class="hljs-string">"notifications"</span>
    ConsumerPort       = <span class="hljs-string">":8081"</span>
    KafkaServerAddress = <span class="hljs-string">"localhost:9092"</span>
)

<span class="hljs-comment">// ============== HELPER FUNCTIONS ==============</span>
<span class="hljs-keyword">var</span> ErrNoMessagesFound = errors.New(<span class="hljs-string">"no messages found"</span>)

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">getUserIDFromRequest</span><span class="hljs-params">(ctx *gin.Context)</span> <span class="hljs-params">(<span class="hljs-keyword">string</span>, error)</span></span> {
    userID := ctx.Param(<span class="hljs-string">"userID"</span>)
    <span class="hljs-keyword">if</span> userID == <span class="hljs-string">""</span> {
        <span class="hljs-keyword">return</span> <span class="hljs-string">""</span>, ErrNoMessagesFound
    }
    <span class="hljs-keyword">return</span> userID, <span class="hljs-literal">nil</span>
}

<span class="hljs-comment">// ====== NOTIFICATION STORAGE ======</span>
<span class="hljs-keyword">type</span> UserNotifications <span class="hljs-keyword">map</span>[<span class="hljs-keyword">string</span>][]models.Notification

<span class="hljs-keyword">type</span> NotificationStore <span class="hljs-keyword">struct</span> {
    data UserNotifications
    mu   sync.RWMutex
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(ns *NotificationStore)</span> <span class="hljs-title">Add</span><span class="hljs-params">(userID <span class="hljs-keyword">string</span>,
    notification models.Notification)</span></span> {
    ns.mu.Lock()
    <span class="hljs-keyword">defer</span> ns.mu.Unlock()
    ns.data[userID] = <span class="hljs-built_in">append</span>(ns.data[userID], notification)
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(ns *NotificationStore)</span> <span class="hljs-title">Get</span><span class="hljs-params">(userID <span class="hljs-keyword">string</span>)</span> []<span class="hljs-title">models</span>.<span class="hljs-title">Notification</span></span> {
    ns.mu.RLock()
    <span class="hljs-keyword">defer</span> ns.mu.RUnlock()
    <span class="hljs-keyword">return</span> ns.data[userID]
}

<span class="hljs-comment">// ============== KAFKA RELATED FUNCTIONS ==============</span>
<span class="hljs-keyword">type</span> Consumer <span class="hljs-keyword">struct</span> {
    store *NotificationStore
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(*Consumer)</span> <span class="hljs-title">Setup</span><span class="hljs-params">(sarama.ConsumerGroupSession)</span> <span class="hljs-title">error</span></span>   { <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span> }
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(*Consumer)</span> <span class="hljs-title">Cleanup</span><span class="hljs-params">(sarama.ConsumerGroupSession)</span> <span class="hljs-title">error</span></span> { <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span> }

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(consumer *Consumer)</span> <span class="hljs-title">ConsumeClaim</span><span class="hljs-params">(
    sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)</span> <span class="hljs-title">error</span></span> {
    <span class="hljs-keyword">for</span> msg := <span class="hljs-keyword">range</span> claim.Messages() {
        userID := <span class="hljs-keyword">string</span>(msg.Key)
        <span class="hljs-keyword">var</span> notification models.Notification
        err := json.Unmarshal(msg.Value, &amp;notification)
        <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
            log.Printf(<span class="hljs-string">"failed to unmarshal notification: %v"</span>, err)
            <span class="hljs-keyword">continue</span>
        }
        consumer.store.Add(userID, notification)
        sess.MarkMessage(msg, <span class="hljs-string">""</span>)
    }
    <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">initializeConsumerGroup</span><span class="hljs-params">()</span> <span class="hljs-params">(sarama.ConsumerGroup, error)</span></span> {
    config := sarama.NewConfig()

    consumerGroup, err := sarama.NewConsumerGroup(
        []<span class="hljs-keyword">string</span>{KafkaServerAddress}, ConsumerGroup, config)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>, fmt.Errorf(<span class="hljs-string">"failed to initialize consumer group: %w"</span>, err)
    }

    <span class="hljs-keyword">return</span> consumerGroup, <span class="hljs-literal">nil</span>
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">setupConsumerGroup</span><span class="hljs-params">(ctx context.Context, store *NotificationStore)</span></span> {
    consumerGroup, err := initializeConsumerGroup()
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Printf(<span class="hljs-string">"initialization error: %v"</span>, err)
        <span class="hljs-keyword">return</span>
    }
    <span class="hljs-keyword">defer</span> consumerGroup.Close()

    consumer := &amp;Consumer{
        store: store,
    }

    <span class="hljs-keyword">for</span> {
        err = consumerGroup.Consume(ctx, []<span class="hljs-keyword">string</span>{ConsumerTopic}, consumer)
        <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
            log.Printf(<span class="hljs-string">"error from consumer: %v"</span>, err)
        }
        <span class="hljs-keyword">if</span> ctx.Err() != <span class="hljs-literal">nil</span> {
            <span class="hljs-keyword">return</span>
        }
    }
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">handleNotifications</span><span class="hljs-params">(ctx *gin.Context, store *NotificationStore)</span></span> {
    userID, err := getUserIDFromRequest(ctx)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        ctx.JSON(http.StatusNotFound, gin.H{<span class="hljs-string">"message"</span>: err.Error()})
        <span class="hljs-keyword">return</span>
    }

    notes := store.Get(userID)
    <span class="hljs-keyword">if</span> <span class="hljs-built_in">len</span>(notes) == <span class="hljs-number">0</span> {
        ctx.JSON(http.StatusOK,
            gin.H{
                <span class="hljs-string">"message"</span>:       <span class="hljs-string">"No notifications found for user"</span>,
                <span class="hljs-string">"notifications"</span>: []models.Notification{},
            })
        <span class="hljs-keyword">return</span>
    }

    ctx.JSON(http.StatusOK, gin.H{<span class="hljs-string">"notifications"</span>: notes})
}

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">main</span><span class="hljs-params">()</span></span> {
    store := &amp;NotificationStore{
        data: <span class="hljs-built_in">make</span>(UserNotifications),
    }

    ctx, cancel := context.WithCancel(context.Background())
    <span class="hljs-keyword">go</span> setupConsumerGroup(ctx, store)
    <span class="hljs-keyword">defer</span> cancel()

    gin.SetMode(gin.ReleaseMode)
    router := gin.Default()
    router.GET(<span class="hljs-string">"/notifications/:userID"</span>, <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">(ctx *gin.Context)</span></span> {
        handleNotifications(ctx, store)
    })

    fmt.Printf(<span class="hljs-string">"Kafka CONSUMER (Group: %s) 👥📥 "</span>+
        <span class="hljs-string">"started at http://localhost%s\n"</span>, ConsumerGroup, ConsumerPort)

    <span class="hljs-keyword">if</span> err := router.Run(ConsumerPort); err != <span class="hljs-literal">nil</span> {
        log.Printf(<span class="hljs-string">"failed to run the server: %v"</span>, err)
    }
}
</code></pre>
<p>Let’s examine the Kafka-related operations within <code>consumer.go</code>:</p>
<p>Within the <strong><code>initializeConsumerGroup()</code></strong> function:</p>
<ul>
<li><code>config := sarama.NewConfig()</code>: Initializes a new default configuration for Kafka.</li>
<li><code>consumerGroup, err := sarama.NewConsumerGroup(…)</code>: Creates a new Kafka consumer group that connects to the broker running on <code>localhost:9092</code>. The group name is <code>"notifications-group"</code>.</li>
</ul>
<p>Inside the <strong><code>Consumer</code></strong> struct and its methods:</p>
<ul>
<li>The <code>Consumer</code> struct has a <code>store</code> field, which is a reference to the <code>NotificationStore</code> to keep track of the received notifications.</li>
<li>The <code>Setup()</code> and <code>Cleanup()</code> methods are required to satisfy the <code>sarama.ConsumerGroupHandler</code> interface. They act as placeholders in this tutorial, but they could hold initialization and cleanup logic that runs at the start and end of a consumer session.</li>
<li>In the <code>ConsumeClaim()</code> method: The consumer listens for new messages on the topic. For each message, it fetches the <code>userID</code> (the <code>Key</code> of the message), unmarshals the message into a <code>Notification</code> struct, and adds the notification to the <code>NotificationStore</code>.</li>
</ul>
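<p>The per-message work in <code>ConsumeClaim()</code> can be sketched without Kafka at all. The snippet below is a minimal, standard-library-only illustration: <code>Notification</code>, <code>Store</code>, and <code>handleMessage()</code> are simplified, hypothetical stand-ins for the tutorial's types, and the key and value are passed in as plain byte slices instead of arriving from a Sarama claim.</p>
<pre><code class="lang-go">package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Notification is a simplified, hypothetical stand-in for models.Notification.
type Notification struct {
	Message string `json:"message"`
}

// Store is a hypothetical stand-in for NotificationStore: a map guarded by a mutex.
type Store struct {
	mu   sync.RWMutex
	data map[string][]Notification
}

// NewStore returns an empty store that is ready to use.
func NewStore() *Store {
	s := new(Store)
	s.data = make(map[string][]Notification)
	return s
}

// Add appends a notification to the given user's list.
func (s *Store) Add(userID string, n Notification) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[userID] = append(s.data[userID], n)
}

// Get returns the notifications stored for the given user.
func (s *Store) Get(userID string) []Notification {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.data[userID]
}

// handleMessage does the per-message work: the key carries the user ID and
// the value carries the JSON-encoded notification.
func handleMessage(s *Store, key, value []byte) error {
	n := new(Notification)
	if err := json.Unmarshal(value, n); err != nil {
		return fmt.Errorf("failed to unmarshal notification: %w", err)
	}
	s.Add(string(key), *n)
	return nil
}

func main() {
	store := NewStore()
	value := []byte(`{"message":"Bruno started following you."}`)
	if err := handleMessage(store, []byte("1"), value); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(store.Get("1")[0].Message)
}
</code></pre>
<p>Guarding the map with a mutex is an assumption of this sketch. It matters because the consumer goroutine writes to the store while HTTP handlers read from it.</p>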
<p>In the <strong><code>setupConsumerGroup()</code></strong> function:</p>
<ul>
<li>This function sets up the Kafka consumer group, listens for incoming messages, and processes them using the <code>Consumer</code> struct methods.</li>
<li>It runs a <code>for</code> loop that keeps consuming messages from the <code>"notifications"</code> topic and logs any errors that arise, until the context is cancelled.</li>
</ul>
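<p>The shape of that loop is easier to see in isolation. In Sarama, <code>Consume()</code> returns whenever a session ends (for example, after a rebalance), so it has to be called again until the context is cancelled. The sketch below reproduces only that control flow with the standard library: <code>consumeFunc</code>, <code>runLoop()</code>, and <code>demo()</code> are hypothetical names, and the fake consume function simply ends two sessions normally before cancelling during the third.</p>
<pre><code class="lang-go">package main

import (
	"context"
	"fmt"
)

// consumeFunc is a hypothetical stand-in for consumerGroup.Consume. It blocks
// for one session and returns when that session ends.
type consumeFunc func(ctx context.Context) error

// runLoop keeps starting new sessions until the context is cancelled and
// returns the number of sessions it ran.
func runLoop(ctx context.Context, consume consumeFunc) int {
	sessions := 0
	for {
		err := consume(ctx)
		sessions++
		if err != nil {
			fmt.Printf("error from consumer: %v\n", err)
		}
		if ctx.Err() != nil {
			return sessions
		}
	}
}

// demo simulates two sessions that end normally and a third one that is
// interrupted by cancellation.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	return runLoop(ctx, func(ctx context.Context) error {
		calls++
		if calls == 3 {
			cancel() // simulate shutdown during the third session
			return ctx.Err()
		}
		return nil
	})
}

func main() {
	fmt.Println("sessions:", demo()) // sessions: 3
}
</code></pre>
<p>Checking <code>ctx.Err()</code> after every call is what lets the loop exit cleanly once <code>cancel()</code> has been called, rather than retrying forever.</p>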
<p>The <strong><code>handleNotifications()</code></strong> function:</p>
<ul>
<li>Initially, it attempts to retrieve the <code>userID</code> from the request. If it doesn’t exist, it returns a <code>404 Not Found</code> status.</li>
<li>Then, it fetches the notifications for the provided user ID from the <code>NotificationStore</code>. It always responds with a <code>200 OK</code> status: if the user has no notifications, the response contains a message and an empty notifications slice, and otherwise it contains the user's current notifications.</li>
</ul>
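<p>One plausible reason for returning an explicit empty slice in the "no notifications" branch is how Go's <code>encoding/json</code> package treats slices: a <code>nil</code> slice is encoded as <code>null</code>, while an empty slice is encoded as <code>[]</code>. The sketch below shows the difference, using plain strings in place of notifications. The <code>encode()</code> helper is hypothetical and only mimics the shape of the handler's response body.</p>
<pre><code class="lang-go">package main

import (
	"encoding/json"
	"fmt"
)

// encode builds a response body shaped like the handler's, with plain strings
// standing in for notifications to keep the sketch short.
func encode(notes []string) string {
	body, err := json.Marshal(map[string]any{"notifications": notes})
	if err != nil {
		return err.Error()
	}
	return string(body)
}

func main() {
	var none []string // a nil slice, like the result of looking up a missing map key

	fmt.Println(encode(none))       // {"notifications":null}
	fmt.Println(encode([]string{})) // {"notifications":[]}
}
</code></pre>
<p>Clients that iterate over the <code>notifications</code> array are usually easier to write when the field is always an array.</p>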
<p>Lastly, within the <strong><code>main()</code></strong> function:</p>
<ul>
<li><code>store := &amp;NotificationStore{…}</code>: Creates an instance of <code>NotificationStore</code> to hold the notifications.</li>
<li><code>ctx, cancel := context.WithCancel(context.Background())</code>: Sets up a cancellable context that can be used to stop the consumer group.</li>
<li><code>go setupConsumerGroup(ctx, store)</code>: Starts the consumer group in a separate goroutine, allowing it to run concurrently without blocking the main goroutine.</li>
<li>The last step is to create a Gin <code>router</code> and define a GET endpoint <code>/notifications/:userID</code> that will fetch the notifications for a specific user via the <code>handleNotifications()</code> function when accessed.</li>
</ul>
<p>This setup provides a straightforward way to consume messages from the <code>"notifications"</code> topic and present them to users through a web endpoint.</p>
<h2 id="heading-lets-test-the-real-time-notification-system">Let's Test the Real-Time Notification System👨‍🔬🖥️👩‍🔬</h2>
<p>Now that the producer and consumer are ready, it’s time to see the system in action.</p>
<h3 id="heading-1-start-the-producer">1. Start the producer</h3>
<p>Open up a terminal, move into the <code>kafka-notify</code> directory, and run the producer with the following command:</p>
<pre><code class="lang-bash">go run cmd/producer/producer.go
</code></pre>
<h3 id="heading-2-start-the-consumer">2. Start the consumer</h3>
<p>Open up a second terminal window, navigate to the <code>kafka-notify</code> directory, and start the consumer by running:</p>
<pre><code class="lang-bash">go run cmd/consumer/consumer.go
</code></pre>
<h3 id="heading-3-sending-notifications">3. Sending notifications</h3>
<p>With both the producer and the consumer running, you can simulate sending notifications. Open up a third terminal and use the following <code>curl</code> commands to send notifications:</p>
<p><strong>User 1 (Emma) receives a notification from User 2 (Bruno):</strong></p>
<pre><code class="lang-bash">curl -X POST http://localhost:8080/send \
-d <span class="hljs-string">"fromID=2&amp;toID=1&amp;message=Bruno started following you."</span>
</code></pre>
<p><strong>User 2 (Bruno) receives a notification from User 1 (Emma):</strong></p>
<pre><code class="lang-bash">curl -X POST http://localhost:8080/send \
-d <span class="hljs-string">"fromID=1&amp;toID=2&amp;message=Emma mentioned you in a comment: 'Great seeing you yesterday, @Bruno!'"</span>
</code></pre>
<p><strong>User 1 (Emma) receives a notification from User 4 (Lena):</strong></p>
<pre><code class="lang-bash">curl -X POST http://localhost:8080/send \
-d <span class="hljs-string">"fromID=4&amp;toID=1&amp;message=Lena liked your post: 'My weekend getaway!'"</span>
</code></pre>
<h3 id="heading-4-retrieving-notifications">4. Retrieving notifications</h3>
<p>Finally, you can fetch the notifications of a specific user with the following <code>curl</code> command:</p>
<p><strong>Retrieving notifications for User 1 (Emma):</strong></p>
<pre><code class="lang-bash">curl http://localhost:8081/notifications/1
</code></pre>
<p><strong>Output:</strong></p>
<pre><code class="lang-json">{<span class="hljs-attr">"notifications"</span>: [
  {<span class="hljs-attr">"from"</span>: {<span class="hljs-attr">"id"</span>: <span class="hljs-number">2</span>, <span class="hljs-attr">"name"</span>: <span class="hljs-string">"Bruno"</span>}, <span class="hljs-attr">"to"</span>: {<span class="hljs-attr">"id"</span>: <span class="hljs-number">1</span>, <span class="hljs-attr">"name"</span>: <span class="hljs-string">"Emma"</span>}, <span class="hljs-attr">"message"</span>: <span class="hljs-string">"Bruno started following you."</span>},
  {<span class="hljs-attr">"from"</span>: {<span class="hljs-attr">"id"</span>: <span class="hljs-number">4</span>, <span class="hljs-attr">"name"</span>: <span class="hljs-string">"Lena"</span>}, <span class="hljs-attr">"to"</span>: {<span class="hljs-attr">"id"</span>: <span class="hljs-number">1</span>, <span class="hljs-attr">"name"</span>: <span class="hljs-string">"Emma"</span>}, <span class="hljs-attr">"message"</span>: <span class="hljs-string">"Lena liked your post: 'My weekend getaway!'"</span>}
]}
</code></pre>
<p>In the above output, you see a <code>JSON</code> response listing all the notifications for <strong>Emma</strong>. As you send more notifications, they accumulate, and you can fetch them using the consumer’s API.</p>
<h2 id="heading-wrapping-up">Wrapping up 📝</h2>
<p>In this tutorial, you’ve learned how to set up a basic real-time notification system using Kafka in Go. </p>
<p>By simulating the process of users sending and fetching notifications, you’ve gained hands-on experience with Kafka components. This is a foundational step in understanding how Kafka can be integrated into Go applications for various real-time data processing tasks.</p>
<p>You can access the entire codebase for this project in the following GitHub repository: <a target="_blank" href="https://github.com/gutyoh/kafka-notify">https://github.com/gutyoh/kafka-notify</a></p>
<p>If you found this article helpful and want to expand your knowledge on Golang 🐿️, Docker 🐳, and Gin 🍸, consider checking out the <strong><a target="_blank" href="https://hyperskill.org/tracks/25?category=12&amp;utm_source=fc_hs&amp;utm_medium=social&amp;utm_campaign=hermann">Introduction to Go track</a></strong>, which covers basic Golang concepts. </p>
<p>Registered users can also check the <strong><a target="_blank" href="https://hyperskill.org/tracks/43?category=20&amp;utm_source=fc_hs&amp;utm_medium=social&amp;utm_campaign=hermann">Go Developer</a></strong> and <strong><a target="_blank" href="https://hyperskill.org/tracks/64?category=20&amp;utm_source=fc_hs&amp;utm_medium=social&amp;utm_campaign=hermann">Introduction to Docker</a></strong> tracks on Hyperskill. I've been actively involved as an expert in curating these tracks, ensuring they deliver top-notch educational content.</p>
<p>Before concluding, I owe a big thank you to my dear friend <strong>Anton Illarionov</strong>. His expertise in integrating Go with Kafka inspired the idea for this article. You can explore his projects on <a target="_blank" href="https://github.com/ant1k9">GitHub</a>.</p>
<p>Let me know if you have any questions or feedback regarding this article.</p>
<p>Thank you for reading, and keep on coding!</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ The Apache Kafka Handbook – How to Get Started Using Kafka ]]>
                </title>
                <description>
                    <![CDATA[ Apache Kafka is an open-source event streaming platform that can transport huge volumes of data at very low latency. Companies like LinkedIn, Uber, and Netflix use Kafka to process trillions of events and petabytes of data each day. Kafka was origina... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/apache-kafka-handbook/</link>
                <guid isPermaLink="false">66d45edb787a2a3b05af43a0</guid>
                
                    <category>
                        <![CDATA[ Apache Kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ big data ]]>
                    </category>
                
                    <category>
                        <![CDATA[ data ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Gerard Hynes ]]>
                </dc:creator>
                <pubDate>Fri, 03 Feb 2023 23:48:22 +0000</pubDate>
                <media:content url="https://www.freecodecamp.org/news/content/images/2023/02/apache-kafka-handbook.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>Apache Kafka is an open-source event streaming platform that can transport huge volumes of data at very low latency.</p>
<p>Companies like LinkedIn, Uber, and Netflix use Kafka to process trillions of events and petabytes of data each day.</p>
<p>Kafka was <a target="_blank" href="https://engineering.linkedin.com/27/project-kafka-distributed-publish-subscribe-messaging-system-reaches-v06">originally developed at LinkedIn</a>, to help handle their real-time data feeds. It's now maintained by the <a target="_blank" href="https://kafka.apache.org/">Apache Software Foundation</a>, and is widely adopted in industry (being used by 80% of Fortune 100 companies).</p>
<h2 id="heading-why-should-you-learn-apache-kafka">Why Should You Learn Apache Kafka?</h2>
<p>Kafka lets you:</p>
<ul>
<li><p>Publish and subscribe to streams of events</p>
</li>
<li><p>Store streams of events in the same order they happened</p>
</li>
<li><p>Process streams of events in real time</p>
</li>
</ul>
<p>The main thing Kafka does is help you efficiently connect diverse data sources with the many different systems that might need to use that data.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/before-and-after-kafka-1.PNG" alt="Messy data integrations without Kafka, more organized data integrations with Kafka." width="600" height="400" loading="lazy"></p>
<p><em>Kafka helps you connect data sources to the systems using that data</em></p>
<p>Some of the things you can use Kafka for include:</p>
<ul>
<li><p>Personalizing recommendations for customers</p>
</li>
<li><p>Notifying passengers of flight delays</p>
</li>
<li><p>Payment processing in banking</p>
</li>
<li><p>Online fraud detection</p>
</li>
<li><p>Managing inventory and supply chains</p>
</li>
<li><p>Tracking order shipments</p>
</li>
<li><p>Collecting telemetry data from Internet of Things (IoT) devices</p>
</li>
</ul>
<p>What all these uses have in common is that they need to take in and process data in real time, often at huge scales. This is something Kafka excels at. To give one example, <a target="_blank" href="https://www.confluent.io/blog/running-kafka-at-scale-at-pinterest/">Pinterest uses Kafka to handle up to 40 million events per second</a>.</p>
<p>Kafka is distributed, which means it runs as a cluster of nodes spread across multiple servers. It's also replicated, meaning that data is copied in multiple locations to protect it from a single point of failure. This makes Kafka both scalable and fault-tolerant.</p>
<p>Kafka is also fast. It's optimized for high throughput, making effective use of disk storage and batched network requests.</p>
<p>This article will:</p>
<ul>
<li><p>Introduce you to the core concepts behind Kafka</p>
</li>
<li><p>Show you how to install Kafka on your own computer</p>
</li>
<li><p>Get you started with the Kafka Command Line Interface (CLI)</p>
</li>
<li><p>Help you build a simple Java application that produces and consumes events via Kafka</p>
</li>
</ul>
<p>Things the article won't cover:</p>
<ul>
<li><p>More advanced Kafka topics, such as security, performance, and monitoring</p>
</li>
<li><p>Deploying a Kafka cluster to a server</p>
</li>
<li><p>Using managed Kafka services like Amazon MSK or Confluent Cloud</p>
</li>
</ul>
<h2 id="heading-table-of-contents">Table of Contents</h2>
<ol>
<li><p><a class="post-section-overview" href="#heading-event-streaming-and-event-driven-architectures">Event Streaming and Event-Driven Architectures</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-core-kafka-concepts">Core Kafka Concepts</a><br> a. <a class="post-section-overview" href="#heading-event-messages-in-kafka">Event Messages in Kafka</a><br> b. <a class="post-section-overview" href="#heading-topics-in-kafka">Topics in Kafka</a><br> c. <a class="post-section-overview" href="#heading-partitions-in-kafka">Partitions in Kafka</a><br> d. <a class="post-section-overview" href="#heading-offsets-in-kafka">Offsets in Kafka</a><br> e. <a class="post-section-overview" href="#heading-brokers-in-kafka">Brokers in Kafka</a><br> f. <a class="post-section-overview" href="#heading-replication-in-kafka">Replication in Kafka</a><br> g. <a class="post-section-overview" href="#heading-producers-in-kafka">Producers in Kafka</a><br> h. <a class="post-section-overview" href="#heading-consumers-in-kafka">Consumers in Kafka</a><br> i. <a class="post-section-overview" href="#heading-consumer-groups-in-kafka">Consumer Groups in Kafka</a><br> j. <a class="post-section-overview" href="#heading-kafka-zookeeper">Kafka Zookeeper</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-how-to-install-kafka-on-your-computer">How to Install Kafka on Your Computer</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-how-to-start-zookeeper-and-kafka">How to Start Zookeeper and Kafka</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-the-kafka-cli">The Kafka CLI</a><br> a. <a class="post-section-overview" href="#heading-how-to-list-topics">How to List Topics</a><br> b. <a class="post-section-overview" href="#heading-how-to-create-a-topic">How to Create a Topic</a><br> c. <a class="post-section-overview" href="#heading-how-to-describe-topics">How to Describe Topics</a><br> d. <a class="post-section-overview" href="#heading-how-to-partition-a-topic">How to Partition a Topic</a><br> e. <a class="post-section-overview" href="#heading-how-to-set-a-replication-factor">How to Set a Replication Factor</a><br> f. <a class="post-section-overview" href="#heading-how-to-delete-a-topic">How to Delete a Topic</a><br> g. <a class="post-section-overview" href="#heading-how-to-use-kafka-console-producer">How to use <code>kafka-console-producer</code></a><br> h. <a class="post-section-overview" href="#heading-how-to-use-kafka-console-consumer">How to use <code>kafka-console-consumer</code></a><br> i. <a class="post-section-overview" href="#heading-how-to-use-kafka-consumer-groups">How to use <code>kafka-consumer-groups</code></a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-how-to-build-a-kafka-client-app-with-java">How to Build a Kafka Client App with Java</a><br> a. <a class="post-section-overview" href="#heading-how-to-set-up-the-project">How to Set Up the Project</a><br> b. <a class="post-section-overview" href="#heading-how-to-install-the-dependencies">How to Install the Dependencies</a><br> c. <a class="post-section-overview" href="#heading-how-to-create-a-kafka-producer">How to Create a Kafka Producer</a><br> d. <a class="post-section-overview" href="#heading-how-to-send-multiple-messages-and-use-callbacks">How to Send Multiple Messages and Use Callbacks</a><br> e. <a class="post-section-overview" href="#heading-how-to-create-a-kafka-consumer">How to Create a Kafka Consumer</a><br> f. <a class="post-section-overview" href="#heading-how-to-shut-down-the-consumer">How to Shut Down the Consumer</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-where-to-take-it-from-here">Where to Take it From Here</a></p>
</li>
</ol>
<p>Before we dive into Kafka, we need some context on event streaming and event-driven architectures.</p>
<h2 id="heading-event-streaming-and-event-driven-architectures">Event Streaming and Event-Driven Architectures</h2>
<p>An event is a record that something happened, as well as information about what happened. For example: a customer placed an order, a bank approved a transaction, inventory management updated stock levels.</p>
<p>Events can trigger one or more processes to respond to them. For example: sending an email receipt, transmitting funds to an account, updating a real-time dashboard.</p>
<p>Event streaming is the process of capturing events in real-time from sources (such as web applications, databases, or sensors) to create streams of events. These streams are potentially unending sequences of records.</p>
<p>The event stream can be stored, processed, and sent to different destinations, also called sinks. The destinations that consume the streams could be other applications, databases, or data pipelines for further processing.</p>
<p>As applications have become more complex, often being broken up into different microservices distributed across multiple data centers, many organizations have adopted an event-driven architecture for their applications.</p>
<p>This means that instead of parts of your application directly asking each other for updates about what happened, they each publish events to event streams. Other parts of the application continuously subscribe to these streams and only act when they receive an event that they are interested in.</p>
<p>This architecture helps ensure that if part of your application goes down, other parts won't also fail. Additionally, you can add new features by adding new subscribers to the event stream, without having to rewrite the existing codebase.</p>
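<p>The publish/subscribe idea can be illustrated with a tiny in-memory event bus. This is only a sketch of the pattern, not of Kafka itself: <code>Event</code>, <code>Bus</code>, and the <code>order_placed</code> event type are hypothetical names, and everything runs inside one process. It is written in Go, although the pattern looks the same in any language.</p>
<pre><code class="lang-go">package main

import "fmt"

// Event records that something happened, plus some detail about it.
type Event struct {
	Type    string
	Payload string
}

// Bus delivers each published event to every handler subscribed to its type.
type Bus struct {
	handlers map[string][]func(Event)
}

// NewBus returns a bus with no subscribers.
func NewBus() *Bus {
	b := new(Bus)
	b.handlers = make(map[string][]func(Event))
	return b
}

// Subscribe registers a handler for one event type.
func (b *Bus) Subscribe(eventType string, handler func(Event)) {
	b.handlers[eventType] = append(b.handlers[eventType], handler)
}

// Publish delivers the event to its subscribers and returns how many were notified.
func (b *Bus) Publish(e Event) int {
	for _, handler := range b.handlers[e.Type] {
		handler(e)
	}
	return len(b.handlers[e.Type])
}

func main() {
	bus := NewBus()

	// The publisher below knows nothing about these two subscribers.
	bus.Subscribe("order_placed", func(e Event) {
		fmt.Println("sending email receipt for", e.Payload)
	})
	bus.Subscribe("order_placed", func(e Event) {
		fmt.Println("updating dashboard for", e.Payload)
	})

	notified := bus.Publish(Event{Type: "order_placed", Payload: "order-42"})
	fmt.Println("subscribers notified:", notified) // subscribers notified: 2
}
</code></pre>
<p>Adding a new feature here means adding one more <code>Subscribe()</code> call. The code that publishes the event does not change.</p>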
<h2 id="heading-core-kafka-concepts">Core Kafka Concepts</h2>
<p>Kafka has become one of the most popular ways to implement event streaming and event-driven architectures. But it does have a bit of a learning curve, and you need to understand several core concepts before you can make effective use of it.</p>
<p>These core concepts are:</p>
<ul>
<li><p>event messages</p>
</li>
<li><p>topics</p>
</li>
<li><p>partitions</p>
</li>
<li><p>offsets</p>
</li>
<li><p>brokers</p>
</li>
<li><p>producers</p>
</li>
<li><p>consumers</p>
</li>
<li><p>consumer groups</p>
</li>
<li><p>Zookeeper</p>
</li>
</ul>
<h3 id="heading-event-messages-in-kafka">Event Messages in Kafka</h3>
<p>When you write data to Kafka, or read data from it, you do this in the form of messages. You'll also see them called events or records.</p>
<p>A message consists of:</p>
<ul>
<li><p>a key</p>
</li>
<li><p>a value</p>
</li>
<li><p>a timestamp</p>
</li>
<li><p>a compression type</p>
</li>
<li><p>headers for metadata (optional)</p>
</li>
<li><p>partition and offset id (once the message is written to a topic)</p>
</li>
</ul>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/kafka-message-anatomy.PNG" alt="A Kafka message consisting of key, value, timestamp, compression type, and headers." width="600" height="400" loading="lazy"></p>
<p><em>A Kafka message consisting of key, value, timestamp, compression type, and headers</em></p>
<p>Every event in Kafka is, at its simplest, a key-value pair. These are serialized into binary, since Kafka itself handles arrays of bytes rather than complex language-specific objects.</p>
<p><strong>Keys</strong> are usually strings or integers and aren't unique for every message. Instead, they point to a particular entity in the system, such as a specific user, order, or device. Keys can be null, but when they are included they determine which partition of a topic each message is written to (more on partitions below).</p>
<p>The message <strong>value</strong> contains details about the event that happened. This could be as simple as a string or as complex as an object with many nested properties. Values can be null, but usually aren't.</p>
<p>By default, the <strong>timestamp</strong> records when the message was created. You can overwrite this if your event actually occurred earlier and you want to record that time instead.</p>
<p>Messages are usually small (less than 1 MB) and sent in a standard data format, such as JSON, Avro, or Protobuf. Even so, they can be compressed to save on network bandwidth and storage. The <strong>compression type</strong> can be set to <code>gzip</code>, <code>lz4</code>, <code>snappy</code>, <code>zstd</code>, or <code>none</code>.</p>
<p>Events can also optionally have <strong>headers</strong>, which are key-value pairs of strings containing metadata, such as where the event originated from or where you want it routed to.</p>
<p>Once a message is sent into a Kafka topic, it also receives a partition number and offset id (more about these later).</p>
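<p>To make the anatomy of a message concrete, here is a minimal Go sketch that models the main fields and serializes a value to bytes. <code>Message</code>, <code>OrderPlaced</code>, <code>newMessage()</code>, and the <code>source</code> header are hypothetical names chosen for illustration. They are not types from any Kafka client library, and the partition, offset, and compression type are left out.</p>
<pre><code class="lang-go">package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Message is a hypothetical model of the fields described above.
type Message struct {
	Key       []byte            // identifies the entity, for example a customer
	Value     []byte            // the serialized event details
	Timestamp time.Time         // when the message was created
	Headers   map[string]string // optional metadata
}

// OrderPlaced is an example event with a couple of properties.
type OrderPlaced struct {
	OrderID string  `json:"order_id"`
	Total   float64 `json:"total"`
}

// newMessage serializes the event to JSON bytes and wraps it in a Message.
func newMessage(customerID string, event OrderPlaced) (Message, error) {
	value, err := json.Marshal(event)
	if err != nil {
		return Message{}, fmt.Errorf("failed to serialize event: %w", err)
	}
	return Message{
		Key:       []byte(customerID),
		Value:     value,
		Timestamp: time.Now(),
		Headers:   map[string]string{"source": "checkout-service"},
	}, nil
}

func main() {
	msg, err := newMessage("customer-7", OrderPlaced{OrderID: "o-1", Total: 19.99})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("key=%s value=%s headers=%v\n", msg.Key, msg.Value, msg.Headers)
}
</code></pre>
<p>Both the key and the value end up as byte slices, which is all Kafka itself needs to see. JSON is used here only because it is easy to read. Avro or Protobuf would slot into the same place.</p>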
<h3 id="heading-topics-in-kafka">Topics in Kafka</h3>
<p>Kafka stores messages in a <strong>topic</strong>, an ordered sequence of events, also called an event log.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/topic.PNG" alt="A Kafka topic containing messages, each with a unique offset." width="600" height="400" loading="lazy"></p>
<p><em>A Kafka topic containing messages, each with a unique offset</em></p>
<p>Different topics are identified by their names and will store different kinds of events. For example, a social media application might have <code>posts</code>, <code>likes</code>, and <code>comments</code> topics to record every time a user creates a post, likes a post, or leaves a comment.</p>
<p>Multiple applications can write to and read from the same topic. An application might also read messages from one topic, filter or transform the data, and then write the result to another topic.</p>
<p>One important feature of topics is that they are append-only. When you write a message to a topic, it's added to the end of the log. Events in a topic are immutable. Once they're written to a topic, you can't change them.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/producer-to-topics-consumer-from-topics.PNG" alt="A Producer writing events to topics and a Consumer reading events from topics." width="600" height="400" loading="lazy"></p>
<p><em>A Producer writing events to topics and a Consumer reading events from topics</em></p>
<p>Unlike with messaging queues, reading an event from a topic doesn't delete it. Events can be read as often as needed, perhaps several times by multiple different applications.</p>
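<p>The difference from a queue can be shown with a toy, in-memory log. In this sketch (all names are hypothetical, and there is a single partition with no persistence), each reader keeps its own position, so one reader catching up has no effect on what another reader sees.</p>
<pre><code class="lang-go">package main

import "fmt"

// Topic is a toy, single-partition event log kept in memory.
type Topic struct {
	events []string
}

// Append adds an event to the end of the log. Existing events are never changed.
func (t *Topic) Append(event string) {
	t.events = append(t.events, event)
}

// Reader tracks its own position in the log.
type Reader struct {
	next int
}

// Poll returns the events this reader has not seen yet. The log itself is untouched.
func (r *Reader) Poll(t *Topic) []string {
	unread := t.events[r.next:]
	r.next = len(t.events)
	return unread
}

func main() {
	topic := new(Topic)
	topic.Append("post created")
	topic.Append("post liked")

	analytics := new(Reader)
	search := new(Reader)

	fmt.Println(analytics.Poll(topic)) // [post created post liked]
	fmt.Println(search.Poll(topic))    // [post created post liked]

	// Nothing new for analytics, and the log still holds both events.
	fmt.Println(len(analytics.Poll(topic)), len(topic.events)) // 0 2
}
</code></pre>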
<p>Topics are also durable, holding onto messages for a specific period (by default 7 days) by saving them to physical storage on disk.</p>
<p>You can configure topics so that messages expire after a certain amount of time, or when a certain amount of storage is exceeded. You can even store messages indefinitely as long as you can pay for the storage costs.</p>
<h3 id="heading-partitions-in-kafka">Partitions in Kafka</h3>
<p>In order to help Kafka to scale, topics can be divided into <strong>partitions</strong>. This breaks up the event log into multiple logs, each of which lives on a separate node in the Kafka cluster. This means that the work of writing and storing messages can be spread across multiple machines.</p>
<p>When you create a topic, you specify the number of partitions it has. The partitions are themselves numbered, starting at 0. When a new event is written to a topic, it's appended to one of the topic's partitions.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/partitioned-topic.PNG" alt="A topic divided into three partitions." width="600" height="400" loading="lazy"></p>
<p><em>A topic divided into three partitions</em></p>
<p>If messages have no key, they will be evenly distributed among partitions in a round-robin manner: partition 0, then partition 1, then partition 2, and so on. This way, all partitions get an even share of the data, but there's no guarantee about the ordering of messages across partitions.</p>
<p>Messages that have the same key will always be sent to the same partition, and in the same order. The key is run through a hashing function which turns it into an integer. This output is then used to select a partition.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/messages-with-without-keys.PNG" alt="Messages without keys being sent across partitions while messages with the same keys are sent to the same partition" width="600" height="400" loading="lazy"></p>
<p><em>Messages without keys are sent across partitions, while messages with the same keys are sent to the same partition</em></p>
<p>Messages within each partition are guaranteed to be ordered. For example, all messages with the same <code>customer_id</code> as their key will be sent to the same partition in the order in which Kafka received them.</p>
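<p>Both strategies fit in a few lines of Go. Treat the following as a sketch of the idea only: it uses the FNV-1a hash from Go's standard library as a stand-in, whereas real Kafka clients use their own hash function, and <code>partitionForKey()</code> and <code>roundRobin</code> are hypothetical names.</p>
<pre><code class="lang-go">package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey hashes the key and maps the result onto a partition number.
// The same key always produces the same hash, and therefore the same partition.
func partitionForKey(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

// roundRobin hands out partitions in turn for messages that have no key.
type roundRobin struct {
	next int
}

// pick returns the next partition in the rotation.
func (r *roundRobin) pick(numPartitions int) int {
	p := r.next % numPartitions
	r.next++
	return p
}

func main() {
	const partitions = 3

	// Both "customer-1" messages land on the same partition.
	for _, key := range []string{"customer-1", "customer-2", "customer-1"} {
		fmt.Printf("%s goes to partition %d\n", key, partitionForKey(key, partitions))
	}

	rr := new(roundRobin)
	fmt.Println(rr.pick(partitions), rr.pick(partitions), rr.pick(partitions), rr.pick(partitions)) // 0 1 2 0
}
</code></pre>
<p>Note that the result depends on the number of partitions, so a key can map to a different partition if the partition count of a topic changes later.</p>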
<h3 id="heading-offsets-in-kafka">Offsets in Kafka</h3>
<p>Each message in a partition gets an id that is an incrementing integer, called an <strong>offset</strong>. Offsets start at 0 and are incremented every time Kafka writes a message to a partition. This means that each message in a given partition has a unique offset.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/offsets.PNG" alt="Three partitions with offsets. Offsets are unique within a partition but not between partitions" width="600" height="400" loading="lazy"></p>
<p><em>Offsets are unique within a partition but not between partitions</em></p>
<p>Offsets are not reused, even when older messages get deleted. They continue to increment, giving each new message in the partition a unique id.</p>
<p>When data is read from a partition, it is read in order from the lowest existing offset upwards. We'll see more about offsets when we cover Kafka consumers.</p>
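<p>A small simulation makes the "offsets are never reused" rule easier to see. The <code>Partition</code> type below is a hypothetical in-memory model, and <code>DropOldest()</code> stands in for whatever retention policy removes old messages.</p>
<pre><code class="lang-go">package main

import "fmt"

// Record is a message together with the offset it was assigned.
type Record struct {
	Offset int64
	Value  string
}

// Partition is a toy model of a single partition's log.
type Partition struct {
	nextOffset int64
	records    []Record
}

// Append stores the value under the next offset and returns that offset.
func (p *Partition) Append(value string) int64 {
	offset := p.nextOffset
	p.records = append(p.records, Record{Offset: offset, Value: value})
	p.nextOffset++
	return offset
}

// DropOldest removes the oldest record, standing in for retention cleanup.
// It does not touch nextOffset, so offsets are never handed out twice.
func (p *Partition) DropOldest() {
	if len(p.records) != 0 {
		p.records = p.records[1:]
	}
}

func main() {
	p := new(Partition)
	p.Append("a") // offset 0
	p.Append("b") // offset 1
	p.Append("c") // offset 2

	p.DropOldest()
	p.DropOldest()

	fmt.Println(p.Append("d"))       // 3
	fmt.Println(p.records[0].Offset) // 2
}
</code></pre>
<p>After the cleanup, the lowest existing offset is 2, which is where a reader starting from the beginning of this partition would begin.</p>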
<h3 id="heading-brokers-in-kafka">Brokers in Kafka</h3>
<p>A single "server" running Kafka is called a <strong>broker</strong>. In reality, this might be a Docker container running in a virtual machine. But it can be a helpful mental image to think of brokers as individual servers.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/cluster-with-three-brokers.PNG" alt="A Kafka cluster made up of three brokers" width="600" height="400" loading="lazy"></p>
<p><em>A Kafka cluster made up of three brokers</em></p>
<p>Multiple brokers working together make up a Kafka cluster. There might be a handful of brokers in a cluster, or more than 100. When a client application connects to one broker, it learns about every other broker in the cluster and can connect to them as needed.</p>
<p>By running as a cluster, Kafka becomes more scalable and fault-tolerant. If one broker fails, the others will take over its work to ensure there is no downtime or data loss.</p>
<p>Each broker manages a set of partitions and handles requests to write data to or read data from these partitions. Partitions for a given topic will be spread evenly across the brokers in a cluster to help with load balancing. Brokers also manage replicating partitions to keep their data backed up.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/brokers-with-partitions.PNG" alt="Partitions spread across brokers" width="600" height="400" loading="lazy"></p>
<p><em>Partitions spread across brokers</em></p>
<h3 id="heading-replication-in-kafka">Replication in Kafka</h3>
<p>To protect against data loss if a broker fails, Kafka writes the same data to copies of a partition on multiple brokers. This is called <strong>replication</strong>.</p>
<p>The main copy of a partition is called the leader, while the replicas are called followers.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/brokers-replication.PNG" alt="The data from the leader partition is copied to follower partitions on different brokers" width="600" height="400" loading="lazy"></p>
<p><em>The data from the leader partition is copied to follower partitions on different brokers</em></p>
<p>When a topic is created, you set a replication factor for it. This controls how many copies of each partition Kafka maintains. A replication factor of three is common, meaning data gets written to one leader and replicated to two followers. So even if two brokers failed, your data would still be safe.</p>
<p>Whenever you write messages to a partition, you're writing to the leader partition. Kafka then automatically copies these messages to the followers. As such, the logs on the followers will have the same messages and offsets as on the leader.</p>
<p>Followers that are up to date with the leader are called <strong>In-Sync Replicas</strong> (ISRs). Kafka considers a message to be committed once a minimum number of replicas have saved it to their logs. You can configure this to get higher throughput at the expense of less certainty that a message has been backed up.</p>
<h3 id="heading-producers-in-kafka">Producers in Kafka</h3>
<p>Producers are client applications that write events to Kafka topics. These apps aren't themselves part of Kafka – you write them.</p>
<p>Usually you will use a library to help manage writing events to Kafka. There is an official client library for Java as well as dozens of community-supported libraries for languages such as Scala, JavaScript, Go, Rust, Python, C#, and C++.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/producer-writing-to-topics.PNG" alt="A Producer application writing to multiple topics" width="600" height="400" loading="lazy"></p>
<p><em>A Producer application writing to multiple topics</em></p>
<p>Producers are totally decoupled from consumers, which read from Kafka. They don't know about each other and their speed doesn't affect each other. Producers aren't affected if consumers fail, and the same is true for consumers.</p>
<p>If you need to, you could write an application that writes certain events to Kafka and reads other events from Kafka, making it both a producer and a consumer.</p>
<p>Producers take a key-value pair, generate a Kafka message, and then serialize it into binary for transmission across the network. You can adjust the configuration of producers to batch messages together based on their size or some fixed time limit to optimize writing messages to the Kafka brokers.</p>
<p>It's the producer that decides which partition of a topic to send each message to. Again, messages without keys will be distributed evenly among partitions, while messages that share the same key are always sent to the same partition.</p>
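<p>To make the key-to-partition rule concrete, here is a minimal sketch in Java. It is not Kafka's actual partitioner: the class name <code>KeyPartitionSketch</code> and the method <code>partitionFor</code> are made up for illustration, and <code>String.hashCode()</code> stands in for the hash function the real client uses. It only shows why messages with equal keys land in the same partition as long as the number of partitions stays fixed.</p>
<pre><code class="lang-java">import java.util.List;

public class KeyPartitionSketch {
    // Maps a key to a partition index between 0 and numPartitions - 1.
    // Assumption: String.hashCode() is a stand-in for the hash the real client uses.
    public static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : List.of("fantasy", "horror", "fantasy", "science_fiction")) {
            System.out.println(key + " maps to partition " + partitionFor(key, numPartitions));
        }
    }
}
</code></pre>
<p>Running this prints the same partition number for both <code>fantasy</code> messages, because the result depends only on the key and the partition count.</p>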
<h3 id="heading-consumers-in-kafka">Consumers in Kafka</h3>
<p>Consumers are client applications that read messages from topics in a Kafka cluster. Like with producers, you write these applications yourself and can make use of client libraries to support the programming language your application is built with.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/consumer-reading-from-topics.PNG" alt="A Consumer reading messages from multiple topics." width="600" height="400" loading="lazy"></p>
<p><em>A Consumer reading messages from multiple topics</em></p>
<p>Consumers can read from one or more partitions within a topic, and from one or more topics. Messages are read in order within a partition, from the lowest available offset to the highest. But if a consumer reads data from several partitions in the same topic, the message order <strong>between</strong> these partitions is not guaranteed.</p>
<p>For example, a consumer might read messages from partition 0, then partition 2, then partition 1, then back to partition 0. The messages from partition 0 will be read in order, but there might be messages from the other partitions mixed among them.</p>
<p>It's important to remember that reading a message does not delete it. The message is still available to be read by any other consumer that needs to access it. It's normal for multiple consumers to read from the same topic if they each have uses for the data in it.</p>
<p>By default, a new consumer starts reading from the latest offset in a partition, so it only sees messages written after it started. But consumers can also be configured to go back and read from the oldest existing offset.</p>
<p>Consumers deserialize messages, converting them from binary into a collection of key-value pairs that your application can then work with. The format of a message should not change during a topic's lifetime or your producers and consumers won't be able to serialize and deserialize it correctly.</p>
<p>One thing to be aware of is that consumers request messages from Kafka. Kafka doesn't push messages to them. This protects consumers from becoming overwhelmed if Kafka is handling a high volume of messages. If you want to scale consumers, you can run multiple instances of a consumer together in a <strong>consumer group</strong>.</p>
<h3 id="heading-consumer-groups-in-kafka">Consumer Groups in Kafka</h3>
<p>An application that reads from Kafka can create multiple instances of the same consumer to split up the work of reading from different partitions in a topic. These consumers work together as a <strong>consumer group</strong>.</p>
<p>When you create a consumer, you can assign it a group id. All consumers in a group will have the same group id.</p>
<p>You can create consumer instances in a group up to the number of partitions in a topic. So if you have a topic with 5 partitions, you can create up to 5 instances of the same consumer in a consumer group. If you ever have more consumers in a group than partitions, the extra consumers will remain idle.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/consumer-group.PNG" alt="Consumers in a consumer group reading messages from a topic's partitions" width="600" height="400" loading="lazy"></p>
<p><em>Consumers in a consumer group reading messages from a topic's partitions</em></p>
<p>If you add another consumer instance to a consumer group, Kafka will automatically redistribute the partitions among the consumers in a process called <strong>rebalancing</strong>.</p>
<p>Each partition is only assigned to one consumer in a group, but a consumer can read from multiple partitions. Also, multiple different consumer groups (meaning different applications) can read from the same topic at the same time.</p>
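<p>The assignment rules above can be sketched with a toy model. This is a plain round-robin deal written for illustration, not one of Kafka's built-in assignment strategies, and the names <code>GroupAssignmentSketch</code> and <code>assign</code> are hypothetical. It assumes the group has at least one consumer.</p>
<pre><code class="lang-java">import java.util.Arrays;

public class GroupAssignmentSketch {
    // Returns an array where owner[p] is the index of the consumer that reads partition p.
    // Every partition gets exactly one owner; a consumer may own several partitions.
    // Assumption: a simple round-robin deal, and numConsumers is at least 1.
    public static int[] assign(int numPartitions, int numConsumers) {
        int[] owner = new int[numPartitions];
        Arrays.setAll(owner, p -> p % numConsumers);
        return owner;
    }

    public static void main(String[] args) {
        // 5 partitions, 2 consumers: consumer 0 reads three partitions, consumer 1 reads two.
        System.out.println(Arrays.toString(assign(5, 2)));
        // 3 partitions, 4 consumers: consumer 3 owns nothing, so it sits idle.
        System.out.println(Arrays.toString(assign(3, 4)));
    }
}
</code></pre>
<p>The first call prints <code>[0, 1, 0, 1, 0]</code> and the second prints <code>[0, 1, 2]</code>. In the second case the fourth consumer (index 3) never appears, which is the idle consumer described earlier.</p>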
<p>Kafka brokers use an internal topic called <code>__consumer_offsets</code> to keep track of which messages a specific consumer group has successfully processed.</p>
<p>As a consumer reads from a partition, it regularly saves the offset it has read up to and sends this data to the broker it is reading from. This is called the <strong>consumer offset</strong> and is handled automatically by most client libraries.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/consumer-committing-offsets.PNG" alt="A Consumer committing the offsets it has read up to." width="600" height="400" loading="lazy"></p>
<p><em>A Consumer committing the offsets it has read up to</em></p>
<p>If a consumer crashes, the consumer offset helps the remaining consumers to know where to start from when they take over reading from the partition.</p>
<p>The same thing happens if a new consumer is added to the group. The consumer group rebalances, the new consumer is assigned a partition, and it picks up reading from the consumer offset of that partition.</p>
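<p>Here is a small in-memory sketch of how a committed offset lets another consumer pick up where the previous one stopped. It is only a model: the class <code>OffsetTrackerSketch</code> and its methods are invented for this example, the array plays the role that <code>__consumer_offsets</code> plays in Kafka, and starting at offset 0 when nothing has been committed is an assumption of the sketch rather than Kafka's configurable behavior.</p>
<pre><code class="lang-java">public class OffsetTrackerSketch {
    // committed[p] holds the offset of the next message to read from partition p.
    // Assumption: partitions with no commit yet start at offset 0.
    private final long[] committed;

    public OffsetTrackerSketch(int numPartitions) {
        this.committed = new long[numPartitions];
    }

    // Records how far the group has read in a partition.
    public void commit(int partition, long nextOffset) {
        committed[partition] = nextOffset;
    }

    // Where a consumer that takes over this partition should start reading.
    public long resumeFrom(int partition) {
        return committed[partition];
    }

    public static void main(String[] args) {
        OffsetTrackerSketch group = new OffsetTrackerSketch(3);
        // Consumer A processes offsets 0 through 41 of partition 1, then commits 42.
        group.commit(1, 42L);
        // Consumer A crashes. Consumer B is assigned partition 1 and resumes at 42.
        System.out.println(group.resumeFrom(1));
        // Nothing was committed for partition 2, so this model starts at 0.
        System.out.println(group.resumeFrom(2));
    }
}
</code></pre>
<p>Because the offset is stored per partition for the whole group rather than inside any one consumer, it survives a consumer crash or a rebalance.</p>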
<h3 id="heading-kafka-zookeeper">Kafka Zookeeper</h3>
<p>One other topic that we briefly need to cover here is how Kafka clusters are managed. Currently this is usually done using <a target="_blank" href="https://zookeeper.apache.org/">Zookeeper</a>, a service for managing and synchronizing distributed systems. Like Kafka, it's maintained by the Apache Software Foundation.</p>
<p>Kafka uses Zookeeper to manage the brokers in a cluster, and requires Zookeeper even if you're running a Kafka cluster with only one broker.</p>
<p>Recently, a proposal has been accepted to remove Zookeeper and have Kafka manage itself (<a target="_blank" href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum">KIP-500</a>), but this is not yet widely used in production.</p>
<p>Zookeeper keeps track of things like:</p>
<ul>
<li><p>Which brokers are part of a Kafka cluster</p>
</li>
<li><p>Which broker is the leader for a given partition</p>
</li>
<li><p>How topics are configured, such as the number of partitions and the location of replicas</p>
</li>
<li><p>Consumer groups and their members</p>
</li>
<li><p>Access Control Lists – who is allowed to write to and read from each topic</p>
</li>
</ul>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/zookeeper-ensemble-1.PNG" alt="A Zookeeper ensemble managing the brokers in a Kafka cluster." width="600" height="400" loading="lazy"></p>
<p><em>A Zookeeper ensemble managing the brokers in a Kafka cluster</em></p>
<p>Zookeeper itself runs as a cluster called an ensemble. This means that Zookeeper can keep working even if one node in the cluster fails. New data gets written to the ensemble's leader and replicated to the followers. Your Kafka brokers can read this data from any of the Zookeeper nodes in the ensemble.</p>
<p>Now that you understand the main concepts behind Kafka, let's get some hands-on practice working with Kafka.</p>
<p>You're going to install Kafka on your own computer, practice interacting with Kafka brokers from the command line, and then build a simple producer and consumer application with Java.</p>
<h2 id="heading-how-to-install-kafka-on-your-computer">How to Install Kafka on Your Computer</h2>
<p>At the time of writing this guide, the latest stable version of Kafka is 3.3.1. Check <a target="_blank" href="https://kafka.apache.org/downloads">kafka.apache.org/downloads</a> to see if there is a more recent stable version. If there is, you can replace "3.3.1" with the latest stable version in all of the following instructions.</p>
<h3 id="heading-install-kafka-on-macos">Install Kafka on macOS</h3>
<p>If you're using macOS, I recommend using Homebrew to install Kafka. It will make sure you have Java installed before it installs Kafka.</p>
<p>If you don't already have Homebrew installed, install it by following the instructions at <a target="_blank" href="https://brew.sh/">brew.sh</a>.</p>
<p>Next, run <code>brew install kafka</code> in a terminal. This will install Kafka's binaries at <code>/usr/local/bin</code>.</p>
<p>Finally, run <code>kafka-topics --version</code> in a terminal and you should see <code>3.3.1</code>. If you do, you're all set.</p>
<p>To make it easier to work with Kafka, you can add Kafka to the <code>PATH</code> environment variable. Open your <code>~/.bashrc</code> (if using Bash) or <code>~/.zshrc</code> (if using Zsh) and add the following line, replacing <code>USERNAME</code> with your username:</p>
<pre><code class="lang-python">PATH=<span class="hljs-string">"$PATH:/Users/USERNAME/kafka_2.13-3.3.1/bin"</span>
</code></pre>
<p>You'll need to close and reopen your terminal for this change to take effect.</p>
<p>Now, if you run <code>echo $PATH</code> you should see that the Kafka <code>bin</code> directory has been added to your path.</p>
<h3 id="heading-install-kafka-on-windows-wsl2-and-linux">Install Kafka on Windows (WSL2) and Linux</h3>
<p>Kafka isn't natively supported on Windows, so you will need to use either WSL2 or Docker. I'm going to show you WSL2 since it's the same steps as Linux.</p>
<p>To set up WSL2 on Windows, follow <a target="_blank" href="https://learn.microsoft.com/en-us/windows/wsl/install">the instructions in the official docs</a>.</p>
<p>From here on, the instructions are the same for both WSL2 and Linux.</p>
<p>First, install Java 11 by running the following commands:</p>
<pre><code class="lang-python">wget -O- https://apt.corretto.aws/corretto.key | sudo apt-key add - 

sudo add-apt-repository <span class="hljs-string">'deb https://apt.corretto.aws stable main'</span>

sudo apt-get update; sudo apt-get install -y java<span class="hljs-number">-11</span>-amazon-corretto-jdk
</code></pre>
<p>Once this has finished, run <code>java -version</code> and you should see something like:</p>
<pre><code class="lang-python">openjdk version <span class="hljs-string">"11.0.17"</span> <span class="hljs-number">2022</span><span class="hljs-number">-10</span><span class="hljs-number">-18</span> LTS
OpenJDK Runtime Environment Corretto<span class="hljs-number">-11.0</span><span class="hljs-number">.17</span><span class="hljs-number">.8</span><span class="hljs-number">.1</span> (build <span class="hljs-number">11.0</span><span class="hljs-number">.17</span>+<span class="hljs-number">8</span>-LTS)
OpenJDK <span class="hljs-number">64</span>-Bit Server VM Corretto<span class="hljs-number">-11.0</span><span class="hljs-number">.17</span><span class="hljs-number">.8</span><span class="hljs-number">.1</span> (build <span class="hljs-number">11.0</span><span class="hljs-number">.17</span>+<span class="hljs-number">8</span>-LTS, mixed mode)
</code></pre>
<p>From your home directory, download Kafka with the following command:</p>
<pre><code class="lang-bash">wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
</code></pre>
<p>The <code>2.13</code> means it is using version <code>2.13</code> of Scala, while <code>3.3.1</code> refers to the Kafka version.</p>
<p>Extract the contents of the download with:</p>
<pre><code class="lang-bash">tar xzf kafka_2.13-3.3.1.tgz
</code></pre>
<p>If you run <code>ls</code>, you'll now see <code>kafka_2.13-3.3.1</code> in your home directory.</p>
<p>To make it easier to work with Kafka, you can add Kafka to the <code>PATH</code> environment variable. Open your <code>~/.bashrc</code> (if using Bash) or <code>~/.zshrc</code> (if using Zsh) and add the following line, replacing <code>USERNAME</code> with your username:</p>
<pre><code class="lang-python">PATH=<span class="hljs-string">"$PATH:/home/USERNAME/kafka_2.13-3.3.1/bin"</span>
</code></pre>
<p>You'll need to close and reopen your terminal for this change to take effect.</p>
<p>Now, if you run <code>echo $PATH</code> you should see that the Kafka <code>bin</code> directory has been added to your path.</p>
<p>Run <code>kafka-topics.sh --version</code> in a terminal and you should see <code>3.3.1</code>. If you do, you're all set.</p>
<h2 id="heading-how-to-start-zookeeper-and-kafka">How to Start Zookeeper and Kafka</h2>
<p>Since Kafka uses Zookeeper to manage clusters, you need to start Zookeeper before you start Kafka.</p>
<h3 id="heading-how-to-start-kafka-on-macos">How to Start Kafka on macOS</h3>
<p>In one terminal window, start Zookeeper with:</p>
<pre><code class="lang-bash">/usr/<span class="hljs-built_in">local</span>/bin/zookeeper-server-start /usr/<span class="hljs-built_in">local</span>/etc/zookeeper/zoo.cfg
</code></pre>
<p>In another terminal window, start Kafka with:</p>
<pre><code class="lang-bash">/usr/<span class="hljs-built_in">local</span>/bin/kafka-server-start /usr/<span class="hljs-built_in">local</span>/etc/kafka/server.properties
</code></pre>
<p>While using Kafka, you need to keep both these terminal windows open. Closing them will shut down Kafka.</p>
<h3 id="heading-how-to-start-kafka-on-windows-wsl2-and-linux">How to Start Kafka on Windows (WSL2) and Linux</h3>
<p>In one terminal window, start Zookeeper with:</p>
<pre><code class="lang-bash">~/kafka_2.13-3.3.1/bin/zookeeper-server-start.sh ~/kafka_2.13-3.3.1/config/zookeeper.properties
</code></pre>
<p>In another terminal window, start Kafka with:</p>
<pre><code class="lang-bash">~/kafka_2.13-3.3.1/bin/kafka-server-start.sh ~/kafka_2.13-3.3.1/config/server.properties
</code></pre>
<p>While using Kafka, you need to keep both these terminal windows open. Closing them will shut down Kafka.</p>
<p>Now that you have Kafka installed and running on your machine, it's time to get some hands-on practice.</p>
<h2 id="heading-the-kafka-cli">The Kafka CLI</h2>
<p>When you install Kafka, it comes with a Command Line Interface (CLI) that lets you create and manage topics, as well as produce and consume events.</p>
<p>First, make sure Zookeeper and Kafka are running in two terminal windows.</p>
<p>In a third terminal window, run <code>kafka-topics.sh</code> (on WSL2 or Linux) or <code>kafka-topics</code> (on macOS) to make sure the CLI is working. You'll see a list of all the options you can pass to the CLI.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/kafka-topics-sh.PNG" alt="A terminal displaying kafka-topics options." width="600" height="400" loading="lazy"></p>
<p><em>kafka-topics options</em></p>
<p><strong>Note:</strong> When working with the Kafka CLI, the command will be <code>kafka-topics.sh</code> on WSL2 and Linux. It will be <code>kafka-topics.sh</code> on macOS if you directly installed the Kafka binaries and <code>kafka-topics</code> if you used Homebrew. So if you're using Homebrew, remove the <code>.sh</code> extension from the example commands in this section.</p>
<h3 id="heading-how-to-list-topics">How to List Topics</h3>
<p>To see the topics available on the Kafka broker on your local machine, use:</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --list
</code></pre>
<p>This means "Connect to the Kafka broker running on localhost:9092 and list all topics there". <code>--bootstrap-server</code> refers to the Kafka broker you are trying to connect to and <code>localhost:9092</code> is the host and port it's running at. You won't see any output since you haven't created any topics yet.</p>
<h3 id="heading-how-to-create-a-topic">How to Create a Topic</h3>
<p>To create a topic (with the default replication factor and number of partitions), use the <code>--create</code> and <code>--topic</code> options and pass them a topic name:</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_first_topic
</code></pre>
<p>If you use an <code>_</code> or <code>.</code> in your topic name, you will see the following warning:</p>
<pre><code class="lang-python">WARNING: Due to limitations <span class="hljs-keyword">in</span> metric names, topics <span class="hljs-keyword">with</span> a period (<span class="hljs-string">'.'</span>) <span class="hljs-keyword">or</span> underscore (<span class="hljs-string">'_'</span>) could collide. To avoid issues it <span class="hljs-keyword">is</span> best to use either, but <span class="hljs-keyword">not</span> both.
</code></pre>
<p>Since Kafka could confuse <code>my.first.topic</code> with <code>my_first_topic</code>, it's best to only use either underscores or periods when naming topics.</p>
<h3 id="heading-how-to-describe-topics">How to Describe Topics</h3>
<p>To describe the topics on a broker, use the <code>--describe</code> option:</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --describe
</code></pre>
<p>This will print the details of all the topics on this broker, including the number of partitions and their replication factor. By default, these will both be set to <code>1</code>.</p>
<p>If you add the <code>--topic</code> option and the name of a topic, it will describe only that topic:</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my_first_topic
</code></pre>
<h3 id="heading-how-to-partition-a-topic">How to Partition a Topic</h3>
<p>To create a topic with multiple partitions, use the <code>--partitions</code> option and pass it a number:</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_second_topic --partitions 3
</code></pre>
<h3 id="heading-how-to-set-a-replication-factor">How to Set a Replication Factor</h3>
<p>To create a topic with a replication factor higher than the default, use the <code>--replication-factor</code> option and pass it a number:</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_third_topic --partitions 3 --replication-factor 3
</code></pre>
<p>You should get the following error:</p>
<pre><code class="lang-bash">ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
</code></pre>
<p>Since you're only running one Kafka broker on your machine, you can't set a replication factor higher than one. If you were running a cluster with multiple brokers, you could set a replication factor as high as the total number of brokers.</p>
<h3 id="heading-how-to-delete-a-topic">How to Delete a Topic</h3>
<p>To delete a topic, use the <code>--delete</code> option and specify a topic with the <code>--topic</code> option:</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my_first_topic
</code></pre>
<p>You won't get any output to say the topic was deleted but you can check using <code>--list</code> or <code>--describe</code>.</p>
<h3 id="heading-how-to-use-kafka-console-producer">How to Use <code>kafka-console-producer</code></h3>
<p>You can produce messages to a topic from the command line using <code>kafka-console-producer</code>.</p>
<p>Run <code>kafka-console-producer.sh</code> to see the options you can pass to it.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/kafka-console-producer.PNG" alt="Terminal showing kafka-console-producer options." width="600" height="400" loading="lazy"></p>
<p><em>kafka-console-producer options</em></p>
<p>To create a producer connected to a specific topic, run:</p>
<pre><code class="lang-bash">kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TOPIC_NAME
</code></pre>
<p>Let's produce messages to the <code>my_first_topic</code> topic.</p>
<pre><code class="lang-bash">kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my_first_topic
</code></pre>
<p>Your prompt will change and you will be able to type text. Press <code>enter</code> to send that message. You can keep sending messages until you press <code>ctrl</code> + <code>c</code>.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/kafka-console-producer-sample-messages.PNG" alt="Sending messages using kafka-console-producer" width="600" height="400" loading="lazy"></p>
<p><em>Sending messages using kafka-console-producer</em></p>
<p>If you produce messages to a topic that doesn't exist, you'll get a warning, but the topic will be created and the messages will still get sent. It's better to create a topic in advance, however, so you can specify partitions and replication.</p>
<p>By default, the messages sent from <code>kafka-console-producer</code> have their keys set to <code>null</code>, and so they will be evenly distributed to all partitions.</p>
<p>You can set a key by using the <code>--property</code> option to set <code>parse.key</code> to be true and providing a key separator, such as <code>:</code></p>
<p>For example, we can create a <code>books</code> topic and use the books' genre as a key.</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --topic books --create

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic books --property parse.key=<span class="hljs-literal">true</span> --property key.separator=:
</code></pre>
<p>Now you can enter keys and values in the format <code>key:value</code>. Anything to the left of the key separator will be interpreted as a message key, anything to the right as a message value.</p>
<pre><code class="lang-python">science_fiction:All Systems Red
fantasy:Uprooted
horror:Mexican Gothic
</code></pre>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/producing-messages-with-keys.PNG" alt="Producing messages with keys and values." width="600" height="400" loading="lazy"></p>
<p><em>Producing messages with keys and values</em></p>
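<p>As a rough illustration of the <code>key:value</code> format, the following Java sketch splits a line at the first occurrence of the separator. The class <code>KeyValueLineSketch</code> and its <code>parse</code> method are hypothetical and are not how the console producer is implemented. Treating a line with no separator as a message with a <code>null</code> key is this sketch's own choice, and the real tool may reject such a line instead.</p>
<pre><code class="lang-java">public class KeyValueLineSketch {
    // Splits a line into {key, value} at the first occurrence of the separator.
    // Assumption: a line without the separator becomes a value with a null key.
    public static String[] parse(String line, String separator) {
        int i = line.indexOf(separator);
        if (i == -1) {
            return new String[] {null, line};
        }
        return new String[] {line.substring(0, i), line.substring(i + separator.length())};
    }

    public static void main(String[] args) {
        String[] kv = parse("science_fiction:All Systems Red", ":");
        System.out.println("key=" + kv[0] + ", value=" + kv[1]);
    }
}
</code></pre>
<p>This prints <code>key=science_fiction, value=All Systems Red</code>. Since only the first separator is used for splitting in this sketch, a value that itself contains a colon stays intact.</p>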
<p>Now that you've produced messages to a topic from the command line, it's time to consume those messages from the command line.</p>
<h3 id="heading-how-to-use-kafka-console-consumer">How to Use <code>kafka-console-consumer</code></h3>
<p>You can consume messages from a topic from the command line using <code>kafka-console-consumer</code>.</p>
<p>Run <code>kafka-console-consumer.sh</code> to see the options you can pass to it.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/kafka-console-consumer.PNG" alt="Terminal showing kafka-console-consumer options" width="600" height="400" loading="lazy"></p>
<p><em>kafka-console-consumer options</em></p>
<p>To create a consumer, run:</p>
<pre><code class="lang-bash">kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_NAME
</code></pre>
<p>When you start a consumer, by default it will read messages as they are written to the end of the topic. It won't read messages that were previously sent to the topic.</p>
<p>If you want to read the messages you already sent to a topic, use the <code>--from-beginning</code> option to read from the beginning of the topic:</p>
<pre><code class="lang-bash">kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_first_topic --from-beginning
</code></pre>
<p>The messages might appear "out of order". Remember, messages are ordered <strong>within</strong> a partition but ordering can't be guaranteed <strong>between</strong> partitions. If you don't set a key, they will be sent round robin between partitions and ordering isn't guaranteed.</p>
<p>You can display additional information about messages, such as their key and timestamp, by using the <code>--property</code> option and setting properties such as <code>print.key</code> and <code>print.timestamp</code> to true.</p>
<p>Use the <code>--formatter</code> option to set the message formatter and the <code>--property</code> option to select which message properties to print.</p>
<pre><code class="lang-bash">kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_first_topic --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=<span class="hljs-literal">true</span> --property print.key=<span class="hljs-literal">true</span> --property print.value=<span class="hljs-literal">true</span>
</code></pre>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/consuming-messages-from-a-topic-1.PNG" alt="Consuming messages from a topic" width="600" height="400" loading="lazy"></p>
<p><em>Consuming messages from a topic</em></p>
<p>We get the messages' timestamp, key, and value. Since we didn't assign any keys when we sent these messages to <code>my_first_topic</code>, their <code>key</code> is <code>null</code>.</p>
<h3 id="heading-how-to-use-kafka-consumer-groups">How to Use <code>kafka-consumer-groups</code></h3>
<p>You can run consumers in a consumer group using the Kafka CLI. To view the documentation for this, run:</p>
<pre><code class="lang-bash">kafka-consumer-groups.sh
</code></pre>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/kafka-consumer-groups.PNG" alt="kafka-consumer-groups options" width="600" height="400" loading="lazy"></p>
<p><em>kafka-consumer-groups options</em></p>
<p>First, create a topic with three partitions. Each consumer in a group will consume from one partition. If there are more consumers than partitions, any extra consumers will be idle.</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --topic fantasy_novels --create --partitions 3
</code></pre>
<p>You add a consumer to a group when you create it using the <code>--group</code> option. If you run the same command multiple times with the same group name, each new consumer will be added to the group.</p>
<p>To create the first consumer in your consumer group, run:</p>
<pre><code class="lang-bash">kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fantasy_novels --group fantasy_consumer_group
</code></pre>
<p>Next, open two new terminal windows and run the same command again to add a second and third consumer to the consumer group.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/three-consumers-in-group.PNG" alt="Three consumers running in a consumer group." width="600" height="400" loading="lazy"></p>
<p><em>Three consumers running in a consumer group</em></p>
<p>In a different terminal window, create a producer and send a few messages with keys to the topic.</p>
<p><strong>Note:</strong> Since Kafka 2.4, Kafka will send messages in batches to one "sticky" partition for better performance. In order to demonstrate messages being sent round robin between partitions (without sending a large volume of messages), we can set the partitioner to <code>RoundRobinPartitioner</code>.</p>
<pre><code class="lang-bash">kafka-console-producer.sh --bootstrap-server localhost:9092 --topic fantasy_novels --property parse.key=<span class="hljs-literal">true</span> --property key.separator=: --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

tolkien:The Lord of the Rings
le_guin:A Wizard of Earthsea
leckie:The Raven Tower
de_bodard:The House of Shattered Wings
okorafor:Who Fears Death
liu:The Grace of Kings
</code></pre>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/messages-spread-across-consumer-group.PNG" alt="Messages spread between consumers in a consumer group" width="600" height="400" loading="lazy"></p>
<p><em>Messages spread between consumers in a consumer group</em></p>
<p>If you stop one of the consumers, the consumer group will rebalance and future messages will be sent to the remaining consumers.</p>
<p>Now that you have some experience working with Kafka from the command line, the next step is to build a small application that connects to Kafka.</p>
<h2 id="heading-how-to-build-a-kafka-client-app-with-java">How to Build a Kafka Client App with Java</h2>
<p>We're going to build a simple Java app that both produces messages to and consumes messages from Kafka. For this we'll use the official Kafka Java client.</p>
<p>If at any point you get stuck, the full code for this project is <a target="_blank" href="https://github.com/gerhynes/kafka-java-app">available on GitHub</a>.</p>
<h3 id="heading-preliminaries">Preliminaries</h3>
<p>First of all, make sure you have Java (at least JDK 11) and Kafka installed.</p>
<p>We're going to send messages about characters from <em>The Lord of the Rings</em>. So let's create a topic for these messages with three partitions.</p>
<p>From the command line, run:</p>
<pre><code class="lang-bash">kafka-topics.sh --bootstrap-server localhost:9092 --create --topic lotr_characters --partitions 3
</code></pre>
<h3 id="heading-how-to-set-up-the-project">How to Set Up the Project</h3>
<p>I recommend using IntelliJ for Java projects, so go ahead and install the Community Edition if you don't already have it. You can download it from <a target="_blank" href="https://www.jetbrains.com/idea/">jetbrains.com/idea</a>.</p>
<p>In IntelliJ, select <code>File</code>, <code>New</code>, and <code>Project</code>.</p>
<p>Give your project a name and select a location for it on your computer. Make sure you have selected Java as the language, Maven as the build system, and that the JDK is at least Java 11. Then click <code>Create</code>.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/new-maven-project.PNG" alt="Setting up a Maven project in IntelliJ" width="600" height="400" loading="lazy"></p>
<p><em>Setting up a Maven project in IntelliJ</em></p>
<p><strong>Note:</strong> If you're on Windows, IntelliJ can't use a JDK installed on WSL. To install Java on the Windows side of things, go to <a target="_blank" href="https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list.html">docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list</a> and download the Windows installer. Follow the installation steps, open a command prompt, and run <code>java -version</code>. You should see something like:</p>
<pre><code class="lang-python">openjdk version <span class="hljs-string">"11.0.18"</span> <span class="hljs-number">2023</span><span class="hljs-number">-01</span><span class="hljs-number">-17</span> LTS
OpenJDK Runtime Environment Corretto<span class="hljs-number">-11.0</span><span class="hljs-number">.18</span><span class="hljs-number">.10</span><span class="hljs-number">.1</span> (build <span class="hljs-number">11.0</span><span class="hljs-number">.18</span>+<span class="hljs-number">10</span>-LTS)
OpenJDK <span class="hljs-number">64</span>-Bit Server VM Corretto<span class="hljs-number">-11.0</span><span class="hljs-number">.18</span><span class="hljs-number">.10</span><span class="hljs-number">.1</span> (build <span class="hljs-number">11.0</span><span class="hljs-number">.18</span>+<span class="hljs-number">10</span>-LTS, mixed mode)
</code></pre>
<p>Once your Maven project finishes setting up, run the <code>Main</code> class to see "Hello world!" and make sure everything worked.</p>
<h3 id="heading-how-to-install-the-dependencies">How to Install the Dependencies</h3>
<p>Next, we're going to install our dependencies. Open up <code>pom.xml</code> and inside the <code>&lt;project&gt;</code> element, create a <code>&lt;dependencies&gt;</code> element.</p>
<p>We're going to use the Java Kafka client for interacting with Kafka and SLF4J for logging, so add the following inside your <code>&lt;dependencies&gt;</code> element:</p>
<pre><code class="lang-xml"><span class="hljs-comment">&lt;!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --&gt;</span>  
<span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.kafka<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>kafka-clients<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>3.3.1<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>  
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>  
<span class="hljs-comment">&lt;!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --&gt;</span>  
<span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.slf4j<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>slf4j-api<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>2.0.6<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>  
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>  
<span class="hljs-comment">&lt;!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple --&gt;</span>  
<span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.slf4j<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>slf4j-simple<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>  
    <span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>2.0.6<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>  
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<p>The package names and version numbers might be red, meaning you haven't downloaded them yet. If this happens, click on <code>View</code>, <code>Tool Windows</code>, and <code>Maven</code> to open the Maven menu. Click on the <code>Reload All Maven Projects</code> icon and Maven will install these dependencies.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/reload-maven.png" alt="Reloading Maven dependencies in IntelliJ" width="600" height="400" loading="lazy"></p>
<p><em>Reloading Maven dependencies in IntelliJ</em></p>
<p>Create a <code>HelloKafka</code> class in the same directory as your <code>Main</code> class and give it the following contents:</p>
<pre><code class="lang-java"><span class="hljs-keyword">package</span> org.example;

<span class="hljs-keyword">import</span> org.slf4j.Logger;  
<span class="hljs-keyword">import</span> org.slf4j.LoggerFactory;  

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">HelloKafka</span> </span>{  
    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> Logger log = LoggerFactory.getLogger(HelloKafka.class);  

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> </span>{  
        log.info(<span class="hljs-string">"Hello Kafka"</span>);  
    }  
}
</code></pre>
<p>To make sure your dependencies are installed, run this class and you should see <code>[main] INFO org.example.HelloKafka - Hello Kafka</code> printed to the IntelliJ console.</p>
<h3 id="heading-how-to-create-a-kafka-producer">How to Create a Kafka Producer</h3>
<p>Next, we're going to create a <code>Producer</code> class. You can call this whatever you want as long as it doesn't clash with another class. So don't use <code>KafkaProducer</code> as you'll need that class in a minute.</p>
<pre><code class="lang-java"><span class="hljs-keyword">package</span> org.example;  

<span class="hljs-keyword">import</span> org.slf4j.Logger;  
<span class="hljs-keyword">import</span> org.slf4j.LoggerFactory;  

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Producer</span> </span>{  
    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> Logger log = LoggerFactory.getLogger(Producer.class);  

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> </span>{  
        log.info(<span class="hljs-string">"This class will produce messages to Kafka"</span>);  
    }  
}
</code></pre>
<p>All of our Kafka-specific code is going to go inside this class's <code>main()</code> method.</p>
<p>The first thing we need to do is configure a few properties for the producer. Add the following inside the <code>main()</code> method:</p>
<pre><code class="lang-java">Properties properties = <span class="hljs-keyword">new</span> Properties(); 

properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, <span class="hljs-string">"localhost:9092"</span>);  
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
</code></pre>
<p><code>Properties</code> stores a set of properties as pairs of strings. The ones we're using are:</p>
<ul>
<li><p><code>ProducerConfig.BOOTSTRAP_SERVERS_CONFIG</code> which specifies the address (as <code>host:port</code>) of one or more brokers the producer uses to make its initial connection to the Kafka cluster</p>
</li>
<li><p><code>ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG</code> which specifies the serializer to use for message keys</p>
</li>
<li><p><code>ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG</code> which specifies the serializer to use for message values</p>
</li>
</ul>
<p>We're going to connect to our local Kafka cluster running on <code>localhost:9092</code>, and use the <code>StringSerializer</code> since both our keys and values will be strings.</p>
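<p>The <code>ProducerConfig</code> constants are just names for plain string keys. As a rough sketch, the same three settings can be written with the literal keys, using nothing but <code>java.util.Properties</code>. The class name <code>ProducerPropertiesSketch</code> and its <code>build()</code> method are made up for this illustration.</p>

```java
import java.util.Properties;

public class ProducerPropertiesSketch {
    // Builds the same three settings as above, using the literal key strings
    // that the ProducerConfig constants stand for.
    public static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("localhost:9092");
        // Print every key/value pair the producer would receive
        for (String name : properties.stringPropertyNames()) {
            System.out.println(name + " = " + properties.getProperty(name));
        }
    }
}
```

<p>Using the constants in your real code is still the better choice, since a typo in a constant name fails at compile time while a typo in a string key does not.</p>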
<p>Now we can create our producer and pass it the configuration properties.</p>
<pre><code class="lang-java">KafkaProducer&lt;String, String&gt; producer = <span class="hljs-keyword">new</span> KafkaProducer&lt;&gt;(properties);
</code></pre>
<p>To send a message, we need to create a <code>ProducerRecord</code> and pass it to our producer. <code>ProducerRecord</code> contains a topic name and a value, and optionally a key and partition number.</p>
<p>We're going to create the <code>ProducerRecord</code> with the topic to use, the message's key, and the message's value.</p>
<pre><code class="lang-java">ProducerRecord&lt;String, String&gt; producerRecord = <span class="hljs-keyword">new</span> ProducerRecord&lt;&gt;(<span class="hljs-string">"lotr_characters"</span>, <span class="hljs-string">"hobbits"</span>, <span class="hljs-string">"Bilbo"</span>);
</code></pre>
<p>We can now use the producer's <code>send()</code> method to send the message to Kafka.</p>
<pre><code class="lang-java">producer.send(producerRecord);
</code></pre>
<p>Finally, we need to call the <code>close()</code> method to stop the producer. This method handles any messages currently being processed by <code>send()</code> and then closes the producer.</p>
<pre><code class="lang-java">producer.close();
</code></pre>
<p>Now it's time to run our producer. <strong>Make sure you have Zookeeper and Kafka running.</strong> Then run the <code>main()</code> method of the <code>Producer</code> class.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/java-producer-single-message.PNG" alt="Sending a message from a producer in a Java Kafka client app." width="600" height="400" loading="lazy"></p>
<p><em>Sending a message from a producer in a Java Kafka client app</em></p>
<p><strong>Note:</strong> On Windows, your producer might not be able to connect to a Kafka broker running on WSL. To fix this, you're going to need to do the following:</p>
<ul>
<li><p>In a WSL terminal, navigate to Kafka's config folder: <code>cd ~/kafka_2.13-3.3.1/config/</code></p>
</li>
<li><p>Open <code>server.properties</code>, for example with Nano: <code>nano server.properties</code></p>
</li>
<li><p>Uncomment <code>#listeners=PLAINTEXT://:9092</code></p>
</li>
<li><p>Replace it with <code>listeners=PLAINTEXT://[::1]:9092</code></p>
</li>
<li><p>In your <code>Producer</code> class, replace <code>"localhost:9092"</code> with <code>"[::1]:9092"</code></p>
</li>
</ul>
<p><code>[::1]</code>, or <code>0:0:0:0:0:0:0:1</code>, refers to the loopback address (or localhost) in IPv6. This is equivalent to <code>127.0.0.1</code> in IPv4.</p>
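<p>If you want to convince yourself that these addresses all point back at your own machine, here is a small sketch using only the JDK's <code>java.net.InetAddress</code>. The class and method names (<code>LoopbackCheck</code>, <code>isLoopback</code>, <code>sameAddress</code>) are made up for this illustration.</p>

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

public class LoopbackCheck {
    // Parses a literal IP address (no DNS lookup is needed for literals).
    private static InetAddress parse(String literal) {
        try {
            return InetAddress.getByName(literal);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Not a valid IP literal: " + literal, e);
        }
    }

    // Returns true when the literal is a loopback address.
    public static boolean isLoopback(String literal) {
        return parse(literal).isLoopbackAddress();
    }

    // Returns true when two literals resolve to the same raw address bytes.
    public static boolean sameAddress(String first, String second) {
        return Arrays.equals(parse(first).getAddress(), parse(second).getAddress());
    }

    public static void main(String[] args) {
        System.out.println(isLoopback("::1"));
        System.out.println(isLoopback("127.0.0.1"));
        System.out.println(sameAddress("::1", "0:0:0:0:0:0:0:1"));
    }
}
```

<p>All three lines should print <code>true</code>: both addresses are loopback addresses, and <code>::1</code> is just the shortened way of writing <code>0:0:0:0:0:0:0:1</code>.</p>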
<p>If you change <code>listeners</code>, when you try to access the Kafka broker from the command line you'll also have to use the new IP address, so use <code>--bootstrap-server ::1:9092</code> instead of <code>--bootstrap-server localhost:9092</code> and it should work.</p>
<p>We can now check that <code>Producer</code> worked by using <code>kafka-console-consumer</code> in another terminal window to read from the <code>lotr_characters</code> topic and see the message printed to the console.</p>
<pre><code class="lang-bash">kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lotr_characters --from-beginning
</code></pre>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/consumer-reading-single-message.PNG" alt="kafka-console-consumer reading the message sent by the producer in our Java app" width="600" height="400" loading="lazy"></p>
<p><em>kafka-console-consumer reading the message sent by the producer in our Java app</em></p>
<h3 id="heading-how-to-send-multiple-messages-and-use-callbacks">How to Send Multiple Messages and Use Callbacks</h3>
<p>So far we're only sending one message. If we update <code>Producer</code> to send multiple messages, we'll be able to see how keys are used to divide messages between partitions. We can also take this opportunity to use a callback to view the sent message's metadata.</p>
<p>To do this, we're going to loop over a collection of characters to generate our messages.</p>
<p>So replace this:</p>
<pre><code class="lang-java">ProducerRecord&lt;String, String&gt; producerRecord = <span class="hljs-keyword">new</span> ProducerRecord&lt;&gt;(<span class="hljs-string">"lotr_characters"</span>, <span class="hljs-string">"hobbits"</span>, <span class="hljs-string">"Bilbo"</span>);  

producer.send(producerRecord);
</code></pre>
<p>with this:</p>
<pre><code class="lang-java"><span class="hljs-comment">// Each pair is {key, value}. An array keeps both characters for each key,</span>  
<span class="hljs-comment">// whereas a HashMap would only keep the last value put for a key.</span>  
String[][] characters = {  
    {<span class="hljs-string">"hobbits"</span>, <span class="hljs-string">"Frodo"</span>},  
    {<span class="hljs-string">"hobbits"</span>, <span class="hljs-string">"Sam"</span>},  
    {<span class="hljs-string">"elves"</span>, <span class="hljs-string">"Galadriel"</span>},  
    {<span class="hljs-string">"elves"</span>, <span class="hljs-string">"Arwen"</span>},  
    {<span class="hljs-string">"humans"</span>, <span class="hljs-string">"Éowyn"</span>},  
    {<span class="hljs-string">"humans"</span>, <span class="hljs-string">"Faramir"</span>}  
};  

<span class="hljs-keyword">for</span> (String[] character : characters) {  
    ProducerRecord&lt;String, String&gt; producerRecord = <span class="hljs-keyword">new</span> ProducerRecord&lt;&gt;(<span class="hljs-string">"lotr_characters"</span>, character[<span class="hljs-number">0</span>], character[<span class="hljs-number">1</span>]);  

    producer.send(producerRecord, (RecordMetadata recordMetadata, Exception err) -&gt; {  
        <span class="hljs-keyword">if</span> (err == <span class="hljs-keyword">null</span>) {  
            log.info(<span class="hljs-string">"Message received. \n"</span> +  
                    <span class="hljs-string">"topic ["</span> + recordMetadata.topic() + <span class="hljs-string">"]\n"</span> +  
                    <span class="hljs-string">"partition ["</span> + recordMetadata.partition() + <span class="hljs-string">"]\n"</span> +  
                    <span class="hljs-string">"offset ["</span> + recordMetadata.offset() + <span class="hljs-string">"]\n"</span> +  
                    <span class="hljs-string">"timestamp ["</span> + recordMetadata.timestamp() + <span class="hljs-string">"]"</span>);  
        } <span class="hljs-keyword">else</span> {  
            log.error(<span class="hljs-string">"An error occurred while producing messages"</span>, err);  
        }  
    });  
}
</code></pre>
<p>Here, we're iterating over the collection, creating a <code>ProducerRecord</code> for each entry, and passing the record to <code>send()</code>. Behind the scenes, Kafka will batch these messages together to make fewer network requests. <code>send()</code> can also take a callback as a second argument. We're going to pass it a lambda which will run code when the <code>send()</code> request completes.</p>
<p>If the request completed successfully, we get back a <code>RecordMetadata</code> object with metadata about the message, which we can use to see things such as the partition and offset the message ended up in.</p>
<p>If we get back an exception, we could handle it by retrying to send the message, or alerting our application. In this case, we're just going to log the exception.</p>
<p>Run the <code>main()</code> method of the <code>Producer</code> class and you should see the message metadata get logged.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/java-producer.PNG" alt="Message metadata logged by the producer's callback" width="600" height="400" loading="lazy"></p>
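<p>In the logged metadata, messages that share a key end up on the same partition. That's because, for a message with a key, the producer hashes the key and takes the result modulo the number of partitions. The sketch below illustrates the idea only: it uses Java's <code>String.hashCode()</code> as a stand-in, whereas Kafka hashes the serialized key bytes with murmur2, so the actual partition numbers you see will differ. The class name, the method name, and the partition count of 3 are assumptions made for this example.</p>

```java
public class KeyPartitionSketch {
    // Maps a key to a partition index between 0 and numPartitions - 1.
    // String.hashCode() is only a stand-in for Kafka's murmur2 hash.
    public static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"hobbits", "hobbits", "elves", "elves", "humans", "humans"};
        int numPartitions = 3; // assumed partition count for the illustration

        for (String key : keys) {
            System.out.println(key + " goes to partition " + partitionFor(key, numPartitions));
        }
    }
}
```

<p>Whatever numbers this prints, the same key always maps to the same partition, which is what lets Kafka preserve ordering for messages that share a key.</p>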
<p>The full code for the <code>Producer</code> class should now be:</p>
<pre><code class="lang-java"><span class="hljs-keyword">package</span> org.example;  

<span class="hljs-keyword">import</span> org.apache.kafka.clients.producer.KafkaProducer;  
<span class="hljs-keyword">import</span> org.apache.kafka.clients.producer.ProducerConfig;  
<span class="hljs-keyword">import</span> org.apache.kafka.clients.producer.ProducerRecord;  
<span class="hljs-keyword">import</span> org.apache.kafka.clients.producer.RecordMetadata;  
<span class="hljs-keyword">import</span> org.apache.kafka.common.serialization.StringSerializer;  
<span class="hljs-keyword">import</span> org.slf4j.Logger;  
<span class="hljs-keyword">import</span> org.slf4j.LoggerFactory;  

<span class="hljs-keyword">import</span> java.util.Properties;  

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Producer</span> </span>{  
    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> Logger log = LoggerFactory.getLogger(Producer.class);  

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> </span>{  
        log.info(<span class="hljs-string">"This class produces messages to Kafka"</span>);  

        Properties properties = <span class="hljs-keyword">new</span> Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, <span class="hljs-string">"localhost:9092"</span>); 
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  

        KafkaProducer&lt;String, String&gt; producer = <span class="hljs-keyword">new</span> KafkaProducer&lt;&gt;(properties);  

        <span class="hljs-comment">// Each pair is {key, value}. An array keeps both characters for each key,</span>  
        <span class="hljs-comment">// whereas a HashMap would only keep the last value put for a key.</span>  
        String[][] characters = {  
            {<span class="hljs-string">"hobbits"</span>, <span class="hljs-string">"Frodo"</span>},  
            {<span class="hljs-string">"hobbits"</span>, <span class="hljs-string">"Sam"</span>},  
            {<span class="hljs-string">"elves"</span>, <span class="hljs-string">"Galadriel"</span>},  
            {<span class="hljs-string">"elves"</span>, <span class="hljs-string">"Arwen"</span>},  
            {<span class="hljs-string">"humans"</span>, <span class="hljs-string">"Éowyn"</span>},  
            {<span class="hljs-string">"humans"</span>, <span class="hljs-string">"Faramir"</span>}  
        };  

        <span class="hljs-keyword">for</span> (String[] character : characters) {  
            ProducerRecord&lt;String, String&gt; producerRecord = <span class="hljs-keyword">new</span> ProducerRecord&lt;&gt;(<span class="hljs-string">"lotr_characters"</span>, character[<span class="hljs-number">0</span>], character[<span class="hljs-number">1</span>]);  

            producer.send(producerRecord, (RecordMetadata recordMetadata, Exception err) -&gt; {  
                <span class="hljs-keyword">if</span> (err == <span class="hljs-keyword">null</span>) {  
                    log.info(<span class="hljs-string">"Message received. \n"</span> +  
                            <span class="hljs-string">"topic ["</span> + recordMetadata.topic() + <span class="hljs-string">"]\n"</span> +  
                            <span class="hljs-string">"partition ["</span> + recordMetadata.partition() + <span class="hljs-string">"]\n"</span> +  
                            <span class="hljs-string">"offset ["</span> + recordMetadata.offset() + <span class="hljs-string">"]\n"</span> +  
                            <span class="hljs-string">"timestamp ["</span> + recordMetadata.timestamp() + <span class="hljs-string">"]"</span>);  
                } <span class="hljs-keyword">else</span> {  
                    log.error(<span class="hljs-string">"An error occurred while producing messages"</span>, err);  
                }  
            });  
        }
        producer.close();  
    }  
}
</code></pre>
<p>Next, we're going to create a consumer to read these messages from Kafka.</p>
<h3 id="heading-how-to-create-a-kafka-consumer">How to Create a Kafka Consumer</h3>
<p>First, create a <code>Consumer</code> class. Again, you can call it whatever you want, but don't call it <code>KafkaConsumer</code> as you will need that class in a moment.</p>
<p>All the Kafka-specific code will go in <code>Consumer</code>'s <code>main()</code> method.</p>
<pre><code class="lang-java"><span class="hljs-keyword">package</span> org.example;  

<span class="hljs-keyword">import</span> org.slf4j.Logger;  
<span class="hljs-keyword">import</span> org.slf4j.LoggerFactory;  

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Consumer</span> </span>{  
    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> Logger log = LoggerFactory.getLogger(Consumer.class);  

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> </span>{  
        log.info(<span class="hljs-string">"This class consumes messages from Kafka"</span>);  
    }  
}
</code></pre>
<p>Next, configure the consumer properties.</p>
<pre><code class="lang-java">Properties properties = <span class="hljs-keyword">new</span> Properties();  
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, <span class="hljs-string">"localhost:9092"</span>);  
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, <span class="hljs-string">"lotr_consumer_group"</span>);  
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, <span class="hljs-string">"earliest"</span>);
</code></pre>
<p>Just like with <code>Producer</code>, these properties are a set of string pairs. The ones we're using are:</p>
<ul>
<li><p><code>ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG</code> which specifies the address (as <code>host:port</code>) of one or more brokers the consumer uses to make its initial connection to the Kafka cluster</p>
</li>
<li><p><code>ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG</code> which specifies the deserializer to use for message keys</p>
</li>
<li><p><code>ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG</code> which specifies the deserializer to use for message values</p>
</li>
<li><p><code>ConsumerConfig.GROUP_ID_CONFIG</code> which specifies the consumer group this consumer belongs to</p>
</li>
<li><p><code>ConsumerConfig.AUTO_OFFSET_RESET_CONFIG</code> which specifies where to start reading when the consumer group has no committed offset for a partition</p>
</li>
</ul>
<p>We're connecting to the Kafka cluster on <code>localhost:9092</code>, using string deserializers since our keys and values are strings, setting a group id for our consumer, and telling the consumer to read from the start of the topic if its group hasn't committed any offsets yet.</p>
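<p>The offset reset setting is easy to misread, so here is a simplified sketch of the decision it controls. This is not Kafka's code: the class, the method, and the <code>-1</code> "no committed offset" marker are made up for the illustration. The setting values <code>earliest</code>, <code>latest</code>, and <code>none</code> are the real ones.</p>

```java
public class OffsetResetSketch {
    // Hypothetical marker meaning "this group has no committed offset for the partition"
    public static final long NO_COMMITTED_OFFSET = -1L;

    // Decides where a consumer starts reading a partition.
    // logStart is the first available offset; logEnd is the offset the next new message will get.
    public static long startingOffset(long committedOffset, String autoOffsetReset, long logStart, long logEnd) {
        if (committedOffset != NO_COMMITTED_OFFSET) {
            return committedOffset; // a committed offset always takes priority
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart; // read everything still stored in the partition
            case "latest":
                return logEnd; // only read messages produced from now on
            case "none":
                throw new IllegalStateException("No committed offset and auto.offset.reset is none");
            default:
                throw new IllegalArgumentException("Unknown auto.offset.reset value: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(NO_COMMITTED_OFFSET, "earliest", 0, 6)); // prints 0
        System.out.println(startingOffset(NO_COMMITTED_OFFSET, "latest", 0, 6));   // prints 6
        System.out.println(startingOffset(4, "earliest", 0, 6));                   // prints 4
    }
}
```

<p>The last line is the important one: once <code>lotr_consumer_group</code> has committed offsets, restarting the consumer resumes from where the group left off, not from the start of the topic.</p>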
<p><strong>Note:</strong> If you're running the consumer on Windows and accessing a Kafka broker running on WSL, you'll need to change <code>"localhost:9092"</code> to <code>"[::1]:9092"</code> or <code>"0:0:0:0:0:0:0:1:9092"</code>, like you did in <code>Producer</code>.</p>
<p>Next, we create a <code>KafkaConsumer</code> and pass it the configuration properties.</p>
<pre><code class="lang-java">KafkaConsumer&lt;String, String&gt; consumer = <span class="hljs-keyword">new</span> KafkaConsumer&lt;&gt;(properties);
</code></pre>
<p>We need to tell the consumer which topic, or topics, to subscribe to. The <code>subscribe()</code> method takes in a collection of one or more strings, naming the topics you want to read from. Remember, consumers can subscribe to more than one topic at the same time. For this example, we'll use one topic, the <code>lotr_characters</code> topic.</p>
<pre><code class="lang-java">String topic = <span class="hljs-string">"lotr_characters"</span>;  

consumer.subscribe(Arrays.asList(topic));
</code></pre>
<p>The consumer is now ready to start reading messages from the topic. It does this by regularly polling for new messages.</p>
<p>We'll use a while loop to repeatedly call the <code>poll()</code> method to check for new messages.</p>
<p><code>poll()</code> takes in a duration, which is the maximum time it will wait for messages if none are available yet. It returns the messages it fetched as an iterable called <code>ConsumerRecords</code>, which will be empty if nothing arrived in time. We can then iterate over <code>ConsumerRecords</code> and do something with each individual <code>ConsumerRecord</code>.</p>
<p>In a real-world application, we would process this data or send it to some further destination, like a database or data pipeline. Here, we're just going to log the key, value, partition, and offset for each message we receive.</p>
<pre><code class="lang-java"><span class="hljs-keyword">while</span>(<span class="hljs-keyword">true</span>){  
    ConsumerRecords&lt;String, String&gt; messages = consumer.poll(Duration.ofMillis(<span class="hljs-number">100</span>));  

    <span class="hljs-keyword">for</span> (ConsumerRecord&lt;String, String&gt; message : messages){  
        log.info(<span class="hljs-string">"key ["</span> + message.key() + <span class="hljs-string">"] value ["</span> + message.value() +<span class="hljs-string">"]"</span>);  
        log.info(<span class="hljs-string">"partition ["</span> + message.partition() + <span class="hljs-string">"] offset ["</span> + message.offset() + <span class="hljs-string">"]"</span>);  
    }  
}
</code></pre>
<p>Now it's time to run our consumer. <strong>Make sure you have Zookeeper and Kafka running.</strong> Run the <code>Consumer</code> class and you'll see the messages that <code>Producer</code> previously sent to the <code>lotr_characters</code> topic in Kafka.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/01/java-consumer-reading-from-topic.PNG" alt="The Kafka client app consuming messages that were previously produced to Kafka." width="600" height="400" loading="lazy"></p>
<p><em>The Kafka client app consuming messages that were previously produced to Kafka</em></p>
<h3 id="heading-how-to-shut-down-the-consumer">How to Shut Down the Consumer</h3>
<p>Right now, our consumer is running in an infinite loop, with each call to <code>poll()</code> waiting up to 100 ms for new messages. This isn't a problem, but we should add safeguards to handle shutting down the consumer if an exception occurs.</p>
<p>We're going to wrap our code in a try-catch-finally block. If an exception occurs, we can handle it in the <code>catch</code> block.</p>
<p>The <code>finally</code> block will then call the consumer's <code>close()</code> method. This will close the socket the consumer is using, commit the offsets it has processed, and trigger a consumer group rebalance so any other consumers in the group can take over reading the partitions this consumer was handling.</p>
<pre><code class="lang-java"><span class="hljs-keyword">try</span> {
            <span class="hljs-comment">// subscribe to topic(s)</span>
            String topic = <span class="hljs-string">"lotr_characters"</span>;
            consumer.subscribe(Arrays.asList(topic));

            <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
                <span class="hljs-comment">// poll for new messages</span>
                ConsumerRecords&lt;String, String&gt; messages = consumer.poll(Duration.ofMillis(<span class="hljs-number">100</span>));

                <span class="hljs-comment">// handle message contents</span>
                <span class="hljs-keyword">for</span> (ConsumerRecord&lt;String, String&gt; message : messages) {
                    log.info(<span class="hljs-string">"key ["</span> + message.key() + <span class="hljs-string">"] value ["</span> + message.value() + <span class="hljs-string">"]"</span>);
                    log.info(<span class="hljs-string">"partition ["</span> + message.partition() + <span class="hljs-string">"] offset ["</span> + message.offset() + <span class="hljs-string">"]"</span>);
                }
            }
        } <span class="hljs-keyword">catch</span> (Exception err) {
            <span class="hljs-comment">// catch and handle exceptions</span>
            log.error(<span class="hljs-string">"Error: "</span>, err);
        } <span class="hljs-keyword">finally</span> {
            <span class="hljs-comment">// close consumer and commit offsets</span>
            consumer.close();
            log.info(<span class="hljs-string">"The consumer is now closed"</span>);
        }
</code></pre>
<p><code>Consumer</code> will continuously poll its assigned topics for new messages and shut down safely if it experiences an exception.</p>
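<p>To see that control flow in isolation, here is a sketch you can run without a Kafka broker. It swaps the real <code>KafkaConsumer</code> for a made-up <code>FakeConsumer</code> whose <code>poll()</code> fails on the third call, so none of these names come from the Kafka client library.</p>

```java
public class ShutdownFlowSketch {
    // Hypothetical stand-in for KafkaConsumer: poll() throws on the third call.
    static class FakeConsumer {
        private int polls = 0;
        boolean closed = false;

        String poll() {
            polls++;
            if (polls == 3) {
                throw new IllegalStateException("simulated failure on poll " + polls);
            }
            return "message-" + polls;
        }

        void close() {
            closed = true;
        }
    }

    // Runs the poll loop and returns a log of what happened, in order.
    public static String run() {
        StringBuilder events = new StringBuilder();
        FakeConsumer consumer = new FakeConsumer();
        try {
            while (true) {
                String message = consumer.poll();
                events.append("polled ").append(message).append("\n");
            }
        } catch (Exception err) {
            // the exception ends the loop and lands here
            events.append("caught: ").append(err.getMessage()).append("\n");
        } finally {
            // runs whether or not an exception was thrown
            consumer.close();
            events.append("closed: ").append(consumer.closed).append("\n");
        }
        return events.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

<p>Running it logs two polled messages, then the caught exception, then <code>closed: true</code>. The real <code>Consumer</code> follows the same order: the loop stops at the first exception, the <code>catch</code> block logs it, and <code>finally</code> closes the consumer.</p>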
<p>The full code for the <code>Consumer</code> class should now be:</p>
<pre><code class="lang-java"><span class="hljs-keyword">package</span> org.example;

<span class="hljs-keyword">import</span> org.apache.kafka.clients.consumer.ConsumerConfig;
<span class="hljs-keyword">import</span> org.apache.kafka.clients.consumer.ConsumerRecord;
<span class="hljs-keyword">import</span> org.apache.kafka.clients.consumer.ConsumerRecords;
<span class="hljs-keyword">import</span> org.apache.kafka.clients.consumer.KafkaConsumer;
<span class="hljs-keyword">import</span> org.apache.kafka.common.serialization.StringDeserializer;
<span class="hljs-keyword">import</span> org.slf4j.Logger;
<span class="hljs-keyword">import</span> org.slf4j.LoggerFactory;

<span class="hljs-keyword">import</span> java.time.Duration;
<span class="hljs-keyword">import</span> java.util.Arrays;
<span class="hljs-keyword">import</span> java.util.Properties;

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Consumer</span> </span>{
    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> Logger log = LoggerFactory.getLogger(Consumer.class);

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> </span>{
        log.info(<span class="hljs-string">"This class consumes messages from Kafka"</span>);

        Properties properties = <span class="hljs-keyword">new</span> Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, <span class="hljs-string">"localhost:9092"</span>);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, <span class="hljs-string">"lotr_consumer_group"</span>);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, <span class="hljs-string">"earliest"</span>);

        KafkaConsumer&lt;String, String&gt; consumer = <span class="hljs-keyword">new</span> KafkaConsumer&lt;&gt;(properties);

        <span class="hljs-keyword">try</span> {
            String topic = <span class="hljs-string">"lotr_characters"</span>;
            consumer.subscribe(Arrays.asList(topic));

            <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
                ConsumerRecords&lt;String, String&gt; messages = consumer.poll(Duration.ofMillis(<span class="hljs-number">100</span>));

                <span class="hljs-keyword">for</span> (ConsumerRecord&lt;String, String&gt; message : messages) {
                    log.info(<span class="hljs-string">"key ["</span> + message.key() + <span class="hljs-string">"] value ["</span> + message.value() + <span class="hljs-string">"]"</span>);
                    log.info(<span class="hljs-string">"partition ["</span> + message.partition() + <span class="hljs-string">"] offset ["</span> + message.offset() + <span class="hljs-string">"]"</span>);
                }
            }
        } <span class="hljs-keyword">catch</span> (Exception err) {
            log.error(<span class="hljs-string">"Error: "</span>, err);
        } <span class="hljs-keyword">finally</span> {
            consumer.close();
            log.info(<span class="hljs-string">"The consumer is now closed"</span>);
        }
    }
}
</code></pre>
<p>You now have a basic Java application that can send messages to and read messages from Kafka. If you got stuck at any point, <a target="_blank" href="https://github.com/gerhynes/kafka-java-app">the full code is available on GitHub</a>.</p>
<h2 id="heading-where-to-take-it-from-here">Where to Take it from Here</h2>
<p>Congratulations on making it this far. You've learned:</p>
<ul>
<li><p>the main concepts behind Kafka</p>
</li>
<li><p>how to communicate with Kafka from the command line</p>
</li>
<li><p>how to build a Java app that produces to and consumes from Kafka</p>
</li>
</ul>
<p>There's plenty more to learn about Kafka, whether that's <a target="_blank" href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> for connecting Kafka to common data systems or the <a target="_blank" href="https://kafka.apache.org/documentation/streams/">Kafka Streams API</a> for processing and transforming your data.</p>
<p>Some resources you might find useful as you continue your journey with Kafka are:</p>
<ul>
<li><p>the <a target="_blank" href="https://kafka.apache.org/documentation/">official Kafka docs</a></p>
</li>
<li><p><a target="_blank" href="https://developer.confluent.io/learn-kafka/">courses from Confluent</a></p>
</li>
<li><p><a target="_blank" href="https://www.conduktor.io/kafka">Conduktor's kafkademy</a></p>
</li>
</ul>
<p>I hope this guide has been helpful and made you excited to learn more about Kafka, event streaming, and real-time data processing.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to implement Change Data Capture using Kafka Streams ]]>
                </title>
                <description>
                    <![CDATA[ By Luca Florio Change Data Capture (CDC) involves observing the changes happening in a database and making them available in a form that can be exploited by other systems.  One of the most interesting use-cases is to make them available as a stream o... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-implement-the-change-data-capture-pattern-using-kafka-streams/</link>
                <guid isPermaLink="false">66d45e444a7504b7409c3398</guid>
                
                    <category>
                        <![CDATA[ Apache Kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ cdc ]]>
                    </category>
                
                    <category>
                        <![CDATA[ change data capture ]]>
                    </category>
                
                    <category>
                        <![CDATA[ elasticsearch ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Kafka streams ]]>
                    </category>
                
                    <category>
                        <![CDATA[ MongoDB ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Scala ]]>
                    </category>
                
                    <category>
                        <![CDATA[ stream processing ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ freeCodeCamp ]]>
                </dc:creator>
                <pubDate>Mon, 20 Jan 2020 21:10:30 +0000</pubDate>
                <media:content url="https://cdn-media-2.freecodecamp.org/w1280/5f9c9dac740569d1a4ca3900.jpg" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>By Luca Florio</p>
<p><strong>Change Data Capture</strong> (CDC) involves observing the changes happening in a database and making them available in a form that can be exploited by other systems. </p>
<p>One of the most interesting use-cases is to make them available as a stream of events. This means you can, for example, catch the events and update a search index as the data are written to the database.</p>
<p>Interesting, right? Let's see how to implement a CDC system that can observe the changes made to a NoSQL database (<strong>MongoDB</strong>), stream them through a message broker (<strong>Kafka</strong>), process the messages of the stream (<strong>Kafka Streams</strong>), and update a search index (<strong>Elasticsearch</strong>)!</p>
<h2 id="heading-tldr">TL;DR</h2>
<p>The full code of the project is available on GitHub in this <a target="_blank" href="https://github.com/elleFlorio/kafka-streams-playground">repository</a>. If you want to skip all my jibber jabber and just run the example, go straight to the 
<strong>How to run the project</strong> section near the end of the article!</p>
<h1 id="heading-use-case-amp-infrastructure">Use case &amp; infrastructure</h1>
<p>We run a web application that stores photos uploaded by users. People can share their shots, let others download them, create albums, and so on. Users can also provide a description of their photos, as well as Exif metadata and other useful information. </p>
<p>We want to store this information and use it to improve our search engine. We will focus on the part of our system depicted in the following diagram.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2019/12/kafka-stream-playground.png" alt="Image" width="600" height="400" loading="lazy">
<em>Photo information storage architecture</em></p>
<p>The information is provided in <code>JSON</code> format. Since I like to post my shots on <a target="_blank" href="https://unsplash.com/">Unsplash</a>, and the website provides free access to its API, I used their model for the photo <code>JSON</code> document.</p>
<p>Once the <code>JSON</code> is sent through a <code>POST</code> request to our server, we store the document inside a <strong>MongoDB</strong> database. We will also store it in <strong>Elasticsearch</strong> for indexing and quick search. </p>
<p>However, we love <strong>long exposure shots</strong>, and we would like to store a subset of information about this kind of photo in a separate index. This can be the exposure time, as well as the location (latitude and longitude) where the photo was taken. In this way, we can create a map of locations where photographers usually take long exposure photos.</p>
<p>Here comes the interesting part: instead of explicitly calling Elasticsearch in our code once the photo info is stored in MongoDB, we can implement a <strong>CDC</strong> exploiting Kafka and <strong>Kafka Streams</strong>. </p>
<p>We listen to modifications to the MongoDB <strong>oplog</strong> using the interface provided by MongoDB itself. When a photo is stored, we send it to a <code>photo</code> Kafka topic. Using <strong>Kafka Connect</strong>, an Elasticsearch sink is configured to save everything sent to that topic to a specific index. In this way, we can index all photos stored in MongoDB automatically.</p>
<p>We need to take care of the long exposure photos too. It requires some processing of the information to extract what we need. For this reason, we use Kafka Streams to create a <strong>processing topology</strong> to:</p>
<ol>
<li>Read from the <code>photo</code> topic</li>
<li>Extract Exif and location information</li>
<li>Filter long exposure photos (exposure time &gt; 1 sec.)</li>
<li>Write to a <code>long-exposure</code> topic.</li>
</ol>
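<p>To make the four steps above concrete, here is a minimal sketch in plain Java with no Kafka dependency. The <code>PhotoInfo</code> and <code>LongExposure</code> classes and the sample data are assumptions made up for this illustration; they are not the models used in the project, and in-memory lists stand in for the two topics.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the four topology steps (no Kafka dependency).
// PhotoInfo and LongExposure are hypothetical stand-ins for the real models.
class LongExposurePipelineSketch {

    // Simplified photo: exposure time (seconds) and position may be missing (null).
    static final class PhotoInfo {
        final String id;
        final Float exposureSeconds;
        final double[] latLon;

        PhotoInfo(String id, Float exposureSeconds, double[] latLon) {
            this.id = id;
            this.exposureSeconds = exposureSeconds;
            this.latLon = latLon;
        }
    }

    // The subset of information we keep for long exposure photos.
    static final class LongExposure {
        final String id;
        final float exposureSeconds;
        final double latitude;
        final double longitude;

        LongExposure(String id, float exposureSeconds, double latitude, double longitude) {
            this.id = id;
            this.exposureSeconds = exposureSeconds;
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    static List<LongExposure> process(List<PhotoInfo> photoTopic) {
        List<LongExposure> longExposureTopic = new ArrayList<>();
        for (PhotoInfo photo : photoTopic) {                 // 1. read from "photo"
            if (photo.exposureSeconds == null || photo.latLon == null) {
                continue;                                    // 2. nothing to extract
            }
            if (photo.exposureSeconds > 1.0f) {              // 3. keep long exposures
                longExposureTopic.add(new LongExposure(      // 4. write to "long-exposure"
                        photo.id, photo.exposureSeconds, photo.latLon[0], photo.latLon[1]));
            }
        }
        return longExposureTopic;
    }

    static List<PhotoInfo> samplePhotos() {
        return Arrays.asList(
                new PhotoInfo("a", 0.004f, new double[] {45.46, 9.19}),   // too short
                new PhotoInfo("b", 30.0f, new double[] {64.13, -21.94}),  // kept
                new PhotoInfo("c", 15.0f, null),                          // no location
                new PhotoInfo("d", null, new double[] {48.85, 2.35}));    // no exposure time
    }

    public static void main(String[] args) {
        for (LongExposure photo : process(samplePhotos())) {
            System.out.println(photo.id + " " + photo.exposureSeconds + "s"); // prints "b 30.0s"
        }
    }
}
```

<p>Only photo <code>b</code> survives: it is the only one that has both a location and an exposure time above one second.</p>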
<p>Then another Elasticsearch sink will read data from the <code>long-exposure</code> topic and write it to a specific index in Elasticsearch.</p>
<p>It is quite simple, but it's enough to have fun with CDC and Kafka Streams!</p>
<h1 id="heading-server-implementation">Server implementation</h1>
<p>Let's have a look at what we need to implement: our server exposing the <strong>REST API</strong>s!</p>
<h3 id="heading-models-and-dao">Models and DAO</h3>
<p>First things first, we need a model of our data and a <strong>Data Access Object</strong> (DAO) to talk to our MongoDB database. </p>
<p>As I said, the model for the photo <code>JSON</code> information is the one used by Unsplash. Check out the free API <a target="_blank" href="https://unsplash.com/documentation#get-a-photo">documentation</a> for an example of the <code>JSON</code> we will use. </p>
<p>I created the mapping for the serialization/deserialization of the photo <code>JSON</code> using <a target="_blank" href="https://github.com/spray/spray-json">spray-json</a>. I'll skip the details here. If you are curious, just look at the <a target="_blank" href="https://github.com/elleFlorio/kafka-streams-playground/tree/master/src/main/scala/com/elleflorio/kafka/streams/playground/dao/model/unsplash">repo</a>!</p>
<p>Let's focus on the model for the long exposure photo.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">case</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LongExposurePhoto</span>(<span class="hljs-params">id: <span class="hljs-type">String</span>, exposureTime: <span class="hljs-type">Float</span>, createdAt: <span class="hljs-type">Date</span>, location: <span class="hljs-type">Location</span></span>)</span>

<span class="hljs-class"><span class="hljs-keyword">object</span> <span class="hljs-title">LongExposurePhotoJsonProtocol</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">DefaultJsonProtocol</span> </span>{
  <span class="hljs-keyword">implicit</span> <span class="hljs-keyword">val</span> longExposurePhotoFormat:<span class="hljs-type">RootJsonFormat</span>[<span class="hljs-type">LongExposurePhoto</span>] = jsonFormat(<span class="hljs-type">LongExposurePhoto</span>, <span class="hljs-string">"id"</span>, <span class="hljs-string">"exposure_time"</span>, <span class="hljs-string">"created_at"</span>, <span class="hljs-string">"location"</span>)
}
</code></pre>
<p>This is quite simple: from the photo <code>JSON</code> we keep the <code>id</code>, the exposure time (<code>exposureTime</code>), when the photo was created (<code>createdAt</code>), and the <code>location</code> where it was taken. The <code>location</code> comprises the <code>city</code>, the <code>country</code>, and the <code>position</code> composed of <code>latitude</code> and <code>longitude</code>.</p>
<p>The DAO consists of just the <code>PhotoDao.scala</code> class. </p>
<pre><code class="lang-scala"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">PhotoDao</span>(<span class="hljs-params">database: <span class="hljs-type">MongoDatabase</span>, photoCollection: <span class="hljs-type">String</span></span>) </span>{

  <span class="hljs-keyword">val</span> collection: <span class="hljs-type">MongoCollection</span>[<span class="hljs-type">Document</span>] = database.getCollection(photoCollection)

  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">createPhoto</span></span>(photo: <span class="hljs-type">Photo</span>): <span class="hljs-type">Future</span>[<span class="hljs-type">String</span>] = {
    <span class="hljs-keyword">val</span> doc = <span class="hljs-type">Document</span>(photo.toJson.toString())
    doc.put(<span class="hljs-string">"_id"</span>, photo.id)
    collection.insertOne(doc).toFuture()
      .map(_ =&gt; photo.id)
  }
}
</code></pre>
<p>Since I want to keep this example minimal and focused on the CDC implementation, the DAO has just one method to create a new photo document in MongoDB. </p>
<p>It is straightforward: create a document from the photo <code>JSON</code>, and insert it into MongoDB using the photo's own <code>id</code> as the document <code>_id</code>. Then, we can return the <code>id</code> of the photo just inserted in a <code>Future</code> (the MongoDB API is async).</p>
<h3 id="heading-kafka-producer">Kafka Producer</h3>
<p>Once the photo is stored inside MongoDB, we have to send it to the <code>photo</code> Kafka topic. This means we need a producer to write the message to that topic. The <code>PhotoProducer.scala</code> class looks like this.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">case</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">PhotoProducer</span>(<span class="hljs-params">props: <span class="hljs-type">Properties</span>, topic: <span class="hljs-type">String</span></span>) </span>{

  createKafkaTopic(props, topic)
  <span class="hljs-keyword">val</span> photoProducer = <span class="hljs-keyword">new</span> <span class="hljs-type">KafkaProducer</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>](props)

  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">sendPhoto</span></span>(photo: <span class="hljs-type">Photo</span>): <span class="hljs-type">Future</span>[<span class="hljs-type">RecordMetadata</span>] = {
    <span class="hljs-keyword">val</span> record = <span class="hljs-keyword">new</span> <span class="hljs-type">ProducerRecord</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>](topic, photo.id, photo.toJson.compactPrint)
    photoProducer.send(record)
  }

  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">closePhotoProducer</span></span>(): <span class="hljs-type">Unit</span> = photoProducer.close()
}
</code></pre>
<p>I would say that this is pretty self-explanatory. The most interesting part is probably the <code>createKafkaTopic</code> method that is implemented in the <code>utils</code> package.</p>
<pre><code class="lang-scala"><span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">createKafkaTopic</span></span>(props: <span class="hljs-type">Properties</span>, topic: <span class="hljs-type">String</span>): <span class="hljs-type">Unit</span> = {
    <span class="hljs-keyword">val</span> adminClient = <span class="hljs-type">AdminClient</span>.create(props)
    <span class="hljs-keyword">val</span> photoTopic = <span class="hljs-keyword">new</span> <span class="hljs-type">NewTopic</span>(topic, <span class="hljs-number">1</span>, <span class="hljs-number">1</span>)
    adminClient.createTopics(<span class="hljs-type">List</span>(photoTopic).asJava)
  }
</code></pre>
<p>This method creates the topic in Kafka with 1 partition and a replication factor of 1 (it is enough for this example). It is not required, but creating the topic in advance lets Kafka balance partitions, select leaders, and so on. This will be useful to get our stream topology ready to process as we start our server.</p>
<h3 id="heading-event-listener">Event Listener</h3>
<p>We have the DAO that writes to MongoDB and the producer that sends the message to Kafka. We need to glue them together in some way so that when the document is stored in MongoDB the message is sent to the <code>photo</code> topic. This is the purpose of the <code>PhotoListener.scala</code> class.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">case</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">PhotoListener</span>(<span class="hljs-params">collection: <span class="hljs-type">MongoCollection</span>[<span class="hljs-type">Document</span>], producer: <span class="hljs-type">PhotoProducer</span></span>) </span>{

  <span class="hljs-keyword">val</span> cursor: <span class="hljs-type">ChangeStreamObservable</span>[<span class="hljs-type">Document</span>] = collection.watch()

  cursor.subscribe(<span class="hljs-keyword">new</span> <span class="hljs-type">Observer</span>[<span class="hljs-type">ChangeStreamDocument</span>[<span class="hljs-type">Document</span>]] {
    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">onNext</span></span>(result: <span class="hljs-type">ChangeStreamDocument</span>[<span class="hljs-type">Document</span>]): <span class="hljs-type">Unit</span> = {
      result.getOperationType <span class="hljs-keyword">match</span> {
        <span class="hljs-keyword">case</span> <span class="hljs-type">OperationType</span>.<span class="hljs-type">INSERT</span> =&gt; {
          <span class="hljs-keyword">val</span> photo = result.getFullDocument.toJson().parseJson.convertTo[<span class="hljs-type">Photo</span>]
          producer.sendPhoto(photo).get()
          println(<span class="hljs-string">s"Sent photo with Id <span class="hljs-subst">${photo.id}</span>"</span>)
        }
        <span class="hljs-keyword">case</span> _ =&gt; println(<span class="hljs-string">s"Operation <span class="hljs-subst">${result.getOperationType}</span> not supported"</span>)
      }
    }
    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">onError</span></span>(e: <span class="hljs-type">Throwable</span>): <span class="hljs-type">Unit</span> = println(<span class="hljs-string">s"onError: <span class="hljs-subst">$e</span>"</span>)
    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">onComplete</span></span>(): <span class="hljs-type">Unit</span> = println(<span class="hljs-string">"onComplete"</span>)})
}
</code></pre>
<p>We exploit the <a target="_blank" href="https://docs.mongodb.com/manual/changeStreams/">Change Streams interface</a> provided by the MongoDB Scala library. </p>
<p>Here is how it works: we <code>watch()</code> the collection where photos are stored. When there is a new event (<code>onNext</code>) we run our logic. </p>
<p>For this example we are interested only in the creation of new documents, so we explicitly check that the operation is of type <code>OperationType.INSERT</code>. If the operation is the one we are interested in, we get the document and convert it to a <code>Photo</code> object to be sent by our producer. </p>
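<p>If you want to see the dispatch logic in isolation, here is a minimal plain-Java sketch with no MongoDB or Kafka dependency. The <code>Operation</code> enum and the <code>ChangeEvent</code> class are hypothetical stand-ins for the driver's types, and the returned list stands in for the messages that would be sent to the <code>photo</code> topic.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of "forward only INSERT events"; all types here are hypothetical.
class ChangeEventDispatchSketch {

    enum Operation { INSERT, UPDATE, DELETE }

    // Minimal change event: the operation type plus the document as a JSON string.
    static final class ChangeEvent {
        final Operation operation;
        final String documentJson;

        ChangeEvent(Operation operation, String documentJson) {
            this.operation = operation;
            this.documentJson = documentJson;
        }
    }

    // Returns the documents that would be sent to the topic, in arrival order.
    static List<String> forwardInserts(List<ChangeEvent> events) {
        List<String> sent = new ArrayList<>();
        for (ChangeEvent event : events) {
            if (event.operation == Operation.INSERT) {
                sent.add(event.documentJson);
            } else {
                System.out.println("Operation " + event.operation + " not supported");
            }
        }
        return sent;
    }

    static List<ChangeEvent> sampleEvents() {
        return Arrays.asList(
                new ChangeEvent(Operation.INSERT, "{\"id\":\"p1\"}"),
                new ChangeEvent(Operation.UPDATE, "{\"id\":\"p1\"}"),
                new ChangeEvent(Operation.INSERT, "{\"id\":\"p2\"}"),
                new ChangeEvent(Operation.DELETE, "{\"id\":\"p1\"}"));
    }

    public static void main(String[] args) {
        // Two of the four sample events are inserts, so two documents are forwarded.
        System.out.println(forwardInserts(sampleEvents()).size());
    }
}
```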
<p>That's it! With a few lines of code we connected the creation of documents in MongoDB to a stream of events in Kafka.</p>
<p>As a side note, be aware that to use the Change Streams interface <strong>we have to set up a MongoDB replica set</strong>. In this project we run 3 instances of MongoDB and configure them to act as a replica set using the following command in the mongo client:</p>
<pre><code class="lang-shell">rs.initiate({_id : "r0", members: [{ _id : 0, host : "mongo1:27017", priority : 1 },{ _id : 1, host :"mongo2:27017", priority : 0 },{ _id : 2, host : "mongo3:27017", priority : 0, arbiterOnly: true }]})
</code></pre>
<p>Here our instances are the containers we will run in the docker-compose file, that is <code>mongo1</code>, <code>mongo2</code>, and <code>mongo3</code>.</p>
<h3 id="heading-processing-topology">Processing Topology</h3>
<p>Time to build our processing topology! It will be in charge of the creation of the <code>long-exposure</code> index in Elasticsearch. The topology is described by the following diagram:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2020/01/processing-topology.png" alt="Image" width="600" height="400" loading="lazy">
<em>Processing topology</em></p>
<p>and it is implemented in the <code>LongExposureTopology.scala</code> object class.
Let's analyse every step of our processing topology.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> stringSerde = <span class="hljs-keyword">new</span> <span class="hljs-type">StringSerde</span>

<span class="hljs-keyword">val</span> streamsBuilder = <span class="hljs-keyword">new</span> <span class="hljs-type">StreamsBuilder</span>()

<span class="hljs-keyword">val</span> photoSource: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>] = streamsBuilder.stream(sourceTopic, <span class="hljs-type">Consumed</span>.`<span class="hljs-keyword">with</span>`(stringSerde, stringSerde))
</code></pre>
<p>The first step is to read from a source topic. We start a stream from the <code>sourceTopic</code> (that is, the <code>photo</code> topic) using the <code>StreamsBuilder</code> object. The <code>stringSerde</code> object is used to serialise and deserialise the content of the topic as a <code>String</code>. </p>
<p>Note that at each step of the processing we create a new stream of data with a <code>KStream</code> object. When creating the stream, we specify the types of the key and the value produced by the stream. In our topology the key will always be a <code>String</code>. In this step the value produced is still a <code>String</code>.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> convertToPhotoObject: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">Photo</span>] =
      photoSource.mapValues((_, jsonString) =&gt; {
        <span class="hljs-keyword">val</span> photo = jsonString.parseJson.convertTo[<span class="hljs-type">Photo</span>]
        println(<span class="hljs-string">s"Processing photo <span class="hljs-subst">${photo.id}</span>"</span>)
        photo
      })
</code></pre>
<p>The next step is to convert the value extracted from the <code>photo</code> topic into a proper <code>Photo</code> object. </p>
<p>So we start from the <code>photoSource</code> stream and work on the values using the <code>mapValues</code> function. We simply parse the value as a <code>JSON</code> and create the <code>Photo</code> object that will be sent in the <code>convertToPhotoObject</code> stream.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> filterWithLocation: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">Photo</span>] = convertToPhotoObject.filter((_, photo) =&gt; photo.location.exists(_.position.isDefined))
</code></pre>
<p>There is no guarantee that the photo we are processing will have the info about the location, but we want it in our long exposure object. This step of the topology filters out from the <code>convertToPhotoObject</code> stream the photos that have no info about the location, and creates the <code>filterWithLocation</code> stream.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> filterWithExposureTime: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">Photo</span>] = filterWithLocation.filter((_, photo) =&gt; photo.exif.exists(_.exposureTime.isDefined))
</code></pre>
<p>Another important piece of information for our processing is the exposure time of the photo. For this reason, we filter out from the <code>filterWithLocation</code> stream the photos without exposure time info, creating the <code>filterWithExposureTime</code> stream.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> dataExtractor: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">LongExposurePhoto</span>] =
      filterWithExposureTime.mapValues((_, photo) =&gt; <span class="hljs-type">LongExposurePhoto</span>(photo.id, parseExposureTime(photo.exif.get.exposureTime.get), photo.createdAt, photo.location.get))
</code></pre>
<p>We now have all we need to create a <code>LongExposurePhoto</code> object! That is the result of the <code>dataExtractor</code>: it takes the <code>Photo</code> coming from the <code>filterWithExposureTime</code> stream and produces a new stream containing <code>LongExposurePhoto</code>.</p>
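<p>The <code>parseExposureTime</code> helper used above is not shown in this article. As a rough idea of what such a helper might do, here is a plain-Java sketch. It assumes the exposure time arrives as a string holding either a decimal number of seconds or a fraction such as <code>1/250</code>; this is an assumption for illustration and not necessarily how the project implements it.</p>

```java
// Hypothetical sketch of an exposure-time parser; the project's own
// parseExposureTime helper is not shown, so the input format is an assumption.
class ExposureTimeSketch {

    // Accepts "30", "2.5" or a fraction such as "1/250" and returns seconds.
    static float parseExposureTime(String raw) {
        String value = raw.trim();
        int slash = value.indexOf('/');
        if (slash < 0) {
            return Float.parseFloat(value);          // plain decimal value
        }
        float numerator = Float.parseFloat(value.substring(0, slash));
        float denominator = Float.parseFloat(value.substring(slash + 1));
        if (denominator == 0f) {
            throw new IllegalArgumentException("Zero denominator in exposure time: " + raw);
        }
        return numerator / denominator;              // fraction of a second
    }

    public static void main(String[] args) {
        System.out.println(parseExposureTime("1/4"));        // prints 0.25
        System.out.println(parseExposureTime("30") > 1.0f);  // prints true
    }
}
```

<p>A value such as <code>1/4</code> falls below the one-second threshold, while <code>30</code> would count as a long exposure.</p>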
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> longExposureFilter: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>] =
      dataExtractor.filter((_, item) =&gt; item.exposureTime &gt; <span class="hljs-number">1.0</span>).mapValues((_, longExposurePhoto) =&gt; {
        <span class="hljs-keyword">val</span> jsonString = longExposurePhoto.toJson.compactPrint
        println(<span class="hljs-string">s"completed processing: <span class="hljs-subst">$jsonString</span>"</span>)
        jsonString
      })
</code></pre>
<p>We are almost there. We now have to keep only the photos with a long exposure time (which we decided is more than 1 sec.). So we create a new <code>longExposureFilter</code> stream without the photos that are not long exposure. </p>
<p>This time we also serialise each <code>LongExposurePhoto</code> into the corresponding <code>JSON</code> string, which will be written to the sink topic in the next step and, from there, to Elasticsearch.</p>
<pre><code class="lang-scala">longExposureFilter.to(sinkTopic, <span class="hljs-type">Produced</span>.`<span class="hljs-keyword">with</span>`(stringSerde, stringSerde))

streamsBuilder.build()
</code></pre>
<p>This is the last step of our topology. We write the content of the <code>longExposureFilter</code> stream <code>to</code> our <code>sinkTopic</code> (that is, the <code>long-exposure</code> topic), using the string serde for both key and value.
The last command simply <code>build</code>s the topology we just created.</p>
<p>Now that we have our topology, we can use it in our server. The <code>PhotoStreamProcessor.scala</code> class is what manages the processing.</p>
<pre><code class="lang-scala"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">PhotoStreamProcessor</span>(<span class="hljs-params">kafkaProps: <span class="hljs-type">Properties</span>, streamProps: <span class="hljs-type">Properties</span>, sourceTopic: <span class="hljs-type">String</span>, sinkTopic: <span class="hljs-type">String</span></span>) </span>{

  createKafkaTopic(kafkaProps, sinkTopic)
  <span class="hljs-keyword">val</span> topology: <span class="hljs-type">Topology</span> = <span class="hljs-type">LongExposureTopology</span>.build(sourceTopic, sinkTopic)
  <span class="hljs-keyword">val</span> streams: <span class="hljs-type">KafkaStreams</span> = <span class="hljs-keyword">new</span> <span class="hljs-type">KafkaStreams</span>(topology, streamProps)

  sys.<span class="hljs-type">ShutdownHookThread</span> {
    streams.close(java.time.<span class="hljs-type">Duration</span>.ofSeconds(<span class="hljs-number">10</span>))
  }

  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">start</span></span>(): <span class="hljs-type">Unit</span> = <span class="hljs-keyword">new</span> <span class="hljs-type">Thread</span> {
    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span></span>(): <span class="hljs-type">Unit</span> = {
      streams.cleanUp()
      streams.start()
      println(<span class="hljs-string">"Started long exposure processor"</span>)
    }
  }.start()
}
</code></pre>
<p>First we create the <code>sinkTopic</code>, using the same utility method we saw before. Then we build the stream topology and initialize a <code>KafkaStreams</code> object with that topology.</p>
<p>To start the stream processing, we create a dedicated <code>Thread</code> that will run the streaming while the server is alive. Before starting, we call <code>cleanUp()</code>, which deletes the application's local state directory. It can only be called before the <code>KafkaStreams</code> instance is started or after it is closed. </p>
<p>Our <code>PhotoStreamProcessor</code> is ready to go!</p>
<h3 id="heading-rest-api">REST API</h3>
<p>The server exposes a REST API that accepts the photo information to store. We make use of <a target="_blank" href="https://doc.akka.io/docs/akka-http/current/index.html">Akka HTTP</a> for the API implementation.</p>
<pre><code class="lang-scala"><span class="hljs-class"><span class="hljs-keyword">trait</span> <span class="hljs-title">AppRoutes</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">SprayJsonSupport</span> </span>{

  <span class="hljs-keyword">implicit</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">system</span></span>: <span class="hljs-type">ActorSystem</span>
  <span class="hljs-keyword">implicit</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">photoDao</span></span>: <span class="hljs-type">PhotoDao</span>
  <span class="hljs-keyword">implicit</span> <span class="hljs-keyword">lazy</span> <span class="hljs-keyword">val</span> timeout = <span class="hljs-type">Timeout</span>(<span class="hljs-number">5.</span>seconds)

  <span class="hljs-keyword">lazy</span> <span class="hljs-keyword">val</span> healthRoute: <span class="hljs-type">Route</span> = pathPrefix(<span class="hljs-string">"health"</span>) {
    concat(
      pathEnd {
        concat(
          get {
            complete(<span class="hljs-type">StatusCodes</span>.<span class="hljs-type">OK</span>)
          }
        )
      }
    )
  }

  <span class="hljs-keyword">lazy</span> <span class="hljs-keyword">val</span> crudRoute: <span class="hljs-type">Route</span> = pathPrefix(<span class="hljs-string">"photo"</span>) {
    concat(
      pathEnd {
        concat(
          post {
            entity(as[<span class="hljs-type">Photo</span>]) { photo =&gt;
              <span class="hljs-keyword">val</span> photoCreated: <span class="hljs-type">Future</span>[<span class="hljs-type">String</span>] =
                photoDao.createPhoto(photo)
              onSuccess(photoCreated) { id =&gt;
                complete((<span class="hljs-type">StatusCodes</span>.<span class="hljs-type">Created</span>, id))
              }
            }
          }
        )
      }
    )
  }

}
</code></pre>
<p>To keep the example minimal, we have only two routes:</p>
<ul>
<li><code>GET /health</code> - to check if the server is up &amp; running</li>
<li><code>POST /photo</code> - to send to the system the <code>JSON</code> of the photo information we want to store. This endpoint uses the DAO to store the document in MongoDB and returns a <code>201</code> with the id of the stored photo if the operation succeeded.</li>
</ul>
<p>This is by no means a complete set of APIs, but it is enough to run our example.</p>
<h3 id="heading-server-main-class">Server main class</h3>
<p>OK, we implemented all the components of our server, so it's time to wrap everything up. This is our <code>Server.scala</code> object class.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">implicit</span> <span class="hljs-keyword">val</span> system: <span class="hljs-type">ActorSystem</span> = <span class="hljs-type">ActorSystem</span>(<span class="hljs-string">"kafka-stream-playground"</span>)
<span class="hljs-keyword">implicit</span> <span class="hljs-keyword">val</span> materializer: <span class="hljs-type">ActorMaterializer</span> = <span class="hljs-type">ActorMaterializer</span>()
</code></pre>
<p>First a couple of <strong>Akka</strong> utility values. Since we use <a target="_blank" href="https://doc.akka.io/docs/akka-http/current/index.html">Akka HTTP</a> to run our server and REST API, these implicit values are required.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> config: <span class="hljs-type">Config</span> = <span class="hljs-type">ConfigFactory</span>.load()
<span class="hljs-keyword">val</span> address = config.getString(<span class="hljs-string">"http.ip"</span>)
<span class="hljs-keyword">val</span> port = config.getInt(<span class="hljs-string">"http.port"</span>)

<span class="hljs-keyword">val</span> mongoUri = config.getString(<span class="hljs-string">"mongo.uri"</span>)
<span class="hljs-keyword">val</span> mongoDb = config.getString(<span class="hljs-string">"mongo.db"</span>)
<span class="hljs-keyword">val</span> mongoUser = config.getString(<span class="hljs-string">"mongo.user"</span>)
<span class="hljs-keyword">val</span> mongoPwd = config.getString(<span class="hljs-string">"mongo.pwd"</span>)
<span class="hljs-keyword">val</span> photoCollection = config.getString(<span class="hljs-string">"mongo.photo_collection"</span>)

<span class="hljs-keyword">val</span> kafkaHosts = config.getString(<span class="hljs-string">"kafka.hosts"</span>).split(',').toList
<span class="hljs-keyword">val</span> photoTopic = config.getString(<span class="hljs-string">"kafka.photo_topic"</span>)
<span class="hljs-keyword">val</span> longExposureTopic = config.getString(<span class="hljs-string">"kafka.long_exposure_topic"</span>)
</code></pre>
<p>Then we read all the configuration properties. We will come back to the configuration file in a moment.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> kafkaProps = <span class="hljs-keyword">new</span> <span class="hljs-type">Properties</span>()
kafkaProps.put(<span class="hljs-string">"bootstrap.servers"</span>, kafkaHosts.mkString(<span class="hljs-string">","</span>))
kafkaProps.put(<span class="hljs-string">"key.serializer"</span>, <span class="hljs-string">"org.apache.kafka.common.serialization.StringSerializer"</span>)
kafkaProps.put(<span class="hljs-string">"value.serializer"</span>, <span class="hljs-string">"org.apache.kafka.common.serialization.StringSerializer"</span>)

<span class="hljs-keyword">val</span> streamProps = <span class="hljs-keyword">new</span> <span class="hljs-type">Properties</span>()
streamProps.put(<span class="hljs-type">StreamsConfig</span>.<span class="hljs-type">APPLICATION_ID_CONFIG</span>, <span class="hljs-string">"long-exp-proc-app"</span>)
streamProps.put(<span class="hljs-type">StreamsConfig</span>.<span class="hljs-type">BOOTSTRAP_SERVERS_CONFIG</span>, kafkaHosts.mkString(<span class="hljs-string">","</span>))

<span class="hljs-keyword">val</span> photoProducer = <span class="hljs-type">PhotoProducer</span>(kafkaProps, photoTopic)
<span class="hljs-keyword">val</span> photoStreamProcessor = <span class="hljs-keyword">new</span> <span class="hljs-type">PhotoStreamProcessor</span>(kafkaProps, streamProps, photoTopic, <span class="hljs-string">"long-exposure"</span>)
photoStreamProcessor.start()
</code></pre>
<p>We have to configure both our Kafka producer and the stream processor. We also start the stream processor, so the server will be ready to process the documents sent to it.</p>
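<p>As a minimal sketch of the step above, the comma-separated <code>kafka.hosts</code> value can be turned into producer properties like this. The object and helper names (<code>KafkaPropsSketch</code>, <code>parseHosts</code>, <code>producerProps</code>) are hypothetical and not part of the project, and trimming whitespace around each host is an addition made for this example.</p>
<pre><code class="lang-scala">import java.util.Properties

object KafkaPropsSketch {
  // Parses a comma-separated host list, the way the kafka.hosts setting is read.
  // Trimming and dropping empty entries is an extra safety step (assumption).
  def parseHosts(raw: String): List[String] =
    raw.split(',').map(_.trim).filter(_.nonEmpty).toList

  // Builds producer properties with String serializers for keys and values.
  def producerProps(hosts: List[String]): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", hosts.mkString(","))
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }
}
</code></pre>
<p>Calling <code>KafkaPropsSketch.producerProps(KafkaPropsSketch.parseHosts("kafka-1:9092, kafka-2:9092"))</code> would yield a <code>bootstrap.servers</code> value of <code>kafka-1:9092,kafka-2:9092</code>.</p>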
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> client = <span class="hljs-type">MongoClient</span>(<span class="hljs-string">s"mongodb://<span class="hljs-subst">$mongoUri</span>/<span class="hljs-subst">$mongoUser</span>"</span>)
<span class="hljs-keyword">val</span> db = client.getDatabase(mongoDb)
<span class="hljs-keyword">val</span> photoDao: <span class="hljs-type">PhotoDao</span> = <span class="hljs-keyword">new</span> <span class="hljs-type">PhotoDao</span>(db, photoCollection)
<span class="hljs-keyword">val</span> photoListener = <span class="hljs-type">PhotoListener</span>(photoDao.collection, photoProducer)
</code></pre>
<p>MongoDB also needs to be configured. We set up the connection and initialize the DAO as well as the listener.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">lazy</span> <span class="hljs-keyword">val</span> routes: <span class="hljs-type">Route</span> = healthRoute ~ crudRoute

<span class="hljs-type">Http</span>().bindAndHandle(routes, address, port)
<span class="hljs-type">Await</span>.result(system.whenTerminated, <span class="hljs-type">Duration</span>.<span class="hljs-type">Inf</span>)
</code></pre>
<p>Everything has been initialized. We create the REST routes for the communication to the server, bind them to the handlers, and finally start the server!</p>
<h4 id="heading-server-configuration">Server configuration</h4>
<p>This is the configuration file used to set up the server:</p>
<pre><code>http {
  ip = <span class="hljs-string">"127.0.0.1"</span>
  ip = ${?SERVER_IP}

  port = <span class="hljs-number">8000</span>
  port = ${?SERVER_PORT}
}
mongo {
  uri = <span class="hljs-string">"127.0.0.1:27017"</span>
  uri = ${?MONGO_URI}
  db = <span class="hljs-string">"kafka-stream-playground"</span>
  user = <span class="hljs-string">"admin"</span>
  pwd = <span class="hljs-string">"admin"</span>
  photo_collection = <span class="hljs-string">"photo"</span>
}
kafka {
  hosts = <span class="hljs-string">"127.0.0.1:9092"</span>
  hosts = ${?KAFKA_HOSTS}
  photo_topic = <span class="hljs-string">"photo"</span>
  long_exposure_topic = <span class="hljs-string">"long-exposure"</span>
}
</code></pre><p>I think that this one does not require much explanation, right?</p>
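<p>One detail may still be worth a quick look: the repeated keys. A line such as <code>port = ${?SERVER_PORT}</code> is an optional substitution, so the default stays in place unless the environment variable is defined. The sketch below reproduces the pattern with the Typesafe Config library the server already uses. The object name and the <code>SKETCH_PORT</code> variable are hypothetical and exist only for this example.</p>
<pre><code class="lang-scala">import com.typesafe.config.{Config, ConfigFactory}

object ConfigOverrideSketch {
  // Same pattern as the server configuration: a default, then an optional override.
  // SKETCH_PORT is a hypothetical variable name used only for this example.
  val source: String =
    """http {
      |  port = 8000
      |  port = ${?SKETCH_PORT}
      |}""".stripMargin

  // resolve() replaces substitutions; with the default options it also looks at
  // environment variables and ignores optional substitutions that are undefined.
  val config: Config = ConfigFactory.parseString(source).resolve()

  def main(args: Array[String]): Unit =
    println(config.getInt("http.port")) // 8000 unless SKETCH_PORT is set
}
</code></pre>
<p>Running it with <code>SKETCH_PORT=9000</code> in the environment should print the overridden value instead, which is the same mechanism that lets the containers be reconfigured without touching the file.</p>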
<h1 id="heading-connectors-configuration">Connectors configuration</h1>
<p>The server we implemented writes to two Kafka topics: <code>photo</code> and <code>long-exposure</code>. But how are messages written to Elasticsearch as documents? Using <strong>Kafka Connect</strong>!</p>
<p>We can set up two connectors, one per topic, and tell each connector to write every message going through its topic to Elasticsearch.</p>
<p>First we need <a target="_blank" href="https://docs.confluent.io/current/connect/index.html">Kafka Connect</a>. We can use the container provided by Confluent in the docker-compose file:</p>
<pre><code class="lang-yml"><span class="hljs-attr">connect:</span>
    <span class="hljs-attr">image:</span> <span class="hljs-string">confluentinc/cp-kafka-connect</span>
    <span class="hljs-attr">ports:</span>
      <span class="hljs-bullet">-</span> <span class="hljs-number">8083</span><span class="hljs-string">:8083</span>
    <span class="hljs-attr">networks:</span>
      <span class="hljs-bullet">-</span> <span class="hljs-string">kakfa_stream_playground</span>
    <span class="hljs-attr">depends_on:</span>
      <span class="hljs-bullet">-</span> <span class="hljs-string">zookeeper</span>
      <span class="hljs-bullet">-</span> <span class="hljs-string">kafka</span>
    <span class="hljs-attr">volumes:</span>
      <span class="hljs-bullet">-</span> <span class="hljs-string">$PWD/connect-plugins:/connect-plugins</span>
    <span class="hljs-attr">environment:</span>
      <span class="hljs-attr">CONNECT_BOOTSTRAP_SERVERS:</span> <span class="hljs-string">kafka:9092</span>
      <span class="hljs-attr">CONNECT_REST_ADVERTISED_HOST_NAME:</span> <span class="hljs-string">connect</span>
      <span class="hljs-attr">CONNECT_REST_PORT:</span> <span class="hljs-number">8083</span>
      <span class="hljs-attr">CONNECT_GROUP_ID:</span> <span class="hljs-string">compose-connect-group</span>
      <span class="hljs-attr">CONNECT_CONFIG_STORAGE_TOPIC:</span> <span class="hljs-string">docker-connect-configs</span>
      <span class="hljs-attr">CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR:</span> <span class="hljs-number">1</span>
      <span class="hljs-attr">CONNECT_OFFSET_FLUSH_INTERVAL_MS:</span> <span class="hljs-number">10000</span>
      <span class="hljs-attr">CONNECT_OFFSET_STORAGE_TOPIC:</span> <span class="hljs-string">docker-connect-offsets</span>
      <span class="hljs-attr">CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR:</span> <span class="hljs-number">1</span>
      <span class="hljs-attr">CONNECT_STATUS_STORAGE_TOPIC:</span> <span class="hljs-string">docker-connect-status</span>
      <span class="hljs-attr">CONNECT_STATUS_STORAGE_REPLICATION_FACTOR:</span> <span class="hljs-number">1</span>
      <span class="hljs-attr">CONNECT_KEY_CONVERTER:</span> <span class="hljs-string">"org.apache.kafka.connect.storage.StringConverter"</span>
      <span class="hljs-attr">CONNECT_VALUE_CONVERTER:</span> <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>
      <span class="hljs-attr">CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE:</span> <span class="hljs-string">"false"</span>
      <span class="hljs-attr">CONNECT_INTERNAL_KEY_CONVERTER:</span> <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>
      <span class="hljs-attr">CONNECT_INTERNAL_VALUE_CONVERTER:</span> <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>
      <span class="hljs-attr">CONNECT_ZOOKEEPER_CONNECT:</span> <span class="hljs-string">zookeeper:2181</span>
      <span class="hljs-attr">CONNECT_PLUGIN_PATH:</span> <span class="hljs-string">/connect-plugins</span>
      <span class="hljs-attr">CONNECT_LOG4J_ROOT_LOGLEVEL:</span> <span class="hljs-string">INFO</span>
</code></pre>
<p>I want to focus on some of the configuration values. </p>
<p>First of all, we need to expose the port <code>8083</code> - that will be our endpoint to configure the connectors (<code>CONNECT_REST_PORT</code>). </p>
<p>We also need to map a volume to the <code>/connect-plugins</code> path, where we will place the <a target="_blank" href="https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/index.html">Elasticsearch Sink Connector</a> to write to Elasticsearch. This is reflected also in the <code>CONNECT_PLUGIN_PATH</code>. </p>
<p>The <code>connect</code> container should know how to find the Kafka servers, so we set <code>CONNECT_BOOTSTRAP_SERVERS</code> as <code>kafka:9092</code>.</p>
<p>Once Kafka Connect is ready, we can send the configurations of our connectors to the <code>http://localhost:8083/connectors</code> endpoint. We need 2 connectors, one for the <code>photo</code> topic and one for the <code>long-exposure</code> topic. We can send the configuration as a <code>JSON</code> with a <code>POST</code> request.</p>
<pre><code class="lang-json">{
  <span class="hljs-attr">"name"</span>: <span class="hljs-string">"photo-connector"</span>,
  <span class="hljs-attr">"config"</span>: {
    <span class="hljs-attr">"connector.class"</span>: <span class="hljs-string">"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"</span>,
    <span class="hljs-attr">"tasks.max"</span>: <span class="hljs-string">"1"</span>,
    <span class="hljs-attr">"topics"</span>: <span class="hljs-string">"photo"</span>,
    <span class="hljs-attr">"key.converter"</span>: <span class="hljs-string">"org.apache.kafka.connect.storage.StringConverter"</span>,
    <span class="hljs-attr">"value.converter"</span>: <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>,
    <span class="hljs-attr">"value.converter.schemas.enable"</span>: <span class="hljs-string">"false"</span>,
    <span class="hljs-attr">"schema.ignore"</span>: <span class="hljs-string">"true"</span>,
    <span class="hljs-attr">"connection.url"</span>: <span class="hljs-string">"http://elastic:9200"</span>,
    <span class="hljs-attr">"type.name"</span>: <span class="hljs-string">"kafka-connect"</span>,
    <span class="hljs-attr">"behavior.on.malformed.documents"</span>: <span class="hljs-string">"warn"</span>,
    <span class="hljs-attr">"name"</span>: <span class="hljs-string">"photo-connector"</span>
  }
}
</code></pre>
<p>We explicitly say we are going to use the <code>ElasticsearchSinkConnector</code> as the <code>connector.class</code>, as well as the <code>topics</code> that we want to sink - in this case <code>photo</code>.</p>
<p>We don't want to use a schema for the <code>value.converter</code>, so we can disable it (<code>value.converter.schemas.enable</code>) and tell the connector to ignore the schema (<code>schema.ignore</code>).</p>
<p>The connector for the <code>long-exposure</code> topic is exactly like this one. The only differences are the <code>name</code> and, of course, the <code>topics</code>.</p>
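<p>Since the two connectors differ only in name and topic, the registration can be sketched as a small template plus a <code>POST</code> to the Kafka Connect REST endpoint. This is only an illustration using the JDK 11+ HTTP client, not the way the repository does it. The object name, the helper names and the <code>long-exposure-connector</code> name are assumptions.</p>
<pre><code class="lang-scala">import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object ConnectorSetupSketch {
  // Kafka Connect REST endpoint exposed on port 8083 by the compose file.
  val connectUrl = "http://localhost:8083/connectors"

  // Renders the connector JSON for a given connector name and topic.
  def connectorJson(name: String, topic: String): String =
    s"""{
       |  "name": "$name",
       |  "config": {
       |    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
       |    "tasks.max": "1",
       |    "topics": "$topic",
       |    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       |    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       |    "value.converter.schemas.enable": "false",
       |    "schema.ignore": "true",
       |    "connection.url": "http://elastic:9200",
       |    "type.name": "kafka-connect",
       |    "behavior.on.malformed.documents": "warn",
       |    "name": "$name"
       |  }
       |}""".stripMargin

  // Builds (but does not send) the POST request that registers a connector.
  def registerRequest(name: String, topic: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(connectUrl))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(connectorJson(name, topic)))
      .build()

  // Sends both registrations; requires Kafka Connect to be up and reachable.
  def main(args: Array[String]): Unit = {
    val client = HttpClient.newHttpClient()
    val photo = client.send(registerRequest("photo-connector", "photo"),
      HttpResponse.BodyHandlers.ofString())
    println(photo.statusCode())
    // "long-exposure-connector" is a hypothetical name chosen for this sketch.
    val longExposure = client.send(registerRequest("long-exposure-connector", "long-exposure"),
      HttpResponse.BodyHandlers.ofString())
    println(longExposure.statusCode())
  }
}
</code></pre>
<p>Keeping the request construction separate from the sending makes it possible to inspect the generated JSON before anything is posted to the running stack.</p>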
<h1 id="heading-how-to-run-the-project">How to run the project</h1>
<p>We have all we need to test the CDC! How can we do it? It's quite easy: simply run the <code>setup.sh</code> script in the root folder of the repo!</p>
<p>What will the script do?</p>
<ol>
<li>Run the <code>docker-compose</code> file with all the services.</li>
<li>Configure MongoDB replica set. This is required to enable the <strong>Change Stream interface</strong> to capture data changes. More info about this <a target="_blank" href="https://docs.mongodb.com/manual/changeStreams/">here</a>.</li>
<li>Configure the Kafka connectors.</li>
<li>Connect to the logs of the server.</li>
</ol>
<p>The docker-compose will run the following services:</p>
<ul>
<li>Our Server</li>
<li>3 instances of MongoDB (required for the replica set)</li>
<li>Mongoku, a MongoDB client</li>
<li>Kafka (single node)</li>
<li>Kafka Connect</li>
<li>Zookeeper (required by Kafka)</li>
<li>Elasticsearch</li>
<li>Kibana</li>
</ul>
<p>There are a lot of containers to run, so make sure you have enough resources to run everything properly. If you want, remove Mongoku and Kibana from the compose-file, since they are used just for a quick look inside the DBs.</p>
<p>Once everything is up and running, you just have to send data to the server. </p>
<p>I collected some <code>JSON</code> documents of photos from Unsplash that you can use to test the system in the <code>photos.txt</code> file.</p>
<p>There are a total of 10 documents, with 5 of them containing info about long exposure photos. Send them to the server by running the <code>send-photos.sh</code> script in the root of the repo. Check that everything is stored in MongoDB by connecting to Mongoku at <code>http://localhost:3100</code>. Then connect to Kibana at <code>http://localhost:5601</code> and you will find two indexes in Elasticsearch: <code>photo</code>, containing the JSON of all the photos stored in MongoDB, and <code>long-exposure</code>, containing just the info of the long exposure photos.</p>
<p>Amazing, right?</p>
<h1 id="heading-conclusion">Conclusion</h1>
<p>We made it, guys!</p>
<p>Starting from the design of the use-case, we built our system that connected a MongoDB database to Elasticsearch using CDC. </p>
<p>Kafka Streams is the enabler, allowing us to convert database events to a stream that we can process. </p>
<p>Do you need to see the whole project? Just check out the <a target="_blank" href="https://github.com/elleFlorio/kafka-streams-playground">repository</a> on GitHub!</p>
<p>That's it, enjoy!</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to produce and consume data streams directly via Cypher with Streams Procedures ]]>
                </title>
                <description>
                    <![CDATA[ By Andrea Santurbano Leveraging Neo4j Streams — Part 3 This article is the third part of the Leveraging Neo4j Streams series (Part 1 is here, Part 2 is here). In it, I’ll show you how to bring Neo4j into your Apache Kafka flow by using the streams pr... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-produce-and-consume-data-streams-directly-via-cypher-with-streams-procedures-52cbc5f543f1/</link>
                <guid isPermaLink="false">66c353fd5f85c1948b3fabaf</guid>
                
                    <category>
                        <![CDATA[ Apache Kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ data ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Neo4j ]]>
                    </category>
                
                    <category>
                        <![CDATA[ streaming ]]>
                    </category>
                
                    <category>
                        <![CDATA[ tech  ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ freeCodeCamp ]]>
                </dc:creator>
                <pubDate>Thu, 09 May 2019 17:13:07 +0000</pubDate>
                <media:content url="https://cdn-media-1.freecodecamp.org/images/1*47Ktwi-Gdj5S7keZpiteZA.gif" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>By Andrea Santurbano</p>
<h4 id="heading-leveraging-neo4j-streams-part-3">Leveraging Neo4j Streams — Part 3</h4>
<p>This article is the third part of the <strong>Leveraging Neo4j Streams</strong> series (Part 1 is <a target="_blank" href="https://medium.freecodecamp.org/how-to-leverage-neo4j-streams-and-build-a-just-in-time-data-warehouse-64adf290f093">here</a>, Part 2 is <a target="_blank" href="https://medium.freecodecamp.org/how-to-ingest-data-into-neo4j-from-a-kafka-stream-a34f574f5655">here</a>). In it, I’ll show you how to bring Neo4j into your <strong>Apache Kafka</strong> flow by using the streams procedures available with <a target="_blank" href="https://medium.com/neo4j/a-new-neo4j-integration-with-apache-kafka-6099c14851d2"><strong>Neo4j Streams</strong></a>.</p>
<p>In order to show how to integrate them, simplify the integration, and let you test the whole project by hand, I’ll use <a target="_blank" href="https://towardsdatascience.com/building-a-graph-data-pipeline-with-zeppelin-spark-and-neo4j-8b6b83f4fb70"><strong>Apache Zeppelin</strong></a>, a notebook runner that lets you <a target="_blank" href="https://towardsdatascience.com/building-a-graph-data-pipeline-with-zeppelin-spark-and-neo4j-8b6b83f4fb70">natively interact with Neo4j</a>.</p>
<h3 id="heading-what-is-a-neo4j-stored-procedure">What is a Neo4j Stored Procedure?</h3>
<p>Starting from Neo4j 3.x, the concept of <a target="_blank" href="https://neo4j.com/docs/java-reference/current/extending-neo4j/procedures/"><strong>user-defined procedures and functions</strong></a> was introduced. These are custom implementations of certain functionalities and/or business rules that can’t be (easily) expressed in Cypher itself.</p>
<p>Neo4j provides a number of built-in procedures. The <a target="_blank" href="http://neo4j-contrib.github.io/neo4j-apoc-procedures/">APOC</a> library adds another 450 to cover all kinds of uses from data integration to graph refactorings.</p>
<h3 id="heading-what-are-the-streams-procedures">What are the streams procedures?</h3>
<p>The Neo4j Streams project comes with two procedures:</p>
<ul>
<li><code>streams.publish</code>: allows custom message streaming from Neo4j to the configured environment by using the underlying configured Producer</li>
<li><code>streams.consume</code>: allows consuming messages from a given topic.</li>
</ul>
<h3 id="heading-set-up-the-environment">Set-Up the Environment</h3>
<p>In the following <a target="_blank" href="https://github.com/conker84/leveraging-neo4j-streams">GitHub repo</a>, you’ll find everything necessary to replicate what I’m presenting in this article. All you need to start is <a target="_blank" href="https://docs.docker.com/"><strong>Docker</strong></a>. Then you can spin up the stack by entering the directory and executing the following command from the terminal:</p>
<pre><code>$ docker-compose up
</code></pre><p>This will start up the whole environment that comprises:</p>
<ul>
<li>Neo4j + Neo4j Streams module + APOC procedures</li>
<li>Apache Kafka</li>
<li>Apache Spark (which is not necessary in this article, but it’s used in the previous two)</li>
<li>Apache Zeppelin</li>
</ul>
<p>By going into Apache Zeppelin @ <code>http://localhost:8080</code> you’ll find in directory <code>Medium/Part 3</code> one notebook called “<strong>Streams Procedures</strong>” which is the subject of this article.</p>
<h3 id="heading-streamspublish">streams.publish</h3>
<p>This procedure allows custom message streaming from Neo4j to the configured environment by using the underlying configured Producer.</p>
<p>It takes two variables as input and returns nothing (as it sends its payload asynchronously to the stream):</p>
<ul>
<li><strong>topic</strong>, <em>type String</em>: where the data will be published</li>
<li><strong>payload</strong>, <em>type Object</em>: what you want to stream.</li>
</ul>
<p>Example:</p>
<pre><code>CALL streams.publish(<span class="hljs-string">'my-topic'</span>, <span class="hljs-string">'Hello World from Neo4j!'</span>)
</code></pre><p>The message retrieved from the Consumer is the following:</p>
<pre><code>{<span class="hljs-string">"payload"</span>: <span class="hljs-string">"Hello World from Neo4j!"</span>}
</code></pre><p>You can send any kind of data in the payload: <strong>nodes, relationships, paths, lists, maps, scalar values and nested versions thereof</strong>.</p>
<p>In case of nodes and/or relationships, if the topic is defined in the patterns provided by the Change Data Capture (CDC) configuration, their properties will be filtered according to the configuration.</p>
<p>Following is a simple video that shows the procedure in action:</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/jmaPyKRDXsCEdwZEdeMzNyE2BoKXolGMEbjR" alt="Image" width="1665" height="820" loading="lazy">
<em>The streams.publish procedure in action</em></p>
<h3 id="heading-streamsconsume">streams.consume</h3>
<p>This procedure allows for consuming messages from a given topic.</p>
<p>It takes two variables as input:</p>
<ul>
<li><strong>topic</strong>, <em>type String</em>: where you want to consume the data</li>
<li><strong>config</strong>, <em>type Map</em>: the configuration parameters</li>
</ul>
<p>and returns a list of collected events.</p>
<p>The <strong>config</strong> params are:</p>
<ul>
<li><strong>timeout</strong>, <em>type Long</em>: the value passed to the Kafka <a target="_blank" href="https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long-"><code>Consumer#poll</code></a> method (milliseconds). Default 1000.</li>
<li><strong>from</strong>, <em>type String</em>: it’s the Kafka configuration parameter <code>auto.offset.reset</code></li>
</ul>
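<p>To make the relationship with the Kafka consumer more concrete, here is a rough sketch of how the two parameters could map onto plain consumer settings. It is not the procedure's actual implementation. The names <code>ConsumeConfigSketch</code>, <code>ConsumeConfig</code>, <code>consumerProps</code> and <code>pollTimeout</code> are hypothetical, and the default used for <code>from</code> is an assumption (it is simply Kafka's own default for <code>auto.offset.reset</code>).</p>
<pre><code class="lang-scala">import java.time.Duration
import java.util.Properties

object ConsumeConfigSketch {
  // Mirrors the two config params described above.
  // The default for `from` is an assumption: "latest" is Kafka's own default
  // for auto.offset.reset, not a documented default of the procedure.
  final case class ConsumeConfig(timeout: Long = 1000L, from: String = "latest")

  // `from` becomes the consumer's auto.offset.reset setting.
  def consumerProps(bootstrapServers: String, cfg: ConsumeConfig): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("auto.offset.reset", cfg.from)
    props
  }

  // `timeout` is the number of milliseconds a single poll is allowed to block.
  def pollTimeout(cfg: ConsumeConfig): Duration = Duration.ofMillis(cfg.timeout)
}
</code></pre>
<p>In other words, <code>from</code> decides where reading starts when no committed offset exists, while <code>timeout</code> bounds how long one call waits for new messages.</p>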
<p>Use:</p>
<pre><code>CALL streams.consume(<span class="hljs-string">'my-topic'</span>, {<span class="xml"><span class="hljs-tag">&lt;<span class="hljs-name">config</span>&gt;</span>}) YIELD event RETURN event</span>
</code></pre><p>Example: Imagine you have a producer that publishes events like this:</p>
<pre><code>{<span class="hljs-string">"name"</span>: <span class="hljs-string">"Andrea"</span>, <span class="hljs-string">"surname"</span>: <span class="hljs-string">"Santurbano"</span>}
</code></pre><p>We can create user nodes in this way:</p>
<pre><code>CALL streams.consume(<span class="hljs-string">'my-topic'</span>, {<span class="xml"><span class="hljs-tag">&lt;<span class="hljs-name">config</span>&gt;</span>}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})</span>
</code></pre><p>Following is a simple video that shows the procedure in action:</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/m0Lui2cBqiT0OQO9DTuiQfOYHZmpAKwRB4Ys" alt="Image" width="1665" height="818" loading="lazy">
<em>The streams.consume procedure in action</em></p>
<p>This is the end of the “Leveraging Neo4j Streams” series. I hope you enjoyed it!</p>
<p>If you have already tested the Neo4j-Streams module or tested it via this notebook, please fill out our <a target="_blank" href="https://goo.gl/forms/VLwvqwsIvdfdm9fL2"><strong>feedback survey</strong></a>.</p>
<p>If you run into any issues or have thoughts about improving our work, <a target="_blank" href="http://github.com/neo4j-contrib/neo4j-streams/issues">please raise a GitHub issue</a>.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to ingest data into Neo4j from a Kafka stream ]]>
                </title>
                <description>
                    <![CDATA[ By Andrea Santurbano This article is the second part of the Leveraging Neo4j Streams series (Part 1 is here). I’ll show how to bring Neo4j into your Apache Kafka flow by using the Sink module of the Neo4j Streams project in combination with Apache Sp... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-ingest-data-into-neo4j-from-a-kafka-stream-a34f574f5655/</link>
                <guid isPermaLink="false">66c352d879660e79296c1dcf</guid>
                
                    <category>
                        <![CDATA[ Apache Kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ #apache-spark ]]>
                    </category>
                
                    <category>
                        <![CDATA[ data ]]>
                    </category>
                
                    <category>
                        <![CDATA[ kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Neo4j ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ freeCodeCamp ]]>
                </dc:creator>
                <pubDate>Fri, 15 Feb 2019 16:47:10 +0000</pubDate>
                <media:content url="https://cdn-media-1.freecodecamp.org/images/1*I3lIfJ7LFzRpfk0hdAbsww.jpeg" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>By Andrea Santurbano</p>
<p>This article is the second part of the <strong>Leveraging Neo4j Streams</strong> series (<a target="_blank" href="https://medium.freecodecamp.org/how-to-leverage-neo4j-streams-and-build-a-just-in-time-data-warehouse-64adf290f093">Part 1 is here</a>). I’ll show how to bring Neo4j into your <strong>Apache Kafka</strong> flow by using the Sink module of the <a target="_blank" href="https://medium.com/neo4j/a-new-neo4j-integration-with-apache-kafka-6099c14851d2"><strong>Neo4j Streams</strong></a> project in combination with <strong>Apache Spark</strong>’s Structured Streaming APIs.</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/ucRQaTumqnuCgJXKJTQfxR5pnsTeSxEQN0-k" alt="Image" width="800" height="484" loading="lazy">
<em>Photo by <a target="_blank" rel="noopener" href="https://unsplash.com/photos/-qrcOR33ErA?utm_source=unsplash&amp;utm_medium=referral&amp;utm_content=creditCopyText">Hendrik Cornelissen</a> on <a target="_blank" rel="noopener" href="https://unsplash.com/search/photos/stream?utm_source=unsplash&amp;utm_medium=referral&amp;utm_content=creditCopyText">Unsplash</a></em></p>
<p>In order to show how to integrate them, simplify the integration, and let you test the whole project yourself, I’ll use <a target="_blank" href="https://towardsdatascience.com/building-a-graph-data-pipeline-with-zeppelin-spark-and-neo4j-8b6b83f4fb70"><strong>Apache Zeppelin</strong></a>, a notebook runner that lets you <a target="_blank" href="https://towardsdatascience.com/building-a-graph-data-pipeline-with-zeppelin-spark-and-neo4j-8b6b83f4fb70">natively interact with Neo4j</a>.</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/btZPmk6Xpd650yTa-cU7FgXPvdlMMbEGrM7K" alt="Image" width="1665" height="542" loading="lazy">
<em>The result</em></p>
<h3 id="heading-leveraging-neo4j-streams">Leveraging Neo4j Streams</h3>
<p>The Neo4j Streams project is composed of three main pillars:</p>
<ul>
<li>The <strong>Change Data Capture</strong> that allows you to stream database changes over Kafka topics</li>
<li>The <strong>Sink</strong> (the subject of the first article) that allows consuming data streams from Kafka topics</li>
<li>A <strong>set of procedures</strong> that allows you to Produce/Consume data to/from Kafka Topics</li>
</ul>
<h3 id="heading-the-neo4j-streams-sink">The Neo4j Streams Sink</h3>
<p>This module allows Neo4j to consume data from a Kafka topic. It does it in a “smart” way: by allowing you to define your custom queries. What you need to do is write in your neo4j.conf something like this:</p>
<pre><code>streams.sink.topic.cypher.&lt;TOPIC&gt;=<span class="xml"><span class="hljs-tag">&lt;<span class="hljs-name">CYPHER_QUERY</span>&gt;</span></span>
</code></pre><p>So if you define a query just like this:</p>
<pre><code>streams.sink.topic.cypher.my-topic=MERGE (n:Person{<span class="hljs-attr">id</span>: event.id}) \
    ON CREATE SET n += event.properties
</code></pre><p>And for events like this:</p>
<pre><code>{<span class="hljs-attr">id</span>:<span class="hljs-string">"alice@example.com"</span>,<span class="hljs-attr">properties</span>:{<span class="hljs-attr">name</span>:<span class="hljs-string">"Alice"</span>,<span class="hljs-attr">age</span>:<span class="hljs-number">32</span>}}
</code></pre><p>Under the hood the Sink module will execute a query like this:</p>
<pre><code>UNWIND {batch} AS event
MERGE (n:Person {<span class="hljs-attr">id</span>: event.id})
    ON CREATE SET n += event.properties
</code></pre><p>The <code>batch</code> parameter is a set of Kafka events that are gathered by the Sink and processed in a single transaction in order to maximize execution efficiency.</p>
<p>So continuing with the example above, a possible full representation could be:</p>
<pre><code>WITH [{<span class="hljs-attr">id</span>:<span class="hljs-string">"alice@example.com"</span>,<span class="hljs-attr">properties</span>:{<span class="hljs-attr">name</span>:<span class="hljs-string">"Alice"</span>,<span class="hljs-attr">age</span>:<span class="hljs-number">32</span>}},
    {<span class="hljs-attr">id</span>:<span class="hljs-string">"bob@example.com"</span>,<span class="hljs-attr">properties</span>:{<span class="hljs-attr">name</span>:<span class="hljs-string">"Bob"</span>,<span class="hljs-attr">age</span>:<span class="hljs-number">42</span>}}] AS batch
UNWIND batch AS event
MERGE (n:Person {<span class="hljs-attr">id</span>: event.id})
    ON CREATE SET n += event.properties
</code></pre><p>This gives developers the power to define their own business rules, because you can choose to update, add to, remove, or adapt your graph data based on the events you receive.</p>
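<p>The batching idea can be sketched in a few lines of Scala. This is only an illustration of the mechanism described above, not the Sink module's real code. The object name, the helper names and the batch size are assumptions made for the example.</p>
<pre><code class="lang-scala">object SinkBatchSketch {
  // Prefixes the user-defined query with the UNWIND clause, so that every
  // event in the batch is handled by a single statement.
  def wrapQuery(userQuery: String): String =
    s"UNWIND {batch} AS event\n$userQuery"

  // Splits incoming events into fixed-size batches. The size is an
  // illustrative parameter, not a documented setting of the module.
  def toBatches[A](events: Seq[A], size: Int): List[Seq[A]] =
    events.grouped(size).toList
}
</code></pre>
<p>Each batch would then be passed as the <code>batch</code> parameter of the wrapped query and executed in one transaction, which is where the efficiency gain comes from.</p>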
<h3 id="heading-a-simple-use-case-ingest-data-from-open-data-apis">A simple use case: Ingest data from Open Data APIs</h3>
<p>Imagine your data pipeline needs to read data from an Open Data API, enrich it with some other internal source, and finally persist it into Neo4j. Apache Spark is a good fit for this kind of job, because it lets you manage different data sources through the same Dataset abstraction.</p>
<h4 id="heading-set-up-the-environment">Set-Up the Environment</h4>
<p>In the following <a target="_blank" href="https://github.com/conker84/leveraging-neo4j-streams">GitHub repo</a>, you’ll find all the code necessary to replicate what I’m presenting in this article. All you need to start is <a target="_blank" href="https://docs.docker.com/"><strong>Docker</strong></a>. Then you can spin up the stack by entering the directory and executing the following command from the terminal:</p>
<pre><code>$ docker-compose up
</code></pre><p>This will start up the whole environment that comprises:</p>
<ul>
<li>Neo4j + Neo4j Streams module + APOC procedures</li>
<li>Apache Kafka</li>
<li>Apache Spark</li>
<li>Apache Zeppelin</li>
</ul>
<p><img src="https://cdn-media-1.freecodecamp.org/images/lMf2OG4Sw0iv1hVUHehYkcF9b0xFk5H9I8Qe" alt="Image" width="800" height="180" loading="lazy">
<em>The whole architecture based on Docker containers</em></p>
<p>By going into Apache Zeppelin @ <code>http://localhost:8080</code> you’ll find in the directory <code>Medium/Part 2</code> one notebook “<strong>From Open Data to Sink</strong>” which is the subject of this article.</p>
<h4 id="heading-the-open-data-api">The Open Data API</h4>
<p>We’ll choose the Italian Ministry of Health dataset of Pharmacy stores.</p>
<h4 id="heading-define-the-sink-query">Define the Sink Query</h4>
<p>If you go into the <code>docker-compose.yml</code> file you’ll find a new property that corresponds to the Sink query definition:</p>
<pre><code>NEO4J_streams_sink_topic_cypher_pharma: <span class="hljs-string">"
MERGE (p:Pharmacy{fiscalId: event.FISCAL_ID}) ON CREATE SET p.name = event.NAME
MERGE (t:PharmacyType{type: event.TYPE_NAME})
MERGE (a:Address{name: event.ADDRESS + ', ' + event.CITY})
  ON CREATE SET a.latitude = event.LATITUDE,
                a.longitude = event.LONGITUDE,
                a.code = event.POSTAL_CODE,
                a.point = event.POINT
MERGE (c:City{name: event.CITY})
MERGE (p)-[:IS_TYPE]-(t)
MERGE (p)-[:HAS_ADDRESS]-(a)
MERGE (a)-[:IS_LOCATED_IN]-&gt;(c)"</span>
</code></pre><p>The <code>NEO4J_streams_sink_topic_cypher_pharma</code> property defines that all the data that comes from a topic named <code>pharma</code> will be consumed with the corresponding query.</p>
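<p>The property name follows the Neo4j Docker image convention for passing settings as environment variables: the <code>NEO4J_</code> prefix is dropped, a single underscore stands for a dot, and a double underscore stands for a literal underscore. The variable above should therefore map to the <code>streams.sink.topic.cypher.pharma</code> setting, whose last segment is the topic name. The Scala sketch below only illustrates that naming rule. The helper name <code>envToSetting</code> is made up for this example and is not code from the Neo4j image. You can paste it into a Scala REPL or a Zeppelin <code>%spark</code> paragraph:</p>
<pre><code>// Hypothetical helper: mirrors the naming rule of the Neo4j Docker image,
// where "NEO4J_" is stripped, "__" stands for "_" and "_" stands for "."
def envToSetting(envName: String): String =
  envName
    .stripPrefix("NEO4J_")
    .split("__")               // keep escaped underscores apart
    .map(_.replace("_", "."))  // single underscores become dots
    .mkString("_")             // restore the escaped underscores

val setting = envToSetting("NEO4J_streams_sink_topic_cypher_pharma")

// The last segment of the setting is the Kafka topic name
val topic = setting.split('.').last

println(s"$setting (topic: $topic)")
</code></pre>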
<p>The graph model that results from the query above is:</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/mMpAsz0co84uukQ95JDiNSWYQvnMrM7PcYsk" alt="Image" width="481" height="134" loading="lazy">
<em>Our data model</em></p>
<h4 id="heading-the-notebook-from-open-data-to-sink">The Notebook — <strong>From Open Data to Sink</strong></h4>
<p>The first step is to download the CSV from the Open Data Portal and load it into a Spark Dataframe:</p>
<pre><code>val fileUrl = z.input(<span class="hljs-string">"File Url"</span>).toString

val url = <span class="hljs-keyword">new</span> java.net.URL(fileUrl)
val localFilePath = s<span class="hljs-string">"/zeppelin/spark-warehouse/${url.getPath.split("</span>/<span class="hljs-string">").last}"</span>

val src = scala.io.Source.fromURL(fileUrl)(<span class="hljs-string">"ISO-8859-1"</span>)
val out = <span class="hljs-keyword">new</span> java.io.FileWriter(localFilePath)
out.write(src.mkString)
out.close

val csvDF = (spark.read
    .format(<span class="hljs-string">"csv"</span>)
    .option(<span class="hljs-string">"delimiter"</span>, <span class="hljs-string">";"</span>)
    .option(<span class="hljs-string">"header"</span>, <span class="hljs-string">"true"</span>)
    .load(localFilePath))
</code></pre><p>Now let’s explore the structure of the <code>csvDF</code>:</p>
<pre><code>root
|-- CODICEIDENTIFICATIVOFARMACIA: string (nullable = <span class="hljs-literal">true</span>)
|-- CODFARMACIAASSEGNATODAASL: string (nullable = <span class="hljs-literal">true</span>)
|-- INDIRIZZO: string (nullable = <span class="hljs-literal">true</span>)
|-- DESCRIZIONEFARMACIA: string (nullable = <span class="hljs-literal">true</span>)
|-- PARTITAIVA: string (nullable = <span class="hljs-literal">true</span>)
|-- CAP: string (nullable = <span class="hljs-literal">true</span>)
|-- CODICECOMUNEISTAT: string (nullable = <span class="hljs-literal">true</span>)
|-- DESCRIZIONECOMUNE: string (nullable = <span class="hljs-literal">true</span>)
|-- FRAZIONE: string (nullable = <span class="hljs-literal">true</span>)
|-- CODICEPROVINCIAISTAT: string (nullable = <span class="hljs-literal">true</span>)
|-- SIGLAPROVINCIA: string (nullable = <span class="hljs-literal">true</span>)
|-- DESCRIZIONEPROVINCIA: string (nullable = <span class="hljs-literal">true</span>)
|-- CODICEREGIONE: string (nullable = <span class="hljs-literal">true</span>)
|-- DESCRIZIONEREGIONE: string (nullable = <span class="hljs-literal">true</span>)
|-- DATAINIZIOVALIDITA: string (nullable = <span class="hljs-literal">true</span>)
|-- DATAFINEVALIDITA: string (nullable = <span class="hljs-literal">true</span>)
|-- DESCRIZIONETIPOLOGIA: string (nullable = <span class="hljs-literal">true</span>)
|-- CODICETIPOLOGIA: string (nullable = <span class="hljs-literal">true</span>)
|-- LATITUDINE: string (nullable = <span class="hljs-literal">true</span>)
|-- LONGITUDINE: string (nullable = <span class="hljs-literal">true</span>)
|-- LOCALIZE: string (nullable = <span class="hljs-literal">true</span>)
</code></pre><p>We want to focus on two fields:</p>
<ul>
<li><strong>CODICEIDENTIFICATIVOFARMACIA</strong>: it “should” be the unique identifier given by the Italian Ministry of Health to a Pharmacy Store</li>
<li><strong>DATAFINEVALIDITA</strong>: it indicates if the Pharmacy Store is still active (if it has no value it is active, otherwise it is closed)</li>
</ul>
<p>We now save the Dataframe into a Spark temp view called <code>OPEN_DATA</code>:</p>
<pre><code>csvDF.createOrReplaceTempView(<span class="hljs-string">"open_data"</span>)
</code></pre><p>Let’s now overwrite the <code>OPEN_DATA</code> temp view by filtering the dataset for valid records and renaming some fields:</p>
<pre><code>%sql
CREATE OR REPLACE TEMP VIEW OPEN_DATA AS
SELECT CODICEIDENTIFICATIVOFARMACIA AS PHARMA_ID,
 INDIRIZZO AS ADDRESS,
 DESCRIZIONEFARMACIA AS NAME,
 PARTITAIVA AS FISCAL_ID,
 CAP AS POSTAL_CODE,
 DESCRIZIONECOMUNE AS CITY,
 DESCRIZIONEPROVINCIA AS PROVINCE,
 DATAFINEVALIDITA,
 DESCRIZIONETIPOLOGIA AS TYPE_NAME,
 CODICETIPOLOGIA AS TYPE,
 REPLACE(LATITUDINE, ',', '.') AS LATITUDE,
 REPLACE(LONGITUDINE, ',', '.') AS LONGITUDE,
 REPLACE(LATITUDINE, ',', '.') || ',' || REPLACE(LONGITUDINE, ',', '.') AS POINT
FROM OPEN_DATA
WHERE DATAFINEVALIDITA &lt;&gt; '-'
AND CODICEIDENTIFICATIVOFARMACIA &lt;&gt; '-'
</code></pre><p>Let’s now create the <code>OPEN_DATA_KAFKA_STAGE</code> temp view that must contain two columns:</p>
<ul>
<li><strong>VALUE</strong>: JSON that represents the data that we want to send to the Kafka topic</li>
<li><strong>KEY</strong>: a key that identifies the row</li>
</ul>
<p>You may notice that this is exactly the minimum requirement for a <code>ProducerRecord</code>:</p>
<pre><code>%sql
CREATE OR REPLACE TEMP VIEW OPEN_DATA_KAFKA_STAGE AS
SELECT TO_JSON(
    STRUCT(PHARMA_ID,
        ADDRESS,
        NAME,
        FISCAL_ID,
        POSTAL_CODE,
        CITY,
        PROVINCE,
        TYPE_NAME,
        TYPE,
        LATITUDE,
        LONGITUDE,
        POINT)
    ) AS VALUE,
    PHARMA_ID AS KEY
FROM OPEN_DATA
</code></pre><p>Let’s now send the data to the <code>pharma</code> topic via Spark:</p>
<pre><code>(spark.table(<span class="hljs-string">"OPEN_DATA_KAFKA_STAGE"</span>).selectExpr(<span class="hljs-string">"CAST(key AS STRING)"</span>, <span class="hljs-string">"CAST(value AS STRING)"</span>)
    .write
    .format(<span class="hljs-string">"kafka"</span>)
    .option(<span class="hljs-string">"kafka.enable.auto.commit"</span>, <span class="hljs-string">"true"</span>)
    .option(<span class="hljs-string">"kafka.bootstrap.servers"</span>, <span class="hljs-string">"broker:9093"</span>)
    .option(<span class="hljs-string">"topic"</span>, <span class="hljs-string">"pharma"</span>)
    .save())
</code></pre><p>The data written to the <code>pharma</code> topic by the Spark job will now be consumed by the Neo4j Streams Sink module, thanks to the Cypher template that we defined at the beginning of the article.</p>
<p>In the final paragraph of the notebook, we can explore the ingested data. Below, we explore all the pharmacy stores located in Turin:</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/w4WY8A3yV-wnxKngp167PaDrrlZxNnZKfwyZ" alt="Image" width="1665" height="542" loading="lazy">
<em>Explore the data just ingested</em></p>
<h3 id="heading-wrapping-up">Wrapping up</h3>
<p>In this second article (<a target="_blank" href="https://medium.freecodecamp.org/how-to-leverage-neo4j-streams-and-build-a-just-in-time-data-warehouse-64adf290f093">please check the first one</a> if you haven’t already) we have seen how to use the SINK module in order to transform Apache Kafka events into arbitrary Graph Structures. You can do it in a very simple way by using the Apache Spark APIs.</p>
<p>In Part 3 we’ll discover how to use the Streams procedure in order to produce/consume data directly via Cypher queries, so please stay tuned!</p>
<p>If you have already tested the Neo4j-Streams module or tested it via this notebook please fill out our <a target="_blank" href="https://goo.gl/forms/VLwvqwsIvdfdm9fL2"><strong>feedback survey</strong></a>.</p>
<p>If you run into any issues or have thoughts about improving our work, <a target="_blank" href="http://github.com/neo4j-contrib/neo4j-streams/issues">please raise a GitHub issue</a>.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to leverage Neo4j Streams and build a just-in-time data warehouse ]]>
                </title>
                <description>
                    <![CDATA[ By Andrea Santurbano In this article, we’ll show how to create a Just-In-Time Data Warehouse by using Neo4j and the Neo4j Streams module with Apache Spark’s Structured Streaming Apis and Apache Kafka. In order to show how to integrate them, simplify ... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-leverage-neo4j-streams-and-build-a-just-in-time-data-warehouse-64adf290f093/</link>
                <guid isPermaLink="false">66c3531b0107ba195e79f72a</guid>
                
                    <category>
                        <![CDATA[ Apache Kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Neo4j ]]>
                    </category>
                
                    <category>
                        <![CDATA[ General Programming ]]>
                    </category>
                
                    <category>
                        <![CDATA[ streaming ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ freeCodeCamp ]]>
                </dc:creator>
                <pubDate>Tue, 29 Jan 2019 16:37:47 +0000</pubDate>
                <media:content url="https://cdn-media-1.freecodecamp.org/images/1*lwaAjWM8LuAvRZ1T67vWQw.jpeg" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>By Andrea Santurbano</p>
<p>In this article, we’ll show how to create a <a target="_blank" href="https://databricks.com/blog/2015/11/30/building-a-just-in-time-data-warehouse-platform-with-databricks.html">Just-In-Time Data Warehouse</a> by using <a target="_blank" href="https://neo4j.com/"><strong>Neo4j</strong></a> and the <strong><a target="_blank" href="https://medium.com/neo4j/a-new-neo4j-integration-with-apache-kafka-6099c14851d2">Neo4j Streams</a></strong> module with <strong>Apache Spark</strong>’s Structured Streaming APIs and <strong>Apache Kafka</strong>.</p>
<p>To show how to integrate them, keep the setup simple, and let you test the whole project by hand, I’ll use <a target="_blank" href="https://towardsdatascience.com/building-a-graph-data-pipeline-with-zeppelin-spark-and-neo4j-8b6b83f4fb70"><strong>Apache Zeppelin</strong></a>, a notebook runner that lets you <a target="_blank" href="https://towardsdatascience.com/building-a-graph-data-pipeline-with-zeppelin-spark-and-neo4j-8b6b83f4fb70">natively interact with Neo4j</a>.</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/qrtYkmywS6MwhLmsVKauFIQHuuu2vKwNUmp7" alt="Image" width="800" height="339" loading="lazy">
<em>The final result: how a kafka event streamed by Neo4j gets collected by Apache Spark</em></p>
<h3 id="heading-leveraging-neo4j-streams">Leveraging Neo4j Streams</h3>
<p>The Neo4j Streams project is composed of three main pillars:</p>
<ul>
<li>The <strong>Change Data Capture</strong> (the subject of this first article) that allows us to stream database changes over Kafka topics</li>
<li>The <strong>Sink</strong> that allows consuming data streams from the Kafka topic</li>
<li>A <strong>set of procedures</strong> that allows us to Produce/Consume data to/from Kafka Topics</li>
</ul>
<h3 id="heading-what-is-a-change-data-capture">What is a Change Data Capture?</h3>
<p>It’s a system that automatically captures changes from a source system (a Database, for instance) and automatically provides these changes to downstream systems for a variety of use cases.</p>
<p>CDC typically forms part of an ETL pipeline. This is an important component for ensuring Data Warehouses (DWH) are kept up to date with any record changes.</p>
<p>Traditionally, CDC applications work off transaction logs, which allows us to replicate a database without much of a performance impact on its operation.</p>
<h3 id="heading-how-does-the-neo4j-streams-cdc-module-deal-with-database-changes">How does the Neo4j Streams CDC module deal with database changes?</h3>
<p>Every transaction inside Neo4j gets captured and transformed in order to stream an atomic element of the transaction.</p>
<p>Let’s suppose we have a simple creation of two nodes and one relationship between them:</p>
<pre><code>CREATE (andrea:Person{<span class="hljs-attr">name</span>:<span class="hljs-string">"Andrea"</span>})-[knows:KNOWS{<span class="hljs-attr">since</span>:<span class="hljs-number">2014</span>}]-&gt;(michael:Person{<span class="hljs-attr">name</span>:<span class="hljs-string">"Michael"</span>})
</code></pre><p>The CDC module will transform this transaction into 3 events (2 node creations, 1 relationship creation).</p>
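<p>As a rough mental model (not the module’s actual implementation), you can think of this split as numbering every created entity within the transaction. The Scala sketch below uses two made-up types, <code>Change</code> and <code>CdcMeta</code>, to show how one transaction with three changes yields three events that share a transaction id. It runs in a Scala REPL or a Zeppelin <code>%spark</code> paragraph:</p>
<pre><code>// Hypothetical types, for illustration only
case class Change(kind: String, description: String)
case class CdcMeta(txId: Long, txEventId: Int, txEventsCount: Int, operation: String)

// One event per change; every event knows its position in the transaction
def toEvents(txId: Long, changes: Seq[Change]): Seq[(CdcMeta, Change)] =
  changes.zipWithIndex.map { case (change, index) =&gt;
    (CdcMeta(txId, index, changes.size, "created"), change)
  }

val events = toEvents(1L, Seq(
  Change("node", "(andrea:Person)"),
  Change("node", "(michael:Person)"),
  Change("relationship", "[knows:KNOWS]")
))

// Print each event with its position in the transaction
events.foreach { case (meta, change) =&gt;
  println(s"${meta.txEventId + 1}/${meta.txEventsCount} ${change.kind} ${change.description}")
}
</code></pre>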
<p>The Event structure was inspired by the <a target="_blank" href="https://debezium.io/">Debezium</a> format and has the following general structure:</p>
<pre><code>{
  <span class="hljs-string">"meta"</span>: { <span class="hljs-comment">/* transaction meta-data */</span> },
  <span class="hljs-string">"payload"</span>: { <span class="hljs-comment">/* the data related to the transaction */</span>
    <span class="hljs-string">"before"</span>: { <span class="hljs-comment">/* the data before the transaction */</span> },
    <span class="hljs-string">"after"</span>: { <span class="hljs-comment">/* the data after the transaction */</span> }
  }
}
</code></pre><p>Node source <code>(andrea)</code>:</p>
<pre><code>{
  <span class="hljs-string">"meta"</span>: {
    <span class="hljs-string">"timestamp"</span>: <span class="hljs-number">1532597182604</span>,
    <span class="hljs-string">"username"</span>: <span class="hljs-string">"neo4j"</span>,
    <span class="hljs-string">"tx_id"</span>: <span class="hljs-number">1</span>,
    <span class="hljs-string">"tx_event_id"</span>: <span class="hljs-number">0</span>,
    <span class="hljs-string">"tx_events_count"</span>: <span class="hljs-number">3</span>,
    <span class="hljs-string">"operation"</span>: <span class="hljs-string">"created"</span>,
    <span class="hljs-string">"source"</span>: {
      <span class="hljs-string">"hostname"</span>: <span class="hljs-string">"neo4j.mycompany.com"</span>
    }
  },
  <span class="hljs-string">"payload"</span>: {
    <span class="hljs-string">"id"</span>: <span class="hljs-string">"1004"</span>,
    <span class="hljs-string">"type"</span>: <span class="hljs-string">"node"</span>,
    <span class="hljs-string">"after"</span>: {
      <span class="hljs-string">"labels"</span>: [<span class="hljs-string">"Person"</span>],
      <span class="hljs-string">"properties"</span>: {
        <span class="hljs-string">"name"</span>: <span class="hljs-string">"Andrea"</span>
      }
    }
  }
}
</code></pre><p>Node target <code>(michael)</code>:</p>
<pre><code>{
  <span class="hljs-string">"meta"</span>: {
    <span class="hljs-string">"timestamp"</span>: <span class="hljs-number">1532597182604</span>,
    <span class="hljs-string">"username"</span>: <span class="hljs-string">"neo4j"</span>,
    <span class="hljs-string">"tx_id"</span>: <span class="hljs-number">1</span>,
    <span class="hljs-string">"tx_event_id"</span>: <span class="hljs-number">1</span>,
    <span class="hljs-string">"tx_events_count"</span>: <span class="hljs-number">3</span>,
    <span class="hljs-string">"operation"</span>: <span class="hljs-string">"created"</span>,
    <span class="hljs-string">"source"</span>: {
      <span class="hljs-string">"hostname"</span>: <span class="hljs-string">"neo4j.mycompany.com"</span>
    }
  },
  <span class="hljs-string">"payload"</span>: {
    <span class="hljs-string">"id"</span>: <span class="hljs-string">"1006"</span>,
    <span class="hljs-string">"type"</span>: <span class="hljs-string">"node"</span>,
    <span class="hljs-string">"after"</span>: {
      <span class="hljs-string">"labels"</span>: [<span class="hljs-string">"Person"</span>],
      <span class="hljs-string">"properties"</span>: {
        <span class="hljs-string">"name"</span>: <span class="hljs-string">"Michael"</span>
      }
    }
  }
}
</code></pre><p>Relationship <code>knows</code>:</p>
<pre><code>{
  <span class="hljs-string">"meta"</span>: {
    <span class="hljs-string">"timestamp"</span>: <span class="hljs-number">1532597182604</span>,
    <span class="hljs-string">"username"</span>: <span class="hljs-string">"neo4j"</span>,
    <span class="hljs-string">"tx_id"</span>: <span class="hljs-number">1</span>,
    <span class="hljs-string">"tx_event_id"</span>: <span class="hljs-number">2</span>,
    <span class="hljs-string">"tx_events_count"</span>: <span class="hljs-number">3</span>,
    <span class="hljs-string">"operation"</span>: <span class="hljs-string">"created"</span>,
    <span class="hljs-string">"source"</span>: {
      <span class="hljs-string">"hostname"</span>: <span class="hljs-string">"neo4j.mycompany.com"</span>
    }
  },
  <span class="hljs-string">"payload"</span>: {
    <span class="hljs-string">"id"</span>: <span class="hljs-string">"1007"</span>,
    <span class="hljs-string">"type"</span>: <span class="hljs-string">"relationship"</span>,
    <span class="hljs-string">"label"</span>: <span class="hljs-string">"KNOWS"</span>,
    <span class="hljs-string">"start"</span>: {
      <span class="hljs-string">"labels"</span>: [<span class="hljs-string">"Person"</span>],
      <span class="hljs-string">"id"</span>: <span class="hljs-string">"1004"</span>
    },
    <span class="hljs-string">"end"</span>: {
      <span class="hljs-string">"labels"</span>: [<span class="hljs-string">"Person"</span>],
      <span class="hljs-string">"id"</span>: <span class="hljs-string">"1006"</span>
    },
    <span class="hljs-string">"after"</span>: {
      <span class="hljs-string">"properties"</span>: {
        <span class="hljs-string">"since"</span>: <span class="hljs-number">2014</span>
      }
    }
  }
}
</code></pre><p>By default, all the data will be streamed on the <code>neo4j</code> topic. The CDC module allows controlling which nodes are sent to Kafka, and which of their properties you want to send to the topic:</p>
<pre><code>streams.source.topic.nodes.&lt;TOPIC_NAME&gt;=<span class="xml"><span class="hljs-tag">&lt;<span class="hljs-name">PATTERN</span>&gt;</span></span>
</code></pre><p>With the following example:</p>
<pre><code>streams.source.topic.nodes.products=Product{name, code}
</code></pre><p>The CDC module will send to the <code>products</code> topic all the nodes that have the label <code>Product</code>, and for those nodes it sends only the changes to the <code>name</code> and <code>code</code> properties. Please see the official documentation for a full description of <a target="_blank" href="https://neo4j-contrib.github.io/neo4j-streams/#_patterns">how label filtering works</a>.</p>
<p>For a more in-depth description of the Neo4j Streams project and how/why we at <a target="_blank" href="http://www.larus-ba.it/"><strong>LARUS</strong></a> and <a target="_blank" href="https://neo4j.com/"><strong>Neo4j</strong></a> built it, check out <a target="_blank" href="https://medium.com/neo4j/a-new-neo4j-integration-with-apache-kafka-6099c14851d2">this article</a>.</p>
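<p>To make the effect of such a pattern concrete, here is a small Scala sketch of the filtering idea. It is only an illustration under simplifying assumptions: one label and an inclusion list of properties. The <code>NodePattern</code> type and the <code>project</code> function are invented for this example and are not part of Neo4j Streams, and the sample property values are made up:</p>
<pre><code>// Hypothetical model of a pattern such as Product{name, code}
case class NodePattern(label: String, properties: Set[String])

// Returns the properties to publish, or None when the label does not match
def project(pattern: NodePattern,
            labels: Set[String],
            props: Map[String, Any]): Option[Map[String, Any]] =
  if (labels.contains(pattern.label))
    Some(props.filter { case (key, _) =&gt; pattern.properties.contains(key) })
  else None

val productPattern = NodePattern("Product", Set("name", "code"))

val published = project(productPattern, Set("Product"),
  Map("name" -&gt; "Espresso", "code" -&gt; "P-01", "price" -&gt; 2.5))
val skipped = project(productPattern, Set("Person"), Map("name" -&gt; "Andrea"))

println(published) // only name and code are kept
println(skipped)   // None: the node has no Product label
</code></pre>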
<h3 id="heading-beyond-the-traditional-data-warehouse">Beyond the traditional Data Warehouse</h3>
<p>A traditional DWH requires data teams to constantly build multiple costly and time-consuming Extract Transform Load (ETL) pipelines to ultimately derive business insights.</p>
<p>One of the biggest pain points is that Enterprise Data Warehouses are <strong>inherently rigid</strong>, with an <strong>architecture that’s difficult to change</strong>. That’s because:</p>
<ul>
<li>they are <strong>based on the</strong> <strong>Schema-On-Write architecture:</strong> first, you define your schema, then you write your data, then you read your data and it comes back in the schema you defined up-front</li>
<li>they are <strong>based</strong> on (expensive) <strong>batched/scheduled jobs</strong></li>
</ul>
<p><strong>This results in having to build costly and time-consuming ETL pipelines</strong> to access and manipulate the data. And as <strong>new data types</strong> and sources are introduced, the need to augment your ETL pipelines <strong>exacerbates the problem</strong>.</p>
<p>Thanks to the <strong>combination</strong> of the stream data processing with the <strong>Neo4j Streams CDC module</strong> and the <strong>Schema-On-Read</strong> approach provided by Apache Spark, we can <strong>overcome this rigidity</strong> and build a new kind of (flexible) DWH.</p>
<h3 id="heading-a-paradigm-shift-just-in-time-data-warehouse">A paradigm shift: Just-In-Time Data Warehouse</h3>
<p>A JIT-DWH solution is designed to easily handle a wider variety of data from different sources, and it takes a different approach to handling and managing data: <strong>Schema-On-Read.</strong></p>
<h3 id="heading-schema-on-read">Schema-On-Read</h3>
<p><a target="_blank" href="https://www.marklogic.com/blog/schema-on-read-vs-schema-on-write/">Schema-On-Read</a> follows a different sequence: <strong>it just loads the data as-is and applies your own lens to the data when you read it back out</strong>. With this kind of approach, you can present data in a schema that is adapted best to the queries being issued. You’re not stuck with a one-size-fits-all schema. With schema-on-read, you can present the data back in a schema that is most relevant to the task at hand.</p>
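<p>The idea can be sketched in a few lines of plain Scala, with no Spark involved. Records are stored exactly as they arrive, and each reader decides which fields it cares about when it reads them. The <code>raw</code> records, the <code>read</code> function and the two “lenses” below are made up for illustration:</p>
<pre><code>// Raw records stored as-is: the second one carries a field the first lacks
val raw: Seq[Map[String, String]] = Seq(
  Map("id" -&gt; "1", "name" -&gt; "Andrea"),
  Map("id" -&gt; "2", "name" -&gt; "Michael", "birth" -&gt; "1980-01-01")
)

// A "lens" is just a list of fields applied at read time;
// fields missing from a record come back as None
def read(fields: Seq[String], record: Map[String, String]): Seq[Option[String]] =
  fields.map(field =&gt; record.get(field))

val namesOnly = raw.map(record =&gt; read(Seq("name"), record))
val withBirth = raw.map(record =&gt; read(Seq("name", "birth"), record))

println(namesOnly)
println(withBirth)
</code></pre>
<p>Nothing about the stored records changes between the two reads. Only the list of fields does, which is why a new field can show up without touching the write side.</p>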
<h4 id="heading-set-up-the-environment">Set-Up the Environment</h4>
<p>In the following <a target="_blank" href="https://github.com/conker84/leveraging-neo4j-streams"><strong>GitHub repo</strong></a> you’ll find everything you need to replicate what I’m presenting in this article. All you need to get started is <a target="_blank" href="https://docs.docker.com/"><strong>Docker</strong></a>. Then you can spin up the stack by entering the directory and executing the following command from the terminal:</p>
<pre><code>$ docker-compose up
</code></pre><p>This will start up the whole environment that comprises:</p>
<ul>
<li>Neo4j + Neo4j Streams module + APOC procedures</li>
<li>Apache Kafka</li>
<li>Apache Spark</li>
<li>Apache Zeppelin</li>
</ul>
<p><img src="https://cdn-media-1.freecodecamp.org/images/j4n2GGDDTdZZFuoNuyP9eioEs8C4aAk5hfg0" alt="Image" width="800" height="213" loading="lazy">
<em>The whole architecture based on Docker containers</em></p>
<p>By going into Apache Zeppelin @ <code>http://localhost:8080</code> you’ll find in the directory <code>Medium/Part 1</code> two notebooks:</p>
<ul>
<li><strong>Create a Just-In-Time Data Warehouse</strong>: in this notebook, we will build the JIT-DWH</li>
<li><strong>Query The JIT-DWH</strong>: in this notebook, we will perform some queries over the JIT-DWH</li>
</ul>
<h3 id="heading-the-use-case">The Use-Case:</h3>
<p>We’ll create a fake social-network-like dataset. This will activate the CDC module of Neo4j Streams, and via Apache Spark we’ll intercept these events and persist them on the file system as JSON.</p>
<p>Then we’ll demonstrate how new fields added to our nodes are automatically added to our JIT-DWH without modifying the ETL pipeline, thanks to the Schema-On-Read approach.</p>
<p>We’ll execute the following steps:</p>
<ol>
<li>Create the fake data set</li>
<li>Build our data pipeline that intercepts the Kafka events published by the Neo4j Streams CDC module</li>
<li>Make the first query over our JIT-DWH on Spark</li>
<li>Add a new field in our graph model</li>
<li>Show how the new field is automatically exposed in real time thanks to the Neo4j Streams CDC module (without the need for changes over our ETL pipeline thanks to the Schema-On-Read approach).</li>
</ol>
<h3 id="heading-notebook-1-create-a-just-in-time-data-warehouse">Notebook 1: Create a Just-In-Time Data Warehouse</h3>
<p>We’ll create a fake social network by using the APOC <code>apoc.periodic.repeat</code> procedure that executes this query every 15 seconds:</p>
<pre><code>WITH [<span class="hljs-string">"M"</span>, <span class="hljs-string">"F"</span>, <span class="hljs-string">""</span>] AS gender
UNWIND range(<span class="hljs-number">1</span>, <span class="hljs-number">10</span>) AS id
CREATE (p:Person {<span class="hljs-attr">id</span>: apoc.create.uuid(), <span class="hljs-attr">name</span>: <span class="hljs-string">"Name-"</span> + apoc.text.random(<span class="hljs-number">10</span>), <span class="hljs-attr">age</span>: round(rand() * <span class="hljs-number">100</span>), <span class="hljs-attr">index</span>: id, <span class="hljs-attr">gender</span>: gender[toInteger(size(gender) * rand())]})
WITH collect(p) AS people
UNWIND people AS p1
UNWIND range(<span class="hljs-number">1</span>, <span class="hljs-number">3</span>) AS friend
WITH p1, people[(p1.index + friend) % size(people)] AS p2
CREATE (p1)-[:KNOWS{<span class="hljs-attr">years</span>: round(rand() * <span class="hljs-number">10</span>), <span class="hljs-attr">engaged</span>: (rand() &gt; <span class="hljs-number">0.5</span>)}]-&gt;(p2)
</code></pre><p>If you need more details about the APOC project, please follow this <a target="_blank" href="https://neo4j-contrib.github.io/neo4j-apoc-procedures/">link</a>.</p>
<p>So the resulting graph model is quite straightforward:</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/ZxNqMi-FAYLjHLNr7hFEJ21Bdoq6UtA2Qus2" alt="Image" width="95" height="165" loading="lazy">
<em>The Graph Model</em></p>
<p>Let’s create an index over the Person node:</p>
<pre><code>%neo4j
CREATE INDEX ON :Person(id)
</code></pre><p>Now let’s set the Background Job in Neo4j:</p>
<pre><code>%neo4j
CALL apoc.periodic.repeat(<span class="hljs-string">'create-fake-social-data'</span>, <span class="hljs-string">'WITH ["M", "F", "X"] AS gender UNWIND range(1, 10) AS id CREATE (p:Person {id: apoc.create.uuid(), name: "Name-" +  apoc.text.random(10), age: round(rand() * 100), index: id, gender: gender[toInteger(size(gender) * rand())]}) WITH collect(p) AS people UNWIND people AS p1 UNWIND range(1, 3) AS friend WITH p1, people[(p1.index + friend) % size(people)] AS p2 CREATE (p1)-[:KNOWS{years: round(rand() * 10), engaged: (rand() &gt; 0.5)}]-&gt;(p2)'</span>, <span class="hljs-number">15</span>) YIELD name
RETURN name AS created
</code></pre><p>This background query causes the Neo4j Streams CDC module to stream the related events to the “neo4j” Kafka topic (the default topic of the CDC).</p>
<p>Now let’s create a Structured Streaming Dataset that consumes the data from the “neo4j” topic:</p>
<pre><code>val kafkaStreamingDF = (spark
    .readStream
    .format(<span class="hljs-string">"kafka"</span>)
    .option(<span class="hljs-string">"kafka.bootstrap.servers"</span>, <span class="hljs-string">"broker:9093"</span>)
    .option(<span class="hljs-string">"startingoffsets"</span>, <span class="hljs-string">"earliest"</span>)
    .option(<span class="hljs-string">"subscribe"</span>, <span class="hljs-string">"neo4j"</span>)
    .load())
</code></pre><p>The <code>kafkaStreamingDF</code> Dataframe is basically a representation of a Kafka <code>ConsumerRecord</code>, since we are reading from the topic. In fact, its schema is:</p>
<pre><code>root
|-- key: binary (nullable = <span class="hljs-literal">true</span>)
|-- value: binary (nullable = <span class="hljs-literal">true</span>)
|-- topic: string (nullable = <span class="hljs-literal">true</span>)
|-- partition: integer (nullable = <span class="hljs-literal">true</span>)
|-- offset: long (nullable = <span class="hljs-literal">true</span>)
|-- timestamp: timestamp (nullable = <span class="hljs-literal">true</span>)
|-- timestampType: integer (nullable = <span class="hljs-literal">true</span>)
</code></pre><p>Now let’s create the Structure of the data streamed by the CDC using the Spark APIs in order to read the streamed data:</p>
<pre><code>import org.apache.spark.sql.types._

val cdcMetaSchema = (<span class="hljs-keyword">new</span> StructType()
    .add(<span class="hljs-string">"timestamp"</span>, LongType)
    .add(<span class="hljs-string">"username"</span>, StringType)
    .add(<span class="hljs-string">"operation"</span>, StringType)
    .add(<span class="hljs-string">"source"</span>, MapType(StringType, StringType, <span class="hljs-literal">true</span>)))

val cdcPayloadSchemaBeforeAfter = (<span class="hljs-keyword">new</span> StructType()
    .add(<span class="hljs-string">"labels"</span>, ArrayType(StringType, <span class="hljs-literal">false</span>))
    .add(<span class="hljs-string">"properties"</span>, MapType(StringType, StringType, <span class="hljs-literal">true</span>)))

val cdcPayloadSchema = (<span class="hljs-keyword">new</span> StructType()
    .add(<span class="hljs-string">"id"</span>, StringType)
    .add(<span class="hljs-string">"type"</span>, StringType)
    .add(<span class="hljs-string">"label"</span>, StringType)
    .add(<span class="hljs-string">"start"</span>, MapType(StringType, StringType, <span class="hljs-literal">true</span>))
    .add(<span class="hljs-string">"end"</span>, MapType(StringType, StringType, <span class="hljs-literal">true</span>))
    .add(<span class="hljs-string">"before"</span>, cdcPayloadSchemaBeforeAfter)
    .add(<span class="hljs-string">"after"</span>, cdcPayloadSchemaBeforeAfter))

val cdcSchema = (<span class="hljs-keyword">new</span> StructType()
    .add(<span class="hljs-string">"meta"</span>, cdcMetaSchema)
    .add(<span class="hljs-string">"payload"</span>, cdcPayloadSchema))
</code></pre><p>The <code>cdcSchema</code> is suitable for both node and relationship events.</p>
<p>What we need now is to extract only the CDC event from the Dataframe, so let’s perform a simple transformation query over Spark:</p>
<pre><code>val cdcDataFrame = (kafkaStreamingDF
    .selectExpr(<span class="hljs-string">"CAST(value AS STRING) AS VALUE"</span>)
    .select(from_json('VALUE, cdcSchema) as 'JSON))
</code></pre><p>The <code>cdcDataFrame</code> contains just one column <strong>JSON</strong> which is the data streamed from the Neo4j-Streams CDC module.</p>
<p>Let’s perform a simple ETL query in order to extract fields of interest:</p>
<pre><code>val dataWarehouseDataFrame = (cdcDataFrame
    .where(<span class="hljs-string">"json.payload.type = 'node' and (array_contains(nvl(json.payload.after.labels, json.payload.before.labels), 'Person'))"</span>)
    .selectExpr(<span class="hljs-string">"json.payload.id AS neo_id"</span>,
        <span class="hljs-string">"CAST(json.meta.timestamp / 1000 AS Timestamp) AS timestamp"</span>,
        <span class="hljs-string">"json.meta.source.hostname AS host"</span>,
        <span class="hljs-string">"json.meta.operation AS operation"</span>,
        <span class="hljs-string">"nvl(json.payload.after.labels, json.payload.before.labels) AS labels"</span>,
        <span class="hljs-string">"explode(json.payload.after.properties)"</span>))
</code></pre><p>This query is quite important, because it determines how the data will be persisted on the file system. Every node will be <strong>exploded</strong> into a number of JSON snippets, one for each node property, just like this:</p>
<pre><code>{<span class="hljs-string">"neo_id"</span>:<span class="hljs-string">"35340"</span>,<span class="hljs-string">"timestamp"</span>:<span class="hljs-string">"2018-12-19T23:07:10.465Z"</span>,<span class="hljs-string">"host"</span>:<span class="hljs-string">"neo4j"</span>,<span class="hljs-string">"operation"</span>:<span class="hljs-string">"created"</span>,<span class="hljs-string">"labels"</span>:[<span class="hljs-string">"Person"</span>],<span class="hljs-string">"key"</span>:<span class="hljs-string">"name"</span>,<span class="hljs-string">"value"</span>:<span class="hljs-string">"Name-5wc62uKO5l"</span>}
</code></pre><pre><code>{<span class="hljs-string">"neo_id"</span>:<span class="hljs-string">"35340"</span>,<span class="hljs-string">"timestamp"</span>:<span class="hljs-string">"2018-12-19T23:07:10.465Z"</span>,<span class="hljs-string">"host"</span>:<span class="hljs-string">"neo4j"</span>,<span class="hljs-string">"operation"</span>:<span class="hljs-string">"created"</span>,<span class="hljs-string">"labels"</span>:[<span class="hljs-string">"Person"</span>],<span class="hljs-string">"key"</span>:<span class="hljs-string">"index"</span>,<span class="hljs-string">"value"</span>:<span class="hljs-string">"8"</span>}
</code></pre><pre><code>{<span class="hljs-string">"neo_id"</span>:<span class="hljs-string">"35340"</span>,<span class="hljs-string">"timestamp"</span>:<span class="hljs-string">"2018-12-19T23:07:10.465Z"</span>,<span class="hljs-string">"host"</span>:<span class="hljs-string">"neo4j"</span>,<span class="hljs-string">"operation"</span>:<span class="hljs-string">"created"</span>,<span class="hljs-string">"labels"</span>:[<span class="hljs-string">"Person"</span>],<span class="hljs-string">"key"</span>:<span class="hljs-string">"id"</span>,<span class="hljs-string">"value"</span>:<span class="hljs-string">"944e58bf-0cf7-49cf-af4a-c803d44f222a"</span>}
</code></pre><pre><code>{<span class="hljs-string">"neo_id"</span>:<span class="hljs-string">"35340"</span>,<span class="hljs-string">"timestamp"</span>:<span class="hljs-string">"2018-12-19T23:07:10.465Z"</span>,<span class="hljs-string">"host"</span>:<span class="hljs-string">"neo4j"</span>,<span class="hljs-string">"operation"</span>:<span class="hljs-string">"created"</span>,<span class="hljs-string">"labels"</span>:[<span class="hljs-string">"Person"</span>],<span class="hljs-string">"key"</span>:<span class="hljs-string">"gender"</span>,<span class="hljs-string">"value"</span>:<span class="hljs-string">"F"</span>}
</code></pre><p>This kind of structure can easily be turned into a tabular representation (we’ll see how in the next few steps).</p>
<p>Now let's write a Spark continuous streaming query that saves the data to the file system as JSON:</p>
<pre><code>val writeOnDisk = (dataWarehouseDataFrame
    .writeStream
    .format(<span class="hljs-string">"json"</span>)
    .option(<span class="hljs-string">"checkpointLocation"</span>, <span class="hljs-string">"/zeppelin/spark-warehouse/jit-dwh/checkpoint"</span>)
    .option(<span class="hljs-string">"path"</span>, <span class="hljs-string">"/zeppelin/spark-warehouse/jit-dwh"</span>)
    .queryName(<span class="hljs-string">"nodes"</span>)
    .start())
</code></pre><p>We have now created a simple JIT-DWH. In the second notebook we’ll learn how to query it and how simple it is to deal with dynamic changes in the data structures thanks to schema-on-read.</p>
<h3 id="heading-notebook-2-query-the-jit-dwh">Notebook 2: Query The JIT-DWH</h3>
<p>The first paragraph lets us query and display our JIT-DWH:</p>
<pre><code>val flattenedDF = (spark.read.format(<span class="hljs-string">"json"</span>).load(<span class="hljs-string">"/zeppelin/spark-warehouse/jit-dwh/**"</span>)
    .where(<span class="hljs-string">"neo_id is not null"</span>)
    .groupBy(<span class="hljs-string">"neo_id"</span>, <span class="hljs-string">"timestamp"</span>, <span class="hljs-string">"host"</span>, <span class="hljs-string">"labels"</span>, <span class="hljs-string">"operation"</span>)
    .pivot(<span class="hljs-string">"key"</span>)
    .agg(first($<span class="hljs-string">"value"</span>)))

z.show(flattenedDF)
</code></pre><p>Remember how we saved the data as JSON a few lines above? The <code>flattenedDF</code> simply pivots the JSON records over the <code>key</code> field, grouping the data by the 5 columns that make up the “unique key” (<em>“neo_id”, “timestamp”, “host”, “labels”, “operation”</em>). This gives us the following tabular representation of the source data:</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/XA4vskTWdUra50ncym941E78zHZcwGM6TY4q" alt="Image" width="800" height="277" loading="lazy">
<em>The result of the query</em></p>
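<p>To make the pivot step more concrete, here is a minimal sketch of the same idea in plain Python, without Spark. The sample rows, the <code>pivot</code> helper and the reduced set of grouping columns are assumptions made for illustration only; they are not part of the notebooks. Note that the <code>birth</code> key becomes a column on its own as soon as one row carries it, which is the schema-on-read behaviour described in this article.</p>

```python
from collections import defaultdict

# Hypothetical rows in the exploded key/value layout shown earlier.
rows = [
    {"neo_id": "35340", "timestamp": "2018-12-19T23:07:10.465Z", "key": "name", "value": "Name-5wc62uKO5l"},
    {"neo_id": "35340", "timestamp": "2018-12-19T23:07:10.465Z", "key": "gender", "value": "F"},
    {"neo_id": "35341", "timestamp": "2018-12-19T23:07:11.000Z", "key": "name", "value": "Name-abc"},
    {"neo_id": "35341", "timestamp": "2018-12-19T23:07:11.000Z", "key": "birth", "value": "1990-01-01"},
]


def pivot(rows, group_cols=("neo_id", "timestamp")):
    """Group rows by group_cols and turn each distinct key into a column."""
    grouped = defaultdict(dict)
    columns = []
    for row in rows:
        group = tuple(row[c] for c in group_cols)
        # Keep the first value seen per (group, key), similar to agg(first(...)).
        grouped[group].setdefault(row["key"], row["value"])
        if row["key"] not in columns:
            columns.append(row["key"])
    table = []
    for group, props in grouped.items():
        record = dict(zip(group_cols, group))
        for col in columns:
            record[col] = props.get(col)  # None where a node lacks the property
        table.append(record)
    return table


table = pivot(rows)
for record in table:
    print(record)
```

<p>Unlike Spark's <code>pivot</code>, this sketch keeps the columns in first-seen order and works on an in-memory list, so treat it as an illustration of the reshaping rather than a replacement for the query above.</p>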
<p>Now imagine that our Person dataset gets a new field: <strong>birth.</strong> Let's add this new field to one node; in this case, you must choose an id from your dataset and update it with the following paragraph:</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/onz1jNYkzyiaEAFAcYi-4f2lay8rtqs55PBM" alt="Image" width="800" height="276" loading="lazy">
<em>Just fill the form with your data and execute the paragraph</em></p>
<p>Now the final step: reuse the same query and filter the DWH by the id we just updated, to check how our dataset reflects the change made in Neo4j.</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/dTgF8U8-F--yYJOUm3BLa70MaOd0E5XAKnaH" alt="Image" width="800" height="339" loading="lazy">
<em>The birth field is present without changes to our queries</em></p>
<h3 id="heading-conclusions">Conclusions</h3>
<p>In this first part, we learned how to leverage the events produced by the Neo4j-Streams CDC module in order to build a simple (real-time) JIT-DWH that uses the schema-on-read approach.</p>
<p>In Part 2 we’ll discover how to use the Sink module in order to ingest data into Neo4j directly from Kafka.</p>
<p>If you have already tested the Neo4j-Streams module or tested it via these notebooks please fill out our <a target="_blank" href="https://goo.gl/forms/VLwvqwsIvdfdm9fL2"><strong>feedback survey</strong></a>.</p>
<p>If you run into any issues or have thoughts about improving our work, <a target="_blank" href="http://github.com/neo4j-contrib/neo4j-streams/issues">please raise a GitHub issue</a>.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ What to consider for painless Apache Kafka integration ]]>
                </title>
                <description>
                    <![CDATA[ By Adi Polak Apache Kafka’s real-world adoption is exploding, and it claims to dominate the world of stream data. It has a huge developer community all over the world that keeps on growing. But, it can be painful too. So, just before jumping head fir... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/what-to-consider-for-painless-apache-kafka-integration-df559e828876/</link>
                <guid isPermaLink="false">66d45d5923b027d0ff16f2c2</guid>
                
                    <category>
                        <![CDATA[ Apache Kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ architecture ]]>
                    </category>
                
                    <category>
                        <![CDATA[ big data ]]>
                    </category>
                
                    <category>
                        <![CDATA[ General Programming ]]>
                    </category>
                
                    <category>
                        <![CDATA[ tech  ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ freeCodeCamp ]]>
                </dc:creator>
                <pubDate>Tue, 22 Jan 2019 18:04:27 +0000</pubDate>
                <media:content url="https://cdn-media-2.freecodecamp.org/w1280/5f9ca635740569d1a4ca6eb0.jpg" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>By Adi Polak</p>
<p>Apache Kafka’s real-world adoption is exploding, and it claims to dominate the world of stream data. It has a huge developer community all over the world that keeps on growing. But, it can be painful too. So, just before jumping head first and fully integrating with Apache Kafka, let’s check the water and plan ahead for painless integration.</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/DdJhig6V7tsyyPxWQThFzKGd6R-8dbmw3-nw" alt="Image" width="800" height="533" loading="lazy">
<em>Photo by <a target="_blank" href="https://unsplash.com/photos/5fNmWej4tAA?utm_source=unsplash&amp;utm_medium=referral&amp;utm_content=creditCopyText">Helloquence</a> on <a target="_blank" href="https://unsplash.com/search/photos/computer?utm_source=unsplash&amp;utm_medium=referral&amp;utm_content=creditCopyText">Unsplash</a></em></p>
<h4 id="heading-what-is-it">What is it?</h4>
<p>Apache Kafka is an open source framework for asynchronous messaging and it’s a distributed streaming platform. It is TCP based. The messages are persisted in topics. Message producers are called <em>publishers</em> and message consumers are called <em>subscribers</em>.</p>
<p>Consumers can subscribe to <strong>one or more topics</strong> and consume all the messages in those topics. Messages are written into the topic partitions.</p>
<p><em>Topics</em> are always multi-subscriber: they can have zero, one, or many consumers that subscribe to the data written to them. For each topic, Kafka maintains a partitioned log. Metadata for the partition logs and topics is usually managed by Zookeeper.</p>
<p>If you would like to learn more about Kafka message delivery semantics — like, <em>at most once</em>, <em>at least once</em> and <em>exactly once —</em> read <a target="_blank" href="http://bit.ly/2Shu9L5">here</a>.</p>
<p>Many tech companies have already integrated Apache Kafka into their production systems as a <strong>message broker, user activity tracking pipeline, metrics gatherer, log aggregation mechanism, stream processing engine</strong> and much more. Apache Kafka is written in Scala and Java.</p>
<h4 id="heading-why-kafka">Why Kafka?</h4>
<ul>
<li><em>Kafka provides <strong>highly available</strong> and <strong>fault-tolerant</strong> message logs.</em> Kafka clusters persist all published records to disk and retain them for a configurable retention period (seven days by default). If you disable the retention limits, Kafka will keep records until it runs out of disk space. When <strong>data loss</strong> means awful failure for the product, this is essential for recovery.</li>
<li><strong>Multiple topic consumers</strong>: configuring consumers under multiple consumer groups helps remove the old bottleneck of sending the same data to multiple applications for processing. Kafka is distributed, so it can serve consumers running on different physical machines or service instances. Replicating topics to a secondary cluster is also relatively easy with Apache Kafka’s mirroring feature, MirrorMaker (see an <a target="_blank" href="http://bit.ly/2CsENsU">example</a> of mirroring data between two HDInsight clusters). <strong>Just remember</strong>: if multiple consumers belong to the same group (defined by <code>group.id</code>), the data will be balanced across all the consumers within the group.</li>
<li>Kafka is <strong>polyglot</strong>: there are clients for C#, Java, C, Python and more. The ecosystem also provides a REST proxy which allows easy integration via HTTP and JSON.</li>
<li><strong>Real-time handling</strong>: Kafka can power real-time data pipelines and real-time messaging between applications.</li>
<li><strong>Scalable</strong>: thanks to its distributed architecture, Kafka can scale out without incurring any downtime.</li>
<li>and more…</li>
</ul>
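<p>The consumer-group behaviour mentioned in the list can be sketched in a few lines of plain Python. This is only a simulation under stated assumptions: the group names, consumer names and the round-robin strategy are hypothetical, and a real Kafka cluster chooses assignments through its configured partition assignor. The point it illustrates is that partitions are split among the members of one group, while every group independently receives all partitions.</p>

```python
def assign_partitions(partitions, consumers):
    """Spread partitions round-robin over the consumers of ONE group."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Hypothetical topic with six partitions and two consumer groups.
partitions = [0, 1, 2, 3, 4, 5]
groups = {
    "billing": ["billing-1", "billing-2", "billing-3"],
    "analytics": ["analytics-1"],
}

# Each group gets its own full assignment of the topic's partitions.
plan = {group: assign_partitions(partitions, members) for group, members in groups.items()}
for group, assignment in plan.items():
    print(group, assignment)
```

<p>In this sketch the three <code>billing</code> consumers each read two partitions, while the single <code>analytics</code> consumer reads all six.</p>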
<h3 id="heading-lets-make-integration-with-kafka-painless">Let’s make integration with Kafka painless</h3>
<p><img src="https://cdn-media-1.freecodecamp.org/images/ltseK3C8ZbvxjNtH95aPcVfZhWGx1wCqoapf" alt="Image" width="800" height="533" loading="lazy"></p>
<h4 id="heading-here-are-6-things-to-know-before-integrating">Here are 7 things to know before integrating:</h4>
<p><strong>1 — Apache Zookeeper can become a pain point with a Kafka cluster</strong></p>
<p>In the past (versions &lt; 0.81), Kafka used Zookeeper to maintain the offsets of each topic and partition. Zookeeper took part in the read path, where too-frequent commits and too many consumers led to severe performance and stability issues.</p>
<p>On top of that, it is better to commit offsets manually with the old Zookeeper-based consumers, since careless auto-commits could lead to data loss.</p>
<p>The newer versions of Kafka offer their own offset management, where the consumer can use Kafka itself to manage offsets. This means that a dedicated internal topic (<code>__consumer_offsets</code>) stores the read offsets instead of Zookeeper.</p>
<p>Yet Kafka still needs a Zookeeper cluster, even in the later 2.x versions. Zookeeper is used to store Kafka configs (for example, when reassigning partitions) and to back the Kafka topics API: creating topics, adding partitions, and so on.</p>
<p>The load on Kafka is strictly related to the number of consumers, brokers, partitions and frequency of commits from the consumer.</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/YHjRwjTTwgWD4nilJIADm7GF9KkNyZPWG4NG" alt="Image" width="800" height="603" loading="lazy"></p>
<p><strong>2 — You shouldn’t send large messages or payloads through Kafka</strong></p>
<p>According to Apache Kafka, for better throughput, the max message size should be <strong>10KB</strong>. If your messages are larger than this, it is better to look at alternatives or find a way to chop each message into smaller parts before writing to Kafka. The best practice when doing so is to use a message key, so that all the chunks of a message are written to the same partition.</p>
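<p>As a rough illustration of the chunking idea, the sketch below splits a payload into 10KB pieces that all share one key, then shows that a key-hash partitioner maps every piece to the same partition. Everything here is an assumption made for the example: the partition count, the chunk metadata fields, and the use of CRC32 as a stand-in hash (Kafka's own default partitioner uses a different hash function). No Kafka client is involved.</p>

```python
import zlib

NUM_PARTITIONS = 6      # assumed partition count for the illustration
CHUNK_SIZE = 10 * 1024  # 10KB, the limit discussed above


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Stand-in for a key-hash partitioner (CRC32 here, not Kafka's own hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def split_message(key, payload, chunk_size=CHUNK_SIZE):
    """Cut payload into chunks that all carry the same key plus ordering metadata."""
    pieces = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]
    return [
        {"key": key, "index": i, "total": len(pieces), "data": piece}
        for i, piece in enumerate(pieces)
    ]


def reassemble(chunks):
    """Rebuild the payload once every chunk has arrived."""
    ordered = sorted(chunks, key=lambda c: c["index"])
    if not ordered or len(ordered) != ordered[0]["total"]:
        raise ValueError("missing chunks")
    return b"".join(c["data"] for c in ordered)


payload = bytes(25 * 1024)  # a hypothetical 25KB message
chunks = split_message("order-42", payload)
targets = {partition_for(c["key"]) for c in chunks}
print(len(chunks), "chunks, partitions used:", targets)
```

<p>Because all chunks share the key, they land on one partition and keep their relative order there, which is what lets the consumer reassemble them.</p>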
<p><strong>3 — Apache Kafka can’t transform data</strong></p>
<p>Many developers mistakenly think that they can create Kafka parsers or transform data inside Kafka. However, Kafka itself does not transform data. If you are using Azure services, there is a great list of <a target="_blank" href="http://bit.ly/2Sk8rpS">data factories services</a> that you can use to transform the data, like <a target="_blank" href="http://bit.ly/2Lv6uEm">Azure Databricks</a>, <a target="_blank" href="http://bit.ly/2EK5EDp">HDInsights Spark</a> and others that connect to Kafka.</p>
<p>Another solution is using <a target="_blank" href="https://kafka.apache.org/documentation/streams/">Apache Kafka Streams</a>. This is a newer API that is built on top of Kafka’s producer and consumer clients. It’s significantly more powerful and also more expressive than the Kafka consumer client.</p>
<p>The <a target="_blank" href="https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/KafkaStreams.html"><code>KafkaStreams</code></a> client allows us to perform continuous computation on input coming from one or more input topics and to send output to zero, one, or more output topics. Internally, a <code>KafkaStreams</code> instance contains a normal <a target="_blank" href="https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html"><code>KafkaProducer</code></a> and <a target="_blank" href="https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html"><code>KafkaConsumer</code></a> instance, which are used for reading input and writing output.</p>
<p>Another option is using Flink, check it out <a target="_blank" href="https://www.baeldung.com/kafka-flink-data-pipeline">here</a>.</p>
<p><strong>4 — Apache Kafka supports a binary protocol over TCP</strong></p>
<p>Apache Kafka’s communication protocol is a custom binary protocol over TCP. It doesn’t support MQTT, JMS, or other messaging protocols out of the box. However, many users have written adaptors that read data from those protocols and write it to Apache Kafka, for example <a target="_blank" href="https://github.com/adispennette/apache-kafka-jms">kafka-jms-client</a>.</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/ScLTW1xegqrB9bKdZwmPrT0WUgzsmRzycyFV" alt="Image" width="800" height="591" loading="lazy">
<em>Simple TCP handshake</em></p>
<p><strong>5 — Apache Kafka management / support and the steep learning curve</strong></p>
<p>As of today, there are few <em>free</em> UI-based management systems for Apache Kafka, and most of the DevOps engineers I have worked with use scripting tools. However, it can be tedious for a beginner to jump into the Apache Kafka scripting tools without taking the time for training. The learning curve is steep, and it takes some time to get moving and integrate Kafka into big running systems.</p>
<p>For experienced DevOps engineers and developers, it might take a few months (2+) to fully understand how to integrate, support and work with Apache Kafka. It is important to learn how Kafka works in order to configure it in the way that best suits the system’s needs.</p>
<p>Here’s a list of management tools that you can use for <strong>almost free</strong> (some are restricted to personal/community use):</p>
<ul>
<li><a target="_blank" href="http://www.kafkatool.com/">KafkaTool</a> — GUI application for managing and using <strong>Apache Kafka</strong> clusters.</li>
<li><a target="_blank" href="https://www.confluent.io/product/confluent-platform/">Confluent platform</a> — full enterprise streaming platform solution.</li>
<li><a target="_blank" href="https://github.com/HomeAdvisor/Kafdrop">KafDrop</a> — tool for displaying information such as brokers, topics, partitions, and even lets you view messages. It is a lightweight application that runs on Spring Boot and requires very little configuration.</li>
<li><a target="_blank" href="https://github.com/yahoo/kafka-manager">Yahoo Kafka Manager</a> —another tool for monitoring Kafka, yet it offers much less than the rest.</li>
</ul>
<p><strong>Supporting Managed Kafka on the cloud</strong></p>
<p>Today almost all clouds support Kafka, whether as a fully managed service, through an integration with Confluent from the cloud store, or by simply purchasing Kafka machines:</p>
<ul>
<li><a target="_blank" href="https://www.confluent.io/confluent-cloud/">Confluent Cloud- Kafka as a Service</a></li>
<li><a target="_blank" href="http://bit.ly/2Ah5MGo">Azure Event Hub</a>- fully managed Kafka</li>
<li><a target="_blank" href="http://bit.ly/2BEpPyp">Managed Kafka on HDInsight — Azure</a></li>
<li><a target="_blank" href="https://console.cloud.google.com/marketplace/details/click-to-deploy-images/kafka?pli=1">Kafka Machine on Google cloud</a></li>
<li><a target="_blank" href="https://aws.amazon.com/quickstart/architecture/confluent-platform/">Kafka on AWS using Confluent solution</a></li>
<li>... many more</li>
</ul>
<p><strong>6 — Kafka is no magic — There is still a possibility of data loss</strong></p>
<p>Apache Kafka is probably the most popular tool for distributed asynchronous messaging. This is mainly due to its <strong>high throughput, low latency, scalability, centralised and real time</strong> abilities. Much of this comes from splitting topics into partitions and keeping replicas of each partition on several brokers.</p>
<p>However, with misconfiguration there is a high chance of <em>data loss</em> when machines/processes are failing, and they will fail. Therefore, it’s important to understand how Kafka works and what the product/system requirements are.</p>
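<p>To give a feel for what “misconfiguration” can mean here, the sketch below checks a handful of well-known Kafka settings that influence durability. The <code>durability_warnings</code> helper is hypothetical, and so is the idea of putting everything in one dictionary: in a real deployment these settings live at different levels (producer, topic or broker, consumer). A missing key is treated as risky purely as a choice of this sketch, not as a statement about Kafka’s defaults, and the right values always depend on your product’s requirements.</p>

```python
def durability_warnings(config):
    """Return human-readable warnings for settings that commonly risk data loss."""
    warnings = []
    # Producer level: with acks other than "all", fewer replicas confirm a write.
    if str(config.get("acks")) not in ("all", "-1"):
        warnings.append("acks is not 'all': a write can be acknowledged before replicas have it")
    # Topic level: a single copy of each partition cannot survive a broker loss.
    if int(config.get("replication.factor", 1)) == 1:
        warnings.append("replication.factor is 1: losing one broker loses the data")
    # Topic or broker level: how many in-sync replicas must hold a write.
    if int(config.get("min.insync.replicas", 1)) == 1:
        warnings.append("min.insync.replicas is 1: a write may be acknowledged while only one replica holds it")
    # Broker or topic level: out-of-sync replicas may become leader and drop records.
    if str(config.get("unclean.leader.election.enable", "true")).lower() == "true":
        warnings.append("unclean.leader.election.enable is on: an out-of-sync replica may become leader")
    # Consumer level: offsets may be committed before the records are processed.
    if str(config.get("enable.auto.commit", "true")).lower() == "true":
        warnings.append("enable.auto.commit is on: offsets may be committed before processing finishes")
    return warnings


risky = {"acks": "1", "replication.factor": 1}
safer = {
    "acks": "all",
    "replication.factor": 3,
    "min.insync.replicas": 2,
    "unclean.leader.election.enable": "false",
    "enable.auto.commit": "false",
}
for warning in durability_warnings(risky):
    print("WARN:", warning)
```

<p>A checklist like this does not prove a configuration is safe. It only flags a few common risks, which is why failure testing remains important.</p>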
<p><strong>7 — Kafka built-in failure testing framework Trogdor</strong></p>
<p>To assist you in finding the right configuration, the Kafka team created <a target="_blank" href="https://cwiki.apache.org/confluence/display/KAFKA/Fault+Injection">Trogdor</a>. Trogdor is a failure testing framework.</p>
<p><strong>How it works</strong></p>
<ul>
<li>Configure Kafka the way you would in production</li>
<li>Create a producer that generates messages with sequence 1…X million.</li>
<li>Run the producer</li>
<li>Run the consumer</li>
<li>Create a failure by crashing and/or hanging a broker.</li>
<li>Test and check that every event produced was consumed.</li>
<li>… if that’s not the case, it is better to go back and update the configuration accordingly!</li>
</ul>
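<p>The verification step in the list above (“check that every event produced was consumed”) can be sketched as a simple comparison of sequence numbers. This is not Trogdor’s API; the <code>verify_delivery</code> helper and the sample data are hypothetical and only illustrate the check you would run once the consumer has finished.</p>

```python
from collections import Counter


def verify_delivery(produced_count, consumed):
    """Compare consumed sequence numbers against the expected range 1..produced_count."""
    seen = Counter(consumed)
    missing = [n for n in range(1, produced_count + 1) if n not in seen]
    duplicated = sorted(n for n, times in seen.items() if times != 1)
    return {"missing": missing, "duplicated": duplicated}


# Hypothetical run: the producer sent 1..10, a broker failure happened mid-way.
consumed = [1, 2, 3, 3, 5, 6, 8, 9, 10]
report = verify_delivery(10, consumed)
print(report)
```

<p>Missing numbers point to data loss, while duplicates point to redelivery, which may or may not be acceptable depending on the delivery semantics you need.</p>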
<h4 id="heading-on-top-of-that-it-is-important-to-remember-that-apache-kafka">On top of that, it is important to remember that Apache Kafka …</h4>
<ul>
<li>Is <strong><em>not an RPC</em></strong>: Apache Kafka is a messaging system. For RPC, service X needs to be aware of service Y and the call signature. In Kafka, by contrast, sending a message doesn't mean that anyone will ever consume it. In RPC, there is always a consumer, since the calling service is aware of service Y and invokes its function directly.</li>
<li>Is <strong><em>not a database</em></strong>: it’s not a good place to store messages for lookup, since you can’t jump between them or search them without an expensive full scan.</li>
</ul>
<h4 id="heading-just-a-word-about-ksql">Just a word about KSQL</h4>
<p>An interesting library brought to us by the Confluent community is <a target="_blank" href="https://github.com/confluentinc/ksql">KSQL</a>. It is built on top of Kafka Streams. KSQL is a completely interactive SQL interface, so you can use it without writing any code. KSQL is licensed under the Confluent Community License.</p>
<h3 id="heading-tldr">TL;DR</h3>
<p>Apache Kafka has many benefits, yet before adding it in production, one should be aware that:</p>
<ul>
<li>It has a steep learning curve: make time to learn the bits and bytes of Kafka</li>
<li>You must manage cluster resources: be aware of requirements like Zookeeper</li>
<li>You can still lose data with Apache Kafka</li>
<li>Most clouds provide managed Apache Kafka</li>
<li>It won’t transform data</li>
<li>It’s not a Database</li>
<li>It supports a binary protocol over TCP</li>
<li>At the moment, you can’t send large messages using Kafka</li>
<li>You should use Trogdor for fault testing of your system</li>
</ul>
<p>All that being said, Apache Kafka is probably the best tool for messaging and streaming tasks.</p>
<p>Thank you <a target="_blank" href="https://www.freecodecamp.org/news/what-to-consider-for-painless-apache-kafka-integration-df559e828876/undefined">Gwen Shapira</a> for your input and guidance along the way.</p>
<p><img src="https://cdn-media-1.freecodecamp.org/images/aPn5w0kq5WP9lvUQ2a7dF1Yx-bUxVfvMDO1i" alt="Image" width="800" height="569" loading="lazy"></p>
<p><a target="_blank" href="https://medium.com/@adipolak">Follow me</a> here, or <a target="_blank" href="https://twitter.com/adipolak">here</a> for more posts about Scala, Kotlin, Big data, clean code and software engineers nonsense. Cheers!</p>
 ]]>
                </content:encoded>
            </item>
        
    </channel>
</rss>
