kafka-platform_bny_lake_draft_v3
Objective
BNY Lake data integration.
Data Requirements
For the first phase of the Lake integration, we were given 10 feed types. Together they represent an entire day's worth of data for 50 funds, which has to be ingested into Vault daily. The data can arrive both intraday and at end of day (EOD). The feed names and sizes are listed below.
Feed | Row Count | Data Size (KB) |
Positions | 10,000 | 80,000 |
Transactions | 10,000 | 82,500 |
Tax lots | 10,000 | 42,500 |
Net asset values | 550 | 4,500 |
Ledger balance | 5,000 | 23,500 |
Account dimensions | 50 | 162 |
Account share class | 450 | 462 |
Fund instruments | 10,000 | 75,000 |
Snapshot | 50 | 30 |
Accounts | 50 | 155 |
The data will arrive unevenly, with peak volume at EOD, when most of it (95%, possibly 100% initially) will arrive.
In the future the number of funds will grow to 500, so we should expect 10 times the data volume and must be able to expand the Cluster easily to accommodate the growth.
Future integrations are also expected to be more even, with at least 20-30% of the data arriving during the day.
The data is given in JSON format: a simple JSON object with only 2 nested levels. The schema is known for all data feeds.
Technical requirements
Reliability: no data loss
Data Retention: 5 days' worth of messages
High Availability: there is always a broker available to process requests
Throughput: currently 5,000 objects per second, which is needed to process the EOD peak volumes. To account for future growth, the rate would have to reach 20,000 objects per second to provide comparable SLAs (this assumes more even data arrival)
Roundtrip: not a concern for this data integration use case
Security: authentication and encryption SASL/SSL
Resource Allocation: because the Cluster is dedicated and data arrives unevenly, resources have to be sized for the EOD peak volumes
Disaster Recovery: to be defined with DevOps. Our recommendation is a stretched Cluster, with brokers in 2 different data centers set up as a single Cluster
Administration: the use case will have only one Cluster and only one topic (see explanation below)
Data/Metadata Flow Diagram
In this diagram, data flows from the BNY Lake system into the dedicated Cluster of the DNA Kafka platform via the DNA Produce library.
The Metadata service will expose an API for the schema registry. The Provisioning service provides an API for maintaining schemas and for Cluster and topic creation. IWS is used for visual schema management. The schema is registered in the Vault ontology and the corresponding LDMs are created. The EDS service consumes the data and loads it into Snowflake tables that are generated based on the schema and LDM.
Cluster Configuration
Based on the requirements, we suggest configuring one dedicated Cluster and one topic.
BNY Lake Cluster
To calculate the size of this Cluster we use the data requirements presented above for Lake data.
5 GB per day
5 days of data retention
1 topic with 80 partitions
3 total replicas per partition with 2 in-sync replicas, to fulfill 'no data loss' and high availability requirements
80 brokers
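The figures above can be turned into a quick back-of-the-envelope check. The sketch below is illustrative only: the class and method names are hypothetical, and it assumes uncompressed data, no headroom, and an even spread of replicas across brokers.

```java
// Back-of-the-envelope sizing from the figures listed above.
// Assumptions (not from the requirements): uncompressed data, no headroom,
// replicas spread evenly across brokers.
final class LakeClusterSizing {
    static final double GB_PER_DAY = 5.0;
    static final int RETENTION_DAYS = 5;
    static final int REPLICATION_FACTOR = 3;
    static final int PARTITIONS = 80;
    static final int BROKERS = 80;

    /** Total disk needed across the Cluster, in GB. */
    static double totalStorageGb() {
        return GB_PER_DAY * RETENTION_DAYS * REPLICATION_FACTOR;
    }

    /** Total number of partition replicas hosted by the Cluster. */
    static int totalReplicas() {
        return PARTITIONS * REPLICATION_FACTOR;
    }

    /** Average disk per broker, in GB. */
    static double storagePerBrokerGb() {
        return totalStorageGb() / BROKERS;
    }

    /** Average number of partition replicas per broker. */
    static double replicasPerBroker() {
        return (double) totalReplicas() / BROKERS;
    }

    public static void main(String[] args) {
        System.out.printf("total storage: %.1f GB%n", totalStorageGb());
        System.out.printf("total replicas: %d%n", totalReplicas());
        System.out.printf("storage per broker: %.4f GB%n", storagePerBrokerGb());
        System.out.printf("replicas per broker: %.1f%n", replicasPerBroker());
    }
}
```

Under these assumptions the retained data comes to 75 GB across the Cluster, spread over 240 partition replicas. Re-running the same arithmetic with 10 times the daily volume gives a first estimate for the 500-fund case.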
Topic
Based on the recommendation in the architectural doc, the name of the topic should be:
BNY.IN.PRD.LAKE.DEFAULT.DEFAULT
This topic can be created once, when the Cluster becomes available.
Design considerations
In the Kafka architecture the Producer is the so-called "topic owner". It is the producer that creates batches, load-balances records between topic partitions, and controls commits and acks. It also controls compression. That is why it is critical for us to own this part. Our business logic requires proper micro-batching. For example, if we allowed a foreign producer to send records of the feeds listed above, we could end up with batches of completely mixed feeds, which would be very slow to process. We need to create batches using the feed type as the key.
On the other hand, we do not want to create key-based partitions and restrict a partition to a specific feed type, as that would leave partitions very unevenly populated. Instead we should build micro-batches in which multiple records of the same feed type are serialized into the same physical Kafka message, applying the schema during serialization, and the message is finally compressed to the configured size. This way we do not need key-based partitions and can spread physical messages evenly between partitions.
Finally, we will be able to add partitions easily if the volume grows.
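The micro-batching idea described above can be sketched as follows. This is a minimal illustration, not the DNA producer itself: `FeedMicroBatcher`, `Batch`, and the record-count limit are hypothetical names and parameters, records are plain JSON strings, and serialization, compression, and the actual send to Kafka are left out. Each resulting batch would be sent as one Kafka message without a partition key, so the feed type groups records inside a message but does not pin a feed to a partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: groups incoming records by feed type so that each
// batch (one future Kafka message) contains records of a single feed only.
final class FeedMicroBatcher {

    /** Records of one feed type that will be serialized into one message. */
    static final class Batch {
        final String feedType;
        final List<String> records;

        Batch(String feedType, List<String> records) {
            this.feedType = feedType;
            this.records = Collections.unmodifiableList(new ArrayList<>(records));
        }
    }

    private final int maxRecordsPerBatch;
    private final Map<String, List<String>> open = new LinkedHashMap<>();
    private final List<Batch> ready = new ArrayList<>();

    FeedMicroBatcher(int maxRecordsPerBatch) {
        if (maxRecordsPerBatch < 1) {
            throw new IllegalArgumentException("maxRecordsPerBatch must be >= 1");
        }
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    /** Adds a record to the open batch of its feed; closes the batch when full. */
    void add(String feedType, String jsonRecord) {
        List<String> bucket = open.computeIfAbsent(feedType, k -> new ArrayList<>());
        bucket.add(jsonRecord);
        if (bucket.size() >= maxRecordsPerBatch) {
            ready.add(new Batch(feedType, bucket));
            open.remove(feedType);
        }
    }

    /** Closes partially filled batches and returns every batch ready to send. */
    List<Batch> flush() {
        for (Map.Entry<String, List<String>> e : open.entrySet()) {
            ready.add(new Batch(e.getKey(), e.getValue()));
        }
        open.clear();
        List<Batch> out = new ArrayList<>(ready);
        ready.clear();
        return out;
    }
}
```

A size-based limit (bytes after compression) would probably fit the "configured size" requirement better than a record count; the count is used here only to keep the sketch short.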
We must also achieve an exactly-once delivery guarantee (the term actually means exactly-once processing). This is only possible by implementing two design principles:
At-least once delivery, which Kafka provides
Idempotent processing, which we must implement in our application. It is usually implemented by detecting and discarding duplicate records, which in turn is only possible if both producer and consumer adhere to the same (de)serialization technique. In our case we will rely on correlationId
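The combination of at-least-once delivery and idempotent processing can be illustrated with a small sketch. `IdempotentProcessor` is a hypothetical name, and the in-memory set of seen correlationIds is an assumption made to keep the example self-contained; a real consumer would need a durable store that survives restarts, for example a uniqueness check in the target table.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: applies each payload at most once per correlationId,
// so a redelivered (duplicate) message has no additional effect.
final class IdempotentProcessor {
    // Assumption: in-memory store; production code would use a durable one.
    private final Set<String> seen = new HashSet<>();
    private final Consumer<String> sink;

    IdempotentProcessor(Consumer<String> sink) {
        this.sink = sink;
    }

    /** Returns true if the payload was applied, false if it was a duplicate. */
    boolean process(String correlationId, String payload) {
        if (seen.contains(correlationId)) {
            return false; // already applied, skip the redelivery
        }
        sink.accept(payload);
        // Mark as seen only after the sink succeeded, so a failed attempt
        // can be retried on redelivery.
        seen.add(correlationId);
        return true;
    }
}
```

Marking the id after the sink call means a crash between the two steps can still produce a duplicate write; closing that gap requires the write and the id registration to be atomic in the target store.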
Producing records to a Kafka topic is much faster than consuming them. We will therefore implement a so-called "Pipelined Consumer". This design separates polling from the actual processing and gives full control over committing offsets, which is critically important during rebalancing within the consumer group (rebalancing is itself part of the Kafka architecture). Such a consumer can also scale vertically by adding threads to the processing pool, which can be very useful for peak volumes.
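The offset-control part of a pipelined consumer can be sketched without any Kafka dependency. In the example below, `OffsetTracker` and `PipelinedConsumerSketch` are hypothetical names, the loop stands in for the polling thread, and the "processing" is an empty placeholder. The point it illustrates is that workers may finish out of order, so the offset that is safe to commit is the lowest offset not yet processed, not the highest one seen.

```java
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Tracks out-of-order completions and exposes the next offset that is safe
// to commit (every offset below it has been processed).
final class OffsetTracker {
    private final TreeSet<Long> done = new TreeSet<>();
    private long nextToCommit;

    OffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    synchronized void markDone(long offset) {
        done.add(offset);
        // Advance over the contiguous run of completed offsets.
        while (done.remove(nextToCommit)) {
            nextToCommit++;
        }
    }

    synchronized long committable() {
        return nextToCommit;
    }
}

// Hypothetical sketch: the loop plays the role of the polling thread and
// hands each record to a worker pool; only the tracker decides what to commit.
final class PipelinedConsumerSketch {
    static long run(long firstOffset, int recordCount, int workers) {
        OffsetTracker tracker = new OffsetTracker(firstOffset);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (long offset = firstOffset; offset < firstOffset + recordCount; offset++) {
            final long o = offset;
            pool.submit(() -> {
                // Placeholder for real record processing.
                tracker.markDone(o);
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return tracker.committable();
    }

    public static void main(String[] args) {
        System.out.println("committable offset: " + run(0, 1000, 4));
    }
}
```

With the real Kafka client, the commit itself would have to happen on the polling thread, because `KafkaConsumer` is not safe for multi-threaded access, and the tracker's value would typically also be committed when partitions are revoked during a rebalance.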
Development items
In this section we define items that have to be developed.
DNA Produce/Consume Library
DNA Producer
The DNA producer needs to be implemented in Java as a pluggable jar module. The library could be based on the current in-market Apache Camel-based implementation; however, it is possible to choose another open-source implementation. The items to be implemented:
Analysis of open-source Java-based Kafka implementations
Schema registry
We need to decide whether to get the schema from the Metadata service or from a Kafka topic (where it would be published by the Metadata service ahead of time)
ProducerRecord serialization using the registered schema. The format is Avro or JSON. Avro is preferred; however, JSON could be faster to get off the ground
Unique correlationId assigned to each payload so that duplicates can be processed properly. It can be part of the headers of the physical Kafka message
Micro-batching. Kafka does support batching of messages; however, we will batch them ourselves according to data rules that are more applicable to the specific interfaces
Producer Partitioner. At the moment this is not a critical item, as we will process data using the map/filter streaming pattern
Control commits with acks=all (the Kafka producer setting) to ensure reliability of data
Exceptions: retriable vs non-retriable. Handling these properly can be one of the most important aspects: catch and process errors where possible, and report non-retriable errors back to the calling app right at the time of producing
Authentication for topic access. We can skip authorization for the time being.
SASL/SSL for securing the in-flight data
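The reliability and security items in this list map onto standard Kafka producer configuration keys. The sketch below collects them in a plain `Properties` object so it stays free of library dependencies. The class and method names are hypothetical, and the specific values for `compression.type` and `sasl.mechanism` are assumptions chosen for illustration, not decisions made in this document. Serializers and credentials are deliberately omitted.

```java
import java.util.Properties;

// Hypothetical sketch of producer settings for the "no data loss" and
// SASL/SSL requirements. Keys are standard Kafka producer configs.
final class DnaProducerConfigSketch {
    static Properties reliableProducerProps(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas to acknowledge each write.
        p.setProperty("acks", "all");
        // Avoid duplicates introduced by producer-side retries.
        p.setProperty("enable.idempotence", "true");
        // Assumption: lz4 chosen only as an example codec.
        p.setProperty("compression.type", "lz4");
        // Encrypt traffic and authenticate the client.
        p.setProperty("security.protocol", "SASL_SSL");
        // Assumption: the SASL mechanism is not specified in this document.
        p.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        return p;
    }

    public static void main(String[] args) {
        reliableProducerProps("broker-1:9093").list(System.out);
    }
}
```

On the broker or topic side, `acks=all` gives the intended guarantee only together with a replication factor of 3 and `min.insync.replicas=2`, which matches the replica layout proposed for the Cluster.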
DNA Consumer
The EDS service will implement the Kafka consume operations. Most of the code is already in the EDS project, which includes a Kafka consumer client and the data rules to load data into Snowflake tables in accordance with the schema. The outstanding items are:
Implement idempotent processing by avoiding duplicates based on correlationId
Implement "Pipelined consumer"
Verify and possibly optimize performance of consume operations
Implement micro-batching where necessary. It is not needed if we use our DNA producer
Avro/JSON format
Schema Registry
The schemas will be managed via the Schema Registry endpoint. A schema can either be discovered from sample data sent by the producer or be supplied by the producer. Once the schema has been discovered or provided, it will be integrated into the Vault ontology and catalog. We will build the LDM at the same time to allow Data Fabric consumption.
It is also possible to keep the schema in one of the Kafka topics.
The producer/consumer library will be able to access the schema via the schema registry. We will, however, try to keep an equivalent representation of the provided schema in Avro format.