Nacos注册中心:发现篇

经过上一节的努力,我们已经将RPC服务成功的注册到Nacos上了。

我们还是以老生常谈的A调用B为例,B的所有实例B1、B2…都在Nacos上了。我们本节要实现的,都客户端,也就是A的部分。

老规矩,先引入依赖:

  1. implementation 'com.alibaba.nacos:nacos-client:2.0.3'
  2. implementation 'org.springframework.boot:spring-boot-autoconfigure:2.2.0.RELEASE'

上述除了引入nacos的依赖外,还引入了spring-boot的自动配置包,后续做客户端的自动装配时会用到。

客户端改造

在正式对接Nacos前,我们先对客户端的包做一些改造。

首先,引入一个通用的Grpc客户端实现:

  1. public abstract class HSGrpcClient implements AutoCloseable {
  2. private ManagedChannel channel;
  3. private String ip;
  4. private int port;
  5. public HSGrpcClient(String ip, int port) {
  6. this.ip = ip;
  7. this.port = port;
  8. }
  9. public void init() {
  10. channel = ManagedChannelBuilder
  11. .forTarget(ip + ":" + port)
  12. .usePlaintext()
  13. .build();
  14. initSub(channel);
  15. }
  16. protected abstract void initSub(Channel channel);
  17. public void close() throws InterruptedException {
  18. channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
  19. }
  20. }

代码如上所示:

  • HSGrpcClient管理了ManagedChannel,这是用于实际网络通信的连接池。

  • 提供了initStub抽象方法,让子类根据自己的需求,去初始化自己的stub。

  • 实现了AutoCloseable接口,让客户端可以通过close方法自动关闭。

在这个基础上,我们改造之前的具体RPC客户端,如下:

  1. public class HomsDemoGrpcClient extends HSGrpcClient {
  2. private Logger LOG = LoggerFactory.getLogger(HomsDemoGrpcClient.class);
  3. private HomsDemoGrpc.HomsDemoFutureStub futureStub;
  4. /**
  5. * Construct client for accessing HelloWorld server using the existing channel.
  6. */
  7. public HomsDemoGrpcClient(String ip, int port) {
  8. super(ip, port);
  9. }
  10. @Override
  11. protected void initSub(Channel channel) {
  12. futureStub = HomsDemoGrpc.newFutureStub(channel);
  13. }
  14. public Optional<Integer> add(int val1, int val2) {
  15. AddRequest request = AddRequest.newBuilder().setVal1(val1).setVal2(val2).build();
  16. try {
  17. AddResponse response = futureStub.add(request).get();
  18. return Optional.ofNullable(response.getVal());
  19. } catch (Exception e) {
  20. LOG.error("grpc add exception", e);
  21. return Optional.empty();
  22. }
  23. }
  24. }

如上,我们改用了FutureStub,并且将Manage的管理部分,移到了基类中。

SimpleGrpcClientManager的实现

在正式引入Nacos之前,我们先实现一个“看起来没什么营养”的SimpleGrpcClientManager,它可以提供IP、Port直连的客户端管理。

首先是基类:

  1. public abstract class AbstractGrpcClientManager<T extends HSGrpcClient> {
  2. protected Logger LOG = LoggerFactory.getLogger(getClass());
  3. protected volatile CopyOnWriteArrayList<T> clientPools = new CopyOnWriteArrayList<>();
  4. protected Class<T> kind;
  5. public AbstractGrpcClientManager(Class<T> kind) {
  6. this.kind = kind;
  7. }
  8. public Optional<T> getClient() {
  9. if (clientPools.size() == 0) {
  10. return Optional.empty();
  11. }
  12. int pos = ThreadLocalRandom.current().nextInt(clientPools.size());
  13. return Optional.ofNullable(clientPools.get(pos));
  14. }
  15. public abstract void init() throws Exception;
  16. public void shutdown() {
  17. clientPools.forEach(c -> {
  18. try {
  19. shutdown(c);
  20. } catch (InterruptedException e) {
  21. LOG.error("shutdown client exception", e);
  22. }
  23. });
  24. }
  25. protected void shutdown(HSGrpcClient client) throws InterruptedException {
  26. client.close();
  27. }
  28. protected Optional<HSGrpcClient> buildHsGrpcClient(String ip, int port) {
  29. try {
  30. Class[] cArg = {String.class, int.class};
  31. HSGrpcClient client = kind.getDeclaredConstructor(cArg)
  32. .newInstance(ip, port);
  33. client.init();
  34. return Optional.ofNullable(client);
  35. } catch (Exception e) {
  36. LOG.error("build MyGrpcClient exception, ip = "+ ip + " port = "+ port, e);
  37. return Optional.empty();
  38. }
  39. }
  40. }

代码如上,解释一下:

  • clientPools是一组HSGrpcClient对象,即支持同时与多个微服务实例(多组不同的ip和端口)建立连接。在微服务场景下,这一特性尤为重要。
  • 而从每一个HSGrpcClient的视角来看,其内置的ManagedChannel内部实现了连接池。因此针对同一个微服务的ip和端口,我们只需要一个HSGrpcClient的实例即可。

下面,我们看一下基础的、不带服务发现的实现:

  1. package com.coder4.homs.demo.client;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import java.util.Arrays;
  5. import java.util.concurrent.CopyOnWriteArrayList;
  6. /**
  7. * @author coder4
  8. */
  9. public class SimpleGrpcClientManager<T extends HSGrpcClient> extends AbstractGrpcClientManager<T> {
  10. protected Logger LOG = LoggerFactory.getLogger(SimpleGrpcClientManager.class);
  11. private String ip;
  12. private int port;
  13. public SimpleGrpcClientManager(Class<T> kind, String ip, int port) {
  14. super(kind);
  15. this.ip = ip;
  16. this.port = port;
  17. }
  18. public void init() {
  19. // init one client only
  20. HSGrpcClient client = buildHsGrpcClient(ip, port)
  21. .orElseThrow(() -> new RuntimeException("build HsGrpcClient fail"));
  22. clientPools = new CopyOnWriteArrayList(Arrays.asList(client));
  23. }
  24. public static void main(String[] args) throws Exception {
  25. SimpleGrpcClientManager<HomsDemoGrpcClient> manager = new SimpleGrpcClientManager(HomsDemoGrpcClient.class, "127.0.0.1", 5000);
  26. manager.init();
  27. manager.getClient().ifPresent(t -> System.out.println(t.add(1, 2)));
  28. manager.shutdown();
  29. }
  30. }

从上述实现中不难发现:

  • 该实现中,默认只与预先设定的IP和端口,构造一个单独的HSGrpcClient。

  • 由于IP和端口通过外部指定,因此使用了CopyOnWriteArrayList以保证线程安全。

NacosGrpcClientManager的实现

下面,我们着手实现带Nacos服务发现的版本。

  1. package com.coder4.homs.demo.client;
  2. import com.alibaba.nacos.api.naming.NamingFactory;
  3. import com.alibaba.nacos.api.naming.NamingService;
  4. import com.alibaba.nacos.api.naming.listener.NamingEvent;
  5. import com.alibaba.nacos.api.naming.pojo.Instance;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. import java.util.concurrent.CopyOnWriteArrayList;
  9. /**
  10. * @author coder4
  11. */
  12. public class NacosGrpcClientManager<T extends HSGrpcClient> extends AbstractGrpcClientManager<T> {
  13. protected String serviceName;
  14. protected String nacosServer;
  15. protected NamingService namingService;
  16. public NacosGrpcClientManager(Class<T> kind, String nacosServer, String serviceName) {
  17. super(kind);
  18. this.nacosServer = nacosServer;
  19. this.serviceName = serviceName;
  20. }
  21. @Override
  22. public void init() throws Exception {
  23. namingService = NamingFactory
  24. .createNamingService(nacosServer);
  25. namingService.subscribe(serviceName, e -> {
  26. if (e instanceof NamingEvent) {
  27. NamingEvent event = (NamingEvent) e;
  28. rebuildClientPools(event.getInstances());
  29. }
  30. });
  31. rebuildClientPools(namingService.selectInstances(serviceName, true));
  32. }
  33. private void rebuildClientPools(List<Instance> instanceList) {
  34. ArrayList<HSGrpcClient> list = new ArrayList<>();
  35. for (Instance instance : instanceList) {
  36. buildHsGrpcClient(instance.getIp(), instance.getPort()).ifPresent(c -> list.add(c));
  37. }
  38. CopyOnWriteArrayList<T> oldClientPools = clientPools;
  39. clientPools = new CopyOnWriteArrayList(list);
  40. // destory old ones
  41. oldClientPools.forEach(c -> {
  42. try {
  43. c.close();
  44. } catch (InterruptedException e) {
  45. LOG.error("MyGrpcClient shutdown exception", e);
  46. }
  47. });
  48. }
  49. }

解释如下:

  • 在init方法中,初始化了NamingService,并订阅对应serviceName服务的更新事件。

  • 当第一次,或者有服务更新时,我们会根据最新列表,重建所有的HSGrpcClient

  • 每次重建后,关闭老的HSGrpcClient

为了让上述客户端使用更加方便,我们添加了如下的自动配置:

  1. @Configuration
  2. public class HomsDemoGrpcClientManagerConfiguration {
  3. @Bean(name = "homsDemoGrpcClientManager")
  4. @ConditionalOnMissingBean(name = "homsDemoGrpcClientManager")
  5. @ConditionalOnProperty(name = {"nacos.server"})
  6. public AbstractGrpcClientManager<HomsDemoGrpcClient> nacosManager(
  7. @Value("${nacos.server}") String nacosServer) throws Exception {
  8. NacosGrpcClientManager<HomsDemoGrpcClient> manager =
  9. new NacosGrpcClientManager<>(HomsDemoGrpcClient.class,
  10. nacosServer, HomsDemoConstant.SERVICE_NAME);
  11. manager.init();
  12. return manager;
  13. }
  14. }

如上所示:

  • nacos的server地址由yaml中配置

  • serviceName由client包中的常量文件HomsDemoConstant提供(即homs-demo)

为了让上述自动配置自动生效,我们还需要添加META-INF/spring.factories文件

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2. com.coder4.homs.demo.configuration.HomsDemoGrpcClientManagerConfiguration

最后,我们来实验一下服务发现的效果

  1. 启动Server进程,检查Nacos上,应当出现了自动注册的RPC服务。

  2. 开发客户端驱动的项目,引用上述client包、配置yaml中的nacos服务地址

  3. 最后,在客户端驱动项目中,通过Autowired自动装配,代码类似:

  1. @Autowired
  2. private AbstractGrpcClientManager<HomsDemoGrpcClient> homsClientManager;
  3. // Usage
  4. homsClientManager.getClient().ifPresent(client -> client.add(1, 2));

如果一切顺利,会自动发现nacos上已经注册的服务实例,并成功执行rpc调用。