分布式对象

每个 Redisson 对象都绑定到一个 Redis 键(即对象名称),且可以通过 getName 方法读取。

  1. RMap map = redisson.getMap("mymap");
  2. map.getName(); // = mymap

所有和 Redis 键相关的操作被抽象到了 RKeys 接口:

  1. RKeys keys = redisson.getKeys();
  2. Iterable<String> allKeys = keys.getKeys();
  3. Iterable<String> foundedKeys = keys.getKeysByPattern('key*');
  4. long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
  5. long deletedKeysAmount = keys.deleteByPattern("test?");
  6. String randomKey = keys.randomKey();
  7. long keysAmount = keys.count();

TOC

Object

Redisson 分布式的 RBucket 对象可用作任意类型对象的通用容器。

  1. RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
  2. bucket.set(new AnyObject(1));
  3. AnyObject obj = bucket.get();
  4. bucket.trySet(new AnyObject(3));
  5. bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
  6. bucket.getAndSet(new AnyObject(6));

地理位置容器

Redisson 分布式的 RGeo 对象 可用作地理位置项的容器。

  1. RGeo<String> geo = redisson.getGeo("test");
  2. geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"),
  3. new GeoEntry(15.087269, 37.502669, "Catania"));
  4. geo.addAsync(37.618423, 55.751244, "Moscow");
  5. Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
  6. geo.hashAsync("Palermo", "Catania");
  7. Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");
  8. List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS);
  9. Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS);

BitSet

Redisson 分布式的 RBitSet 对象具有类似于 java.util.BitSet 的结构,
且表示的位向量会根据需要增长。BitSet 的大小由 Redis 限制为 4 294 967 295

  1. RBitSet set = redisson.getBitSet("simpleBitset");
  2. set.set(0, true);
  3. set.set(1812, false);
  4. set.clear(0);
  5. set.addAsync("e");
  6. set.xor("anotherBitset");

AtomicLong

Redisson 分布式的 RAtomicLong 对象具有类似于
java.util.concurrent.atomic.AtomicLong 对象的结构。

  1. RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
  2. atomicLong.set(3);
  3. atomicLong.incrementAndGet();
  4. atomicLong.get();

AtomicDouble

Redisson 分布式的 AtomicDouble 对象.

  1. RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
  2. atomicDouble.set(2.81);
  3. atomicDouble.addAndGet(4.11);
  4. atomicDouble.get();

Topic

Redisson 分布式的 Topic 对象实现了 发布/订阅 机制。

  1. RTopic<SomeObject> topic = redisson.getTopic("anyTopic");
  2. topic.addListener(new MessageListener<SomeObject>() {
  3. @Override
  4. public void onMessage(String channel, SomeObject message) {
  5. //...
  6. }
  7. });
  8. // in other thread or JVM
  9. RTopic<SomeObject> topic = redisson.getTopic("anyTopic");
  10. long clientsReceivedMessage = topic.publish(new SomeObject());

Topic 模式

Redisson Topic pattern 对象可通过指定模式订阅到多个 topics。

  1. // subscribe to all topics by `topic1.*` pattern
  2. RPatternTopic<Message> topic1 = redisson.getPatternTopic("topic1.*");
  3. int listenerId = topic1.addListener(new PatternMessageListener<Message>() {
  4. @Override
  5. public void onMessage(String pattern, String channel, Message msg) {
  6. Assert.fail();
  7. }
  8. });

Topic 模式监听器在重连到 Redis 服务器或者 Redis 服务器故障恢复时自动重新订阅。

Bloom filter

Redisson 分布式的 Bloom filter 对象。

  1. RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
  2. // initialize bloom filter with
  3. // expectedInsertions = 55000000
  4. // falseProbability = 0.03
  5. bloomFilter.tryInit(55000000L, 0.03);
  6. bloomFilter.add(new SomeObject("field1Value", "field2Value"));
  7. bloomFilter.add(new SomeObject("field5Value", "field8Value"));
  8. bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

HyperLogLog

Redisson 分布式的 HyperLogLog 对象。

  1. RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
  2. log.add(1);
  3. log.add(2);
  4. log.add(3);
  5. log.count();

远程服务

基于 Redis 的分布式 Java 远程服务使得可以在不同的 Redisson 实例上执行远程接口上的对象方法。
或者说它可以执行 Java 远程调用。
使用 POJO 对象,方法参数和结果对象类型可以是任意的。

RemoteService 提供了两种类型的 RRemoteService 实例:

  • 服务端实例 - 执行远程方法(工作者实例),如:
  1. RRemoteService remoteService = redisson.getRemoteService();
  2. SomeServiceImpl someServiceImpl = new SomeServiceImpl();
  3. // register remote service before any remote invocation
  4. // can handle only 1 invocation concurrently
  5. remoteService.register(SomeServiceInterface.class, someServiceImpl);
  6. // register remote service able to handle up to 12 invocations concurrently
  7. remoteService.register(SomeServiceInterface.class, someServiceImpl, 12);
  • 客户端实例 - 调用远程方法。如:
  1. RRemoteService remoteService = redisson.getRemoteService();
  2. SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);
  3. String result = service.doSomeStuff(1L, "secondParam", new AnyParam());

客户端和服务端实例应使用相同的远程接口且后端是使用相同的服务器连接配置所创建的 Redisson 实例。
客户端和服务端实例可运行在相同的 JVM 上。
对客户端和/或服务端实例的数量没有限制。
(注:尽管 Redisson 没有强制任何限制,但
Redis 本事的限制还是存在的。 )

远程调用在 1+ 个工作者存在时会以 并行模式 执行。

并行模式

并行执行器总量可计算如下:
T = R * N

T - 总可用并行执行器数量
R - Redisson 服务端实例数量
N - 在服务注册时所定义的执行器数量

超过这个数量的命令将放到队列中,在后续有可用执行器时再执行。

远程调用在 仅有 1 个工作者存在时将以 顺序模式 执行。
这时仅有一个命令会被同时执行,其它的命令都会被放到队列中。

顺序模式

远程服务消息流

RemoteService 对每个调用创建了两个队列。
一个队列用于请求(由服务端实例监听),而另一个用于应答响应和结构响应(由客户端实例监听)。
应答响应用于确定方法执行器是否有一个请求。
若在超时时间内没有得到应答,将抛出一个 RemoteServiceAckTimeoutException

下图描绘了每个远程调用的消息流:

远程服务消息流

Remote service. Fire-and-forget and ack-response modes

RemoteService 可通过 org.redisson.core.RemoteInvocationOptions 对象来
为每次远程调用提供选项。
这些选项可用于改变超时时间和跳过应答响应和/或结果响应。如:

  1. // 1 second ack timeout and 30 seconds execution timeout
  2. RemoteInvocationOptions options = RemoteInvocationOptions.defaults();
  3. // no ack but 30 seconds execution timeout
  4. RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck();
  5. // 1 second ack timeout then forget the result
  6. RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult();
  7. // 1 minute ack timeout then forget about the result
  8. RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.MINUTES).noResult();
  9. // no ack and forget about the result (fire and forget)
  10. RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult();
  11. RRemoteService remoteService = redisson.getRemoteService();
  12. YourService service = remoteService.get(YourService.class, options);

远程服务异步调用

远程调用可以异步方式执行。
这时应该使用单独的以 @RRemoteAsync 注解标注的接口。
其方法签名需匹配远程接口中的相同的方法。
每个方法应返回 io.netty.util.concurrent.Future 对象。
异步接口验证将在 RRemoteService.get 方法调用期间被执行。
并不需要把所有方法都列出来,只需要列出需要以异步方式调用的方法。

  1. public interface RemoteInterface {
  2. Long someMethod1(Long param1, String param2);
  3. void someMethod2(MyObject param);
  4. MyObject someMethod3();
  5. }
  6. // async interface for RemoteInterface
  7. @RRemoteAsync(RemoteInterface.class)
  8. public interface RemoteInterfaceAsync {
  9. Future<Long> someMethod1(Long param1, String param2);
  10. Future<Void> someMethod2(MyObject param);
  11. }
  12. RRemoteService remoteService = redisson.getRemoteService();
  13. RemoteInterfaceAsync asyncService = remoteService.get(RemoteInterfaceAsync.class);