layline.io Blog

Output zu Kafka.

Showcase zum Lesen von Daten aus einer strukturierten Datei, Mapping von Datensätzen und Ausgabe der Daten in der Kafka-Cloud.

March 21, 2022
Reading time: 10 min.

Was wir demonstrieren

Wir zeigen ein einfaches layline.io Projekt, das Daten aus einer Datei liest und deren Inhalt in ein Kafka-Topic ausgibt.

Geplante Ablaufskizze
Geplante Ablaufskizze

Um den Showcase in einem realen Setup zu verfolgen, können Sie die Assets dieses Projekts aus dem Ressourcen-Bereich unten herunterladen. Lesen Sie this, um zu erfahren, wie Sie das Projekt in Ihre Umgebung importieren können.

Konfiguration

Der Arbeitsablauf

Gliederung

Der Workflow dieses Showcases wurde mit dem Workflow-Editor von layline.io erstellt und sieht wie folgt aus:

File-to-Kafka workflow
File-to-Kafka workflow
  • (1) Eingabeprozessor: Einlesen einer Eingabedatei mit einer Header/Detail/Trailer-Struktur, dann
  • (2) Flow Processor: Umwandlung in ein Ausgabeformat, das anschließend
  • (3) Output Processor: Schreiben in ein Kafka-Thema.

Für diesen Showcase verwenden wir ein Kafka-Topic, das von Cloud Karafka (https://www.cloudkarafka.com/) gehostet wird. Wenn Sie den Showcase also selbst ausführen, benötigen Sie keine eigene Kafka-Installation.

Konfiguration der zugrunde liegenden Assets

Der Workflow basiert auf einer Reihe von zugrunde liegenden Assets, die mit dem Asset Editor konfiguriert werden. Die logische Verbindung zwischen Workflow und Assets kann wie folgt verstanden werden:

Logische Verknüpfung von Workflow und Assets
Logische Verknüpfung von Workflow und Assets

Workflows bestehen aus einer Reihe von Prozessoren, die durch Links miteinander verbunden sind.

Prozessoren basieren auf Assets (siehe Ressourcen für weitere Informationen). Assets sind Konfigurationseinheiten, die einer bestimmten Klasse und einem bestimmten Typ angehören. In der obigen Abbildung sehen wir einen Processor mit dem Namen "InputFile ", der der Klasse Input Processor und dem Typ Stream Input Processor angehört. Er stützt sich wiederum auf zwei andere Assets "Source Asset " und "Format Asset ", die vom Typ File System Source bzw. Generic Format sind.

Zusammengefasst:

  • Ein Workflow besteht aus miteinander verbundenen Prozessoren
  • Prozessoren stützen sich auf Assets, die sie definieren
  • Assets können auf andere Assets zurückgreifen

In den folgenden Abschnitten zeigen wir

  • die Anatomie des Workflows und einige wichtige Konfigurationen, und
  • wie man den Workflow einsetzt und ausführt

Umgebung Asset: "Meine-Umgebung"

Erstens: layline.io kann mit Hilfe von Umgebungs-Assets mehrere verschiedene Umgebungen verwalten. Dies ist sehr hilfreich, wenn das gleiche Projekt in Test-, Staging- und Produktionsumgebungen verwendet wird, die unterschiedliche Verzeichnisse, Verbindungen, Passwörter etc. erfordern.
Wir verwenden ein Environment Asset (2) in diesem Projekt. Schauen wir uns das mal an.

Environment Asset-Definition
Environment Asset-Definition

Unter (3) sehen Sie die Umgebungsvariablen, die wir erstellt haben. Falls Sie das Projekt selbst testen, müssen Sie die Werte wahrscheinlich so ändern, dass sie auf gültige Verzeichnisse von Ihnen verweisen, sonst erhalten Sie beim Start einen Fehler.

Variablen wie diese können im gesamten Projekt verwendet werden, indem man ein Makro wie ${lay:dirIn} verwendet. Betriebssystem- oder Java-Systemumgebungsvariablen werden mit env: bzw. sys: vorangestellt, anstatt mit lay:. Wenn Sie das Input Source Asset weiter unten untersuchen, werden Sie sehen, wie dies funktioniert und warum es wichtig ist, dass Sie sie an Ihre Umgebung anpassen, um das Projekt auszuführen. Sie können auch ein völlig neues Umgebungs-Asset neben dem mit dem Beispiel gelieferten erstellen und dann das Projekt zusammen mit Ihrem Umgebungs-Asset bereitstellen, wie wir im Abschnitt über die Bereitstellung dieses Showcases sehen werden.

Stream Input Processor: "InputFile"

Der Input Processor (2) (Name: InputFile / Typ: Stream Input Processor) sorgt für das Einlesen der Eingabedateien und die Weiterleitung der Daten innerhalb des Workflows. Um seine Aufgabe zu erfüllen, muss der Input Processor wissen, wo die Daten zu lesen sind und wie der Inhalt zu interpretieren ist. Beide Informationsgruppen werden durch spezielle Assets (Source Asset und Format Asset) repräsentiert, die dem Input Asset zugeordnet sind:

Stream-Input-Prozessor und Asset-Zuordnung
Stream-Input-Prozessor und Asset-Zuordnung

Schauen wir uns an, wie die unterstützenden Assets konfiguriert sind:

Generisches Format Asset: "InputFileFormat"

layline.io bietet die Möglichkeit, komplexe Datenstrukturen mit einer eigenen Grammatiksprache zu definieren. Die Datei in unserem Beispiel ist ein Beispiel für eine Banktransaktion. Sie besteht aus einem Header-Record mit zwei Feldern, einer Reihe von Detail-Records, die die Transaktionsdetails enthalten, und schließlich einem Trailer-Record. Insgesamt eine sehr gängige Dateistruktur.

Generic Format Asset-Definition
Generic Format Asset-Definition

Alle Assets sind im Asset-Editor (1) zu finden. Das konfigurierte Generic Format Asset "InputFileFormat " ist in der Asset-Struktur auf der linken Seite des Editors zu finden.

Diese Formatkonfiguration wird im Dateisystem-Quell-Asset "InputSource " verwendet.

Dateisystem-Quell-Asset: "InputSource"

Das (1) "InputSource " ist ein Asset des Typs Dateisystemquelle, das verwendet wird, um zu definieren, woher die Datei gelesen wird. Es enthält Informationen über den Speicherort des Verzeichnisses für die Eingabe (2), Fehler und Beendigung (done) der Dateiverarbeitung, Abfrageintervalle, Dateisortierung, Fehlerverhalten und mehr.

File-System-Source Asset-Definition
File-System-Source Asset-Definition

Rückblick: Stream Input Processor "InputFile"

Wenn das Eingabeformat und die Eingabequelle Assets bereit sind, kann der Input File Processor (2) (vom Typ Stream Input Processor) im Workflow (1) definiert werden:

Stream Input Processor-Definition
Stream Input Processor-Definition

Die beiden zuvor definierten Assets "InputFileFormat " und "InputSource " werden diesem Prozessor zugewiesen ((3) und (4)). Abgesehen von einigen weiteren Parametern ist die Konfiguration des Prozessors "InputFile " (2) nun abgeschlossen.

Flow Processor: Map

Als nächstes definieren wir den Flussprozessor "Mapping", der auf einem Asset des Typs Mapping basiert, wie im nächsten Abschnitt definiert.

Logical Mapping Processor to Asset association
Logical Mapping Processor to Asset association

Vermögenswert: Mapping

Die Struktur der Datei, die wir lesen, entspricht nicht genau der Struktur dessen, was wir in Kafka schreiben. Daher müssen wir die Werte von der Eingabe zur Ausgabe mit einem entsprechenden Asset (1) zuordnen.

Mapping Asset-Definition
Mapping Asset-Definition

Wie bereits erläutert, besteht das Format der Eingabedatei aus einem Header, Details und einem Trailer Record. Das Gleiche gilt für das Ausgabeformat. Die obige Abbildung zeigt das Mapping vom Quell- (Input) zum Zielformat (Output) für die Detaildaten (2). Die verwendete Notation basiert auf der Definition der Eingangs- ("BT_IN") und Ausgangsgrammatik ("BT_OUT").

Stream Output Processor: Kafka

Der letzte Prozessor im Workflow, den wir definieren, ist der Output-Prozessor "Kafka-Out".

Stream Output Processor-Definition
Stream Output Processor-Definition

Sie hängt von drei zugrunde liegenden Vermögenswerten ab:

  • Output Asset: Definiert die Kafka-Themen und -Partitionen, in die wir schreiben. Dies wiederum hängt von zwei weiteren Assets ab, nämlich
  • Kafka-Sink-Asset**_: Der Sink, den das Output-Asset verwenden kann, um Daten zu senden
  • Generic Format Asset: Definiert, in welchem Format die Daten in Kafka geschrieben werden sollen
  • Kafka-Verbindungs-Asset**_: Definiert die physischen Kafka-Verbindungsparameter.

Generisches Format Asset: "OutputMessageFormat"

Die Daten, die wir an Kafka ausgeben wollen, ähneln der Eingabe, sind aber nicht völlig identisch. Wie beim input format definieren wir daher ein Asset vom Typ Generic Format und nennen es "OutputMessageFormat". Wir definieren die Grammatik analog zur Eingabegrammatik mit nur geringen Unterschieden (2).

Konfiguration des generischen Format-Assets für die Kafka-Ausgabe
Konfiguration des generischen Format-Assets für die Kafka-Ausgabe

Kafka-Verbindungs-Asset: "Cloud-Karafka-Verbindung"

Für die Ausgabe an Kafka müssen wir zunächst ein Kafka Connection Asset definieren. Hier legen wir fest, wie wir uns physisch mit Cloud Karafka verbinden.

Konfiguration der Kafka-Verbindung
Konfiguration der Kafka-Verbindung

Der wichtigste Teil hier sind natürlich die Kafka-Einstellungen. Schauen wir sie uns genauer an:

Kafka Settings
Kafka Settings
  • (1) Bootstrap-Server: Die Adressen von einem oder mehreren Bootstrap-Servern. Für unser Karafka-Hosting sind sie definiert als sulky-01.srvs.cloudkafka.com:9094,sulky-02.srvs.cloudkafka.com: 9094,sulky-03.srvs.cloudkafka.com:9094.
  • (2) SSL verwenden: Legt fest, ob es sich um eine SSL-Verbindung handelt. Für Cloud Karafka ist dies eine SASL/SCRAM-gesicherte Verbindung. Wir aktivieren daher SSL.
  • (3) Authentifizierungstyp: Hier können Sie Keine, SASL / Klartext oder SASL / SCRAM wählen. Cloud Karafka erfordert SASL/SCRAM
  • (4) Zertifikatstyp: Sie haben die Wahl zwischen Benutzer/Geheimnis und Benutzer/Passwort. In unserem Fall haben wir der Einfachheit halber Benutzer/Passwort verwendet.
  • (5/6) Benutzername/Kennwort: Der richtige Benutzername und das richtige Passwort sind im Showcase-Projekt enthalten, das Sie im Abschnitt Ressourcen herunterladen können.

Kafka Sink Asset: "CloudKarafka"

Als nächstes definieren wir ein Kafka-Sink-Asset. Das Kafka-Sink-Asset verwendet die Kafka-Verbindung, die wir gerade definiert haben. Es ist jedoch nicht der Ort, an dem wir definieren, welches Topic ausgegeben werden soll.

Stellen Sie sicher, dass die zuvor definierte Kafka-Verbindung zugewiesen ist (2).

Konfiguration des Kafka-Sink-Assets
Konfiguration des Kafka-Sink-Assets

Rückblick: Kafka Output Processor "Kafka-Out"

Nachdem wir oben die Format- und Sink-Assets definiert haben, können wir nun das Kafka-Output-Asset im Workflow definieren (1):

Endgültiger Kafka-Ausgabeprozessor
Endgültiger Kafka-Ausgabeprozessor

Prüfen Sie, wie sowohl das Format OutputMessageFormat (2) als auch die Senke (3) diesem Prozessor zugeordnet sind.

Zusätzliche Einstellungen:

Konfiguration von Kafka-Topics und -Partitionen
Konfiguration von Kafka-Topics und -Partitionen
  • Routing-Modus: Sie haben die Wahl zwischen Routing Table und Exclusive Partition. In diesem Fall ist Routing Table geeignet, da wir für diesen Showcase keinen exklusiven Partitionszugang benötigen.
  • Thema: Name des Themas, in das wir schreiben.

Es gibt noch weitere Optionen, die wir setzen können, die aber für den Zweck der Demo nicht relevant sind. Sie können sie in der Online-Dokumentation nachschlagen.

Config Zusammenfassung

Damit ist die Konfiguration des Workflows abgeschlossen.

Deploy & Run

Übertragen der Bereitstellung

Wir sind bereit, den Workflow zu testen. Dazu müssen wir ihn auf einen Reactive Engine Cluster übertragen. Sie können einfach den Cluster auf Ihrem Laptop verwenden (einzelner Knoten). Oder wenn Sie anderswo einen größeren layline.io-Cluster haben, können Sie ihn dort bereitstellen.

Zum Deployment wechseln wir auf den Reiter DEPLOYMENT des Projekts (1):

Bereitstellung in einem lokalen Cluster
Bereitstellung in einem lokalen Cluster

Wir erstellen eine Engine-Konfiguration (2) für die Bereitstellung des Projekts. Diese definiert die Teile des Projekts, die wir bereitstellen möchten. In unserem Beispiel wäre das der eine Workflow, den wir definiert haben, sowie das dazugehörige Environment Asset.

Da wir das Projekt im lokalen Cluster bereitstellen wollen, wählen wir "Deploy to Cluster" und dann unser vordefiniertes "Local Cluster" Setup (3 und 4).

Jede Bereitstellung benötigt einen Tag (5). Wir verwenden "tut-kafka-" gefolgt von einem Makro "${build-timestamp}", um das Deployment zu identifizieren. Das Makro wird bei der Übertragung des Deployments durch einen Zeitstempel ersetzt. Auf diese Weise wird sichergestellt, dass wir mit jeder Bereitstellung immer ein anderes Tag erhalten.

Vergewissern Sie sich, dass der soeben erstellte Workflow in die Liste der "Assets to deploy" (6) aufgenommen wurde, da sonst nichts bereitgestellt wird.

Wir benötigen auch die Umgebungsvariablen, die wir im Quell-Asset verwenden, um die Verzeichnisse zu definieren, aus denen wir lesen (7).

Schließlich starten wir die Übertragung der Bereitstellung durch Klicken auf "TRANSFER DEPLOYMENT TO CLUSTER" (8) (Vergewissern Sie sich, dass der Cluster, für den Sie die Bereitstellung vornehmen, in Betrieb ist).

Wenn die Bereitstellung erfolgreich war, sollten Sie dies sehen:

Deployment Ergebnis
Deployment Ergebnis

Andernfalls wird eine Fehlermeldung angezeigt, die Sie anleitet, wie Sie das Problem beheben können.

Aktivieren der Bereitstellung

Wir sollten nun bereit sein, die Bereitstellung zu aktivieren. Dazu wechseln wir auf die Registerkarte "CLUSTER" (1). Hier erhalten wir einen Überblick über alle "Cluster". Falls Sie mehr als einen Cluster verwalten, vergewissern Sie sich, dass Sie in der Dropdown-Box oben links den richtigen ausgewählt haben.

Aktivieren eines Deployments
Aktivieren eines Deployments

Machen Sie sie zur Standardbereitstellung

Wählen Sie in der Baumstruktur auf der linken Seite "Einsatzspeicher".
Hier finden wir alle Einsätze, die dem ausgewählten Cluster derzeit bekannt sind. In unserem Beispiel-Screenshot sehen wir im Abschnitt "Einsatzkonfiguration"

  1. "EinsatzRoot": Dies ist die grundlegende leere Standardbereitstellung, die immer vorhanden ist.
  2. "tut-kafka-20220204184304": Dies ist das Deployment, das wir gerade in den Cluster übertragen haben.

Um nun das neue Deployment auf dem Cluster zu aktivieren, wählen Sie es aus (1) und aktivieren Sie das Kästchen "is the cluster's default deployment" (2).

Ein Deployment als Standard festlegen
Ein Deployment als Standard festlegen

Zeitplan

Jetzt, da die Bereitstellung auf dem Cluster aktiv ist, müssen wir überprüfen, ob tatsächlich Instanzen des Workflows ausgeführt werden. Wenn Sie diesen Workflow zum ersten Mal bereitstellen, lautet die Antwort wahrscheinlich "nein".

Lassen Sie uns das überprüfen:

  • Wählen Sie die Registerkarte "Cluster" (1)
  • Wählen Sie den Eintrag "Scheduler Master" in der Baumstruktur auf der linken Seite (2)
Scheduling eines Workflows
Scheduling eines Workflows
  • Wählen Sie in der Box Scheduler Settings den Knoten Scheduler in der Baumstruktur (3)
  • Vergewissern Sie sich anschließend, dass die Zielanzahl der Instanzen auf mindestens 1 gesetzt ist (4). Wenn Sie eine höhere Zahl einstellen, werden mehr Instanzen desselben Workflows gestartet.

Sie müssen dies nur einmal tun. Wenn Sie den Workflow das nächste Mal bereitstellen, merkt sich die Reactive Engine die Anzahl der Instanzen, die Sie mit diesem Workflow starten möchten. Sie können auch die gewünschten Scheduler-Einstellungen als Teil eines Deployments definieren. Das ist aber ein Thema für einen anderen Showcase.

Engine-Status

Wechseln Sie auf die Registerkarte "Engine" (1). Stellen Sie sicher, dass alle Engine-Kategorien grün sind (2). Überprüfen Sie auch den Status der einzelnen Assets in der Baumstruktur auf der linken Seite (3).

Engine-Status bei aktiviertem Deployment
Engine-Status bei aktiviertem Deployment

Einspeisen der Testdatei

Zum Testen füttern wir unsere Testdatei in das Eingabeverzeichnis, das wir hier konfiguriert haben.

Um dann zu überprüfen, ob die Verarbeitung erfolgreich war, wechseln Sie zur Registerkarte "Audit Trail" (1). Stellen Sie sicher, dass Sie "Streams" (2) sehen. Markieren Sie den ersten Eintrag in der Liste (3). Dort sollte stehen, dass vier Nachrichten verarbeitet wurden. Vergewissern Sie sich, dass der Zeitstempel des Eintrags mit dem Zeitpunkt übereinstimmt, zu dem Sie die Eingabedatei im Verzeichnis zur Verarbeitung abgelegt haben. Sie können die einzelnen Protokolleinträge für die Verarbeitung dieser Datei auf der rechten Seite einsehen (4).

Prüfung des Audit Trail auf erfolgreichen Bearbeitungsstatus
Prüfung des Audit Trail auf erfolgreichen Bearbeitungsstatus

Sie können das Cloud-Karafka-Thema (Anmeldeinformationen in der Ressource am Ende des Dokuments) mit einem Tool Ihrer Wahl überprüfen, um die Themeneinträge zu prüfen:

Kafka topic Sicht (3rd party)
Kafka topic Sicht (3rd party)

Zusammenfassung

Dieser Showcase zeigt, wie Sie einen File-to-Kafka Workflow on-the-fly ohne großen Aufwand erstellen können. Und Sie erhalten noch viel mehr mit diesem Out-of-the-Box: Reaktiv - Umfasst das Paradigma der reaktiven Verarbeitung. Vollständig auf reaktiver Stream-Verwaltung aufbauend

  • Hohe Skalierbarkeit -- Skaliert innerhalb einer Engine-Instanz und darüber hinaus über mehrere Engines und verteilte Knoten Ausfallsicherheit -- Failover sicher in verteilten Umgebungen. Vollständiger 24/7-Betrieb und Upgrademöglichkeit
  • Automatische Bereitstellung -- Bereitstellung geänderter Konfigurationen mit einem Klick
  • Echtzeit und Batch** -- Führen Sie sowohl Echtzeit- als auch Batch-Datenintegrationen über dieselbe Plattform aus
  • Metriken -- Automatische Generierung von Metriken zur Verwendung in Ihrem bevorzugten Überwachungs- und Alarmierungs-Toolkit (z.B. Prometheus)

Es gibt zu viele Funktionen, um sie hier zu erklären. Weitere Informationen finden Sie in der Dokumentation oder kontaktieren Sie uns einfach unter hello@layline.io.

Vielen Dank für die Lektüre!

Ressourcen

#Projektdateien
1Github: Simple Kafka Project
2Eingabetestdateien im Verzeichnis _test_files des Projekts
3Cloud-Karafka-Anmeldeinformationen in der Datei cloud-karafka-credentials.txt des Projekts
#Documentation
1Getting Started
2Importing a Project
3What are Assets, etc?