Python SDK for Portable Plugin

With the Python SDK for portable plugins, users can develop portable plugins in Python. The SDK provides APIs for the source, sink and function interfaces. It also provides a plugin start function, which serves as the execution entry point and defines the plugin and its symbols.

To run a Python plugin, the runtime environment must meet two prerequisites:

  1. Install a Python 3.x environment.
  2. Install the nng and ekuiper packages with pip install nng ekuiper.

By default, the eKuiper portable plugin runtime runs the Python script with python userscript.py. If users have multiple Python installations or use an alternative Python executable, they can specify the Python command in the configuration file.
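The two prerequisites can be checked with a short script, run with the same interpreter that eKuiper will invoke. This is a minimal sketch, not part of the SDK: the helper name check_prerequisites is hypothetical, and the default module names are an assumption that the import names match the pip package names from the list above, which is not guaranteed for every package.

```python
import importlib.util
import sys


def check_prerequisites(modules=("nng", "ekuiper")):
    """Return a list of human-readable problems; an empty list means OK.

    The default module names are an assumption: they mirror the pip package
    names in the text, and an import name can differ from a package name.
    """
    problems = []
    if sys.version_info.major != 3:
        problems.append("Python 3.x is required, found " + sys.version.split()[0])
    for name in modules:
        # find_spec returns None when a top-level module cannot be located.
        if importlib.util.find_spec(name) is None:
            problems.append("module not found: " + name)
    return problems


if __name__ == "__main__":
    for problem in check_prerequisites():
        print(problem)
```

If the script prints nothing, the interpreter it ran under satisfies both checks. Running it with each candidate Python command shows which one to put in the configuration file.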

Development

The development process is the same: develop the symbols, then develop the main program. The Python SDK provides similar source, sink and function interfaces in Python.

Source interface:

    class Source(object):
        """abstract class for eKuiper source plugin"""

        @abstractmethod
        def configure(self, datasource: str, conf: dict):
            """configure with the string datasource and conf map and raise error if any"""
            pass

        @abstractmethod
        def open(self, ctx: Context):
            """run continuously and send out the data or error with ctx"""
            pass

        @abstractmethod
        def close(self, ctx: Context):
            """stop running and clean up"""
            pass
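To illustrate the source interface, here is a minimal sketch of an implementation. It is self-contained so that it runs without the SDK installed: the Source class is a local stand-in that mirrors the interface above, RecordingContext is a hypothetical test double, and its emit(message, meta) method is an assumption about how data is sent through the context. CounterSource and its interval and limit options are illustrative names only.

```python
import time
from abc import ABC, abstractmethod


class Source(ABC):
    """Local stand-in mirroring the Source interface shown above."""

    @abstractmethod
    def configure(self, datasource: str, conf: dict): ...

    @abstractmethod
    def open(self, ctx): ...

    @abstractmethod
    def close(self, ctx): ...


class RecordingContext:
    """Hypothetical test double; the emit method is an assumption."""

    def __init__(self):
        self.emitted = []

    def emit(self, message, meta=None):
        self.emitted.append((message, meta))


class CounterSource(Source):
    """Emits an increasing counter until closed or until a limit is reached."""

    def __init__(self):
        self.interval = 1.0
        self.limit = 3
        self.running = False

    def configure(self, datasource: str, conf: dict):
        # Raise on invalid configuration, as the interface docstring asks.
        interval = conf.get("interval", 1.0)
        if not isinstance(interval, (int, float)) or interval < 0:
            raise ValueError("interval must be a non-negative number")
        self.interval = interval
        self.limit = int(conf.get("limit", 3))

    def open(self, ctx):
        # Runs until close() clears the flag or the limit is hit.
        self.running = True
        count = 0
        while self.running and count < self.limit:
            ctx.emit({"source": "counter", "count": count}, None)
            count += 1
            time.sleep(self.interval)

    def close(self, ctx):
        self.running = False
```

The limit keeps the sketch easy to test. A real source would typically loop until close is called.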

Sink interface:

    class Sink(object):
        """abstract class for eKuiper sink plugin"""

        @abstractmethod
        def configure(self, conf: dict):
            """configure with conf map and raise error if any"""
            pass

        @abstractmethod
        def open(self, ctx: Context):
            """open connection and wait to receive data"""
            pass

        @abstractmethod
        def collect(self, ctx: Context, data: Any):
            """callback to deal with received data"""
            pass

        @abstractmethod
        def close(self, ctx: Context):
            """stop running and clean up"""
            pass
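A sink follows the same pattern. The sketch below uses a local stand-in for the Sink class so that it runs without the SDK. BufferSink and its prefix option are hypothetical names, and the context argument is left unused because this example does not need it.

```python
from abc import ABC, abstractmethod
from typing import Any


class Sink(ABC):
    """Local stand-in mirroring the Sink interface shown above."""

    @abstractmethod
    def configure(self, conf: dict): ...

    @abstractmethod
    def open(self, ctx): ...

    @abstractmethod
    def collect(self, ctx, data: Any): ...

    @abstractmethod
    def close(self, ctx): ...


class BufferSink(Sink):
    """Keeps every received item in memory as a prefixed string."""

    def __init__(self):
        self.prefix = ""
        self.opened = False
        self.lines = []

    def configure(self, conf: dict):
        # Raise on invalid configuration, as the interface docstring asks.
        prefix = conf.get("prefix", "")
        if not isinstance(prefix, str):
            raise TypeError("prefix must be a string")
        self.prefix = prefix

    def open(self, ctx):
        # A real sink would open its connection here.
        self.opened = True

    def collect(self, ctx, data: Any):
        # Called once per received item.
        self.lines.append(self.prefix + str(data))

    def close(self, ctx):
        self.opened = False
```

The calls are expected in the order configure, open, collect (repeatedly), then close, which matches the docstrings of the interface.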

Function interface:

    class Function(object):
        """abstract class for eKuiper function plugin"""

        @abstractmethod
        def validate(self, args: List[Any]):
            """callback to validate against ast args, return a string error or empty string"""
            pass

        @abstractmethod
        def exec(self, args: List[Any], ctx: Context) -> Any:
            """callback to do execution, return result"""
            pass

        @abstractmethod
        def is_aggregate(self):
            """callback to check if function is for aggregation, return bool"""
            pass
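As an illustration of the function interface, the following sketch implements a scalar function that reverses its single argument. The Function class here is a local stand-in so that the example runs without the SDK, and ReverseFunc is a hypothetical name. Note that validate reports a problem by returning a non-empty string rather than by raising.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class Function(ABC):
    """Local stand-in mirroring the Function interface shown above."""

    @abstractmethod
    def validate(self, args: List[Any]): ...

    @abstractmethod
    def exec(self, args: List[Any], ctx) -> Any: ...

    @abstractmethod
    def is_aggregate(self): ...


class ReverseFunc(Function):
    """Returns the string form of its single argument, reversed."""

    def validate(self, args: List[Any]):
        # An empty string means the arguments are acceptable.
        if len(args) != 1:
            return "reverse expects exactly one argument"
        return ""

    def exec(self, args: List[Any], ctx) -> Any:
        return str(args[0])[::-1]

    def is_aggregate(self):
        # This function works on one row at a time.
        return False
```

For example, ReverseFunc().exec(["abc"], None) returns "cba".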

Users need to create their own sources, sinks and functions by implementing these abstract classes. Then create the main program and declare the instantiation functions for these extensions, for example:

    if __name__ == '__main__':
        c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
                         {"revert": lambda: revertIns})
        plugin.start(c)
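The maps passed to PluginConfig hold zero-argument callables rather than instances. The sketch below uses placeholder classes instead of the SDK and shows only plain Python behavior: a lambda that calls a constructor yields a new object on each call, while a lambda that returns an existing variable yields the same object every time. How the runtime calls these factories is not covered here.

```python
class PyJson:
    """Placeholder for a source class."""


class Revert:
    """Placeholder for a function class."""


revertIns = Revert()  # a single instance created up front

sources = {"pyjson": lambda: PyJson()}     # new object on every call
functions = {"revert": lambda: revertIns}  # same object on every call

a, b = sources["pyjson"](), sources["pyjson"]()
c, d = functions["revert"](), functions["revert"]()

print(a is b)  # False: two separate PyJson objects
print(c is d)  # True: both names refer to revertIns
```

A shared instance suits stateless extensions. Extensions that keep per-use state are safer behind a factory that constructs a fresh object.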

For the full example, see the Python SDK example.

Package

As Python is an interpreted language, there is no need to build an executable. Just specify the main Python file in the plugin JSON file. For details, see packaging.

Deployment requirements

Running a Python script requires a Python environment. Make sure Python 3.x is installed in the target environment. If using a Docker image, we recommend tags like lfedge/ekuiper:<tag>-slim-python, which include both eKuiper and a Python environment.