Configure the Async Engine
Async Engine is an asynchronous processing engine that allows you to address scenarios that require time-consuming logic or a high volume of requests through asynchronous processing. Some use cases include:
- Large scale data processing - Scenarios where large volumes of data need to be processed, and synchronous processing would result in unacceptably long response times. Examples include processing bulk uploads, generating complex reports, or handling data-intensive calculations. The Async Engine can queue incoming requests, process them asynchronously, and provide the results to the caller when processing is complete.
- Time consuming operations - Some operations may require extended processing time, making synchronous communication impractical. For instance, calling external APIs with high latency or performing computationally intensive tasks can benefit from asynchronous processing. The Async Engine allows these operations to be offloaded, ensuring that the caller is not blocked waiting for a response.
- Rate limiting and throttling - APIs and services often impose rate limits or throttling constraints on their consumers. The Async Engine can help manage these constraints by queueing requests and processing them at a pace that adheres to the rate limits imposed by the APIs or services. This ensures that the system operates efficiently while respecting external constraints.
- Resilient processing with redelivery and DLQ - In scenarios where errors or failures can occur during processing, the Async Engine offers resiliency through redelivery mechanisms and the use of a Dead Letter Queue (DLQ). This allows the system to retry processing in case of failures and to store messages that could not be processed in the DLQ for further analysis, reprocessing, or deletion.
- Scalable workload distribution - As workloads increase, the Async Engine enables horizontal scaling by distributing the processing across multiple instances or workers. This ensures that the system can handle the increasing demand while maintaining performance and responsiveness.
- Service decoupling - The Async Engine promotes a decoupled architecture, allowing individual components to evolve independently, enhancing maintainability and reducing the impact of changes to the system. This allows better separation of concerns and easier management of the application.
Asynchronous Flow
In synchronous processing, each step of a flow is executed sequentially, and the flow must wait for each step to complete before proceeding to the next one. This means that the entire flow is blocked until all steps finish.
The asynchronous implementation introduces message queues that collect incoming step inputs. Each step can operate independently as it consumes inputs from its queue, allowing for parallel processing and efficient resource utilization. Additionally, the message queues provide a buffer that can handle bursts of incoming requests, ensuring smooth processing and improved resilience in case of spikes in load.
Async Engine Configuration
To set up the Async Engine on your environment, you need to configure its storage account, queues, flows, tables, redelivery logic, outbound HTTP connections, and inbound thread pool.
1 Configure the Azure Storage Account
To configure the storage account that hosts the Async Engine, in the Configuration Manager, at the kv/<environment name>/async-engine
path, edit the following key:
Key | Value |
---|---|
azure.storage.config
|
Copy
|
accountName
- Azure storage account nameaccountKey
- Azure storage account keysuffix
- The termination of the storage account URL. Defaults to core.windows.net for the Azure Cloud, but may differ for countries with their own cloud infrastructure (e.g. China).
2 Configure the Queues
To describe the configuration for each queue involved in the async processing, in the Configuration Manager, at the kv/<environment name>/async-engine
path, edit the following key:
Key | Value |
---|---|
queue.config
|
Copy
|
Parameter | Description | Data Type | Default Value |
---|---|---|---|
name | Name of the queue. | string | |
maxMessages | Maximum number of messages that can be retrieved from the queue in one poll. The allowed range is 1 to 32 messages. | integer | 1 |
delay | Number of milliseconds between polls. | long | 500 |
greedy | When set to true, if a poll processes any messages, the next poll will run immediately, ignoring the delay setting. | boolean | false |
timeToLive | Number of hours a message stays alive in the queue. Set to -1 for no expiration. Must be -1 or any positive number. | integer | 1 |
3 Configure the Flows
To set up your flows, in the Configuration Manager, at the kv/<environment name>/async-engine
path, edit the following key:
Key | Value |
---|---|
flow.config
|
Copy
|
flowName
- The name of the asynchronous business flow. You can define multiple flows, each with its own queue mapping.queueMappings
- Settings for each step in the flow:initial
- Flag that indicates the first step in the flow.sourceQueueName
- The name of the queue that stores the step's inputs.endpointName
- Name of the endpoint called to process the inputs. If the endpoint is outside the FintechOS Platform, use theendpointUrl
parameter instead.endpointUrl
- The URL of the endpoint called to process the inputs. If this is a FintechOS Platform endpoint, use theendpointName
parameter instead. If you set up both anendpointName
and anendpointUrl
, the step will use only theendpointName
.targetQueueName
- The name of the queue where the responses from the endpoint are sent (thesourceQueueName
for the next step in the flow, as the step's outputs become inputs for the next step). The last step of the flow doesn't have atargetQueueName
.maxThreads
- The number of processing threads that are used to process messages read from the step's queue. This depends on themaxMessages
property from thequeue.config
key. For example, if the poll reads a batch of 32 messages, you should set this to at least 32 to ensure parallel processing. If processing takes too long, when the next batch of 32 messages is read, some of the processing threads may still be busy with messages from the previous batch. This means that new messages have to wait for the threads to finish their processing. Depending on your performance requirements, you may typically need to set up a higher number of threads than messages.
4 Configure the Storage Tables
The Async Engine uses Azure Table Storage to track the status of each asynchronous action, as well as the messages that encountered processing errors or exceeded the allowed number of redelivery attempts (the Dead Letter Queue - DLQ).
To configure the tables, in the Configuration Manager, at the kv/<environment name>/async-engine
path, edit the following key:
Key | Value |
---|---|
table.config
|
Copy
|
- statusTable - Name of the table where the processing status is stored.
- dlqTable - Name of the Dead Letter Queue table, where messages are stored in case of an error.
5 Configure the Redelivery Logic
If a message delivery fails, you can set up a number of redelivery attempts before sending that message to the Dead Letter Queue. To do so, in the Configuration Manager, at the kv/<environment name>/async-engine
path, edit the following key:
Key | Value |
---|---|
redelivery.config
|
Copy
|
default
- Default redelivery settings.count
- Number of redelivery attempts.delay
- Delay between redelivery attempts in milliseconds.
additional
- Additional redelivery attempts settings for specific HTTP response status codes.httpStatus
- HTTP response status code.count
- Number of redelivery attempts.delay
- Delay between redelivery attempts in milliseconds.
If an Async Engine exception occurs during internal processing, the default redelivery configuration is used.
6 Configure the Outgoing HTTP Connection Pool
To optimize the usage of network resources and improve the overall performance and responsiveness, the Async Engine employs HTTP connection pooling. Instead of opening and closing a connection on each call, multiple endpoint connections are kept alive in a connection pool and reused in subsequent requests.
To configure the outgoing HTTP connection pool, in the Configuration Manager, at the kv/<environment name>/async-engine
path, edit the following key:
Key | Value |
---|---|
http.connection.config
|
Copy
|
Parameter | Description | Data Type | Default Value |
---|---|---|---|
routeConnections | The maximum number of connections per route (endpoint). | integer | 20 |
totalConnections | The maximum number of connections in the pool. | integer | 200 |
connectionTimeToLive | The number of milliseconds to keep a connection alive. By default, the connection is permanently kept alive. | long | |
requestTimeout | The timeout in milliseconds to wait for a connection from the pool to be allocated to the request. A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default). If a finite timeout is set and reached before a connection becomes available, an error is thrown and the message is sent to the DLQ. This waiting period is highly impacted by the volume of requests. |
integer | -1 |
connectionTimeout | The timeout in milliseconds to attempt to establish communication with the endpoint. A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default). If a finite timeout is set and reached before the communication with the endpoint is established, an error is thrown and the message is sent to the DLQ. This waiting period is highly impacted by the volume of requests. |
integer | -1 |
socketTimeout | The timeout in milliseconds for waiting for data (a maximum period of inactivity between two consecutive data packets). A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default). If a finite timeout is set and the endpoint is unresponsive for the set time, an error is thrown and the message is sent to the DLQ. This waiting period is highly impacted by the volume of requests. |
integer | -1 |
routeConnections
and totalConnections
should be set relative to the maxThreads
property of the flow.config key. Each processing thread may require its own connection from the pool. How fast the responses are received also has an impact on the values that should be set. You should test and set up the number of connections accordingly.7 Configure the Threads Pool for Incoming Requests
To optimize the usage of system resources and improve performance, a pool of worker threads (with a corresponding work queue) is kept for all the requests arriving to the Async Engine that need to be routed to the initial queues of the various flows.
To configure the request threads pool, in the Configuration Manager, at the kv/<environment name>/async-engine
path, edit the following key:
Key | Value |
---|---|
threadpool.config
|
Copy
|
Parameter | Description | Default Value |
---|---|---|
poolSize | Default number of request threads to be kept alive in the pool. | 10 |
maxPoolSize | Maximum number of concurrent request threads. | 20 |
maxQueueSize | Maximum number of pending requests stored in the requests queue. Use -1 for an unbounded queue. | 1000 |
8 Configure Async Engine Authentication and Authorization
To set up the Async Engine's authentication and authorization settings, in the Configuration Manager, at the kv/<environment name>/async-engine
path, edit the following keys:
Key | Value | Description |
---|---|---|
openid.config |
Copy
|
|
rbac.config |
Copy
|
Property used to configure role based access (RBAC) in the Async Engine.
|
openapi.url | https://myServer.azurewebsites.net/ftosapi/automation-processors/actions/ | URL address of the OpenAPI component. |
openapi.username |
{username} |
User name used by Async Engine when generating an access token to call the platform. |
openapi.password |
{password} |
Password used by Async Engine when generating an access token to call the platform. |
Configuration Example
{
"azure.storage.config": {
"accountName": "myAccount",
"accountKey": "myAccountKey",
"suffix": "core.windows.net"
},
"flow.config": [
{
"flowName": "test",
"queueMappings": [
{
"sourceQueueName": "testasyncqueue1",
"endpointName": "ServicePipesTest_Endpoint1",
"targetQueueName": "testasyncqueue2",
"endpointUrl": "https: //google.com",
"maxThreads": 10,
"initial": true
},
{
"sourceQueueName": "testasyncqueue2",
"endpointName": "ServicePipesTest_Endpoint2",
"targetQueueName": "testasyncqueue3",
"maxThreads": 10
},
{
"sourceQueueName": "testasyncqueue3",
"endpointName": "ServicePipesTest_Endpoint3",
"maxThreads": 10
}
]
}
],
"http.connection.config": {
"requestTimeout": 5000,
"connectionTimeout": 20000,
"socketTimeout": 230000,
"totalConnections": 2000,
"routeConnections": 200,
"connectionTimeToLive": 30000
},
"openapi.password": "myPassword",
"openapi.url": "https://myEnvironment/ftosapi/automation-processors/actions/",
"openapi.username": "host",
"openid.config": {
"use-resource-role-mappings": false,
"autodetect-bearer-only": true,
"ssl-required": "external",
"auth-server-url": "https: //myEnvironment/auth",
"principal-attribute": "preferred_username",
"resource": "admin-servicepipes-mybank",
"credentials": {
"secret": "mySecret"
},
"realm": "fintechOSrealm"
},
"queue.config": [
{
"name": "testasyncqueue1",
"maxMessages": 10,
"timeToLive": 1
},
{
"name": "testasyncqueue2",
"maxMessages": 10,
"timeToLive": 1
},
{
"name": "testasyncqueue3",
"maxMessages": 4,
"timeToLive": 1
}
],
"rbac.config": {
"apiMappings": [
{
"url": "/api",
"roles": [
"async-engine"
]
}
],
"applicationMappings": [
{
"url": "/actuator",
"roles": [
"async-engine"
]
}
]
},
"redelivery.config": {
"default": {
"count": 2,
"delay": 2000
},
"additional": [
{
"httpStatus": 502,
"count": 2,
"delay": 1000
}
]
},
"table.config": {
"statusTable": "AsyncStatusTable",
"dlqTable": "AsyncDlqTable"
},
"threadpool.config": {
"poolSize": 5,
"maxPoolSize": 5,
"maxQueueSize": 10
}
}