Inter-Protocol Property Conversions
Overview
RabbitMQ is a multi-protocol broker supporting:
- AMQP 1.0 (henceforth AMQP)
- AMQP 0.9.1
- MQTT
- STOMP
- RabbitMQ Streams
Messages can be published using one protocol and consumed using another. This requires converting between the data formats of the different protocols. Prior to RabbitMQ 3.13, all messages were converted into an internal format based on the AMQP 0.9.1 protocol when published to RabbitMQ. This approach often resulted in unnecessary conversions and/or data fidelity issues.
Current RabbitMQ versions take a different approach for the AMQP, AMQP 0.9.1 and MQTT protocols: messages are always stored in their original format and converted only if they are consumed by a protocol different from the originating protocol. The advantage is that no protocol-specific information is lost when the originating and the consuming protocols are the same.
The only exception to this rule is streams: no matter which protocol published the message, streams internally store their messages as AMQP encoded data. Hence, publishing to a stream using AMQP 0.9.1, for example, triggers a conversion to AMQP.
All conversions have been rewritten and formalised. This document aims to capture the conversion rules that are implemented so that application developers can refer to it to understand how their messages will be transformed in their multi-protocol messaging applications.
Conventions
- shortstr: (condition) a string that is smaller than 256 bytes and contains only valid UTF-8 encoded data with no NUL (binary zero octet) characters.
- *: is read as "any" (field/type etc).
Conditions are evaluated in the order they are written. For example, an AMQP message_id of type utf8 with more than 255 bytes will fail the first shortstr condition and then fall through to the next line.
If there is no matching line, the data will not be converted (and will therefore not be available to the consuming application).
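The fall-through behaviour can be illustrated with a minimal Python sketch. The helper names (`is_shortstr`, `convert_utf8_message_id`), the rule list and the tuple return value are hypothetical and chosen for illustration only; they are not part of RabbitMQ. The rules mirror the two utf8 message_id rows of the AMQP 1.0 -> AMQP 0.9.1 table, and the second condition uses the 255-byte boundary from the example above.

```python
from typing import Optional, Tuple


def is_shortstr(raw: bytes) -> bool:
    """Check the shortstr condition: < 256 bytes, valid UTF-8, no NUL octet."""
    if len(raw) >= 256 or b"\x00" in raw:
        return False
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True


# Rules are tried top to bottom, like the rows of a conversion table.
# Each rule is (condition, target field, target type); names are illustrative.
UTF8_MESSAGE_ID_RULES = [
    (is_shortstr, "properties.message_id", "shortstr"),
    (lambda raw: len(raw) > 255, "headers[x-message-id]", "longstr"),
]


def convert_utf8_message_id(message_id: str) -> Optional[Tuple[str, str]]:
    """Return (target field, target type), or None if no rule matches."""
    raw = message_id.encode("utf-8")
    for condition, field, amqp091_type in UTF8_MESSAGE_ID_RULES:
        if condition(raw):
            return field, amqp091_type
    return None  # no matching line: the value is not converted
```

In this sketch a short identifier lands in the basic properties, a 256-byte identifier falls through to the headers table, and a short value containing a NUL character matches neither rule and is therefore dropped.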
AMQP 1.0 -> AMQP 0.9.1
A conversion from AMQP to AMQP 0.9.1 takes place if the message is consumed with AMQP 0.9.1 and the message either
- was published with AMQP 1.0, or
- is consumed from a stream.
AMQP section | AMQP field | AMQP type | Condition | AMQP 0.9.1 section | AMQP 0.9.1 Field | AMQP 0.9.1 Type | Comment |
---|---|---|---|---|---|---|---|
header | durable | boolean | | properties | delivery_mode | ubyte | 2 = durable, 1 = transient |
header | priority | ubyte | | properties | priority | ubyte | |
header | ttl | milliseconds (uint) | | properties | expiration | shortstr | milliseconds converted to string value |
properties | message_id | utf8 | shortstr | properties | message_id | shortstr | |
properties | message_id | utf8 | > 255 bytes | properties | headers, Key: x-message-id | longstr | |
properties | message_id | uuid (16 byte binary) | | properties | message_id | shortstr | converted to text, e.g. "urn:uuid:550e8400-e29b-41d4-a716-446655440000" |
properties | message_id | ulong | | properties | message_id | shortstr | converted to text |
properties | message_id | binary | | properties | headers, Key: x-message-id | binary | |
properties | user_id | binary | shortstr | properties | user_id | shortstr | |
properties | to | address | | | | | |
properties | subject | utf8 | | | | | |
properties | reply_to | address | shortstr | properties | reply_to | shortstr | |
properties | correlation_id | utf8 | shortstr | properties | correlation_id | shortstr | |
properties | correlation_id | utf8 | > 255 bytes | properties | headers, Key: x-correlation-id | longstr | |
properties | correlation_id | uuid (16 byte binary) | | properties | correlation_id | shortstr | converted to text representation, e.g. "urn:uuid:550e8400-e29b-41d4-a716-446655440000" |
properties | correlation_id | ulong | | properties | correlation_id | shortstr | converted to text representation of number |
properties | correlation_id | binary | | properties | headers, Key: x-correlation-id | binary | |
properties | content_type | symbol | | properties | content_type | shortstr | |
properties | content_encoding | symbol | | properties | content_encoding | shortstr | |
properties | absolute_expiry_time | timestamp | | | | | |
properties | creation_time | timestamp | | properties | timestamp | timestamp | converted to seconds |
properties | group_id | utf8 | shortstr | properties | app_id | shortstr | |
properties | group_sequence | sequence-no | | | | | |
properties | reply_to_group_id | utf8 | | | | | |
application properties | * | * (see type conversions) | Key: shortstr | properties | headers | * | |
message annotations | * (symbol - x-cc*) | | Key: x-cc | properties | headers, Key: "CC" | longstr | |
message annotations | * (symbol - x-*) | * (see type conversions) | Key: shortstr & x-* | properties | headers | * | Typically this means x- headers |
data | data | | single data section | payload | | binary | plain binary extracted from data section |
data (multiple) | * | * | multiple data sections | payload | | AMQP 1.0 encoded binary | properties.type will be set to "amqp-1.0" |
amqp.value | * | * | | payload | | AMQP 1.0 encoded binary | properties.type will be set to "amqp-1.0" |
amqp.sequence | * | * | | payload | | AMQP 1.0 encoded binary | properties.type will be set to "amqp-1.0" |
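As a rough illustration of a few rows in the table above, the following Python sketch shows how the header section and non-string identifiers could map to AMQP 0.9.1 basic properties. The function names are hypothetical and this is not RabbitMQ's implementation; only the mappings themselves (2 = durable, 1 = transient, ttl rendered as a string, UUID and ulong identifiers rendered as text) come from the table.

```python
import uuid
from typing import Union


def delivery_mode(durable: bool) -> int:
    """header.durable -> properties.delivery_mode (2 = durable, 1 = transient)."""
    return 2 if durable else 1


def expiration(ttl_ms: int) -> str:
    """header.ttl (milliseconds) -> properties.expiration (string value)."""
    return str(ttl_ms)


def id_to_text(value: Union[uuid.UUID, int]) -> str:
    """uuid or ulong message_id / correlation_id -> text for a shortstr field."""
    if isinstance(value, uuid.UUID):
        return value.urn  # "urn:uuid:" followed by the canonical UUID text
    return str(value)
```

For example, `id_to_text(uuid.UUID("550e8400-e29b-41d4-a716-446655440000"))` yields the "urn:uuid:..." string shown in the table comment.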
Type Conversions
AMQP 1.0 type | Condition | AMQP 0.9.1 type | Comment |
---|---|---|---|
utf8 (string) | | longstr | RabbitMQ does not support shortstr header values so all utf8 inputs are converted to longstr unless it is a field in the basic properties |
binary | | binary | |
long | | long | |
ulong | | long | risk of overflow |
ubyte | | unsignedbyte | |
short | | short | |
ushort | | unsignedshort | |
uint | | unsignedint | |
int | | signedint | |
double | | double | |
float | | float | |
boolean | | bool | |
timestamp | | timestamp (seconds) | value divided by 1000 |
byte | | byte | |
null | | void | |
list | | array | |
map | | table | |
symbol | | longstr | |
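Two rows of the type table involve arithmetic that is easy to get wrong: the ulong to long mapping and the timestamp division. The sketch below is illustrative only. It assumes the AMQP 0.9.1 long is a signed 64-bit integer and that the division by 1000 truncates; neither detail is stated in the table, and the function names are hypothetical.

```python
LONG_MAX = 2**63 - 1   # assumed upper bound of a signed 64-bit long
ULONG_MAX = 2**64 - 1  # upper bound of an AMQP 1.0 ulong


def ulong_fits_long(value: int) -> bool:
    """Return True if an AMQP 1.0 ulong fits a signed 64-bit long."""
    if not 0 <= value <= ULONG_MAX:
        raise ValueError("not a valid ulong")
    return value <= LONG_MAX


def timestamp_ms_to_seconds(milliseconds: int) -> int:
    """AMQP 1.0 timestamp (ms) -> AMQP 0.9.1 timestamp (s), truncating (assumption)."""
    return milliseconds // 1000
```

Any ulong above `LONG_MAX` is where the "risk of overflow" noted in the table applies, so publishers that expect AMQP 0.9.1 consumers may want to stay below that bound.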
AMQP 0.9.1 -> AMQP 1.0
AMQP 0.9.1 section | AMQP 0.9.1 field | AMQP 0.9.1 type | Condition | AMQP 1.0 section | AMQP 1.0 Field | AMQP 1.0 Type | Comment |
---|---|---|---|---|---|---|---|
basic properties | message_id | shortstr | valid urn uuid | properties | message_id | uuid | |
basic properties | message_id | shortstr | | properties | message_id | utf8 | |
basic properties | correlation_id | shortstr | valid urn uuid | properties | correlation_id | uuid | |
basic properties | correlation_id | shortstr | | properties | correlation_id | utf8 | |
basic properties | user_id | shortstr | | properties | user_id | binary | |
basic properties | expiration | shortstr | if convertible to numeric type | header | ttl | uint | |
basic properties | type | shortstr | | message annotations | x-basic-type | utf8 | |
basic properties | reply_to | shortstr | | properties | reply_to | utf8 | |
basic properties | app_id | shortstr | | properties | group-id | utf8 | |
basic properties | timestamp | timestamp (seconds) | | properties | creation_time | timestamp | converted to ms |
basic properties | content-type | shortstr | | properties | content-type | symbol | |
basic properties | content-encoding | shortstr | | properties | content-encoding | symbol | |
basic properties | delivery-mode | octet | | header | durable | boolean | previously in x-basic-delivery-mode |
basic properties | priority | octet | | header | priority | ubyte | previously in x-basic-priority |
basic.properties | headers | | Key=x-amqp-1.0-properties | properties | - | - | legacy 1.0 plugin headers |
basic.properties | headers | | Key=x-amqp-1.0-application-properties | application properties | - | - | legacy 1.0 plugin headers |
basic.properties | headers | | Key=x-amqp-1.0-message-annotations | message annotations | - | - | legacy 1.0 plugin headers |
basic.properties | headers | | Key=x-reply-to-topic | properties | reply_to | utf8 | "/topic/" RK |
basic properties | headers | * | Key begins with "x-" | message annotations | Key | * | see type conversion rules |
basic properties | headers | table | Value not array or table | application properties | Key | * | see type conversion rules |
basic properties | headers | table | Value is array or table | - | - | - | not converted |
payload | - | binary | | data | - | data | |
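The header rows of the table above amount to a small decision procedure evaluated top to bottom. The Python sketch below is a hypothetical illustration of that ordering, not RabbitMQ code: the function name is invented, and AMQP 0.9.1 arrays and tables are modelled as Python lists and dicts for the sake of the example.

```python
from typing import Any, Optional

# Legacy AMQP 1.0 plugin headers and the AMQP 1.0 section they map to.
LEGACY_HEADERS = {
    "x-amqp-1.0-properties": "properties",
    "x-amqp-1.0-application-properties": "application properties",
    "x-amqp-1.0-message-annotations": "message annotations",
}


def target_section(key: str, value: Any) -> Optional[str]:
    """Return the AMQP 1.0 section for an AMQP 0.9.1 header, or None."""
    if key in LEGACY_HEADERS:
        return LEGACY_HEADERS[key]
    if key == "x-reply-to-topic":
        return "properties"  # becomes the reply_to field
    if key.startswith("x-"):
        return "message annotations"
    if isinstance(value, (list, dict)):
        return None  # array or table value: not converted
    return "application properties"
```

Because the "x-" check comes before the array/table check, an "x-" header with an array value still maps to message annotations in this sketch, while the same value under a plain key is dropped.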
Type Conversions
AMQP 0.9.1 type | Condition | AMQP 1.0 type | Comment |
---|---|---|---|
longstr | shortstr | utf8 | Performance / accuracy trade-off |
longstr | | binary | |
long | | long | |
ubyte | | ubyte | |
short | | short | |
ushort | | ushort | |
uint | | uint | |
int | | int | |
double | | double | |
float | | float | |
bool | | boolean | |
binary | | binary | |
timestamp | | timestamp | Converted from seconds to milliseconds |
byte | | byte | |
void | | null | |
array | | list | |
table | | map | |
table.key | x-* header | symbol | |
table.key | | utf8 | |
table.value | | * | converted according to this table |
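The two longstr rows show the trade-off mentioned in the comment column: only values that meet the shortstr condition become utf8, and everything else is passed through as binary. A minimal Python sketch of that rule and of the timestamp scaling follows; the function names and return shape are assumptions made for illustration.

```python
from typing import Tuple, Union


def convert_longstr(value: bytes) -> Tuple[str, Union[str, bytes]]:
    """Map an AMQP 0.9.1 longstr to (AMQP 1.0 type name, converted value)."""
    # shortstr condition: smaller than 256 bytes, valid UTF-8, no NUL octet.
    if len(value) < 256 and b"\x00" not in value:
        try:
            return "utf8", value.decode("utf-8")
        except UnicodeDecodeError:
            pass
    return "binary", value


def timestamp_seconds_to_ms(seconds: int) -> int:
    """AMQP 0.9.1 timestamp (seconds) -> AMQP 1.0 timestamp (milliseconds)."""
    return seconds * 1000
```

Note that under this rule a 300-byte header value arrives as binary even if it happens to be valid UTF-8, so consumers using AMQP 1.0 should be prepared to handle both types.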
MQTT 5.0 -> AMQP 1.0
MQTT 5.0 Section | MQTT Field | MQTT 5.0 Type | Condition | AMQP 1.0 Section | AMQP 1.0 Field | AMQP 1.0 Type | Comment |
---|---|---|---|---|---|---|---|
Fixed Header | Dup | Bits | | | | | Setting to header first-acquirer does not make sense because the DUP flag only applies from client to server (consumption from server to client is determined by the Redelivered flag sent from the queues) |
Fixed Header | QoS | Bits | | header | durable | boolean | durable is true if QoS > 0 |
Fixed Header | Retain | Bits | | | | | |
Variable Header | Payload Format Indicator | Bits | | | | | see row "Payload" with condition "Payload Format Indicator set" |
Variable Header | Message Expiry Interval | uint | | header | ttl | milliseconds | seconds to milliseconds |
Variable Header | Topic Alias | ushort | | | | | |
Variable Header | Response Topic | utf8 | | properties | reply_to | utf8 | Translate MQTT topic to AMQP routing key (RK). Set reply-to address to "/exchange/" X "/" RK. |
Variable Header | Correlation Data | binary | urn:uuid | properties | correlation_id | uuid | |
Variable Header | Correlation Data | binary | | properties | correlation_id | binary | |
Variable Header | User Property | utf8 | Key begins with "x-" and key is ASCII | message annotations | | value: utf8 | Key is of type symbol |
Variable Header | User Property | utf8 | Key does not begin with "x-" | application properties | | value: utf8 | Key is of type utf8 |
Variable Header | Subscription Identifier | uint | | | | | |
Variable Header | Content Type | utf8 | valid ASCII | properties | content_type | symbol | MQTT content type is UTF-8 whereas AMQP 1.0 content type is only ASCII |
Payload | | | Payload Format Indicator unset | data | | data | |
Payload | | | Payload Format Indicator set | amqp.value | | utf8 | If the Payload Format Indicator is set, convert the MQTT payload to a string (i.e. single AMQP value section) because an AMQP string is UTF-8 encoded |
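A few of the MQTT 5.0 rows above can be sketched in Python as follows. This is an illustration under stated assumptions rather than the broker's code: all function names are hypothetical, user properties are modelled as a sequence of (key, value) string pairs, and `str.isascii` requires Python 3.7 or later.

```python
from typing import Iterable, List, Tuple

Pair = Tuple[str, str]


def qos_to_durable(qos: int) -> bool:
    """Fixed header QoS -> header.durable (true if QoS > 0)."""
    return qos > 0


def expiry_to_ttl_ms(expiry_seconds: int) -> int:
    """Message Expiry Interval (seconds) -> header.ttl (milliseconds)."""
    return expiry_seconds * 1000


def split_user_properties(pairs: Iterable[Pair]) -> Tuple[List[Pair], List[Pair]]:
    """Split user properties into (message annotations, application properties)."""
    annotations: List[Pair] = []
    application: List[Pair] = []
    for key, value in pairs:
        if key.startswith("x-"):
            if key.isascii():
                annotations.append((key, value))
            # A non-ASCII "x-" key matches no row and is not converted.
        else:
            application.append((key, value))
    return annotations, application
```

For instance, a PUBLISH with QoS 1, a 60 second expiry and the user properties `x-trace` and `tenant` would produce a durable message with a ttl of 60000, one message annotation and one application property.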
Type Conversions
MQTT 5.0 Type | Condition | AMQP 1.0 Type | Comment |
---|---|---|---|
Bits | | boolean | only selected flags are converted |
ushort | | ushort | |
uint | | uint | |
utf8 | | utf8 | |
binary data | utf8 | utf8 | |
binary data | | binary | |
utf8 string pairs | | map | de-duplicates keys but maintains order |