3rd-Party Custom codec example - gRPC

Rule requirements

A device publishes an arbitrary message, which is used to verify that the self-deployed codec service is working correctly.

Create a Parser gRPC resource

On the Resource creation page of the EMQX Dashboard, create a Parser gRPC resource using the following parameters:

Create Schema

In the EMQX Dashboard, create a 3rd-Party Schema using the following parameters:

  1. Name: my_grpc_parser
  2. Codec Type: 3rd-party
  3. Third Party Type: Resources
  4. Resource: my_grpc_parser_resource (select the Parser gRPC resource we created just now)

All other configurations remain default.

Creating rules

Use the Schema you have just created to write the rule SQL statement:

    SELECT
      schema_encode('my_grpc_parser', payload) as encode_resp,
      schema_decode('my_grpc_parser', encode_resp.result) as decode_resp
    FROM
      "t/#"

This SQL statement first encodes and then decodes the data to verify that the encoding and decoding process is correct:

  • The schema_encode function encodes the contents of the payload field according to the Schema ‘my_grpc_parser’ and stores the result in the variable encode_resp;
  • The schema_decode function decodes the contents of encode_resp.result (the output of the encoding step) according to the Schema ‘my_grpc_parser’ and stores the result in the variable decode_resp.

The final output selected by this SQL statement consists of the two variables encode_resp and decode_resp.
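The data flow of this rule can be sketched outside EMQX in plain Python. The sketch assumes the codec service behaves like the one in this example (Base64 on encode, the inverse on decode). The function names and the dictionary layout are illustrative stand-ins, not EMQX APIs.

```python
import base64


def schema_encode(payload: bytes) -> dict:
    # Stand-in for schema_encode('my_grpc_parser', payload):
    # assumes the codec service Base64-encodes the data.
    return {"code": "SUCCESS", "message": "ok",
            "result": base64.b64encode(payload)}


def schema_decode(data: bytes) -> dict:
    # Stand-in for schema_decode('my_grpc_parser', ...):
    # assumes the codec service Base64-decodes the data.
    return {"code": "SUCCESS", "message": "ok",
            "result": base64.b64decode(data)}


def run_rule(payload: bytes) -> dict:
    # Mirrors the SELECT clause: encode first, then decode the encoded result.
    encode_resp = schema_encode(payload)
    decode_resp = schema_decode(encode_resp["result"])
    return {"encode_resp": encode_resp, "decode_resp": decode_resp}


if __name__ == "__main__":
    print(run_rule(b"hello"))
```

If the round trip is correct, decode_resp carries the original payload again, which is exactly what the rule is meant to demonstrate.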

Then add the action using the following parameters:

  • Action Type: Inspect (debug)

This inspect action prints the results selected by the SQL statement to the emqx console (erlang shell).

If the service is started with emqx console, the output is displayed directly in the console; if the service is started with emqx start, the output is written to the erlang.log.N file in the log directory, where “N” is an integer, e.g. “erlang.log.1”, “erlang.log.2”.
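When EMQX runs in the background, a small helper can locate the log file to read. The following is a minimal sketch: the function name and the log directory you pass in are hypothetical, and it picks the most recently modified file rather than the highest N, on the assumption that the numbered files may be reused in rotation.

```python
from pathlib import Path
from typing import Optional


def latest_erlang_log(log_dir: str) -> Optional[Path]:
    # Collect files named erlang.log.N where N is an integer.
    candidates = [
        p for p in Path(log_dir).glob("erlang.log.*")
        if p.suffix[1:].isdigit()
    ]
    if not candidates:
        return None
    # Choose by modification time, not by N.
    return max(candidates, key=lambda p: p.stat().st_mtime)
```

Calling latest_erlang_log with the path of your EMQX log directory returns the file to inspect, or None if no matching file exists.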

Codec server-side code

Once the rule has been created, the next step is to send test data through it. Before doing so, you need to write and start your own codec service.

The following code implements a gRPC codec service in Python. For simplicity, this service only Base64-encodes the received data when encoding and Base64-decodes it when decoding. See the full code for details.

    import base64
    import logging
    from concurrent import futures

    import grpc

    # Modules generated from emqx_schema_registry.proto by grpc_tools.protoc
    import emqx_schema_registry_pb2
    import emqx_schema_registry_pb2_grpc


    class Parser(emqx_schema_registry_pb2_grpc.ParserServicer):
        def HealthCheck(self, request, context):
            # Echo the request back to the caller
            return request

        def Parse(self, request, context):
            if request.type == 1:
                # type 1: encode request
                print("parser got encode request: ", request)
                encoded_d = base64.b64encode(request.data)
                return emqx_schema_registry_pb2.ParseResponse(
                    code='SUCCESS', message="ok", result=encoded_d)
            elif request.type == 0:
                # type 0: decode request
                print("parser got decode request: ", request)
                decoded_d = base64.b64decode(request.data)
                return emqx_schema_registry_pb2.ParseResponse(
                    code='SUCCESS', message="ok", result=decoded_d)


    def serve():
        # Serve requests on port 50051 with a pool of 10 worker threads
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        emqx_schema_registry_pb2_grpc.add_ParserServicer_to_server(
            Parser(), server)
        server.add_insecure_port('[::]:50051')
        server.start()
        server.wait_for_termination()


    if __name__ == '__main__':
        logging.basicConfig()
        serve()
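The dispatch logic of Parse can also be exercised without gRPC or the generated modules. The sketch below is a stand-in only: FakeRequest, FakeResponse and parse are hypothetical names that mimic the fields used above (type, data, code, message, result). Unlike the service above, which returns nothing for other type values, this sketch raises an error for them; that is a choice made here, not part of the original code.

```python
import base64
from dataclasses import dataclass

ENCODE = 1  # request.type value the service above treats as "encode"
DECODE = 0  # request.type value the service above treats as "decode"


@dataclass
class FakeRequest:
    # Hypothetical stand-in for the generated request message
    type: int
    data: bytes


@dataclass
class FakeResponse:
    # Hypothetical stand-in for ParseResponse
    code: str
    message: str
    result: bytes


def parse(request: FakeRequest) -> FakeResponse:
    if request.type == ENCODE:
        return FakeResponse("SUCCESS", "ok", base64.b64encode(request.data))
    if request.type == DECODE:
        return FakeResponse("SUCCESS", "ok", base64.b64decode(request.data))
    # Assumption of this sketch: reject unknown types explicitly.
    raise ValueError(f"unsupported request type: {request.type}")
```

For example, parse(FakeRequest(type=ENCODE, data=b"hello")) yields a response whose result can be passed back through parse with type=DECODE to recover the original bytes.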

Run this service:

    pip3 install grpcio
    pip3 install grpcio-tools
    python3 -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/emqx_schema_registry.proto
    python3 emqx_schema_registry_server.py
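Before testing the rule, it can help to confirm that the service is accepting connections. The following sketch uses only the standard library to check that a TCP port is open. The function name is hypothetical, and the default host 127.0.0.1 assumes the service runs on the same machine and is reachable over IPv4; port 50051 is the one used in the server code above. It verifies only that something is listening, not that the gRPC service itself is healthy.

```python
import socket


def port_is_open(host: str = "127.0.0.1", port: int = 50051,
                 timeout: float = 2.0) -> bool:
    # Try to open a TCP connection; any OS-level failure means "not reachable".
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling port_is_open() with no arguments checks the local codec service on its default port.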

Checking rule execution results

Since this example is relatively simple, we’ll use the Dashboard’s MQTT WebSocket client to simulate a device sending a message.

  1. In the Dashboard’s WebSocket tools, connect an MQTT client and publish a message to “t/1” with the text “hello”.

  2. Check what is printed in the emqx console (erlang shell):

    (emqx@127.0.0.1)1> [inspect]
        Selected Data: #{<<"decode_resp">> =>
                             #{code => 'SUCCESS',message => <<"ok">>,
                               result => <<"hello">>},
                         <<"encode_resp">> =>
                             #{code => 'SUCCESS',message => <<"ok">>,
                               result => <<"aGVsbG8=">>}}
        Envs: #{'__bindings__' =>
                    #{'Id' => <<"inspect_1649928007719256000">>,
                      'Params' => #{}},
                clientid => <<"mqttjs_4c8818ae">>,event => 'message.publish',
                flags => #{dup => false,retain => false},
                headers =>
                    #{peerhost => <<"127.0.0.1">>,properties => #{},
                      proto_ver => 4,protocol => mqtt,username => <<>>},
                id => <<"0005DC99CDA113B6F44200000CEB0001">>,
                metadata => #{rule_id => <<"rule:440083">>},
                node => 'emqx@127.0.0.1',payload => <<"hello">>,
                peerhost => <<"127.0.0.1">>,pub_props => #{},
                publish_received_at => 1649928021545,qos => 0,
                timestamp => 1649928021545,topic => <<"t/1">>,
                username => <<>>}
        Action Init Params: #{}

Selected Data is the data selected by the SQL statement, Envs are the environment variables available within the rule engine, and Action Init Params are the initialization parameters for the action. All three are in Map format.

The two fields decode_resp and encode_resp in Selected Data correspond to the two AS aliases in the SELECT statement.

Because decode_resp is the result of encoding and then decoding, it matches the content we sent, “hello”, which indicates that the codec service is working correctly.