Stream processing with Apache Kafka without coding

Many know Apache Kafka for its pub/sub capabilities. As I talked about in a blog a few weeks ago, it goes much further.

For example, Kafka Streams is a framework to do stream processing within your Kafka environment.

Stream processing is the practice of continuously applying operations to data as it arrives, in sequence.

In the case of Kafka, Kafka Streams lets you process events in a fault-tolerant, scalable and distributed fashion, with support for grouping, windowing (grouping data by time) and keeping state.

Doing stream processing with Kafka typically requires using the Streams API within your custom application.  That is to say, the processing is not done on the Kafka brokers but in your code. It’s very powerful but it requires expertise.

Lenses from Landoop is a tool designed to make Kafka more accessible. Amongst other things, it allows data engineers to do stream processing with no coding.  

It leverages the Kafka Streams framework and abstracts coding with SQL-like syntax. It will also take care of scaling the processing by running workers on your existing Kafka Connect nodes or within Kubernetes.

This means developers, data engineers and data scientists can react more quickly to the business’ data processing and analytics needs.


In the following example, I’ll follow up on my last blog to create a very basic stream processing job within Lenses in a few minutes.  This will involve the following steps:

  1. Sending high-velocity metric data from (fictional) IoT sensors to a Kafka topic
  2. Configuring a stream processing job that reads the events in motion and aggregates the sensor data into a sum and a count within a one-minute window per IoT sensor
  3. Republishing the aggregated data to a separate Kafka topic
  4. Configuring a Kafka Connect sink connector to send the aggregated data onwards

Prerequisites

Follow the guide in my last blog to get your Lenses Box with Kafka set up.

Step 1

Within a shell inside the Lenses Box Docker container, create a small temporary script that generates JSON objects representing IoT sensor readings.  Each object contains a random “metric_value” for a random “station_id”.

>vi /tmp/generateMetrics.sh
#!/bin/bash
# Emit one record every half second: a numeric key, a colon separator,
# then a JSON payload with a random metric_value for a random station_id
while true;
do
  station_id=$(( ( RANDOM % 8 ) + 1 ));
  echo $station_id':{"station_id":"'$station_id'","metric_value":"'$(( RANDOM % 5 ))'","timestamp":"'$(date +"%F %T")'","meta_station_status":"operational","meta_station_owner":"23423"}';
  sleep 0.5;
done
>chmod +x /tmp/generateMetrics.sh

Test running the script


>/tmp/generateMetrics.sh
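You should see a stream of lines similar to the following (the station_id, metric_value and timestamp will vary, since they are generated randomly).  Press Ctrl+C to stop it.

3:{"station_id":"3","metric_value":"2","timestamp":"2019-05-06 14:32:10","meta_station_status":"operational","meta_station_owner":"23423"}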

Step 2

In Lenses, create a new topic called “iot_metrics”.
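If you prefer the command line, you can alternatively create the topic from inside the Lenses Box container with the kafka-topics tool (a single partition is fine for this example; depending on your Kafka version the tool may expect --bootstrap-server localhost:9092 instead of --zookeeper):

>kafka-topics --create --zookeeper localhost:2181 --topic iot_metrics --partitions 1 --replication-factor 1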

Step 3

Publish the streaming metrics to the iot_metrics topic via the Kafka Console Producer client

>/tmp/generateMetrics.sh | kafka-console-producer --broker-list localhost:9092 --topic iot_metrics --property parse.key=true --property key.separator=:

The command above also extracts the integer at the beginning of each line as the event key (_key): because of the parse.key and key.separator properties, everything before the colon becomes the key and the JSON after it becomes the value.

Step 4

Within Lenses, verify that data is flowing into the iot_metrics topic by going to Topics >> iot_metrics.

You will need to instruct Lenses to deserialize the key as String and the value as JSON; otherwise you will not see the data, since Lenses has no way of knowing what type of data the events contain.
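As an alternative check from the shell, the console consumer will print the raw key and value of each record as it arrives:

>kafka-console-consumer --bootstrap-server localhost:9092 --topic iot_metrics --from-beginning --property print.key=true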

Step 5

Now we will create a SQL-like statement to aggregate results as a test.  We will do this outside of the stream processing framework, just within the Lenses workbench on the topic, grouping by the station_id field:

SELECT station_id, count(station_id) as number, SUM(cast(metric_value as INT)) as total FROM iot_metrics GROUP BY station_id

This should return a table with one row per station_id, showing the event count (number) and the summed metric_value (total).

Step 6

The SQL-like statement for stream processing requires a slightly different syntax (SQL wasn’t designed for stream processing).  One big difference, of course, is that the data must be windowed into discrete time slots in order to aggregate it. In our case, we will calculate the sum and the count for every one-minute window per station_id.

The statement we will use is the following:

SET autocreate = true;
INSERT INTO iot_metrics_aggregate  WITH aggregateStream as
(
 SELECT STREAM station_id, count(station_id) as number, SUM(cast(metric_value as INTEGER)) as total FROM iot_metrics GROUP BY station_id, TUMBLE(1,m)
)
SELECT STREAM station_id,number,total FROM aggregateStream

Parts of the statement explained:

SET autocreate = true;

Instructs Lenses to automatically create the target topic (in our case, “iot_metrics_aggregate”) if it does not already exist.

INSERT INTO iot_metrics_aggregate

Results will be published to the iot_metrics_aggregate topic.

WITH aggregateStream as

Creates a temporary stream called aggregateStream before the results are finally written to the iot_metrics_aggregate topic.  The WITH directive allows us to define multiple temporary streams/tables if necessary and join them together at the end.

TUMBLE(1,m)

Groups the results into one-minute buckets.  There are lots of other rolling and non-rolling windowing functions you can use; this is just one example.

SELECT STREAM station_id,number,total FROM aggregateStream

Forms the final computation of the results before the data is published to the iot_metrics_aggregate topic.  Further joins and computation could be done at this stage if necessary.

To create the stream processing job, paste the statement into SQL Processors >> New Processor in Lenses.

Leave the Runners value as 1; this will be discussed in a future blog.

Step 7

Within the created SQL Processor, you’ll see a topology view of the transformation of the data.

Within the Monitor tab of the SQL Processor, verify the throughput of data coming into and out of the SQL processor.  It may take a minute or so before the out rate shows data (since we are grouping into one-minute buckets).

Finally, check the (newly created) iot_metrics_aggregate topic to confirm that aggregated data is arriving.
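If you prefer the shell, you can also watch the output topic with the console consumer (assuming the processor writes JSON, matching the input format); each record should carry the station_id, number and total fields:

>kafka-console-consumer --bootstrap-server localhost:9092 --topic iot_metrics_aggregate --from-beginning --property print.key=true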

Step 8

You can forward the data to a downstream application by creating a sink connector as was demonstrated in my last blog.
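As an illustration only (substitute whichever sink you used in the last blog), a minimal configuration for Kafka’s built-in file sink connector, writing the aggregated topic to a local file, could look like this:

name=iot-metrics-aggregate-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=iot_metrics_aggregate
file=/tmp/iot_metrics_aggregate_out.txt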

Much more powerful processing is possible with Lenses; this is just a basic example. Give it a try. If you have any problems or questions, leave a comment below or send me a message on LinkedIn.


Follow The Data Difference for notifications of other blogs we publish.