Messaging and Events Framework
SINCE VERSION 7.0
What is Messaging?
PIPEFORCE has a built-in messaging system where application messages can be routed between microservices based on conditions, like routing keys, for example.
By default, as internal message broker RabbitMQ is used as a service module: https://www.rabbitmq.com/documentation.html . It's one of the most advanced and widely used messaging brokers in the world.
In order to send and receive messages to/from this messaging broker, you have two options:
Develop a microservice using a RabbitMQ client library and deploy it to PIPEFORCE using the service.deploy command. For details about writing such a microservice which produces and consumes messages, see the section messaging and microservices.
Write a pipeline using the message.receive and message.send commands and let PIPEORCE manage connections, exchanges, queues, bindings and consumers for you.
This section will cover the second part: How to write pipelines which send and receive messages to/from the messaging broker.
Receiving messages
Receiving messages in a pipeline is simple: Use the command message.receive and specify the message key of interest. After you have stored the pipeline, it will be executed every time this key occurs. No need to manage connections, queues, bindings or similar.
Let's assume you have a pipeline, which sends an email like this:
pipeline:
- mail.send:
to: "sales@company.tld"
subject: "New Sales Order"
message: "Hello, a new sales order has been created!"
Now you would like to listen for new sales orders. Every time a new such sales order has been created, we would like to send this email.
Let's assume, the unique messaging key sales.order.created
was defined for this.
With this information we can now extend our pipeline easily to listen to messages with this key and automatically send an email, every time such a message appears:
pipeline:
- message.receive:
key: "sales.order.created"
- mail.send:
to: "sales@company.tld"
subject: "New Sales Order"
message: "Hello, a new sales order has been created!"
As you can see, we added the command message.receive
at the very beginning. It's important that this command is always at the very beginning and its the only message.receive
command in the pipeline. After the pipeline has been stored in the property stores, Any command below message.receive
will then be executed every time a message with the given key appears.
After you stored it, the pipeline then starts to listen: Any time a message with key sales.order.created
happens, this pipeline will be informed about this and executes any command below message.receive
. So in this example this will send a new email any time this message happens.
Managed Queue
PIPEFORCE can manage the creation, registration and deletion of exchanges, consumers, queues and bindings automatically for you.
As soon as you save a pipeline containing a message.receive
command to the property store, by default a new queue with a name given by parameter queue
will be automatically created for you, if not already exists. In case no queue
parameter is given, the queue name will be automatically derived from the pipeline name (= default name).
This default name has the format APPNAME_pipeline_PIPELINENAME
, whereas APPNAME
will be replaced by the name of the app, the pipeline resides in and PIPELINENAME
by the name of the pipeline which contains the message.receive
command. For example:
tld.domain.myapp_pipeline_listen-invoices
Additionally, a binding and a consumer listening to the given message key will be automatically created for you and linked with the queue. So no queue, binding or consumer management is required by default.
If you delete or change a message.receive
command inside a pipeline, the according consumer will be removed, but the queue and bindings will not be deleted by default.
How to change the default?
You can change this default behaviour by using the parameter manageQueue
which can be set to these values:
false
= No message entities like queues and bindings will be created or deleted automatically. You have to manage all of this by your own (not recommended).create
= This is the default. In this case, the queue will be created automatically in case it doesn't exist yet and the bindings will be attached to it. But it wont be altered or deleted automatically afterwards.delete
= In this case, the queue will be deleted in case themessage.receive
command has been changed or removed from the pipeline or the pipeline got deleted. The creation of queue and bindings is not automated.create,delete
= This combines automation of creation and deletion as described above.
Regardless of the parameter manageQueue
, the creation, deletion and scaling of the according consumer is always done automatically.
Accessing Payload
It's also possible to send message with additional data: Which is called the payload.
Let's assume, the data structure of a sales order was defined by the integration team and looks like this:
This is the payload of a message. Such a payload will be automatically provided in the pipeline body to all commands below message.receive
.
So let's use this payload in order to send more information with our email, like this:
Non JSON payload
In case you're sending a message in a non JSON format, for example as a simple text string Hello World!
, it will be internally wrapped into a JSON envelope using this structure:
Using Wildcard Keys
In some situations you probably would like to listen to all messages of a certain type. So lets assume you would like to be informed about any sales order changes in the sales department and let's assume the integration team publishes all changes to a message key structure like this:
sales.order.created
sales.order.closed
sales.lead.created
sales.lead.converted
sales.incident.created
Now in case you would like to listen to all messages according to sales orders, but not the other ones, you can use a key pattern like this: sales.order.*
. Note the asterisk *
which indicates that you're interested in any message starting with sales.order
. The asterisk means anything of the third section. So you will be informed about:
sales.order.created
sales.order.closed
But you won't be informed about:
sales.lead.created
sales.lead.converted
sales.incident.created
This is how the pipeline could look like for example to listen to all sales order actions:
And in this example we listen to all messages which are related to create something in the sales department:
Furthermore, you can use the hash #
in order to indicate any level. So for example if we would like to listen to anything inside the sales department, we could use a pipeline like this:
The hash #
matches any level of the message key regardless of the number of periods (sections) in it.
Batched Messages
Sometimes it is required to execute the message listener only for a bunch of messages, not for each single one. This is useful for example for performance reasons in case you have a lot of tiny messages or in case the target accepts only groups of messages. For this you can use the messaging batching feature of PIPEFORCE using these parameters on the message.receive
command:
maxBatchSize
: Buffers messages up to the given size in bytes and then processes this pipeline with all of these messages. The messages will be provided as an array to the body. The maximum size is 200KB (204800).maxBatchItems
: Buffers the amount of messages up to the given number and then processes this pipeline with all of these messages. The messages will be provided as array to the body.
If both parameters are given, the one which matches first is considered.
The messages in the buffer are not acknowledged until they got delivered to the pipeline.
Auto and manual ACK, NACK and DROP
SINCE VERSION 9.0
By default, any message received using the trigger command message.receive
will automatically be acknowledged (ACK) after pipeline execution, independently if execution was successful or has been failed for any reason.
In some situations you probably want to have a different behaviour and do a message ACK, NACK or DROP manually by yourself inside the pipeline before its execution has ended. To do so, you have to set the parameter autoack
to false
on the message.receive
command:
Then, inside your pipeline you can then use these commands to manage the message:
message.ack
= Sends a message ACK so the current message will be removed from the queue.message.nack
= Sends a message NACK so the current message will be returned to queue and the pipeline will be re-executed again after a while.message.drop
= Sends a message DROP so the current message will be removed from the queue and will be added to the default dead letter queue for this pipeline.
Retry parameters
Since version 10
Additionally when parameter autoack
is set to false
, you can configure the behaviour using these parameters on command message.receive
:
retryAttempts
= How many retry attempts in backend in case of an error in a pipeline, before the action defined byfailureAction
will be applied to the message. Default value is1
.retryDelay
= How long to wait in milliseconds after a pipeline error happened, before next attempt. Default is0
.failureAction
= Defines the action on the message in case an error happened in the pipeline. Possible values aredrop
andnack
(default).
Note: Even if you set autoack
to false
, a message ACK
will be send at the very end of a successful pipeline execution if not done before manually using message.ack
. The similar is true in case of an error in the pipeline: In this case the action defined by failureAction
will be applied on the message. This is to keep in sync with the RabbitMQ specification, as it requires an answer with ACK
, NACK
, or DROP
after a while to keep the connection healthy.
Sending Messages
To send messages in a pipeline, you can use the command message.send.
Here is an example:
This example sends a new message with key sales.order.created
and the given JSON document as payload to the default exchange. By default, the content type of the payload is application/json
. You can change this by using the parameter contentType
. In case the payload is different from a JSON document, it will be automatically wrapped into a JSON envelope to ensure that consumers can always expect a valid JSON. The structure of this JSON envelope looks like this example:
The field status
status indicates whether the value is OK or not. In case there was some problem with the value (for example too big, conversion error or similar), this will be indicated here. The status code is similar to the HTTP status codes. In cases of an error status, also the field statusMessage
is used which has more information about the error occured.
The field valueType
specifies the content type of the value
field which can be one of the default JSON types like:
string
object
integer
number
boolean
array
Or any specific content type, defined by the contentType
parameter of the command message.send
.
The payload
can also be set to null
or empty string in case the message has no payload at all. In case the parameter payload
is missing, the current body content of the pipeline is used as payload.