layline.io Blog

Output to Kafka

Showcase on how to read data from a structured file, map record data, and output the data to a Kafka topic in the cloud.

March 21, 2022
Reading time: 10 min.

What we’re demonstrating

We’re showcasing a simple layline.io Project which reads data from a file and outputs its contents into a Kafka topic.

Planned workflow sketch

To follow the showcase in a real setup, you can download the assets of this Project from the Resources section at the bottom. Read this to learn how to import the Project into your environment.

Configuration

The Workflow

Outline

The workflow setup of this showcase was put together using layline.io's Workflow Editor and looks like this:

File-to-Kafka workflow
  • (1) Input Processor: reading an input file with a header/detail/trailer structure, then
  • (2) Flow Processor: mapping this into an output format, which is subsequently
  • (3) Output Processor: written to a Kafka topic.

For the purpose of this showcase we are using a Kafka topic hosted by Cloud Karafka. So if you run the showcase yourself, you do not require your own Kafka installation.

Configuration of underlying Assets

The Workflow is based on a number of underlying Assets which are configured using the Asset Editor. The logical association between Workflow and Assets can be understood like this:

Logical association between Workflow and Assets

Workflows are composed of a number of Processors which are connected by Links.

Processors are based on Assets (check Resources for more info). Assets are configuration entities of a specific class and type. In the image above we can see a Processor by the name of "InputFile", which is of class Input Processor and type Stream Input Processor. It in turn relies on two other Assets, a "Source Asset" and a "Format Asset", which are of type File System Source and Generic Format respectively.

In short:

  • A Workflow is composed of interconnected Processors
  • Processors rely on Assets which define them
  • Assets can rely on other Assets
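
Purely as a mental model (this is not layline.io's actual object model, and the class names are made up for illustration), these relationships could be sketched like this:

```python
# A rough mental model only -- not layline.io's actual object model.
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Asset:
    name: str
    asset_type: str                                   # e.g. "File System Source"
    depends_on: List["Asset"] = field(default_factory=list)  # Assets can rely on other Assets


@dataclass
class Processor:
    name: str
    assets: List[Asset]                               # Processors rely on Assets which define them


@dataclass
class Workflow:
    processors: List[Processor]                       # a Workflow is composed of Processors ...
    links: List[Tuple[str, str]]                      # ... connected by Links (pairs of Processor names)
```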

In the following sections we will show

  • the anatomy of the Workflow and some key setups, and
  • how to deploy and run the Workflow

Environment Asset: "My-Environment"

First, a note on environments: layline.io can help manage multiple different environments using Environment Assets. This greatly helps when using the same Project in test, staging, and production environments, which may require different directories, connections, passwords, etc.
We are using one Environment Asset (2) in this Project. Let's take a look.

Environment Asset definition

At (3) you see the environment variables which we have created. In case you are test-driving the Project yourself, you will likely need to change the values to point to valid directories on your system, or else you will get an error on startup.

Variables like these can be used throughout the Project via a macro such as ${lay:dirIn}. OS or Java system environment variables are prefixed with env: or sys: respectively, instead of lay: (for example ${env:HOME}). When exploring the Input Source Asset below, you will see how this is put to work, and why it is important that you adjust the variables to your environment to run the Project. You can also create a completely new Environment Asset next to the one supplied with the sample, and then deploy the Project together with your Environment Asset, as we will see in the Deployment section of this showcase.
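
layline.io resolves these macros internally. Purely for intuition, here is a small Python sketch of what such a lookup amounts to; the value assigned to dirIn below is a made-up placeholder, not the sample Project's configuration.

```python
import os
import re

# Illustration only -- layline.io resolves these macros internally.
# The value assigned to dirIn is a made-up placeholder.
environment_asset = {"dirIn": "/data/tut-kafka/in"}


def resolve(text):
    """Expand ${lay:...}, ${env:...} and ${sys:...} macros in a string."""
    def lookup(match):
        prefix, name = match.group(1), match.group(2)
        if prefix == "lay":                 # variable from the Environment Asset
            return environment_asset[name]
        # env: -> OS environment variable; sys: -> Java system property,
        # for which plain Python has no equivalent, so we also read os.environ here.
        return os.environ.get(name, "")
    return re.sub(r"\$\{(lay|env|sys):([^}]+)\}", lookup, text)


print(resolve("Input directory: ${lay:dirIn}"))   # -> Input directory: /data/tut-kafka/in
```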

Stream Input Processor: "InputFile"

The Input Processor (2) (name: InputFile / type: Stream Input Processor) takes care of reading the input files and forwarding the data downstream within the Workflow. To do its job, the Input Processor needs to know where to read the data and how to interpret the content. Both sets of information are represented by specialized assets (Source Asset and Format Asset) which are assigned to the Input Asset:

Stream Input Processor and Asset association

Let's look at how the supporting Assets are configured:

Generic Format Asset: "InputFileFormat"

layline.io provides the means to define complex data structures with its own grammar language. The file in our example is a bank transaction sample. It has a header record with two fields, a number of detail records holding the transaction details, and finally a trailer record. Overall, a very common file structure.
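
For intuition only: in plain Python, splitting such a header/detail/trailer file might look like the sketch below. The record layout (a leading H/D/T tag with semicolon-separated fields) is made up for illustration; the actual layout of the sample file is what the grammar shown below defines declaratively.

```python
# Conceptual sketch only -- layline.io's Generic Format grammar describes this
# declaratively. The H/D/T tags and semicolon separators are made-up examples,
# not the layout of the sample file.
def split_records(path):
    header, details, trailer = None, [], None
    with open(path, encoding="utf-8") as f:
        for line in f:
            fields = line.rstrip("\n").split(";")
            record_type, payload = fields[0], fields[1:]
            if record_type == "H":       # header record (e.g. file id, creation date)
                header = payload
            elif record_type == "D":     # detail record (one bank transaction)
                details.append(payload)
            elif record_type == "T":     # trailer record (e.g. record count)
                trailer = payload
    return header, details, trailer
```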

Generic Format Asset definition

All Assets can be found in the Asset Editor (1). The configured Generic Format Asset "InputFileFormat" is located in the Asset tree on the left side of the editor.

This format configuration is used in the File System Source Asset "InputSource".

File-System-Source Asset: "InputSource"

The (1) "InputSource" is an Asset of type File System Source which is used to define where the file is read from. It contains information about directory location for input (2), error and completion (done) of file processing, polling intervals, file sorting, error behavior, and more.

File-System-Source Asset definition

Revisiting: Stream Input Processor "InputFile"

With the input format and input source Assets ready, the Input File Processor (2) (of type Stream Input Processor) in the Workflow (1) can be defined:

Stream Input Processor definition

Both the previously defined Assets "InputFileFormat" and "InputSource" are assigned to this Processor ((3) and (4)). Apart from a few other parameters the configuration of the Processor "InputFile" (2) is now complete.

Flow Processor: Map

We next define the Flow Processor "Mapping" which is based on an Asset of type Mapping as defined in the next section.

Logical Mapping Processor to Asset association

Asset: Mapping

The structure of the file we are reading is not exactly the same as the structure of what we write to Kafka. We therefore need to map values from the input to the output with a respective Asset (1).

Mapping Asset definition

As explained before, the input file format has a header, detail, and trailer records. The same is true for the output format. The image above shows the mapping from the source (input) to the target (output) format for the detail data (2). The notation used is based on how both the input ("BT_IN") and output ("BT_OUT") grammars are defined.
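
As a mental model of what the Mapping Asset expresses, the sketch below copies fields from a parsed input detail record into an output detail record. The field names are invented for illustration; the real paths follow the BT_IN and BT_OUT grammar definitions.

```python
# Conceptual sketch of a detail-record mapping -- not layline.io's mapping syntax.
# All field names are invented; the real ones come from the BT_IN/BT_OUT grammars.
def map_detail(bt_in_detail):
    return {
        "BT_OUT.Detail.TransactionId": bt_in_detail["BT_IN.Detail.TxnId"],
        "BT_OUT.Detail.Amount":        bt_in_detail["BT_IN.Detail.Amount"],
        "BT_OUT.Detail.Description":   bt_in_detail["BT_IN.Detail.Text"],
    }
```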

Stream Output Processor: Kafka

The last Processor in the Workflow which we define is the Output Processor "Kafka-Out".

Stream Output Processor definition

It depends on a number of underlying Assets:

  • Output Asset: Defines the Kafka topics and partitions we are writing to. This in turn relies on two more Assets:
    • Kafka Sink Asset: The Sink that the Output Asset uses to send data to. It in turn uses a Kafka Connection Asset, which defines the physical Kafka connection parameters.
    • Generic Format Asset: Defines in what format to write the data to Kafka.

Generic Format Asset: "OutputMessageFormat"

The data which we want to output to Kafka is similar to the input, but not completely the same. As with the input format, we therefore define an Asset of type Generic Format and name it "OutputMessageFormat". We define the grammar in a fashion analogous to the input grammar, with only minor differences (2).

Configuration of Generic Format Asset for Kafka output

Kafka Connection Asset: "Cloud-Karafka-Connection"

To output to Kafka we first have to define a Kafka Connection Asset. This is where we define how to physically connect to Cloud Karafka.

Configuration of Kafka Connection

The most important part here is, of course, the Kafka settings. Let's take a closer look at them (a plain Kafka-client equivalent is sketched right after the list):

Kafka Settings
  • (1) Bootstrap servers: The addresses of one or more Bootstrap servers. For our Karafka hosting they are defined as sulky-01.srvs.cloudkafka.com:9094,sulky-02.srvs.cloudkafka.com:9094,sulky-03.srvs.cloudkafka.com:9094.
  • (2) Use SSL: Defines whether this is an SSL connection. For Cloud Karafka this is a SASL/SCRAM secured connection. We therefore enable SSL.
  • (3) Authentication type: You can pick None, SASL / Plaintext, or SASL / SCRAM here. Cloud Karafka requires SASL/SCRAM.
  • (4) Credential type: You have a choice of User/Secret or User/Password. We have used User/Password for simplicity in our case.
  • (5/6) Username/Password: The correct username and password are contained in the showcase Project which you can download from the Resources section.
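
For reference, the same settings translate into a standard Kafka client configuration. Here is a sketch using the Python kafka-python package; the SCRAM variant (SHA-256 vs. SHA-512), topic name, and credentials are placeholders you would replace with the values from cloud-karafka-credentials.txt in the downloaded Project.

```python
# Sketch with the kafka-python package (pip install kafka-python) -- this is a
# plain Kafka client, not layline.io. Mechanism, topic and credentials are
# placeholders; take the real values from the downloaded Project.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=[
        "sulky-01.srvs.cloudkafka.com:9094",   # (1) Bootstrap servers
        "sulky-02.srvs.cloudkafka.com:9094",
        "sulky-03.srvs.cloudkafka.com:9094",
    ],
    security_protocol="SASL_SSL",              # (2) Use SSL + SASL
    sasl_mechanism="SCRAM-SHA-256",            # (3) SASL/SCRAM (variant may differ)
    sasl_plain_username="<username>",          # (5) Username
    sasl_plain_password="<password>",          # (6) Password
)
producer.send("<topic>", b"hello from the showcase")
producer.flush()
```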

Kafka Sink Asset: "CloudKarafka"

Next, we define a Kafka Sink Asset. The Kafka Sink Asset uses the Kafka Connection which we just defined. It is not where we define which topic to output to, however; that is configured on the Output Processor.

Check that the Kafka Connection which we have previously defined is assigned (2).

Configuration of Kafka-Sink Asset

Revisiting: Kafka Output Processor "Kafka-Out"

Now that we have defined the Format and Sink Assets above, we can define the Kafka Output Processor in the Workflow (1):

Final Kafka Output Processor

Check how both the Format "OutputMessageFormat" (2) and the Sink (3) are assigned to this Processor.

Additional settings:

Configuration of Kafka topics and partitions
  • Routing Mode: Choice of Routing Table or Exclusive Partition. In this case Routing Table is appropriate as we do not require exclusive partition access for this showcase.
  • Topic: Name of the topic we are writing to.

There are other options which we could set, but they are not relevant for the purpose of this demo. You can look them up in the online documentation.

Config Summary

With this, the configuration of the Workflow is complete.

Deploy & Run

Transferring the Deployment

We are ready to test the Workflow. To do so, we need to deploy it to a Reactive Engine Cluster. You can simply use the one on your laptop (single node), or if you have a larger layline.io cluster elsewhere, you can deploy it there.

To deploy we switch to the DEPLOYMENT tab of the Project (1):

Deployment to local cluster setup

We create an Engine Configuration (2) to deploy the Project. This defines the parts of the Project which we wish to deploy. In our example that would be the one Workflow we defined, as well as the Environment Asset that goes along with it.

Since we want to deploy to the local cluster we pick "Deploy to Cluster" and then our pre-defined "Local Cluster" setup (3 and 4).

Every deployment needs a Tag (5). We use "tut-kafka-" followed by a macro "${build-timestamp}" to identify the Deployment. The macro will be replaced by a timestamp upon deployment transfer. This ensures that we always get a different tag with each Deployment.

Make sure the Workflow we just created is added to the list of "Assets to deploy" (6), otherwise nothing will be deployed.

We also need to include the environment variables which we are using in the Source Asset to define the directories we are reading from (7).

Lastly, we start the transfer of the Deployment by clicking "TRANSFER DEPLOYMENT TO CLUSTER" (8). Make sure the Cluster you are deploying to is up and running.

If the deployment went well you should see this:

Deployment result

Otherwise, an error message will be displayed, guiding you on how to fix the problem.

Activating the Deployment

We should now be ready to activate the Deployment. For this, we switch to the "CLUSTER" tab (1). This gives us a view of all things "cluster". In case you are managing more than one cluster, make sure you have the correct one selected in the drop-down box at the top left.

Activating a Deployment setup

Make it the default Deployment

Select "Deployment Storage" from the tree on the left.
This is where we find all the Deployments which are currently known to the selected cluster. In our example screenshot, in the section "Deployment Configuration" we see:

  1. "DeploymentRoot": This is the basic empty default deployment which is always present.
  2. "tut-kafka-20220204184304": This is the Deployment we just transferred to the cluster.

To activate the new Deployment on the cluster, select it (1) and then check the box "is the cluster's default deployment" (2).

Make a Deployment the default

Schedule

Now that the Deployment is active on the Cluster, we need to check whether it is actually running any instances of the Workflow. If this is the first time you have deployed this Workflow, the answer is likely "no".

Let's check:

  • Select the "Cluster" tab (1)
  • Select the "Scheduler Master" entry in the tree on the left (2)
Scheduling a Workflow
  • In the Scheduler Settings Box select the Scheduler node in the tree (3)
  • Next make sure that the target number of instances is set to at least 1 (4). Setting it to a higher number will start more instances of the same Workflow.

You only need to do this once. The next time you deploy the Workflow, the Reactive Engine will remember the number of instances you want to run of this Workflow. You can also define the Scheduler settings you want as part of a Deployment. That's for another showcase, though.

Engine Status

Switch over to the "Engine" tab (1). Make sure that all Engine categories are green (2). Also check the status of individual Assets in the tree to the left (3).

Engine status with Deployment activated

Feeding the test file

To test, we feed our test file into the input directory which we have configured here.

Then, to check whether processing was successful, switch over to the "Audit Trail" tab (1). Make sure you are looking at "Streams" (2). Highlight the first entry in the list (3). It should state that four messages have been processed. Make sure the timestamp of the entry corresponds to the time you dropped the input file into the directory for processing. You can view individual log entries for the processing of this file to the right (4).

Checking Audit Trail for successful processing status

You can check the Cloud Karafka topic (credentials in the Resources section at the end of the document) using a tool of your choice to check the topic entries:

Kafka topic view (3rd party)
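
If you prefer a small script over a GUI tool, a consumer sketch along the same lines (again kafka-python, with placeholder topic, credentials, and SCRAM variant) can print what landed in the topic:

```python
# Sketch with kafka-python -- topic, credentials and SCRAM variant are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "<topic>",
    bootstrap_servers="sulky-01.srvs.cloudkafka.com:9094",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="<username>",
    sasl_plain_password="<password>",
    auto_offset_reset="earliest",       # read the topic from the beginning
    consumer_timeout_ms=10000,          # stop iterating when no new messages arrive
)
for message in consumer:
    print(message.offset, message.value.decode("utf-8", errors="replace"))
```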

Summary

This showcase highlights how you can create a File-to-Kafka Workflow on the fly and without hassle. And you get a lot more out-of-the-box:

  • Reactive -- Embraces the reactive processing paradigm. Fully built on reactive stream management at the core
  • High scalability -- Scales within one engine instance and beyond across multiple engines and distributed nodes
  • Resilience -- Failover safe in distributed environments. Full 24/7 operation and upgradability
  • Automatic deployment -- Deploy changed configurations with one click
  • Real-time and batch -- Run both real-time and batch data integrations using the same platform
  • Metrics -- Automatic metric generation to use in your favorite monitoring and alerting toolkit (e.g. Prometheus)

There are too many features to explain here. For more information please check the documentation or simply contact us at hello@layline.io.

Thanks for reading!

Resources

Project files

  1. GitHub: Simple Kafka Project
  2. Input test files in the directory _test_files of the Project
  3. Cloud Karafka credentials found in the file cloud-karafka-credentials.txt of the Project

Documentation

  1. Getting Started
  2. Importing a Project
  3. What are Assets, etc.?
