redis cache抽象实现
概述
redis缓存在项目中经常用到,如果每个需要的地方独立实现,会有各种各样的问题,如实现的功能不健壮,代码冗余
本篇的宗旨是构建一个抽象的,功能完善,代码健壮的缓存模块,使得接入时,通够快速、便捷的使用缓存
Cache
redis中缓存的对象
/**
* @author liuk
* 封装此对象的目的
* 1. 当从redis批量获取对象时,能够清楚获取到的value对应的key
* 当然redis返回时,是与请求时的key的顺序是一致的,如果未取到会返回null,通过顺序的一致性也是可以确定获取到的value对应的key的
* 2. 如果该key确实不存在对应的对象,防止一直透过缓存查数据库,故该场景保存的value=null
*/
@Data
public class Cache<K, V> {
public Cache() {
}
public Cache(K key, V value) {
this.key = key;
this.value = value;
}
/**
* 缓存key
*/
private K key;
/**
* 缓存value
*/
private V value;
}
RedisKeyEnum
redis key的枚举,包括key的构造及expire
public enum RedisKeyEnum {
APPLY_BATCH_EXEC_LOCK_KEY("xx:xx:exec_lock_key:%s", 10)
RedisKeyEnum(String key, int expire) {
this.key = key;
this.expire = expire;
}
private String key;
/**
* second
*/
private int expire;
public String getKey(Object... args) {
return String.format(key, args);
}
public int getExpire() {
return expire;
}
}
RedisCache
cache的接口,不止是redis cache 所有的cache基本都是以下方法的实现
public interface RedisCache<K, V> {
/**
* 获取缓存
*
* @param code
* @return
*/
V getByCache(K code);
/**
* 批量获取缓存
*
* @param codes
* @return
*/
Map<K, V> getByCache(Set<K> codes);
/**
* 删除缓存
*
* @param code
*/
void delCache(K code);
}
AbstractRedisCache
抽象redis cache实现
/**
* @author liuk
*/
public abstract class AbstractRedisCache<K, V> implements RedisCache<K, V> {
private static final ILog LOGGER = LogFactory.getLog(AbstractRedisCache.class);
/**
* 需要注入自己的的redis commands 工具
*/
@Resource
private JedisCommands jedisCommands;
/**
* 需要注入自己的的redis commands 批量获取key value的工具
*/
@Resource
private MultiKeyCommands multiKeyCommands;
/**
* 获取缓存
*
* @param code
* @return
*/
//todo key 抽象 解决多参数问题 以后再改造吧
public V getByCache(K code) {
if(code == null){
return null;
}
RedisKeyEnum keyEnum = keyEnum();
String key = keyEnum.getKey(code);
String cacheStr = this.get(code);
if (!StringUtils.isEmpty(cacheStr)) {
Cache<K, V> cache = map2cache(cacheStr);
return cache.getValue();
}
//get
LOGGER.info("redis_cache||createCache||code={}", code);
V value = createCache(code);
this.set(code, value);
return value;
}
private Cache<K, V> map2cache(String cacheStr) {
Cache<K, V> cache = JSONObject.parseObject(cacheStr, Cache.class);
V v = JSONObject.parseObject(cacheStr).getObject("value", valueClass());
cache.setValue(v);
return cache;
}
/**
* 批量获取缓存
*
* @param codes
* @return
*/
public Map<K, V> getByCache(Set<K> codes) {
if(CollectionUtils.isEmpty(codes)){
return new HashMap();
}
RedisKeyEnum keyEnum = keyEnum();
Map<K, V> result = new HashMap<>();
int max = 20;
Set<K> noCacheGoodsIds = new HashSet<>(codes);
Map<String, K> codesMap = map2String(codes);
for (int i = 0; i < codes.size(); i += max) {
List<K> tempCodes = new ArrayList<>(codes).subList(i, Math.min(i + max, codes.size()));
//redis 操作弱依赖
List<String> values = this.mget(tempCodes);
if (org.springframework.util.CollectionUtils.isEmpty(values)) {
continue;
}
for (String value : values) {
//竟然拿到的是null.....
if (StringUtils.isEmpty(value)) {
continue;
}
Cache<K, V> cache = map2cache(value);
if (cache.getValue() != null) {
result.put(codesMap.get(String.valueOf(cache.getKey())), cache.getValue());
noCacheGoodsIds.remove(codesMap.get(String.valueOf(cache.getKey())));
}
noCacheGoodsIds.remove(codesMap.get(String.valueOf(cache.getKey())));
}
}
if (!CollectionUtils.isEmpty(noCacheGoodsIds)) {
LOGGER.info("redis_cache||createCache||size={}", noCacheGoodsIds.size());
noCacheGoodsIds = noCacheGoodsIds.stream().filter(Objects::nonNull).collect(Collectors.toSet());
Map<K, V> values = createCache(noCacheGoodsIds);
this.set(values);
result.putAll(values);
}
return result;
}
/**
* 异步 and 弱依赖
*
* @param cacheMap
*/
private void set(Map<K, V> cacheMap) {
ThreadPoolExecutorEnum.BATCH_EXEC_THREAD_POOL.execute(() -> {
try {
RedisKeyEnum keyEnum = keyEnum();
for (K key : cacheMap.keySet()) {
Cache<K, V> cache = new Cache<>(key, cacheMap.get(key));
jedisCommands.setex(keyEnum.getKey(key), keyEnum.getExpire(), JSONObject.toJSONString(cache));
}
} catch (Exception e) {
LOGGER.error("redis_cache||set||cacheMap.size={}", cacheMap.size());
}
});
}
/**
* redis弱依赖,超时或故障不影响业务
* todo redis sentinal熔断
*
* @param keyList
* @return
*/
private List<String> mget(List<K> keyList) {
try {
RedisKeyEnum keyEnum = keyEnum();
String[] keys = new String[keyList.size()];
for (int j = 0; j < keyList.size(); j++) {
keys[j] = keyEnum.getKey(keyList.get(j));
}
List<String> values = multiKeyCommands.mget(keys);
return values;
} catch (Exception e) {
LOGGER.error("redis_cache||mget||keys.size={}", keyList.size());
}
return new ArrayList<>();
}
/**
* redis弱依赖,超时或故障不影响业务
* 当redis超时时,直接调用createCache 是仍然能获取到value的,但是数据库压力会比较大,需考虑是否可承压,可通过开关控制是否可直接调用 createValue
*
* @param keyList
* @return
*/
private String get(K key) {
try {
RedisKeyEnum keyEnum = keyEnum();
String cacheStr = jedisCommands.get(keyEnum.getKey(key));
return cacheStr;
} catch (Exception e) {
LOGGER.error("redis_cache||get||key={}", key);
}
return null;
}
/**
* redis弱依赖,当redis超时或故障时不影响业务
* 此处如有需要可考虑异步 参考 delCache
* @param keyList
* @return
*/
private void set(K key, V value) {
try {
RedisKeyEnum keyEnum = keyEnum();
Cache<K, V> cache = new Cache<>(key, value);
jedisCommands.setex(keyEnum.getKey(key), keyEnum.getExpire(), JSONObject.toJSONString(cache));
} catch (Exception e) {
LOGGER.error("redis_cache||set||key={}", key);
}
}
/**
* 映射code,进行K与string的映射,否则当key是Long时,可能会映射为Integer而非Long
* 故通过此方法,记录key的原值
*
* @param codes
* @return
*/
private Map<String, K> map2String(Set<K> codes) {
Map<String, K> result = new HashMap<>();
for (K code : codes) {
result.put(String.valueOf(code), code);
}
return result;
}
/**
* 删除缓存
* 弱依赖,删除失败不影响现有逻辑,等缓存超时,但是也需要关注删除失败的场景,如果出现,则后续缓存失效前使用的都是老数据,根据数据及时性,做不同的逻辑
*
* @param code
*/
public void delCache(K code) {
try {
RedisKeyEnum keyEnum = keyEnum();
String key = keyEnum.getKey(code);
//有些是在事务中的操作,为了尽快释放事务,异步删除
ThreadPoolExecutorEnum.REDIS_CACHE_THREAD_POOL.execute(() -> {
jedisCommands.del(key);
});
//延时双删,防止主备延迟,导致上面删除后,仍然读到旧的数据,此处延时双删下,如不需要可删除
ScheduledExecutor.run(() -> {
jedisCommands.del(key);
}, 1);
} catch (Exception e) {
LOGGER.error("redis_cache||delCache||key={}", code);
}
}
/**
* 缓存不存在时单个生成缓存对象
*
* @param key
* @return
*/
private V createCache(K key) {
Map<K, V> map = createCache(Sets.newHashSet(key));
if (map != null && map.containsKey(key)) {
return map.get(key);
}
return null;
}
/**
* 缓存不存在时,用于批量生成缓存对象
*
* @param key
* @return
*/
protected abstract Map<K, V> createCache(Set<K> key);
/**
* 缓存key枚举 存在key及expire
*
* @return
*/
protected abstract RedisKeyEnum keyEnum();
/**
* 缓存class类型,用于将字符串map为V 对象,因为泛型在编译后会被擦除,所以通过泛型无法正常映射
*
* @return
*/
protected abstract Class<V> valueClass();
}
ThreadPoolExecutorEnum
异常处理线程池
public enum ThreadPoolExecutorEnum {
BATCH_EXEC_THREAD_POOL(2, 5, 60, 100, "batch_exec_thread_pool", new ThreadPoolExecutor.AbortPolicy()),
REDIS_CACHE_THREAD_POOL(2, 5, 60, 100, "cache_batch_set_thread_pool", new ThreadPoolExecutor.DiscardPolicy()),
;
private TaxiThreadPoolExecutor executor;
ThreadPoolExecutorEnum(int corePoolSize, int maxPoolSize, int keepAliveTime, int queueSize, String name, RejectedExecutionHandler handler) {
//XXXThreadPoolExecutor 保证trace等数据完整
this.executor = new XXXThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueSize), new ThreadFactoryBuilder(name), handler);
}
public void execute(Runnable runnable) {
executor.execute(runnable);
}
}
延时处理器
用于进行延时处理
public class ScheduledExecutor {
/**
* 延时处理器
*/
private static ScheduledExecutorService executorService = new TaxiScheduledThreadPoolExecutorWrapper(2,
new ThreadFactoryBuilder("scheduled_executor"));
/**
* 延时执行
*
* @param runnable
* @param second 秒
*/
public static void run(Runnable runnable, int second) {
//ScheduledExecutorService 用于延时,为了防止阻塞其它的延时处理逻辑,延时后通过新的线程池执行
Runnable threadPoolRunnable = () -> {
ThreadPoolExecutorEnum.REDIS_CACHE_THREAD_POOL.execute(runnable);
};
executorService.schedule(threadPoolRunnable, second, TimeUnit.SECONDS);
}
}