Integrating with the asynchronous Workflow framework

Server

You can follow the detailed example below:

  • When the Echo RPC receives a request, it sends an HTTP request to an upstream module.
  • After the upstream request completes, the server copies the body of the HTTP response into the message field of the RPC response and sends the reply to the client.
  • We do not want to block or occupy the handler thread, so the upstream request must be asynchronous.
  • First, we use WFTaskFactory::create_http_task() from the Workflow task factory to create an asynchronous http_task.
  • Then, we use ctx->get_series() of the RPCContext to get the SeriesWork of the current ServerTask.
  • Finally, we use the push_back() interface of the SeriesWork to append the http_task to the SeriesWork.

    class ExampleServiceImpl : public Example::Service
    {
    public:
        void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
        {
            // Asynchronous upstream request; the callback runs once it completes.
            auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0,
                [request, response](WFHttpTask *task) {
                    if (task->get_state() == WFT_STATE_SUCCESS)
                    {
                        // Copy the HTTP body into the RPC response message.
                        const void *data;
                        size_t len;
                        task->get_resp()->get_parsed_body(&data, &len);
                        response->mutable_message()->assign((const char *)data, len);
                    }
                    else
                        response->set_message("Error: " + std::to_string(task->get_error()));

                    printf("Server Echo()\nget_req:\n%s\nset_resp:\n%s\n",
                           request->DebugString().c_str(),
                           response->DebugString().c_str());
                });

            // Append the task to the current series; the handler returns without blocking.
            ctx->get_series()->push_back(http_task);
        }
    };
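
To see why appending to the series defers the reply, it may help to model a series as a plain queue whose completion hook sends the reply. The sketch below is a toy, single-threaded illustration in standard C++ only. ToySeries and handle_echo are hypothetical names invented for this example; they are not part of Workflow or SRPC and say nothing about how SeriesWork is implemented.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Toy stand-in for a series of tasks (NOT Workflow's SeriesWork).
// Tasks run one after another; when the queue drains, on_finish fires once.
class ToySeries
{
public:
    using Task = std::function<void()>;

    explicit ToySeries(Task on_finish) : on_finish_(std::move(on_finish)) {}

    // Append a task behind whatever is already queued.
    void push_back(Task task) { tasks_.push(std::move(task)); }

    // Run every queued task in order, then fire the completion hook.
    void run()
    {
        while (!tasks_.empty())
        {
            Task task = std::move(tasks_.front());
            tasks_.pop();
            task();  // a running task may push_back further tasks
        }
        on_finish_();
    }

private:
    std::queue<Task> tasks_;
    Task on_finish_;
};

// Hypothetical handler flow: the handler only schedules the upstream call,
// and the reply goes out after the series has nothing left to run.
std::string handle_echo(const std::string &upstream_body)
{
    std::string response;  // filled in by the appended task
    std::string sent;      // what the client finally receives

    ToySeries series([&] { sent = response; });  // reply step

    // The "server task": the handler appends the upstream task and returns.
    series.push_back([&] {
        series.push_back([&] { response = upstream_body; });  // upstream callback
    });

    assert(sent.empty());  // nothing has been sent before the series runs
    series.run();
    return sent;
}
```

Calling handle_echo("hello") returns "hello": the reply step runs only after the appended task has filled in the response, which mirrors the order of events described in the bullets above.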

Client

You can follow the detailed example below:

  • We send two requests in parallel: one RPC request and one HTTP request.
  • After both requests have finished, we start a computing task that calculates the sum of the squares of two numbers.
  • First, use create_Echo_task() of the RPC Client to create an rpc_task, which is an asynchronous RPC network request.
  • Then, use WFTaskFactory::create_http_task() and WFTaskFactory::create_go_task() from the Workflow task factory to create an asynchronous network task, http_task, and an asynchronous computing task, calc_task, respectively.
  • Finally, organize the three asynchronous tasks into a serial-parallel graph, in which the multiplication sign (*) joins parallel tasks and the greater-than sign (>) joins serial tasks, and then call start().

    void calc(int x, int y)
    {
        int z = x * x + y * y;
        printf("calc result: %d\n", z);
    }

    int main()
    {
        Example::SRPCClient client("127.0.0.1", 1412);

        // Asynchronous RPC request.
        auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
            if (ctx->success())
                printf("%s\n", response->DebugString().c_str());
            else
                printf("status[%d] error[%d] errmsg:%s\n",
                       ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
        });

        // Asynchronous HTTP request.
        auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
            if (task->get_state() == WFT_STATE_SUCCESS)
            {
                std::string body;
                const void *data;
                size_t len;
                task->get_resp()->get_parsed_body(&data, &len);
                body.assign((const char *)data, len);
                printf("%s\n\n", body.c_str());
            }
            else
                printf("Http request fail\n\n");
        });

        // Asynchronous computing task: calc(3, 4).
        auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);

        EchoRequest req;
        req.set_message("Hello, sogou rpc!");
        req.set_name("1412");
        rpc_task->serialize_input(&req);

        // http_task and rpc_task run in parallel; calc_task runs after both finish.
        ((*http_task * rpc_task) > calc_task).start();

        pause();
        return 0;
    }
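
The "parallel, then serial" ordering in the graph above can be pictured as a join point: each parallel branch reports completion, and the next stage runs only when the last branch has reported. The following is a minimal single-threaded sketch of that idea in standard C++. ToyJoin, FlowTrace, sum_of_squares and run_flow are hypothetical names introduced for illustration; they are not Workflow APIs, and a real multi-threaded framework would need a thread-safe counter.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Toy join point (NOT Workflow's ParallelWork): counts outstanding branches
// and fires `next` when the last one reports completion.
class ToyJoin
{
public:
    ToyJoin(int branches, std::function<void()> next)
        : pending_(branches), next_(std::move(next)) {}

    // Called from each branch's callback, in whatever order they finish.
    void branch_done()
    {
        assert(pending_ > 0);
        if (--pending_ == 0)
            next_();
    }

private:
    int pending_;
    std::function<void()> next_;
};

// Same arithmetic as calc() above, but returning the value so it can be checked.
int sum_of_squares(int x, int y) { return x * x + y * y; }

// Records the calc result as seen after each branch completes.
struct FlowTrace
{
    int after_first;
    int after_second;
};

FlowTrace run_flow()
{
    int result = -1;  // -1 means "the calc stage has not run yet"
    ToyJoin join(2, [&] { result = sum_of_squares(3, 4); });

    FlowTrace trace;
    join.branch_done();           // one branch (say, the HTTP request) finishes
    trace.after_first = result;   // calc has not started
    join.branch_done();           // the other branch (the RPC request) finishes
    trace.after_second = result;  // calc has now run
    return trace;
}
```

In this sketch the calc stage stays untouched after the first branch completes and produces 25 (3 * 3 + 4 * 4) only after the second, which is the ordering the > operator expresses in the client example.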