A Brief Introduction to Apache Kafka

Apache Kafka® is a distributed streaming platform. What exactly does that mean?

...

  • Kafka is run as a cluster on one or more servers that can span multiple data-centers.

  • The Kafka cluster stores streams of records in categories called topics.

  • Each record consists of a key, a value, and a timestamp.

Kafka has five core APIs, but for now we focus on two of them:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.

  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.

...

Topics

A topic is a category or feed name to which records are published.

...

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group.

Consumer instances can be in separate processes or on separate machines.

...

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
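The delivery rule described above can be illustrated with a small in-memory simulation. This is only a sketch: it does not connect to Kafka, the group and consumer names are made up, and choosing the receiving instance by record index is a stand-in for Kafka's real partition-based assignment.

```python
from collections import defaultdict


def deliver(records, groups):
    """Simulate delivery: each record goes to exactly one consumer per group.

    `groups` maps a consumer group name to the list of its consumer instances.
    The receiving instance is picked by record index, which is a simplification;
    Kafka itself decides this through partition assignment.
    """
    received = defaultdict(list)
    for index, record in enumerate(records):
        for group, consumers in groups.items():
            consumer = consumers[index % len(consumers)]
            received[(group, consumer)].append(record)
    return dict(received)


# Hypothetical setup: one group with two instances, one group with a single instance.
groups = {"billing": ["c1", "c2"], "audit": ["c3"]}
result = deliver(["r0", "r1", "r2"], groups)
```

Here the two "billing" instances split the records between them, while the single "audit" instance receives all three. Putting every instance into its own group gives the broadcast case.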

...

Guarantees

At a high level, Kafka gives the following guarantees:

...

For more information, please refer to the official documentation: http://kafka.apache.org/

Eagle MC2-Kafka Messaging System

The Eagle MC2 engine can now publish messages to and receive messages from the Apache Kafka platform.

To set up Kafka-Messaging, we have to create the back-end configuration (in JSON format) and Eagle Streams (Inbound/Outbound) with the MC2KAFKA delivery method.

Make sure that your topic already exists in the Kafka instance, or ask an administrator to create it for you.

Note that Kafka supports automatic topic creation on the broker when a client subscribes to or is assigned a topic. A topic being subscribed to is created automatically only if the broker allows it.

JSON Kafka Connections Configuration Files

Kafka Connections configuration files should be placed as JSON files in: estar/tpe/dynamic/mc2/mc2kafka/connections/*.json

MC2 reads all files with the .json extension from the mc2/mc2kafka/connections/ folder.
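The "every .json file in the folder" rule can be pictured with the sketch below. It is not MC2's implementation, and the function name load_connection_files is made up for illustration. The file contents are parsed but not interpreted, so the sketch makes no assumption about the connection schema.

```python
import json
from pathlib import Path


def load_connection_files(folder):
    """Parse every *.json file directly inside `folder`.

    Returns a dict mapping file name to the parsed JSON content.
    Files with other extensions are ignored.
    """
    configs = {}
    for path in sorted(Path(folder).glob("*.json")):
        with path.open(encoding="utf-8") as handle:
            configs[path.name] = json.load(handle)
    return configs
```

A file that is not valid JSON makes json.load raise an error in this sketch; how MC2 itself reacts to a malformed file is not covered here.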

...

KafkaName is the name of the Kafka instance that you will use when configuring an MC2KAFKA stream in Message Center Console.

Kafka-Messaging Producer

To create a Kafka-Messaging Producer, we have to create an appropriate Eagle Stream with the following parameters:

  • Message Stream Title: your own stream name

  • Instance: mc2 instance with its AppID (=10000)

  • Delivery Method: MC2KAFKA

  • Stream Direction: Outbound

  • Delivery Format: choose from list (e.g. XML)

  • Ruleset File: your working ruleset file (or any ruleset to stub this field). The ruleset is not used to transform the files. The stream is just a configuration pointing to the right Kafka instance and destination.

  • Kafka Instance Name: KafkaName reference name from the JSON connection configuration file (required)

  • Topic Name: your Kafka topic name (required)

  • MC events log: MC log level

  • URI parameters: leave it empty (advanced configuration)

  • Data Encryption: No (default) / SSL

  • SSL TrustStore Password, SSL KeyStore Password, SSL Key Password: leave it empty (support of SSL (Secure Sockets Layer) and its successor, TLS (Transport Layer Security), which are protocols for establishing authenticated and encrypted connections)

...

After creating the Eagle Kafka-Messaging Producer Stream, you will be able to send data to a Kafka topic by using the well-known SendAdapter rule element.

For Kafka-Messaging, we registered the kafkamessagingprotocol protocol.

Example code snippet:

...

The ExtractDeliveryMethod, ACKDeliveryMethod, and ReplyDeliveryMethod parameters should be set to MESSAGE to enable the delivery of Extracts, ACKs, and TSRs to Kafka.

MC2 Kafka Messaging Default Settings (Advanced)

maxRequestSize

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum uncompressed record batch size. Note that the server has its own cap on the record batch size (after compression if compression is enabled) which may be different from this.

Default: 20971520 bytes (20 MB)
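As a rough illustration of what this limit means for a single message, the sketch below checks a payload against the default value before it is handed to a producer. The helper name fits_in_request is hypothetical, and the check is deliberately simplified: it looks only at the payload bytes and ignores keys, headers, and request framing.

```python
MAX_REQUEST_SIZE = 20971520  # default from the setting above, in bytes


def fits_in_request(payload: bytes, limit: int = MAX_REQUEST_SIZE) -> bool:
    """Return True if the payload alone is no larger than the request limit.

    Record and request overhead is ignored, so a payload very close to the
    limit can still be too large once that overhead is added.
    """
    return len(payload) <= limit


# 20971520 bytes is exactly 20 * 1024 * 1024.
assert MAX_REQUEST_SIZE == 20 * 1024 * 1024
```

The broker applies its own cap as well, so passing this check does not guarantee that the server accepts the message.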

Kafka-Messaging Consumer

To create a Kafka-Messaging Consumer (which reads data from a topic), we have to create an MC2KAFKA Message Center Stream with the following parameters:

  • Message Stream Title: the stream name

  • Instance: mc2 instance with its AppID (=10000)

  • Delivery Method: MC2KAFKA

  • Stream Direction: Inbound

  • Delivery Format: choose from list (e.g. XML)

  • Ruleset File: your working ruleset file to process data being read from topic into :IN_MESSAGE: variable.
    For EJM workflows use: eagle_ml-2-0_cm/mc2/ejm/workflow/workflow_listener.rsf
    For simple extracts use: eagle_ml-2-0_cm/out/xml/extract_service.rsf

  • Kafka Instance Name: KafkaName reference name from connection.json file (required)

  • Topic Name: your Kafka topic name (required)

  • MC events log: MC log level

  • Consumers count: the number of consumers that connect to the Kafka server (default: 2)

  • Auto Offset Reset: what to do when there is no initial offset (uniquely identifies each record) or if an offset is out of range (default: earliest):
    earliest: automatically reset the offset to the earliest offset
    latest: automatically reset the offset to the latest offset
    fail: throw exception to the consumer

  • Batch records size: max records to be processed at the same time (default: 1)

  • Data Encryption: No (default) / SSL

  • URI parameters: leave it empty (advanced configuration)

  • SSL TrustStore Password, SSL KeyStore Password, SSL Key Password: leave it empty (support of SSL (Secure Sockets Layer) and its successor, TLS (Transport Layer Security), which are protocols for establishing authenticated and encrypted connections)

...

After creating the Eagle Kafka-Messaging Consumer Stream, you will be able to receive data from a Kafka topic into the predefined :IN_MESSAGE: variable and process it.

Parallelism, Ordering, Partitions

As you already know, Kafka stores all messages in abstractions called topics. JMS has the same abstraction, but the two differ. Before describing the difference, we have to understand some other concepts.

...

However, the Batch Records Size parameter affects how many records from the partitions can be read at once for concurrent processing (the "efficiency of records processing", that is, parallelism).

MC2 Kafka-Messaging Default Settings (Advanced level)

autoOffsetReset

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset

  • latest: automatically reset the offset to the latest offset

  • none: throw exception to the consumer if no previous offset is found for the consumer's group

  • anything else: throw exception to the consumer.

Default: earliest
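The decision described by autoOffsetReset can be written down as a small function. This is a sketch of the rule only, not MC2 or Kafka client code; the function name and its parameters are invented for illustration.

```python
def resolve_start_offset(committed, earliest, latest, policy="earliest"):
    """Pick the offset a consumer starts reading from.

    committed: last committed offset for the group, or None if there is none.
    earliest, latest: the offset range currently available on the server
    (latest is the offset the next new record would receive).
    """
    if committed is not None and earliest <= committed <= latest:
        return committed  # a usable committed offset always wins
    if policy == "earliest":
        return earliest
    if policy == "latest":
        return latest
    # "none" and any other value: surface the problem to the caller
    raise LookupError(f"no usable offset and policy is {policy!r}")
```

For example, with offsets 5 to 20 available and no committed offset, "earliest" starts at 5 and "latest" starts at 20, while "none" raises an error.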

consumersCount

The number of consumers that connect to the Kafka server.

Default: 2
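Within one consumer group, Kafka lets each partition be read by only one consumer at a time, so raising consumersCount helps only up to the number of partitions in the topic. The sketch below shows this with a simple round-robin spread. The assumption that all MC2 consumers of a stream share one group, and the round-robin strategy itself, are illustrative; Kafka's actual assignment strategy may distribute partitions differently.

```python
def assign_partitions(partition_count, consumers_count):
    """Spread partition numbers over consumers round-robin.

    Returns one list of partition numbers per consumer; an empty list means
    that consumer has nothing to read and stays idle.
    consumers_count must be at least 1.
    """
    assignment = [[] for _ in range(consumers_count)]
    for partition in range(partition_count):
        assignment[partition % consumers_count].append(partition)
    return assignment
```

With three partitions and two consumers, one consumer reads two partitions and the other reads one. With a single partition and two consumers, the second consumer receives nothing.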

groupId

A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group ID, multiple processes indicate that they are all part of the same consumer group. This option is required for consumers.

Default: eagle_mc2_kafka_messaging_group

SSL/TLS Communication with Kafka

Encryption and Authentication using SSL

Apache Kafka allows clients to connect over SSL. By default, SSL is disabled but can be turned on as needed.

...

The Kafka instance certificate should be imported into the Java trust store file at:

/estar/tpe/dynamic/mc2/private/mc2kafka/client.truststore.jks

The MC2 client certificate and private key should be imported into the Java keystore file at:

/estar/tpe/dynamic/mc2/private/mc2kafka/client.keystore.jks

The MC2KAFKA stream in Message Center Console should be configured so that:

  1. The Data Encryption parameter is set to SSL.

  2. The trust store, key store, and key passwords are set in the SSL TrustStore Password, SSL KeyStore Password, and SSL Key Password parameters.

For example:

...

Authentication Using SASL

Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.

...

SASL may be used with PLAINTEXT or SSL as the transport layer using the security protocol SASL_PLAINTEXT or SASL_SSL respectively. If SASL_SSL is used, then SSL must also be configured.
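The four combinations of transport and authentication map onto Kafka's security protocol names as in the sketch below. The function name and its boolean parameters are made up for illustration; only the four returned protocol names come from Kafka.

```python
def security_protocol(use_sasl: bool, use_ssl: bool) -> str:
    """Return the Kafka security protocol name for the chosen combination."""
    if use_sasl:
        return "SASL_SSL" if use_ssl else "SASL_PLAINTEXT"
    return "SSL" if use_ssl else "PLAINTEXT"
```

For instance, security_protocol(True, True) gives "SASL_SSL", in which case the SSL settings must be configured as well.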

Authentication using SASL/PLAIN

SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication.
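On the Kafka client side, SASL/PLAIN is normally expressed through three standard client properties. The sketch below assembles them as a plain dictionary. How these values are passed to an MC2KAFKA stream is not shown here, the helper name is hypothetical, and the user name and password used in the usage note are placeholders.

```python
def sasl_plain_properties(username: str, password: str, use_ssl: bool = True) -> dict:
    """Build the standard Kafka client properties for SASL/PLAIN."""
    for value in (username, password):
        if '"' in value:
            # Quoting rules for such values are outside the scope of this sketch.
            raise ValueError("double quotes are not handled by this sketch")
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "security.protocol": "SASL_SSL" if use_ssl else "SASL_PLAINTEXT",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }
```

Calling sasl_plain_properties("alice", "secret") yields SASL_SSL as the protocol, which is the usual choice because PLAIN sends the password without encrypting it on its own.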

...