3rd-Party Custom codec example - HTTP

Rule requirements

The device publishes an arbitrary message to verify that the self-deployed codec service is working normally.

Create Schema

In the Dashboard3rd-Party Custom codec example - HTTP - 图1 (opens new window) interface of EMQX, create a 3rd-Party Schema using the following parameters:

  1. Name: my_parser
  2. Codec Type: 3rd-party
  3. Third Party Type: HTTP
  4. URL: http://127.0.0.1:9003/parser
  5. Codec Configuration: xor

All other configurations remain default.

Item 5 (i.e. codec configuration) above is optional and is a string, the content of which is related to the service of the codec service.

Creating rules

Use the Schema you have just created to write the rule SQL statement:

  1. SELECT
  2. schema_encode('my_parser', payload) as encoded_data,
  3. schema_decode('my_parser', encoded_data) as decoded_data
  4. FROM
  5. "t/#"

This SQL statement first encodes and then decodes the data to verify that the encoding and decoding process is correct:

  • The schema_encode function encodes the contents of the payload field according to the Schema ‘my_parser’ and stores the result in the variable encoded_data;
  • The schema_decode function decodes the contents of the payload field according to the Schema ‘my_parser’ and stores the result in the variable decoded_data;

The final filtered result of this SQL statement is the variables encoded_data and decoded_data.

Then add the action using the following parameters:

  • Action Type: Check (debug)

This check action prints the results filtered by the SQL statement to the emqx console (erlang shell).

If the service is started with emqx console, the print will be displayed directly in the console; if the service is started with emqx start, the print will be output to the erlang.log.N file in the log directory, where “N” is an integer, e.g. “erlang.log.1”, “ erlang.log.2”.

Codec server-side code

Once the rules have been created, it is time to simulate the data for testing. Therefore, the first thing you need to do is write your own codec service.

The following code implements an HTTP codec service using the Python language. For simplicity, this service provides two simple ways of coding and decoding (encryption and decryption). See full code3rd-Party Custom codec example - HTTP - 图2 (opens new window) for details.

  • xor
  • Character substitution
  1. def xor(data):
  2. """
  3. >>> xor(xor(b'abc'))
  4. b'abc'
  5. >>> xor(xor(b'!}~*'))
  6. b'!}~*'
  7. """
  8. length = len(data)
  9. bdata = bytearray(data)
  10. bsecret = bytearray(secret * length)
  11. result = bytearray(length)
  12. for i in range(length):
  13. result[i] = bdata[i] ^ bsecret[i]
  14. return bytes(result)
  15. def subst(dtype, data, n):
  16. """
  17. >>> subst('decode', b'abc', 3)
  18. b'def'
  19. >>> subst('decode', b'ab~', 1)
  20. b'bc!'
  21. >>> subst('encode', b'def', 3)
  22. b'abc'
  23. >>> subst('encode', b'bc!', 1)
  24. b'ab~'
  25. """
  26. adata = array.array('B', data)
  27. for i in range(len(adata)):
  28. if dtype == 'decode':
  29. adata[i] = shift(adata[i], n)
  30. elif dtype == 'encode':
  31. adata[i] = shift(adata[i], -n)
  32. return bytes(adata)

Run this service:

  1. $ pip3 install flask
  2. $ python3 http_parser_server.py
  3. * Serving Flask app "http_parser_server" (lazy loading)
  4. * Environment: production
  5. WARNING: This is a development server. Do not use it in a production deployment.
  6. Use a production WSGI server instead.
  7. * Debug mode: off
  8. * Running on http://127.0.0.1:9003/ (Press CTRL+C to quit)

Checking rule execution results

Since this example is relatively simple, we’ll use the MQTT Websocket client directly to simulate sending a message on the device side.

  1. In the Dashboard’s Websocket3rd-Party Custom codec example - HTTP - 图3 (opens new window) tools, log in to an MQTT Client and publish a message to “t/1” with the text “hello”.

  2. Check what is printed in the emqx console (erlang shell):

  1. (emqx@127.0.0.1)1> [inspect]
  2. Selected Data: #{decoded_data => <<"hello">>,
  3. encoded_data => <<9,4,13,13,14>>}
  4. Envs: #{event => 'message.publish',
  5. flags => #{dup => false,retain => false},
  6. from => <<"mqttjs_76e5a35b">>,
  7. headers =>
  8. #{allow_publish => true,
  9. peername => {{127,0,0,1},54753},
  10. username => <<>>},
  11. id => <<0,5,146,30,146,38,123,81,244,66,0,0,62,117,0,1>>,
  12. node => 'emqx@127.0.0.1',payload => <<"hello">>,qos => 0,
  13. timestamp => {1568,34882,222929},
  14. topic => <<"t/1">>}
  15. Action Init Params: #{}

Select Data is the data filtered by the SQL statement, Envs are available environment variables within the rule engine and Action Init Params is the initialization parameters for actions. All three data are in Map format.

The two fields decoded_data and encoded_data in Selected Data correspond to the two ASs in the SELECT statement. Because decoded_data is the result of encoding and then decoding, it is reverted to the content we sent, “hello”, indicating that the codec plugin is working correctly.