A Brief Introduction to Apache Kafka

...

  • Kafka is run as a cluster on one or more servers that can span multiple data-centers.

  • The Kafka cluster stores streams of records in categories called topics.

  • Each record consists of a key, a value, and a timestamp.

Kafka has five core APIs, but for now we focus on two of them:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.

  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.

...

Topics

A topic is a category or feed name to which records are published.

...

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group.

Consumer instances can be in separate processes or on separate machines.
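The delivery rule described above (each record goes to exactly one consumer instance in every subscribing group) can be illustrated with a toy model. This is only a sketch: the group and instance names are hypothetical, and the round-robin choice is a simplification, since a real Kafka cluster assigns topic partitions to the consumers of a group rather than picking a consumer per record.

```python
def deliver(record_index, groups):
    """Pick one consumer instance per group for a record.

    Toy round-robin model; real Kafka balances by partition assignment.
    """
    return {
        group: members[record_index % len(members)]
        for group, members in groups.items()
    }


# Hypothetical consumer groups subscribed to the same topic.
groups = {
    "billing": ["billing-1", "billing-2"],
    "audit": ["audit-1"],
}

for i in range(3):
    # Every group receives the record, but only one instance per group.
    print(i, deliver(i, groups))
```

Every group sees every record, while instances inside one group share the load.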

...

To set up Kafka-Messaging, we have to create the back-end configuration (JSON format) and Eagle streams (Inbound/Outbound) with the MC2KAFKA delivery method.

Make sure that your topic already exists in the Kafka instance, or ask an administrator to create it for you.

...

Kafka connection configuration files should be placed as JSON files in: estar/tpe/dynamic/mc2/mc2kafka/connections/*.json

MC2 reads all files with the .json extension from the mc2/mc2kafka/connections/ folder.

...

  • Region name (Oracle instance name as defined in db_connection.ini)

  • List of Kafka instance host names and port numbers

For example:

{
    "Region": "Specific Oracle SID of the region", /*Specific Oracle SID of region name to which this config belongs*/
    "Connections": [
        {
            "KafkaName": "external.kafka.cluster", /* do NOT use underscore character ("_") in name*/
            "Nodes": [
                { // Node specific parameters
                    "host": "kafka-node-01.eagleinvsys.com",
                    "port": 1415,
                    "parameters": {
                    }
                },
                { // Node specific parameters
                    "host": "kafka-node-02.eagleinvsys.com",
                    "port": 1415,
                    "parameters": {
                    }
                }
            ]
        }
     ]
}

KafkaName is the name of the Kafka instance that you will reference when configuring an MC2KAFKA stream in Message Center Console.
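Before deploying a connections file, it can help to check it against the rules stated above (a Region, a KafkaName without underscores, and a host and port for each node). The following is a minimal sketch, not part of MC2: the function name and the sample Region value are hypothetical, and the check works on an already parsed configuration because Python's standard JSON parser does not accept the comments shown in the example file.

```python
import json


def validate_connections(config):
    """Return a list of problems found in a parsed connections config."""
    problems = []
    if not config.get("Region"):
        problems.append("missing Region")
    for conn in config.get("Connections", []):
        name = conn.get("KafkaName", "")
        if not name:
            problems.append("connection without KafkaName")
        elif "_" in name:
            problems.append(f"KafkaName {name!r} contains an underscore")
        nodes = conn.get("Nodes", [])
        if not nodes:
            problems.append(f"{name or '<unnamed>'}: no Nodes listed")
        for node in nodes:
            if not node.get("host"):
                problems.append(f"{name}: node without host")
            port = node.get("port")
            if not isinstance(port, int) or not 0 < port < 65536:
                problems.append(f"{name}: invalid port {port!r}")
    return problems


# Comment-free copy of the example above; "EXAMPLESID" is a placeholder.
sample = json.loads("""
{
    "Region": "EXAMPLESID",
    "Connections": [
        {
            "KafkaName": "external.kafka.cluster",
            "Nodes": [
                {"host": "kafka-node-01.eagleinvsys.com", "port": 1415, "parameters": {}},
                {"host": "kafka-node-02.eagleinvsys.com", "port": 1415, "parameters": {}}
            ]
        }
    ]
}
""")

print(validate_connections(sample))
```

An empty list means that none of the checked rules were violated; it does not prove that MC2 will accept the file.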

...

To create a Kafka-Messaging Producer, we have to create an Eagle stream with the following parameters:

Message Stream Title

your own stream name

Instance

mc2 instance with its AppID (=10000)

Delivery Method

MC2KAFKA

Stream Direction

Outbound

Delivery Format

choose from list (e.g.: XML)

Ruleset File

your working ruleset file (or any ruleset, as a stub for this field). The ruleset is not used to transform the files; the stream is only a configuration that points to the right Kafka instance and destination.

Kafka Instance Name

The KafkaName value from the JSON connection configuration file (required)

Topic Name

Your Kafka Topic name (required)

MC events log

MC log level

URI parameters

leave it empty (advanced configuration)

Data Encryption

No (default) / SSL

SSL TrustStore Password

SSL KeyStore Password

SSL Key Password

leave it empty

(Supports SSL (Secure Sockets Layer) and its successor, TLS (Transport Layer Security), which are protocols for establishing authenticated and encrypted connections.)

Example of Kafka-Messaging Producer stream:

...

After creating the Eagle Kafka-Messaging Producer stream, you can send data to the Kafka topic by using the well-known SendAdapter rule element.

For Kafka-Messaging, we registered the kafkamessagingprotocol protocol.

Example of code snippet:

...

MC2 Kafka-Messaging Default Settings (Advanced level)

maxRequestSize

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum uncompressed record batch size. Note that the server has its own cap on the record batch size (after compression if compression is enabled) which may be different from this.

Default: 20971520 (20 MB)
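The default value corresponds to 20 × 1024 × 1024 bytes. The sketch below shows the conversion and a rough pre-check of a payload against the limit. The helper names are hypothetical, and the check is only an approximation: it looks at the raw payload and ignores the protocol overhead that the Kafka client adds to each request.

```python
MAX_REQUEST_SIZE = 20971520  # default maxRequestSize quoted above, in bytes


def size_in_mib(size_bytes):
    """Convert a size in bytes to mebibytes (1 MiB = 1024 * 1024 bytes)."""
    return size_bytes / (1024 * 1024)


def fits_in_request(payload, limit=MAX_REQUEST_SIZE):
    """Rough pre-check: the payload alone must not exceed the limit."""
    return len(payload) <= limit


print(size_in_mib(MAX_REQUEST_SIZE))
print(fits_in_request(b"x" * 1024))
```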

Kafka Messaging Consumer

To create a Kafka-Messaging Consumer (the ability to read data from a topic), we have to create an MC2KAFKA Message Center stream with the following parameters:

Message Stream Title

the stream name

Instance

mc2 instance with its AppID (=10000)

Delivery Method

MC2KAFKA

Stream Direction

Inbound

Delivery Format

choose from list (e.g.: XML)

Ruleset File

your working ruleset file, which processes the data read from the topic into the :IN_MESSAGE: variable.

For EJM workflows use:

eagle_ml-2-0_cm/mc2/ejm/workflow/workflow_listener.rsf

For simple extracts use:

eagle_ml-2-0_cm/out/xml/extract_service.rsf

Kafka Instance Name

The KafkaName value from the JSON connection configuration file (required)

Topic Name

Your Kafka Topic name (required)

MC events log

MC log level

Consumers count

The number of consumers that connect to the Kafka server

(default: 2)

Auto Offset Reset

What to do when there is no initial offset (an offset uniquely identifies each record within a partition) or when an offset is out of range:

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

fail: throw an exception to the consumer.

(default: earliest)

Batch records size

Maximum number of records to be processed at the same time

(default: 1)

Data Encryption

No (default) / SSL

URI parameters

leave it empty (advanced configuration)

SSL TrustStore Password

SSL KeyStore Password

SSL Key Password

leave it empty

(Supports SSL (Secure Sockets Layer) and its successor, TLS (Transport Layer Security), which are protocols for establishing authenticated and encrypted connections.)

Rule Variables

  • IN_MESSAGE: this variable contains the message consumed from the Kafka topic.

...

MC2 Kafka-Messaging Default Settings (Advanced level)

autoOffsetReset

What to do when there is no initial offset in Kafka or when the current offset no longer exists on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset

  • latest: automatically reset the offset to the latest offset

  • none: throw an exception to the consumer if no previous offset is found for the consumer's group

  • anything else: throw an exception to the consumer.

Default: earliest
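The reset rules listed above can be sketched as a small decision function. This is a toy model of the documented behavior, not Kafka client code: the function name and its arguments are hypothetical, and offsets are plain integers.

```python
def resolve_start_offset(committed, earliest, latest, policy="earliest"):
    """Choose the offset a consumer starts reading from.

    committed: last stored offset for the group, or None if there is none.
    earliest/latest: the range of offsets currently available on the server.
    """
    if committed is not None and earliest <= committed <= latest:
        return committed  # a valid stored offset always wins
    if policy == "earliest":
        return earliest
    if policy == "latest":
        return latest
    # "none" and any other value: report the problem to the consumer.
    raise LookupError(f"no valid offset and policy is {policy!r}")


print(resolve_start_offset(None, 5, 42))            # no stored offset
print(resolve_start_offset(2, 5, 42, "latest"))     # stored offset was deleted
print(resolve_start_offset(10, 5, 42, "none"))      # stored offset is still valid
```

The policy only matters when the stored offset is missing or out of range; otherwise the consumer continues from where the group left off.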

consumersCount

The number of consumers that connect to the Kafka server.

Default: 2

groupId

A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group. This option is required for consumers.

Default: eagle_mc2_kafka_messaging_group

SSL/TLS Communication with Kafka

...

The Kafka instance certificate should be imported into the Java trust store file in:

/estar/tpe/dynamic/mc2/private/mc2kafka/client.truststore.jks

The MC2 client certificate and private key should be imported into the Java keystore file in:

/estar/tpe/dynamic/mc2/private/mc2kafka/client.keystore.jks

The MC2KAFKA stream in Message Center Console should be configured so that:

  1. The Data Encryption parameter is set to SSL

  2. The trust store, key store, and key passwords are set in the SSL TrustStore Password, SSL KeyStore Password, and SSL Key Password parameters.

For example:

...

Authentication Using SASL

...

To use SASL-SSL/PLAIN with Kafka-Messaging, follow these steps:

  1. Configure the Kafka-Messaging connection settings (JSON file in /tpe/dynamic/mc2/mc2kafka/connections). An example entry from this file:

{
    "KafkaName": "kafkaservice.sasl.ssl", /* do NOT use underscore character ("_") in name */
    "Nodes": [
        { // Node specific parameters
            "host": "1.eagleinvsys.com",
            "port": 20572, // your instance SASL-SSL port number
            "parameters": {
            }
        },
        { // Node specific parameters
            "host": "2.eagleinvsys.com",
            "port": 20572, // your instance SASL-SSL port number
            "parameters": {
            }
        }
    ]
}

  2. Create and set up the MC2KAFKA streams (IN and OUT). All steps are the same as for the SSL configuration, but for SASL-SSL you have to set Data Encryption to SASL_SSL and set Encrypted URI Parameters to:

    saslJaasConfig=RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";)&saslMechanism=PLAIN&securityProtocol=SASL_SSL

    Replace the username and password values (here username="alice" and password="alice-secret") with your own credentials.

  3. Set all remaining required parameters the same way as for the SSL configuration.
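The Encrypted URI Parameters value from step 2 follows a fixed pattern, so it can be assembled from the credentials to avoid typing mistakes. This is a minimal sketch with a hypothetical helper name. It assumes that the credentials contain no double quotes, because how MC2 escapes special characters inside RAW(...) is not described here.

```python
def sasl_plain_uri_parameters(username, password):
    """Build the Encrypted URI Parameters string for SASL_SSL with PLAIN."""
    for value in (username, password):
        if '"' in value:
            # Escaping rules are unknown, so refuse instead of guessing.
            raise ValueError("credentials must not contain double quotes")
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return (
        f"saslJaasConfig=RAW({jaas})"
        "&saslMechanism=PLAIN&securityProtocol=SASL_SSL"
    )


# Reproduces the example value shown in step 2.
print(sasl_plain_uri_parameters("alice", "alice-secret"))
```

Paste the printed value into the Encrypted URI Parameters field of the stream; do not store real credentials in scripts or shell history.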

Example of an MC2KAFKA outbound stream (SASL-SSL parameters are marked):

...