InUseStateAggregator 的工作方式

上一节发现空闲模式的进入和退出是由 InUseStateAggregator 来进行控制的,继续看 InUseStateAggregator 是如何工作的。

类InUseStateAggregator

InUseStateAggregator,字面意思是”正在使用的状态聚合器”,javadoc如此描述:

Aggregates the in-use state of a set of objects.
聚合一个对象集合的正在使用的状态

这是一个grpc的内部抽象类:

  1. package io.grpc.internal;
  2. abstract class InUseStateAggregator<T> {}

实现原理

类InUseStateAggregator的实现原理很直白,就是每个要使用这个聚合器的调用者都要存进来一个对象(就是范型T),然后用完之后再取出来,这样就可以通过保存对象的数量变化判断开始使用(从无到有)或者已经不再使用(从有到无):

  1. private final HashSet<T> inUseObjects = new HashSet<T>();
  2. final void updateObjectInUse(T object, boolean inUse) {
  3. synchronized (getLock()) {
  4. // 记录更新前的原始数量
  5. int origSize = inUseObjects.size();
  6. if (inUse) {
  7. // inUse为true,表示增加正在使用的状态,保存对象
  8. inUseObjects.add(object);
  9. if (origSize == 0) {
  10. // 如果原始数量为0,表示从无到有,此时回调抽象方法 handleInUse()
  11. handleInUse();
  12. }
  13. } else {
  14. // inUse为false,表示检查正在使用的状态,删除对象
  15. boolean removed = inUseObjects.remove(object);
  16. if (removed && origSize == 1) {
  17. // 如果删除成功并且原始使用为1,表示从有到无,此时回调抽象方法 handleNotInUse()
  18. handleNotInUse();
  19. }
  20. }
  21. }
  22. }

isInUse() 函数通过检查是否有保存的对象来判断是否正在使用:

  1. final boolean isInUse() {
  2. synchronized (getLock()) {
  3. return !inUseObjects.isEmpty();
  4. }
  5. }

状态变更回调

当状态变更时,回调这两个抽象方法:

  1. handleInUse(): 当被聚合的使用中的状态被修改为 true 时调用,这意味着至少一个对象正在使用中。

    1. abstract void handleInUse();
  2. handleNotInUse(): 当被聚合的使用中的状态被修改为 false 时调用,这意味着没有对象正在使用中。

    1. abstract void handleNotInUse();

ManagedChannelImpl中的实现

回顾 InUseStateAggregator 定义

回顾 ManagedChannelImpl 中对 inUseStateAggregator 的定义:

  1. final InUseStateAggregator<Object> inUseStateAggregator =
  2. new InUseStateAggregator<Object>() {
  3. ......
  4. void handleInUse() {
  5. // 当状态变更为使用中时,退出空闲模式
  6. exitIdleMode();
  7. }
  8. void handleNotInUse() {
  9. // 当状态变更为没有人使用时,重新安排空闲timer以便在稍后进入空闲模式
  10. rescheduleIdleTimer();
  11. }
  12. };

调用 InUseStateAggregator

在ManagedChannelImpl中,当 transport 状态变更时就会调用 InUseStateAggregator:

  1. final TransportManager<ClientTransport> tm = new TransportManager<ClientTransport>() {
  2. ......
  3. ts = new TransportSet(......, new TransportSet.Callback() {
  4. ......
  5. public void onInUse(TransportSet ts) {
  6. // 当 transportset 的状态变更为 InUse 时
  7. inUseStateAggregator.updateObjectInUse(ts, true);
  8. }
  9. public void onNotInUse(TransportSet ts) {
  10. // 当 transportset 的状态变更为 NotInUse 时
  11. inUseStateAggregator.updateObjectInUse(ts, false);
  12. }
  13. }
  14. }

还有 InterimTransportImpl:

  1. private class InterimTransportImpl implements InterimTransport<ClientTransport> {
  2. public void transportTerminated() {
  3. ......
  4. inUseStateAggregator.updateObjectInUse(delayedTransport, false);
  5. }
  6. @Override public void transportInUse(boolean inUse) {
  7. inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
  8. }
  9. }

注: Transport 的详细内容不再继续展开了。