Onboarding Data Rules Kafka Consumer Service Tenant

To onboard a new tenant for Data Rules Kafka Consumption the following steps should be followed:

  1. Create a COPS ticket to create the new topics for tenant:

dna.kafka.client.tenant.prod (80 partitions, 1-week retention)

dna.kafka.client.tenant.prod.schema (1 partition, 1-year retention)

dna.kafka.client.tenant.prod.processed (1 partition, 1-week retention)

Replace tenant above with the name of the tenant.

Request to add CONFLUENT_API_KEY, CONFLUENT_API_SECRET, CONFLUENT_API_KEY_WEST, CONFLUENT_API_SECRET_WEST to vault secrets secrets-tenant-tenant in both eastus2 and westus2 Azure zones for service sd-svc-dna-kafka-client. The prod part of the topic name designates the tenant environment. Can also be dev or stage.

The producer’s secrets should be shared with the producer.

Request the consumer group (client id) dna-kafka-consumer-tenant (replace tenant with the tenant id) to have access to write and read from all tenant topics.

2. Onboard the tenant to Data Rules. See Data Rules Implementation Process

3. If data needs to be loaded to existing tables, please follow https://eagleinvsys.atlassian.net/wiki/spaces/SOLUTIONSDEL/pages/3620963123/Load+to+existing+SnowFlake+DB+tables

4. If the producer will use producer side schema validation, publish the schema to dna.kafka.client.tenant.prod.schema

5. Create a COPS ticket to use the DNA Kafka Client swagger UI to create the Kafka Consumer configuration as:

{ "topic": "dna.kafka.client.TENANT.prod", "maxCount": 100, "envConfig": { "dbprovider": "snowflake", "feed_system": "dhl", "feed_vendor": "bnym", "forcetenantid": "TENANT", "useprocessinglogs": "N", "EAGLE_PYSERVICE_DB_VENDOR": "snowflake" }, "pollTimeout": 100, "serviceName": "pipelinedConsumer", "correlationId": "setupconsumer", "consumerConfig": { "group.id": "dna-kafka-consumer-TENANT", "client.id": "dna-kafka-consumer-TENANT", "max_poll_records": 5, "max_poll_interval_ms": 1800000 }, "producerConfig": { "client.id": "dna-kafka-consumer-TENANT" }, "resourceConfig": {}, "connectionConfig": { "failover": [ { "eastus": { "password": "secrets:CONFLUENT_API_SECRET", "username": "secrets:CONFLUENT_API_KEY", "bootstrap.servers": "BOOTSTRAP SERVER FOR CONFLUENT EAST CLUSTER" } }, { "westus": { "password": "secrets:CONFLUENT_API_SECRET_WEST", "username": "secrets:CONFLUENT_API_KEY_WEST", "bootstrap.servers": "BOOTSTRAP SERVER FOR CONFLUENT WEST CLUSTER" } } ], "useSslSasl": true, "sasl.mechanism": "PLAIN", "security.protocol": "SASL_SSL" } }

In the above configuration replace TENANT with the tenant id. Replace BOOTSTRAP SERVER FOR CONFLUENT WEST CLUSTER and BOOTSTRAP SERVER FOR CONFLUENT EAST CLUSTER with the corresponding bootstrap server and port provided by DEVOPS.

The schemadrift parameter can be added to envConfig section if needed. Please check Schema Drift