Overview

This post will cover how to configure end-to-end streaming in Mule 4 when performing data transformations with DataWeave. What streaming is and how it works in Mule 4 are covered in detail in the following MuleSoft blogs:

I recommend reading those first if you need to brush up on those topics. In this post, we'll create a prototype application that receives a JSON array and streams it through a DataWeave transform. The complete array will be larger than memory, but we'll see how Mule can stream the payload through the flow, transform it, and start returning data to the sender right away, before it reaches the end of the payload.

How To Set Up The Tutorial

We'll be creating a simple Mule app and some Bash commands that allow the following to take place:

[Image: Screen-Shot-2019-09-16-at-08.31.17]

To get started, create a new Mule 4 project. It doesn't need to have an API associated with it. Create a single flow with an HTTP Listener as the source. It should listen on the /stream path. Add a Transform Message component to the flow as well:

<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" >
  <http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<flow name="dw-streamingFlow" >
  <http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/stream"/>
  <ee:transform doc:name="Transform Message" >
    <ee:message >
      <ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
{
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
</flow>

Let's assume our input will be something like this:

[
  {"number": 1},
  {"number": 2},
  ...
  {"number": 1000000}
]

We want our application to return JSON and append whether each number is even or odd:

[
  {"number": 1, "parity": "odd"},
  {"number": 2, "parity": "even"},
  ...
  {"number": 1000000, "parity": "even"}
]

Let's add this functionality to the Transform Message component:

%dw 2.0
output application/json

fun parity(n: Number): String =
  if (isEven(n))
    "even" 
  else 
    "odd"
---
payload map (obj) -> 
  obj ++ {parity: parity(obj.number)}
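
As a cross-check on the expected output, the same per-element logic can be approximated outside Mule. The following is only a rough sketch and not part of the Mule app: add_parity is a hypothetical helper name, and it assumes the input has exactly the layout shown above (one {"number": N} object per line, with integer values).

```shell
# Hypothetical stand-in for the transform: reads one {"number": N} object per
# line and appends a "parity" field. Assumes the exact input layout used in
# this post; it is not a general JSON parser.
add_parity() {
  awk '
    /"number"/ {
      line = $0
      trailing = ""
      # Remember the element separator, if any, and strip it for now.
      if (sub(/,[ \t]*$/, "", line)) trailing = ","
      # Keep only the digits of N to compute its parity.
      n = line
      gsub(/[^0-9]/, "", n)
      # Drop the closing brace so the new field can be appended.
      sub(/[}][ \t]*$/, "", line)
      printf "%s, \"parity\": \"%s\"}%s\n", line, (n % 2 == 0 ? "even" : "odd"), trailing
      next
    }
    # Pass the "[" and "]" lines through unchanged.
    { print }
  '
}

# Example: printf '[\n  {"number": 1},\n  {"number": 2}\n]\n' | add_parity
```

Because awk works through its input one line at a time, this sketch never holds the whole array in memory, which makes it usable for comparing a sample of the app's output against what the transform should produce.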

So far we just have a normal app setup that takes in JSON and adds the parity of each value for the output. As the app is set up now, we're not going to get the end-to-end streaming we're going for. To get that, we will need to configure the HTTP Listener and make a small change to our Transform Message component.

For the HTTP Listener, we need to add streaming=true to the MIME type and select a streaming strategy that works for this use case (non-repeatable-stream):

<http:listener 
  doc:name="Listener" 
  config-ref="HTTP_Listener_config" 
  path="/stream" 
  outputMimeType="application/json; streaming=true">
  <non-repeatable-stream />
</http:listener>

If you're using Anypoint Studio, you can modify the streaming strategy for the HTTP Listener in the "Advanced" tab.

Setting streaming=true in the outputMimeType will also help inform DataWeave of how to handle the input.

If you're using Anypoint Studio, you can modify the MIME type for the HTTP Listener in the "MIME Type" tab.

We're using a non-repeatable-stream in this case because we know a few things:

  • We won't need to read the contents of the stream more than once (e.g. log then read/modify)
  • We won't need to access the stream concurrently (e.g. scatter-gather)

For the Transform Message component, we just need to add deferred=true to the output declaration to enable streaming:

%dw 2.0
output application/json deferred=true
...

When you're finished, your configuration XML should look something like this:

<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" >
  <http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<flow name="dw-streamingFlow" >
  <http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/stream" outputMimeType="application/json; streaming=true">
    <non-repeatable-stream />
  </http:listener>
  <ee:transform doc:name="Transform Message" >
    <ee:message >
      <ee:set-payload ><![CDATA[%dw 2.0
output application/json deferred=true

fun parity(n: Number): String =
  if (isEven(n))
    "even" 
  else 
    "odd"
---
payload map (obj) -> obj ++ {parity: parity(obj.number)}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
</flow>

Now that our Mule app is configured, we need to create our payload and stream it to the application. To do this, we'll use a combination of the DataWeave CLI and curl.

dw "output application/json --- (1 to 1000000000) map (n) -> {number: n}" \
  | curl http://localhost:8081/stream -X POST -T "-" 

The -X flag sets the HTTP method (on its own, -T would send a PUT), and the -T flag uploads a file to the specified URL. Passing "-" to -T makes curl read from stdin instead of a file. The pipe, |, sends the output of the dw command to the stdin of the next command, curl. Together, this effectively streams a JSON payload to the URL.
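
If the DataWeave CLI is not available, a comparable payload could probably be produced with standard tools. This is a minimal sketch under that assumption: gen_numbers is a hypothetical helper, not part of the original setup, and it writes one object per line rather than matching the dw output byte for byte.

```shell
# Hypothetical helper: print a JSON array of {"number": n} objects for
# n = 1..COUNT, one object per line, without building the array in memory.
gen_numbers() {
  awk -v n="$1" 'BEGIN {
    print "["
    for (i = 1; i <= n; i++)
      # Every element except the last is followed by a comma.
      printf "  {\"number\": %d}%s\n", i, (i < n ? "," : "")
    print "]"
  }'
}

# Usage (assumes the Mule app from this post is running locally):
# gen_numbers 1000000 | curl http://localhost:8081/stream -X POST -T "-"
```

awk is used for the loop instead of seq because some seq implementations print large values in exponent notation, which would not be valid input for this app.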

When you execute the command, you should start to see JSON streaming to your terminal. If you terminate the process, you'll see Mule inform you in the console that it failed because of a broken pipe. In other words, the client terminated the stream before it could end. And there you have it: end-to-end streaming in Mule 4.

Additional Considerations

When streaming through a Mule application while performing transforms, it's important to note that DataWeave can currently only stream certain data types, plus Java Iterators. The data types are:

  • CSV
  • JSON (if payload is an Array)

Note that when streaming, the HTTP server has already returned a 200 OK response to the client. If something goes wrong in the middle of the stream, Mule has no way of letting the client know. To see what happens in this situation, modify the DataWeave code and run the command again:

%dw 2.0
output application/json deferred=true

import failIf from dw::Runtime

fun parity(n: Number): String =
  if (isEven(n))
    "even" 
  else 
    "odd"
---
payload map (obj, idx) -> 
  obj ++ {parity: parity(obj.number)} failIf (idx == 1000000)

curl will sit there for a while waiting for more data to come through before timing out and failing with a broken pipe, but it will not receive any information from Mule about what went wrong. Client applications will need to take this into consideration.
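
One way a client might take this into account is to check that the body it received is a closed array before trusting it. The sketch below is a heuristic only, and is_complete_array is a hypothetical helper name: it checks that the last non-whitespace character of the response is ], which a stream cut off partway through will normally not satisfy. A real client would more likely run the body through a JSON parser.

```shell
# Hypothetical helper: succeed only if the last non-whitespace character on
# stdin is "]". This is a cheap truncation check, not JSON validation.
is_complete_array() {
  # Strip all whitespace, then keep only the final byte of what remains.
  last="$(tr -d ' \t\r\n' | tail -c 1)"
  [ "$last" = "]" ]
}

# Usage sketch (response.json is a hypothetical file holding the saved body):
# is_complete_array < response.json || echo "response was truncated" >&2
```

Checking curl's exit status alongside this is also worthwhile, since a non-zero status is another sign that the transfer did not finish cleanly.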

Conclusion

You can achieve end-to-end streaming transformations with DataWeave fairly simply once you know which steps to take. This enables a client to stream data to a server and receive streamed data from that server at the same time. Keep in mind that this functionality is currently only available for CSV, JSON where the root is an Array, and Java Iterables. It's also important to remember that when streaming, Mule immediately returns a response to the client, so if something goes wrong in the middle of the stream, the client will not receive any information from Mule about what went wrong. While this app streamed JSON to and from the client, it's also possible to stream CSV from the client and return JSON from Mule, or vice versa. Happy streaming!