While working on an integration project, we came across an interesting requirement: sequencing the files fed to a system based on predefined dependencies. The problem was complex because the target system must not see a file in its S3 bucket (or blob container) until it has processed, and sent an acknowledgement for, all of that file's parents. We solved this problem using Azure Logic Apps in conjunction with Azure Service Bus topics to implement asynchronous correlation of messages. This post describes the problem and explains the solution we used.
An upstream system (the Producer) generates different types of files and places them all in one location (a blob container). However, the downstream system (the Consumer) should see those files in the S3 bucket only in a certain order of dependency. The Consumer is able to send an acknowledgement to Service Bus once it has finished processing a file. As depicted in the diagram below, File A does not have any dependencies. File B and File C depend on File A. File D depends on File B and File C:
Figure i: File Dependency
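The dependency rules in the figure can also be written down as a small lookup table. The following is a minimal Python sketch, not part of the original solution: the names `PARENTS` and `releasable` are illustrative, and the file types are assumed to be identified simply as "A" to "D".

```python
# Hypothetical encoding of the dependency figure: each file type maps to
# the parent file types that must be acknowledged before it is released.
PARENTS = {
    "A": [],
    "B": ["A"],
    "C": ["A"],
    "D": ["B", "C"],
}


def releasable(acknowledged, released):
    """Return the file types that may be placed in the S3 bucket now.

    A file type is releasable when it has not been released yet and
    every one of its parents has been acknowledged by the Consumer.
    """
    return sorted(
        file_type
        for file_type, parents in PARENTS.items()
        if file_type not in released
        and all(parent in acknowledged for parent in parents)
    )
```

With nothing acknowledged, only File A qualifies; once A is acknowledged, B and C qualify together; D qualifies only after both B and C are acknowledged.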
The Producer creates all these files together and places them in blob storage. These files can potentially arrive out of sequence. The Producer also puts one message for each file on a Service Bus topic, following the claim-check pattern.
The Consumer monitors a separate location (an S3 bucket in our case) for files and processes them as they arrive. As mentioned above, in our scenario the Consumer sends an acknowledgement to Service Bus when it has finished processing a file. File A should be placed in the S3 bucket first. Only once the Consumer has processed File A should it see File B and File C. Similarly, only once the Consumer has processed File B and File C should it see File D.
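In the claim-check pattern, the message carries a reference to the file rather than the file itself. The sketch below illustrates the idea in Python. The field names `container` and `blob_name` are assumptions made for illustration; the post does not describe the actual message schema.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ClaimCheck:
    """Hypothetical claim-check message: a pointer to the file, not the file."""

    container: str  # assumed field: blob container holding the file
    blob_name: str  # assumed field: name of the file inside the container

    def to_message_body(self):
        """Serialize the claim check into a small JSON message body."""
        return json.dumps(asdict(self))


def redeem(message_body):
    """Turn a received message body back into the blob reference."""
    return ClaimCheck(**json.loads(message_body))
```

The receiver redeems the claim by reading the referenced blob, so the message stays small regardless of the file size.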
Additionally, the Consumer can take up to 10 minutes to process each file.
Logic Apps was an ideal candidate for solving this kind of problem because of its support for long-running (long-waiting, in this case) stateful workflows. In an earlier post, Paco explained how the Correlation Identifier pattern can be implemented with Logic Apps and webhooks. This solution solves a similar problem without the need to implement a webhook. Instead, we use the Service Bus trigger, not at the beginning of the workflow but in the middle of it. By doing so, we tell the stateful Logic App to poll for a message and wait until it gets it. We use SessionId as the correlation identifier so that the Logic App continues only when it receives the corresponding acknowledgement.
The following components are used as part of the solution:
- Service Bus topic (claim): where the Producer puts the claim-check message after placing the files in blob storage.
- Subscription to the claim topic: used for the Logic App trigger.
- Service Bus topic (ack): where the Consumer puts the file-processing acknowledgement.
- Multiple subscriptions (with file type filters) on the ack topic: used inside the Logic App to manage dependencies.
- Logic App: orchestrates the dependency management and the file movement.
The main Logic App flow is defined as follows:
1. The Logic App is triggered by a message in the claim topic.
Figure ii: Topic trigger to start Logic App instance
2. Use a Compose action to define AckSessionId along with other variables as needed. AckSessionId in this case is calculated by concatenating the BusinessUnit and the timestamp present in the filename itself. This value is unique for a group of files that are processed together, so it is a good candidate for correlating the files:
Figure iii: Compose action to define Acknowledgement Session Id
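The Compose expression itself is only shown in the figure, but the calculation can be sketched in Python. The filename convention below (`<BusinessUnit>_<FileType>_<yyyyMMddHHmmss>.csv`) and the `-` separator are assumptions made for illustration; the post does not state the real naming convention.

```python
import re

# ASSUMPTION: filenames look like "<BusinessUnit>_<FileType>_<yyyyMMddHHmmss>.csv".
# The original post does not give the actual naming convention.
FILENAME_PATTERN = re.compile(
    r"^(?P<business_unit>[A-Za-z0-9]+)"
    r"_(?P<file_type>[A-Za-z0-9]+)"
    r"_(?P<timestamp>\d{14})\.csv$"
)


def ack_session_id(filename):
    """Concatenate the BusinessUnit and the timestamp found in the filename."""
    match = FILENAME_PATTERN.match(filename)
    if match is None:
        raise ValueError(f"Unexpected filename format: {filename!r}")
    # The file type is deliberately left out, so that every file in the
    # same group produces the same session id.
    return f"{match['business_unit']}-{match['timestamp']}"
```

Because the file type is excluded, File A and File D from the same batch share one AckSessionId, which is what lets an acknowledgement for a parent be matched to the waiting child.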
3. The Logic App then inspects the message and the file and builds the dependency array for the file based on the business rules. This array contains the names of the acknowledgement topic subscriptions on which to wait for an acknowledgement message. A Switch action is used for this part of the logic.
Figure iv: Switch Case to define the Dependency Array
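The logic of the switch can be mirrored in a few lines of Python. This is a sketch only: the subscription names (`ack-file-a` and so on) are hypothetical placeholders, since the post does not list the real subscription names.

```python
def dependency_subscriptions(file_type):
    """Return the ack-topic subscriptions to wait on for a given file type.

    Each branch corresponds to one case of the switch; the subscription
    names are hypothetical, one per parent file type.
    """
    if file_type == "A":
        return []  # File A has no parents, so there is nothing to wait for.
    if file_type in ("B", "C"):
        return ["ack-file-a"]
    if file_type == "D":
        return ["ack-file-b", "ack-file-c"]
    raise ValueError(f"Unknown file type: {file_type!r}")
```

An empty array means the for-each loop in the next step has nothing to iterate over, so the file moves straight on to the copy step.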
4. Based on this dependency array:
a. For each element in the dependency array:
Figure v: For-each loop action for the dependency array
b. A Service Bus topic trigger action is defined. This action waits for an acknowledgement message on the subscription for the parent file type, with the specific SessionId defined in step 2 (variable AckSessionId):
Figure vi: Topic Trigger to wait for the correct acknowledgement
c. This action has a timeout defined to make sure that the Logic App instance does not wait forever for the acknowledgement message:
Figure vii: Timeout definition for the Topic Trigger
5. Once the acknowledgement message has been received or the timeout has occurred, the Logic App picks the file up from blob storage and moves it to the S3 bucket. Since no connector was available for AWS S3 buckets, an Azure Function was used to complete this action.
Figure viii: Azure Function call to copy the file to S3 bucket
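Steps 4 and 5 can be simulated in plain Python to make the control flow explicit. This is an in-memory sketch under stated assumptions, not the Logic App itself: `ack_queues` stands in for the session-filtered subscriptions on the ack topic, `copy_to_s3` is a hypothetical callback standing in for the Azure Function, and the dependencies are waited on one after another for simplicity.

```python
import queue
from collections import defaultdict

# In-memory stand-in for the ack topic: one queue per
# (subscription name, session id) pair.
ack_queues = defaultdict(queue.Queue)


def wait_for_ack(subscription, session_id, timeout_seconds):
    """Block until an ack arrives for this session or the timeout expires.

    Returns True when an acknowledgement was received, False on timeout.
    """
    try:
        ack_queues[(subscription, session_id)].get(timeout=timeout_seconds)
        return True
    except queue.Empty:
        return False


def process_claim(file_name, session_id, dependencies, timeout_seconds, copy_to_s3):
    """Wait on every parent subscription in turn, then move the file."""
    results = {
        subscription: wait_for_ack(subscription, session_id, timeout_seconds)
        for subscription in dependencies
    }
    # As in step 5, the file is copied whether each ack arrived or timed out.
    copy_to_s3(file_name)
    return results
```

Keying the queues by session id is what keeps one batch of files from being released by an acknowledgement that belongs to a different batch.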
Note that the Service Bus topic trigger is used both to start the Logic App instance (step 1) and to wait for the dependency in the middle of the Logic App (step 4b). This pattern of inserting triggers in the middle of a Logic App is useful in many business scenarios.
Webhooks or Service Bus Sessions to implement correlation on stateful Logic Apps?
As mentioned above, Paco described in a previous post how to implement the Correlation Identifier pattern using the Webhook action with the webhook subscribe and unsubscribe pattern. In this post, I’ve shown how to implement the same pattern using Service Bus instead. Here is a brief comparison of both approaches.
- Webhook action. It does not poll, so it can be more cost-effective. However, it might require an extra layer to store the callback URL. We use this approach when there is only one correlated message and for long-running asynchronous correlations, to avoid the cost of constantly polling Service Bus.
- Service Bus. It requires polling, but it might be simpler to implement. We use this approach when we have to wait for more than one correlated message or when the correlated messages don’t take too long to arrive.
- Based on the hierarchy level of a given file type, the timeout should be different. A file closer to the bottom of the hierarchy should have a higher acknowledgement timeout, and vice versa for a file closer to the top. However, Logic Apps did not provide a way to use a variable to define the timeout value. As a workaround, we had to define two different branches for two timeout values.
- The AWS SDK for .NET provides rich API libraries to interact with AWS services. We used this SDK to talk to the S3 bucket from the Azure Function, which was written in C#.
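To illustrate the point about hierarchy-dependent timeouts, the sketch below derives a timeout from a file type's depth in the dependency graph. The formula (one maximum processing time per level above the file) is an assumption made for illustration, not the rule used in the original solution; only the 10-minute processing figure comes from this post.

```python
from datetime import timedelta

# Dependency graph from the figure: file type -> parent file types.
PARENTS = {"A": [], "B": ["A"], "C": ["A"], "D": ["B", "C"]}

# From the post: the Consumer can take up to 10 minutes per file.
MAX_PROCESSING_TIME = timedelta(minutes=10)


def depth(file_type):
    """Length of the longest chain of parents above this file type."""
    parents = PARENTS[file_type]
    if not parents:
        return 0
    return 1 + max(depth(parent) for parent in parents)


def ack_timeout(file_type):
    """ASSUMPTION: allow one maximum processing time per level above the file."""
    return depth(file_type) * MAX_PROCESSING_TIME
```

Under this assumed rule, File B and File C get a 10-minute timeout and File D gets 20 minutes, which corresponds to the two branches with two timeout values mentioned in the list above.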
As demonstrated in this blog post, Azure Logic Apps is a great serverless platform for handling message dependency management. The support for stateful workflows, in conjunction with trigger actions anywhere in the Logic App flow, helps define long-running dependencies with minimal running costs.
Please feel free to ask questions about this post in the comments section and I will try to answer them.