Schema drift detection identifies any schema changes in an incoming data file. Several schema drift modes are available, and each handles drifted data in its own way.
The API for schema drift is the inflow API:
Dev - https://apps.dev.az.eagleinvsys.com:8443/api/vault/eds/api/doc/#/inflow-rest/submitV3InflowPOST
Stage - https://apps.stage.az.eagleinvsys.com:8443/api/vault/eds/api/doc/#/inflow-rest/submitV3InflowPOST
Prod - https://apps.prod.az.eagleinvsys.com:8443/api/vault/eds/api/doc/#/inflow-rest/submitV3InflowPOST
IGNORE Schema Drift Mode
This is the default schema drift mode.
In IGNORE mode, schema drift is ignored whether or not it is present, and the data is loaded without the drifted columns.
Example of request:
{
"resourcename": "legalentityid",
"dbprovider": "snowflake",
"schemadriftmode": "ignore"
}
VALIDATE Schema Drift Mode
In VALIDATE mode, any schema changes for the resource are detected on load.
Example of request:
{
"resourcename": "legalentityid",
"dbprovider": "snowflake",
"schemadriftmode": "validate"
}
If no drift is found, the data is loaded into the database.
If drift is found, the user receives a response that lists the drifted columns.
Example of a validate response when schema drift was found:
{
"status": 0,
"proc_stats": null,
"additional_data_type": "string",
"processing_statuses": [
[
"Information",
"Starting processing an EBS request",
"eaglepy-i-1000",
"",
0,
"2022-07-07T11:22:16.136+00:00"
],
[
"Error",
"Schema validation failed",
"eaglepy-e-1026",
"",
0,
"2022-07-07T11:22:16.143+00:00"
],
[
"Information",
"Executed on svc-eds-5579bc956-fzl48/PID=12/WORKER=/IMPL=CPython",
"eaglepy-i-1000",
"",
0,
"2022-07-07T11:22:16.143+00:00"
]
],
"drift_columns": {
"_id": "legalentityid",
"load": {
"dataframe": {
"_id": "legalentityid",
"group_name": "Reference",
"type": "dftFile",
"description": "Lei details load",
"maxrows": -1,
"source_sink": {
"sink_type": "ssFileType",
"sink_params": {},
"source_descriptor_type": "Mmap",
"allow_format_discovery": true,
"source_format_dialect": {
"lineterminator": "\n",
"delimiter": ",",
"headerat": 1,
"headerlines": 1,
"dialectname": "csv_char_strip",
"encode_header": true,
"force_vocab_stru": true
}
},
"format": "delimited",
"vocabulary": {
"newElementChar": {
"path": "extension_newElementChar",
"datatype": "string",
"formatdialect": {
"length": 16
}
},
"newElementDate": {
"path": "extension_newElementDate",
"datatype": "date",
"formatdialect": {
"dialect": "YYYY-MM-DD"
}
},
"NewElementNum": {
"path": "extension_NewElementNum",
"datatype": "number",
"formatdialect": {
"precision": 38,
"scale": 12
}
}
}
},
"interface": {
"_id": "legalentityid",
"format": "eaglemlformat",
"dataset": "legalentityid",
"dataframe": "legalentityid"
}
}
},
"worker_id": "svc-eds-5579bc956-fzl48/PID=12/WORKER=/IMPL=CPython"
}
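A caller will typically want to pull the drifted columns out of a response like the one above. The following is a minimal Python sketch, not part of the inflow API: the helper names `validation_failed` and `drifted_columns` are hypothetical, the sample is a trimmed copy of the response above, and the assumption that the first two positions of each `processing_statuses` entry are the severity and the message is inferred from that sample only.

```python
# Trimmed copy of the validate response shown above (only the fields used here).
sample_response = {
    "status": 0,
    "processing_statuses": [
        ["Information", "Starting processing an EBS request",
         "eaglepy-i-1000", "", 0, "2022-07-07T11:22:16.136+00:00"],
        ["Error", "Schema validation failed",
         "eaglepy-e-1026", "", 0, "2022-07-07T11:22:16.143+00:00"],
    ],
    "drift_columns": {
        "_id": "legalentityid",
        "load": {
            "dataframe": {
                "vocabulary": {
                    "newElementChar": {"path": "extension_newElementChar", "datatype": "string"},
                    "newElementDate": {"path": "extension_newElementDate", "datatype": "date"},
                    "NewElementNum": {"path": "extension_NewElementNum", "datatype": "number"},
                }
            }
        },
    },
}


def validation_failed(response: dict) -> bool:
    """Return True if any status entry reports a schema validation error.

    Assumption: entry[0] is the severity and entry[1] is the message.
    """
    for entry in response.get("processing_statuses") or []:
        if entry[0] == "Error" and entry[1] == "Schema validation failed":
            return True
    return False


def drifted_columns(response: dict) -> dict:
    """Map each drifted column name to its reported datatype ({} if none)."""
    drift = response.get("drift_columns") or {}
    vocabulary = drift.get("load", {}).get("dataframe", {}).get("vocabulary", {})
    return {name: spec.get("datatype") for name, spec in vocabulary.items()}


if __name__ == "__main__":
    if validation_failed(sample_response):
        for column, datatype in drifted_columns(sample_response).items():
            print(f"{column}: {datatype}")
```

Reading the column list from `drift_columns.load.dataframe.vocabulary` keeps the helper independent of the status messages, which may differ between releases.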
AUTO_APPROVE Schema Drift Mode
In AUTO_APPROVE mode, schema drift is handled without any manual steps.
Loading a New Custom Resource with Schema Drift
When schema drift detection encounters a new custom resource, the following steps are executed:
Example of request:
{
"resourcename": "priceMaster",
"dbprovider": "snowflake",
"schemadriftmode": "auto_approve"
}
Example of a request with a specified vendor and feedsystem:
By default, vendor is BNYM and feedsystem is EAGLE.
{
"resourcename": "priceMaster",
"dbprovider": "snowflake",
"schemadriftmode": "auto_approve",
"vendor": "VENDOR_NAME",
"feedsystem": "FEEDSYSTEM_NAME"
}
Example of a request with schema drift tolerance:
This parameter is used when a custom resource is updated.
{
"resourcename": "priceMaster",
"dbprovider": "snowflake",
"schemadriftmode": "auto_approve",
"schemadrifttolerance": "2"
}
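The request variants above differ only in their optional fields, so they can be assembled by one small helper. The following Python sketch is illustrative: `build_inflow_request` and `ALLOWED_MODES` are hypothetical names, the set of modes is taken from this page, and sending `schemadrifttolerance` as a string simply mirrors the example above. Optional fields are omitted when not supplied, so the service applies its own defaults (BNYM and EAGLE for vendor and feedsystem).

```python
import json

# Modes described on this page; the values are lowercase in the request examples.
ALLOWED_MODES = {"ignore", "validate", "auto_approve"}


def build_inflow_request(resourcename, dbprovider, schemadriftmode="ignore",
                         vendor=None, feedsystem=None, schemadrifttolerance=None):
    """Build the JSON body for an inflow request (hypothetical helper)."""
    mode = schemadriftmode.lower()
    if mode not in ALLOWED_MODES:
        raise ValueError(f"unknown schema drift mode: {schemadriftmode!r}")

    body = {
        "resourcename": resourcename,
        "dbprovider": dbprovider,
        "schemadriftmode": mode,
    }
    # Only include optional fields that the caller actually set.
    if vendor is not None:
        body["vendor"] = vendor
    if feedsystem is not None:
        body["feedsystem"] = feedsystem
    if schemadrifttolerance is not None:
        # The example request passes the tolerance as a string ("2").
        body["schemadrifttolerance"] = str(schemadrifttolerance)
    return body


if __name__ == "__main__":
    request = build_inflow_request("priceMaster", "snowflake",
                                   schemadriftmode="auto_approve",
                                   schemadrifttolerance=2)
    print(json.dumps(request, indent=2))
```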
Data file example:
Example of an auto_approve response when schema drift was found for a new resource:
{
"status": 0,
"proc_stats": null,
"extract_file_name": "/app/data/estar/tpe/data/mc2/temp/da1t1cedstest_priceMaster_resource_output.json",
"additional_data_type": "string",
"processing_statuses": [
[
"Information",
"Starting processing an EBS request",
"eaglepy-i-1000",
"",
0,
"2022-07-07T10:53:58.606+00:00"
],
[
"Information",
"Executed on svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython",
"eaglepy-i-1000",
"",
0,
"2022-07-07T10:54:19.684+00:00"
]
],
"worker_id": "svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython",
"pipe": [
{
"status": 2,
"number_of_records": 0,
"proc_stats": {
"summary": {
"number_of_records": 0
}
},
"extract_file_name": "/app/data/estar/tpe/data/mc2/temp/da1t1cedstest_priceMaster_resource_output.json",
"additional_data": "{\"priceMaster\": {\"resource_version\": \"1.0.1\"}}",
"additional_data_type": "string",
"mime_type": "application/json",
"processing_statuses": [],
"worker_id": "svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython"
},
{
"status": 0,
"number_of_records": 7,
"proc_stats": null,
"additional_data": "BEGIN;\nCREATE SCHEMA IF NOT EXISTS DA1T1CEDSTESTDBO;\nDROP TABLE IF EXISTS DA1T1CEDSTESTDBO.PRICEMASTER;\nDROP SEQUENCE IF EXISTS DA1T1CEDSTESTDBO.PRICEMASTER_SEQ;\nCREATE SEQUENCE IF NOT EXISTS DA1T1CEDSTESTDBO.PRICEMASTER_SEQ WITH START WITH = 1 INCREMENT BY = 1;\nCREATE TABLE IF NOT EXISTS DA1T1CEDSTESTDBO.PRICEMASTER (\nAASSKEVALUATIONCLEAN NUMBER(38,12),\nACCRUEDINTEREST NUMBER(38,12),\nADJUSTMENTFACTOR INTEGER,\nASKEVALUATIONCLEAN VARCHAR(100),\nASKEVALUATIONDIRTY NUMBER(38,12),\nASKEVALUATIONSPREAD VARCHAR(100),\nASKPRICE VARCHAR(100),\nASKYIELDCLEAN NUMBER(38,12),\nBENCHMARKIDENTIFIER VARCHAR(32),\nBENCHMARKNAME VARCHAR(152),\nBENCHMARKYIELD NUMBER(38,12),\nBIDEVALUATIONCLEAN VARCHAR(100),\nBIDEVALUATIONDIRTY NUMBER(38,12),\nBIDEVALUATIONSPREAD VARCHAR(100),\nBIDPRICE VARCHAR(100),\nBIDYIELDCLEAN NUMBER(38,12),\nCLOSECOMPPRICE VARCHAR(100),\nCONVEXITY NUMBER(38,12),\nCOUNTRYQUOTE VARCHAR(52),\nCOUNTRYQUOTECODE VARCHAR(8),\nCURRENCY VARCHAR(80),\nDURATION NUMBER(38,12),\nEVALUATIONTIMESTAMP DATETIME,\nEXCHANGE VARCHAR(148),\nEXCHANGECODE NUMBER(38,12),\nGPMASSETTYPE VARCHAR(100),\nHIGH VARCHAR(100),\nIDENTIFIER VARCHAR(48),\nISOCURRENCY VARCHAR(12),\nLOW VARCHAR(100),\nMACAULAYDURATION NUMBER(38,12),\nMEANEVALUATIONCLEAN VARCHAR(100),\nMEANEVALUATIONDIRTY NUMBER(38,12),\nMEANEVALUATIONSPREAD NUMBER(38,12),\nMEANYIELDCLEAN NUMBER(38,12),\nOPEN VARCHAR(100),\nPRICECURRENCY VARCHAR(12),\nPRICEDATE DATE,\nPRICEDERIVATION INTEGER,\nPRICEMASTER_INSTANCE INTEGER,\nPRICESOURCE VARCHAR(112),\nPRICETODATE DATE,\nPRIMEXCHTRADEPRICE VARCHAR(100),\nREDEMPTIONDATE DATE,\nREDEMPTIONPRICE INTEGER,\nVOLUME INTEGER,\nVWAP NUMBER(38,12),\nZCREATED_AT DATETIME,\nZINGESTED_AT DATETIME,\nZWHCREATED_AT DATETIME,\nZQS_PASSED_AT DATETIME,\nZCREATED_AT_END DATETIME,\nZUID VARCHAR(200),\nZROW_NUMBER INTEGER,\nZLINEAGE VARCHAR(300),\nZVARIANT VARCHAR(200),\nZTENANT_ID VARCHAR(50),\nZLINEAGE_HASH VARCHAR(300),\nZTRACKING_ID VARCHAR(200));\nCOMMIT;",
"additional_data_type": "string",
"mime_type": "text/plain",
"processing_statuses": [],
"worker_id": "svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython"
},
{
"status": 0,
"number_of_records": 3,
"proc_stats": null,
"additional_data_type": "string",
"processing_statuses": [
[
"Information",
"Executed DM with flag updateldm for priceMaster resource",
"eaglepy-i-1000",
"",
0,
"2022-09-15T23:46:24.290-04:00"
],
[
"Information",
"Executed DM with flag updatevdm for priceMaster resource",
"eaglepy-i-1000",
"",
0,
"2022-09-15T23:46:24.290-04:00"
],
[
"Information",
"Executed DM with flag updateeqlldm for priceMaster resource",
"eaglepy-i-1000",
"",
0,
"2022-09-15T23:46:24.291-04:00"
]
],
"worker_id": "IKUZIN7400/PID=65284/WORKER=/IMPL=CPython"
},
{
"status": 0,
"number_of_records": 951,
"proc_stats": {
"partitions": [
{
"partition_id": "1",
"source": {
"input_records": 951,
"input_batches": 1
},
"number_of_records": 951,
"number_of_batches": 1,
"destinations": [
{
"destination_id": "priceMaster_in",
"number_of_records": 951,
"number_of_batches": 1
}
]
}
],
"summary": {
"source": {
"input_records": 951,
"input_batches": 1
},
"number_of_records": 951,
"number_of_batches": 1,
"destinations": [
{
"destination_id": "priceMaster_in",
"number_of_records": 951,
"number_of_batches": 1
}
]
},
"details": {
"processes": [
{
"process_id": "1",
"timings": [
{
"processing_stage": "sniff",
"total_time": 0.41,
"cpu_time": 0.42
},
{
"processing_stage": "e",
"total_time": 0,
"cpu_time": 0
},
{
"processing_stage": "g",
"total_time": 0.43,
"cpu_time": 0.44
},
{
"processing_stage": "openconn",
"total_time": 2.17,
"cpu_time": 1.31
},
{
"processing_stage": "topensinks",
"total_time": 2.17,
"cpu_time": 1.31
},
{
"processing_stage": "tout",
"total_time": 0.08,
"cpu_time": 0.08
},
{
"processing_stage": "tclosesinks",
"total_time": 2.57,
"cpu_time": 0.56
},
{
"processing_stage": "t",
"total_time": 5.27,
"cpu_time": 2.41
}
],
"target_sink": {
"partitions": [
{
"partition_id": 1,
"async_writes": 4,
"buffers": [
{
"stage_name": "asyncwrite0",
"number_of_buffered_records": 238,
"number_of_commits": 1,
"number_of_committed_records": 238,
"timings": {
"sf_cursor_execute": [
1.53,
0.15,
1,
1.53
],
"dbexecutemany": [
1.53,
0.15,
1,
1.53
]
}
},
{
"stage_name": "asyncwrite1",
"number_of_buffered_records": 238,
"number_of_commits": 1,
"number_of_committed_records": 238,
"timings": {
"sf_cursor_execute": [
1.8,
0.15,
1,
1.8
],
"dbexecutemany": [
1.8,
0.15,
1,
1.8
]
}
},
{
"stage_name": "asyncwrite2",
"number_of_buffered_records": 238,
"number_of_commits": 1,
"number_of_committed_records": 238,
"timings": {
"sf_cursor_execute": [
1.44,
0.1,
1,
1.44
],
"dbexecutemany": [
1.44,
0.1,
1,
1.44
]
}
},
{
"stage_name": "asyncwrite3",
"number_of_buffered_records": 237,
"number_of_commits": 1,
"number_of_committed_records": 237,
"timings": {
"sf_cursor_execute": [
1.73,
0.1,
1,
1.73
],
"dbexecutemany": [
1.73,
0.1,
1,
1.73
]
}
}
]
}
]
}
}
]
}
},
"additional_data_type": "string",
"processing_statuses": [],
"worker_id": "svc-eds-5579bc956-fzl48/PID=17/WORKER=/IMPL=CPython"
}
]
}
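The auto_approve response above nests its results in the `pipe` array: one step carries the new resource version as a JSON string inside `additional_data`, and the load step reports per-buffer commit counts. The Python sketch below shows one way to read both. The helper names are hypothetical, the sample is a heavily trimmed copy of the response above, and identifying the version step by its `mime_type` of `application/json` is an assumption based on that sample.

```python
import json

# Heavily trimmed copy of the response above (only the fields used here).
sample_response = {
    "status": 0,
    "pipe": [
        {
            "status": 2,
            "additional_data": "{\"priceMaster\": {\"resource_version\": \"1.0.1\"}}",
            "mime_type": "application/json",
        },
        {
            "status": 0,
            "number_of_records": 951,
            "proc_stats": {"details": {"processes": [{"target_sink": {"partitions": [{
                "buffers": [
                    {"stage_name": "asyncwrite0", "number_of_committed_records": 238},
                    {"stage_name": "asyncwrite1", "number_of_committed_records": 238},
                    {"stage_name": "asyncwrite2", "number_of_committed_records": 238},
                    {"stage_name": "asyncwrite3", "number_of_committed_records": 237},
                ]}]}}]}},
        },
    ],
}


def resource_versions(response: dict) -> dict:
    """Collect {resource: version} from steps whose additional_data is JSON."""
    versions = {}
    for step in response.get("pipe") or []:
        if step.get("mime_type") != "application/json" or not step.get("additional_data"):
            continue
        # additional_data is a JSON document encoded as a string.
        for resource, info in json.loads(step["additional_data"]).items():
            if isinstance(info, dict) and "resource_version" in info:
                versions[resource] = info["resource_version"]
    return versions


def committed_records(step: dict) -> int:
    """Sum number_of_committed_records over all sink buffers of one pipe step."""
    total = 0
    details = (step.get("proc_stats") or {}).get("details", {})
    for process in details.get("processes", []):
        for partition in process.get("target_sink", {}).get("partitions", []):
            for buffer in partition.get("buffers", []):
                total += buffer.get("number_of_committed_records", 0)
    return total


if __name__ == "__main__":
    print(resource_versions(sample_response))
    load_step = sample_response["pipe"][-1]
    print(committed_records(load_step) == load_step["number_of_records"])
```

Comparing the committed total with `number_of_records` is a cheap sanity check that every input record reached the sink.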
Loading a Core EDS Resource with Schema Drift
When schema drift is detected in the data for a core resource, the following steps are executed:
Example of request:
{
"resourcename": "legalentityid",
"dbprovider": "snowflake",
"schemadriftmode": "auto_approve"
}
Data file example:
Example of an auto_approve response when schema drift was found for a core resource:
{
"status": 0,
"proc_stats": null,
"extract_file_name": "/app/data/estar/tpe/data/mc2/temp/da1t1cedstest_legalentityid_resource_output.json",
"additional_data_type": "string",
"processing_statuses": [
[
"Information",
"Starting processing an EBS request",
"eaglepy-i-1000",
"",
0,
"2022-07-07T11:04:00.617+00:00"
],
[
"Information",
"Executed on svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython",
"eaglepy-i-1000",
"",
0,
"2022-07-07T11:04:18.327+00:00"
]
],
"worker_id": "svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython",
"pipe": [
{
"status": 2,
"number_of_records": 0,
"proc_stats": {
"summary": {
"number_of_records": 0
}
},
"extract_file_name": "/app/data/estar/tpe/data/mc2/temp/da1t1cedstest_legalentityid_resource_output.json",
"additional_data": "{\"legalentityid\": {\"resource_version\": \"2.0.19\"}}",
"additional_data_type": "string",
"mime_type": "application/json",
"processing_statuses": [],
"worker_id": "svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython"
},
{
"status": 0,
"number_of_records": 8,
"proc_stats": null,
"additional_data": "BEGIN;\nCREATE SCHEMA IF NOT EXISTS SECURITYDBO;\nCREATE SCHEMA IF NOT EXISTS DA1T1CEDSTESTDBO;\nCREATE SEQUENCE IF NOT EXISTS SECURITYDBO.LEI_DETAILS_SEQ WITH START WITH = 1 INCREMENT BY = 1;\nCREATE SEQUENCE IF NOT EXISTS DA1T1CEDSTESTDBO.LEGALENTITYID_EXT_SEQ WITH START WITH = 1 INCREMENT BY = 1;\nCREATE TABLE IF NOT EXISTS SECURITYDBO.LEI_DETAILS (\nCITY VARCHAR(510),\nCOUNTRY_CODE VARCHAR(60),\nENTITY_STATUS VARCHAR(60),\nID_ASSIGNED_DATE DATE,\nID_CERTIFICATION_STATE VARCHAR(60),\nID_DISABLED_DATE DATE,\nID_LAST_UPDATE_DATE DATE,\nID_RECORD_STATE VARCHAR(60),\nLEGAL_FORM VARCHAR(510),\nLEGAL_NAME VARCHAR(510),\nLEI VARCHAR(20),\nNEXT_RENEWAL_DATE DATE,\nSTATE VARCHAR(510),\nSTREET VARCHAR(510),\nULTIMATE_PARENT_COMPANY VARCHAR(510),\nUPDATE_DATE DATETIME,\nUPDATE_SOURCE VARCHAR(255),\nVALIDATION_SOURCES VARCHAR(510),\nZIP VARCHAR(510),\nZCREATED_AT DATETIME,\nZINGESTED_AT DATETIME,\nZWHCREATED_AT DATETIME,\nZQS_PASSED_AT DATETIME,\nZCREATED_AT_END DATETIME,\nZUID VARCHAR(200),\nZROW_NUMBER INTEGER,\nZLINEAGE VARCHAR(300),\nZVARIANT VARCHAR(200),\nZTENANT_ID VARCHAR(50),\nZLINEAGE_HASH VARCHAR(300),\nZTRACKING_ID VARCHAR(200));\nCREATE TABLE IF NOT EXISTS DA1T1CEDSTESTDBO.LEGALENTITYID_EXT (\nLEI VARCHAR(20),\nNEWELEMENTCHAR VARCHAR(16),\nNEWELEMENTDATE DATE,\nNEWELEMENTNUM NUMBER(38,12),\nZCREATED_AT DATETIME,\nZINGESTED_AT DATETIME,\nZWHCREATED_AT DATETIME,\nZQS_PASSED_AT DATETIME,\nZCREATED_AT_END DATETIME,\nZUID VARCHAR(200),\nZROW_NUMBER INTEGER,\nZLINEAGE VARCHAR(300),\nZVARIANT VARCHAR(200),\nZTENANT_ID VARCHAR(50),\nZLINEAGE_HASH VARCHAR(300),\nZTRACKING_ID VARCHAR(200));\nCOMMIT;",
"additional_data_type": "string",
"mime_type": "text/plain",
"processing_statuses": [],
"worker_id": "svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython"
},
{
"status": 0,
"number_of_records": 3,
"proc_stats": null,
"additional_data_type": "string",
"processing_statuses": [
[
"Information",
"Executed DM with flag updateldm for legalentityid resource",
"eaglepy-i-1000",
"",
0,
"2022-09-15T23:46:24.290-04:00"
],
[
"Information",
"Executed DM with flag updatevdm for legalentityid resource",
"eaglepy-i-1000",
"",
0,
"2022-09-15T23:46:24.290-04:00"
],
[
"Information",
"Executed DM with flag updateeqlldm for legalentityid resource",
"eaglepy-i-1000",
"",
0,
"2022-09-15T23:46:24.291-04:00"
]
],
"worker_id": "IKUZIN7400/PID=65284/WORKER=/IMPL=CPython"
},
{
"status": 0,
"number_of_records": 1,
"proc_stats": {
"partitions": [
{
"partition_id": "1",
"source": {
"input_records": 1,
"input_batches": 1
},
"number_of_records": 1,
"number_of_batches": 1,
"destinations": [
{
"destination_id": "legalentityid_in",
"number_of_records": 1,
"number_of_batches": 1
}
]
}
],
"summary": {
"source": {
"input_records": 1,
"input_batches": 1
},
"number_of_records": 1,
"number_of_batches": 1,
"destinations": [
{
"destination_id": "legalentityid_in",
"number_of_records": 1,
"number_of_batches": 1
}
]
},
"details": {
"processes": [
{
"process_id": "1",
"timings": [
{
"processing_stage": "sniff",
"total_time": 0,
"cpu_time": 0
},
{
"processing_stage": "e",
"total_time": 0,
"cpu_time": 0
},
{
"processing_stage": "g",
"total_time": 0,
"cpu_time": 0
},
{
"processing_stage": "openconn",
"total_time": 0.76,
"cpu_time": 0.48
},
{
"processing_stage": "topensinks",
"total_time": 0.76,
"cpu_time": 0.48
},
{
"processing_stage": "tout",
"total_time": 0,
"cpu_time": 0
},
{
"processing_stage": "tclosesinks",
"total_time": 2.94,
"cpu_time": 0.16
},
{
"processing_stage": "t",
"total_time": 3.7,
"cpu_time": 0.65
}
],
"target_sink": {
"partitions": [
{
"partition_id": 1,
"async_writes": 1,
"buffers": [
{
"stage_name": "asyncwrite0",
"number_of_buffered_records": 1,
"number_of_commits": 1,
"number_of_committed_records": 1,
"timings": {
"sf_cursor_execute": [
2.67,
0.06,
1,
2.67
],
"dbexecutemany": [
2.67,
0.06,
1,
2.67
]
}
}
]
}
]
}
}
]
}
},
"additional_data_type": "string",
"processing_statuses": [],
"worker_id": "svc-eds-5579bc956-fzl48/PID=13/WORKER=/IMPL=CPython"
}
]
}
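In the sample response above, the generated DDL creates the core table SECURITYDBO.LEI_DETAILS and a separate DA1T1CEDSTESTDBO.LEGALENTITYID_EXT table that holds the drifted columns. The Python sketch below lists the tables and columns found in such a DDL string. It is an illustration only: `tables_in_ddl` is a hypothetical helper, the DDL is a shortened copy of the `additional_data` value above, and the parsing relies on the layout seen in that sample (one column per line, each statement ending in `);`).

```python
import re

# Shortened copy of the DDL returned in additional_data (mime_type text/plain).
sample_ddl = (
    "BEGIN;\n"
    "CREATE SCHEMA IF NOT EXISTS SECURITYDBO;\n"
    "CREATE TABLE IF NOT EXISTS SECURITYDBO.LEI_DETAILS (\n"
    "CITY VARCHAR(510),\n"
    "LEI VARCHAR(20));\n"
    "CREATE TABLE IF NOT EXISTS DA1T1CEDSTESTDBO.LEGALENTITYID_EXT (\n"
    "LEI VARCHAR(20),\n"
    "NEWELEMENTCHAR VARCHAR(16),\n"
    "NEWELEMENTDATE DATE,\n"
    "NEWELEMENTNUM NUMBER(38,12));\n"
    "COMMIT;"
)

# Table name, then everything up to the first ");" that closes the statement.
CREATE_TABLE = re.compile(
    r"CREATE TABLE IF NOT EXISTS\s+([\w.]+)\s*\((.*?)\);", re.DOTALL
)


def tables_in_ddl(ddl: str) -> dict:
    """Return {qualified_table_name: [column names]} for each CREATE TABLE."""
    tables = {}
    for name, body in CREATE_TABLE.findall(ddl):
        # Columns are separated by ",\n"; commas inside NUMBER(38,12) are not.
        columns = [line.split()[0] for line in body.strip().split(",\n") if line.strip()]
        tables[name] = columns
    return tables


if __name__ == "__main__":
    for table, columns in tables_in_ddl(sample_ddl).items():
        print(table, columns)
```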