kafka-platform_bny_lake_draft_v3

Objective

BNY Lake data integration.

Data Requirements

For the first phase of the Lake integration, we were given 10 different feed types. Together they represent an entire day's worth of data for 50 funds, which has to be ingested into Vault on a daily basis. The data can arrive both intraday and at end of day (EOD). The feed names and sizes are listed below.

Feed                  Row Count    Data Size (KB)
--------------------  ---------    --------------
Positions                10,000            80,000
Transactions             10,000            82,500
Tax lots                 10,000            42,500
Net asset values            550             4,500
Ledger balance            5,000            23,500
Account dimensions           50               162
Account share class         450               462
Fund instruments         10,000            75,000
Snapshot                     50                30
Accounts                     50               155

The data will arrive unevenly, with peak volume at EOD, when most of the data (95%, possibly 100% initially) will arrive.
In the future the number of funds will grow to 500, so we should expect 10 times the data volume, and we should be able to expand the Cluster easily to accommodate the growth.

It is also expected that future integrations will be more even, with at least 20-30% of the data arriving during the day.
The data is given in JSON format. Each record is a simple JSON object with only 2 nested levels. The schema is known for all data feeds.

Technical requirements

  • Reliability: no data loss

  • Data Retention: 5 days' worth of messages

  • High Availability: a broker is always available to process requests

  • Throughput: currently 5000 objects per second, which is necessary to process peak volumes at EOD. Accounting for future growth, the rate would have to reach 20000 objects per second to provide comparable SLAs (this assumes more even data arrival)

  • Roundtrip: not a concern for this data integration use case

  • Security: authentication and encryption via SASL/SSL

  • Resource Allocation: because the Cluster is dedicated and the data arrives unevenly, resources have to be sized for peak volumes at EOD

  • Disaster Recovery: to be defined with DevOps. Our recommendation is a stretched Cluster, with brokers in 2 different data centers set up as a single Cluster

  • Administration: the use case will have only one Cluster and only one topic (see explanation below)
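
As a rough sanity check of the throughput figures, the sketch below totals the row counts from the feed table and estimates how long a full day's load would take at the stated rates. It assumes that one table row corresponds to one "object" and that volume grows linearly with the number of funds; neither assumption is confirmed by the requirements, and the class name is only illustrative.

```java
// Back-of-the-envelope estimate only; one row = one object is an assumption.
class PeakLoadEstimate {
    // Row counts per feed, copied from the feed table in Data Requirements.
    static final int[] ROWS = {10_000, 10_000, 10_000, 550, 5_000, 50, 450, 10_000, 50, 50};

    // Total rows per day for the current 50 funds.
    static long totalRows() {
        long sum = 0;
        for (int r : ROWS) {
            sum += r;
        }
        return sum;
    }

    // Seconds needed to push `rows` objects at a sustained `objectsPerSecond`.
    static double drainSeconds(long rows, int objectsPerSecond) {
        return (double) rows / objectsPerSecond;
    }

    public static void main(String[] args) {
        long today = totalRows();   // 50 funds
        long future = today * 10;   // 500 funds, assuming linear growth
        System.out.println("rows/day now:    " + today
                + ", seconds at 5000/s:  " + drainSeconds(today, 5_000));
        System.out.println("rows/day at 10x: " + future
                + ", seconds at 20000/s: " + drainSeconds(future, 20_000));
    }
}
```

Under these assumptions the current daily load is 46,150 rows, which means the stated rates leave headroom even if 100% of the data arrives at EOD.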

Data/Metadata Flow Diagram


In this diagram, data flows from the BNY Lake system into the dedicated Cluster of the DNA Kafka platform via the DNA Produce library.
The Metadata service will expose an API for the schema registry. The Provisioning service provides an API for maintaining schemas and for Cluster and topic creation. IWS is used for visual schema management. The schema is registered in the Vault ontology and the corresponding LDMs are created. The EDS service consumes the data and loads it into Snowflake tables that are generated based on the schema and LDM.

Cluster Configuration

Based on the requirements, we suggest configuring one dedicated Cluster and one topic.

BNY Lake Cluster

To calculate the size of such a Cluster, we use the data requirements presented above for Lake data.

  1. 5 GB per day

  2. 5 days of data retention

  3. 1 topic with 80 partitions

  4. 3 total replicas per partition with 2 in-sync replicas, to fulfill 'no data loss' and high availability requirements

  5. 80 brokers
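
The retained storage implied by the first, second and fourth figures can be worked out directly. The sketch below is a simplified estimate that assumes data is spread evenly across partitions and ignores compression, index files and free-space headroom; the class and method names are illustrative.

```java
// Simplified storage estimate; ignores compression, indexes and headroom.
class ClusterStorageEstimate {
    // Total disk needed across the Cluster for the retention window.
    static long retainedGb(long gbPerDay, int retentionDays, int replicas) {
        return gbPerDay * retentionDays * replicas;
    }

    // Size of a single partition replica, assuming an even spread of data.
    static double gbPerPartitionReplica(long gbPerDay, int retentionDays, int partitions) {
        return (double) (gbPerDay * retentionDays) / partitions;
    }

    public static void main(String[] args) {
        // 5 GB/day, 5 days of retention, 3 replicas, 80 partitions (from the list above).
        System.out.println("total retained GB: " + retainedGb(5, 5, 3));
        System.out.println("GB per partition replica: " + gbPerPartitionReplica(5, 5, 80));
    }
}
```

With the figures listed, this gives 75 GB of retained data across the Cluster for the current volume.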

Topic

Based on the recommendation from the architectural doc, the name of the topic should be:
BNY.IN.PRD.LAKE.DEFAULT.DEFAULT
This topic can be created once, when the Cluster becomes available.
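
For reference, the requirements above translate into roughly the following topic settings. This is a sketch, not the provisioning code: `retention.ms` and `min.insync.replicas` are standard Kafka topic-level settings, while the class name and the way these values are handed to the Provisioning service are assumptions.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Collects the topic parameters stated in this document in one place.
class LakeTopicSettings {
    static final String TOPIC = "BNY.IN.PRD.LAKE.DEFAULT.DEFAULT";
    static final int PARTITIONS = 80;           // from the Cluster sizing list
    static final short REPLICATION_FACTOR = 3;  // 3 total replicas per partition

    // Topic-level overrides derived from the retention and reliability requirements.
    static Map<String, String> topicConfigs() {
        Map<String, String> cfg = new LinkedHashMap<>();
        // 5 days of retention, expressed in milliseconds.
        cfg.put("retention.ms", String.valueOf(Duration.ofDays(5).toMillis()));
        // 2 in-sync replicas must acknowledge a write when the producer uses acks=all.
        cfg.put("min.insync.replicas", "2");
        return cfg;
    }

    public static void main(String[] args) {
        System.out.println(TOPIC + " partitions=" + PARTITIONS
                + " replicationFactor=" + REPLICATION_FACTOR + " " + topicConfigs());
    }
}
```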

Design considerations

In the Kafka architecture, the Producer is the so-called "topic owner". It is the producer that creates batches, load-balances records between topic partitions, and controls commits and acks. It also controls compression. That is why it is critical for us to own this part. Our business logic requires proper micro-batching. For example, if we allowed a foreign producer to send records of the feeds listed above, we could receive batches that mix feeds arbitrarily, which would be very slow to process. We need to create batches using the feed type as the key.

On the other hand, we do not want to create key-based partitions and restrict a partition to a specific feed type, as that would make partitions very unevenly populated. Instead, we should build micro-batches in which we serialize multiple records of the same feed type into the same physical Kafka message, using the registered schema during serialization, and finally compress the message to the configured size. This way we will not need key-based partitions, and we will spread physical messages evenly between partitions.
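
The batching idea described above can be sketched as follows. This is a minimal illustration, not the DNA producer: it groups records by feed type, closes a batch after a fixed number of records (a simplification of "compress to the configured size"), and assigns batches to partitions round-robin, independent of the feed type. Serialization, compression and the actual Kafka send are omitted, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Groups records by feed type into micro-batches; each batch would become
// one physical Kafka message. Not thread-safe; illustration only.
class FeedMicroBatcher {
    // One closed micro-batch: records of a single feed type plus a target partition.
    static final class Batch {
        final String feedType;
        final List<String> records;
        final int partition;

        Batch(String feedType, List<String> records, int partition) {
            this.feedType = feedType;
            this.records = records;
            this.partition = partition;
        }
    }

    private final int maxRecordsPerBatch;
    private final int partitionCount;
    private final Map<String, List<String>> open = new LinkedHashMap<>();
    private final List<Batch> flushed = new ArrayList<>();
    private int nextPartition = 0;

    FeedMicroBatcher(int maxRecordsPerBatch, int partitionCount) {
        this.maxRecordsPerBatch = maxRecordsPerBatch;
        this.partitionCount = partitionCount;
    }

    // Buffers a record under its feed type and closes the batch when it is full.
    void add(String feedType, String jsonRecord) {
        List<String> buffer = open.computeIfAbsent(feedType, k -> new ArrayList<>());
        buffer.add(jsonRecord);
        if (buffer.size() >= maxRecordsPerBatch) {
            flush(feedType);
        }
    }

    // Closes the open batch for one feed type and picks the next partition round-robin.
    void flush(String feedType) {
        List<String> buffer = open.remove(feedType);
        if (buffer == null || buffer.isEmpty()) {
            return;
        }
        flushed.add(new Batch(feedType, buffer, nextPartition));
        nextPartition = (nextPartition + 1) % partitionCount;
    }

    // Closes all partially filled batches, e.g. on a timer or at shutdown.
    void flushAll() {
        for (String feedType : new ArrayList<>(open.keySet())) {
            flush(feedType);
        }
    }

    List<Batch> flushed() {
        return flushed;
    }
}
```

Because the partition is chosen per batch rather than per feed type, a large feed such as Positions does not pile up on one partition, while every physical message still contains a single feed type.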

Finally, we will be able to easily add partitions if we see that the volume has grown.
We must also achieve an exactly-once delivery guarantee (this term actually means exactly-once processing). This is only possible by implementing two design principles:

  1. At-least-once delivery, which Kafka provides

  2. Idempotent processing, which we must implement in our application. The usual way to implement it is to detect and discard duplicate records, which in turn is only possible if both producer and consumer adhere to the same (de)serialization technique. In our case we will rely on correlationId
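
The second principle can be illustrated with a small duplicate filter keyed by correlationId. This is a sketch under simplifying assumptions: the set of already-seen identifiers is kept in memory and bounded, whereas a real implementation would most likely have to persist it (for example alongside the loaded data) to survive restarts. The class name and the `sink` callback are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Applies each payload at most once per correlationId (in-memory sketch).
class IdempotentProcessor {
    private final Map<String, Boolean> seen;
    private final Consumer<String> sink;

    IdempotentProcessor(int maxRemembered, Consumer<String> sink) {
        this.sink = sink;
        // Insertion-ordered map that evicts the oldest correlationId once
        // more than maxRemembered identifiers are held.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxRemembered;
            }
        };
    }

    // Returns true if the payload was applied, false if it was skipped as a duplicate.
    synchronized boolean process(String correlationId, String payload) {
        if (seen.containsKey(correlationId)) {
            return false;
        }
        // Apply first, remember second: if the sink throws, the record is not
        // marked as seen and a redelivery will be processed again.
        sink.accept(payload);
        seen.put(correlationId, Boolean.TRUE);
        return true;
    }
}
```

Combined with at-least-once delivery, a redelivered message with a known correlationId is skipped, so each payload takes effect once.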

Producing records to a Kafka topic is much faster than consuming them. We will implement a so-called "Pipelined Consumer". This design separates polling from the actual processing and gives full control over committing offsets, which is critically important during rebalancing within the consumer group, itself a part of the Kafka architecture. Such a consumer can also scale vertically by adding threads to the processing pool, which can be very useful for peak volumes.
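
One piece of such a consumer is deciding which offset is safe to commit when worker threads finish records out of order. The sketch below shows that bookkeeping for a single partition, under the assumption that the polling thread hands records to a thread pool and commits only the contiguous prefix of completed offsets. It does not call the Kafka client; the names are hypothetical.

```java
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Tracks completed offsets of one partition and exposes the committable position.
class OffsetTracker {
    private final TreeSet<Long> done = new TreeSet<>();
    // Kafka convention: the committed offset is the next offset to be read.
    private long nextToCommit;

    OffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Called by a worker thread once the record at `offset` is fully processed.
    synchronized void markDone(long offset) {
        if (offset >= nextToCommit) {
            done.add(offset);
        }
    }

    // Called by the polling thread; advances over the contiguous completed prefix.
    synchronized long committable() {
        while (!done.isEmpty() && done.first() == nextToCommit) {
            done.pollFirst();
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) throws InterruptedException {
        OffsetTracker tracker = new OffsetTracker(0);
        ExecutorService pool = Executors.newFixedThreadPool(4); // processing pool
        for (long o = 0; o < 10; o++) {
            final long offset = o;
            // Real processing (deserialize, load) would happen before markDone.
            pool.submit(() -> tracker.markDone(offset));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("safe to commit: " + tracker.committable());
    }
}
```

If a rebalance occurs while some records are still in flight, committing only this value means unfinished records are redelivered rather than lost, which is where idempotent processing takes over.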

Development items

This section defines the items that have to be developed.

DNA Produce/Consume Library

DNA Producer

The DNA producer needs to be implemented in Java as a pluggable jar module. The library could be based on the current in-market Apache Camel-based implementation. However, it is possible to choose another open-source implementation. The items to be implemented:

  • Analysis of open-source Java-based Kafka implementations

  • Schema registry

    • We need to decide whether to get the schema from the Metadata service or from a Kafka topic (where it would be published by the Metadata service ahead of time)

  • ProducerRecord serialization using the registered schema. The format is Avro or JSON. Avro is preferred; however, JSON could be faster to get off the ground

  • Unique correlationId assigned to the payload so that duplicates can be handled properly. It can be part of the headers of the physical Kafka message

  • Micro-batching. Kafka does support batching of messages; however, we will be able to batch them based on data rules that are more applicable to the specific interfaces

  • Producer Partitioner. At the moment this is not a critical item, as we will process data using the map/filter streaming pattern

  • Control commits with ack_level=all (the Kafka producer setting acks=all) to ensure reliability of data

  • Exceptions: retriable vs non-retriable. This can be one of the most important aspects: retriable errors should be caught and handled where possible, and non-retriable errors should be reported back to the calling app right at the time of producing

  • Authentication for the topic. We can skip Authorization for the time being.

  • SASL/SSL for securing the in-flight data
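
Several of the items above map onto standard Kafka producer settings. The sketch below collects them in a `java.util.Properties` object of the kind the Kafka Java client accepts. The keys are real producer configuration names; the compression codec, the SASL mechanism and the broker address are assumptions, since this document does not specify them, and credentials (JAAS configuration, trust store) are left out.

```java
import java.util.Properties;

// Producer settings implied by the reliability and security items (sketch).
class DnaProducerSettings {
    static Properties reliableSecureProducer(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas to acknowledge each write ("no data loss").
        p.setProperty("acks", "all");
        // Assumption: codec not specified in this document; gzip is one valid value.
        p.setProperty("compression.type", "gzip");
        // Authentication plus encryption of in-flight data.
        p.setProperty("security.protocol", "SASL_SSL");
        // Assumption: mechanism to be agreed with DevOps.
        p.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        return p;
    }

    public static void main(String[] args) {
        // Hypothetical broker address, for illustration only.
        reliableSecureProducer("broker1.example:9093")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```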

DNA Consumer

The EDS service will implement the Kafka consume operations. Most of the code is already in the EDS project, which includes a Kafka consumer client and data rules to load data into Snowflake tables in accordance with the schema. The outstanding items are:

  • Implement idempotent processing by avoiding duplicates based on correlationId

  • Implement "Pipelined consumer"

  • Verify and possibly optimize performance of consume operations

  • Implement micro-batching where necessary. It is not needed if we use our DNA producer

  • Avro/JSON format

Schema Registry

The schemas will be managed via the Schema Registry endpoint. A schema can either be discovered from sample data sent by the producer or be supplied by the producer. Once the producer's schema has been discovered or provided, it will be integrated into the Vault ontology and catalog. We will build the LDM at the same time to allow Data Fabric consumption.
It is also possible to keep the schema in one of the Kafka topics.

The producer/consumer library will be able to access the schema via the schema registry. We will, however, try to keep an equivalent representation of the provided schema in Avro format.