...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
A Brief Introduction to Apache Kafka
Apache Kafka® is a distributed streaming platform. What exactly does that mean?
...
Kafka is run as a cluster on one or more servers that can span multiple data-centers.
The Kafka cluster stores streams of records in categories called topics.
Each record consists of a key, a value, and a timestamp.
Kafka has five core APIs, but for now we will focus on two of them:
The Producer API allows an application to publish a stream of records to one or more Kafka topics.
The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
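This record model and the two APIs can be illustrated with a toy in-memory sketch (purely illustrative, not the real Kafka client API; all names here are our own): a topic is an append-only log of (key, value, timestamp) records that producers append to and consumers read from an offset.

```python
import time

class Topic:
    """Toy stand-in for a Kafka topic: an append-only log of records."""

    def __init__(self, name):
        self.name = name
        self.log = []  # each record is a (key, value, timestamp) tuple

    def publish(self, key, value):
        """What the Producer API does: append a record to the topic."""
        self.log.append((key, value, time.time()))

    def subscribe(self, from_offset=0):
        """What the Consumer API does: read records from a given offset."""
        return self.log[from_offset:]

prices = Topic("prices")
prices.publish("IBM", 141.2)   # key and value; the timestamp is added automatically
prices.publish("MSFT", 305.1)
for key, value, ts in prices.subscribe():
    print(key, value)
```

Unlike a traditional queue, reading via `subscribe` does not remove records from the log, which is why several independent consumers can each process the full stream.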
...
Topics
A topic is a category or feed name to which records are published.
...
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group.
Consumer instances can be in separate processes or on separate machines.
...
If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
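These delivery rules can be sketched with a small simulation (again a toy model; round-robin stands in for Kafka's real partition-based assignment): each record goes to exactly one instance within every subscribing group, so distinct groups each see every record, while instances inside one group share the load.

```python
from collections import defaultdict
from itertools import cycle

def deliver(records, groups):
    """Toy consumer-group delivery: every group receives each record,
    but only one instance within a group gets any given record."""
    received = defaultdict(list)
    pick = {g: cycle(instances) for g, instances in groups.items()}
    for record in records:
        for g in groups:                            # every group sees the record...
            received[next(pick[g])].append(record)  # ...on exactly one instance
    return received

groups = {"billing": ["b1", "b2"], "audit": ["a1"]}
out = deliver(["r1", "r2", "r3", "r4"], groups)
# "audit" has a single instance, so a1 receives all four records;
# "billing" spreads them across b1 and b2.
```

With one instance per group you get broadcast semantics across groups; with many instances in a single group you get queue-like load balancing, which is exactly the behaviour described above.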
...
Guarantees
At a high-level Kafka gives the following guarantees:
...
For more information please refer to official documentation: http://kafka.apache.org/
Eagle MC2-Kafka Messaging System
The Eagle MC2 engine now provides the ability to publish messages to, and receive messages from, the Apache Kafka platform.
To set up Kafka messaging, we have to create the back-end configuration (in JSON format) and Eagle Streams (Inbound/Outbound) with the MC2KAFKA delivery method.
Make sure that your topic already exists in the Kafka instance, or ask an administrator to create it for you.
One point deserves attention: Kafka allows automatic topic creation on the broker when subscribing to or assigning a topic, but a topic being subscribed to will be created automatically only if the broker allows it.
JSON Kafka Connections Configuration Files
Kafka Connections configuration files should be placed as JSON files in: estar/tpe/dynamic/mc2/mc2kafka/connections/*.json
MC2 will read all files with the .json extension from the mc2/mc2kafka/connections/ folder.
...
KafkaName is the name of the Kafka instance that you will use when configuring an MC2KAFKA stream in Message Center Console.
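The lookup behaviour can be sketched as follows (illustrative only: the actual connection file schema is not shown here, and only the KafkaName field is documented above; everything else in this sketch is an assumption):

```python
import glob
import json
import os

def index_connections(configs):
    """Index a list of parsed connection configs by their KafkaName."""
    return {cfg["KafkaName"]: cfg for cfg in configs}

def load_connections(folder):
    """Mimic MC2's behaviour: read every *.json file from the
    connections folder (files with other extensions are ignored)."""
    configs = []
    for path in sorted(glob.glob(os.path.join(folder, "*.json"))):
        with open(path) as f:
            configs.append(json.load(f))
    return index_connections(configs)
```

A stream's Kafka Instance Name parameter would then select one of these configs by its KafkaName key.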
Kafka-Messaging Producer
To create a Kafka-Messaging Producer, we have to create an Eagle Stream with the following parameters:
Message Stream Title | your own stream name |
---|---|
Instance | mc2 instance with its AppID (=10000) |
Delivery Method | MC2KAFKA |
Stream Direction | Outbound |
Delivery Format | choose from list (e.g.: XML) |
Ruleset File | your working ruleset file (or any ruleset to stub this field); the ruleset is not used to transform the files, as the stream is just a configuration pointing to the right Kafka instance and destination |
Kafka Instance Name | KafkaName reference name from json connection configuration file (required) |
Topic Name | Your Kafka Topic name (required) |
MC events log | MC log level |
URI parameters | leave it empty (advanced configuration) |
Data Encryption | No (default) / SSL |
SSL TrustStore Password / SSL KeyStore Password / SSL Key Password | leave these empty unless Data Encryption is set to SSL (SSL (Secure Sockets Layer) and its successor TLS (Transport Layer Security) are protocols for establishing authenticated and encrypted connections) |
...
After creating the Eagle Kafka-Messaging Producer Stream, you will be able to send data into a Kafka topic by using the well-known SendAdapter rule element.
For Kafka-Messaging, the kafkamessagingprotocol protocol is registered.
Example code snippet:
...
The ExtractDeliveryMethod, ACKDeliveryMethod and ReplyDeliveryMethod parameters should be set to MESSAGE to enable the delivery of Extracts, ACKs and TSRs to Kafka.
MC2 Kafka Messaging Default Settings (Advanced)
maxRequestSize | The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum uncompressed record batch size. Note that the server has its own cap on the record batch size (after compression if compression is enabled) which may be different from this. Default: 20971520 (20 MB) |
---|
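A quick conversion shows what the default amounts to in megabytes:

```python
max_request_size = 20971520              # default maxRequestSize, in bytes
print(max_request_size / (1024 * 1024))  # → 20.0, i.e. 20 MB
```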
Kafka Messaging Consumer
To create a Kafka-Messaging Consumer (that is, the ability to read data from a topic), we have to create an MC2KAFKA Message Center Stream with the following parameters:
Message Stream Title | the stream name |
---|---|
Instance | mc2 instance with its AppID (=10000) |
Delivery Method | MC2KAFKA |
Stream Direction | Inbound |
Delivery Format | choose from list (e.g.: XML) |
Ruleset File | your working ruleset file to process the data read from the topic into the :IN_MESSAGE: variable. For EJM workflows use: eagle_ml-2-0_cm/mc2/ejm/workflow/workflow_listener.rsf; for simple extracts use: eagle_ml-2-0_cm/out/xml/extract_service.rsf |
Kafka Instance Name | KafkaName reference name from connection.json file (required) |
Topic Name | Your Kafka Topic name (required) |
MC events log | MC log level |
Consumers count | The number of consumers that connect to the Kafka server (default: 2) |
Auto Offset Reset | What to do when there is no initial offset (an offset uniquely identifies each record) or when an offset is out of range: earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; fail: throw an exception to the consumer (default: earliest) |
Batch records size | Max records to be processed at the same time (default: 1) |
Data Encryption | No (default) / SSL |
URI parameters | leave it empty (advanced configuration) |
SSL TrustStore Password / SSL KeyStore Password / SSL Key Password | leave these empty unless Data Encryption is set to SSL (SSL (Secure Sockets Layer) and its successor TLS (Transport Layer Security) are protocols for establishing authenticated and encrypted connections) |
...
After creating the Eagle Kafka-Messaging Consumer Stream, you will be able to receive data from a Kafka topic into the predefined :IN_MESSAGE: variable and process it.
Parallelism, Ordering, Partitions
As you already know, Kafka stores all messages in abstractions called topics. JMS has a similar abstraction, but there are differences between the two; before describing them, we have to understand some other concepts.
...
However, the Batch Records Size parameter affects how many records from the partitions can be read at once for concurrent processing (the parallelism, or "efficiency of records processing").
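How Kafka keeps per-key ordering while still processing many keys in parallel can be sketched as follows (illustrative: Kafka's default partitioner hashes the record key with murmur2, while Python's built-in `hash` stands in here):

```python
def partition_for(key, num_partitions):
    """Records with the same key always map to the same partition, so
    the order of records for one key is preserved even though records
    for different keys may be consumed in parallel from other partitions."""
    return hash(key) % num_partitions

# Every update for account "ACC-1" lands in the same partition, so the
# consumer reading that partition sees the updates in produced order.
partitions = [partition_for("ACC-1", 6) for _ in range(5)]
print(partitions)  # five identical partition numbers
```

Ordering is therefore a per-partition guarantee, not a topic-wide one: adding partitions increases parallelism without breaking the ordering of any single key.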
MC2 Kafka-Messaging Default Settings (Advanced level)
autoOffsetReset | What to do when there is no initial offset in Kafka, or if the current offset no longer exists on the server (e.g. because that data has been deleted). Default: earliest |
---|---|
consumersCount | The number of consumers that connect to the Kafka server. Default: 2 |
groupId | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group. This option is required for consumers. Default: eagle_mc2_kafka_messaging_group |
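The autoOffsetReset choices can be summarised as a small decision function (a toy model of the policy, not MC2 code):

```python
def starting_offset(committed, earliest, latest, auto_offset_reset):
    """Toy version of the autoOffsetReset decision: where does a consumer
    start when it has no committed offset, or its committed offset is gone?"""
    if committed is not None and earliest <= committed <= latest:
        return committed                  # normal case: resume where we left off
    if auto_offset_reset == "earliest":
        return earliest                   # replay from the oldest retained record
    if auto_offset_reset == "latest":
        return latest                     # skip history, read only new records
    raise ValueError("no valid offset and policy is 'fail'")

# A brand-new consumer group (no committed offset) with the default policy
# starts from the earliest retained record:
print(starting_offset(None, 100, 250, "earliest"))  # → 100
```

The default of earliest is the safe choice for messaging workflows: a new or recovered consumer group processes everything still retained rather than silently skipping it.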
SSL/TLS Communication with Kafka
Encryption and Authentication using SSL
Apache Kafka allows clients to connect over SSL. By default, SSL is disabled but can be turned on as needed.
...
The Kafka instance certificate should be imported to the Java trust store file in:
/estar/tpe/dynamic/mc2/private/mc2kafka/client.truststore.jks
The MC2 client certificate and private key should be imported to the Java keystore file in:
/estar/tpe/dynamic/mc2/private/mc2kafka/client.keystore.jks
The MC2KAFKA stream in Message Center Console should be configured so that:
The Data Encryption parameter is set to SSL
The SSL trust store, key store and key passwords should be set in the SSL TrustStore Password, SSL KeyStore Password and SSL Key Password parameters.
For example:
...
Authentication Using SASL
Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.
...
SASL may be used with PLAINTEXT or SSL as the transport layer using the security protocol SASL_PLAINTEXT or SASL_SSL respectively. If SASL_SSL is used, then SSL must also be configured.
Authentication using SASL/PLAIN
SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication.
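For reference, a SASL/PLAIN client configuration in standard Kafka property form typically looks like the sketch below. The property names are Kafka's own; the username and password are placeholders, and the exact wiring of these settings into the MC2 connection JSON may differ:

```properties
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client-user" \
    password="client-secret";
```

Because SASL/PLAIN sends the password over the wire, it should be combined with SSL/TLS (SASL_SSL) rather than SASL_PLAINTEXT outside of test environments.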
...