kafka-platform_ihub_draft_v2

Objective

BNY iHUB data integration.

Data Requirements

For the first phase of the iHUB integration we were given 10 different feed types. Together they represent an entire day's worth of data for 50 funds, which has to be ingested into Vault on a daily basis. The data can arrive both intraday and end-of-day (EOD). The feed names and sizes are listed below.



Feed                   Row Count    Data Size (kb)
Positions              10,000       80,000
Transactions           10,000       82,500
Tax lots               10,000       42,500
Net asset values       -            4,500
Ledger balance         5,000        23,500
Account dimensions     -            162
Account share class    -            462
Fund instruments       -            75,000
Snapshot               -            30
Accounts               -            155

The data will arrive unevenly, with the peak volume at EOD, when most of the data (95%, possibly 100% initially) will arrive.
In the future the number of funds will increase to 500, so we should expect 10 times the volume of data, and we should be able to expand the Cluster easily to accommodate the growth.
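
As a quick check of the volumes involved, the sketch below sums the data sizes from the feed table and projects them to 500 funds. It assumes that volume scales linearly with the number of funds, which is our simplification; the class and method names are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FeedVolume {
    // Daily data size in kb per feed, copied from the feed table.
    static Map<String, Long> dailySizesKb() {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("Positions", 80_000L);
        sizes.put("Transactions", 82_500L);
        sizes.put("Tax lots", 42_500L);
        sizes.put("Net asset values", 4_500L);
        sizes.put("Ledger balance", 23_500L);
        sizes.put("Account dimensions", 162L);
        sizes.put("Account share class", 462L);
        sizes.put("Fund instruments", 75_000L);
        sizes.put("Snapshot", 30L);
        sizes.put("Accounts", 155L);
        return sizes;
    }

    static long totalKb(Map<String, Long> sizes) {
        return sizes.values().stream().mapToLong(Long::longValue).sum();
    }

    // Assumption: volume grows linearly with the number of funds.
    static long projectedKb(long currentKb, int currentFunds, int targetFunds) {
        return currentKb * targetFunds / currentFunds;
    }

    public static void main(String[] args) {
        long today = totalKb(dailySizesKb());
        System.out.println("Daily total (kb): " + today);                                   // 308809
        System.out.println("Projected at 500 funds (kb): " + projectedKb(today, 50, 500));  // 3088090
    }
}
```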

Future integrations are also expected to be more even, with at least 20-30% of the data arriving during the day.
The data is given in JSON format. Each record is a simple JSON object with only 2 nested levels. The schema is known for all data feeds.

Technical Requirements

  • Reliability: no data loss

  • Data Retention: 5 days' worth of messages

  • High Availability: a broker is always available to process requests

  • Throughput: currently 5000 objects per second, which is necessary to process the EOD peak volumes. Accounting for future growth, the rate would have to reach 20000 objects per second to provide comparable SLAs (this assumes more even data arrival)

  • Roundtrip: not a concern for this data integration use case

  • Security: authentication and encryption via SASL/SSL

  • Resource Allocation: because the Cluster is dedicated and data arrives unevenly, resources will have to be sized for the EOD peak volumes

  • Disaster Recovery: to be defined with DevOps. Our recommendation is a stretched Cluster, which has brokers in 2 different data centers that are set up as a single Cluster

  • Administration: the use case will have only one Cluster and only one topic (see explanation below)
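
Several of these requirements map directly onto standard Kafka configuration keys. The sketch below collects them as plain java.util.Properties so that it has no dependency on the Kafka client. The keys acks, security.protocol and retention.ms are real Kafka settings; the class and method names are hypothetical, and the final values would still have to be agreed with DevOps.

```java
import java.time.Duration;
import java.util.Properties;

final class RequirementSettings {
    // Producer-side settings for the reliability and security requirements.
    static Properties producerSettings() {
        Properties p = new Properties();
        p.setProperty("acks", "all");                    // leader waits for all in-sync replicas
        p.setProperty("security.protocol", "SASL_SSL");  // SASL authentication over TLS
        return p;
    }

    // Topic-level setting for the data retention requirement.
    static Properties topicSettings(int retentionDays) {
        Properties p = new Properties();
        long retentionMs = Duration.ofDays(retentionDays).toMillis();
        p.setProperty("retention.ms", Long.toString(retentionMs));
        return p;
    }

    public static void main(String[] args) {
        System.out.println(producerSettings());
        System.out.println(topicSettings(5).getProperty("retention.ms")); // 432000000
    }
}
```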

Data/Metadata Flow Diagram


In this diagram the data flows from the BNY iHUB system into the dedicated Cluster of the DNA Kafka platform via the DNA Producer library.
The Metadata service will have an API for the schema registry. The Provisioning service provides an API for maintaining schemas and for Cluster and topic creation. IWS is used for visual schema management. The schema is registered in the Vault ontology and the corresponding LDMs are created. The EDS service consumes the data and loads it into Snowflake tables that are generated based on the schema and LDM.

Cluster Configuration

Based on the requirements, we suggest configuring one dedicated Cluster and one topic.

BNY iHUB Cluster

To calculate the size of this Cluster we use the data requirements presented above for the iHUB data:

  1. 5 GB per day

  2. 5 days of data retention

  3. 1 topic with 80 partitions

  4. 3 total replicas per partition with 2 in-sync replicas, to fulfill 'no data loss' and high availability requirements

  5. 160 brokers
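
The arithmetic behind these figures can be sketched as follows. The inputs are taken from the list above as-is; we read "2 in-sync replicas" as a minimum in-sync replica count of 2, which is our interpretation. The result covers raw message data only and ignores index files and free-space headroom.

```java
final class ClusterSizing {
    // Disk needed to hold the whole retention window across all replicas, in GB.
    static long retainedStorageGb(long dailyGb, int retentionDays, int replicationFactor) {
        return dailyGb * retentionDays * replicationFactor;
    }

    // Total number of partition replicas hosted by the brokers between them.
    static int partitionReplicas(int partitions, int replicationFactor) {
        return partitions * replicationFactor;
    }

    // Replicas of a partition that can be lost while acks=all writes still succeed.
    static int tolerableReplicaLoss(int replicationFactor, int minInSyncReplicas) {
        return replicationFactor - minInSyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(retainedStorageGb(5, 5, 3));  // 75
        System.out.println(partitionReplicas(80, 3));    // 240
        System.out.println(tolerableReplicaLoss(3, 2));  // 1
    }
}
```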

Topic

Based on the recommendation in the architectural doc, the name of the topic should be:
BNY.IN.PRD.IHUB.DEFAULT.DEFAULT
This topic can be created once, when the Cluster becomes available.

Design considerations

In the Kafka architecture the producer is the so-called "topic owner". It is the producer that creates batches, load-balances records between topic partitions, and controls commits and acks. It also controls compression. That is why it is critical for us to own this part. Our business logic requires proper micro-batching. For example, if we allowed a foreign producer to send records of the feeds listed above, we could receive batches of completely mixed feeds, which would be very slow to process. We need to create batches using the feed type as the key.

On the other hand, we do not want to create key-based partitions and restrict a partition to a specific feed type, as this would populate partitions very unevenly. Instead, we should build micro-batches in which multiple records of the same feed type are serialized into the same physical Kafka message, with the schema checked during serialization, and the message is then compressed to the configured size. This way we will not need key-based partitions and we will spread physical messages evenly between partitions.
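
A minimal sketch of the micro-batching idea follows. It buffers records per feed type and seals a batch once a configured record count is reached. Serialization, schema checking and compression are deliberately left out, the batch limit is expressed in records rather than bytes, and the class is not thread-safe. All names are hypothetical and do not describe the actual library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MicroBatcher {
    /** One physical message: records of a single feed type. */
    static final class Batch {
        final String feedType;
        final List<String> records;

        Batch(String feedType, List<String> records) {
            this.feedType = feedType;
            this.records = records;
        }
    }

    private final int maxRecords;
    private final Map<String, List<String>> buffers = new HashMap<>();

    MicroBatcher(int maxRecords) {
        this.maxRecords = maxRecords;
    }

    /** Buffers a record; returns a sealed batch when one is full, otherwise null. */
    Batch add(String feedType, String jsonRecord) {
        List<String> buffer = buffers.computeIfAbsent(feedType, k -> new ArrayList<>());
        buffer.add(jsonRecord);
        if (buffer.size() < maxRecords) {
            return null;
        }
        buffers.remove(feedType);
        return new Batch(feedType, buffer);
    }

    /** Seals all partially filled buffers, for example at the end of an EOD load. */
    List<Batch> flush() {
        List<Batch> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : buffers.entrySet()) {
            out.add(new Batch(e.getKey(), e.getValue()));
        }
        buffers.clear();
        return out;
    }
}
```

Because every batch carries exactly one feed type, the consumer can route a whole physical message to one loader without inspecting individual records.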

Finally, we will be able to add partitions easily if the volume grows. We must also achieve an exactly-once delivery guarantee (this term actually means exactly-once processing). This is only possible by implementing two design principles:

  1. At-least-once delivery, which Kafka provides

  2. Idempotent processing, which we must implement in our application. This is usually implemented by detecting and discarding duplicate records, which in turn is only possible if both producer and consumer adhere to the same (de)serialization technique. In our case we will rely on correlationId
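
Duplicate detection based on correlationId can be sketched as below. This is an in-memory illustration only: it remembers a bounded number of recent ids and forgets the oldest ones, so it would not survive a restart. A real implementation would need a durable store or an idempotent write into the target table. The class name and the capacity parameter are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CorrelationIdDeduplicator {
    private final Map<String, Boolean> seen;

    CorrelationIdDeduplicator(final int capacity) {
        // Insertion-ordered map that evicts the oldest id once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an id is seen, false for a redelivered duplicate. */
    synchronized boolean firstDelivery(String correlationId) {
        return seen.putIfAbsent(correlationId, Boolean.TRUE) == null;
    }
}
```

The consumer would call firstDelivery for each message and skip processing when it returns false, which turns at-least-once delivery into effectively exactly-once processing within the remembered window.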

Producing records to a Kafka topic is much faster than consuming them. We will therefore implement a so-called "Pipelined Consumer". This design separates polling from the actual processing and gives full control over committing offsets, which is critically important during rebalancing within the consumer group (also part of the Kafka architecture). Such a consumer can also scale vertically by adding threads to the processing pool, which can be very useful for peak volumes.
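
The core of the pipelined design can be illustrated without the Kafka client: polled offsets are handed to a worker pool, and only the contiguous prefix of completed offsets is reported as safe to commit. The sketch below simulates one polled batch for a single partition. All names are hypothetical, and a real consumer would additionally pause polling when the pool is saturated and commit from a rebalance listener.

```java
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

final class PipelinedConsumerSketch {
    /** Tracks completed offsets of one partition; only a contiguous prefix is committable. */
    static final class OffsetTracker {
        private final TreeSet<Long> completed = new TreeSet<>();
        private long nextToCommit;

        OffsetTracker(long startOffset) {
            this.nextToCommit = startOffset;
        }

        synchronized void markDone(long offset) {
            completed.add(offset);
        }

        /** Returns the next offset to read, which is what Kafka stores as the committed offset. */
        synchronized long committable() {
            while (completed.remove(nextToCommit)) {
                nextToCommit++;
            }
            return nextToCommit;
        }
    }

    /** Processes one polled batch on a worker pool and returns the offset that is safe to commit. */
    static long processBatch(long firstOffset, int count, int workers, LongConsumer handler) {
        OffsetTracker tracker = new OffsetTracker(firstOffset);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (long offset = firstOffset; offset < firstOffset + count; offset++) {
            final long current = offset;
            pool.submit(() -> {
                handler.accept(current);    // business processing, off the polling thread
                tracker.markDone(current);  // skipped if the handler throws
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return tracker.committable();
    }
}
```

If processing of one offset fails, the committable offset stops just before it, so the failed record and everything after it are redelivered, and the idempotent processing step discards whatever was already loaded.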

Possible implementations

There are 3 possible implementations:
DNA Producer library is the implementation where we provide BNY (or any other client, for that matter) with our dna_client.jar library. This library is a wrapper over the Kafka producer classes, so it has exactly the same API as the Kafka producer. Internally, we implement all the requirements listed below; the most important ones (which we consider advantages for the overall integration) are:

  • business level micro-batching with compression

  • schema checking during produce operation, the actual schema will always correspond to Vault's LDM (ontology), thus eliminating any need for extra transformations

  • custom partitioning, which in turn would allow us to scale topic by adding partitions if necessary

  • encryption using TLS (SSL)

With these features under our control on the producing side, we can create the most efficient producer->consumer integration, which in turn allows the fastest SLA and the simplest maintenance, configuration, and monitoring. The only possible disadvantage is that BNY will have to deploy dna_producer.jar.
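
The custom partitioning item above could, for instance, be a key-independent round-robin that reads the partition count on every call, so that newly added partitions start receiving messages immediately. This is a sketch under that assumption. It does not implement Kafka's Partitioner interface, and the class name and the supplier of the partition count are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;

final class RoundRobinPartitioner {
    private final AtomicLong counter = new AtomicLong();
    private final IntSupplier partitionCount;

    RoundRobinPartitioner(IntSupplier partitionCount) {
        this.partitionCount = partitionCount;
    }

    /** Returns the partition for the next physical message, independent of feed type. */
    int nextPartition() {
        int count = partitionCount.getAsInt();  // re-read so added partitions are picked up
        return (int) (counter.getAndIncrement() % count);
    }
}
```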
Mirroring is when a third-party tool such as Confluent Replicator connects to both the BNY Kafka and DNA Kafka Clusters. This is a viable option; its pros and cons are listed below.
The advantages:

  • Ready-to-use tool that can simply be run to start sending data across

  • No need to change the BNY app

The disadvantages:

  • We need to understand who is going to manage and maintain this infrastructure. Configuration might not be trivial, as it might require filtering of messages

  • A transformation is needed to convert the data into the Vault schema (LDM)

  • We will have to rebuild micro-batches, as records have to be grouped based on the appropriate business logic

  • Error messages are harder to process, as we will need to pipe them somewhere, probably to a dedicated error topic in the BNY Kafka Cluster


Direct consumption from BNY Kafka is when we connect to BNY Kafka as a consumer. We have done this type of integration with other clients and it is a viable option. The connection can be made via a VPN channel. The advantages here are:

  • No need for a DNA Kafka Cluster at this time (even though eventually we will have one anyway)

The disadvantages are:

  • Possible need to implement filtering of records, as at the moment we need only 50 funds

  • A transformation is needed to convert the data into the Vault schema (LDM)

  • We will have to rebuild micro-batches, as records have to be grouped based on the appropriate business logic

  • Error messages are harder to process, as we will need to pipe them somewhere, probably to a dedicated error topic in the BNY Kafka Cluster

  • Scalability is hard to control, as BNY will have to manage the number of partitions and topics

We do not consider Kafka Connectors at this point, as they are a separate piece of infrastructure that has to be maintained. Most importantly, a connector would go directly to Snowflake, bypassing our current ingestion pipeline and data rules; this would create a need for completely different monitoring consoles and for synchronizing the schema between the connector and Vault. We believe that option number 1, using the DNA Producer library, is the best, so the rest of this document discusses how to implement it.

Development items

In this section we define items that have to be developed.

DNA Produce/Consume Library

DNA Producer

The DNA producer needs to be implemented in Java as a pluggable jar module. The library could be based on the current in-market Apache Camel based implementation. However, it is possible to choose another open-source implementation. The items to be implemented:

  • Analysis of open-source Java-based Kafka implementations

  • Schema registry

    • We would need to decide whether to get the schema from the Metadata service or from a Kafka topic (where it would be published by the Metadata service ahead of time)

  • ProducerRecord serialization using the registered schema. The format is Avro or JSON. Avro is preferred; however, JSON could be faster to get off the ground.

  • Unique correlationId assigned with the payload to properly process duplicates. It can be part of the headers of the physical Kafka message.

  • Micro-batching. Kafka does support batching of messages; however, we will batch them based on data rules that are more applicable to the specific interfaces.

  • Producer Partitioner. At the moment this is not a critical item, as we will process data using the map/filter streaming pattern.

  • Control acknowledgements with acks=all to assure reliability of data.

  • Exceptions: retriable vs non-retriable. This can be one of the most important aspects: retriable errors should be caught and processed where possible, and non-retriable errors should be reported back to the calling app right at the time of producing.

  • Authentication for the topic. We can skip Authorization for the time being.

  • SASL/SSL for securing the in-flight data.
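
The retriable vs non-retriable handling from the list above can be sketched as a small retry wrapper. The exception type below is a hypothetical stand-in (the Kafka Java client has its own RetriableException base class in org.apache.kafka.common.errors); backoff between attempts is omitted for brevity.

```java
import java.util.function.Supplier;

final class SendRetry {
    /** Hypothetical marker for transient failures, such as timeouts, that are worth retrying. */
    static class RetriableSendException extends RuntimeException {
        RetriableSendException(String message) {
            super(message);
        }
    }

    /**
     * Runs the send and retries only on RetriableSendException.
     * Any other exception, or a retriable one on the last attempt, goes back to the caller.
     */
    static <T> T sendWithRetry(Supplier<T> send, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RetriableSendException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.get();
            } catch (RetriableSendException e) {
                last = e;  // transient failure: try again
            }
        }
        throw last;  // all attempts failed with a retriable error
    }
}
```

Non-retriable errors, such as a record that fails the schema check, surface to the calling app on the first attempt, which matches the requirement to report them right at the time of producing.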

DNA Consumer

The EDS service will implement the Kafka consume operations. Most of the code is already in the EDS project, which includes a Kafka consumer client and data rules to load data into Snowflake tables in accordance with the schema. The outstanding items are:

  • Implement idempotent processing by avoiding duplicates based on correlationId

  • Implement "Pipelined consumer"

  • Verify and possibly optimize performance of consume operations

  • Implement micro-batching for cases where it is necessary. It is not necessary if we use our DNA producer

  • Avro/JSON format

Schema Registry

The schemas will be managed via the Schema Registry endpoint. A schema can either be discovered from sample data sent by the producer or be supplied by the producer. Once the schema has been discovered or provided, it will be integrated into the Vault ontology and catalog. We will build the LDM at the same time to allow Data Fabric consumption. It is also possible to keep the schema in a Kafka topic.

The producer/consumer library will be able to access the schema via the schema registry. We will, however, try to have an equivalent representation of the provided schema in Avro format.
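
To illustrate schema discovery from sample data and schema checking at produce time, the sketch below infers a flat "path -> type" schema from an already parsed JSON object and validates other records against it. It assumes records are available as nested Map instances, uses a deliberately coarse type model (number, boolean, string, null), and does not handle arrays. It is not the Avro representation mentioned above, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SchemaSketch {
    /** Infers a flat "path -> type" schema from a sample record; nested objects use dotted paths. */
    static Map<String, String> infer(Map<String, Object> sample) {
        Map<String, String> schema = new TreeMap<>();
        collect("", sample, schema);
        return schema;
    }

    private static void collect(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                collect(path, child, out);
            } else {
                out.put(path, typeName(value));
            }
        }
    }

    private static String typeName(Object value) {
        if (value == null) return "null";
        if (value instanceof Number) return "number";
        if (value instanceof Boolean) return "boolean";
        return "string";
    }

    /** Returns the mismatches between a record and the schema; an empty list means it conforms. */
    static List<String> validate(Map<String, String> schema, Map<String, Object> record) {
        Map<String, String> actual = infer(record);
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, String> field : schema.entrySet()) {
            String found = actual.get(field.getKey());
            if (found == null) {
                errors.add("missing field: " + field.getKey());
            } else if (!found.equals(field.getValue())) {
                errors.add("type mismatch at " + field.getKey() + ": expected "
                        + field.getValue() + ", found " + found);
            }
        }
        for (String path : actual.keySet()) {
            if (!schema.containsKey(path)) {
                errors.add("unexpected field: " + path);
            }
        }
        return errors;
    }
}
```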