kafka-platform_ihub_draft_v2
Objective
BNY iHUB data integration.
Data Requirements
For the first phase of the iHUB integration we were given 10 different feed types. This represents an entire day's worth of data for 50 funds that must be ingested into Vault on a daily basis. The data can arrive both intraday and at end-of-day (EOD). The actual feed names and sizes are indicated below.
Feed | Row Count | Data Size (kb) |
Positions | 10,000 | 80,000 |
Transactions | 10,000 | 82,500 |
Tax lots | 10,000 | 42,500 |
Net asset values | 4,500 | |
Ledger balance | 5,000 | 23,500 |
Account dimensions | 162 | |
Account share class | 462 | |
Fund instruments | 75,000 | |
Snapshot | 30 | |
Accounts | 155 | |
The data will arrive unevenly, with the peak at EOD, when most of it (95%, possibly 100% initially) will arrive.
In the future the number of funds will grow to 500, so we should expect roughly 10 times the current data volume and must be able to expand the Cluster easily to accommodate the growth.
It is also expected that future integrations will become more even, with at least 20-30% of the data arriving during the day.
The data is given in JSON format. Each record is a simple JSON object with only 2 nested levels. For all data feeds the schema is known.
Technical Requirements
Reliability: no data loss
Data Retention: 5 days' worth of messages
High Availability: there is always available broker to process requests
Throughput: at the moment 5,000 objects per second, which is necessary to process peak EOD volumes. Accounting for future growth, the rate would have to reach 20,000 objects per second to provide comparable SLAs (assuming more even data arrival)
Roundtrip: not a concern for this data integration use case
Security: authentication and encryption SASL/SSL
Resource Allocation: due to the dedicated Cluster and uneven data arrival, resources must be sized for peak EOD volumes
Disaster Recovery: to be defined with DevOps. Our recommendation is a stretched Cluster, with brokers in 2 different data centers set up as a single Cluster
Administration: the use case will have only one Cluster and only one topic (see explanation below)
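As a rough sketch, the requirements above might map to settings like the following. This mixes topic-level and broker-level configuration for illustration only; the exact values would be confirmed with DevOps during provisioning.

```properties
# 5 days of retention: 5 * 24 * 3600 * 1000 ms
retention.ms=432000000
# "no data loss": 3 replicas, at least 2 must be in sync for a write to succeed
replication.factor=3
min.insync.replicas=2
# in-flight authentication and encryption
security.inter.broker.protocol=SASL_SSL
```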
Data/Metadata Flow Diagram
This diagram shows data flowing from the BNY iHUB system into the dedicated Cluster on the DNA Kafka platform via the DNA Producer library.
The Metadata service will provide an API for the schema registry. The Provisioning service provides an API for maintaining schemas and for Cluster and topic creation. IWS is used for visual schema management. The schema is registered in the Vault ontology and corresponding LDMs are created. The EDS service consumes the data and loads it into Snowflake tables that are generated based on the schema and LDM.
Cluster Configuration
Based on the requirements we suggest configuring one dedicated Cluster and one topic.
BNY iHUB Cluster
To calculate the size of such a Cluster we will use the data requirements presented above for iHUB data:
5 GB per day
5 days of data retention
1 topic with 80 partitions
3 replicas per partition with 2 in-sync replicas, to fulfill the 'no data loss' and high availability requirements
160 brokers
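A back-of-the-envelope check of the retained data volume behind these numbers. This is only a sketch restating the bullets above; `ClusterSizing` and its method names are illustrative.

```java
// Rough sizing check: all inputs come from the bullets above
// (5 GB/day, 5 days' retention, 3 replicas, 80 partitions).
public class ClusterSizing {

    // Total retained data in GB across the Cluster, including replication.
    public static long retainedGb(long gbPerDay, int retentionDays, int replicationFactor) {
        return gbPerDay * retentionDays * replicationFactor;
    }

    public static void main(String[] args) {
        long totalGb = retainedGb(5, 5, 3);                   // -> 75 GB retained
        double perReplicaLogGb = (double) totalGb / (80 * 3); // 80 partitions x 3 replicas
        System.out.println(totalGb + " GB retained, ~" + perReplicaLogGb + " GB per replica log");
    }
}
```

The same arithmetic at 10x growth (500 funds) gives roughly 750 GB retained, which is the figure Cluster expansion planning should assume.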
Topic
Based on the recommendation from the architectural doc, the name of the topic should be:
BNY.IN.PRD.IHUB.DEFAULT.DEFAULT
This topic can be created once the Cluster is available.
Design considerations
In the Kafka architecture the Producer is the so-called "topic owner": it creates batches, load-balances records between topic partitions, and controls commits, acks, and compression. That is why it is critical for us to own this part. Our business logic requires proper micro-batching. For example, if we allowed some foreign producer to send records of the feeds indicated above, we could get batches of completely mixed feeds, which would be very slow to process. We need to create batches based on feed type as the key
On the other hand, we do not want to create key-based partitions that restrict a partition to a specific feed type, as that would populate partitions very unevenly. Instead, we should achieve micro-batches by serializing multiple records of the same feed type into the same physical Kafka message, applying the schema during serialization, and finally compressing it to the configured size. This way we will not need key-based partitions and will spread physical messages evenly between partitions.
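The grouping step of this micro-batching idea can be sketched in plain Java (no Kafka dependency; `FeedRecord`, the class name, and the method names are illustrative): records are grouped by feed type so every batch, and therefore every physical Kafka message, is homogeneous, while the batches themselves can still be spread evenly across partitions.

```java
import java.util.*;

// Sketch: group mixed feed records into homogeneous micro-batches.
public class FeedMicroBatcher {
    public record FeedRecord(String feedType, String payload) {}

    // Group records by feed type, splitting each group into batches
    // of at most maxBatchSize records.
    public static List<List<FeedRecord>> batch(List<FeedRecord> records, int maxBatchSize) {
        Map<String, List<FeedRecord>> byFeed = new LinkedHashMap<>();
        for (FeedRecord r : records) {
            byFeed.computeIfAbsent(r.feedType(), k -> new ArrayList<>()).add(r);
        }
        List<List<FeedRecord>> batches = new ArrayList<>();
        for (List<FeedRecord> group : byFeed.values()) {
            for (int i = 0; i < group.size(); i += maxBatchSize) {
                batches.add(group.subList(i, Math.min(i + maxBatchSize, group.size())));
            }
        }
        return batches; // each batch would then be serialized and compressed
    }
}
```

In the real producer each returned batch would be serialized against the registered schema into one physical message and handed to the partitioner round-robin.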
Finally, we will be able to easily add partitions if volume grows. We must also achieve an exactly-once delivery guarantee (this term actually means exactly-once processing). It is only possible by implementing two design principles:
At-least-once delivery, which Kafka provides
Idempotent processing, which we must implement in our application. The usual way to implement it is by handling duplicate records, which in turn is only possible if both producer and consumer adhere to the same (de)serialization technique. In our case we will rely on a correlationId
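A minimal sketch of the idempotent-processing half, assuming an in-memory set of seen correlationIds (a real implementation would persist the ids alongside the loaded data, e.g. in Snowflake, so deduplication survives restarts):

```java
import java.util.*;

// Sketch: consumer-side idempotent processing. A record is applied only
// if its correlationId has not been seen; redelivered duplicates from
// at-least-once delivery become no-ops.
public class IdempotentProcessor {
    private final Set<String> seenCorrelationIds = new HashSet<>();
    private int appliedCount = 0;

    // Returns true if the record was applied, false if it was a duplicate.
    public synchronized boolean process(String correlationId, String payload) {
        if (!seenCorrelationIds.add(correlationId)) {
            return false; // duplicate delivery, skip
        }
        appliedCount++;   // stand-in for the actual load into Snowflake
        return true;
    }

    public synchronized int appliedCount() { return appliedCount; }
}
```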
Producing records to a Kafka topic is much faster than consuming them. We will implement a so-called "Pipelined Consumer". This design separates polling from actual processing and fully controls offset commits, which is critically important during rebalancing within the consumer group (rebalancing itself being part of the Kafka architecture). Such a consumer can also scale vertically by adding threads to the processing pool, which can be very useful for peak volumes.
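The pipelined-consumer idea can be sketched with a bounded queue between the polling side and a worker pool. This is pure Java with no Kafka dependency, and the class and method names are illustrative; a real implementation would wrap KafkaConsumer, poll in a loop, and commit offsets only after a batch has fully processed.

```java
import java.util.concurrent.*;

// Sketch: polling is decoupled from processing via a bounded queue,
// so slow processing never blocks the poll loop.
public class PipelinedConsumer {
    private final BlockingQueue<String> pipeline = new ArrayBlockingQueue<>(1000);
    private final ExecutorService workers;

    public PipelinedConsumer(int threads) {
        this.workers = Executors.newFixedThreadPool(threads); // vertical scaling knob
    }

    // "Poll" side: enqueue the record without processing it.
    public void onPolled(String record) throws InterruptedException {
        pipeline.put(record);
    }

    // Processing side: drain queued records onto the worker pool and wait
    // until all of them are done; only then is it safe to commit offsets.
    public int drainAndProcess() throws InterruptedException {
        var drained = new java.util.ArrayList<String>();
        pipeline.drainTo(drained);
        CountDownLatch done = new CountDownLatch(drained.size());
        for (String record : drained) {
            workers.submit(() -> { /* load into Snowflake here */ done.countDown(); });
        }
        done.await();
        return drained.size(); // offsets for these records can now be committed
    }

    public void shutdown() { workers.shutdown(); }
}
```

Because commits happen only after `drainAndProcess` completes, a rebalance never acknowledges records that were polled but not yet processed.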
Possible implementations
There are 3 possible implementations:
DNA Producer library is the implementation where we provide to BNY (or any other client, for that matter) our dna_client.jar library. This library is a wrapper over the Kafka producer classes and so has the exact same API as the Kafka producer. Internally, we implement all the requirements listed below; the most important (which we consider advantages for the overall integration) are:
business level micro-batching with compression
schema checking during produce operation, the actual schema will always correspond to Vault's LDM (ontology), thus eliminating any need for extra transformations
custom partitioning, which in turn would allow us to scale topic by adding partitions if necessary
encryption using TLS (SSL)
With these features under our control on the producing side, we will create the most optimal producer->consumer integration, which in turn allows the fastest SLA and the simplest maintenance, configuration, and monitoring. The only possible disadvantage is that BNY will have to deploy dna_producer.jar
Mirroring is when a third-party tool such as Confluent Replicator connects to both the BNY Kafka and DNA Kafka Clusters. This is a viable option; the pros and cons are below:
The advantages:
Ready to use tool to just run and start sending data across
No need to change BNY app
The disadvantages:
We need to understand who is going to manage and maintain this infrastructure. Configuration might not be trivial, as it might require filtering of messages
The need for some transformation to convert into the Vault schema (LDM)
We will have to rebuild micro-batches, as we will have to group records based on the appropriate business logic
The error messages are harder to process, as we will need to pipe them somewhere, probably a dedicated error topic in the BNY Kafka Cluster
Direct consumption from BNY Kafka is when we connect to BNY Kafka as a consumer. We have done this type of integration with other clients and it is a viable option. Connection can be made via a VPN channel. The advantages here are:
no need for a DNA Kafka Cluster at this time (even though eventually we will have it anyway)
the disadvantages are:
Possible need to implement filtering of records, as at the moment we need only 50 funds
The need for some transformation to convert into the Vault schema (LDM)
We will have to rebuild micro-batches, as we will have to group records based on the appropriate business logic
The error messages are harder to process, as we will need to pipe them somewhere, probably a dedicated error topic in the BNY Kafka Cluster
Scalability is hard to control, as BNY will have to manage the number of partitions and topics
We do not consider Kafka Connectors at this point: they are a separate piece of infrastructure that has to be maintained and, most importantly, they would write directly to Snowflake, bypassing our current ingestion pipeline and data rules. That would create a need for completely different monitoring consoles and the necessity to synchronize schema between the connector and Vault. We believe that option number 1, using the DNA Producer library, is the best, so below is further discussion of how to implement it.
Development items
In this section we define items that have to be developed.
DNA Produce/Consume Library
DNA Producer
The DNA producer needs to be implemented in Java as a pluggable jar module. The basis for the library could be the current in-market Apache Camel-based implementation; however, it is possible to choose another open-source implementation. The items to be implemented:
Analysis of open-source Kafka java-based implementations
Schema registry
We would need to decide whether to get the schema from the Metadata service or from a Kafka topic (where it would be published by the Metadata service ahead of time)
ProducerRecord serialization using the registered schema. The format is Avro or JSON; Avro is preferred, but JSON could be faster to get off the ground.
A unique correlationId assigned to the payload to properly process duplicates. It can be part of the headers of the physical Kafka message.
Micro-batching. Kafka does support batching of messages; however, we will batch them based on the data rules most applicable to each specific interface.
Producer Partitioner. Not a critical item at the moment, as we will process data using a map/filter streaming pattern.
Control commits with acks=all to assure reliability of data.
Exceptions: retriable vs non-retriable. This is one of the most important aspects: catch and process retriable errors where possible, and report non-retriable errors back to the calling app right at produce time.
Authentication. We can skip Authorization for the time being.
SASL/SSL for securing the in-flight data.
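The producer-side items above roughly correspond to standard Kafka producer settings; a hedged sketch follows (values are illustrative, and the SASL mechanism shown is an assumption to be finalized during implementation):

```properties
# Reliability: broker acknowledges only after all in-sync replicas have the record
acks=all
# Avoid broker-side duplicates when retrying retriable errors
enable.idempotence=true
# Compress the micro-batches on the wire
compression.type=zstd
# In-flight encryption and authentication
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
```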
DNA Consumer
The EDS service will implement the Kafka consume operations. Most of the code already exists in the EDS project, which includes the Kafka consumer client and the data rules to load data into Snowflake tables in accordance with the schema. The outstanding items are:
Implement idempotent processing by avoiding duplicates based on correlationId
Implement "Pipelined consumer"
Verify and possibly optimize performance of consume operations
Implement micro-batching for cases where necessary; it is not necessary if we use our DNA producer
Avro/JSON format
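For the consumer side, a sketch of the client settings implied by the items above (the group id is a placeholder; all values are illustrative):

```properties
# Commit offsets manually, only after a batch is fully processed
enable.auto.commit=false
# Bound the work handed to the processing pool per poll
max.poll.records=500
# Placeholder consumer group name
group.id=eds-ihub-consumer
security.protocol=SASL_SSL
```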
Schema Registry
The schemas will be managed via a Schema Registry endpoint. A schema can be discovered from sample data supplied by the producer, or the producer can supply the schema itself. Once the schema is discovered or provided, it will be integrated into the Vault ontology and catalog. We will build the LDM at the same time to allow Data Fabric consumption. It is also possible to keep the schema in one of the Kafka topics.
The producer/consumer library will be able to access the schema via the schema registry. We will, however, try to have an equivalent representation of the provided schema in Avro format/schema.
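As an illustration of what the Avro representation of a simple two-level feed record could look like, here is a hypothetical schema for a Positions record. Every field name here is an assumption for illustration, not the actual iHUB schema.

```json
{
  "type": "record",
  "name": "Position",
  "namespace": "bny.ihub",
  "fields": [
    {"name": "correlationId", "type": "string"},
    {"name": "fundId", "type": "string"},
    {"name": "position", "type": {
      "type": "record",
      "name": "PositionDetail",
      "fields": [
        {"name": "quantity", "type": "double"},
        {"name": "marketValue", "type": "double"}
      ]
    }}
  ]
}
```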