Distribution to Multiple Destinations (Starting August 2019 Release)

Eagle Data Services has a facility to define distribution profiles to distribute data to multiple destinations.

Distribution Profiles

The distribution profiles are JSON files stored in eagle/estar/tpe/dynamic/metadata/distributions/profilename_profile.json

In the distribution profile, you can specify one or several destinations (sinks) to distribute the data. For example, to distribute to two different SFTP destinations, you can use the following profile:

 

Sample Distribution Profile
{
  "data_sinks": [
    {
      "sink": "sftp_sink",
      "type": "sftp",
      "credentials": {
        "user_id": "eagle",
        "password": "eagle1"
      },
      "cfg": {
        "host": "inno-mlperfapp01",
        "port": 22,
        "compression_codec": "gzip"
      }
    },
    {
      "sink": "sftp_extracts",
      "type": "sftp",
      "credentials": {
        "user_id": "eagle",
        "password": "eagle1"
      },
      "cfg": {
        "host": "inno-mlperfapp01",
        "port": 22,
        "compression_codec": "gzip",
        "location": "extracts"
      }
    }
  ]
}

You can specify the distribution profile in the RTR either by name using the DistributionProfileName parameter or as an embedded distribution profile in the DistributionProfileJsonDefinition parameter as a JSON document.

For example, if eagle/estar/tpe/dynamic/metadata/distributions/client_destinations_profile.json is defined, you can reference it in the RTR by using the DistributionProfileName=client_destinations parameter.
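The mapping between a profile name and its file can be sketched in Python. This is an illustration only: the function names profile_path and sink_names are hypothetical, and the directory is simply the one quoted above. How the service itself resolves the name is not described here.

```python
import json
from pathlib import PurePosixPath

# Directory quoted in this document for distribution profiles.
DISTRIBUTIONS_DIR = PurePosixPath("eagle/estar/tpe/dynamic/metadata/distributions")


def profile_path(profile_name: str) -> str:
    """Map a DistributionProfileName value to the profile file path."""
    return str(DISTRIBUTIONS_DIR / f"{profile_name}_profile.json")


def sink_names(profile_json: str) -> list:
    """Return the names of all sinks listed in a profile document."""
    profile = json.loads(profile_json)
    return [sink["sink"] for sink in profile.get("data_sinks", [])]
```

For example, profile_path("client_destinations") yields the client_destinations_profile.json path mentioned above.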

Encrypted Credentials

You can embed the credentials in the distribution profile as shown in the example above or the credentials may point to an encrypted configuration file.

For example:

"credentials": "clientcredentials/sftpcredentials"

In this case, the credentials are encrypted and retrieved from the file dynamic/dataservices/configs/clientcredentials.json.

The credentials reference has two parts: a credential group name and a specific credential name. In the above example, the credential group name is clientcredentials, which is translated into the credentials file name. The credential name is sftpcredentials, which points to a section within that credentials file.
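A minimal sketch of how such a reference splits into its two parts, assuming the configs directory quoted above. The names CredentialRef and parse_credentials_ref are hypothetical and are not part of Eagle Data Services.

```python
from typing import NamedTuple

# Directory quoted in this document for encrypted credentials files.
CONFIGS_DIR = "dynamic/dataservices/configs"


class CredentialRef(NamedTuple):
    group: str       # translated into the credentials file name
    name: str        # section within the credentials file
    file_path: str   # where the group file is expected to be


def parse_credentials_ref(ref: str) -> CredentialRef:
    """Split a 'group/name' credentials reference into its parts."""
    group, sep, name = ref.partition("/")
    if not sep or not group or not name or "/" in name:
        raise ValueError(f"expected 'group/name', got {ref!r}")
    return CredentialRef(group, name, f"{CONFIGS_DIR}/{group}.json")
```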

Each encrypted credentials file has a region designation, which is the Oracle SID of the environment. If the credentials file is copied to a different environment, it does not work there.

A SOAP endpoint is provided to save encrypted credentials. An example RTR that saves the credentials is provided below:

Sample RTR To Store Credentials
<EagleML xmlns="http://www.eagleinvsys.com/2011/EagleML-2-0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:type="RunTaskRequest"
         eaglemlVersion="2-0"
         xsi:schemaLocation="http://www.eagleinvsys.com/2011/EagleML-2-0 eagleml-main-2-0.xsd"
         eaglemlType="RunTaskRequest">
  <header>
    <messageId>B70DHO72GCLFRHGI</messageId>
    <sentBy>http://www.eagleinvsys.com</sentBy>
    <sendTo>http://www.eagleinvsys.com/eagle_default_ruleservice</sendTo>
  </header>
  <taskIdentifier>
    <correlationId>MLPERFSFTPCRED_{GUID}</correlationId>
    <businessTaskId>BB0573A416C18678</businessTaskId>
  </taskIdentifier>
  <taskTypeEnum>NEW</taskTypeEnum>
  <synchronousExecution>yes</synchronousExecution>
  <taskParameters>
    <taskParameter>
      <name>ActionType</name>
      <value>EXECUTE</value>
    </taskParameter>
    <taskParameter>
      <name>REST</name>
      <value>POST /eagle/v2/configurations/clientcredentials/sftpcredentials</value>
    </taskParameter>
    <taskParameter>
      <name>rest_body</name>
      <value>{ "user_id": "eagle", "password": "eagle1234", "passphrase": "test123" }</value>
    </taskParameter>
  </taskParameters>
</EagleML>

The above RTR creates a credentials configuration that can be referenced in the distribution profile file with the element "credentials": "clientcredentials/sftpcredentials".
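The three task parameters of the sample RTR can be assembled programmatically. The sketch below is an assumption-laden illustration: the helper names are hypothetical, and generalizing the REST path to /eagle/v2/configurations/{group}/{name} is inferred from the single sample above rather than from a documented contract. It only builds the taskParameters fragment and does not send anything.

```python
import json
from xml.sax.saxutils import escape


def credential_task_parameters(group, name, user_id, password, passphrase=None):
    """Build the ActionType, REST and rest_body values used by the sample RTR."""
    body = {"user_id": user_id, "password": password}
    if passphrase is not None:
        body["passphrase"] = passphrase
    return {
        "ActionType": "EXECUTE",
        # Path pattern inferred from the sample RTR (assumption).
        "REST": f"POST /eagle/v2/configurations/{group}/{name}",
        "rest_body": json.dumps(body),
    }


def to_task_parameters_xml(params):
    """Render the values as a <taskParameters> fragment, escaping XML characters."""
    items = "".join(
        f"<taskParameter><name>{escape(key)}</name>"
        f"<value>{escape(value)}</value></taskParameter>"
        for key, value in params.items()
    )
    return f"<taskParameters>{items}</taskParameters>"
```

Escaping matters because a password may contain characters such as & or <, which would otherwise break the request document.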

Example profile for SFTP with encrypted credentials:

{ "data_sinks": [ { "sink": "sftp_sink", "type": "sftp", "credentials": "clientcredentials/sftpcredentials", "cfg": { "host": "o171-s123-la01.eagleinvsys.com", "port": 22, "compression_codec": "none", "location": "/apps/eagle/tmptxt/sftpdir" } } ] }

Data Encryption

The data to be distributed may be encrypted. Only GPG is currently supported. The encryption is enabled using the encryption_policy element in the distribution profile as shown in the example below.

Sample of Distribution With Encryption Profile
{ "data_sinks": [ { "sink": "sftp_extracts", "type": "sftp", "encryption_policy": { "codec" : "gpg", "credentials" : { "client_id": "eaglekey" } }, "credentials": "clientcredentials/sftpcredentials", "cfg": { "host": "inno-mlperfapp01", "port": 22, "compression_codec": "gzip", "location": "extracts" } } ] }
  • The encryption_policy has a codec element specifying the encryption type. Currently only gpg is supported.

  • The credentials/client_id specifies the GPG destination name.

  • The "credentials" element may be an embedded configuration as shown in the example or a string with a reference to the encrypted configuration as shown in the Encrypted Credentials section.

  • The "encryption_policy" may be a string reference to an encryption policy configuration stored in the eagle/estar/tpe/dynamic/metadata/encryptions/policyname.json files.

For example, a file eagle/estar/tpe/dynamic/metadata/encryptions/gpgpolicy.json can exist with the following content:

Sample Encryption Policies File
{ "encryption_policies" : { "gpg_policy" : { "codec" : "GPG", "armor": "gpgarmor", "credentials" : "eagle_certificates/gpgcert" } } }

In the distribution profile, you can reference it as:

"encryption_policy": "gpgpolicy/gpg_policy"

The GPG encryption policy currently supports the following parameters:

  • "armor" - the armor setting for GPG. Refer to the GPG documentation for a description.

  • "credentials"/"client_id" - the recipient, whose certificate should already be imported in GPG.
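A string policy reference such as "gpgpolicy/gpg_policy" can be read as a file name plus a key under "encryption_policies". The sketch below illustrates that lookup under this reading; resolve_encryption_policy and its load_file argument are hypothetical names, and the real service may resolve references differently.

```python
# Directory quoted in this document for encryption policy files.
ENCRYPTIONS_DIR = "eagle/estar/tpe/dynamic/metadata/encryptions"


def resolve_encryption_policy(policy, load_file):
    """Return the policy dict for an embedded policy or a 'file/policy' reference.

    load_file is a caller-supplied function that takes a path and returns
    the parsed JSON document stored there.
    """
    if isinstance(policy, dict):
        return policy  # embedded policy, nothing to resolve
    file_stem, sep, policy_name = policy.partition("/")
    if not sep or not file_stem or not policy_name:
        raise ValueError(f"expected 'file/policy', got {policy!r}")
    document = load_file(f"{ENCRYPTIONS_DIR}/{file_stem}.json")
    return document["encryption_policies"][policy_name]
```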

Include Sink Definitions

In the Distribution Profile, you can include other sink definitions and redefine their parameters. For example, suppose a file named "test_sinks.json" exists in the eagle/estar/tpe/dynamic/metadata/distributions/ path with the following content:

Sample Include File
{ "sinks": { "sftp_sink": { "type":"SFTP", "credentials" : "test_cred/test_credentials", "cfg" : { "host":"sftp.eagleinvsys.com", "port":22, "location" : "client1/extract", "encryption_policy" : "test_encryption_policy/enc_policy1", "compression_codec": "GZIP" } } } }

You can then include it in a distribution profile file such as "send_to_sftp_profile.json", as shown below:

Distribution Profile With Include
{ "include": ["test_sinks.json"], "data_sinks": [ { "sink": "sftp_sink" } ] }
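The include mechanism can be pictured as a merge of the included sink definition with whatever the profile redefines. The following sketch is only one plausible reading: the deep-merge rule (profile values win, nested objects are merged key by key) is an assumption, and expand_profile, deep_merge and load_include are hypothetical names.

```python
import copy


def deep_merge(base, override):
    """Return a copy of base with override applied; nested dicts merge key by key."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def expand_profile(profile, load_include):
    """Resolve each data_sinks entry against the sinks of the included files.

    load_include is a caller-supplied function that takes an include file
    name and returns its parsed JSON content.
    """
    library = {}
    for file_name in profile.get("include", []):
        library.update(load_include(file_name).get("sinks", {}))
    return [
        deep_merge(library.get(entry["sink"], {}), entry)
        for entry in profile.get("data_sinks", [])
    ]
```

With this rule, a profile entry such as { "sink": "sftp_sink", "cfg": { "location": "client2/extract" } } would keep the included host and port and change only the location.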

Data Compression

You can compress the distributed data by adding the "compression_codec": "GZIP" parameter to the "cfg" section of the sink definition. Only GZIP is supported presently.

SFTP Distribution Sink Parameters

The SFTP sink is designated with a "type": "SFTP" sink parameter.

The SFTP Distribution may take the following parameters:

  • "credentials"/"user_id" - the login user id on the SFTP server

  • "credentials"/"password" - the password to login to the SFTP server

  • "credentials"/"private_key" - the name of the private key file, if necessary. The private key file should be in the estar/tpe/dynamic/dataservices/certificates folder.

  • "credentials"/"passphrase" - the passphrase for the private key

  • "credentials"/"private_key_type" - the private key type, for example RSA or DSA. The default is RSA.

  • "cfg"/"host" - the host name or IP address of the SFTP server

  • "cfg"/"port" - the port number of the SFTP server, usually set to 22

  • "cfg"/"make_dir" - values of true or false.  Set to true to create missing directories in the distribution path on the SFTP server and false to fail if the directory is missing. The default is true.

  • "cfg"/"location" - a path on the SFTP server to distribute the files to.
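The defaults listed above (make_dir defaults to true, private_key_type defaults to RSA) can be made explicit with a small normalization step. This is a hedged sketch: normalize_sftp_sink is a hypothetical helper, treating "cfg"/"host" as mandatory is an assumption, and the case-insensitive type check simply reflects that the samples use both "sftp" and "SFTP".

```python
import copy


def normalize_sftp_sink(sink):
    """Return a copy of an SFTP sink definition with documented defaults filled in."""
    if str(sink.get("type", "")).upper() != "SFTP":
        raise ValueError("not an SFTP sink")
    result = copy.deepcopy(sink)
    cfg = result.setdefault("cfg", {})
    if "host" not in cfg:
        # Assumption: a sink without a host cannot be used.
        raise ValueError("cfg/host is missing")
    cfg.setdefault("make_dir", True)  # documented default
    credentials = result.get("credentials")
    # Embedded credentials only; a string is a reference to an encrypted file.
    if isinstance(credentials, dict) and "private_key" in credentials:
        credentials.setdefault("private_key_type", "RSA")  # documented default
    return result
```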

AZURE Publishing Configuration (Starting December 2019 release)

The Azure publishing configuration profile sample is below:

Sample Azure Publishing Configuration
{ "data_sinks": [{ "sink": "azure_dev", "type": "azureblob", "cfg": { "credentials": { "account": "devdataaccount", "application_id": "11111111-1111-470c-9d01-9999999999", "application_secret": "dalkjflj*SDjhDSljadaok", "tenant_id": "1111111111-1111111-1111-1111-111111111" }, "container_name": "devcontainer", "target_dir": "dropdir/dest", "compression_codec": "gzip" } } ] }

The "compression_codec" parameter has to be set to gzip to compress data or none to send uncompressed data.

EMAIL Distribution Sink Parameters

The EMAIL sink is designated with a "type": "EMAIL" sink parameter. (Available as of January 2021 release)

The EMAIL Distribution may take the following parameters:

  • "credentials"/"secure_connection" - values of true or false. Set to true to tell the email server that the email client wants to turn an existing insecure connection into a secure one. The default is false. Use it only when the SMTP server supports secure connections.

  • "credentials"/"ca_certs" - the name of the CA certificate file, if necessary to set up a secure connection. The CA certificate file should be in the estar/tpe/dynamic/dataservices/certificates folder and in PEM format. It loads a set of "certification authority" (CA) certificates used to validate other peers' certificates. The CA certificate only needs to verify the validity of SSL certificates that have been signed by this certificate.

  • "credentials"/"certfile" - the name of the certificate file, if necessary to set up a secure connection. The certificate file should be in the estar/tpe/dynamic/dataservices/certificates folder and in PEM format. It loads a private key and the corresponding certificate, and contains the certificate as well as any number of CA certificates needed to establish the certificate's authenticity. If keyfile is not provided, the private key is taken from certfile as well.

  • "credentials"/"keyfile" - the name of the key file, if necessary for certfile. The key file should be in the estar/tpe/dynamic/dataservices/certificates folder and in PEM format. If present, it must point to a file containing the private key for certfile.

  • "credentials"/"password" - a string value in the profile. It is used only if the private key is encrypted and a password is necessary. It is ignored if the keyfile is not encrypted and no password is needed.

  • "cfg"/"host" - the host name or IP address of the SMTP server. By default, it is taken from the database for each environment.

  • "cfg"/"port" - the port number of the SMTP server, usually set to 25. By default, it is taken from the database for each environment.

  • "cfg"/"contentType" - optional parameter. The entity header used to indicate the media type of the attached resource in the email.

  • "cfg"/"mailFrom" - the address that identifies where the email came from. The default is EDS@$hostname.com.

  • "cfg"/"subject" - the first text that recipients see when an email reaches their inbox. The following placeholders are available: {env} returns the environment name; {profile_name} returns the name of the distribution profile used.

  • "cfg"/"body" - the text of the email body. The following placeholders are available: {env} returns the environment name; {profile_name} returns the name of the distribution profile used.

  • "cfg"/"mailTo" - required parameter. The list of recipients to send the email to.

A sample EMAIL publishing configuration profile is below:

{
  "data_sinks": [
    {
      "sink": "email_dev",
      "type": "EMAIL",
      "cfg": {
        "credentials": {
          "secure_connection": true,
          "ca_certs": "test.pem"
        },
        "host": "eaglesmtp01.eagleinvsys.com",
        "port": "25",
        "contentType": "text/plain",
        "mailFrom": "test@sender.com",
        "subject": "Email from {env} was delivered by {profile_name}",
        "mailTo": "first_recipient@email.com,second_recipient@email.com",
        "body": "Example text of email body.\nAnother line of text.",
        "compression_codec": "gzip",
        "distributiondeliveryattempts": 5,
        "distributionretryintervalseconds": 3
      },
      "encryption_policy": {
        "codec": "gpg",
        "credentials": {
          "client_id": "eaglekey"
        }
      }
    }
  ]
}
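The {env} and {profile_name} placeholders and the comma-separated "mailTo" value can be illustrated as follows. This is a sketch, not the service's implementation: render_email_fields is a hypothetical name, and leaving any other text in braces untouched is an assumption.

```python
def render_email_fields(cfg, env, profile_name):
    """Fill the {env} and {profile_name} placeholders and split the recipient list."""
    values = {"env": env, "profile_name": profile_name}

    def fill(text):
        # Replace only the two documented placeholders; other braces are kept as-is.
        for key, value in values.items():
            text = text.replace("{" + key + "}", value)
        return text

    recipients = [addr.strip() for addr in cfg["mailTo"].split(",") if addr.strip()]
    return {
        "subject": fill(cfg.get("subject", "")),
        "body": fill(cfg.get("body", "")),
        "recipients": recipients,
    }
```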

FILE Distribution Sink Parameters

The FILE sink is designated with a "type": "ssFileType" sink parameter.

The FILE Distribution may take the following parameters:

  • "file_folder" - the target path for the distributed file. The path starts from the root.

  • "mode" - optional parameter. Sets the mode used to work with the distributed file.

The FILE publishing configuration profile sample is below:

Sample FILE Publishing Configuration

{
  "data_sinks": [
    {
      "sink": "file_dev",
      "type": "ssFileType",
      "file_folder": "/apps/eagle/estar/tpe/data/msgcenter/cmw/in/",
      "mode": "w",
      "compression_codec": "none",
      "distributiondeliveryattempts": 5,
      "distributionretryintervalseconds": 3,
      "encryption_policy": {
        "codec": "gpg",
        "credentials": {
          "client_id": "eaglekey"
        }
      }
    }
  ]
}

AMQP Distribution

AMQP Distribution can be used to distribute to messaging brokers supporting AMQP (ActiveMQ, IBM MQ, ServiceBus, RabbitMQ).

The sink type in the distribution profile should be amqp.

The "cfg" part of the distribution sink definition will contain three sections:

  1. "publisher_config" - contains the configuration parameters that initialize the publisher and identifies the URL(s) of the AMQP server, the destination name (that is, the topic or queue), and the user and password. The user, password, and other identifiable information should be defined as "secrets:VAULT_KEY". Storing the certificates and keys for TLS communication in the vault is also supported. In this case, the certificates should be specified as:

"ssl_cert_file_content": "secrets:vault key for cert file",

"ssl_key_file_content": "secrets:vault key for key file",

"ssl_trust_store_content": "secrets:vault key for trust store file content",

"ssl_cert_file_content_base64": "secrets:vault key for base64 encoded cert file content",

"ssl_key_file_content_base64": "secrets:vault key for base64 encoded key file content",

"ssl_trust_store_content_base64": "secrets:vault key for base64 encoded trust store content"

The following parameters are currently supported for the publisher_config:

  • url  - the AMQP url. Can be amqp://host:port or for SSL connections: amqps://host:port

  • urls - a list of AMQP urls. Either url or urls should be specified.

  • address - the queue or topic name

  • sasl_enabled - determines if a SASL layer is used

  • allowed_mechs - an optional string specifying the SASL mechanisms allowed for this connection; the value is a space-separated list of mechanism names; the mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: ``GSSAPI`` and ``GSS-SPNEGO`` are disabled by default; to enable them, you must explicitly add them using this option; clients must set the allowed mechanisms before the outgoing connection is attempted; servers must set them before the listening connection is setup

  • allow_insecure_mechs - true/false, a flag indicating whether insecure mechanisms, such as PLAIN over a non-encrypted socket, are allowed.

  • password - the authentication secret. Ignored without ``user`` parameter also being present.

  • user - the user to authenticate

  • virtual_host - the hostname to set in the Open performative used by peer to determine the correct back-end service for the client; if ``virtual_host`` is not supplied the host field from the URL is used instead.

  • offered_capabilities - a list of capabilities being offered to the peer. The list must contain symbols (or strings, which will be converted to symbols).

  • desired_capabilities - a list of capabilities desired from the peer. The list must contain symbols (or strings, which will be converted to symbols)

  • sni - a hostname to use with SSL/TLS Server Name Indication (SNI)

  • max_frame_size - the maximum allowable TCP packet size between the peers.

  • ssl_cert_file_content or ssl_cert_file_content_base64 - the content of the certificate for SSL connections

  • ssl_key_file_content or ssl_key_file_content_base64 - the content of the key for SSL connections

  • ssl_trust_store_content or ssl_trust_store_content_base64 - the content of the trust store for SSL connections

2. "application_properties"  - an object with key/value pairs specifying the properties of the messages to be published to the destination. These properties will be mapped as headers of the message. The values can contain placeholders from the publishing request in curly braces. 

3. "message_properties" - the message properties. Only a specific list of properties is supported. Using an invalid property results in a delivery error. The following properties are supported:

  • inferred - the inferred flag for a message indicates how the message content is encoded into AMQP sections. If inferred is true then binary and list values in the body of the message will be encoded as AMQP DATA and AMQP SEQUENCE sections, respectively. If inferred is false, then all values in the body of the message will be encoded as AMQP VALUE sections regardless of their type.

  • durable - The durable property indicates that the message should be held durably by any intermediaries taking responsibility for the message.

  • priority - The relative priority of the message, with higher numbers indicating higher priority. The number of available priorities depends on the implementation, but AMQP defines the default priority as the value 4

  • ttl - The time to live of the message measured in seconds. Expired messages may be dropped.

  • id - The globally unique id of the message. It can be used to determine if a received message is a duplicate.

  • user_id - The user id of the message creator.

  • subject - The subject of the message

  • reply_to  - the reply to address of the message

  • correlation_id - the correlation-id of the message

  • content_type - the RFC-2046 content type of the message

  • content_encoding - the content-encoding of the message

  • expiry_time - The absolute expiry time of the message in seconds using the Unix time_t [IEEE1003] encoding.

  • creation_time - The creation time of the message in seconds using the Unix time_t [IEEE1003] encoding.

  • group_id - the group-id of the message

  • group_sequence - the sequence of the message within its group

  • reply_to_group_id - the group-id for any replies
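Because an unsupported property causes a delivery error, a profile can be checked before it is deployed. The sketch below copies the property names from the list above; the function name unsupported_message_properties is hypothetical.

```python
# Property names copied from the supported list in this document.
SUPPORTED_MESSAGE_PROPERTIES = frozenset({
    "inferred", "durable", "priority", "ttl", "id", "user_id", "subject",
    "reply_to", "correlation_id", "content_type", "content_encoding",
    "expiry_time", "creation_time", "group_id", "group_sequence",
    "reply_to_group_id",
})


def unsupported_message_properties(message_properties):
    """Return the sorted names in message_properties that are not supported."""
    return sorted(set(message_properties) - SUPPORTED_MESSAGE_PROPERTIES)
```

An empty result means every property in the "message_properties" section is on the supported list.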

An example of a profile definition for AMQP distribution is:

Example of AMQP distribution profile
{ "data_sinks": [ { "sink": "amqp_test", "type": "amqp", "cfg": { "publisher_config": { "url": "amqp://localhost:5672", "address": "test.queue.amqp" }, "application_properties": { "test_header": "test_value {test_parameter}" }, "message_properties": { "durable": true } } } ] }

 

The Distribution service sendmessage endpoint can be used to send messages to AMQP destinations. The following parameters have to be specified:

X-Eagle-Context - the tenant to which the distribution profile is assigned

instructions - instructions to send the message. It should have the distributionprofilename parameter and optional processingparameters. The processing parameters are used as placeholder values in application_properties and message_properties in the distribution profile.
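The way processing parameters fill placeholders can be sketched as below. Several points are assumptions: that the instructions are a JSON-like object keyed by distributionprofilename and processingparameters, that placeholders are simple names in curly braces, and that an unknown placeholder is left unchanged. build_instructions and render_properties are hypothetical names.

```python
import re

# A placeholder is a name in curly braces, for example {test_parameter}.
_PLACEHOLDER = re.compile(r"\{(\w+)\}")


def build_instructions(profile_name, processing_parameters=None):
    """Assemble an instructions object for the sendmessage endpoint (assumed shape)."""
    instructions = {"distributionprofilename": profile_name}
    if processing_parameters:
        instructions["processingparameters"] = dict(processing_parameters)
    return instructions


def render_properties(properties, processing_parameters):
    """Substitute placeholders in string property values; other values pass through."""
    def fill(value):
        if not isinstance(value, str):
            return value
        return _PLACEHOLDER.sub(
            lambda m: str(processing_parameters.get(m.group(1), m.group(0))),
            value,
        )
    return {key: fill(value) for key, value in properties.items()}
```

With the AMQP sample profile, a processing parameter test_parameter=abc would turn the test_header value into "test_value abc".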

If the distribution service detects that neither compression nor encryption is enabled, it uses a direct asynchronous interface to send the messages to the AMQP destination. Otherwise, the distribution uses the synchronous interface, which can be slower. To disable the direct asynchronous interface, the instructions request can have a disable_direct_message_send parameter. If it is set to true, the synchronous interface is used. In this case, the reply is different and contains a trace of the execution of the message.

The instructions can also have a content_type parameter which will override the default content type of the message which is derived from the file name.
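The choice between the two interfaces can be summarized as a small decision function. This is a sketch of the rule as described above, not the service's code: uses_direct_send is a hypothetical name, and where the sink keeps "compression_codec" and "encryption_policy" is assumed from the other samples in this document.

```python
def uses_direct_send(sink, instructions):
    """Return True when the direct asynchronous interface would apply."""
    cfg = sink.get("cfg", {})
    # Assumption: a missing codec or "none" means no compression.
    codec = str(cfg.get("compression_codec", "none")).lower()
    compressed = codec != "none"
    # Samples place encryption_policy at sink level or inside cfg; check both.
    encrypted = bool(sink.get("encryption_policy") or cfg.get("encryption_policy"))
    # Accept both a JSON boolean and the string "true".
    flag = instructions.get("disable_direct_message_send", False)
    disabled = str(flag).lower() == "true"
    return not (compressed or encrypted or disabled)
```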

Enable AMQP endpoint in ActiveMQ

To enable the AMQP endpoint in ActiveMQ, add the following entry to the activemq.xml configuration file under <transportConnectors>:

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms&amp;maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>