<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/"
    xmlns:atom="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" version="2.0">
    <channel>
        
        <title>
            <![CDATA[ stream processing - freeCodeCamp.org ]]>
        </title>
        <description>
            <![CDATA[ Browse thousands of programming tutorials written by experts. Learn Web Development, Data Science, DevOps, Security, and get developer career advice. ]]>
        </description>
        <link>https://www.freecodecamp.org/news/</link>
        <image>
            <url>https://cdn.freecodecamp.org/universal/favicons/favicon.png</url>
            <title>
                <![CDATA[ stream processing - freeCodeCamp.org ]]>
            </title>
            <link>https://www.freecodecamp.org/news/</link>
        </image>
        <generator>Eleventy</generator>
        <lastBuildDate>Tue, 23 Jun 2026 22:45:38 +0000</lastBuildDate>
        <atom:link href="https://www.freecodecamp.org/news/tag/stream-processing/rss.xml" rel="self" type="application/rss+xml" />
        <ttl>60</ttl>
        
            <item>
                <title>
                    <![CDATA[ How to implement Change Data Capture using Kafka Streams ]]>
                </title>
                <description>
                    <![CDATA[ By Luca Florio Change Data Capture (CDC) involves observing the changes happening in a database and making them available in a form that can be exploited by other systems.  One of the most interesting use-cases is to make them available as a stream o... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-implement-the-change-data-capture-pattern-using-kafka-streams/</link>
                <guid isPermaLink="false">66d45e444a7504b7409c3398</guid>
                
                    <category>
                        <![CDATA[ Apache Kafka ]]>
                    </category>
                
                    <category>
                        <![CDATA[ cdc ]]>
                    </category>
                
                    <category>
                        <![CDATA[ change data capture ]]>
                    </category>
                
                    <category>
                        <![CDATA[ elasticsearch ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Kafka streams ]]>
                    </category>
                
                    <category>
                        <![CDATA[ MongoDB ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Scala ]]>
                    </category>
                
                    <category>
                        <![CDATA[ stream processing ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ freeCodeCamp ]]>
                </dc:creator>
                <pubDate>Mon, 20 Jan 2020 21:10:30 +0000</pubDate>
                <media:content url="https://cdn-media-2.freecodecamp.org/w1280/5f9c9dac740569d1a4ca3900.jpg" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>By Luca Florio</p>
<p><strong>Change Data Capture</strong> (CDC) involves observing the changes happening in a database and making them available in a form that can be exploited by other systems. </p>
<p>One of the most interesting use cases is to make them available as a stream of events. This means you can, for example, catch the events and update a search index as the data are written to the database.</p>
<p>Interesting, right? Let's see how to implement a CDC system that can observe the changes made to a NoSQL database (<strong>MongoDB</strong>), stream them through a message broker (<strong>Kafka</strong>), process the messages of the stream (<strong>Kafka Streams</strong>), and update a search index (<strong>Elasticsearch</strong>)!</p>
<h2 id="heading-tldr">TL;DR</h2>
<p>The full code of the project is available on GitHub in this <a target="_blank" href="https://github.com/elleFlorio/kafka-streams-playground">repository</a>. If you want to skip all my jibber jabber and just run the example, go straight to the 
<strong>How to run the project</strong> section near the end of the article!</p>
<h1 id="heading-use-case-amp-infrastructure">Use case &amp; infrastructure</h1>
<p>We run a web application that stores photos uploaded by users. People can share their shots, let others download them, create albums, and so on. Users can also provide a description of their photos, as well as Exif metadata and other useful information. </p>
<p>We want to store this information and use it to improve our search engine. We will focus on the part of our system depicted in the following diagram.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2019/12/kafka-stream-playground.png" alt="Image" width="600" height="400" loading="lazy">
<em>Photo information storage architecture</em></p>
<p>The information is provided in <code>JSON</code> format. Since I like to post my shots on <a target="_blank" href="https://unsplash.com/">Unsplash</a>, and the website provides free access to its API, I used their model for the photo <code>JSON</code> document.</p>
<p>Once the <code>JSON</code> is sent through a <code>POST</code> request to our server, we store the document inside a <strong>MongoDB</strong> database. We will also store it in <strong>Elasticsearch</strong> for indexing and quick search. </p>
<p>However, we love <strong>long exposure shots</strong>, and we would like to store a subset of information about this kind of photo in a separate index: for example, the exposure time and the location (latitude and longitude) where the photo was taken. This way, we can create a map of the locations where photographers usually take long exposure photos.</p>
<p>Here comes the interesting part: instead of explicitly calling Elasticsearch in our code once the photo info is stored in MongoDB, we can implement <strong>CDC</strong> using Kafka and <strong>Kafka Streams</strong>. </p>
<p>We listen to modifications to the MongoDB <strong>oplog</strong> using the interface provided by MongoDB itself. When a photo is stored, we send it to a <code>photo</code> Kafka topic. Using <strong>Kafka Connect</strong>, an Elasticsearch sink is configured to save everything sent to that topic to a specific index. This way, we can index all photos stored in MongoDB automatically.</p>
<p>We need to take care of the long exposure photos too. This requires some processing of the information to extract what we need. For this reason, we use Kafka Streams to create a <strong>processing topology</strong> to:</p>
<ol>
<li>Read from the <code>photo</code> topic</li>
<li>Extract Exif and location information</li>
<li>Filter long exposure photos (exposure time &gt; 1 sec.)</li>
<li>Write to a <code>long-exposure</code> topic.</li>
</ol>
<p>Then another Elasticsearch sink will read data from the <code>long-exposure</code> topic and write it to a specific index in Elasticsearch.</p>
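<p>To make the four steps concrete before bringing Kafka into the picture, here is a minimal sketch of the same logic expressed as plain Scala collection operations. The simplified models (<code>Position</code>, <code>Location</code>, <code>Exif</code>, <code>Photo</code>) and the <code>LongExposureInfo</code> result type are hypothetical stand-ins rather than the classes from the repository, and the exposure time is assumed to be already parsed into seconds.</p>
<pre><code class="lang-scala">// Hypothetical, simplified models: only the fields this pipeline needs.
final case class Position(latitude: Double, longitude: Double)
final case class Location(city: String, country: String, position: Option[Position])
final case class Exif(exposureTime: Option[Double]) // assumed already parsed, in seconds
final case class Photo(id: String, exif: Option[Exif], location: Option[Location])
final case class LongExposureInfo(id: String, exposureTime: Double, location: Location)

object LongExposurePipeline {
  // Threshold from the article: a long exposure lasts more than 1 second.
  val ThresholdSeconds = 1.0

  // Steps 2 and 3: extract Exif and location data, then keep only long exposures.
  // Photos missing a position or an exposure time are dropped.
  def process(photos: Seq[Photo]): Seq[LongExposureInfo] =
    photos.flatMap { photo =&gt;
      for {
        location &lt;- photo.location if location.position.isDefined
        exif     &lt;- photo.exif
        exposure &lt;- exif.exposureTime if exposure &gt; ThresholdSeconds
      } yield LongExposureInfo(photo.id, exposure, location)
    }
}
</code></pre>
<p>In the real system, steps 1 and 4 are the reads from the <code>photo</code> topic and the writes to the <code>long-exposure</code> topic, and the Kafka Streams topology performs the filtering in between one record at a time instead of over an in-memory collection.</p>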
<p>It is quite simple, but it's enough to have fun with CDC and Kafka Streams!</p>
<h1 id="heading-server-implementation">Server implementation</h1>
<p>Let's have a look at what we need to implement: our server exposing the <strong>REST API</strong>s!</p>
<h3 id="heading-models-and-dao">Models and DAO</h3>
<p>First things first, we need a model of our data and a <strong>Data Access Object</strong> (DAO) to talk to our MongoDB database. </p>
<p>As I said, the model for the photo <code>JSON</code> information is the one used by Unsplash. Check out the free API <a target="_blank" href="https://unsplash.com/documentation#get-a-photo">documentation</a> for an example of the <code>JSON</code> we will use. </p>
<p>I created the mapping for the serialization/deserialization of the photo <code>JSON</code> using <a target="_blank" href="https://github.com/spray/spray-json">spray-json</a>. I'll skip the details here; if you are curious, just look at the <a target="_blank" href="https://github.com/elleFlorio/kafka-streams-playground/tree/master/src/main/scala/com/elleflorio/kafka/streams/playground/dao/model/unsplash">repo</a>!</p>
<p>Let's focus on the model for the long exposure photo.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">case</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LongExposurePhoto</span>(<span class="hljs-params">id: <span class="hljs-type">String</span>, exposureTime: <span class="hljs-type">Float</span>, createdAt: <span class="hljs-type">Date</span>, location: <span class="hljs-type">Location</span></span>)</span>

<span class="hljs-class"><span class="hljs-keyword">object</span> <span class="hljs-title">LongExposurePhotoJsonProtocol</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">DefaultJsonProtocol</span> </span>{
  <span class="hljs-keyword">implicit</span> <span class="hljs-keyword">val</span> longExposurePhotoFormat:<span class="hljs-type">RootJsonFormat</span>[<span class="hljs-type">LongExposurePhoto</span>] = jsonFormat(<span class="hljs-type">LongExposurePhoto</span>, <span class="hljs-string">"id"</span>, <span class="hljs-string">"exposure_time"</span>, <span class="hljs-string">"created_at"</span>, <span class="hljs-string">"location"</span>)
}
</code></pre>
<p>This is quite simple: from the photo <code>JSON</code> we keep the <code>id</code>, the exposure time (<code>exposureTime</code>), when the photo was created (<code>createdAt</code>), and the <code>location</code> where it was taken. The <code>location</code> includes the <code>city</code>, the <code>country</code>, and the <code>position</code>, composed of <code>latitude</code> and <code>longitude</code>.</p>
<p>The DAO consists of just the <code>PhotoDao.scala</code> class. </p>
<pre><code class="lang-scala"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">PhotoDao</span>(<span class="hljs-params">database: <span class="hljs-type">MongoDatabase</span>, photoCollection: <span class="hljs-type">String</span></span>) </span>{

  <span class="hljs-keyword">val</span> collection: <span class="hljs-type">MongoCollection</span>[<span class="hljs-type">Document</span>] = database.getCollection(photoCollection)

  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">createPhoto</span></span>(photo: <span class="hljs-type">Photo</span>): <span class="hljs-type">Future</span>[<span class="hljs-type">String</span>] = {
    <span class="hljs-keyword">val</span> doc = <span class="hljs-type">Document</span>(photo.toJson.toString())
    doc.put(<span class="hljs-string">"_id"</span>, photo.id)
    collection.insertOne(doc).toFuture()
      .map(_ =&gt; photo.id)
  }
}
</code></pre>
<p>Since I want to keep this example minimal and focused on the CDC implementation, the DAO has just one method to create a new photo document in MongoDB. </p>
<p>It is straightforward: create a document from the photo <code>JSON</code> and insert it into MongoDB, using the photo's own <code>id</code> as the document <code>_id</code>. Then we return the <code>id</code> of the inserted photo in a <code>Future</code> (the MongoDB API is asynchronous).</p>
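<p>Because <code>createPhoto</code> only promises a <code>Future[String]</code> holding the stored id, the same contract can be satisfied without MongoDB, which can be handy when unit-testing the REST routes. The following is a hypothetical in-memory stand-in: the <code>InMemoryPhotoDao</code> name and the trimmed-down <code>Photo</code> model are assumptions, not part of the repository. It keys documents by the photo id, mirroring the <code>_id</code> choice above.</p>
<pre><code class="lang-scala">import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, trimmed-down stand-in for the article's Photo model.
final case class Photo(id: String, description: Option[String])

// Hypothetical in-memory DAO with the same createPhoto contract as PhotoDao.
final class InMemoryPhotoDao(ec: ExecutionContext) {
  private implicit val executor: ExecutionContext = ec
  private val store = TrieMap.empty[String, Photo]

  // Stores the photo under its own id and completes the Future with that id.
  // A second insert with the same id fails, similar to MongoDB's unique _id index.
  def createPhoto(photo: Photo): Future[String] = Future {
    store.putIfAbsent(photo.id, photo) match {
      case None    =&gt; photo.id
      case Some(_) =&gt; throw new IllegalStateException(s"Duplicate _id ${photo.id}")
    }
  }

  def findPhoto(id: String): Option[Photo] = store.get(id)
}
</code></pre>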
<h3 id="heading-kafka-producer">Kafka Producer</h3>
<p>Once the photo is stored inside MongoDB, we have to send it to the <code>photo</code> Kafka topic. This means we need a producer to write the message to that topic. The <code>PhotoProducer.scala</code> class looks like this.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">case</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">PhotoProducer</span>(<span class="hljs-params">props: <span class="hljs-type">Properties</span>, topic: <span class="hljs-type">String</span></span>) </span>{

  createKafkaTopic(props, topic)
  <span class="hljs-keyword">val</span> photoProducer = <span class="hljs-keyword">new</span> <span class="hljs-type">KafkaProducer</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>](props)

  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">sendPhoto</span></span>(photo: <span class="hljs-type">Photo</span>): <span class="hljs-type">Future</span>[<span class="hljs-type">RecordMetadata</span>] = {
    <span class="hljs-keyword">val</span> record = <span class="hljs-keyword">new</span> <span class="hljs-type">ProducerRecord</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>](topic, photo.id, photo.toJson.compactPrint)
    photoProducer.send(record)
  }

  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">closePhotoProducer</span></span>(): <span class="hljs-type">Unit</span> = photoProducer.close()
}
</code></pre>
<p>I would say that this is pretty self-explanatory. The most interesting part is probably the <code>createKafkaTopic</code> method that is implemented in the <code>utils</code> package.</p>
<pre><code class="lang-scala"><span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">createKafkaTopic</span></span>(props: <span class="hljs-type">Properties</span>, topic: <span class="hljs-type">String</span>): <span class="hljs-type">Unit</span> = {
    <span class="hljs-keyword">val</span> adminClient = <span class="hljs-type">AdminClient</span>.create(props)
    <span class="hljs-keyword">val</span> photoTopic = <span class="hljs-keyword">new</span> <span class="hljs-type">NewTopic</span>(topic, <span class="hljs-number">1</span>, <span class="hljs-number">1</span>)
    adminClient.createTopics(<span class="hljs-type">List</span>(photoTopic).asJava)
  }
</code></pre>
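<p>This method creates the topic in Kafka with 1 partition and a replication factor of 1 (enough for this example). Creating the topic in advance is not required, but it lets Kafka balance partitions, select leaders, and so on before the first message arrives. This is useful to get our stream topology ready to process as soon as we start our server. Note that <code>createTopics</code> is asynchronous: this helper neither waits for the result nor closes the <code>AdminClient</code>, which is acceptable for a small demo.</p>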
<h3 id="heading-event-listener">Event Listener</h3>
<p>We have the DAO that writes to MongoDB and the producer that sends the message to Kafka. We need to glue them together so that when a document is stored in MongoDB, the message is sent to the <code>photo</code> topic. This is the purpose of the <code>PhotoListener.scala</code> class.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">case</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">PhotoListener</span>(<span class="hljs-params">collection: <span class="hljs-type">MongoCollection</span>[<span class="hljs-type">Document</span>], producer: <span class="hljs-type">PhotoProducer</span></span>) </span>{

  <span class="hljs-keyword">val</span> cursor: <span class="hljs-type">ChangeStreamObservable</span>[<span class="hljs-type">Document</span>] = collection.watch()

  cursor.subscribe(<span class="hljs-keyword">new</span> <span class="hljs-type">Observer</span>[<span class="hljs-type">ChangeStreamDocument</span>[<span class="hljs-type">Document</span>]] {
    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">onNext</span></span>(result: <span class="hljs-type">ChangeStreamDocument</span>[<span class="hljs-type">Document</span>]): <span class="hljs-type">Unit</span> = {
      result.getOperationType <span class="hljs-keyword">match</span> {
        <span class="hljs-keyword">case</span> <span class="hljs-type">OperationType</span>.<span class="hljs-type">INSERT</span> =&gt; {
          <span class="hljs-keyword">val</span> photo = result.getFullDocument.toJson().parseJson.convertTo[<span class="hljs-type">Photo</span>]
          producer.sendPhoto(photo).get()
          println(<span class="hljs-string">s"Sent photo with Id <span class="hljs-subst">${photo.id}</span>"</span>)
        }
        <span class="hljs-keyword">case</span> _ =&gt; println(<span class="hljs-string">s"Operation <span class="hljs-subst">${result.getOperationType}</span> not supported"</span>)
      }
    }
    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">onError</span></span>(e: <span class="hljs-type">Throwable</span>): <span class="hljs-type">Unit</span> = println(<span class="hljs-string">s"onError: <span class="hljs-subst">$e</span>"</span>)
    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">onComplete</span></span>(): <span class="hljs-type">Unit</span> = println(<span class="hljs-string">"onComplete"</span>)})
}
</code></pre>
<p>We use the <a target="_blank" href="https://docs.mongodb.com/manual/changeStreams/">Change Streams interface</a> provided by the MongoDB Scala library. </p>
<p>Here is how it works: we <code>watch()</code> the collection where photos are stored. When there is a new event (<code>onNext</code>) we run our logic. </p>
<p>For this example we are interested only in the creation of new documents, so we explicitly check that the operation is of type <code>OperationType.INSERT</code>. If the operation is the one we are interested in, we get the document and convert it to a <code>Photo</code> object to be sent by our producer. </p>
<p>That's it! With a few lines of code we connected the creation of documents in MongoDB to a stream of events in Kafka.</p>
<p>As a side note, be aware that to use the Change Streams interface <strong>we have to set up a MongoDB replica set</strong>. Here we run 3 instances of MongoDB and configure them as a replica set (a primary, a secondary, and an arbiter) using the following command in the mongo shell:</p>
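<p>The decision logic inside <code>onNext</code> can also be pulled out into a pure function, which makes it testable without a running replica set. The sketch below is hypothetical: <code>Operation</code>, <code>ChangeEvent</code>, and <code>ChangeRouter</code> are simplified stand-ins for the driver's <code>OperationType</code> and <code>ChangeStreamDocument</code>, not driver types.</p>
<pre><code class="lang-scala">// Hypothetical subset of change-stream operation types.
sealed trait Operation
case object Insert extends Operation
case object Update extends Operation
case object Delete extends Operation

// Hypothetical simplified change event: the operation plus the full document as JSON, if present.
final case class ChangeEvent(operation: Operation, fullDocumentJson: Option[String])

object ChangeRouter {
  // Returns the JSON to forward to the photo topic, or None when the event is ignored.
  def payloadToForward(event: ChangeEvent): Option[String] = event.operation match {
    case Insert =&gt; event.fullDocumentJson
    case other  =&gt;
      println(s"Operation $other not supported")
      None
  }
}
</code></pre>
<p>Forwarding only inserts also sidesteps a detail of change streams: for update events the full document is not included unless the stream is opened with the <code>updateLookup</code> full-document option.</p>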
<pre><code class="lang-shell">rs.initiate({
  _id: "r0",
  members: [
    { _id: 0, host: "mongo1:27017", priority: 1 },
    { _id: 1, host: "mongo2:27017", priority: 0 },
    { _id: 2, host: "mongo3:27017", priority: 0, arbiterOnly: true }
  ]
})
</code></pre>
<p>Here, the hosts are the containers we will run with the docker-compose file: <code>mongo1</code>, <code>mongo2</code>, and <code>mongo3</code>.</p>
<h3 id="heading-processing-topology">Processing Topology</h3>
<p>Time to build our processing topology! It produces the data for the <code>long-exposure</code> index in Elasticsearch. The topology is described by the following diagram:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2020/01/processing-topology.png" alt="Image" width="600" height="400" loading="lazy">
<em>Processing topology</em></p>
<p>It is implemented in the <code>LongExposureTopology.scala</code> object.
Let's analyse every step of our processing topology.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> stringSerde = <span class="hljs-keyword">new</span> <span class="hljs-type">StringSerde</span>

<span class="hljs-keyword">val</span> streamsBuilder = <span class="hljs-keyword">new</span> <span class="hljs-type">StreamsBuilder</span>()

<span class="hljs-keyword">val</span> photoSource: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>] = streamsBuilder.stream(sourceTopic, <span class="hljs-type">Consumed</span>.`<span class="hljs-keyword">with</span>`(stringSerde, stringSerde))
</code></pre>
<p>The first step is to read from a source topic. We start a stream from the <code>sourceTopic</code> (that is, the <code>photo</code> topic) using the <code>StreamsBuilder</code> object. The <code>stringSerde</code> object is used to serialise and deserialise the content of the topic as a <code>String</code>. </p>
<p>Please notice that each step of the processing creates a new stream of data, represented by a <code>KStream</code> object. When creating the stream, we specify the types of its key and value. In our topology the key is always a <code>String</code>. In this step the value is still a <code>String</code>.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> covertToPhotoObject: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">Photo</span>] =
      photoSource.mapValues((_, jsonString) =&gt; {
        <span class="hljs-keyword">val</span> photo = jsonString.parseJson.convertTo[<span class="hljs-type">Photo</span>]
        println(<span class="hljs-string">s"Processing photo <span class="hljs-subst">${photo.id}</span>"</span>)
        photo
      })
</code></pre>
<p>The next step is to convert the value extracted from the <code>photo</code> topic into a proper <code>Photo</code> object. </p>
<p>So we start from the <code>photoSource</code> stream and work on the values using the <code>mapValues</code> function. We parse each value as <code>JSON</code> and create the <code>Photo</code> object that makes up the <code>covertToPhotoObject</code> stream.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> filterWithLocation: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">Photo</span>] = covertToPhotoObject.filter((_, photo) =&gt; photo.location.exists(_.position.isDefined))
</code></pre>
<p>There is no guarantee that the photo we are processing has location info, but we need it in our long exposure object. This step of the topology filters out of the <code>covertToPhotoObject</code> stream the photos without location info (more precisely, without a <code>position</code>), creating the <code>filterWithLocation</code> stream.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> filterWithExposureTime: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">Photo</span>] = filterWithLocation.filter((_, photo) =&gt; photo.exif.exists(_.exposureTime.isDefined))
</code></pre>
<p>The exposure time of the photo is the other piece of information we need. For this reason, we filter out of the <code>filterWithLocation</code> stream the photos without exposure time info, creating the <code>filterWithExposureTime</code> stream.</p>
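<p>Both filters rely on <code>Option.exists</code> over nested optional fields: the predicate is true only when the outer value is present <em>and</em> the inner field is defined. This small self-contained sketch, using simplified and hypothetical versions of the models, shows which cases each filter drops.</p>
<pre><code class="lang-scala">// Hypothetical, simplified models with nested optional fields.
final case class Position(latitude: Double, longitude: Double)
final case class Location(city: Option[String], position: Option[Position])
final case class Exif(exposureTime: Option[String])
final case class Photo(id: String, exif: Option[Exif], location: Option[Location])

object PhotoPredicates {
  // Same shape as the filterWithLocation predicate: a location that has a position.
  def hasPosition(photo: Photo): Boolean = photo.location.exists(_.position.isDefined)

  // Same shape as the filterWithExposureTime predicate: Exif data that has an exposure time.
  def hasExposureTime(photo: Photo): Boolean = photo.exif.exists(_.exposureTime.isDefined)
}
</code></pre>
<p>Because these two filters run first, the <code>.get</code> calls on <code>exif</code>, <code>exposureTime</code>, and <code>location</code> in the next step cannot fail.</p>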
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> dataExtractor: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">LongExposurePhoto</span>] =
      filterWithExposureTime.mapValues((_, photo) =&gt; <span class="hljs-type">LongExposurePhoto</span>(photo.id, parseExposureTime(photo.exif.get.exposureTime.get), photo.createdAt, photo.location.get))
</code></pre>
<p>We now have all we need to create a <code>LongExposurePhoto</code> object! That is the result of the <code>dataExtractor</code>: it takes the <code>Photo</code> coming from the <code>filterWithExposureTime</code> stream and produces a new stream containing <code>LongExposurePhoto</code>.</p>
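<p>The article does not show <code>parseExposureTime</code>. Here is a hypothetical sketch of what such a helper might look like, assuming exposure times arrive as strings such as <code>"30"</code>, <code>"0.5"</code>, or <code>"1/125"</code> (optionally with a trailing <code>s</code>). Unlike the helper used above, which returns a <code>Float</code> directly, this version returns an <code>Option[Float]</code> so that a malformed value can be filtered out instead of being assumed valid.</p>
<pre><code class="lang-scala">object ExposureTime {
  // Hypothetical parser for exposure times expressed in seconds.
  // Accepts decimals ("30", "0.5") and fractions ("1/125"), with an optional "s" suffix.
  def parse(raw: String): Option[Float] = {
    val cleaned = raw.trim.stripSuffix("s").trim
    // limit -1 keeps trailing empty parts, so "1/" is rejected rather than read as "1".
    cleaned.split("/", -1) match {
      case Array(numerator, denominator) =&gt;
        for {
          n &lt;- numerator.trim.toFloatOption
          d &lt;- denominator.trim.toFloatOption if d != 0f
        } yield n / d
      case Array(value) =&gt; value.toFloatOption
      case _            =&gt; None
    }
  }
}
</code></pre>
<p>With this variant, a photo whose exposure time cannot be parsed could be dropped by an extra filter step, rather than relying on the parser to always succeed.</p>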
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> longExposureFilter: <span class="hljs-type">KStream</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>] =
      dataExtractor.filter((_, item) =&gt; item.exposureTime &gt; <span class="hljs-number">1.0</span>).mapValues((_, longExposurePhoto) =&gt; {
        <span class="hljs-keyword">val</span> jsonString = longExposurePhoto.toJson.compactPrint
        println(<span class="hljs-string">s"completed processing: <span class="hljs-subst">$jsonString</span>"</span>)
        jsonString
      })
</code></pre>
<p>We are almost there. Now we have to keep only the photos with a long exposure time (which we decided means more than 1 second). So we create a new <code>longExposureFilter</code> stream that drops the photos that are not long exposures. </p>
<p>This time we also serialise each <code>LongExposurePhoto</code> into the corresponding <code>JSON</code> string, which will be written to the sink topic in the next step and then indexed in Elasticsearch by the Kafka Connect sink.</p>
<pre><code class="lang-scala">longExposureFilter.to(sinkTopic, <span class="hljs-type">Produced</span>.`<span class="hljs-keyword">with</span>`(stringSerde, stringSerde))

streamsBuilder.build()
</code></pre>
<p>This is the last step of our topology. We write the content of the <code>longExposureFilter</code> stream <code>to</code> our <code>sinkTopic</code> (that is, the <code>long-exposure</code> topic) using the string serialiser/deserialiser.
The last command simply <code>build</code>s the topology we just created.</p>
<p>Now that we have our topology, we can use it in our server. The <code>PhotoStreamProcessor.scala</code> class is what manages the processing.</p>
<pre><code class="lang-scala"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">PhotoStreamProcessor</span>(<span class="hljs-params">kafkaProps: <span class="hljs-type">Properties</span>, streamProps: <span class="hljs-type">Properties</span>, sourceTopic: <span class="hljs-type">String</span>, sinkTopic: <span class="hljs-type">String</span></span>) </span>{

  createKafkaTopic(kafkaProps, sinkTopic)
  <span class="hljs-keyword">val</span> topology: <span class="hljs-type">Topology</span> = <span class="hljs-type">LongExposureTopology</span>.build(sourceTopic, sinkTopic)
  <span class="hljs-keyword">val</span> streams: <span class="hljs-type">KafkaStreams</span> = <span class="hljs-keyword">new</span> <span class="hljs-type">KafkaStreams</span>(topology, streamProps)

  sys.<span class="hljs-type">ShutdownHookThread</span> {
    streams.close(java.time.<span class="hljs-type">Duration</span>.ofSeconds(<span class="hljs-number">10</span>))
  }

  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">start</span></span>(): <span class="hljs-type">Unit</span> = <span class="hljs-keyword">new</span> <span class="hljs-type">Thread</span> {
    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span></span>(): <span class="hljs-type">Unit</span> = {
      streams.cleanUp()
      streams.start()
      println(<span class="hljs-string">"Started long exposure processor"</span>)
    }
  }.start()
}
</code></pre>
<p>First we create the <code>sinkTopic</code>, using the same utility method we saw before. Then we build the stream topology and initialize a <code>KafkaStreams</code> object with that topology.</p>
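<p>To start the stream processing, we launch it from a dedicated <code>Thread</code> so that server startup is not held up. Note that <code>KafkaStreams.start()</code> is itself non-blocking: it starts the stream threads that keep processing while the server is alive. Before starting, we call <code>cleanUp()</code>, which wipes the application's local state directory. This is convenient while experimenting and harmless here because our topology is stateless, but in production it is usually called only when you deliberately want to reset local state. </p>
<p>Our <code>PhotoStreamProcessor</code> is ready to go!</p>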
<h3 id="heading-rest-api">REST API</h3>
<p>The server exposes a REST API that receives the photo information to store. We use <a target="_blank" href="https://doc.akka.io/docs/akka-http/current/index.html">Akka HTTP</a> for the API implementation.</p>
<pre><code class="lang-scala"><span class="hljs-class"><span class="hljs-keyword">trait</span> <span class="hljs-title">AppRoutes</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">SprayJsonSupport</span> </span>{

  <span class="hljs-keyword">implicit</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">system</span></span>: <span class="hljs-type">ActorSystem</span>
  <span class="hljs-keyword">implicit</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">photoDao</span></span>: <span class="hljs-type">PhotoDao</span>
  <span class="hljs-keyword">implicit</span> <span class="hljs-keyword">lazy</span> <span class="hljs-keyword">val</span> timeout = <span class="hljs-type">Timeout</span>(<span class="hljs-number">5.</span>seconds)

  <span class="hljs-keyword">lazy</span> <span class="hljs-keyword">val</span> healthRoute: <span class="hljs-type">Route</span> = pathPrefix(<span class="hljs-string">"health"</span>) {
    concat(
      pathEnd {
        concat(
          get {
            complete(<span class="hljs-type">StatusCodes</span>.<span class="hljs-type">OK</span>)
          }
        )
      }
    )
  }

  <span class="hljs-keyword">lazy</span> <span class="hljs-keyword">val</span> crudRoute: <span class="hljs-type">Route</span> = pathPrefix(<span class="hljs-string">"photo"</span>) {
    concat(
      pathEnd {
        concat(
          post {
            entity(as[<span class="hljs-type">Photo</span>]) { photo =&gt;
              <span class="hljs-keyword">val</span> photoCreated: <span class="hljs-type">Future</span>[<span class="hljs-type">String</span>] =
                photoDao.createPhoto(photo)
              onSuccess(photoCreated) { id =&gt;
              complete((<span class="hljs-type">StatusCodes</span>.<span class="hljs-type">Created</span>, id))
              }
            }
          }
        )
      }
    )
  }

}
</code></pre>
<p>To keep the example minimal, we have only two routes:</p>
<ul>
<li><code>GET /health</code> - to check if the server is up &amp; running</li>
<li><code>POST /photo</code> - to send to the system the <code>JSON</code> of the photo information we want to store. This endpoint uses the DAO to store the document in MongoDB and returns a <code>201</code> with the id of the stored photo if the operation succeeded.</li>
</ul>
<p>This is by no means a complete set of APIs, but it is enough to run our example.</p>
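<p>To exercise the <code>POST /photo</code> route from Scala, a client only has to send the photo <code>JSON</code> with an <code>application/json</code> content type, which the spray-json unmarshaller behind <code>entity(as[Photo])</code> expects. Here is a minimal sketch using the JDK 11+ <code>java.net.http</code> client; the <code>localhost:8080</code> address and the <code>PhotoClient</code> name are assumptions, since the real host and port come from the <code>http.ip</code> and <code>http.port</code> settings.</p>
<pre><code class="lang-scala">import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object PhotoClient {
  // Assumption: the server listens here; adjust to match http.ip and http.port.
  val baseUri: URI = URI.create("http://localhost:8080")

  // Builds a POST /photo request carrying the photo JSON.
  def postPhotoRequest(photoJson: String): HttpRequest =
    HttpRequest.newBuilder(baseUri.resolve("/photo"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(photoJson))
      .build()

  // Sends the request and returns the status code and the response body.
  // On success the route answers 201 Created with the stored photo id.
  def send(request: HttpRequest): (Int, String) = {
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    (response.statusCode(), response.body())
  }
}
</code></pre>
<p>With the server running, <code>PhotoClient.send(PhotoClient.postPhotoRequest(json))</code> should return the <code>201</code> status together with the id of the stored photo.</p>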
<h3 id="heading-server-main-class">Server main class</h3>
<p>OK, we implemented all the components of our server, so it's time to wrap everything up. This is our <code>Server.scala</code> object.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">implicit</span> <span class="hljs-keyword">val</span> system: <span class="hljs-type">ActorSystem</span> = <span class="hljs-type">ActorSystem</span>(<span class="hljs-string">"kafka-stream-playground"</span>)
<span class="hljs-keyword">implicit</span> <span class="hljs-keyword">val</span> materializer: <span class="hljs-type">ActorMaterializer</span> = <span class="hljs-type">ActorMaterializer</span>()
</code></pre>
<p>First a couple of <strong>Akka</strong> utility values. Since we use <a target="_blank" href="https://doc.akka.io/docs/akka-http/current/index.html">Akka HTTP</a> to run our server and REST API, these implicit values are required.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> config: <span class="hljs-type">Config</span> = <span class="hljs-type">ConfigFactory</span>.load()
<span class="hljs-keyword">val</span> address = config.getString(<span class="hljs-string">"http.ip"</span>)
<span class="hljs-keyword">val</span> port = config.getInt(<span class="hljs-string">"http.port"</span>)

<span class="hljs-keyword">val</span> mongoUri = config.getString(<span class="hljs-string">"mongo.uri"</span>)
<span class="hljs-keyword">val</span> mongoDb = config.getString(<span class="hljs-string">"mongo.db"</span>)
<span class="hljs-keyword">val</span> mongoUser = config.getString(<span class="hljs-string">"mongo.user"</span>)
<span class="hljs-keyword">val</span> mongoPwd = config.getString(<span class="hljs-string">"mongo.pwd"</span>)
<span class="hljs-keyword">val</span> photoCollection = config.getString(<span class="hljs-string">"mongo.photo_collection"</span>)

<span class="hljs-keyword">val</span> kafkaHosts = config.getString(<span class="hljs-string">"kafka.hosts"</span>).split(',').toList
<span class="hljs-keyword">val</span> photoTopic = config.getString(<span class="hljs-string">"kafka.photo_topic"</span>)
<span class="hljs-keyword">val</span> longExposureTopic = config.getString(<span class="hljs-string">"kafka.long_exposure_topic"</span>)
</code></pre>
<p>Then we read all the configuration properties. We will come back to the configuration file in a moment.</p>
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> kafkaProps = <span class="hljs-keyword">new</span> <span class="hljs-type">Properties</span>()
kafkaProps.put(<span class="hljs-string">"bootstrap.servers"</span>, kafkaHosts.mkString(<span class="hljs-string">","</span>))
kafkaProps.put(<span class="hljs-string">"key.serializer"</span>, <span class="hljs-string">"org.apache.kafka.common.serialization.StringSerializer"</span>)
kafkaProps.put(<span class="hljs-string">"value.serializer"</span>, <span class="hljs-string">"org.apache.kafka.common.serialization.StringSerializer"</span>)

<span class="hljs-keyword">val</span> streamProps = <span class="hljs-keyword">new</span> <span class="hljs-type">Properties</span>()
streamProps.put(<span class="hljs-type">StreamsConfig</span>.<span class="hljs-type">APPLICATION_ID_CONFIG</span>, <span class="hljs-string">"long-exp-proc-app"</span>)
streamProps.put(<span class="hljs-type">StreamsConfig</span>.<span class="hljs-type">BOOTSTRAP_SERVERS_CONFIG</span>, kafkaHosts.mkString(<span class="hljs-string">","</span>))

<span class="hljs-keyword">val</span> photoProducer = <span class="hljs-type">PhotoProducer</span>(kafkaProps, photoTopic)
<span class="hljs-keyword">val</span> photoStreamProcessor = <span class="hljs-keyword">new</span> <span class="hljs-type">PhotoStreamProcessor</span>(kafkaProps, streamProps, photoTopic, longExposureTopic)
photoStreamProcessor.start()
</code></pre>
<p>We have to configure both our Kafka producer and the stream processor. We also start the stream processor, so the server will be ready to process the documents sent to it.</p>
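<p>Both property sets point at the same brokers. As a minimal sketch (the <code>KafkaPropsSketch</code> object and its method names are hypothetical, not part of the project), they can be built from a single host list using plain <code>java.util.Properties</code> and the literal key names: <code>StreamsConfig.APPLICATION_ID_CONFIG</code> and <code>StreamsConfig.BOOTSTRAP_SERVERS_CONFIG</code> are just the strings <code>"application.id"</code> and <code>"bootstrap.servers"</code>, so this version compiles without the Kafka libraries. Unlike the code above, it also trims whitespace around each host, an assumption that makes a value such as <code>"a:9092, b:9092"</code> work.</p>
<pre><code class="lang-scala">import java.util.Properties

// Hypothetical helper (not from the project): builds the producer and
// Kafka Streams properties from one list of brokers, using literal key names.
object KafkaPropsSketch {
  private val StringSerializer =
    "org.apache.kafka.common.serialization.StringSerializer"

  // Splits the comma-separated kafka.hosts value, dropping blanks.
  def parseHosts(raw: String): List[String] =
    raw.split(',').map(_.trim).filter(_.nonEmpty).toList

  def producerProps(hosts: List[String]): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", hosts.mkString(","))
    props.put("key.serializer", StringSerializer)
    props.put("value.serializer", StringSerializer)
    props
  }

  def streamProps(appId: String, hosts: List[String]): Properties = {
    val props = new Properties()
    props.put("application.id", appId)                  // StreamsConfig.APPLICATION_ID_CONFIG
    props.put("bootstrap.servers", hosts.mkString(",")) // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
    props
  }
}
</code></pre>
<p>For example, <code>KafkaPropsSketch.parseHosts(config.getString("kafka.hosts"))</code> could take the place of the <code>split(',').toList</code> call shown earlier.</p>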
<pre><code class="lang-scala"><span class="hljs-keyword">val</span> client = <span class="hljs-type">MongoClient</span>(<span class="hljs-string">s"mongodb://<span class="hljs-subst">$mongoUri</span>/<span class="hljs-subst">$mongoUser</span>"</span>)
<span class="hljs-keyword">val</span> db = client.getDatabase(mongoDb)
<span class="hljs-keyword">val</span> photoDao: <span class="hljs-type">PhotoDao</span> = <span class="hljs-keyword">new</span> <span class="hljs-type">PhotoDao</span>(db, photoCollection)
<span class="hljs-keyword">val</span> photoListener = <span class="hljs-type">PhotoListener</span>(photoDao.collection, photoProducer)
</code></pre>
<p>MongoDB also needs to be configured. We set up the connection and initialize both the DAO and the listener.</p>
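<p>A note on the connection string: in the MongoDB URI format <code>mongodb://[username:password@]host[:port]/[defaultauthdb]</code>, the segment after the host is the default authentication database. With the default configuration, the code above therefore connects to <code>mongodb://127.0.0.1:27017/admin</code> without credentials, and <code>mongoPwd</code> is never used. That works as long as the MongoDB instances do not require authentication. If yours do, here is a minimal sketch of building a URI that includes the configured user and password; the <code>MongoUriSketch</code> name is hypothetical, and passing <code>"admin"</code> as the authentication database is an assumption about where the user is defined. Reserved characters such as <code>@</code> or <code>:</code> in the credentials must be percent-encoded.</p>
<pre><code class="lang-scala">import java.net.URLEncoder

// Hypothetical helper (not from the project): builds a MongoDB connection
// string that carries credentials, e.g. taken from mongo.user and mongo.pwd.
object MongoUriSketch {
  // URLEncoder performs form encoding (a space becomes "+"), so "+" is
  // turned back into "%20"; a literal "+" was already encoded as "%2B".
  private def encode(s: String): String =
    URLEncoder.encode(s, "UTF-8").replace("+", "%20")

  def build(user: String, pwd: String, hostPort: String, authDb: String): String =
    s"mongodb://${encode(user)}:${encode(pwd)}@$hostPort/$authDb"
}
</code></pre>
<p>With the default configuration, <code>MongoUriSketch.build(mongoUser, mongoPwd, mongoUri, "admin")</code> yields <code>mongodb://admin:admin@127.0.0.1:27017/admin</code>, which could then be passed to <code>MongoClient(...)</code>.</p>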
<pre><code class="lang-scala"><span class="hljs-keyword">lazy</span> <span class="hljs-keyword">val</span> routes: <span class="hljs-type">Route</span> = healthRoute ~ crudRoute

<span class="hljs-type">Http</span>().bindAndHandle(routes, address, port)
<span class="hljs-type">Await</span>.result(system.whenTerminated, <span class="hljs-type">Duration</span>.<span class="hljs-type">Inf</span>)
</code></pre>
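<p>Everything has been initialized. We create the REST routes for communicating with the server and bind them to the configured address and port, which starts the server. Finally, <code>Await.result</code> keeps the application running until the actor system terminates.</p>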
<h4 id="heading-server-configuration">Server configuration</h4>
<p>This is the configuration file used to set up the server:</p>
<pre><code>http {
  ip = <span class="hljs-string">"127.0.0.1"</span>
  ip = ${?SERVER_IP}

  port = <span class="hljs-number">8000</span>
  port = ${?SERVER_PORT}
}
mongo {
  uri = <span class="hljs-string">"127.0.0.1:27017"</span>
  uri = ${?MONGO_URI}
  db = <span class="hljs-string">"kafka-stream-playground"</span>
  user = <span class="hljs-string">"admin"</span>
  pwd = <span class="hljs-string">"admin"</span>
  photo_collection = <span class="hljs-string">"photo"</span>
}
kafka {
  hosts = <span class="hljs-string">"127.0.0.1:9092"</span>
  hosts = ${?KAFKA_HOSTS}
  photo_topic = <span class="hljs-string">"photo"</span>
  long_exposure_topic = <span class="hljs-string">"long-exposure"</span>
}
</code></pre><p>I think this one does not require much explanation, right?</p>
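<p>One detail worth spelling out is the pair of lines such as <code>port = 8000</code> followed by <code>port = ${?SERVER_PORT}</code>. In HOCON, <code>${?VAR}</code> is an optional substitution: since no config path named <code>SERVER_PORT</code> exists, it falls back to the environment variable, and if that variable is not set either, the second line is ignored and the default stays. This lets the same file serve both a local run and a containerized one, where the variables can point at the other services. The sketch below only emulates that lookup rule for some of the keys, with a plain <code>Map</code> standing in for the environment; it is an illustration, not how Typesafe Config is implemented, and the <code>EnvOverrideSketch</code> name is hypothetical.</p>
<pre><code class="lang-scala">// Hypothetical illustration (not Typesafe Config): emulates
// `key = default` followed by `key = ${?VAR}` for a subset of the keys above.
object EnvOverrideSketch {
  // (config path, default value, optional override variable)
  val settings: List[(String, String, Option[String])] = List(
    ("http.ip", "127.0.0.1", Some("SERVER_IP")),
    ("http.port", "8000", Some("SERVER_PORT")),
    ("mongo.uri", "127.0.0.1:27017", Some("MONGO_URI")),
    ("mongo.db", "kafka-stream-playground", None),
    ("kafka.hosts", "127.0.0.1:9092", Some("KAFKA_HOSTS"))
  )

  // The variable wins when it is defined; otherwise the default is kept.
  def resolve(env: Map[String, String]): Map[String, String] =
    settings.map { case (path, default, envVar) =>
      path -> envVar.flatMap(env.get).getOrElse(default)
    }.toMap
}
</code></pre>
<p>In a real process, <code>EnvOverrideSketch.resolve(sys.env)</code> would read the actual environment.</p>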
<h1 id="heading-connectors-configuration">Connectors configuration</h1>
<p>The server we implemented writes to two Kafka topics: <code>photo</code> and <code>long-exposure</code>. But how do these messages end up in Elasticsearch as documents? Using <strong>Kafka Connect</strong>!</p>
<p>We can set up two connectors, one per topic, and tell each connector to write every message going through its topic to Elasticsearch.</p>
<p>First we need <a target="_blank" href="https://docs.confluent.io/current/connect/index.html">Kafka Connect</a>. We can use the image provided by Confluent in the docker-compose file:</p>
<pre><code class="lang-yml"><span class="hljs-attr">connect:</span>
    <span class="hljs-attr">image:</span> <span class="hljs-string">confluentinc/cp-kafka-connect</span>
    <span class="hljs-attr">ports:</span>
      <span class="hljs-bullet">-</span> <span class="hljs-number">8083</span><span class="hljs-string">:8083</span>
    <span class="hljs-attr">networks:</span>
      <span class="hljs-bullet">-</span> <span class="hljs-string">kakfa_stream_playground</span>
    <span class="hljs-attr">depends_on:</span>
      <span class="hljs-bullet">-</span> <span class="hljs-string">zookeeper</span>
      <span class="hljs-bullet">-</span> <span class="hljs-string">kafka</span>
    <span class="hljs-attr">volumes:</span>
      <span class="hljs-bullet">-</span> <span class="hljs-string">$PWD/connect-plugins:/connect-plugins</span>
    <span class="hljs-attr">environment:</span>
      <span class="hljs-attr">CONNECT_BOOTSTRAP_SERVERS:</span> <span class="hljs-string">kafka:9092</span>
      <span class="hljs-attr">CONNECT_REST_ADVERTISED_HOST_NAME:</span> <span class="hljs-string">connect</span>
      <span class="hljs-attr">CONNECT_REST_PORT:</span> <span class="hljs-number">8083</span>
      <span class="hljs-attr">CONNECT_GROUP_ID:</span> <span class="hljs-string">compose-connect-group</span>
      <span class="hljs-attr">CONNECT_CONFIG_STORAGE_TOPIC:</span> <span class="hljs-string">docker-connect-configs</span>
      <span class="hljs-attr">CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR:</span> <span class="hljs-number">1</span>
      <span class="hljs-attr">CONNECT_OFFSET_FLUSH_INTERVAL_MS:</span> <span class="hljs-number">10000</span>
      <span class="hljs-attr">CONNECT_OFFSET_STORAGE_TOPIC:</span> <span class="hljs-string">docker-connect-offsets</span>
      <span class="hljs-attr">CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR:</span> <span class="hljs-number">1</span>
      <span class="hljs-attr">CONNECT_STATUS_STORAGE_TOPIC:</span> <span class="hljs-string">docker-connect-status</span>
      <span class="hljs-attr">CONNECT_STATUS_STORAGE_REPLICATION_FACTOR:</span> <span class="hljs-number">1</span>
      <span class="hljs-attr">CONNECT_KEY_CONVERTER:</span> <span class="hljs-string">"org.apache.kafka.connect.storage.StringConverter"</span>
      <span class="hljs-attr">CONNECT_VALUE_CONVERTER:</span> <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>
      <span class="hljs-attr">CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE:</span> <span class="hljs-string">"false"</span>
      <span class="hljs-attr">CONNECT_INTERNAL_KEY_CONVERTER:</span> <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>
      <span class="hljs-attr">CONNECT_INTERNAL_VALUE_CONVERTER:</span> <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>
      <span class="hljs-attr">CONNECT_ZOOKEEPER_CONNECT:</span> <span class="hljs-string">zookeeper:2181</span>
      <span class="hljs-attr">CONNECT_PLUGIN_PATH:</span> <span class="hljs-string">/connect-plugins</span>
      <span class="hljs-attr">CONNECT_LOG4J_ROOT_LOGLEVEL:</span> <span class="hljs-string">INFO</span>
</code></pre>
<p>I want to focus on some of the configuration values. </p>
<p>First of all, we need to expose port <code>8083</code>, which will be our endpoint for configuring the connectors (<code>CONNECT_REST_PORT</code>).</p>
<p>We also need to map a volume to the <code>/connect-plugins</code> path, where we will place the <a target="_blank" href="https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/index.html">Elasticsearch Sink Connector</a> to write to Elasticsearch. The same path is set in <code>CONNECT_PLUGIN_PATH</code>.</p>
<p>The <code>connect</code> container should know how to find the Kafka servers, so we set <code>CONNECT_BOOTSTRAP_SERVERS</code> as <code>kafka:9092</code>.</p>
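<p>All these <code>CONNECT_*</code> variables follow the naming convention of the Confluent images: each one is a Kafka Connect worker property, upper-cased and prefixed with <code>CONNECT_</code>, where a <code>.</code> becomes <code>_</code>, a <code>-</code> becomes <code>__</code>, and a <code>_</code> becomes <code>___</code>. So <code>CONNECT_REST_PORT</code> sets <code>rest.port</code> and <code>CONNECT_PLUGIN_PATH</code> sets <code>plugin.path</code>. Here is a minimal sketch that reverses the convention to show which property each variable sets; it is our own illustration of the documented rule, not the script the image actually runs, and the <code>ConnectEnvSketch</code> name is hypothetical.</p>
<pre><code class="lang-scala">import scala.util.matching.Regex

// Hypothetical helper (not part of the Confluent image): maps a CONNECT_*
// environment variable back to the worker property it configures.
object ConnectEnvSketch {
  private val Prefix = "CONNECT_"
  private val Underscores: Regex = "_{1,3}".r

  def toWorkerProperty(envVar: String): Option[String] =
    if (!envVar.startsWith(Prefix)) None
    else {
      val body = envVar.stripPrefix(Prefix).toLowerCase
      // The greedy regex matches the longest run first:
      // ___ -> "_", __ -> "-", _ -> "."
      Some(Underscores.replaceAllIn(body, (m: Regex.Match) =>
        m.matched.length match {
          case 3 => "_"
          case 2 => "-"
          case _ => "."
        }))
    }
}
</code></pre>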
<p>Once Kafka Connect is ready, we can send the configurations of our connectors to the <code>http://localhost:8083/connectors</code> endpoint. We need two connectors, one for the <code>photo</code> topic and one for the <code>long-exposure</code> topic. We can send each configuration as a <code>JSON</code> document in a <code>POST</code> request.</p>
<pre><code class="lang-json">{
  <span class="hljs-attr">"name"</span>: <span class="hljs-string">"photo-connector"</span>,
  <span class="hljs-attr">"config"</span>: {
    <span class="hljs-attr">"connector.class"</span>: <span class="hljs-string">"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"</span>,
    <span class="hljs-attr">"tasks.max"</span>: <span class="hljs-string">"1"</span>,
    <span class="hljs-attr">"topics"</span>: <span class="hljs-string">"photo"</span>,
    <span class="hljs-attr">"key.converter"</span>: <span class="hljs-string">"org.apache.kafka.connect.storage.StringConverter"</span>,
    <span class="hljs-attr">"value.converter"</span>: <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>,
    <span class="hljs-attr">"value.converter.schemas.enable"</span>: <span class="hljs-string">"false"</span>,
    <span class="hljs-attr">"schema.ignore"</span>: <span class="hljs-string">"true"</span>,
    <span class="hljs-attr">"connection.url"</span>: <span class="hljs-string">"http://elastic:9200"</span>,
    <span class="hljs-attr">"type.name"</span>: <span class="hljs-string">"kafka-connect"</span>,
    <span class="hljs-attr">"behavior.on.malformed.documents"</span>: <span class="hljs-string">"warn"</span>,
    <span class="hljs-attr">"name"</span>: <span class="hljs-string">"photo-connector"</span>
  }
}
</code></pre>
<p>We explicitly set <code>ElasticsearchSinkConnector</code> as the <code>connector.class</code>, and specify the <code>topics</code> that we want to sink: in this case, <code>photo</code>.</p>
<p>We don't want to use a schema for the <code>value.converter</code>, so we can disable it (<code>value.converter.schemas.enable</code>) and tell the connector to ignore the schema (<code>schema.ignore</code>).</p>
<p>The connector for the <code>long-exposure</code> topic is exactly like this one. The only difference is the <code>name</code> and of course the <code>topics</code>.</p>
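<p>Since the two configurations differ only in <code>name</code> and <code>topics</code>, they can also be generated and registered from code. Below is a minimal sketch using the <code>java.net.http</code> client that ships with Java 11 and later. The <code>ConnectorConfigSketch</code> object is hypothetical, and so is the naming scheme (the topic name followed by <code>-connector</code>, which gives <code>photo-connector</code> for the connector above). The JSON is assembled by hand to avoid a JSON library, which is only safe because the topic names contain no characters that need escaping.</p>
<pre><code class="lang-scala">import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical helper (not from the project): builds and registers one
// Elasticsearch sink connector per topic with the settings shown above.
object ConnectorConfigSketch {
  val ConnectUrl = "http://localhost:8083/connectors"

  // Same settings as photo-connector; only name and topics change.
  def connectorJson(topic: String): String = {
    val name = s"$topic-connector" // hypothetical naming scheme
    s"""{
       |  "name": "$name",
       |  "config": {
       |    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
       |    "tasks.max": "1",
       |    "topics": "$topic",
       |    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       |    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       |    "value.converter.schemas.enable": "false",
       |    "schema.ignore": "true",
       |    "connection.url": "http://elastic:9200",
       |    "type.name": "kafka-connect",
       |    "behavior.on.malformed.documents": "warn",
       |    "name": "$name"
       |  }
       |}""".stripMargin
  }

  def request(topic: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(ConnectUrl))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(connectorJson(topic)))
      .build()

  // Sends one request per topic and returns the HTTP status codes;
  // requires Kafka Connect to be up and running.
  def registerAll(topics: Seq[String]): Seq[Int] = {
    val client = HttpClient.newHttpClient()
    topics.map(t => client.send(request(t), HttpResponse.BodyHandlers.ofString()).statusCode())
  }
}
</code></pre>
<p>Calling <code>ConnectorConfigSketch.registerAll(Seq("photo", "long-exposure"))</code> once Kafka Connect is running should register both connectors; the Kafka Connect REST API answers a successful creation with <code>201 Created</code>.</p>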
<h1 id="heading-how-to-run-the-project">How to run the project</h1>
<p>We have all we need to test the CDC! How can we do it? It's quite easy: simply run the <code>setup.sh</code> script in the root folder of the repo!</p>
<p>What will the script do?</p>
<ol>
<li>Run the <code>docker-compose</code> file with all the services.</li>
<li>Configure MongoDB replica set. This is required to enable the <strong>Change Stream interface</strong> to capture data changes. More info about this <a target="_blank" href="https://docs.mongodb.com/manual/changeStreams/">here</a>.</li>
<li>Configure the Kafka connectors.</li>
<li>Attach to the server logs.</li>
</ol>
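<p>Step 3 only works once the Kafka Connect worker accepts requests on port <code>8083</code>, which can take a while after the containers start. The <code>setup.sh</code> script is not shown here, so the following is just a sketch of one way to wait for the worker, assuming a simple polling approach: call <code>GET /connectors</code> (which lists the registered connectors) until it answers <code>200</code>, then send the configurations. The <code>WaitForConnectSketch</code> object and its parameters are hypothetical.</p>
<pre><code class="lang-scala">import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import scala.util.Try

// Hypothetical helper (not from the project's setup.sh): polls a readiness
// check a bounded number of times before giving up.
object WaitForConnectSketch {
  // Calls `probe` at most `attempts` times, sleeping `delayMs` between calls.
  def waitUntil(probe: () => Boolean, attempts: Int, delayMs: Long): Boolean = {
    require(attempts >= 1, "attempts must be positive")
    @annotation.tailrec
    def loop(left: Int): Boolean =
      if (probe()) true
      else if (left == 1) false
      else {
        Thread.sleep(delayMs)
        loop(left - 1)
      }
    loop(attempts)
  }

  // Kafka Connect's REST API answers GET /connectors once the worker is up;
  // connection errors (worker not started yet) count as "not ready".
  def connectIsUp(baseUrl: String): Boolean = {
    val client = HttpClient.newHttpClient()
    val request = HttpRequest.newBuilder(URI.create(s"$baseUrl/connectors")).GET().build()
    Try(client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode() == 200)
      .getOrElse(false)
  }
}
</code></pre>
<p>For example, <code>WaitForConnectSketch.waitUntil(() =&gt; WaitForConnectSketch.connectIsUp("http://localhost:8083"), attempts = 30, delayMs = 2000L)</code> waits up to about a minute before the connector configurations are sent.</p>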
<p>The docker-compose will run the following services:</p>
<ul>
<li>Our server</li>
<li>3 instances of MongoDB (required for the replica set)</li>
<li>Mongoku, a MongoDB client</li>
<li>Kafka (single node)</li>
<li>Kafka Connect</li>
<li>Zookeeper (required by Kafka)</li>
<li>Elasticsearch</li>
<li>Kibana</li>
</ul>
<p>There are a lot of containers to run, so make sure you have enough resources to run everything properly. If you want, you can remove Mongoku and Kibana from the compose file, since they are only used to take a quick look inside the databases.</p>
<p>Once everything is up and running, you just have to send data to the server. </p>
<p>In the <code>photos.txt</code> file, I collected some <code>JSON</code> documents of photos from Unsplash that you can use to test the system.</p>
<p>There are a total of 10 documents, 5 of which contain info about long exposure photos. Send them to the server by running the <code>send-photos.sh</code> script in the root of the repo. Check that everything is stored in MongoDB by connecting to Mongoku at <code>http://localhost:3100</code>. Then connect to Kibana at <code>http://localhost:5601</code>, and you will find two indexes in Elasticsearch: <code>photo</code>, containing the JSON of all the photos stored in MongoDB, and <code>long-exposure</code>, containing only the info of the long exposure photos.</p>
<p>Amazing, right?</p>
<h1 id="heading-conclusion">Conclusion</h1>
<p>We made it, guys!</p>
<p>Starting from the design of the use case, we built a system that connects a MongoDB database to Elasticsearch using CDC.</p>
<p>Kafka Streams is the enabler, allowing us to convert database events to a stream that we can process. </p>
<p>Do you need to see the whole project? Just check out the <a target="_blank" href="https://github.com/elleFlorio/kafka-streams-playground">repository</a> on GitHub!</p>
<p>That's it, enjoy!</p>
 ]]>
                </content:encoded>
            </item>
        
    </channel>
</rss>
