MC2-Kafka Messaging

A Brief Introduction to Apache Kafka

Apache Kafka® is a distributed streaming platform. What exactly does that mean?

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

  • Store streams of records in a fault-tolerant durable way.

  • Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications

  • Building real-time streaming applications that transform or react to the streams of data

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple data-centers.

  • The Kafka cluster stores streams of records in categories called topics.

  • Each record consists of a key, a value, and a timestamp.

Kafka has five core APIs, but for now we focus on two of them:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.

  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
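As a concrete reference point, here is a minimal sketch of the Producer API using Kafka's standard Java client (the broker address and topic name are placeholders, not MC2-specific values):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MinimalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each record carries a key and a value; the timestamp is
            // assigned automatically if not set explicitly.
            producer.send(new ProducerRecord<>("my-topic", "key-1", "hello kafka"));
        }
    }
}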

Topics

A topic is a category or feed name to which records are published.

Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

One point that is worth noting is how consumers read messages from Kafka.

Here is what the official documentation says:

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group.

Consumer instances can be in separate processes or on separate machines.

Suppose you have two consumer groups, each with two consumers. The way messages are delivered to the consumers is simple:

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
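A minimal sketch of these two modes with the standard Java client (broker address and topic name are placeholders): the only thing that distinguishes load balancing from broadcast is the group.id each consumer instance uses.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerGroupExample {
    static KafkaConsumer<String, String> newConsumer(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
        return consumer;
    }

    public static void main(String[] args) {
        // Same group: the two instances split the topic's partitions, so each
        // record is delivered to only one of them (load balancing).
        KafkaConsumer<String, String> a = newConsumer("group-A");
        KafkaConsumer<String, String> b = newConsumer("group-A");
        // Different group: this instance gets its own copy of every record
        // (broadcast across groups). Polling loops are omitted here.
        KafkaConsumer<String, String> c = newConsumer("group-B");
        a.close();
        b.close();
        c.close();
    }
}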


Guarantees

At a high-level Kafka gives the following guarantees:

  • Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a record M1 is sent by the same producer as a record M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.

  • A consumer instance sees records in the order they are stored in the log.

  • For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any records committed to the log.


This information is sufficient to start using Kafka topics, consumers, and producers in the MC2 engine.

For more information, please refer to the official documentation: Apache Kafka

Eagle MC2-Kafka Messaging System

The Eagle MC2 engine now provides the ability to publish messages to and receive messages from the Apache Kafka platform.

To set up Kafka-Messaging, we have to create a back-end configuration (in JSON format) and Eagle Streams (Inbound/Outbound) with the MC2KAFKA delivery method.

Make sure that your topic already exists in the Kafka instance, or ask an administrator to create it for you.

One thing deserves attention here: Kafka allows automatic topic creation on the broker when a client subscribes to or is assigned a topic. A topic being subscribed to will be created automatically only if the broker is configured to allow it.

JSON Kafka Connections Configuration Files

Kafka Connections configuration files should be placed as JSON files in: estar/tpe/dynamic/mc2/mc2kafka/connections/*.json

MC2 will read all files with the .json extension from the mc2/mc2kafka/connections/ folder.

The connection configuration file will define the following parameters:

  • Region name (Oracle instance name as defined in db_connection.ini)

  • List of Kafka instance host names and port numbers

For example:

{     "Region": "Specific Oracle SID of the region", /*Specific Oracle SID of region name to which this config belongs*/     "Connections": [         {             "KafkaName": "external.kafka.cluster", /* do NOT use underscore character ("_") in name*/             "Nodes": [                 { // Node specific parameters                     "host": "kafka-node-01.eagleinvsys.com",                     "port": 1415,                     "parameters": {                     }                 },                 { // Node specific parameters                     "host": "kafka-node-02.eagleinvsys.com",                     "port": 1415,                     "parameters": {                     }                 }             ]         }      ] }

KafkaName is the name of the Kafka instance that you will use when configuring an MC2KAFKA stream in Message Center Console.

Kafka-Messaging Producer

To create a Kafka-Messaging Producer, we have to create an Eagle Stream with the following parameters:

  • Message Stream Title: your own stream name

  • Instance: mc2 instance with its AppID (=10000)

  • Delivery Method: MC2KAFKA

  • Stream Direction: Outbound

  • Delivery Format: choose from the list (e.g.: XML)

  • Ruleset File: your working ruleset file (or any ruleset to stub this field). The ruleset is not used to transform the files; the stream is just a configuration pointing to the right Kafka instance and destination

  • Kafka Instance Name: the KafkaName reference name from the JSON connection configuration file (required)

  • Topic Name: your Kafka topic name (required)

  • MC events log: MC log level

  • URI parameters: leave it empty (advanced configuration)

  • Data Encryption: No (default) / SSL

  • SSL TrustStore Password, SSL KeyStore Password, SSL Key Password: leave them empty (these support SSL (Secure Sockets Layer) and its successor, TLS (Transport Layer Security), protocols for establishing authenticated and encrypted connections)

Example of Kafka-Messaging Producer stream:

After creating the Eagle Kafka-Messaging Producer stream, you will be able to send data into a Kafka topic by using the well-known SendAdapter rule element.

For Kafka-Messaging, the kafkamessaging protocol is registered.

An example code snippet:

<SENDADAPTER NAME="SEND_TO_KAFKA" DATA=":IN_MESSAGE:" ADAPTER="'kafkamessaging:kafka_msg_producer'" />

In the example above, the data in the :IN_MESSAGE: variable will be sent to the Kafka topic configured in the kafka_msg_producer stream (my_kafka_topic_name in this example).

For EJM Workflows in MC2 the following parameters can be used to define the destination to send data to:

  • MessagingExtractDeliveryURI

  • MessagingAckDeliveryURI

  • MessagingReplyDeliveryURI

The URI for these parameters should be kafkamessaging:streamname

The ExtractDeliveryMethod, ACKDeliveryMethod, and ReplyDeliveryMethod parameters should be set to MESSAGE to enable the delivery of Extracts, ACKs, and TSRs to Kafka.
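For example, to deliver extracts from an EJM workflow through the kafka_msg_producer stream used above (an illustrative sketch; where exactly these parameters are set depends on your workflow configuration):

ExtractDeliveryMethod = MESSAGE
MessagingExtractDeliveryURI = kafkamessaging:kafka_msg_producer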

MC2 Kafka Messaging Default Settings (Advanced)

maxRequestSize

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum uncompressed record batch size. Note that the server has its own cap on the record batch size (after compression if compression is enabled) which may be different from this.

Default: 20971520 (20 MB)
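Presumably this maps to the standard Kafka producer setting max.request.size; a minimal sketch of the equivalent raw client configuration (broker address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class MaxRequestSizeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // max.request.size: the standard Kafka producer setting that appears
        // to correspond to MC2's maxRequestSize; 20971520 bytes = 20 MB.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 20971520);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records as usual
        }
    }
}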

Kafka-Messaging Consumer

To create a Kafka-Messaging Consumer (the ability to read data from a topic), we have to create an MC2KAFKA Message Center Stream with the following parameters:

  • Message Stream Title: the stream name

  • Instance: mc2 instance with its AppID (=10000)

  • Delivery Method: MC2KAFKA

  • Stream Direction: Inbound

  • Delivery Format: choose from the list (e.g.: XML)

  • Ruleset File: your working ruleset file to process the data read from the topic into the :IN_MESSAGE: variable. For EJM workflows use eagle_ml-2-0_cm/mc2/ejm/workflow/workflow_listener.rsf; for simple extracts use eagle_ml-2-0_cm/out/xml/extract_service.rsf

  • Kafka Instance Name: the KafkaName reference name from the JSON connection configuration file (required)

  • Topic Name: your Kafka topic name (required)

  • MC events log: MC log level

  • Consumers count: the number of consumers that connect to the Kafka server (default: 2)

  • Auto Offset Reset: what to do when there is no initial offset, or if an offset (which uniquely identifies each record within its partition) is out of range. earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; fail: throw an exception to the consumer (default: earliest)

  • Batch records size: the maximum number of records to be processed at the same time (default: 1)

  • Data Encryption: No (default) / SSL

  • URI parameters: leave it empty (advanced configuration)

  • SSL TrustStore Password, SSL KeyStore Password, SSL Key Password: leave them empty (these support SSL (Secure Sockets Layer) and its successor, TLS (Transport Layer Security), protocols for establishing authenticated and encrypted connections)

Rule Variables

  • IN_MESSAGE - this variable will contain the consumed Kafka topic message.

Example of Kafka-Messaging Consumer stream:

After creating the Eagle Kafka-Messaging Consumer stream, you will be able to receive data from the Kafka topic into the predefined :IN_MESSAGE: variable and process it.

Parallelism, Ordering, Partitions

As you already know, Kafka stores all messages in abstractions called topics. JMS, as you may know, has the same abstraction, but the two differ; before describing the difference we have to understand some other concepts. Let's quickly review the Eagle JMS feature. We believe it will help you decide, in the end, what to use for your needs: JMS or Kafka Messaging.

For example, once you have subscribed to a JMS topic, you will receive all messages published to that topic. With a single JMS consumer, you process all messages in order, one by one. If you set up more than one consumer, the published messages are processed concurrently by your JMS consumers: they are consumed in order, but as a result they can finish processing out of order.

The other main point of Eagle JMS is Sequential Reading: it ensures that the messages are processed strictly sequentially across the mc2 cluster.

What does Eagle Kafka Messaging say about this?

The first main point: in common cases (leaving some details aside), Kafka does not preserve global data order within a topic. You may have noticed that the Guarantees section above mentions that messages published by a producer will be appended in the order they are sent, but it says nothing about the topic as a whole! So, let's dive into the structure of a Kafka topic.

For each topic, the Kafka cluster maintains a partitioned log.


Each partition is an ordered, immutable sequence of records that is continually appended to — a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

So, in other words, data stored in a topic is distributed among all of that topic's partitions. This means that the data within a single partition is ordered, but the topic as a whole is not.

Kafka consumers (MC2 Kafka inbound streams) receive data from topics according to the partitions assigned to them within the consumer group (all of the partitions, or a subset, depending on internal consumer logic). For example, with 2 consumers, each of them might read only half of the topic's data (being assigned to half of the partitions).

Thus, the Consumers Count parameter affects how many partitions are assigned to each consumer subscribed to the topic ("efficiency of reading", similar to load balancing).

The Batch Records Size parameter, in turn, affects how many records from those partitions can be read at once for concurrent processing ("efficiency of record processing", i.e. parallelism).
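To make the ordering rule concrete, here is a minimal sketch using the standard Kafka Java client (the broker address and topic name are placeholders): records that share a key always go to the same partition and therefore keep their relative order, while records with different keys may land in different partitions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedOrderingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key => same partition => these three records keep their
            // relative order for any consumer reading that partition.
            producer.send(new ProducerRecord<>("my-topic", "account-42", "event-1"));
            producer.send(new ProducerRecord<>("my-topic", "account-42", "event-2"));
            producer.send(new ProducerRecord<>("my-topic", "account-42", "event-3"));
            // Different key: may go to a different partition, so there is no
            // ordering guarantee relative to the records above.
            producer.send(new ProducerRecord<>("my-topic", "account-7", "event-A"));
        }
    }
}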

MC2 Kafka-Messaging Default Settings (Advanced level)

autoOffsetReset

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset

  • latest: automatically reset the offset to the latest offset

  • none: throw exception to the consumer if no previous offset is found for the consumer's group

  • anything else: throw exception to the consumer.

Default: earliest

consumersCount

The number of consumers that connect to the Kafka server.

Default: 2

groupId

A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group. This option is required for consumers.

Default: eagle_mc2_kafka_messaging_group
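These MC2 settings appear to mirror the standard Kafka consumer properties; a minimal sketch of the equivalent raw client configuration (broker address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class Mc2ConsumerDefaults {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // auto.offset.reset: the Kafka property behind MC2's autoOffsetReset default.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // group.id: the Kafka property behind MC2's groupId default.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "eagle_mc2_kafka_messaging_group");
        // consumersCount has no single-property equivalent: it corresponds to
        // running that many consumer instances with the same group.id.
    }
}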

SSL/TLS Communication with Kafka

Encryption and Authentication using SSL

Apache Kafka allows clients to connect over SSL. By default, SSL is disabled but can be turned on as needed.

First of all, you have to make sure that the Apache Kafka cluster you want to connect to is ready to establish SSL connections. In this document SSL and TLS are used interchangeably, as the configuration is similar. Use of the latest TLS version is recommended.

SSL (Secure Sockets Layer) and its successor, TLS (Transport Layer Security), are protocols for establishing authenticated and encrypted links between networked computers.

In order to establish a secured SSL/TLS connection with the Kafka cluster, a certificate exchange has to occur. On the MC2 side we will need to:

  • purchase and save a client certificate and private key to authenticate MC2 with Kafka. The client certificate will have to be sent to the Kafka server administrators to add the certificate to the trusted list.

  • obtain the Kafka server certificate from the Kafka server administrators and add it to the trusted store on MC2 Application Server.

A Java KeyStore (JKS) is a repository of security certificates – either authorization certificates or public key certificates – plus corresponding private keys, used for instance in SSL encryption.

The Kafka instance certificate should be imported to the Java trust store file in:

/estar/tpe/dynamic/mc2/private/mc2kafka/client.truststore.jks

The MC2 client certificate and private key should be imported to the Java keystore file in:

/estar/tpe/dynamic/mc2/private/mc2kafka/client.keystore.jks

The MC2KAFKA stream in Message Center Console should be configured so that:

  1. The Data Encryption parameter is set to SSL

  2. The SSL trust store, key store, and key passwords should be set in the SSL TrustStore Password, SSL KeyStore Password, and SSL Key Password parameters.

For example:
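A minimal sketch of the standard Kafka client SSL properties that this stream configuration corresponds to (an assumption about the underlying client settings; the passwords are placeholders, and the key/trust store paths are the MC2 locations given above):

import java.util.Properties;

public class SslClientSettings {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Data Encryption = SSL
        props.put("security.protocol", "SSL");
        // Kafka server certificate trusted by MC2 (path from this document):
        props.put("ssl.truststore.location", "/estar/tpe/dynamic/mc2/private/mc2kafka/client.truststore.jks");
        props.put("ssl.truststore.password", "change-me"); // SSL TrustStore Password
        // MC2 client certificate and private key (path from this document):
        props.put("ssl.keystore.location", "/estar/tpe/dynamic/mc2/private/mc2kafka/client.keystore.jks");
        props.put("ssl.keystore.password", "change-me");   // SSL KeyStore Password
        props.put("ssl.key.password", "change-me");        // SSL Key Password
    }
}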

Authentication Using SASL

Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.

A SASL mechanism implements a series of challenges and responses. Defined SASL mechanisms include:

  • GSSAPI, for Kerberos V5 authentication via the GSSAPI. GSSAPI offers a data-security layer.

  • PLAIN, a simple cleartext password mechanism, defined in RFC 4616

  • SCRAM (RFC 5802), modern challenge-response scheme based mechanism with channel binding support

  • OAUTHBEARER, OAuth 2.0 bearer tokens (RFC 6750), communicated through TLS

  • ANONYMOUS, for unauthenticated guest access

  • OTP, a one-time password mechanism. Obsoletes the SKEY mechanism.

  • others…

Kafka supports the following SASL mechanisms:

  • GSSAPI (Kerberos)

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • OAUTHBEARER

SASL may be used with PLAINTEXT or SSL as the transport layer using the security protocol SASL_PLAINTEXT or SASL_SSL respectively. If SASL_SSL is used, then SSL must also be configured.

Authentication using SASL/PLAIN

SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication.

To use SASL-SSL/PLAIN with Kafka-Messaging, go through the following steps:

1. Configure the Kafka-Messaging connection settings (a JSON file in /tpe/dynamic/mc2/mc2kafka/connections). An example entry from this file:

{ "KafkaName": "kafkaservice.sasl.ssl", /* do NOT use underscore character ("_") in name*/ "Nodes": [ { // Node specific parameters "host": "1.eagleinvsys.com", "port": 20572, // your instance SASL-SSL port number "parameters": { } }, { // Node specific parameters "host": "2.eagleinvsys.com", "port": 20572, // your instance SASL-SSL port number "parameters": { } } ] }
  2. Create and set up the MC2KAFKA streams (IN and OUT): all steps are the same as for the SSL configuration, but for SASL-SSL you have to set Data Encryption to SASL_SSL and set Encrypted URI Parameters to:

    saslJaasConfig=RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";)&saslMechanism=PLAIN&securityProtocol=SASL_SSL
  3. Change the username and password parameters to match your task; here they are username="alice" and password="alice-secret".

  4. Set all the rest of the required parameters the same way as for the SSL configuration.
    Example of an MC2KAFKA outbound stream (SASL-SSL parameters marked); a sketch of the corresponding raw Kafka client settings follows below:
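As referenced above, a minimal sketch of the standard Kafka client properties that the encrypted URI parameters appear to map to (an assumption about the underlying client; the credentials are the placeholders from this section):

import java.util.Properties;

public class SaslSslClientSettings {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");  // securityProtocol=SASL_SSL
        props.put("sasl.mechanism", "PLAIN");        // saslMechanism=PLAIN
        // saslJaasConfig: the same JAAS string as in the URI parameters,
        // with placeholder credentials to replace for your task.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"alice\" password=\"alice-secret\";");
        // Plus the SSL trust store settings from the SSL section above.
    }
}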