RPC in neutron-server

The RPC server side is started mainly by the following line in the main function of neutron.server:

    neutron_rpc = service.serve_rpc()

The method is implemented as follows:

    def serve_rpc():
        plugin = manager.NeutronManager.get_plugin()

        # If 0 < rpc_workers then start_rpc_listeners would be called in a
        # subprocess and we cannot simply catch the NotImplementedError. It is
        # simpler to check this up front by testing whether the plugin supports
        # multiple RPC workers.
        if not plugin.rpc_workers_supported():
            LOG.debug(_("Active plugin doesn't implement start_rpc_listeners"))
            if 0 < cfg.CONF.rpc_workers:
                msg = _("'rpc_workers = %d' ignored because start_rpc_listeners "
                        "is not implemented.")
                LOG.error(msg, cfg.CONF.rpc_workers)
            raise NotImplementedError

        try:
            rpc = RpcWorker(plugin)

            if cfg.CONF.rpc_workers < 1:
                rpc.start()
                return rpc
            else:
                launcher = common_service.ProcessLauncher(wait_interval=1.0)
                launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
                return launcher
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_('Unrecoverable error: please check log '
                                'for details.'))
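
The branching in serve_rpc() can be tried out in isolation with stand-in objects. The following is only a minimal sketch: FakePlugin, FakeRpcWorker, FakeLauncher and serve_rpc_sketch are hypothetical names invented for illustration, nothing is forked, and the real function reads rpc_workers from cfg.CONF rather than taking it as an argument.

```python
class FakePlugin:
    """Hypothetical plugin whose RPC support is set by a flag."""

    def __init__(self, supported):
        self._supported = supported

    def rpc_workers_supported(self):
        return self._supported


class FakeRpcWorker:
    """Hypothetical stand-in for RpcWorker; records whether start() ran."""

    def __init__(self, plugin):
        self.plugin = plugin
        self.started = False

    def start(self):
        self.started = True


class FakeLauncher:
    """Hypothetical stand-in for ProcessLauncher; it only records launches."""

    def __init__(self):
        self.launched = []

    def launch_service(self, service, workers):
        self.launched.append((service, workers))


def serve_rpc_sketch(plugin, rpc_workers):
    # Up-front check: a plugin without RPC listeners is rejected immediately.
    if not plugin.rpc_workers_supported():
        raise NotImplementedError

    rpc = FakeRpcWorker(plugin)
    if rpc_workers < 1:
        # No worker processes requested: start in the current process.
        rpc.start()
        return rpc

    # Otherwise hand the worker to a launcher, which in the real code
    # would start the requested number of worker processes.
    launcher = FakeLauncher()
    launcher.launch_service(rpc, workers=rpc_workers)
    return launcher
```

Calling serve_rpc_sketch(FakePlugin(True), 0) returns the started worker itself, while serve_rpc_sketch(FakePlugin(True), 2) returns the launcher. The caller therefore receives a different kind of object depending on the rpc_workers setting.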

Here, RpcWorker(plugin) creates the RPC server mainly by calling a method on the plugin:

    self._servers = self._plugin.start_rpc_listeners()

Most plugins do not implement this method; at present, ML2 supports it.
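
One way a plugin base class could answer rpc_workers_supported() is to check whether a subclass overrides start_rpc_listeners. The sketch below illustrates that idea under stated assumptions: PluginBase, LegacyPlugin and Ml2LikePlugin are hypothetical classes, and the code is not quoted from Neutron.

```python
class PluginBase:
    """Hypothetical base class; not Neutron's actual plugin base."""

    def start_rpc_listeners(self):
        # Default: plugins that do not override this have no RPC listeners.
        raise NotImplementedError

    def rpc_workers_supported(self):
        # Supported only if the concrete class overrides the default method.
        own = type(self).start_rpc_listeners
        return own is not PluginBase.start_rpc_listeners


class LegacyPlugin(PluginBase):
    """Hypothetical plugin that leaves start_rpc_listeners unimplemented."""


class Ml2LikePlugin(PluginBase):
    """Hypothetical plugin that provides its own RPC listeners."""

    def start_rpc_listeners(self):
        return ["server"]
```

With this approach the check costs nothing at runtime and never calls start_rpc_listeners itself, which matters because the real method would open connections as a side effect.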

In the neutron.plugins.ml2.plugin.Ml2Plugin class, this method creates an RPC consumer on the topic topics.PLUGIN.

    def start_rpc_listeners(self):
        self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
                          agents_db.AgentExtRpcCallback()]
        self.topic = topics.PLUGIN
        self.conn = n_rpc.create_connection(new=True)
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        return self.conn.consume_in_threads()
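
To make the idea of a consumer with several endpoints on one topic concrete, here is a toy in-memory model. It is a sketch only: InMemoryConnection, DeviceCallbacks, AgentCallbacks, the dispatch() method and the topic string "plugin-topic" are all hypothetical, and no real message bus or Neutron RPC layer is involved.

```python
class InMemoryConnection:
    """Hypothetical connection mapping a topic to a list of endpoints."""

    def __init__(self):
        self._consumers = {}

    def create_consumer(self, topic, endpoints, fanout=False):
        # fanout is accepted for symmetry but ignored in this sketch.
        self._consumers.setdefault(topic, []).extend(endpoints)

    def dispatch(self, topic, method, **kwargs):
        # Call the first endpoint on the topic that defines the method.
        for endpoint in self._consumers.get(topic, []):
            handler = getattr(endpoint, method, None)
            if callable(handler):
                return handler(**kwargs)
        raise AttributeError("no endpoint handles %r on %r" % (method, topic))


class DeviceCallbacks:
    """Hypothetical endpoint, playing the role of rpc.RpcCallbacks."""

    def get_device_details(self, device):
        return {"device": device, "status": "ok"}


class AgentCallbacks:
    """Hypothetical endpoint, playing the role of AgentExtRpcCallback."""

    def report_state(self, agent):
        return "state received from %s" % agent


conn = InMemoryConnection()
conn.create_consumer("plugin-topic", [DeviceCallbacks(), AgentCallbacks()])
```

A call such as conn.dispatch("plugin-topic", "report_state", agent="ovs-agent") is routed to AgentCallbacks, while get_device_details goes to DeviceCallbacks. This mirrors how a single consumer on one topic can serve methods that live on different endpoint objects.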