IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    聊聊jetcache的MultiLevelCache

    codecraft发表于 2024-06-18 12:59:14
    love 0

    序

    本文主要研究一下jetcache的MultiLevelCache

    Cache

    jetcache-core/src/main/java/com/alicp/jetcache/Cache.java

    public interface Cache<K, V> extends Closeable {
    
        Logger logger = LoggerFactory.getLogger(Cache.class);
    
        default V get(K key) throws CacheInvokeException {
            CacheGetResult<V> result = GET(key);
            if (result.isSuccess()) {
                return result.getValue();
            } else {
                return null;
            }
        }
    
        default Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
            MultiGetResult<K, V> cacheGetResults = GET_ALL(keys);
            return cacheGetResults.unwrapValues();
        }
    
        default void put(K key, V value) {
            PUT(key, value);
        }     
    
        default void putAll(Map<? extends K, ? extends V> map) {
            PUT_ALL(map);
        }
    
        default boolean putIfAbsent(K key, V value) {
            CacheResult result = PUT_IF_ABSENT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
            return result.getResultCode() == CacheResultCode.SUCCESS;
        }
    
        default boolean remove(K key) {
            return REMOVE(key).isSuccess();
        }
    
        default void removeAll(Set<? extends K> keys) {
            REMOVE_ALL(keys);
        }
    
        <T> T unwrap(Class<T> clazz);
    
        @Override
        default void close() {
        }
    
        CacheConfig<K, V> config();
    
        default AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
            if (key == null) {
                return null;
            }
            final String uuid = UUID.randomUUID().toString();
            final long expireTimestamp = System.currentTimeMillis() + timeUnit.toMillis(expire);
            final CacheConfig config = config();
    
    
            AutoReleaseLock lock = () -> {
                int unlockCount = 0;
                while (unlockCount++ < config.getTryLockUnlockCount()) {
                    if(System.currentTimeMillis() < expireTimestamp) {
                        CacheResult unlockResult = REMOVE(key);
                        if (unlockResult.getResultCode() == CacheResultCode.FAIL
                                || unlockResult.getResultCode() == CacheResultCode.PART_SUCCESS) {
                            logger.info("[tryLock] [{} of {}] [{}] unlock failed. Key={}, msg = {}",
                                    unlockCount, config.getTryLockUnlockCount(), uuid, key, unlockResult.getMessage());
                            // retry
                        } else if (unlockResult.isSuccess()) {
                            logger.debug("[tryLock] [{} of {}] [{}] successfully release the lock. Key={}",
                                    unlockCount, config.getTryLockUnlockCount(), uuid, key);
                            return;
                        } else {
                            logger.warn("[tryLock] [{} of {}] [{}] unexpected unlock result: Key={}, result={}",
                                    unlockCount, config.getTryLockUnlockCount(), uuid, key, unlockResult.getResultCode());
                            return;
                        }
                    } else {
                        logger.info("[tryLock] [{} of {}] [{}] lock already expired: Key={}",
                                unlockCount, config.getTryLockUnlockCount(), uuid, key);
                        return;
                    }
                }
            };
    
            int lockCount = 0;
            Cache cache = this;
            while (lockCount++ < config.getTryLockLockCount()) {
                CacheResult lockResult = cache.PUT_IF_ABSENT(key, uuid, expire, timeUnit);
                if (lockResult.isSuccess()) {
                    logger.debug("[tryLock] [{} of {}] [{}] successfully get a lock. Key={}",
                            lockCount, config.getTryLockLockCount(), uuid, key);
                    return lock;
                } else if (lockResult.getResultCode() == CacheResultCode.FAIL || lockResult.getResultCode() == CacheResultCode.PART_SUCCESS) {
                    logger.info("[tryLock] [{} of {}] [{}] cache access failed during get lock, will inquiry {} times. Key={}, msg={}",
                            lockCount, config.getTryLockLockCount(), uuid,
                            config.getTryLockInquiryCount(), key, lockResult.getMessage());
                    int inquiryCount = 0;
                    while (inquiryCount++ < config.getTryLockInquiryCount()) {
                        CacheGetResult inquiryResult = cache.GET(key);
                        if (inquiryResult.isSuccess()) {
                            if (uuid.equals(inquiryResult.getValue())) {
                                logger.debug("[tryLock] [{} of {}] [{}] successfully get a lock after inquiry. Key={}",
                                        inquiryCount, config.getTryLockInquiryCount(), uuid, key);
                                return lock;
                            } else {
                                logger.debug("[tryLock] [{} of {}] [{}] not the owner of the lock, return null. Key={}",
                                        inquiryCount, config.getTryLockInquiryCount(), uuid, key);
                                return null;
                            }
                        } else {
                            logger.info("[tryLock] [{} of {}] [{}] inquiry failed. Key={}, msg={}",
                                    inquiryCount, config.getTryLockInquiryCount(), uuid, key, inquiryResult.getMessage());
                            // retry inquiry
                        }
                    }
                } else {
                    // others holds the lock
                    logger.debug("[tryLock] [{} of {}] [{}] others holds the lock, return null. Key={}",
                            lockCount, config.getTryLockLockCount(), uuid, key);
                    return null;
                }
            }
    
            logger.debug("[tryLock] [{}] return null after {} attempts. Key={}", uuid, config.getTryLockLockCount(), key);
            return null;
        }
    
        default boolean tryLockAndRun(K key, long expire, TimeUnit timeUnit, Runnable action){
            try (AutoReleaseLock lock = tryLock(key, expire, timeUnit)) {
                if (lock != null) {
                    action.run();
                    return true;
                } else {
                    return false;
                }
            }
        }
    
        CacheGetResult<V> GET(K key);
    
        MultiGetResult<K, V> GET_ALL(Set<? extends K> keys);
    
        default V computeIfAbsent(K key, Function<K, V> loader) {
            return computeIfAbsent(key, loader, config().isCacheNullValue());
        }
    
        V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull);
    
        V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit);
    
        default void put(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
            PUT(key, value, expireAfterWrite, timeUnit);
        }
    
        default CacheResult PUT(K key, V value) {
            if (key == null) {
                return CacheResult.FAIL_ILLEGAL_ARGUMENT;
            }
            return PUT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
        }
    
        CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
    
        default void putAll(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
            PUT_ALL(map, expireAfterWrite, timeUnit);
        }   
    
        default CacheResult PUT_ALL(Map<? extends K, ? extends V> map) {
            if (map == null) {
                return CacheResult.FAIL_ILLEGAL_ARGUMENT;
            }
            return PUT_ALL(map, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
        }
    
        CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit);
    
        CacheResult REMOVE(K key);
    
        CacheResult REMOVE_ALL(Set<? extends K> keys);
    
        CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
    
    }
    Cache接口主要是定义了大写的GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL、PUT_IF_ABSENT方法,以及基于这些大写方法包装的小写default方法
    V get(K key)这样的方法虽然用起来方便,但有功能上的缺陷,当get返回null的时候,无法断定是对应的key不存在,还是访问缓存发生了异常,所以JetCache针对部分操作提供了另外一套API,提供了完整的返回值
    另外主要是提供了tryLock、tryLockAndRun的default实现

    AbstractCache

    jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java

    public abstract class AbstractCache<K, V> implements Cache<K, V> {
    
        private static Logger logger = LoggerFactory.getLogger(AbstractCache.class);
    
        private volatile ConcurrentHashMap<Object, LoaderLock> loaderMap;
    
        protected volatile boolean closed;
    
        //......
    
        @Override
        public final CacheGetResult<V> GET(K key) {
            long t = System.currentTimeMillis();
            CacheGetResult<V> result;
            if (key == null) {
                result = new CacheGetResult<V>(CacheResultCode.FAIL, CacheResult.MSG_ILLEGAL_ARGUMENT, null);
            } else {
                result = do_GET(key);
            }
            result.future().thenRun(() -> {
                CacheGetEvent event = new CacheGetEvent(this, System.currentTimeMillis() - t, key, result);
                notify(event);
            });
            return result;
        }
    
        protected abstract CacheGetResult<V> do_GET(K key);
    
        @Override
        public final MultiGetResult<K, V> GET_ALL(Set<? extends K> keys) {
            long t = System.currentTimeMillis();
            MultiGetResult<K, V> result;
            if (keys == null) {
                result = new MultiGetResult<>(CacheResultCode.FAIL, CacheResult.MSG_ILLEGAL_ARGUMENT, null);
            } else {
                result = do_GET_ALL(keys);
            }
            result.future().thenRun(() -> {
                CacheGetAllEvent event = new CacheGetAllEvent(this, System.currentTimeMillis() - t, keys, result);
                notify(event);
            });
            return result;
        }
    
        protected abstract MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys);
    
        @Override
        public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
            return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
                    0, null, this);
        }
    
        @Override
        public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
                                       long expireAfterWrite, TimeUnit timeUnit) {
            return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
                    expireAfterWrite, timeUnit, this);
        }
    
        @Override
        public final CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
            long t = System.currentTimeMillis();
            CacheResult result;
            if (key == null) {
                result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
            } else {
                result = do_PUT(key, value, expireAfterWrite, timeUnit);
            }
            result.future().thenRun(() -> {
                CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
                notify(event);
            });
            return result;
        }
    
        protected abstract CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
    
        @Override
        public final CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
            long t = System.currentTimeMillis();
            CacheResult result;
            if (map == null) {
                result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
            } else {
                result = do_PUT_ALL(map, expireAfterWrite, timeUnit);
            }
            result.future().thenRun(() -> {
                CachePutAllEvent event = new CachePutAllEvent(this, System.currentTimeMillis() - t, map, result);
                notify(event);
            });
            return result;
        }
    
        protected abstract CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit);
    
        @Override
        public final CacheResult REMOVE(K key) {
            long t = System.currentTimeMillis();
            CacheResult result;
            if (key == null) {
                result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
            } else {
                result = do_REMOVE(key);
            }
            result.future().thenRun(() -> {
                CacheRemoveEvent event = new CacheRemoveEvent(this, System.currentTimeMillis() - t, key, result);
                notify(event);
            });
            return result;
        }
    
        protected abstract CacheResult do_REMOVE(K key);
    
        @Override
        public final CacheResult REMOVE_ALL(Set<? extends K> keys) {
            long t = System.currentTimeMillis();
            CacheResult result;
            if (keys == null) {
                result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
            } else {
                result = do_REMOVE_ALL(keys);
            }
            result.future().thenRun(() -> {
                CacheRemoveAllEvent event = new CacheRemoveAllEvent(this, System.currentTimeMillis() - t, keys, result);
                notify(event);
            });
            return result;
        }
    
        protected abstract CacheResult do_REMOVE_ALL(Set<? extends K> keys);
    
        @Override
        public final CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
            long t = System.currentTimeMillis();
            CacheResult result;
            if (key == null) {
                result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
            } else {
                result = do_PUT_IF_ABSENT(key, value, expireAfterWrite, timeUnit);
            }
            result.future().thenRun(() -> {
                CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
                notify(event);
            });
            return result;
        }
    
        protected abstract CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
    
        @Override
        public void close() {
            this.closed = true;
        }
    
        //......                        
    }    
    AbstractCache主要是实现了Cache的GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL、PUT_IF_ABSENT、computeIfAbsent、close方法,同时它定义了do_GET、do_GET_ALL、do_PUT、do_PUT_ALL、do_REMOVE、do_REMOVE_ALL、do_PUT_IF_ABSENT抽象方法
    它有几个子类,分别是AbstractEmbeddedCache、AbstractExternalCache、MultiLevelCache

    AbstractEmbeddedCache

    jetcache-core/src/main/java/com/alicp/jetcache/embedded/AbstractEmbeddedCache.java

    public abstract class AbstractEmbeddedCache<K, V> extends AbstractCache<K, V> {
        protected EmbeddedCacheConfig<K, V> config;
        protected InnerMap innerMap;
    
        protected abstract InnerMap createAreaCache();
    
        public AbstractEmbeddedCache(EmbeddedCacheConfig<K, V> config) {
            this.config = config;
            innerMap = createAreaCache();
        }
    
        @Override
        protected CacheGetResult<V> do_GET(K key) {
            Object newKey = buildKey(key);
            CacheValueHolder<V> holder = (CacheValueHolder<V>) innerMap.getValue(newKey);
            return parseHolderResult(holder);
        }
    
        @Override
        protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
            ArrayList<K> keyList = new ArrayList<K>(keys.size());
            ArrayList<Object> newKeyList = new ArrayList<Object>(keys.size());
            keys.stream().forEach((k) -> {
                Object newKey = buildKey(k);
                keyList.add(k);
                newKeyList.add(newKey);
            });
            Map<Object, CacheValueHolder<V>> innerResultMap = innerMap.getAllValues(newKeyList);
            Map<K, CacheGetResult<V>> resultMap = new HashMap<>();
            for (int i = 0; i < keyList.size(); i++) {
                K key = keyList.get(i);
                Object newKey = newKeyList.get(i);
                CacheValueHolder<V> holder = innerResultMap.get(newKey);
                resultMap.put(key, parseHolderResult(holder));
            }
            MultiGetResult<K, V> result = new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap);
            return result;
        }
    
        @Override
        protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
            CacheValueHolder<V> cacheObject = new CacheValueHolder(value ,timeUnit.toMillis(expireAfterWrite));
            innerMap.putValue(buildKey(key), cacheObject);
            return CacheResult.SUCCESS_WITHOUT_MSG;
        }
    
        @Override
        protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
            HashMap newKeyMap = new HashMap();
            for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
                CacheValueHolder<V> cacheObject = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
                newKeyMap.put(buildKey(en.getKey()), cacheObject);
            }
            innerMap.putAllValues(newKeyMap);
    
            final HashMap resultMap = new HashMap();
            map.keySet().forEach((k) -> resultMap.put(k, CacheResultCode.SUCCESS));
            return CacheResult.SUCCESS_WITHOUT_MSG;
        }
                    
        //......
    }    
    AbstractEmbeddedCache继承了AbstractCache,它基于InnerMap实现了AbstractCache定义的一系列do_XXX方法,config方法返回的是EmbeddedCacheConfig
    它主要有两个实现类,分别是LinkedHashMapCache、CaffeineCache

    AbstractExternalCache

    jetcache-core/src/main/java/com/alicp/jetcache/external/AbstractExternalCache.java

    public abstract class AbstractExternalCache<K, V> extends AbstractCache<K, V> {
    
        private ExternalCacheConfig<K, V> config;
    
        public AbstractExternalCache(ExternalCacheConfig<K, V> config) {
            this.config = config;
            checkConfig();
        }
    
        protected void checkConfig() {
            if (config.getValueEncoder() == null) {
                throw new CacheConfigException("no value encoder");
            }
            if (config.getValueDecoder() == null) {
                throw new CacheConfigException("no value decoder");
            }
            if (config.getKeyPrefix() == null) {
                throw new CacheConfigException("keyPrefix is required");
            }
        }
    
        public byte[] buildKey(K key) {
            try {
                Object newKey = key;
                if (config.getKeyConvertor() != null) {
                    if (config.getKeyConvertor() instanceof KeyConvertor) {
                        if (!isPreservedKey(key)) {
                            // since 2.7.3 KeyConvertor extends Function<Object, Object>
                            newKey = config.getKeyConvertor().apply(key);
                        }
                    } else {
                        // before 2.7.3, KeyConvertor is interface only place some constants.
                        // "key convertor" is Function<Object, Object> and can't process byte[] and String
                        if (key instanceof byte[]) {
                            newKey = key;
                        } else if (key instanceof String) {
                            newKey = key;
                        } else {
                            newKey = config.getKeyConvertor().apply(key);
                        }
                    }
                }
                return ExternalKeyUtil.buildKeyAfterConvert(newKey, config.getKeyPrefix());
            } catch (IOException e) {
                throw new CacheException(e);
            }
        }
    
        private boolean isPreservedKey(Object key) {
            if (key instanceof byte[]) {
                byte[] keyBytes = (byte[]) key;
                return endWith(keyBytes, RefreshCache.LOCK_KEY_SUFFIX)
                        || endWith(keyBytes, RefreshCache.TIMESTAMP_KEY_SUFFIX);
            }
            return false;
        }
    
        private boolean endWith(byte[] key, byte[] suffix) {
            int len = suffix.length;
            if (key.length < len) {
                return false;
            }
            int startPos = key.length - len;
            for (int i = 0; i < len; i++) {
                if (key[startPos + i] != suffix[i]) {
                    return false;
                }
            }
            return true;
        }
    
    }
    AbstractExternalCache继承了AbstractCache,其config方法返回的是ExternalCacheConfig,提供了buildKey方法;它有几个实现类,分别是MockRemoteCache、RedisCache(jedis实现)、RedisLettuceCache、RedisSpringDataCache、RedissonCache

    MultiLevelCache

    jetcache-core/src/main/java/com/alicp/jetcache/MultiLevelCache.java

    public class MultiLevelCache<K, V> extends AbstractCache<K, V> {
    
        private Cache[] caches;
    
        private MultiLevelCacheConfig<K, V> config;
    
        @SuppressWarnings("unchecked")
        @Deprecated
        public MultiLevelCache(Cache... caches) throws CacheConfigException {
            this.caches = caches;
            checkCaches();
            CacheConfig lastConfig = caches[caches.length - 1].config();
            config = new MultiLevelCacheConfig<>();
            config.setCaches(Arrays.asList(caches));
            config.setExpireAfterWriteInMillis(lastConfig.getExpireAfterWriteInMillis());
            config.setCacheNullValue(lastConfig.isCacheNullValue());
        }
    
        @SuppressWarnings("unchecked")
        public MultiLevelCache(MultiLevelCacheConfig<K, V> cacheConfig) throws CacheConfigException {
            this.config = cacheConfig;
            this.caches = cacheConfig.getCaches().toArray(new Cache[]{});
            checkCaches();
        }
    
        private void checkCaches() {
            if (caches == null || caches.length == 0) {
                throw new IllegalArgumentException();
            }
            for (Cache c : caches) {
                if (c.config().getLoader() != null) {
                    throw new CacheConfigException("Loader on sub cache is not allowed, set the loader into MultiLevelCache.");
                }
            }
        }
    
        public Cache[] caches() {
            return caches;
        }
    
        @Override
        public MultiLevelCacheConfig<K, V> config() {
            return config;
        }
    
        @Override
        public CacheResult PUT(K key, V value) {
            if (config.isUseExpireOfSubCache()) {
                return PUT(key, value, 0, null);
            } else {
                return PUT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
            }
        }
    
        @Override
        public CacheResult PUT_ALL(Map<? extends K, ? extends V> map) {
            if (config.isUseExpireOfSubCache()) {
                return PUT_ALL(map, 0, null);
            } else {
                return PUT_ALL(map, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
            }
        }
    
        @Override
        protected CacheGetResult<V> do_GET(K key) {
            for (int i = 0; i < caches.length; i++) {
                Cache cache = caches[i];
                CacheGetResult result = cache.GET(key);
                if (result.isSuccess()) {
                    CacheValueHolder<V> holder = unwrapHolder(result.getHolder());
                    checkResultAndFillUpperCache(key, i, holder);
                    return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
                }
            }
            return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
        }
    
        private CacheValueHolder<V> unwrapHolder(CacheValueHolder<V> h) {
            // if @Cached or @CacheCache change type from REMOTE to BOTH (or from BOTH to REMOTE),
            // during the dev/publish process, the value type which different application server put into cache server will be different
            // (CacheValueHolder<V> and CacheValueHolder<CacheValueHolder<V>>, respectively).
            // So we need correct the problem at here and in CacheGetResult.
            Objects.requireNonNull(h);
            if (h.getValue() instanceof CacheValueHolder) {
                return (CacheValueHolder<V>) h.getValue();
            } else {
                return h;
            }
        }
    
        private void checkResultAndFillUpperCache(K key, int i, CacheValueHolder<V> h) {
            Objects.requireNonNull(h);
            long currentExpire = h.getExpireTime();
            long now = System.currentTimeMillis();
            if (now <= currentExpire) {
                if(config.isUseExpireOfSubCache()){
                    PUT_caches(i, key, h.getValue(), 0, null);
                } else {
                    long restTtl = currentExpire - now;
                    if (restTtl > 0) {
                        PUT_caches(i, key, h.getValue(), restTtl, TimeUnit.MILLISECONDS);
                    }
                }
            }
        }
    
        @Override
        protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
            HashMap<K, CacheGetResult<V>> resultMap = new HashMap<>();
            Set<K> restKeys = new HashSet<>(keys);
            for (int i = 0; i < caches.length; i++) {
                if (restKeys.size() == 0) {
                    break;
                }
                Cache<K, CacheValueHolder<V>> c = caches[i];
                MultiGetResult<K, CacheValueHolder<V>> allResult = c.GET_ALL(restKeys);
                if (allResult.isSuccess() && allResult.getValues() != null) {
                    for (Map.Entry<K, CacheGetResult<CacheValueHolder<V>>> en : allResult.getValues().entrySet()) {
                        K key = en.getKey();
                        CacheGetResult result = en.getValue();
                        if (result.isSuccess()) {
                            CacheValueHolder<V> holder = unwrapHolder(result.getHolder());
                            checkResultAndFillUpperCache(key, i, holder);
                            resultMap.put(key, new CacheGetResult(CacheResultCode.SUCCESS, null, holder));
                            restKeys.remove(key);
                        }
                    }
                }
            }
            for (K k : restKeys) {
                resultMap.put(k, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
            }
            return new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap);
        }
    
        @Override
        protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
            return PUT_caches(caches.length, key, value, expireAfterWrite, timeUnit);
        }
    
        @Override
        protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
            CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
            for (Cache c : caches) {
                CacheResult r;
                if(timeUnit == null) {
                    r = c.PUT_ALL(map);
                } else {
                    r = c.PUT_ALL(map, expireAfterWrite, timeUnit);
                }
                future = combine(future, r);
            }
            return new CacheResult(future);
        }
    
        private CacheResult PUT_caches(int lastIndex, K key, V value, long expire, TimeUnit timeUnit) {
            CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
            for (int i = 0; i < lastIndex; i++) {
                Cache cache = caches[i];
                CacheResult r;
                if (timeUnit == null) {
                    r = cache.PUT(key, value);
                } else {
                    r = cache.PUT(key, value, expire, timeUnit);
                }
                future = combine(future, r);
            }
            return new CacheResult(future);
        }
    
        private CompletableFuture<ResultData> combine(CompletableFuture<ResultData> future, CacheResult result) {
            return future.thenCombine(result.future(), (d1, d2) -> {
                if (d1 == null) {
                    return d2;
                }
                if (d1.getResultCode() != d2.getResultCode()) {
                    return new ResultData(CacheResultCode.PART_SUCCESS, null, null);
                }
                return d1;
            });
        }
    
        @Override
        protected CacheResult do_REMOVE(K key) {
            CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
            for (Cache cache : caches) {
                CacheResult r = cache.REMOVE(key);
                future = combine(future, r);
            }
            return new CacheResult(future);
        }
    
        @Override
        protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
            CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
            for (Cache cache : caches) {
                CacheResult r = cache.REMOVE_ALL(keys);
                future = combine(future, r);
            }
            return new CacheResult(future);
        }
    
        @Override
        public <T> T unwrap(Class<T> clazz) {
            Objects.requireNonNull(clazz);
            for (Cache cache : caches) {
                try {
                    T obj = (T) cache.unwrap(clazz);
                    if (obj != null) {
                        return obj;
                    }
                } catch (IllegalArgumentException e) {
                    // ignore
                }
            }
            throw new IllegalArgumentException(clazz.getName());
        }
    
        @Override
        public AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
            if (key == null) {
                return null;
            }
            return caches[caches.length - 1].tryLock(key, expire, timeUnit);
        }
    
        @Override
        public boolean putIfAbsent(K key, V value) {
            throw new UnsupportedOperationException("putIfAbsent is not supported by MultiLevelCache");
        }
    
        @Override
        protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
            throw new UnsupportedOperationException("PUT_IF_ABSENT is not supported by MultiLevelCache");
        }
    
        @Override
        public void close() {
            super.close();
            for (Cache c : caches) {
                c.close();
            }
        }
    }  
    MultiLevelCache继承了AbstractCache,它定义了caches(Cache[])属性,其config方法返回的是MultiLevelCacheConfig
    其do_GET方法遍历caches,执行其GET方法,若result为success,则返回,否则继续遍历下一个cache,若都没有success的则返回CacheGetResult.NOT_EXISTS_WITHOUT_MSG;当result为success的时候会执行checkResultAndFillUpperCache方法,将查询到的值回填到遍历到的index及以下的cache上
    其do_PUT方法委托给了PUT_caches,其lastIndex参数为caches.length,它遍历caches挨个更新指定key的值
    其do_REMOVE方法遍历caches,挨个执行每个cache的REMOVE方法

    示例

    public class MultiLevelCacheExample {
        public static void main(String[] args) {
            Cache<Object, Object> l1Cache = CaffeineCacheBuilder.createCaffeineCacheBuilder()
                    .limit(100)
                    .expireAfterWrite(200, TimeUnit.SECONDS)
                    .keyConvertor(Fastjson2KeyConvertor.INSTANCE)
                    .buildCache();
    
            GenericObjectPoolConfig pc = new GenericObjectPoolConfig();
            pc.setMinIdle(2);
            pc.setMaxIdle(10);
            pc.setMaxTotal(10);
            JedisPool pool = new JedisPool(pc, "127.0.0.1", 6379);
            Cache<Object, Object> l2Cache = RedisCacheBuilder.createRedisCacheBuilder()
                    .jedisPool(pool)
                    .keyPrefix("projectD")
                    .expireAfterWrite(200, TimeUnit.SECONDS)
                    .keyConvertor(Fastjson2KeyConvertor.INSTANCE)
                    .valueEncoder(JavaValueEncoder.INSTANCE)
                    .valueDecoder(JavaValueDecoder.INSTANCE)
                    .buildCache();
    
            Cache<Object, Object> multiLevelCache = MultiLevelCacheBuilder.createMultiLevelCacheBuilder()
                    .addCache(l1Cache, l2Cache)
                    .buildCache();
    
            multiLevelCache.put("K1", "V1");
            multiLevelCache.put("K2", "V2", 20, TimeUnit.SECONDS);
            System.out.println(multiLevelCache.get("K1"));
            multiLevelCache.remove("K2");
    
        }
    
    }

    小结

    • Cache接口主要是定义了大写的GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL、PUT_IF_ABSENT方法,以及基于这些大写方法包装的小写default方法;
    • AbstractCache主要是实现了Cache的GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL、PUT_IF_ABSENT、computeIfAbsent、close方法,同时它定义了do_GET、do_GET_ALL、do_PUT、do_PUT_ALL、do_REMOVE、do_REMOVE_ALL、do_PUT_IF_ABSENT抽象方法;它有几个子类,分别是AbstractEmbeddedCache、AbstractExternalCache、MultiLevelCache
    • MultiLevelCache继承了AbstractCache,它定义了caches(Cache[])属性,其config方法返回的是MultiLevelCacheConfig;其do_GET方法遍历caches,执行其GET方法,若result为success,则返回,否则继续遍历下一个cache,若都没有success的则返回CacheGetResult.NOT_EXISTS_WITHOUT_MSG;当result为success的时候会执行checkResultAndFillUpperCache方法,将查询到的值回填到遍历到的index及以下的cache上;其do_PUT方法委托给了PUT_caches,其lastIndex参数为caches.length,它遍历caches挨个更新指定key的值;其do_REMOVE方法遍历caches,挨个执行每个cache的REMOVE方法


沪ICP备19023445号-2号
友情链接