Custom codec example - Protobuf

Rule requirements

The device publishes a binary message encoded using Protobuf, which needs to be matched by the rule engine and then republished to the topic associated with the “name” field. The format of the topic is “person/${name}”.

For example, republish a message with the “name” field as “Shawn” to the topic “person/Shawn”.

Create schema

In the DashboardCustom codec example - Protobuf - 图1 (opens new window) interface of EMQX, create a Protobuf Schema using the following parameters:

  1. Name: protobuf_person

  2. Codec Type: protobuf

  3. Schema: The following protobuf schema defines a Person message.

  1. message Person {
  2. required string name = 1;
  3. required int32 id = 2;
  4. optional string email = 3;
  5. }

Creating rules

Use the Schema you have just created to write the rule SQL statement:

  1. SELECT
  2. schema_decode('protobuf_person', payload, 'Person') as person, payload
  3. FROM
  4. "t/#"
  5. WHERE
  6. person.name = 'Shawn'

The key point here is schema_decode('protobuf_person', payload, 'Person'):

  • The schema_decode function decodes the contents of the payload field according to the Schema ‘protobuf_person’;
  • as person stores the decoded value in the variable “person”;
  • The last argument Person specifies that the message type in the payload is the ‘Person’ type defined in the protobuf schema.

Then add the action using the following parameters:

  • Action Type: Message republishing
  • Destination Topic: person/${person.name}
  • Message Content Template: ${person}

This action sends the decoded “person” to the topic person/${person.name} in JSON format. ${person.name} is a variable placeholder that will be replaced at runtime with the value of the “name” field in the message content.

Device side code

Once the rules have been created, it is time to simulate the data for testing.

The following code uses the Python language to fill a Person message and encode it as binary data, then sends it to the “t/1” topic. See full codeCustom codec example - Protobuf - 图2 (opens new window) for details.

  1. def publish_msg(client):
  2. p = person_pb2.Person()
  3. p.id = 1
  4. p.name = "Shawn"
  5. p.email = "liuxy@emqx.io"
  6. message = p.SerializeToString()
  7. topic = "t/1"
  8. print("publish to topic: t/1, payload:", message)
  9. client.publish(topic, payload=message, qos=0, retain=False)

Checking rule execution results

  1. In the Dashboard’s WebsocketCustom codec example - Protobuf - 图3 (opens new window) tools, log in to an MQTT Client and subscribe to “person/#”.

  2. Install the python dependency and execute the device-side code:

  1. $ pip3 install protobuf
  2. $ pip3 install paho-mqtt
  3. $ python3 ./pb2_mqtt.py
  4. Connected with result code 0
  5. publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io'
  6. t/1 b'\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io'
  1. Check that a message with the topic person/Shawn is received on the Websocket side:
  1. {"email":"liuxy@emqx.io","id":1,"name":"Shawn"}