Configure the Async Engine

Async Engine is an asynchronous processing engine that allows you to address scenarios that require time-consuming logic or a high volume of requests through asynchronous processing. Some use cases include:

  • Large scale data processing - Scenarios where large volumes of data need to be processed, and synchronous processing would result in unacceptably long response times. Examples include processing bulk uploads, generating complex reports, or handling data-intensive calculations. The Async Engine can queue incoming requests, process them asynchronously, and provide the results to the caller when processing is complete.
  • Time consuming operations - Some operations may require extended processing time, making synchronous communication impractical. For instance, calling external APIs with high latency or performing computationally intensive tasks can benefit from asynchronous processing. The Async Engine allows these operations to be offloaded, ensuring that the caller is not blocked waiting for a response.
  • Rate limiting and throttling - APIs and services often impose rate limits or throttling constraints on their consumers. The Async Engine can help manage these constraints by queueing requests and processing them at a pace that adheres to the rate limits imposed by the APIs or services. This ensures that the system operates efficiently while respecting external constraints.
  • Resilient processing with redelivery and DLQ - In scenarios where errors or failures can occur during processing, the Async Engine offers resiliency through redelivery mechanisms and the use of a Dead Letter Queue (DLQ). This allows the system to retry processing in case of failures and to store messages that could not be processed in the DLQ for further analysis, reprocessing, or deletion.
  • Scalable workload distribution - As workloads increase, the Async Engine enables horizontal scaling by distributing the processing across multiple instances or workers. This ensures that the system can handle the increasing demand while maintaining performance and responsiveness.
  • Service decoupling - The Async Engine promotes a decoupled architecture, allowing individual components to evolve independently, enhancing maintainability and reducing the impact of changes to the system. This allows better separation of concerns and easier management of the application.

Asynchronous Flow

In synchronous processing, each step of a flow is executed sequentially, and the flow must wait for each step to complete before proceeding to the next one. This means that the entire flow is blocked until all steps finish.

The asynchronous implementation introduces message queues that collect incoming step inputs. Each step can operate independently as it consumes inputs from its queue, allowing for parallel processing and efficient resource utilization. Additionally, the message queues provide a buffer that can handle bursts of incoming requests, ensuring smooth processing and improved resilience in case of spikes in load.

Async Engine Configuration

To set up the Async Engine on your environment, you need to configure its storage account, queues, flows, tables, redelivery logic, outbound HTTP connections, and inbound thread pool.

1 Configure the Azure Storage Account

To configure the storage account that hosts the Async Engine, in the Configuration Manager, at the kv/<environment name>/async-engine path, edit the following key:

Key Value
azure.storage.config
Copy
{
    "accountName": "myAzureStorageAccount",
    "accountKey": "myAzureStorageAccountAuthKey",
    "suffix": "core.windows.net"
}
 

 

  • accountName - Azure storage account name
  • accountKey - Azure storage account key
  • suffix - The termination of the storage account URL. Defaults to core.windows.net for the Azure Cloud, but may differ for countries with their own cloud infrastructure (e.g. China).

2 Configure the Queues

To describe the configuration for each queue involved in the async processing, in the Configuration Manager, at the kv/<environment name>/async-engine path, edit the following key:

Key Value
queue.config
Copy
[
    {
      "name": "testasyncqueue1",
      "maxMessages": 10,
      "timeToLive": 1,
      "delay": 1000
    },
    {
      "name": "testasyncqueue2",
      "maxMessages": 10,
      "timeToLive": 1,
      "greedy": true
    },
    {
      "name": "testasyncqueue3",
      "maxMessages": 4,
      "timeToLive": 1,
      "delay": 2000,
      "greedy": false
    }
]
 
Parameter Description Data Type Default Value
name Name of the queue. string  
maxMessages Maximum number of messages that can be retrieved from the queue in one poll. The allowed range is 1 to 32 messages. integer 1
delay Number of milliseconds between polls. long 500
greedy When set to true, if a poll processes any messages, the next poll will run immediately, ignoring the delay setting. boolean false
timeToLive Number of hours a message stays alive in the queue. Set to -1 for no expiration. Must be -1 or any positive number. integer 1

3 Configure the Flows

To set up your flows, in the Configuration Manager, at the kv/<environment name>/async-engine path, edit the following key:

Key Value
flow.config
Copy
[
    {
      "flowName": "test",
      "queueMappings": [
        {
          "sourceQueueName": "testasyncqueue1",
          "endpointName": "ServicePipesTest_Endpoint1",
          "targetQueueName": "testasyncqueue2",
          "endpointUrl": "https: //google.com",
          "maxThreads": 10,
          "initial": true
        },
        {
          "sourceQueueName": "testasyncqueue2",
          "endpointName": "ServicePipesTest_Endpoint2",
          "targetQueueName": "testasyncqueue3",
          "maxThreads": 10
        },
        {
          "sourceQueueName": "testasyncqueue3",
          "endpointName": "ServicePipesTest_Endpoint3",
          "maxThreads": 10
        }
      ]
    }
]
 
  • flowName - The name of the asynchronous business flow. You can define multiple flows, each with its own queue mapping.
  • queueMappings - Settings for each step in the flow:
    • initial - Flag that indicates the first step in the flow.
    • sourceQueueName - The name of the queue that stores the step's inputs.
    • endpointName - Name of the endpoint called to process the inputs. If the endpoint is outside the FintechOS Platform, use the endpointUrl parameter instead.
    • endpointUrl - The URL of the endpoint called to process the inputs. If this is a FintechOS Platform endpoint, use the endpointName parameter instead. If you set up both an endpointName and an endpointUrl, the step will use only the endpointName.
    • targetQueueName - The name of the queue where the responses from the endpoint are sent (the sourceQueueName for the next step in the flow, as the step's outputs become inputs for the next step). The last step of the flow doesn't have a targetQueueName.
    • maxThreads - The number of processing threads that are used to process messages read from the step's queue. This depends on the maxMessages property from the queue.config key. For example, if the poll reads a batch of 32 messages, you should set this to at least 32 to ensure parallel processing. If processing takes too long, when the next batch of 32 messages is read, some of the processing threads may still be busy with messages from the previous batch. This means that new messages have to wait for the threads to finish their processing. Depending on your performance requirements, you may typically need to set up a higher number of threads than messages.

4 Configure the Storage Tables

The Async Engine uses Azure Table Storage to track the status of each asynchronous action, as well as the messages that encountered processing errors or exceeded the allowed number of redelivery attempts (the Dead Letter Queue - DLQ).

To configure the tables, in the Configuration Manager, at the kv/<environment name>/async-engine path, edit the following key:

Key Value
table.config
Copy
{
    "statusTable": "AsyncStatusTable",
    "dlqTable": "AsyncDlqTable"
}
 
  • statusTable - Name of the table where the processing status is stored.
  • dlqTable - Name of the Dead Letter Queue table, where messages are stored in case of an error.

5 Configure the Redelivery Logic

If a message delivery fails, you can set up a number of redelivery attempts before sending that message to the Dead Letter Queue. To do so, in the Configuration Manager, at the kv/<environment name>/async-engine path, edit the following key:

Key Value
redelivery.config
Copy
{
    "default": {
      "count": 2,
      "delay": 2000
    },
    "additional": [
      {
        "httpStatus": 502,
        "count": 2,
        "delay": 1000
      }
    ]
}
 
  • default - Default redelivery settings.
    • count - Number of redelivery attempts.
    • delay - Delay between redelivery attempts in milliseconds.
  • additional - Additional redelivery attempts settings for specific HTTP response status codes.
    • httpStatus - HTTP response status code.
    • count - Number of redelivery attempts.
    • delay - Delay between redelivery attempts in milliseconds.

If an Async Engine exception occurs during internal processing, the default redelivery configuration is used.

6 Configure the Outgoing HTTP Connection Pool

To optimize the usage of network resources and improve the overall performance and responsiveness, the Async Engine employs HTTP connection pooling. Instead of opening and closing a connection on each call, multiple endpoint connections are kept alive in a connection pool and reused in subsequent requests.

To configure the outgoing HTTP connection pool, in the Configuration Manager, at the kv/<environment name>/async-engine path, edit the following key:

Key Value
http.connection.config
Copy
{
    "requestTimeout": 5000,
    "connectionTimeout": 20000,
    "socketTimeout": 230000,
    "totalConnections": 2000,
    "routeConnections": 200,
    "connectionTimeToLive": 30000
}
 
Parameter Description Data Type Default Value
routeConnections The maximum number of connections per route (endpoint). integer 20
totalConnections The maximum number of connections in the pool. integer 200
connectionTimeToLive The number of milliseconds to keep a connection alive. By default, the connection is permanently kept alive. long  
requestTimeout The timeout in milliseconds to wait for a connection from the pool to be allocated to the request. A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default).
If a finite timeout is set and reached before a connection becomes available, an error is thrown and the message is sent to the DLQ.
This waiting period is highly impacted by the volume of requests.
integer -1
connectionTimeout The timeout in milliseconds to attempt to establish communication with the endpoint. A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default).
If a finite timeout is set and reached before the communication with the endpoint is established, an error is thrown and the message is sent to the DLQ.
This waiting period is highly impacted by the volume of requests.
integer -1
socketTimeout The timeout in milliseconds for waiting for data (a maximum period of inactivity between two consecutive data packets). A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default).
If a finite timeout is set and the endpoint is unresponsive for the set time, an error is thrown and the message is sent to the DLQ.
This waiting period is highly impacted by the volume of requests.
integer -1
 
IMPORTANT!  
routeConnections and totalConnections should be set relative to the maxThreads property of the flow.config key. Each processing thread may require its own connection from the pool. How fast the responses are received also has an impact on the values that should be set. You should test and set up the number of connections accordingly.

7 Configure the Threads Pool for Incoming Requests

To optimize the usage of system resources and improve performance, a  pool of worker threads (with a corresponding work queue) is kept for all the requests arriving to the Async Engine that need to be routed to the initial queues of the various flows.

To configure the request threads pool, in the Configuration Manager, at the kv/<environment name>/async-engine path, edit the following key:

Key Value
threadpool.config
Copy
{
    "poolSize": 5,
    "maxPoolSize": "5",
    "maxQueueSize": "10"
}
 
Parameter Description Default Value
poolSize Default number of request threads to be kept alive in the pool. 10
maxPoolSize Maximum number of concurrent request threads. 20
maxQueueSize Maximum number of pending requests stored in the requests queue. Use -1 for an unbounded queue. 1000

8 Configure Async Engine Authentication and Authorization

To set up the Async Engine's authentication and authorization settings, in the Configuration Manager, at the kv/<environment name>/async-engine path, edit the following keys:

Key Value Description
openid.config
Copy
{
    "realm": "fintechOSRealm",
    "auth-server-url": "https://myServer.azurewebsites.net/auth",
    "ssl-required": "external",
    "resource": "admin-async-dev",
    "principal-attribute": "preferred_username",
    "credentials": {
        "secret": "mySecret"
    },
    "use-resource-role-mappings": "false",
    "autodetect-bearer-only": "true"
}
  • realm - The FintechOS realm configured in the FintechOS Identity Provider.
  • auth-server-url -The FintechOS Identity Provider discovery endpoint.
  • ssl-required - The default value is external meaning that HTTPS is required by default for external requests. In production environments, this should be set to all.
  • resource - Name of the Async Engine resource as defined in the FintechOS realm configured in the FintechOS Identity Provider.
  • principal-attribute - OpenID Connect ID Token attribute used to populate the UserPrincipal name. Possible values are: sub, preferred_username, email, name, nickname, given_name, and family_name. Default: preferred_username.
  • secret - Secret key set up in the FintechOS Identity Provider for the Async Engine.
  • use-resource-role-mappings - When true, the adapter retrieves the user's application level role mappings from the token. When false, it looks at the realm level role mappings. This should be set up in accordance with your OpenID configuration. Default: false.

  • autodetect-bearer-only - Set it to true. Do not change.

rbac.config

Copy
{
   "apiMappings":[
      {
         "url":"/api",
         "roles":[
            "async-user"
         ]
      }
   ],
   "applicationMappings":[
      {
         "url":"/actuator",
         "roles":[
            "async-admin"
         ]
      }
   ]
}
Property used to configure role based access (RBAC) in the Async Engine.
  • apiMappings - RBAC for Async Engine API endpoints
    • url - Relative path to the API endpoints of the async flows. Do not change.
    • roles - Platform user roles that have access to the async flows endpoints.
  • applicationMappings - RBAC for application management/configuration URL.
    • url - Relative path to the management URL. Do not change.
    • roles - Platform user roles that have access to the management URLs. Here power-users or administrator roles should be configured.
openapi.url https://myServer.azurewebsites.net/ftosapi/automation-processors/actions/ URL address of the OpenAPI component.
openapi.username

{username}

User name used by Async Engine when generating an access token to call the platform.
openapi.password

{password}

Password used by Async Engine when generating an access token to call the platform.

Configuration Example

Copy
{
  "azure.storage.config": {
    "accountName": "myAccount",
    "accountKey": "myAccountKey",
    "suffix": "core.windows.net"
  },
  "flow.config": [
    {
      "flowName": "test",
      "queueMappings": [
        {
          "sourceQueueName": "testasyncqueue1",
          "endpointName": "ServicePipesTest_Endpoint1",
          "targetQueueName": "testasyncqueue2",
          "endpointUrl": "https: //google.com",
          "maxThreads": 10,
          "initial": true
        },
        {
          "sourceQueueName": "testasyncqueue2",
          "endpointName": "ServicePipesTest_Endpoint2",
          "targetQueueName": "testasyncqueue3",
          "maxThreads": 10
        },
        {
          "sourceQueueName": "testasyncqueue3",
          "endpointName": "ServicePipesTest_Endpoint3",
          "maxThreads": 10
        }
      ]
    }
  ],
  "http.connection.config": {
    "requestTimeout": 5000,
    "connectionTimeout": 20000,
    "socketTimeout": 230000,
    "totalConnections": 2000,
    "routeConnections": 200,
    "connectionTimeToLive": 30000
  },
  "openapi.password": "myPassword",
  "openapi.url": "https://myEnvironment/ftosapi/automation-processors/actions/",
  "openapi.username": "host",
  "openid.config": {
    "use-resource-role-mappings": false,
    "autodetect-bearer-only": true,
    "ssl-required": "external",
    "auth-server-url": "https: //myEnvironment/auth",
    "principal-attribute": "preferred_username",
    "resource": "admin-servicepipes-mybank",
    "credentials": {
      "secret": "mySecret"
    },
    "realm": "fintechOSrealm"
  },
  "queue.config": [
    {
      "name": "testasyncqueue1",
      "maxMessages": 10,
      "timeToLive": 1
    },
    {
      "name": "testasyncqueue2",
      "maxMessages": 10,
      "timeToLive": 1
    },
    {
      "name": "testasyncqueue3",
      "maxMessages": 4,
      "timeToLive": 1
    }
  ],
  "rbac.config": {
    "apiMappings": [
      {
        "url": "/api",
        "roles": [
          "async-engine"
        ]
      }
    ],
    "applicationMappings": [
      {
        "url": "/actuator",
        "roles": [
          "async-engine"
        ]
      }
    ]
  },
  "redelivery.config": {
    "default": {
      "count": 2,
      "delay": 2000
    },
    "additional": [
      {
        "httpStatus": 502,
        "count": 2,
        "delay": 1000
      }
    ]
  },
  "table.config": {
    "statusTable": "AsyncStatusTable",
    "dlqTable": "AsyncDlqTable"
  },
  "threadpool.config": {
    "poolSize": 5,
    "maxPoolSize": 5,
    "maxQueueSize": 10
  }
}