【Redisson使用手册】分布式对象

发布于:2024-08-15 ⋅ 阅读:(73) ⋅ 点赞:(0)

每个 Redisson 对象实例都会有一个与之对应的 Redis 数据实例,可以通过调用 getName 方法来取得 Redis 数据实例的名称(key)。

一、通用对象桶(Object Bucket)

Redisson 的分布式 RBucket ,Java 对象是一种通用对象桶可以用来存放任类型的对象。 

RBucket<Object> bucket = redisson.getBucket("test:bucket:key:001");
bucket.set(new Object());
Object obj = bucket.get();
bucket.trySet(new Object());
bucket.compareAndSet(new Object(), new OObject());
bucket.getAndSet(new Object());

还可以通过 RBuckets 接口实现批量操作多个 RBucket 对象

RBuckets buckets = redisson.getBuckets();
List<RBucket<V>> foundBuckets = buckets.find("test:bucket:key*");
Map<String, V> loadedBuckets = buckets.get("test:bucket:key:001", "test:bucket:key:002", "test:bucket:key:003");
Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());
// 利用Redis的事务特性,同时保存所有的通用对象桶,如果任意一个通用对象桶已经存在则放弃保存其他所有数据。
buckets.trySet(map);
// 同时保存全部通用对象桶。
buckets.set(map);

单元测试

    /**
     * 获取缓存
     *
     * @param key   key
     * @param clazz 类型
     * @param <T>   类型
     */
    public <T> T get(String key, Class<T> clazz) {
        RBucket<T> rBucket = redissonClient().getBucket(key);
        return clazz.cast(rBucket.get());
    }
 @Test
    public void test_deleteRBucket_multi() {
        String redisKey1 = "test:deleteRBucket:multi:1";
        String redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);
        Assertions.assertNull(redisValue1);
        primaryRedissonHelper.set(redisKey1, "test_deleteRBucket_single_multi_1", 30);
        redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);
        Assertions.assertNotNull(redisValue1);
        Assertions.assertEquals("test_deleteRBucket_single_multi_1", redisValue1);

        String redisKey2 = "test_deleteRBucket_multi_2";
        String redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);
        Assertions.assertNull(redisValue2);
        primaryRedissonHelper.set(redisKey2, "test_deleteRBucket_single_multi_2", 30);
        redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);
        Assertions.assertNotNull(redisValue2);
        Assertions.assertEquals("test_deleteRBucket_single_multi_2", redisValue2);

        primaryRedissonHelper.deleteBucket(redisKey1, redisKey2);
        redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);
        Assertions.assertNull(redisValue1);
        redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);
        Assertions.assertNull(redisValue2);
    }

二、二进制流(Binary Stream)

Redisson 的分布式 RBinaryStream ,Java 对象同时提供了 InputStream 接口和 OutputStream 接口的实现。流的最大容量受 Redis 主节点的内存大小限制。

RBinaryStream stream = redisson.getBinaryStream("test:stream:001");
byte[] content = ...
stream.set(content);
InputStream is = stream.getInputStream();
byte[] readBuffer = new byte[512];
is.read(readBuffer);
OutputStream os = stream.getOuputStream();
byte[] contentToWrite = ...
os.write(contentToWrite);

三、地理空间对象桶(Geospatial Bucket)

Redisson 的分布式RGeo,Java 对象是一种专门用来储存与地理位置有关的对象桶。

RGeo<String> geo = redisson.getGeo("test:geo:001");
geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"),
        new GeoEntry(15.087269, 37.502669, "Catania"));
geo.addAsync(37.618423, 55.751244, "Moscow");
Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
geo.hashAsync("Palermo", "Catania");
Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");
List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS);
Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS);

四、原子长整型(AtomicLong)

Redisson 的分布式整长形 RAtomicLong 对象和 Java 中的 java.util.concurrent.atomic.AtomicLong  对象类似。

RAtomicLong atomicLong = redisson.getAtomicLong("test:long:001");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();

五、原子双精度浮点(AtomicDouble)

Redisson 还提供了分布式原子双精度浮点 RAtomicDouble,弥补了 Java 自身的不足。

RAtomicDouble atomicDouble = redisson.getAtomicDouble("test:atomicDouble:001");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();

六、话题(订阅分发)(RTopic)

Redisson 的分布式话题 (RTopic)、反射式(Reactive)和 RxJava2 标准的接口。

RTopic topic = redisson.getTopic("test:topic:001");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});
// 在其他线程或JVM节点
RTopic topic = redisson.getTopic("test:topic:001");
long clientsReceivedMessage = topic.publish(new SomeObject());

在 Redis 节点故障转移(主从切换)或断线重连以后,所有的话题监听器将自动完成话题的重新订阅。

七、整长型累加器(LongAdder)

基于 Redis 的 Redisson 分布式整长型累加器(LongAdder)采用了与 java.util.concurrent.atomic.LongAdder 类似的接口。通过利用客户端内置的 LongAdder 对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式 AtomicLong 对象快 12000 倍。完美适用于分布式统计计量场景。

RLongAdder atomicLong = redisson.getLongAdder("test:atomicLong:001");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

当不再使用整长型累加器对象的时候应该自行手动销毁,如果 Redisson 对象被关闭(shutdown)了,则不用手动销毁。

RLongAdder atomicLong = ...
atomicLong.destroy();

八、分布式锁(RLock)

    /**
     * 一直尝试获取锁,直到获取到为止
     *
     * @param lockName    锁名称
     * @param leaseTime   使用时间
     * @param maxAttempts 最大尝试次数
     */
    public void tryLock(String lockName, long leaseTime, int maxAttempts) {
        RLock lock = getLock(lockName);
        int attempt = 0;
        try {
            while (attempt < maxAttempts) {
                boolean isLocked = lock.tryLock(1, leaseTime, TimeUnit.SECONDS);
                if (isLocked) {
                    log.info("tryLock, isLocked=true, lockName={}, leaseTime={}", lockName, leaseTime);
                    return;
                }
                Thread.sleep(20);
                attempt++;
            }
            log.warn("tryLock, failed to acquire lock after {} attempts, lockName={}, leaseTime={}", maxAttempts, lockName, leaseTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 重新设置中断状态
            log.warn("tryLock interrupted, lockName={}, leaseTime={}", lockName, leaseTime);
        }
    }

测试

    @Test
    public void test_tryLock() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 用于等待所有任务完成
        int count = 10;
        final CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            executorService.execute(() -> {
                String lockName = "test:lock:002";
                try {
                    // 添加最大尝试次数
                    primaryRedissonHelper.tryLock(lockName, 10, 10000);
                    RLock rLock = primaryRedissonHelper.getLock(lockName);
                    Assertions.assertTrue(rLock != null && rLock.isLocked());
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    log.warn("test_tryLock {}", ExceptionUtils.getStackTrace(e));
                } finally {
                    // 无论是否成功,都减少计数
                    latch.countDown();
                    primaryRedissonHelper.unLock(lockName);
                }
            });
        }
        // 等待所有任务完成
        latch.await();
        executorService.shutdown();
        // 等待线程池完全关闭
        log.info("executorService.awaitTermination {}", executorService.awaitTermination(1, TimeUnit.MINUTES));
    }