
Stream Interaction and Available Flags

The queue is enabled once the Message Center receives the RTR through the eagle_ml-2-0_default_cm_control_message stream.

To put a task on the queue, give its RTR a processingSequencerNumber, as in the example on this page:

Note

Only WORKFLOW tasks can be put on the queue. The queue starts with number 1.
The control message stream checks that the RTR launches a workflow-type task. If processingSequencerNumber is set, it sends a trigger message to the eagle_ml-2-0_default_cm_sequencer stream. Example of such a trigger:
'ACTION:ADDTOQUEUE:CORRELATION_ID:'+ :varCorrId: + '\n'
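
As an illustration only, a trigger of this shape can be split into key/value pairs. The Python sketch below is not part of the Message Center: the function name parse_trigger and the sample correlation ID are hypothetical, and it assumes that neither keys nor values contain a colon.

```python
def parse_trigger(message):
    """Split a colon-delimited trigger into a dict of key/value pairs."""
    parts = message.rstrip("\n").split(":")
    if len(parts) % 2 != 0:
        raise ValueError("unbalanced trigger message: %r" % message)
    # Fields alternate key, value, key, value, ...
    return dict(zip(parts[0::2], parts[1::2]))


# Hypothetical trigger, built the same way as the example above.
trigger = "ACTION:ADDTOQUEUE:CORRELATION_ID:" + "ABC123" + "\n"
fields = parse_trigger(trigger)
print(fields["ACTION"], fields["CORRELATION_ID"])
```

A value that itself contains a colon would break this simple split, so such values would need to be escaped before the message is built.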

Once the eagle_ml-2-0_default_cm_sequencer stream receives the trigger, it performs one of the following actions:

ADDTOQUEUE Action

In the case of ACTION=ADDTOQUEUE:

  1. The sendToExecution flag is set to '1' and outOfSequenceMessage is set to '0' (the message is sent for further processing without any priority privileges).
  2. Information about the current state of the queue is acquired by means of seq_get_params.inc.
  3. If the current ProcessingSequenceNumber is 1 greater than the max ProcessingSequenceNumber processed (fed as vMaxProcessedSeq), or a workflow is currently processing, or the pause stream flag equals '1', then the sendToExecution flag is set to '0'.
    If the current ProcessingSequenceNumber is not 1 greater than the max ProcessingSequenceNumber processed (that is, not equal to vMaxProcessedSeq+1), it is not next in the queue, and the out-of-turn processing flag outOfSequenceMessage is set to '1'.
  4. If the out-of-turn processing flag outOfSequenceMessage equals '1', the following trigger message is sent to the reporter stream:

    'CorrelationId:'+ :varWrfName: + ':MessageType:OutOfOrder\n'
  5. If sendToExecution flag equals '1', the WorkflowStatus is set as follows:

    WorkflowStatus := :varCorrelationId:+'_'+:varProcessingSequenceNumber: + '_' + :currDate: + :currTime:

    and  NewSeqQueue = :vSeqQueue:

  6. If sendToExecution equals '0', NewSeqQueue gets the following value:

    NewSeqQueue= iif(StringLength(:vSeqQueue:) > 0,:vSeqQueue:+'\n','')+'1176!'+:CorrelationId:+'!1177!ACK|'+:varProcessingSequenceNumber:+'|'+ :currDate: + :currTime:
  7. Queue values are updated via seq_upd_params.inc.

  8. If sendToExecution equals '1', workflow processing is started with execute_task.inc.
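
The string handling in steps 5 and 6 can be sketched in Python as follows. This is a minimal illustration, not the stream's actual code: the function names are hypothetical, and the date and time formats (YYYYMMDD and HHMMSS) are assumptions.

```python
def workflow_status(correlation_id, seq_number, curr_date, curr_time):
    """Step 5: status string recorded for the workflow sent to execution."""
    return "%s_%s_%s%s" % (correlation_id, seq_number, curr_date, curr_time)


def append_to_queue(seq_queue, correlation_id, seq_number, curr_date, curr_time):
    """Step 6: add a waiting workflow to the newline-separated queue."""
    entry = "1176!%s!1177!ACK|%s|%s%s" % (
        correlation_id, seq_number, curr_date, curr_time)
    # Mirrors iif(StringLength(:vSeqQueue:) > 0, :vSeqQueue: + '\n', '')
    prefix = seq_queue + "\n" if len(seq_queue) > 0 else ""
    return prefix + entry


# Hypothetical values for illustration.
queue = append_to_queue("", "CID1", 2, "20240102", "101500")
queue = append_to_queue(queue, "CID2", 4, "20240102", "101530")
print(queue)
```

Each waiting workflow occupies one line of the queue string, so the queue can later be scanned line by line.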

EOFW Action

In the case of ACTION='EOFW':

  1. Information about the current state of the queue is acquired by means of seq_get_params.inc.
  2. If the current ProcessingSequenceNumber is greater than the max ProcessingSequenceNumber processed (vMaxProcessedSeq), then vMaxProcessedSeq is given the value of the current ProcessingSequenceNumber.
  3. LastProcessedSeqInfo is set as:

    LastProcessedSeqInfo= :vNewMaxProcessedSeq: + '_' + :vTimestamp:
  4. Queue values are updated via seq_upd_params.inc.

  5. vWorkflowStatus is erased, meaning that there are no workflows currently processing.
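
The EOFW bookkeeping above can be summarized in a short Python sketch. The function name and the timestamp format are assumptions made for illustration; the real logic lives in the stream rules and seq_upd_params.inc.

```python
def on_end_of_workflow(current_seq, max_processed_seq, timestamp):
    """Return (new max processed seq, LastProcessedSeqInfo, workflow status)."""
    # Step 2: the max only moves forward.
    new_max = max(current_seq, max_processed_seq)
    # Step 3: LastProcessedSeqInfo = new max + '_' + timestamp.
    last_processed_seq_info = "%s_%s" % (new_max, timestamp)
    # Step 5: an empty status means no workflow is currently processing.
    workflow_status = ""
    return new_max, last_processed_seq_info, workflow_status


# Hypothetical values for illustration.
result = on_end_of_workflow(3, 2, "20240102101500")
print(result)
```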

STATE Action (Coming from Schedule Rule)

Example of such incoming message:

ACTION:STATE:SEQ_STATE:state of the sequence:bus_task_id:business task id:correlation_id:corrid:proc_status:...

In the case of ACTION='STATE', or when the workflow queue list (:vSeqQueue:) is not empty and ACTION='EOFW', the following steps are performed:

  1. If ACTION='STATE', information about the current state of the queue is acquired by means of seq_get_params.inc.
  2. If the queue is empty (:vSeqQueue: = 0) or a workflow is currently processing, the sub-steps in step 3 below are skipped and this round of processing is complete.
  3. The entire queue vSeqQueue is scanned in a loop and:

    • ProcessingSequencerNumber (vSeqNumber), time of message arrival (vTime) and correlation ID (varCorrelationId) are received for each workflow in the queue.
    • If the ProcessingSequencerNumber for the current workflow equals '1' or is greater than the max in the queue, then vReadyForProcessing is set to '1' (else set to '0').
    • If the current workflow's ProcessingSequencerNumber is not the next in the processing queue, vOutOfSequence is set to '1'.
    • If vMaxProcessedTimeStr > 0, then

      vMaxProcessedDate = SubString(:vMaxProcessedTimeStr:, 0, 8);
    • And if :vMaxProcessedDate: != :currDate:

      :vMaxProcessedTimeStr: := '';
      :vMaxProcessedSeq: := 0;
    • If vSuspensionPeriodFlag=1 then

      :vReadyForProcessing: := '0';

      Else if the ProcessingSequencerNumber for the current workflow equals '1' or matches the ProcessingSequencerNumber for the next process in the queue, the processing flag :vReadyForProcessing: is set to '1'.
      Else if the ProcessingSequencerNumber for the current workflow is greater than the ProcessingSequencerNumber for the next process in the queue, and the interval in seconds between the message's arrival in the queue and the current time is greater than :w_seq_timeout: (set in w_config_custom.inc), then :vReadyForProcessing: is set to '1'.


  4. If :vReadyForProcessing: = '1', the following steps are performed:

    • WorkflowStatus information about the queue is updated to display that a workflow is currently being processed (other processes in the queue will wait until it is complete):

      NewWorkflowStatus= :varCorrelationId:+'_'+:vSeqNumber: + '_' + :currDate: + :currTime:
    • Information about this workflow is removed from the queue of processes awaiting their turn (SeqQueue).

    • Queue information is updated by means of seq_upd_params.inc.
    • If :vOutOfSequence: flag equals '1', a trigger message is sent to the reporter stream to inform that the message is going to be processed out of turn:

      'CorrelationId:'+ :varCorrelationId: + ':MessageType:OutOfOrder\n'
    • The workflow is sent for processing (a trigger is sent to the execution stream set in w_exec_folder global parameter):

      'CorrelationId:'+ :varCorrelationId: + '\n'
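
The scan described in step 3 can be sketched in Python as follows. This is an illustration under stated assumptions, not the stream's implementation: all function names are hypothetical, the arrival timestamp is assumed to be YYYYMMDDHHMMSS, "vMaxProcessedTimeStr > 0" is read as "the string is not empty", and the next expected number is assumed to be vMaxProcessedSeq + 1.

```python
from datetime import datetime

TIME_FORMAT = "%Y%m%d%H%M%S"  # assumption: currDate + currTime, e.g. 20240102101500


def parse_queue_entry(entry):
    """'1176!<corrId>!1177!ACK|<seq>|<arrival>' -> (corr_id, seq, arrival)."""
    head, seq_str, arrival_str = entry.split("|")
    correlation_id = head.split("!")[1]
    return correlation_id, int(seq_str), datetime.strptime(arrival_str, TIME_FORMAT)


def reset_if_new_day(max_processed_seq, max_processed_time_str, curr_date):
    """Clear the processed-max bookkeeping when the stored date is not today."""
    if len(max_processed_time_str) > 0 and max_processed_time_str[:8] != curr_date:
        return 0, ""
    return max_processed_seq, max_processed_time_str


def ready_for_processing(seq, arrival, next_seq, now, timeout_seconds, suspended):
    """Decide whether a queued workflow may be sent for execution."""
    if suspended:  # vSuspensionPeriodFlag = 1
        return False
    if seq == 1 or seq == next_seq:
        return True
    # Out of turn, but it has waited longer than the configured timeout.
    waited = (now - arrival).total_seconds()
    return seq > next_seq and waited > timeout_seconds


# Hypothetical values for illustration.
now = datetime(2024, 1, 2, 10, 20, 0)
corr_id, seq, arrival = parse_queue_entry("1176!CID7!1177!ACK|5|20240102101500")
max_seq, _ = reset_if_new_day(2, "20240102093000", now.strftime("%Y%m%d"))
next_seq = max_seq + 1
ready = ready_for_processing(seq, arrival, next_seq, now, 120, False)
out_of_sequence = seq != next_seq
print(corr_id, ready, out_of_sequence)
```

In this example the workflow with number 5 is not the next expected one (3), but it has waited 300 seconds against a 120-second timeout, so it would be released and reported as out of order.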

Sequencer_sch stream

This stream is launched every 30 seconds to check the list of workflows in the queue for processing. The result of this check is then sent to the eagle_ml-2-0_default_cm_sequencer stream.

Additional details:

  1. /eagle_ml-2-0_cm/schedule/sequencer.sch is called to get the data:

    <CODE>:query: :='select \'STATE\' as ACTION,replace(ORCH_STATE_CLOB,\':\',\'^\') as SEQ_STATE, t.bus_task_id,t.correlation_id,t.proc_status,o.status,o.instance Q_INST,t.instance DEF_INST from PACE_MASTERDBO.ORCH_REQUEST_DEF t where t.proc_status = \'ACK\' and t.bus_task_id  like \'%#sequencer\' '; </CODE>
    <MSGEXCHANGE NAME="RUN_SQL" CONDITION="atoi(:g_bOrchLegEnabled:) > 0">
      <CHANNEL CHANNELTYPE="DB" />
      <DBREQUEST RESULTVAR="outvar" TIMEOUT="30" FORMAT="TAGVALUE">
         <EVENT>
            <IMPL DRIVER="OCI" SQLTYPE="SQL" EXPRESSION=":query:" />
            <INPUT_PAR />
         </EVENT>
      </DBREQUEST>
    </MSGEXCHANGE>
  2. /eagle_ml-2-0_cm/out/tagvalue/sequencer_sch.xml is called to send the data to :w_seq_folder:.
    For example:

    :w_seq_folder: := :w_mcdata_dir: + 'out/TAGVALUE/eagle_ml-2-0_default_cm_sequencer/incoming/';

