概述

redis缓存在项目中经常用到,如果每个需要的地方独立实现,会有各种各样的问题,如实现的功能不健壮,代码冗余

本篇的宗旨是构建一个抽象的,功能完善,代码健壮的缓存模块,使得接入时,通够快速、便捷的使用缓存

Cache

redis中缓存的对象

/**
 * @author liuk
 * 封装此对象的目的
 * 1. 当从redis批量获取对象时,能够清楚获取到的value对应的key
 * 当然redis返回时,是与请求时的key的顺序是一致的,如果未取到会返回null,通过顺序的一致性也是可以确定获取到的value对应的key的
 * 2. 如果该key确实不存在对应的对象,防止一直透过缓存查数据库,故该场景保存的value=null
 */
@Data
public class Cache<K, V> {

    public Cache() {
    }
    public Cache(K key, V value) {
        this.key = key;
        this.value = value;
    }
    /**
     * 缓存key
     */
    private K key;
    /**
     * 缓存value
     */
    private V value;
}

RedisKeyEnum

redis key的枚举,包括key的构造及expire

public enum RedisKeyEnum {

    APPLY_BATCH_EXEC_LOCK_KEY("xx:xx:exec_lock_key:%s", 10)

    RedisKeyEnum(String key, int expire) {
        this.key = key;
        this.expire = expire;
    }

    private String key;
    /**
     * second
     */
    private int expire;

    public String getKey(Object... args) {
        return String.format(key, args);
    }

    public int getExpire() {
        return expire;
    }
}

RedisCache

cache的接口,不止是redis cache 所有的cache基本都是以下方法的实现

public interface RedisCache<K, V> {

    /**
     * 获取缓存
     *
     * @param code
     * @return
     */
    V getByCache(K code);


    /**
     * 批量获取缓存
     *
     * @param codes
     * @return
     */
    Map<K, V> getByCache(Set<K> codes);

    /**
     * 删除缓存
     *
     * @param code
     */
    void delCache(K code);
}

AbstractRedisCache

抽象redis cache实现


/**
 * @author liuk
 */
public abstract class AbstractRedisCache<K, V> implements RedisCache<K, V> {
    private static final ILog LOGGER = LogFactory.getLog(AbstractRedisCache.class);

    /**
     * 需要注入自己的的redis commands 工具
     */
    @Resource
    private JedisCommands jedisCommands;
    /**
     * 需要注入自己的的redis commands 批量获取key value的工具
     */
    @Resource
    private MultiKeyCommands multiKeyCommands;

    /**
     * 获取缓存
     *
     * @param code
     * @return
     */
    //todo key 抽象   解决多参数问题 以后再改造吧
    public V getByCache(K code) {
        if(code == null){
            return null;
        }
        RedisKeyEnum keyEnum = keyEnum();
        String key = keyEnum.getKey(code);
        String cacheStr = this.get(code);
        if (!StringUtils.isEmpty(cacheStr)) {
            Cache<K, V> cache = map2cache(cacheStr);
            return cache.getValue();
        }
        //get
        LOGGER.info("redis_cache||createCache||code={}", code);
        V value = createCache(code);
        this.set(code, value);
        return value;
    }

    private Cache<K, V> map2cache(String cacheStr) {
        Cache<K, V> cache = JSONObject.parseObject(cacheStr, Cache.class);
        V v = JSONObject.parseObject(cacheStr).getObject("value", valueClass());
        cache.setValue(v);
        return cache;
    }

    /**
     * 批量获取缓存
     *
     * @param codes
     * @return
     */
    public Map<K, V> getByCache(Set<K> codes) {
        
        if(CollectionUtils.isEmpty(codes)){
            return new HashMap();
        }
        RedisKeyEnum keyEnum = keyEnum();
        Map<K, V> result = new HashMap<>();
        int max = 20;
        Set<K> noCacheGoodsIds = new HashSet<>(codes);
        Map<String, K> codesMap = map2String(codes);
        for (int i = 0; i < codes.size(); i += max) {
            List<K> tempCodes = new ArrayList<>(codes).subList(i, Math.min(i + max, codes.size()));
            //redis 操作弱依赖
            List<String> values = this.mget(tempCodes);
            if (org.springframework.util.CollectionUtils.isEmpty(values)) {
                continue;
            }
            for (String value : values) {
                //竟然拿到的是null.....
                if (StringUtils.isEmpty(value)) {
                    continue;
                }
                Cache<K, V> cache = map2cache(value);
                if (cache.getValue() != null) {
                    result.put(codesMap.get(String.valueOf(cache.getKey())), cache.getValue());
                    noCacheGoodsIds.remove(codesMap.get(String.valueOf(cache.getKey())));
                }
                noCacheGoodsIds.remove(codesMap.get(String.valueOf(cache.getKey())));

            }
        }
        if (!CollectionUtils.isEmpty(noCacheGoodsIds)) {
            LOGGER.info("redis_cache||createCache||size={}", noCacheGoodsIds.size());
            noCacheGoodsIds = noCacheGoodsIds.stream().filter(Objects::nonNull).collect(Collectors.toSet());
            Map<K, V> values = createCache(noCacheGoodsIds);
            this.set(values);
            result.putAll(values);
        }
        return result;
    }

    /**
     * 异步 and 弱依赖
     *
     * @param cacheMap
     */
    private void set(Map<K, V> cacheMap) {
        ThreadPoolExecutorEnum.BATCH_EXEC_THREAD_POOL.execute(() -> {
            try {
                RedisKeyEnum keyEnum = keyEnum();
                for (K key : cacheMap.keySet()) {
                    Cache<K, V> cache = new Cache<>(key, cacheMap.get(key));
                    jedisCommands.setex(keyEnum.getKey(key), keyEnum.getExpire(), JSONObject.toJSONString(cache));
                }
            } catch (Exception e) {
                LOGGER.error("redis_cache||set||cacheMap.size={}", cacheMap.size());
            }
        });

    }

    /**
     * redis弱依赖,超时或故障不影响业务
     * todo redis sentinal熔断
     *
     * @param keyList
     * @return
     */
    private List<String> mget(List<K> keyList) {
        try {
            RedisKeyEnum keyEnum = keyEnum();
            String[] keys = new String[keyList.size()];
            for (int j = 0; j < keyList.size(); j++) {
                keys[j] = keyEnum.getKey(keyList.get(j));
            }
            List<String> values = multiKeyCommands.mget(keys);
            return values;
        } catch (Exception e) {
            LOGGER.error("redis_cache||mget||keys.size={}", keyList.size());
        }
        return new ArrayList<>();
    }

    /**
     * redis弱依赖,超时或故障不影响业务
     * 当redis超时时,直接调用createCache  是仍然能获取到value的,但是数据库压力会比较大,需考虑是否可承压,可通过开关控制是否可直接调用 createValue
     *
     * @param keyList
     * @return
     */
    private String get(K key) {
        try {
            RedisKeyEnum keyEnum = keyEnum();
            String cacheStr = jedisCommands.get(keyEnum.getKey(key));
            return cacheStr;
        } catch (Exception e) {
            LOGGER.error("redis_cache||get||key={}", key);
        }
        return null;
    }

    /**
     * redis弱依赖,当redis超时或故障时不影响业务
     * 此处如有需要可考虑异步 参考 delCache
     * @param keyList
     * @return
     */
    private void set(K key, V value) {
        try {
            RedisKeyEnum keyEnum = keyEnum();
            Cache<K, V> cache = new Cache<>(key, value);
            jedisCommands.setex(keyEnum.getKey(key), keyEnum.getExpire(), JSONObject.toJSONString(cache));
        } catch (Exception e) {
            LOGGER.error("redis_cache||set||key={}", key);
        }
    }

    /**
     * 映射code,进行K与string的映射,否则当key是Long时,可能会映射为Integer而非Long
     * 故通过此方法,记录key的原值
     *
     * @param codes
     * @return
     */
    private Map<String, K> map2String(Set<K> codes) {
        Map<String, K> result = new HashMap<>();
        for (K code : codes) {
            result.put(String.valueOf(code), code);
        }
        return result;
    }

    /**
     * 删除缓存
     * 弱依赖,删除失败不影响现有逻辑,等缓存超时,但是也需要关注删除失败的场景,如果出现,则后续缓存失效前使用的都是老数据,根据数据及时性,做不同的逻辑
     *
     * @param code
     */
    public void delCache(K code) {
        try {

            RedisKeyEnum keyEnum = keyEnum();
            String key = keyEnum.getKey(code);

            //有些是在事务中的操作,为了尽快释放事务,异步删除
            ThreadPoolExecutorEnum.REDIS_CACHE_THREAD_POOL.execute(() -> {
                jedisCommands.del(key);
            });

            //延时双删,防止主备延迟,导致上面删除后,仍然读到旧的数据,此处延时双删下,如不需要可删除
            ScheduledExecutor.run(() -> {
                jedisCommands.del(key);
            }, 1);
        } catch (Exception e) {
            LOGGER.error("redis_cache||delCache||key={}", code);
        }

    }

    /**
     * 缓存不存在时单个生成缓存对象
     *
     * @param key
     * @return
     */
    private V createCache(K key) {
        Map<K, V> map = createCache(Sets.newHashSet(key));
        if (map != null && map.containsKey(key)) {
            return map.get(key);
        }
        return null;
    }

    /**
     * 缓存不存在时,用于批量生成缓存对象
     *
     * @param key
     * @return
     */
    protected abstract Map<K, V> createCache(Set<K> key);

    /**
     * 缓存key枚举  存在key及expire
     *
     * @return
     */
    protected abstract RedisKeyEnum keyEnum();

    /**
     * 缓存class类型,用于将字符串map为V 对象,因为泛型在编译后会被擦除,所以通过泛型无法正常映射
     *
     * @return
     */
    protected abstract Class<V> valueClass();
}


ThreadPoolExecutorEnum

异常处理线程池

public enum ThreadPoolExecutorEnum {

    BATCH_EXEC_THREAD_POOL(2, 5, 60, 100, "batch_exec_thread_pool", new ThreadPoolExecutor.AbortPolicy()),
    REDIS_CACHE_THREAD_POOL(2, 5, 60, 100, "cache_batch_set_thread_pool", new ThreadPoolExecutor.DiscardPolicy()),
    ;

    private TaxiThreadPoolExecutor executor;

    ThreadPoolExecutorEnum(int corePoolSize, int maxPoolSize, int keepAliveTime, int queueSize, String name, RejectedExecutionHandler handler) {
        //XXXThreadPoolExecutor 保证trace等数据完整
        this.executor = new XXXThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize), new ThreadFactoryBuilder(name), handler);
    }


    public void execute(Runnable runnable) {
        executor.execute(runnable);
    }
}

延时处理器

用于进行延时处理

public class ScheduledExecutor {

    /**
     *  延时处理器
     */
    private static ScheduledExecutorService executorService = new TaxiScheduledThreadPoolExecutorWrapper(2,
            new ThreadFactoryBuilder("scheduled_executor"));

    /**
     * 延时执行
     *
     * @param runnable
     * @param second   秒
     */
    public static void run(Runnable runnable, int second) {
        //ScheduledExecutorService 用于延时,为了防止阻塞其它的延时处理逻辑,延时后通过新的线程池执行
        Runnable threadPoolRunnable = () -> {
            ThreadPoolExecutorEnum.REDIS_CACHE_THREAD_POOL.execute(runnable);
        };
        executorService.schedule(threadPoolRunnable, second, TimeUnit.SECONDS);
    }

}