With all the drag-and-drop goodness of AnyPoint Studio these days, it’s easy to forget that under the hood Mule ESB remains a very powerful, configurable and extendible framework. This power comes in handy when you’re faced with demanding file processing requirements in advance of Mule’s out-of-the-box functionality.
Old-school Mule bits
Time for a quick history lesson… Mule’s File Connector is an ‘old-school’ Mule transport (since let’s be honest, local filesystems haven’t changed that much in the last, ooh, 400 years or so). It’s based around the concepts of ‘connectors’ and ‘endpoints’ rather than ‘configurations’ and ‘operations’ common in newer connectors.
Importantly for us, the File Connector uses the concepts of Message Receiver, Requester and Dispatcher classes.
|MessageReceiver||Convert external source events into Mule messages.||Message Source (at the beginning of a flow)|
|MessageRequester||Retrieve an external event when requested by Mule and convert it into a Mule message.||Anywhere in the flow|
|MessageDispatcher||Send a Mule message out to an external endpoint.||Anywhere in the flow|
These concepts will become important as we discuss various file handling patterns.
Wait for it…
File outbound endpoints are inherently one-way endpoints - there’s no such thing as a ‘response’ to writing a file, so there’s no need to wait for one.
By default then, a file outbound endpoint is also asynchronous. Mule will create a thread pool behind the endpoint (the ‘dispatcher’ thread pool) and use these threads to write your file content while the main flow moves on to the next step.
This is great for throughput, but sometimes moving on immediately isn’t what you want. Consider:
- What if the file write fails?
- What if the file content is the result of a complex streaming transformation and you need to report any exceptions?
- What if you need to trigger an upload of the file once it has finished being written?
You can’t do any of those things unless your flow blocks and waits. We can make this happen by simply turning off that pesky dispatcher thread pool, instead forcing the flow thread to write the file.
To do this we need to explicitly create a
<file:connector> element (Mule will implicitly create and use a default File connector unless we tell it otherwise).
<file:connector name="synchronous-file-connector"> <dispatcher-threading-profile doThreading="false"/> </file:connector>
doThreading="false" attribute tells Mule to create a special thread pool with no actual threads in it. When a flow tries to write a file there will be no thread to hand off the I/O work to. Instead the flow thread itself will execute the file dispatcher code and not progress to the next flow step until the file write is completed.
We can then reference our synchronous connector in our flows:
<flow> <!-- Flow steps --> <file:outbound-endpoint path="/var/data" outputPattern="important.csv" connector-ref="synchronous-file-connector" /> <!-- More flow steps --> </flow>
The same concept applies to polling for inbound files, although in the inbound case thread control is not so important because the file receiver is doing so little work (just opening a FileInputStream essentially).
Note: If you have only a single
<file:connector>element in your Mule application, all file endpoints will use that connector (even if they don’t explicitly reference it). If you want some of your file endpoints to be blocking and some non-blocking, you’ll need to define two file connectors and explicitly reference which one to use at each file endpoint.
Don’t call us, we’ll call you
Mule supports reading files at the beginning of a flow. Sometimes though you need to read a file during a flow. e.g.
- Your flow is triggered by a job scheduler (e.g. Quartz), not a file polling inbound endpoint.
- You have to process files on a strict schedule, not as soon as they arrive.
- You need to retry file processing from the beginning of the file.
Sure you could do this with Mule’s scripting or custom Java extension points: just open the file using plain java.io classes right? But Mule gives us a lot of niceties like automatic close/move/delete handling. Can we have our cake and eat it too? It turns out we can, using Mule’s Requester Connector.
The Mule Requester Connector is a thin wrapper over Mule’s Java Client API. The client allows you to request
org.mule.api.MuleEvent objects from endpoints using their URL syntax. This invokes the Mule endpoint’s MessageRequester class (as discussed above). This allows you to get data in the middle of your flow from endpoints that can usually only start a flow (like a file receiver).
In this example below we are using the Requester Connecter to retrieve a file using a specially configured File Connector to not delete or move the file once we’ve finished reading it.
<!-- Special File Connector that does not auto-delete files once read --> <file:connector name="no-delete-file-connector" autoDelete="false" /> <!-- Mule Requester global config --> <mulerequester:config name="mule-requester-config"/> <flow> <!-- Flow steps --> <mulerequester:request config-ref="mule-requester-config" resource="file:///var/in/myfile.txt?connector=no-delete-file-connector" /> <!-- Payload now is a FileInputStream. --> <!-- More flow steps --> </flow>
Putting it all together: a friendly visit to your local (filesystem)
Mule has powerful abilities to handle large payloads with streaming (see our previous post). Unfortunately streaming brings its own problems, particularly around error handling. What if you’re half-way through transforming a large Salesforce CSV export when your network connection flakes out? Your only choice is to abort the entire transaction and start again.
How can we make this process more reliable? Instead of streaming direct from the source, we can use the local filesystem as a cache to download the file first and then stream the local copy into the transformation. This means we can retry the download as many times as we need to without abandoning the entire workflow. See below for an example:
<flow> <!-- Store the Salesforce Batch info POJO in a flow variable so we can reference it multiple times. --> <!-- Use a synchronous until-successful router to retry the batch download --> <until-successful maxRetries="3" synchronous="true"> <processor-chain> <sfdc:query-result-stream config-ref="salesforce-config"> <sfdc:batch-info ref="#[flowVars['batchInfo']]"/> </sfdc:query-result-stream> <file:outbound-endpoint path="/tmp/batches" outputPattern="#[flowVars['batchInfo'].id].csv" connector-ref="synchronous-file-connector"/> </processor-chain> </until-successful> <!-- If we get to this point we have successfully cached the SalesForce batch results to the local filesystem. --> <mulerequester:request config-ref="mule-requester-config" resource="file:///tmp/batches/#[flowVars['batchInfo'].id].xml"/> <!-- Now the message payload is a FileInputStream for our locally cached results file --> </flow>
Some key points to note:
<until-successful>router runs in synchronous mode, meaning the flow will block until the router succeeds or throws an exception after exceeding its retry limit.
- We stream the Salesforce result directly to the local filesystem (avoiding loading the entire file into memory).
<file:outbound-endpoint>uses our synchronous file connector so the
<until-successful>router blocks until the file is completely downloaded.
Knowledge is power. Understanding the concepts and design patterns behind Mule’s internals can help you better meet your requirements. Looking beyond the connectors bundled with AnyPoint Studio can make your solutions cleaner and more robust.
If you too identify as unapologetically 'geeky', join our team as we seek to solve wicked problems within Complex Programs, Process Engineering, Integration, Cloud Platforms, DevOps & more!
Leave a comment on this blog: