概述

redis缓存在项目中经常用到,如果每个需要的地方独立实现,会有各种各样的问题,如实现的功能不健壮,代码冗余

本篇的宗旨是构建一个抽象的,功能完善,代码健壮的缓存模块,使得接入时,通够快速、便捷的使用缓存

重要的class

Cache

redis中缓存的对象

/**
 * @author liuk
 * 封装此对象的目的
 * 1. 当从redis批量获取对象时,能够清楚获取到的value对应的key
 * 当然redis返回时,是与请求时的key的顺序是一致的,如果未取到会返回null,通过顺序的一致性也是可以确定获取到的value对应的key的
 * 2. 如果该key确实不存在对应的对象,防止一直透过缓存查数据库,故该场景保存的value=null
 */
@Data
public class Cache<K, V> {

    public Cache() {
    }
    public Cache(K key, V value) {
        this.key = key;
        this.value = value;
    }
    /**
     * 缓存key
     */
    private K key;
    /**
     * 缓存value
     */
    private V value;
}

RedisKeyEnum

redis key的枚举,包括key的构造及expire

public enum RedisKeyEnum {

    APPLY_BATCH_EXEC_LOCK_KEY("xx:xx:exec_lock_key:%s", 10)

    RedisKeyEnum(String key, int expire) {
        this.key = key;
        this.expire = expire;
    }

    private String key;
    /**
     * second
     */
    private int expire;

    public String getKey(Object... args) {
        return String.format(key, args);
    }

    public int getExpire() {
        return expire;
    }
}

RedisCache

cache的接口,不止是redis cache 所有的cache基本都是以下方法的实现

public interface RedisCache<K, V> {

    /**
     * 获取缓存
     *
     * @param code
     * @return
     */
    V getByCache(K code);


    /**
     * 批量获取缓存
     *
     * @param codes
     * @return
     */
    Map<K, V> getByCache(Set<K> codes);

    /**
     * 删除缓存
     *
     * @param code
     */
    void delCache(K code);
}

AbstractRedisCache

抽象redis cache实现


/**
 * @author liuk
 */
public abstract class AbstractRedisCache<K, V> implements RedisCache<K, V> {
    private static final ILog LOGGER = LogFactory.getLog(AbstractRedisCache.class);

    /**
     * 需要注入自己的的redis commands 工具
     */
    @Resource
    private JedisCommands jedisCommands;
    /**
     * 需要注入自己的的redis commands 批量获取key value的工具
     */
    @Resource
    private MultiKeyCommands multiKeyCommands;

    /**
     * 获取缓存
     *
     * @param code
     * @return
     */
    //todo key 抽象   解决多参数问题 以后再改造吧
    public V getByCache(K code) {
        if(code == null){
            return null;
        }
        String cacheStr = this.get(code);
        if (!StringUtils.isEmpty(cacheStr)) {
            Cache<K, V> cache = map2cache(cacheStr);
            return cache.getValue();
        }
        //get
        LOGGER.info("redis_cache||createCache||code={}", code);
        V value = createCache(code);
        this.set(code, value);
        return value;
    }

    private Cache<K, V> map2cache(String cacheStr) {
        Cache<K, V> cache = JSONObject.parseObject(cacheStr, Cache.class);
        V v = JSONObject.parseObject(cacheStr).getObject("value", valueClass());
        cache.setValue(v);
        return cache;
    }

    /**
     * 批量获取缓存
     *
     * @param codes
     * @return
     */
    public Map<K, V> getByCache(Set<K> codes) {
        
        if(CollectionUtils.isEmpty(codes)){
            return new HashMap<>();
        }
        Map<K, V> result = new HashMap<>();
        int max = 20;
        Set<K> noCacheGoodsIds = new HashSet<>(codes);
        Map<String, K> codesMap = map2String(codes);
        List<K> codeList = new ArrayList<>(codes);

        for (int i = 0; i < codes.size(); i += max) {
            List<K> tempCodes = codeList.subList(i, Math.min(i + max, codes.size()));
            //redis 操作弱依赖
            List<String> values = this.mget(tempCodes);
            if (org.springframework.util.CollectionUtils.isEmpty(values)) {
                continue;
            }
            for (String value : values) {
                //竟然拿到的是null.....
                if (StringUtils.isEmpty(value)) {
                    continue;
                }
                Cache<K, V> cache = map2cache(value);
                if (cache.getValue() != null) {
                    result.put(codesMap.get(String.valueOf(cache.getKey())), cache.getValue());
                }
                noCacheGoodsIds.remove(codesMap.get(String.valueOf(cache.getKey())));

            }
        }
        if (!CollectionUtils.isEmpty(noCacheGoodsIds)) {
            LOGGER.info("redis_cache||createCache||size={}", noCacheGoodsIds.size());
            noCacheGoodsIds = noCacheGoodsIds.stream().filter(Objects::nonNull).collect(Collectors.toSet());
            Map<K, V> values = createCache(noCacheGoodsIds);
            this.set(values);
            result.putAll(values);
        }
        return result;
    }

    /**
     * 异步 and 弱依赖
     *
     * @param cacheMap
     */
    private void set(Map<K, V> cacheMap) {
        ThreadPoolExecutorEnum.BATCH_EXEC_THREAD_POOL.execute(() -> {
            try {
                RedisKeyEnum keyEnum = keyEnum();
                for (K key : cacheMap.keySet()) {
                    Cache<K, V> cache = new Cache<>(key, cacheMap.get(key));
                    jedisCommands.setex(keyEnum.getKey(key), keyEnum.getExpire(), JSONObject.toJSONString(cache));
                }
            } catch (Exception e) {
                LOGGER.error("redis_cache||set||cacheMap.size={}", cacheMap.size());
            }
        });

    }

    /**
     * redis弱依赖,超时或故障不影响业务
     * todo redis sentinal熔断
     *
     * @param keyList
     * @return
     */
    private List<String> mget(List<K> keyList) {
        try {
            RedisKeyEnum keyEnum = keyEnum();
            String[] keys = new String[keyList.size()];
            for (int j = 0; j < keyList.size(); j++) {
                keys[j] = keyEnum.getKey(keyList.get(j));
            }
            List<String> values = multiKeyCommands.mget(keys);
            return values;
        } catch (Exception e) {
            LOGGER.error("redis_cache||mget||keys.size={}", keyList.size());
        }
        return new ArrayList<>();
    }

    /**
     * redis弱依赖,超时或故障不影响业务
     * 当redis超时时,直接调用createCache  是仍然能获取到value的,但是数据库压力会比较大,需考虑是否可承压,可通过开关控制是否可直接调用 createValue
     *
     * @param keyList
     * @return
     */
    private String get(K key) {
        try {
            RedisKeyEnum keyEnum = keyEnum();
            String cacheStr = jedisCommands.get(keyEnum.getKey(key));
            return cacheStr;
        } catch (Exception e) {
            LOGGER.error("redis_cache||get||key={}", key);
        }
        return null;
    }

    /**
     * redis弱依赖,当redis超时或故障时不影响业务
     * 此处如有需要可考虑异步 参考 delCache
     * @param keyList
     * @return
     */
    private void set(K key, V value) {
        try {
            RedisKeyEnum keyEnum = keyEnum();
            Cache<K, V> cache = new Cache<>(key, value);
            jedisCommands.setex(keyEnum.getKey(key), keyEnum.getExpire(), JSONObject.toJSONString(cache));
        } catch (Exception e) {
            LOGGER.error("redis_cache||set||key={}", key);
        }
    }

    /**
     * 映射code,进行K与string的映射,否则当key是Long时,可能会映射为Integer而非Long
     * 故通过此方法,记录key的原值
     *
     * @param codes
     * @return
     */
    private Map<String, K> map2String(Set<K> codes) {
        Map<String, K> result = new HashMap<>();
        for (K code : codes) {
            result.put(String.valueOf(code), code);
        }
        return result;
    }

    /**
     * 删除缓存
     * 弱依赖,删除失败不影响现有逻辑,等缓存超时,但是也需要关注删除失败的场景,如果出现,则后续缓存失效前使用的都是老数据,根据数据及时性,做不同的逻辑
     *
     * @param code
     */
    public void delCache(K code) {
        try {

            RedisKeyEnum keyEnum = keyEnum();
            String key = keyEnum.getKey(code);

            //有些是在事务中的操作,为了尽快释放事务,异步删除
            ThreadPoolExecutorEnum.REDIS_CACHE_THREAD_POOL.execute(() -> {
                jedisCommands.del(key);
            });

            //延时双删,防止主备延迟,导致上面删除后,仍然读到旧的数据,此处延时双删下,如不需要可删除
            ScheduledExecutor.run(() -> {
                jedisCommands.del(key);
            }, 1);
        } catch (Exception e) {
            LOGGER.error("redis_cache||delCache||key={}", code);
        }

    }

    /**
     * 缓存不存在时单个生成缓存对象
     *
     * @param key
     * @return
     */
    private V createCache(K key) {
        Map<K, V> map = createCache(Sets.newHashSet(key));
        if (map != null && map.containsKey(key)) {
            return map.get(key);
        }
        return null;
    }

    /**
     * 缓存不存在时,用于批量生成缓存对象
     *
     * @param key
     * @return
     */
    protected abstract Map<K, V> createCache(Set<K> key);

    /**
     * 缓存key枚举  存在key及expire
     *
     * @return
     */
    protected abstract RedisKeyEnum keyEnum();

    /**
     * 缓存class类型,用于将字符串map为V 对象,因为泛型在编译后会被擦除,所以通过泛型无法正常映射
     *
     * @return
     */
    protected abstract Class<V> valueClass();
}


ThreadPoolExecutorEnum

异常处理线程池

public enum ThreadPoolExecutorEnum {

    BATCH_EXEC_THREAD_POOL(2, 5, 60, 100, "batch_exec_thread_pool", new ThreadPoolExecutor.AbortPolicy()),
    REDIS_CACHE_THREAD_POOL(2, 5, 60, 100, "cache_batch_set_thread_pool", new ThreadPoolExecutor.DiscardPolicy()),
    ;

    private TaxiThreadPoolExecutor executor;

    ThreadPoolExecutorEnum(int corePoolSize, int maxPoolSize, int keepAliveTime, int queueSize, String name, RejectedExecutionHandler handler) {
        //XXXThreadPoolExecutor 保证trace等数据完整
        this.executor = new XXXThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize), new ThreadFactoryBuilder(name), handler);
    }


    public void execute(Runnable runnable) {
        executor.execute(runnable);
    }
}

延时处理器

用于进行延时处理

public class ScheduledExecutor {

    /**
     *  延时处理器
     */
    private static ScheduledExecutorService executorService = new TaxiScheduledThreadPoolExecutorWrapper(2,
            new ThreadFactoryBuilder("scheduled_executor"));

    /**
     * 延时执行
     *
     * @param runnable
     * @param second   秒
     */
    public static void run(Runnable runnable, int second) {
        //ScheduledExecutorService 用于延时,为了防止阻塞其它的延时处理逻辑,延时后通过新的线程池执行
        Runnable threadPoolRunnable = () -> {
            ThreadPoolExecutorEnum.REDIS_CACHE_THREAD_POOL.execute(runnable);
        };
        executorService.schedule(threadPoolRunnable, second, TimeUnit.SECONDS);
    }

}

AbstractComplexRedisCache

支持复杂的key及泛型的数据缓存

/**
 * @author liuk
 * 支持复杂的key及集合的缓存,支持批量获取
 */
public abstract class AbstractComplexRedisCache<K extends CacheKey, V> implements RedisCache<K, V> {
    private static final ILog LOGGER = LogFactory.getLog(AbstractComplexRedisCache.class);

    @Resource
    private JedisCommands jedisCommands;


    @Resource
    private MultiKeyCommands multiKeyCommands;

    /**
     * 获取缓存
     *
     * @param code
     * @return
     */
    public V getByCache(K cacheKey) {
        if (cacheKey == null) {
            return null;
        }
        String cacheStr = this.get(cacheKey);
        if (!StringUtils.isEmpty(cacheStr)) {
            Cache<K, V> cache = map2cache(cacheKey, cacheStr);
            return cache.getValue();
        }
        //get1
        LOGGER.info("redis_cache||createCache||code={}", cacheKey.key());
        V value = createCache(cacheKey);
        this.set(cacheKey, value);
        return value;
    }

    private Cache<K, V> map2cache(K cacheKey, String cacheStr) {
        Cache<K, V> cache = new Cache<>();
        V v = JSONObject.parseObject(cacheStr).getObject("value", type());
        cache.setValue(v);
        cache.setKey(cacheKey);
        return cache;
    }


    /**
     * redis弱依赖,超时或故障不影响业务
     *
     * @param keyList
     * @return
     */
    private String get(K key) {
        try {
            String cacheStr = jedisCommands.get(key.key());
            return cacheStr;
        } catch (Exception e) {
            LOGGER.error("redis_cache||get||key={}", key);
        }
        return null;
    }

    /**
     * redis弱依赖,超时或故障不影响业务
     *
     * @param keyList
     * @return
     */
    private void set(K cacheKey, V value) {
        try {
            Cache<K, V> cache = new Cache<>(cacheKey, value);
            jedisCommands.setex(cacheKey.key(), this.expire(), JSONObject.toJSONString(cache));
        } catch (Exception e) {
            LOGGER.error("redis_cache||set||key={}", cacheKey.key());
        }
    }


    /**
     * 删除缓存
     *
     * @param code
     */
    public void delCache(K code) {
        try {

            String key = code.key();

            //有些是在事务中的操作,为了尽快释放事务,异步删除
            ThreadPoolExecutorEnum.REDIS_CACHE_THREAD_POOL.execute(() -> {
                jedisCommands.del(key);
            });

            //延时双删
            ScheduledExecutor.run(() -> {
                jedisCommands.del(key);
            }, 1);
        } catch (Exception e) {
            LOGGER.error("redis_cache||delCache||key={}", code);
        }

    }

    /**
     * 单个获取缓存对象
     *
     * @param key
     * @return
     */
    protected abstract V createCache(K key);


    /**
     * 单个获取缓存对象
     *
     * @param key
     * @return
     */
    protected abstract Map<K, V> createCache(Set<K> key);

    /**
     * 缓存类型
     *
     * @return
     */
    protected abstract Type type();

    /**
     * 秒
     *
     * @return
     */
    protected abstract int expire();


    @Override
    public Map<K, V> getByCache(Set<K> codes) {

        Map<K, V> result = new HashMap<>();

        List<String> keys = new ArrayList<>();
        Map<String, K> keyMap = new HashMap<>();
        int index = 0;
        for (K code : codes) {
            String key = code.key();
            keys.add(key);
            keyMap.put(key, code);
        }
        int max = 20;
        Set<K> noCacheGoodsIds = new HashSet<>(codes);
        for (int i = 0; i < codes.size(); i += max) {
            List<String> tempCodes = keys.subList(i, Math.min(i + max, codes.size()));
            //redis 操作弱依赖
            List<String> values = this.mget(tempCodes);
            if (org.springframework.util.CollectionUtils.isEmpty(values)) {
                continue;
            }

            for (int valIndex = 0; valIndex < values.size(); valIndex++) {
                String value = values.get(valIndex);
                K key = keyMap.get(tempCodes.get(valIndex));
                //竟然拿到的是null.....
                if (StringUtils.isEmpty(value)) {
                    continue;
                }
                Cache<K, V> cache = map2cache(key, value);

                if (cache.getValue() != null) {
                    result.put(key, cache.getValue());
                }
                noCacheGoodsIds.remove(key);
            }
        }
        if (!CollectionUtils.isEmpty(noCacheGoodsIds)) {
            LOGGER.info("redis_cache||createCache||size={}", noCacheGoodsIds.size());
            noCacheGoodsIds = noCacheGoodsIds.stream().filter(Objects::nonNull).collect(Collectors.toSet());
            Map<K, V> values = createCache(noCacheGoodsIds);
            this.set(values);
            result.putAll(values);
        }
        return result;
    }

    private List<String> mget(List<String> keyList) {
        try {
            String[] keys = new String[keyList.size()];
            for (int j = 0; j < keyList.size(); j++) {
                keys[j] = keyList.get(j);
            }
            List<String> values = multiKeyCommands.mget(keys);
            return values;
        } catch (Exception e) {
            LOGGER.error("redis_cache||mget||keys.size={}", keyList.size());
        }
        return new ArrayList<>();
    }

    private void set(Map<K, V> cacheMap) {
        ThreadPoolExecutorEnum.BATCH_EXEC_THREAD_POOL.execute(() -> {
            try {
                for (K key : cacheMap.keySet()) {
                    Cache<K, V> cache = new Cache<>(key, cacheMap.get(key));
                    jedisCommands.setex(key.key(), this.expire(), JSONObject.toJSONString(cache));
                }
            } catch (Exception e) {
                LOGGER.error("redis_cache||set||cacheMap.size={}", cacheMap.size());
            }
        });

    }

}

使用

AbstractRedisCache的使用

@Service
public class GoodsDao extends AbstractRedisCache<Long, Goods> {
    
    @Resource
    private GoodsCustomMapper goodsCustomMapper;
    
    @Override
    protected Map<Long, Goods> createCache(Set<Long> key) {
        Map<Long, Goods> result = new HashMap<>();
        GoodsExample goodsExample = new GoodsExample();
        goodsExample.createCriteria().andIsDeletedEqualTo(DeletedStatusEnum.NORMAL.getCode())
                .andGoodsIdIn(new ArrayList<>(key));
        List<Goods> goodsList = goodsCustomMapper.selectByExample(goodsExample);
        if (CollectionUtils.isEmpty(goodsList)) {
            return result;
        }
        for (Goods goods : goodsList) {
            result.put(goods.getGoodsId(), goods);
        }
        return result;
    }


    @Override
    protected RedisCacheKeyEnum keyEnum() {
        return RedisCacheKeyEnum.GOODS_CACHE_KEY;
    }

    @Override
    protected Class<Goods> valueClass() {
        return Goods.class;
    }
}

AbstractComplexRedisCache的使用

@Component
public class CityGoodsCache extends AbstractComplexRedisCache<CityGoodsCacheKey, List<Long>> {

    @Resource
    private GoodsCustomMapper goodsCustomMapper;

    /**
     * 全部的单独查-1
     *
     * @param key
     * @return
     */
    @Override
    protected List<Long> createCache(CityGoodsCacheKey key) {
        GoodsCityQueryCondition condition = new GoodsCityQueryCondition();
        condition.setMallType(key.getMallType());
        condition.setGoodsState(GoodsShelvesStateEnum.put.getCode());
        condition.setCityId(key.getCityId().toString());
        condition.setBizType(key.getBizType());
        List<Goods> goods = goodsCustomMapper.selectByCityId(condition);
        List<Long> goodsIdList = new ArrayList<>();
        if (CollectionUtils.isNotEmpty(goods)) {
            goodsIdList = goods.stream().map(Goods::getGoodsId).collect(Collectors.toList());
        }
        return goodsIdList;
    }

    @Override
    protected Map<CityGoodsCacheKey, List<Long>> createCache(Set<CityGoodsCacheKey> key) {
        Map<CityGoodsCacheKey, List<Long>> result = new HashMap<>();
        for (CityGoodsCacheKey cacheKey : key) {
            result.put(cacheKey, createCache(cacheKey));
        }
        return result;
    }

    @Override
    protected int expire() {
        return 60 * 10;
    }

    @Override
    protected Type type() {
        return new TypeToken<List<Long>>() {
        }.getType();
    }
}