Schema Drift

Schema drift detection identifies any schema changes in an incoming data file. Several schema drift modes are available, each handling drifted data in a specific way.

The API for schema drift is the inflow API:

Dev - https://apps.dev.az.eagleinvsys.com:8443/api/vault/eds/api/doc/#/inflow-rest/submitV3InflowPOST

Stage - https://apps.stage.az.eagleinvsys.com:8443/api/vault/eds/api/doc/#/inflow-rest/submitV3InflowPOST

Prod - https://apps.prod.az.eagleinvsys.com:8443/api/vault/eds/api/doc/#/inflow-rest/submitV3InflowPOST
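
The three links above differ only in the environment segment of the host name. As a minimal sketch, the link can be built from an environment name; the function name inflow_doc_url is hypothetical, and the URL is the API documentation page listed above, which is not necessarily the address that requests are posted to.

```python
# Environments listed in this document.
ENVIRONMENTS = ("dev", "stage", "prod")

# Template copied from the links above; only the environment segment varies.
DOC_URL_TEMPLATE = (
    "https://apps.{env}.az.eagleinvsys.com:8443"
    "/api/vault/eds/api/doc/#/inflow-rest/submitV3InflowPOST"
)


def inflow_doc_url(env: str) -> str:
    """Return the inflow API documentation link for the given environment."""
    env = env.lower()
    if env not in ENVIRONMENTS:
        raise ValueError(f"unknown environment: {env!r}")
    return DOC_URL_TEMPLATE.format(env=env)
```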

IGNORE Schema Drift Mode

This is the default schema drift mode.

In IGNORE mode, schema drift is not acted on, whether or not any is detected, and the data is loaded without the drifted columns.

Example of request:

{ "resourcename": "legalentityid", "dbprovider": "snowflake", "schemadriftmode": "ignore" }
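
The request parameters can be assembled in code before they are submitted. The following is a minimal Python sketch, not the service's own client: the function name build_inflow_params is hypothetical, the list of accepted modes is taken from the request examples in this document, and the "snowflake" default for dbprovider is only a convenience of the sketch. The default mode mirrors the statement above that IGNORE is used by default.

```python
import json

# Mode values that appear in the request examples of this document.
SCHEMA_DRIFT_MODES = ("ignore", "validate", "auto_approve")


def build_inflow_params(resourcename, dbprovider="snowflake", schemadriftmode="ignore"):
    """Build the request parameters, rejecting an unknown drift mode early."""
    mode = schemadriftmode.lower()
    if mode not in SCHEMA_DRIFT_MODES:
        raise ValueError(f"unsupported schemadriftmode: {schemadriftmode!r}")
    return {
        "resourcename": resourcename,
        "dbprovider": dbprovider,
        "schemadriftmode": mode,
    }


# Serialize the parameters for the IGNORE example shown above.
print(json.dumps(build_inflow_params("legalentityid")))
```

Validating the mode on the client side only catches typing mistakes early; which values the service actually accepts is decided by the service.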

VALIDATE Schema Drift Mode

VALIDATE mode detects any schema changes for a resource on load.

Example of request:

{ "resourcename": "legalentityid", "dbprovider": "snowflake", "schemadriftmode": "validate" }

If no drift is found, the data is loaded into the database.

If drift is found, the user receives a response that lists the drifted columns.

Example of validate response when schema drift was found:

{
  "status": 0,
  "proc_stats": null,
  "additional_data_type": "string",
  "processing_statuses": [
    [ "Information", "Starting processing an EBS request", "eaglepy-i-1000", "", 0, "2022-07-07T11:22:16.136+00:00" ],
    [ "Error", "Schema validation failed", "eaglepy-e-1026", "", 0, "2022-07-07T11:22:16.143+00:00" ],
    [ "Information", "Executed on svc-eds-5579bc956-fzl48/PID=12/WORKER=/IMPL=CPython", "eaglepy-i-1000", "", 0, "2022-07-07T11:22:16.143+00:00" ]
  ],
  "drift_columns": {
    "_id": "legalentityid",
    "load": {
      "dataframe": {
        "_id": "legalentityid",
        "group_name": "Reference",
        "type": "dftFile",
        "description": "Lei details load",
        "maxrows": -1,
        "source_sink": {
          "sink_type": "ssFileType",
          "sink_params": {},
          "source_descriptor_type": "Mmap",
          "allow_format_discovery": true,
          "source_format_dialect": {
            "lineterminator": "\n",
            "delimiter": ",",
            "headerat": 1,
            "headerlines": 1,
            "dialectname": "csv_char_strip",
            "encode_header": true,
            "force_vocab_stru": true
          }
        },
        "format": "delimited",
        "vocabulary": {
          "newElementChar": { "path": "extension_newElementChar", "datatype": "string", "formatdialect": { "length": 16 } },
          "newElementDate": { "path": "extension_newElementDate", "datatype": "date", "formatdialect": { "dialect": "YYYY-MM-DD" } },
          "NewElementNum": { "path": "extension_NewElementNum", "datatype": "number", "formatdialect": { "precision": 38, "scale": 12 } }
        }
      },
      "interface": {
        "_id": "legalentityid",
        "format": "eaglemlformat",
        "dataset": "legalentityid",
        "dataframe": "legalentityid"
      }
    }
  },
  "worker_id": "svc-eds-5579bc956-fzl48/PID=12/WORKER=/IMPL=CPython"
}
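
A caller usually needs only two things from this response: which columns drifted and which errors were reported. The sketch below, in Python, reads both from a parsed response. The function names are hypothetical, and the position of the severity and message inside each processing_statuses entry is inferred from the example above rather than from a published schema.

```python
def drift_columns(response: dict) -> dict:
    """Map each drifted column name to its reported datatype.

    Returns an empty dict when the response has no "drift_columns" section.
    """
    vocabulary = (
        (response.get("drift_columns") or {})
        .get("load", {})
        .get("dataframe", {})
        .get("vocabulary", {})
    )
    return {name: spec.get("datatype") for name, spec in vocabulary.items()}


def error_messages(response: dict) -> list:
    """Collect the message text of every "Error" entry in processing_statuses."""
    return [
        entry[1]
        for entry in response.get("processing_statuses", [])
        if len(entry) > 1 and entry[0] == "Error"
    ]


# Trimmed copy of the response above, keeping only the fields used here.
sample = {
    "processing_statuses": [
        ["Information", "Starting processing an EBS request", "eaglepy-i-1000", "", 0,
         "2022-07-07T11:22:16.136+00:00"],
        ["Error", "Schema validation failed", "eaglepy-e-1026", "", 0,
         "2022-07-07T11:22:16.143+00:00"],
    ],
    "drift_columns": {"load": {"dataframe": {"vocabulary": {
        "newElementChar": {"path": "extension_newElementChar", "datatype": "string"},
        "newElementDate": {"path": "extension_newElementDate", "datatype": "date"},
        "NewElementNum": {"path": "extension_NewElementNum", "datatype": "number"},
    }}}},
}

print(drift_columns(sample))
print(error_messages(sample))
```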

AUTO_APPROVE Schema Drift Mode

Schema drift mode AUTO_APPROVE handles schema drift without any manual steps.

Load New Custom Resource with Schema Drift

When schema drift detection finds a new custom resource, the following steps are executed:

  • generate ontology for new resource

  • generate processing rule

  • execute DDL

  • publish data models

  • load data into new table

Example of request:

{ "resourcename": "priceMaster", "dbprovider": "snowflake", "schemadriftmode": "auto_approve" }

Example of request with specified vendor and feedsystem:

By default, vendor is BNYM and feedsystem is EAGLE.

{ "resourcename": "priceMaster", "dbprovider": "snowflake", "schemadriftmode": "auto_approve", "vendor": "VENDOR_NAME", "feedsystem": "FEEDSYSTEM_NAME" }
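
The defaulting rule for vendor and feedsystem can be mirrored on the client side, for example when logging which source a request will be attributed to. This is a small Python sketch under that assumption: the function name effective_source is hypothetical, and the defaults themselves are applied by the service, not by this code.

```python
DEFAULT_VENDOR = "BNYM"        # default stated in this document
DEFAULT_FEEDSYSTEM = "EAGLE"   # default stated in this document


def effective_source(params: dict) -> tuple:
    """Return the (vendor, feedsystem) pair that applies to a request."""
    return (
        params.get("vendor", DEFAULT_VENDOR),
        params.get("feedsystem", DEFAULT_FEEDSYSTEM),
    )


# Neither key is set, so both defaults apply.
print(effective_source({"resourcename": "priceMaster", "schemadriftmode": "auto_approve"}))
```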

Example of request with schema drift tolerance:

This parameter is used when a custom resource is updated.

  • If the number of drifted columns is greater than the tolerance, an Error is added to the processing result.

  • If the number of drifted columns is less than or equal to the tolerance, the schema drift process is carried out.

{ "resourcename": "priceMaster", "dbprovider": "snowflake", "schemadriftmode": "auto_approve", "schemadrifttolerance": "2" }
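
The tolerance rule is a single comparison. The Python sketch below restates it so the boundary case is explicit; the function name is hypothetical, and the conversion from string reflects only that the request example passes the tolerance as "2".

```python
def drift_within_tolerance(drifted_columns, schemadrifttolerance) -> bool:
    """Return True when the drift may be processed, False when it is an error.

    The tolerance arrives as a string in the request example ("2"),
    so it is converted to int before the comparison.
    """
    tolerance = int(schemadrifttolerance)
    # Equal to the tolerance still passes; only a strictly larger count fails.
    return len(drifted_columns) <= tolerance


# Three drifted columns against a tolerance of 2: reported as an error.
print(drift_within_tolerance(["colA", "colB", "colC"], "2"))
# Two drifted columns against a tolerance of 2: processed.
print(drift_within_tolerance(["colA", "colB"], "2"))
```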

Example of auto_approve response when schema drift was found for a new resource:

{ "status": 0, "proc_stats": null, "extract_file_name": "/app/data/estar/tpe/data/mc2/temp/da1t1cedstest_priceMaster_resource_output.json", "additional_data_type": "string", "processing_statuses": [ [ "Information", "Starting processing an EBS request", "eaglepy-i-1000", "", 0, "2022-07-07T10:53:58.606+00:00" ], [ "Information", "Executed on svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython", "eaglepy-i-1000", "", 0, "2022-07-07T10:54:19.684+00:00" ] ], "worker_id": "svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython", "pipe": [ { "status": 2, "number_of_records": 0, "proc_stats": { "summary": { "number_of_records": 0 } }, "extract_file_name": "/app/data/estar/tpe/data/mc2/temp/da1t1cedstest_priceMaster_resource_output.json", "additional_data": "{\"priceMaster\": {\"resource_version\": \"1.0.1\"}}", "additional_data_type": "string", "mime_type": "application/json", "processing_statuses": [], "worker_id": "svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython" }, { "status": 0, "number_of_records": 7, "proc_stats": null, "additional_data": "BEGIN;\nCREATE SCHEMA IF NOT EXISTS DA1T1CEDSTESTDBO;\nDROP TABLE IF EXISTS DA1T1CEDSTESTDBO.PRICEMASTER;\nDROP SEQUENCE IF EXISTS DA1T1CEDSTESTDBO.PRICEMASTER_SEQ;\nCREATE SEQUENCE IF NOT EXISTS DA1T1CEDSTESTDBO.PRICEMASTER_SEQ WITH START WITH = 1 INCREMENT BY = 1;\nCREATE TABLE IF NOT EXISTS DA1T1CEDSTESTDBO.PRICEMASTER (\nAASSKEVALUATIONCLEAN NUMBER(38,12),\nACCRUEDINTEREST NUMBER(38,12),\nADJUSTMENTFACTOR INTEGER,\nASKEVALUATIONCLEAN VARCHAR(100),\nASKEVALUATIONDIRTY NUMBER(38,12),\nASKEVALUATIONSPREAD VARCHAR(100),\nASKPRICE VARCHAR(100),\nASKYIELDCLEAN NUMBER(38,12),\nBENCHMARKIDENTIFIER VARCHAR(32),\nBENCHMARKNAME VARCHAR(152),\nBENCHMARKYIELD NUMBER(38,12),\nBIDEVALUATIONCLEAN VARCHAR(100),\nBIDEVALUATIONDIRTY NUMBER(38,12),\nBIDEVALUATIONSPREAD VARCHAR(100),\nBIDPRICE VARCHAR(100),\nBIDYIELDCLEAN NUMBER(38,12),\nCLOSECOMPPRICE VARCHAR(100),\nCONVEXITY NUMBER(38,12),\nCOUNTRYQUOTE VARCHAR(52),\nCOUNTRYQUOTECODE 
VARCHAR(8),\nCURRENCY VARCHAR(80),\nDURATION NUMBER(38,12),\nEVALUATIONTIMESTAMP DATETIME,\nEXCHANGE VARCHAR(148),\nEXCHANGECODE NUMBER(38,12),\nGPMASSETTYPE VARCHAR(100),\nHIGH VARCHAR(100),\nIDENTIFIER VARCHAR(48),\nISOCURRENCY VARCHAR(12),\nLOW VARCHAR(100),\nMACAULAYDURATION NUMBER(38,12),\nMEANEVALUATIONCLEAN VARCHAR(100),\nMEANEVALUATIONDIRTY NUMBER(38,12),\nMEANEVALUATIONSPREAD NUMBER(38,12),\nMEANYIELDCLEAN NUMBER(38,12),\nOPEN VARCHAR(100),\nPRICECURRENCY VARCHAR(12),\nPRICEDATE DATE,\nPRICEDERIVATION INTEGER,\nPRICEMASTER_INSTANCE INTEGER,\nPRICESOURCE VARCHAR(112),\nPRICETODATE DATE,\nPRIMEXCHTRADEPRICE VARCHAR(100),\nREDEMPTIONDATE DATE,\nREDEMPTIONPRICE INTEGER,\nVOLUME INTEGER,\nVWAP NUMBER(38,12),\nZCREATED_AT DATETIME,\nZINGESTED_AT DATETIME,\nZWHCREATED_AT DATETIME,\nZQS_PASSED_AT DATETIME,\nZCREATED_AT_END DATETIME,\nZUID VARCHAR(200),\nZROW_NUMBER INTEGER,\nZLINEAGE VARCHAR(300),\nZVARIANT VARCHAR(200),\nZTENANT_ID VARCHAR(50),\nZLINEAGE_HASH VARCHAR(300),\nZTRACKING_ID VARCHAR(200));\nCOMMIT;", "additional_data_type": "string", "mime_type": "text/plain", "processing_statuses": [], "worker_id": "svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython" }, { "status": 0, "number_of_records": 3, "proc_stats": null, "additional_data_type": "string", "processing_statuses": [ [ "Information", "Executed DM with flag updateldm for priceMaster resource", "eaglepy-i-1000", "", 0, "2022-09-15T23:46:24.290-04:00" ], [ "Information", "Executed DM with flag updatevdm for priceMaster resource", "eaglepy-i-1000", "", 0, "2022-09-15T23:46:24.290-04:00" ], [ "Information", "Executed DM with flag updateeqlldm for priceMaster resource", "eaglepy-i-1000", "", 0, "2022-09-15T23:46:24.291-04:00" ] ], "worker_id": "IKUZIN7400/PID=65284/WORKER=/IMPL=CPython" }, { "status": 0, "number_of_records": 951, "proc_stats": { "partitions": [ { "partition_id": "1", "source": { "input_records": 951, "input_batches": 1 }, "number_of_records": 951, "number_of_batches": 1, "destinations": 
[ { "destination_id": "priceMaster_in", "number_of_records": 951, "number_of_batches": 1 } ] } ], "summary": { "source": { "input_records": 951, "input_batches": 1 }, "number_of_records": 951, "number_of_batches": 1, "destinations": [ { "destination_id": "priceMaster_in", "number_of_records": 951, "number_of_batches": 1 } ] }, "details": { "processes": [ { "process_id": "1", "timings": [ { "processing_stage": "sniff", "total_time": 0.41, "cpu_time": 0.42 }, { "processing_stage": "e", "total_time": 0, "cpu_time": 0 }, { "processing_stage": "g", "total_time": 0.43, "cpu_time": 0.44 }, { "processing_stage": "openconn", "total_time": 2.17, "cpu_time": 1.31 }, { "processing_stage": "topensinks", "total_time": 2.17, "cpu_time": 1.31 }, { "processing_stage": "tout", "total_time": 0.08, "cpu_time": 0.08 }, { "processing_stage": "tclosesinks", "total_time": 2.57, "cpu_time": 0.56 }, { "processing_stage": "t", "total_time": 5.27, "cpu_time": 2.41 } ], "target_sink": { "partitions": [ { "partition_id": 1, "async_writes": 4, "buffers": [ { "stage_name": "asyncwrite0", "number_of_buffered_records": 238, "number_of_commits": 1, "number_of_committed_records": 238, "timings": { "sf_cursor_execute": [ 1.53, 0.15, 1, 1.53 ], "dbexecutemany": [ 1.53, 0.15, 1, 1.53 ] } }, { "stage_name": "asyncwrite1", "number_of_buffered_records": 238, "number_of_commits": 1, "number_of_committed_records": 238, "timings": { "sf_cursor_execute": [ 1.8, 0.15, 1, 1.8 ], "dbexecutemany": [ 1.8, 0.15, 1, 1.8 ] } }, { "stage_name": "asyncwrite2", "number_of_buffered_records": 238, "number_of_commits": 1, "number_of_committed_records": 238, "timings": { "sf_cursor_execute": [ 1.44, 0.1, 1, 1.44 ], "dbexecutemany": [ 1.44, 0.1, 1, 1.44 ] } }, { "stage_name": "asyncwrite3", "number_of_buffered_records": 237, "number_of_commits": 1, "number_of_committed_records": 237, "timings": { "sf_cursor_execute": [ 1.73, 0.1, 1, 1.73 ], "dbexecutemany": [ 1.73, 0.1, 1, 1.73 ] } } ] } ] } } ] } }, "additional_data_type": 
"string", "processing_statuses": [], "worker_id": "svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython" } ] }

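The response above carries the generated DDL as a string and the load statistics several levels deep, so a short helper makes both easier to inspect. The following Python sketch pulls the SQL statements out of the pipe entries and adds up the committed records per async write buffer. The function names are hypothetical, the idea that DDL lives in entries with mime_type "text/plain" is inferred from this example, and splitting on ";" is a naive approach that only suits simple DDL like the one shown.

```python
def ddl_statements(response: dict) -> list:
    """Return the SQL statements found in text/plain pipe entries."""
    statements = []
    for stage in response.get("pipe", []):
        if stage.get("mime_type") == "text/plain":
            sql = stage.get("additional_data") or ""
            statements.extend(s.strip() for s in sql.split(";") if s.strip())
    return statements


def committed_records(stage: dict) -> int:
    """Sum number_of_committed_records over all async write buffers of a stage."""
    total = 0
    details = (stage.get("proc_stats") or {}).get("details", {})
    for process in details.get("processes", []):
        for partition in process.get("target_sink", {}).get("partitions", []):
            for buffer in partition.get("buffers", []):
                total += buffer.get("number_of_committed_records", 0)
    return total


# Trimmed copy of the response above, keeping only the fields used here.
sample = {
    "pipe": [
        {"mime_type": "text/plain", "proc_stats": None,
         "additional_data": "BEGIN;\nCREATE SCHEMA IF NOT EXISTS DA1T1CEDSTESTDBO;\nCOMMIT;"},
        {"number_of_records": 951,
         "proc_stats": {"details": {"processes": [{"target_sink": {"partitions": [{"buffers": [
             {"stage_name": "asyncwrite0", "number_of_committed_records": 238},
             {"stage_name": "asyncwrite1", "number_of_committed_records": 238},
             {"stage_name": "asyncwrite2", "number_of_committed_records": 238},
             {"stage_name": "asyncwrite3", "number_of_committed_records": 237},
         ]}]}}]}}},
    ]
}

load_stage = sample["pipe"][1]
print(ddl_statements(sample))
# The four buffers add up to the stage's number_of_records.
print(committed_records(load_stage) == load_stage["number_of_records"])
```

Comparing the committed total with number_of_records is a quick consistency check that every input record reached the target table.
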
Load Core EDS Resource with Schema Drift

When schema drift is detected in the data for a core resource, the following steps are executed:

  • generate extension for resource

  • generate processing rule

  • execute ALTER DDL

  • publish data models

  • load data into core and extension tables

Example of request:

{ "resourcename": "legalentityid", "dbprovider": "snowflake", "schemadriftmode": "auto_approve" }

Example of auto_approve response when schema drift was found for a core resource:

{ "status": 0, "proc_stats": null, "extract_file_name": "/app/data/estar/tpe/data/mc2/temp/da1t1cedstest_legalentityid_resource_output.json", "additional_data_type": "string", "processing_statuses": [ [ "Information", "Starting processing an EBS request", "eaglepy-i-1000", "", 0, "2022-07-07T11:04:00.617+00:00" ], [ "Information", "Executed on svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython", "eaglepy-i-1000", "", 0, "2022-07-07T11:04:18.327+00:00" ] ], "worker_id": "svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython", "pipe": [ { "status": 2, "number_of_records": 0, "proc_stats": { "summary": { "number_of_records": 0 } }, "extract_file_name": "/app/data/estar/tpe/data/mc2/temp/da1t1cedstest_legalentityid_resource_output.json", "additional_data": "{\"legalentityid\": {\"resource_version\": \"2.0.19\"}}", "additional_data_type": "string", "mime_type": "application/json", "processing_statuses": [], "worker_id": "svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython" }, { "status": 0, "number_of_records": 8, "proc_stats": null, "additional_data": "BEGIN;\nCREATE SCHEMA IF NOT EXISTS SECURITYDBO;\nCREATE SCHEMA IF NOT EXISTS DA1T1CEDSTESTDBO;\nCREATE SEQUENCE IF NOT EXISTS SECURITYDBO.LEI_DETAILS_SEQ WITH START WITH = 1 INCREMENT BY = 1;\nCREATE SEQUENCE IF NOT EXISTS DA1T1CEDSTESTDBO.LEGALENTITYID_EXT_SEQ WITH START WITH = 1 INCREMENT BY = 1;\nCREATE TABLE IF NOT EXISTS SECURITYDBO.LEI_DETAILS (\nCITY VARCHAR(510),\nCOUNTRY_CODE VARCHAR(60),\nENTITY_STATUS VARCHAR(60),\nID_ASSIGNED_DATE DATE,\nID_CERTIFICATION_STATE VARCHAR(60),\nID_DISABLED_DATE DATE,\nID_LAST_UPDATE_DATE DATE,\nID_RECORD_STATE VARCHAR(60),\nLEGAL_FORM VARCHAR(510),\nLEGAL_NAME VARCHAR(510),\nLEI VARCHAR(20),\nNEXT_RENEWAL_DATE DATE,\nSTATE VARCHAR(510),\nSTREET VARCHAR(510),\nULTIMATE_PARENT_COMPANY VARCHAR(510),\nUPDATE_DATE DATETIME,\nUPDATE_SOURCE VARCHAR(255),\nVALIDATION_SOURCES VARCHAR(510),\nZIP VARCHAR(510),\nZCREATED_AT DATETIME,\nZINGESTED_AT DATETIME,\nZWHCREATED_AT 
DATETIME,\nZQS_PASSED_AT DATETIME,\nZCREATED_AT_END DATETIME,\nZUID VARCHAR(200),\nZROW_NUMBER INTEGER,\nZLINEAGE VARCHAR(300),\nZVARIANT VARCHAR(200),\nZTENANT_ID VARCHAR(50),\nZLINEAGE_HASH VARCHAR(300),\nZTRACKING_ID VARCHAR(200));\nCREATE TABLE IF NOT EXISTS DA1T1CEDSTESTDBO.LEGALENTITYID_EXT (\nLEI VARCHAR(20),\nNEWELEMENTCHAR VARCHAR(16),\nNEWELEMENTDATE DATE,\nNEWELEMENTNUM NUMBER(38,12),\nZCREATED_AT DATETIME,\nZINGESTED_AT DATETIME,\nZWHCREATED_AT DATETIME,\nZQS_PASSED_AT DATETIME,\nZCREATED_AT_END DATETIME,\nZUID VARCHAR(200),\nZROW_NUMBER INTEGER,\nZLINEAGE VARCHAR(300),\nZVARIANT VARCHAR(200),\nZTENANT_ID VARCHAR(50),\nZLINEAGE_HASH VARCHAR(300),\nZTRACKING_ID VARCHAR(200));\nCOMMIT;", "additional_data_type": "string", "mime_type": "text/plain", "processing_statuses": [], "worker_id": "svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython" }, { "status": 0, "number_of_records": 3, "proc_stats": null, "additional_data_type": "string", "processing_statuses": [ [ "Information", "Executed DM with flag updateldm for legalentityid resource", "eaglepy-i-1000", "", 0, "2022-09-15T23:46:24.290-04:00" ], [ "Information", "Executed DM with flag updatevdm for legalentityid resource", "eaglepy-i-1000", "", 0, "2022-09-15T23:46:24.290-04:00" ], [ "Information", "Executed DM with flag updateeqlldm for legalentityid resource", "eaglepy-i-1000", "", 0, "2022-09-15T23:46:24.291-04:00" ] ], "worker_id": "IKUZIN7400/PID=65284/WORKER=/IMPL=CPython" }, { "status": 0, "number_of_records": 1, "proc_stats": { "partitions": [ { "partition_id": "1", "source": { "input_records": 1, "input_batches": 1 }, "number_of_records": 1, "number_of_batches": 1, "destinations": [ { "destination_id": "legalentityid_in", "number_of_records": 1, "number_of_batches": 1 } ] } ], "summary": { "source": { "input_records": 1, "input_batches": 1 }, "number_of_records": 1, "number_of_batches": 1, "destinations": [ { "destination_id": "legalentityid_in", "number_of_records": 1, "number_of_batches": 1 } ] 
}, "details": { "processes": [ { "process_id": "1", "timings": [ { "processing_stage": "sniff", "total_time": 0, "cpu_time": 0 }, { "processing_stage": "e", "total_time": 0, "cpu_time": 0 }, { "processing_stage": "g", "total_time": 0, "cpu_time": 0 }, { "processing_stage": "openconn", "total_time": 0.76, "cpu_time": 0.48 }, { "processing_stage": "topensinks", "total_time": 0.76, "cpu_time": 0.48 }, { "processing_stage": "tout", "total_time": 0, "cpu_time": 0 }, { "processing_stage": "tclosesinks", "total_time": 2.94, "cpu_time": 0.16 }, { "processing_stage": "t", "total_time": 3.7, "cpu_time": 0.65 } ], "target_sink": { "partitions": [ { "partition_id": 1, "async_writes": 1, "buffers": [ { "stage_name": "asyncwrite0", "number_of_buffered_records": 1, "number_of_commits": 1, "number_of_committed_records": 1, "timings": { "sf_cursor_execute": [ 2.67, 0.06, 1, 2.67 ], "dbexecutemany": [ 2.67, 0.06, 1, 2.67 ] } } ] } ] } } ] } }, "additional_data_type": "string", "processing_statuses": [], "worker_id": "svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython" } ] }
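
In the response above, the first pipe entry reports the resource version as a JSON document embedded in a string, so it has to be decoded a second time. The Python sketch below shows one way to read it; the function name resource_versions is hypothetical, and selecting entries by mime_type "application/json" is an inference from this example.

```python
import json


def resource_versions(response: dict) -> dict:
    """Map resource name to resource_version from application/json pipe entries."""
    versions = {}
    for stage in response.get("pipe", []):
        if stage.get("mime_type") != "application/json":
            continue
        # additional_data is itself a JSON string and needs its own decode step.
        payload = json.loads(stage.get("additional_data") or "{}")
        for resource, info in payload.items():
            versions[resource] = info.get("resource_version")
    return versions


# Trimmed copy of the response above, keeping only the fields used here.
sample = {
    "pipe": [
        {"mime_type": "application/json",
         "additional_data": "{\"legalentityid\": {\"resource_version\": \"2.0.19\"}}"},
        {"mime_type": "text/plain", "additional_data": "BEGIN;\nCOMMIT;"},
    ]
}

print(resource_versions(sample))
```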