Master Executor: from Zero to Hub


Cristian @ Jina AI

Sept. 10, 2021

This is a step-by-step walkthrough of how to create your own Executors or use existing ones.

We will create a simple logging Executor. It will log the Documents’ information as they reach it, and save these to a file. We will also see how to push our Executor to Jina Hub to use it later.

Set-up & overview

We recommend creating a new Python virtual environment to get a clean installation of Jina and prevent dependency clashes.

We can start by installing Jina:

  pip install jina

For more information on installing Jina, refer to this page.

Create your Executor

To create your Executor, you just need to run this command in your terminal:

  jina hub new

A wizard will ask you some questions about the Executor. For the basic configuration, you will be asked two things:

  • the Executor’s name

  • where it should be saved

For this tutorial, we will call ours RequestLogger, and you can save it wherever you want your project to live. The wizard will also ask whether you want a more advanced configuration, but that is unnecessary for this tutorial.

Logger Executor

Once we have followed the wizard, our folder structure is ready, and we can start working on executor.py. Open that file and add the following imports:

  import os
  import time
  from typing import Dict, Optional
  from jina import Executor, DocumentArray, requests
  from jina.logging.logger import JinaLogger

Then we create our class, which inherits from the Executor base class. We will call ours RequestLogger:

Important

You always need to inherit from the Executor class so that your class is properly registered in Jina.

  class RequestLogger(Executor):
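Jina handles registration for you, so you never write this part yourself. Purely to illustrate why inheritance matters, here is a minimal stdlib-only sketch of how a base class can keep a registry of its subclasses using Python's `__init_subclass__` hook. The names `BaseExecutor` and `REGISTRY` are hypothetical, and Jina's actual mechanism may differ.

```python
# A toy registry: every subclass of BaseExecutor is recorded by class name.
# BaseExecutor and REGISTRY are hypothetical stand-ins, not Jina's API.
REGISTRY = {}


class BaseExecutor:
    def __init_subclass__(cls, **kwargs):
        # Called automatically each time a subclass is defined
        super().__init_subclass__(**kwargs)
        REGISTRY[cls.__name__] = cls


class RequestLogger(BaseExecutor):  # inheriting is what triggers registration
    pass


class NotAnExecutor:  # no base class, so nothing is registered
    pass


print(sorted(REGISTRY))  # ['RequestLogger']
```

A class that does not inherit from the base class is never seen by the registry, which is the kind of failure the note above warns about.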

Our Executor will have two methods: one for the constructor and one for the actual logging:

  class RequestLogger(Executor):
      def __init__(self, *args, **kwargs):
          # Whatever we need for our constructor
          ...

      def log(self, **kwargs):
          # Whatever we need for our logging
          ...

It can be helpful to specify how many Documents we want to log, so we pass this number directly as an argument to our constructor:

  def __init__(self,
               default_log_docs: int = 1,
               # here you can pass whatever other arguments you need
               *args, **kwargs):

Important

Before writing any custom logic, you need to call the parent class's constructor. This is required to register the parent class, which instantiates special fields and methods.

  super().__init__(*args, **kwargs)
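To see why the order matters, here is a stdlib-only sketch with a hypothetical `BaseExecutor` whose constructor sets a `workspace` attribute, standing in for the special fields the real parent class initializes. Reading that attribute before calling `super().__init__()` fails with an `AttributeError`.

```python
# Hypothetical parent class: its constructor creates fields the child relies on.
class BaseExecutor:
    def __init__(self, workspace: str = "workspace", **kwargs):
        self.workspace = workspace


class GoodLogger(BaseExecutor):
    def __init__(self, default_log_docs: int = 1, *args, **kwargs):
        super().__init__(*args, **kwargs)  # parent fields exist from here on
        self.default_log_docs = default_log_docs
        self.log_path = f"{self.workspace}/log.txt"


class BadLogger(BaseExecutor):
    def __init__(self, default_log_docs: int = 1, *args, **kwargs):
        self.log_path = f"{self.workspace}/log.txt"  # parent not initialized yet
        super().__init__(*args, **kwargs)


print(GoodLogger().log_path)  # workspace/log.txt
try:
    BadLogger()
except AttributeError as err:
    print("BadLogger failed:", err)
```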

Now we can fill in the rest of our constructor. First, we store the default_log_docs value we got from the arguments:

  self.default_log_docs = default_log_docs

For logging, we need to create an instance of JinaLogger. We also need to specify the path where we save our log file.

  self.logger = JinaLogger('req_logger')
  self.log_path = os.path.join(self.workspace, 'log.txt')

Note

self.workspace will be provided by the Executor parent class.

And finally, we need to create the file, in case it doesn’t exist.

  if not os.path.exists(self.log_path):
      with open(self.log_path, 'w'):
          pass
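The existence check makes the constructor safe to run repeatedly: the file is created once and never truncated afterwards, so logs from earlier runs survive. Here is a stdlib-only sketch of the same pattern, using a temporary directory as a stand-in for self.workspace. The helper name `ensure_log_file` is hypothetical, and the `os.makedirs` call is an added assumption in case the workspace folder does not exist yet.

```python
import os
import tempfile


def ensure_log_file(workspace: str) -> str:
    """Create workspace/log.txt if missing; never truncate an existing file."""
    os.makedirs(workspace, exist_ok=True)  # assumption: folder may not exist yet
    log_path = os.path.join(workspace, "log.txt")
    if not os.path.exists(log_path):
        with open(log_path, "w"):
            pass  # opening in 'w' mode creates an empty file
    return log_path


with tempfile.TemporaryDirectory() as tmp:
    path = ensure_log_file(tmp)
    with open(path, "a") as f:
        f.write("first run\n")
    ensure_log_file(tmp)  # second call leaves the content untouched
    with open(path) as f:
        print(f.read())  # prints: first run
```

Opening the file in 'a' mode would also create it without truncating, so the check mainly makes the intent explicit.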

That's it for our constructor. By now, we should have something like this:

  class RequestLogger(Executor):  # needs to inherit from Executor
      def __init__(self,
                   default_log_docs: int = 1,  # number of documents to log
                   *args, **kwargs):  # *args and **kwargs are required for Executor
          super().__init__(*args, **kwargs)  # before any custom logic
          self.default_log_docs = default_log_docs
          self.logger = JinaLogger('req_logger')  # create an instance of JinaLogger
          self.log_path = os.path.join(self.workspace, 'log.txt')  # path where log.txt is saved
          if not os.path.exists(self.log_path):  # create the file only if it doesn't exist yet
              with open(self.log_path, 'w'):
                  pass

We can now create our log method. First of all, we need the @requests decorator, which tells the Flow when the function should be called and on which endpoint. We use @requests without any endpoint, so our function will be called on every request:

  @requests
  def log(self,
          docs: Optional[DocumentArray],
          parameters: Dict,
          **kwargs):

It’s important to note the arguments here.

Important

It's not possible to redefine the interface of public methods decorated with @requests: you can't change the names of these arguments. To see exactly which parameters you can use, check here.

If you want your log function to be called only on the /index endpoint, specify the endpoint with on=, like this:

  @requests(on='/index')
  def log(self,
          docs: Optional[DocumentArray],
          parameters: Dict,
          **kwargs):

If you want more information on how to use this decorator, refer to the documentation. In this example, we want to call our log function on every request, so we don’t specify any endpoint.
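To make the difference between a bare @requests and @requests(on=...) concrete, here is a toy, stdlib-only dispatcher. It is not Jina's implementation: the names `route`, `ToyExecutor` and `handle` are hypothetical. Methods registered with an endpoint run only for that endpoint, while a bare decorator registers a catch-all that handles every other request.

```python
from typing import Callable, Dict, Optional

DEFAULT = "__default__"  # key used for the catch-all handler


def route(func: Optional[Callable] = None, *, on: Optional[str] = None):
    """Tag a method with an endpoint; usable as @route or @route(on='/x')."""
    def tag(f: Callable) -> Callable:
        f._endpoint = on or DEFAULT
        return f
    return tag(func) if func is not None else tag


class ToyExecutor:
    def __init__(self):
        # Collect tagged methods into an endpoint -> bound method table
        self.handlers: Dict[str, Callable] = {}
        for name in dir(self):
            endpoint = getattr(getattr(self, name), "_endpoint", None)
            if endpoint is not None:
                self.handlers[endpoint] = getattr(self, name)

    def handle(self, endpoint: str, docs: list) -> Optional[str]:
        # Exact endpoint match first, then fall back to the catch-all
        handler = self.handlers.get(endpoint, self.handlers.get(DEFAULT))
        return handler(docs) if handler else None


class Logger(ToyExecutor):
    @route  # bare: called for any endpoint without its own handler
    def log_everything(self, docs):
        return f"logged {len(docs)} doc(s)"

    @route(on="/index")  # called only for /index
    def index(self, docs):
        return f"indexed {len(docs)} doc(s)"


ex = Logger()
print(ex.handle("/index", ["a", "b"]))  # indexed 2 doc(s)
print(ex.handle("/search", ["a"]))      # logged 1 doc(s)
```

In this toy, an exact endpoint match wins over the catch-all, and a class without a bare handler simply ignores unknown endpoints.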

Now we can add the logic for our function. First, we will print a line that displays some information. And then, we will save the details from our Documents:

  self.logger.info('Request being processed...')
  nr_docs = int(parameters.get('log_docs', self.default_log_docs))  # numbers arrive as floats due to Protobuf, so cast to int
  with open(self.log_path, 'a') as f:
      f.write(f'request at time {time.time()} with {len(docs)} documents:\n')
      for i, doc in enumerate(docs):
          f.write(f'\tsearching with doc.id {doc.id}. content = {doc.content}\n')
          if i + 1 == nr_docs:
              break
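The loop above logs at most nr_docs Documents, but with log_docs=0 the `i + 1 == nr_docs` check never fires, so every Document gets logged. If you prefer 0 to mean "log nothing", `itertools.islice` expresses the limit directly. Here is a stdlib-only sketch of that variant, with a plain dict standing in for parameters and a hypothetical `FakeDoc` dataclass standing in for Jina Documents; `write_log` is likewise a hypothetical name.

```python
import io
import time
from dataclasses import dataclass
from itertools import islice
from typing import Dict, List, TextIO


@dataclass
class FakeDoc:  # hypothetical stand-in for a Jina Document
    id: str
    content: str


def write_log(f: TextIO, docs: List[FakeDoc], parameters: Dict,
              default_log_docs: int = 1) -> None:
    # Numeric parameters may arrive as floats (e.g. 1.0), so cast to int
    nr_docs = int(parameters.get("log_docs", default_log_docs))
    f.write(f"request at time {time.time()} with {len(docs)} documents:\n")
    # islice stops after nr_docs items and yields nothing when nr_docs is 0
    for doc in islice(docs, nr_docs):
        f.write(f"\tsearching with doc.id {doc.id}. content = {doc.content}\n")


docs = [FakeDoc("1", "I love cats"), FakeDoc("2", "I love every type of cat")]
buf = io.StringIO()
write_log(buf, docs, {"log_docs": 1.0})
print(buf.getvalue())
```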

Here you can add whatever logic your Executor needs. By now, your code should look like this:

  import os
  import time
  from typing import Dict, Optional

  from jina import Executor, DocumentArray, requests
  from jina.logging.logger import JinaLogger


  class RequestLogger(Executor):  # needs to inherit from Executor
      def __init__(self,
                   default_log_docs: int = 1,  # your arguments
                   *args, **kwargs):  # *args and **kwargs are required for Executor
          super().__init__(*args, **kwargs)  # before any custom logic
          self.default_log_docs = default_log_docs
          self.logger = JinaLogger('req_logger')
          self.log_path = os.path.join(self.workspace, 'log.txt')
          if not os.path.exists(self.log_path):
              with open(self.log_path, 'w'):
                  pass

      @requests  # without on=, it is called on every request
      def log(self,  # arguments are received automatically
              docs: Optional[DocumentArray],
              parameters: Dict,
              **kwargs):
          self.logger.info('Request being processed...')
          nr_docs = int(parameters.get('log_docs', self.default_log_docs))  # numbers arrive as floats due to Protobuf
          with open(self.log_path, 'a') as f:
              f.write(f'request at time {time.time()} with {len(docs)} documents:\n')
              for i, doc in enumerate(docs):
                  f.write(f'\tsearching with doc.id {doc.id}. content = {doc.content}\n')
                  if i + 1 == nr_docs:
                      break

And that’s it. We have an Executor that takes whatever Documents we pass to it and logs them.

Ok, and what now? How can you use this in your app?

Push your Executor to Hub

We could use our Executor directly in our app, but here we will see how to push it to Jina Hub so we can share it with more people, or use it later.

The first step is to make sure the manifest.yml and config.yml files are still relevant: check that the data in them still represents your Executor's purpose.

To push, open a terminal in the folder containing your executor.py, in this case the RequestLogger folder, and type:

  jina hub push --public .

This pushes your Executor publicly to Jina Hub. The final dot means the current directory is used. Once you run that command, you should see something like this:

[Screenshot: terminal output of jina hub push]

Note

Since we pushed our Executor with the --public flag, the only thing we need in order to use it is its ID. In this case, it's zsor7fe6. Refer to Jina Hub usage.

Use your Executor

Let's create a Jina Flow that uses the Executor we just wrote. Create an app.py in the same folder as RequestLogger. Open it and import Flow, DocumentArray and Document before we create our main function:

  from jina import Flow, DocumentArray, Document


  def main():
      ...  # We'll build our Flow here


  if __name__ == '__main__':
      main()

The Executor we just created logs whatever Documents we pass to it, so we need to create some Documents first. We'll do that in main():

  def main():
      docs = DocumentArray()
      docs.append(Document(content='I love cats'))  # creating documents
      docs.append(Document(content='I love every type of cat'))
      docs.append(Document(content='I guess dogs are ok'))

We have three Documents in one DocumentArray. Now let’s create a Flow and add the Executor we created. We will reference it by the ID we got when we pushed it (in my case, it was zsor7fe6):

      flow = Flow().add(
          uses='jinahub+docker://zsor7fe6',  # here we choose to use the Executor inside a Docker container
          uses_with={  # RequestLogger arguments
              'default_log_docs': 3
          },
          volumes='workspace:/internal_workspace',  # mapping local folders to Docker container folders
          uses_metas={  # Executor (parent class) arguments
              'workspace': '/internal_workspace',  # this should match the above
          },
      )

That's quite a lot of detail, so let's go through it piece by piece:

  uses='jinahub+docker://zsor7fe6',

Here, uses= specifies the image of your Executor. This starts a Docker container with the image of the Executor we built and pushed in the previous step, so don't forget to change the ID to your own.

  uses_with={  # RequestLogger arguments
      'default_log_docs': 3
  },

We use uses_with= to pass arguments to our Executor's constructor. In our case, there is only one: default_log_docs. In the constructor of our RequestLogger Executor, we set its default to 1, but here we override it with 3, so 3 becomes the new value.
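Conceptually, uses_with behaves like keyword arguments passed to the constructor: any value you set replaces the default in the signature, and anything you leave out keeps its default. Here is a stdlib-only sketch of that idea, with a hypothetical `build_executor` helper and a stripped-down `RequestLogger` that has no Jina base class. This is an illustration, not how Jina constructs Executors internally.

```python
from typing import Any, Dict, Optional


class RequestLogger:  # stripped-down stand-in, no Jina base class
    def __init__(self, default_log_docs: int = 1):
        self.default_log_docs = default_log_docs


def build_executor(cls, uses_with: Optional[Dict[str, Any]] = None):
    # Hypothetical: forward the uses_with dict as keyword arguments
    return cls(**(uses_with or {}))


print(build_executor(RequestLogger).default_log_docs)  # 1
print(build_executor(RequestLogger, {"default_log_docs": 3}).default_log_docs)  # 3
```

Because the keys are used as keyword names, they must match the constructor's parameter names exactly; in this sketch a misspelled key raises a TypeError.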

The next line refers to our workspace:

  volumes='workspace:/internal_workspace',

Here we map the workspace folder, which is created when we run our app, to a folder called /internal_workspace inside the Docker container. We do this because our Executor logs the Documents into a file, and we want that file on our local disk. Without this mapping, the file would be saved only inside the Docker container, and you would need to access the container to see it.
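The volumes string has the form host_path:container_path. To show how a file written inside the container ends up on your disk, here is a stdlib-only sketch with a hypothetical `to_host_path` helper. It assumes a single mount, a POSIX-style host path, and the layout described at the end of this tutorial (RequestLogger/0/log.txt); it is not part of Jina or Docker.

```python
import posixpath


def to_host_path(container_path: str, volume: str) -> str:
    """Translate a path inside the container to the matching host path.

    `volume` uses the 'host_path:container_path' form,
    e.g. 'workspace:/internal_workspace'.
    """
    host_dir, container_dir = volume.split(":", 1)
    rel = posixpath.relpath(container_path, container_dir)
    if rel == ".." or rel.startswith("../"):
        # The file lives outside the mount, so it never reaches the host
        raise ValueError(f"{container_path} is not inside {container_dir}")
    return posixpath.join(host_dir, rel)


volume = "workspace:/internal_workspace"
print(to_host_path("/internal_workspace/RequestLogger/0/log.txt", volume))
# workspace/RequestLogger/0/log.txt
```

The ValueError branch is the situation the uses_metas setting guards against: if the Executor's workspace were outside the mounted folder, the log file would stay inside the container.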

The last part overrides arguments too, but this time for the Executor parent class:

  uses_metas={  # Executor (parent class) arguments
      'workspace': '/internal_workspace',  # this should match the above
  },

In our case, the only argument we want to override is the workspace. If you don't set it, a folder named after your Executor class (RequestLogger) would be created inside the container instead, and your log would be saved there. Since we mounted our local workspace folder at /internal_workspace in the container, we need to set the Executor's workspace to that same path.

Ok, we have our Flow ready with the Executor we deployed previously. We can use it now. Let’s start by indexing the Documents:

      with flow as f:  # Flow is a context manager
          f.post(
              on='/index',  # the endpoint
              inputs=docs,  # the documents we send as input
          )

The Executor we created doesn't care which endpoint is used, so it performs the same operation whatever endpoint you specify here. In this example, we use on='/index' anyway. If your Executor defines separate endpoints, you could post to one endpoint for indexing and another for querying.

So far, your code should look like this:

  from jina import Flow, DocumentArray, Document


  def main():
      docs = DocumentArray()
      docs.append(Document(content='I love cats'))  # creating documents
      docs.append(Document(content='I love every type of cat'))
      docs.append(Document(content='I guess dogs are ok'))
      flow = Flow().add(  # provide as class name or jinahub+docker URI
          uses='jinahub+docker://zsor7fe6',  # replace with the ID of your own Executor
          uses_with={  # RequestLogger arguments
              'default_log_docs': 3
          },
          volumes='workspace:/internal_workspace',  # mapping local folders to Docker container folders
          uses_metas={  # Executor (parent class) arguments
              'workspace': '/internal_workspace',  # this should match the above
          },
      )
      with flow as f:  # Flow is a context manager
          f.post(
              on='/index',  # the endpoint
              inputs=docs,  # the documents we send as input
          )


  if __name__ == '__main__':
      main()

When you run this, a new workspace folder is created with two more folders inside: one called RequestLogger (or whatever name you used for your class), and another for sharding, which is out of scope for this tutorial. Inside the sharding folder, called 0, you will find a log.txt file containing the information of the 3 Documents.
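Once the Flow has finished, you can inspect the results without browsing folders by hand. Here is a small stdlib-only sketch, assuming you run it from the folder containing app.py and that the logs follow the workspace layout described above. The function name `print_logs` is hypothetical.

```python
from pathlib import Path
from typing import List


def print_logs(workspace: str = "workspace") -> List[Path]:
    """Print every log.txt found anywhere under the workspace folder."""
    root = Path(workspace)
    found = sorted(root.rglob("log.txt")) if root.is_dir() else []
    for path in found:
        print(f"== {path} ==")
        print(path.read_text())
    if not found:
        print(f"No log.txt found under {workspace!r}; has the Flow run yet?")
    return found


if __name__ == "__main__":
    print_logs()
```

Searching recursively means the snippet keeps working whatever the Executor and shard folders are called.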

[Screenshot: contents of log.txt]

And that’s it! You created an Executor, pushed it to Jina Hub, and used it in your app.

There are still a lot of concepts to learn, so stay tuned for our upcoming tutorials.

If you have any issues following this tutorial, you can always get support from our Slack community.