一、前言
本文基于Jedis客户端操作Redis服务器集群,定义了以下组件:集群节点分片策略RedisSharding、Redis节点RedisNode、Redis实现类RedisBean及其工厂RedisFactoryBean、分布式锁RedisLock及其实现类RedisLockImpl和工厂类RedisLockFactoryImpl、业务对象序列化类FastJsonSerialization,以及常用命令操作类RedisCommands及其拦截器RedisCommandsInterceptor等,具体如下图所示。
二、主要代码
1. Redis接口 - 定义操作Redis的常用入口:RedisCommands命令方式、RedisNode节点方式(Jedis客户端连接),以及所依赖的序列化方式Serialization等。
public interface Redis {@b@@b@ RedisCommands getCommands();@b@ @b@ RedisNode shardingLocate(String key);@b@@b@ RedisNode shardingLocate(byte[] key);@b@ @b@ RedisNode getFirstNode();@b@ @b@ <T> T accessFirstNode(RedisCallback<T> callback);@b@ @b@ Serialization getDefSerialization();@b@ @b@ Charset getDefCharset();@b@ @b@ <K,V> RedisMap<K,V> loadMap(String mapId,Class<K> keyClazz@b@ ,Class<V> valueClazz); @b@ @b@ <V> RedisCache<V> loadCache(String namespace@b@ ,Class<V> valueClazz);@b@ @b@ @b@ <E> RedisQueue<E> loadQueue(String queueId@b@ ,Class<E> elementClazz); @b@ @b@ RedisInteger loadIntegerCounter(String counterId);@b@ @b@ RedisLong loadLongCounter(String counterId);@b@ @b@ Lock getThreadLock();@b@ @b@ void destroy();@b@}
2. RedisBean类 - redis实现类
public class RedisBean implements Redis,DisposableBean {@b@ @b@ @b@ protected Log logger=LogFactory.getLog(this.getClass());@b@ @b@ private List<RedisNode> nodes;@b@ @b@ private RedisSharding sharding;@b@ @b@ private final Lock lock = new ReentrantLock();@b@ @b@ private RedisCommands commands;@b@ @b@ private Charset charset;@b@ @b@ private Serialization defSerialization;@b@ @b@ public RedisBean(List<RedisNode> nodes){@b@ //-----@b@ this.nodes=nodes;@b@ //@b@ this.charset=Charset.forName("UTF-8");@b@ }@b@@b@ @Override@b@ public RedisCommands getCommands() {@b@ if(commands==null){@b@ lock.lock();@b@ try{@b@ if(commands==null){@b@ RedisCommandsInterceptor interceptor=new RedisCommandsInterceptor(this);@b@ commands=ProxyFactory.getProxy(RedisCommands.class, interceptor);@b@ }@b@ }finally{@b@ lock.unlock();@b@ }@b@ }@b@ return commands;@b@ }@b@@b@ @b@ public List<RedisNode> getAllNodes() {@b@ return nodes;@b@ }@b@ @b@@b@ @Override@b@ public RedisNode shardingLocate(String key) {@b@ if(key==null){@b@ throw new java.lang.IllegalArgumentException("key be null.");@b@ }@b@ return shardingLocate(key.getBytes());@b@ }@b@@b@ @b@ @Override@b@ public RedisNode getFirstNode() {@b@ if(nodes==null || nodes.size()==0){@b@ throw new RedisException("not found any node.");@b@ }@b@ return nodes.get(0);@b@ }@b@@b@ @Override@b@ public <T> T accessFirstNode(RedisCallback<T> callback) {@b@ return getFirstNode().access(callback);@b@ }@b@@b@ public RedisNode shardingLocate(byte[] key) {@b@ if(key==null){@b@ throw new java.lang.IllegalArgumentException("key be null.");@b@ }@b@ RedisNode node=sharding.select(nodes, key);@b@ if(logger.isDebugEnabled()){@b@ logger.debug("Sharding node="+node+" for key="+new String(key));@b@ }@b@ return node;@b@ }@b@@b@ public void destroy() {@b@ if(nodes!=null && nodes.size()>0){@b@ for(int i=0;i<nodes.size();i++){@b@ RedisNode node=nodes.get(i);@b@ try {@b@ if(node!=null)node.destroy();@b@ if(logger.isInfoEnabled()){@b@ logger.info("Destroy the redis client 
success. registry: " + node );@b@ }@b@ } catch (Throwable t) {@b@ logger.error("Failured to destroy the redis client. registry: " + node + ", cause: " + t.getMessage(),t);@b@ }@b@ }@b@ }@b@ commands=null;@b@ }@b@@b@ @Override@b@ public String toString() {@b@ return nodes+"";@b@ }@b@@b@ @b@ public Charset getDefCharset() {@b@ return charset;@b@ }@b@@b@ @b@ @b@@b@ @b@@b@ public Serialization getDefSerialization() {@b@ return defSerialization;@b@ }@b@@b@ public void setDefSerialization(Serialization defSerialization) {@b@ this.defSerialization = defSerialization;@b@ }@b@@b@ @Override@b@ public Lock getThreadLock() {@b@ return lock;@b@ }@b@ @b@@b@ @Override@b@ public <K, V> RedisMap<K, V> loadMap(String id, Class<K> keyClazz,@b@ Class<V> valueClazz) {@b@ RedisMapBean<K,V> bean= new RedisMapBean<K,V>(id,this@b@ ,keyClazz,valueClazz);@b@ bean.afterPropertiesSet();@b@ return bean;@b@ }@b@@b@@b@ @Override@b@ public <V> RedisCache<V> loadCache(String namespace, Class<V> valueClazz) {@b@ RedisCacheBean<V> bean= new RedisCacheBean<V>(namespace,this@b@ ,valueClazz);@b@ bean.afterPropertiesSet();@b@ return bean;@b@ }@b@@b@ @b@@b@ @Override@b@ public <E> RedisQueue<E> loadQueue(String queueId, Class<E> elementClazz) {@b@ RedisQueueBean<E> bean=new RedisQueueBean<E>(queueId,this,elementClazz);@b@ bean.afterPropertiesSet();@b@ return bean;@b@ }@b@@b@ @b@@b@ @Override@b@ public RedisInteger loadIntegerCounter(String counterId) {@b@ RedisIntegerBean ri=new RedisIntegerBean(counterId,this);@b@ ri.afterPropertiesSet();@b@ return ri;@b@ }@b@ @b@ @b@@b@ @Override@b@ public RedisLong loadLongCounter(String counterId) {@b@ RedisLongBean ri=new RedisLongBean(counterId,this);@b@ ri.afterPropertiesSet();@b@ return ri;@b@ }@b@@b@ public RedisSharding getSharding() {@b@ return sharding;@b@ }@b@@b@ public void setSharding(RedisSharding sharding) {@b@ this.sharding = sharding;@b@ }@b@@b@ public Charset getCharset() {@b@ return charset;@b@ }@b@@b@ public void setCharset(Charset charset) {@b@ 
this.charset = charset;@b@ }@b@ @b@ @b@}
3.RedisFactoryBean类 - RedisBean创建工厂
public class RedisFactoryBean extends DataSerializeHandler implements FactoryBean<Redis>,DisposableBean{@b@ @b@ private static final int DEFAULT_REDIS_PORT = 6379;@b@ @b@ protected Log logger=LogFactory.getLog(this.getClass());@b@ @b@ private Properties properties;@b@ @b@ private RedisBean redis;@b@ @b@ private RedisConfigDTO configure;@b@ @b@ private Resource configureResource;@b@ @b@ private String configureURL;@b@ @b@ @b@ @Override@b@ public Redis getObject() throws Exception {@b@ if(redis==null){@b@ redis=get();@b@ }@b@ return redis;@b@ }@b@ @b@@b@ @Override@b@ public void destroy() throws Exception {@b@ if(redis!=null){@b@ redis.destroy();@b@ }@b@ }@b@@b@ @Override@b@ public Class<?> getObjectType() {@b@ return Redis.class;@b@ }@b@@b@ @Override@b@ public boolean isSingleton() {@b@ return true;@b@ }@b@@b@ protected synchronized RedisBean get(){@b@ try{@b@ if(configure==null){@b@ configure=new RedisConfigDTO();@b@ }@b@ if(configureResource!=null){@b@ InputStream input=configureResource.getInputStream();@b@ if(input!=null){@b@ if(properties==null){@b@ properties=new Properties();@b@ }@b@ try{@b@ properties.load(input);@b@ if(logger.isInfoEnabled()){@b@ logger.info("Load properties by resource:"+configureResource+",properties="+properties);@b@ }@b@ }finally{@b@ try{input.close();}catch(IOException ex){}@b@ }@b@ @b@ }@b@ }@b@ if(this.configureURL!=null && this.configureURL.length()>0){@b@ try {@b@ if(logger.isInfoEnabled()){@b@ logger.info("configureURL="+configureURL);@b@ }@b@ PURL purl=PURL.valueOf(configureURL);@b@ if(properties==null){@b@ properties=new Properties();@b@ }@b@ properties.putAll(purl.getParameters());@b@ properties.put("servers",purl.getAddress());@b@ } catch (Exception e) {@b@ throw new RedisException("Process configureURL:"+configureURL+" error,cause:"+e.getMessage(),e);@b@ }@b@ }@b@ if(properties!=null){@b@ try {@b@ if(logger.isInfoEnabled()){@b@ logger.info("properties="+properties);@b@ }@b@ BeanUtils.populate(configure, properties);@b@ } 
catch (Exception e) {@b@ throw new RedisException("Process properties to class<"+this.getClass().getName()+"> error:"+e.getMessage(),e);@b@ }@b@ }@b@ if(logger.isInfoEnabled()){@b@ logger.info("configure="+JSONObject.toJSONString(configure));@b@ }@b@ //@b@ List<RedisNode> nodes=getNodes(configure);@b@ //@b@ RedisBean redis= new RedisBean(nodes);@b@ redis.setSharding(ShardingFactory.getInstance(redis, configure.getSharding()));@b@ redis.setDefSerialization(loadSerialization(configure.getDefSerialization(),"java"));@b@ //--------------@b@ String charset=configure.getCharset();@b@ if(charset!=null && charset.length()!=0){@b@ redis.setCharset(Charset.forName(charset));@b@ }@b@ return redis;@b@ }catch(Throwable th){@b@ throw new FatalBeanException("Redis config error:"+th.getMessage()+",Properties:\n"+properties,th);@b@ }@b@ }@b@ @b@@b@ protected JedisPool newJedisPool(RedisConfigDTO configure,PURL serverURL){@b@ GenericObjectPoolConfig config=new GenericObjectPoolConfig(); @b@ @b@ config.setJmxEnabled(configure.isJmxEnabled());@b@ //@b@ if(configure.isTestOnBorrow())config.setTestOnBorrow(true);@b@ if(configure.isTestOnReturn())config.setTestOnReturn(true);@b@ if(configure.isTestWhileIdle())config.setTestWhileIdle(true);;@b@ if(configure.getMaxIdle()>0)config.setMaxIdle(configure.getMaxIdle());@b@ if(configure.getMinIdle()>0)config.setMinIdle(configure.getMinIdle());@b@ @b@ if(configure.getMaxActive()>0)config.setMaxTotal(configure.getMaxActive());@b@ if(configure.getMaxTotal()>0)config.setMaxTotal(configure.getMaxTotal());@b@ if(configure.getMaxWait()>0)config.setMaxWaitMillis(configure.getMaxWait());@b@ @b@ @b@ if(configure.getNumTestsPerEvictionRun()>0)config.setNumTestsPerEvictionRun(configure.getNumTestsPerEvictionRun());@b@ if(configure.getTimeBetweenEvictionRunsMillis()>0)config.setTimeBetweenEvictionRunsMillis(configure.getTimeBetweenEvictionRunsMillis());@b@ 
if(configure.getMinEvictableIdleTimeMillis()>0)config.setMinEvictableIdleTimeMillis(configure.getTimeBetweenEvictionRunsMillis());@b@ String password=serverURL.getPassword();@b@ if(password==null){@b@ password=configure.getPassword();@b@ }@b@ String user=serverURL.getUsername();@b@ if(user==null){@b@ user=configure.getUser();@b@ }@b@ if(user!=null && password==null ){@b@ password=readPassword(user,configure.getPasswordKey(),configure.getPasswordProvider());@b@ }@b@ return new JedisPool(config, serverURL.getHost(), serverURL.getPort(DEFAULT_REDIS_PORT)@b@ , configure.getTimeout(),password);@b@ }@b@ @b@ protected List<RedisNode> getNodes(RedisConfigDTO configure){@b@ String servers= configure.getServers();@b@ if(servers==null || (servers=servers.trim()).length()==0){@b@ throw new IllegalArgumentException("property<servers> requried.");@b@ }@b@ Set<String> addresses=Pafa5ConfigUtils.split(servers);@b@ List<RedisNode> nodes=new ArrayList<RedisNode>();@b@ for (String address : addresses) {@b@ PURL serverURL=PURL.valueOf(address);@b@ JedisPool jp= newJedisPool(configure,serverURL);@b@ nodes.add(new RedisNodeImpl(address,jp));@b@ }@b@ return nodes;@b@ }@b@ @b@ protected String readPassword(String user,String passwordKey,String passwordProvider){@b@ if(passwordKey==null)passwordKey=user;@b@ return PasswordProviderFactory.getProvider(passwordProvider).getPassword(@b@ new PasswordContext(passwordKey).setRequired(true));@b@ }@b@@b@ public Properties getProperties() {@b@ return properties;@b@ }@b@@b@ public void setProperties(Properties properties) {@b@ this.properties = properties;@b@ }@b@@b@@b@ public RedisConfigDTO getConfigure() {@b@ return configure;@b@ }@b@@b@@b@ public void setConfigure(RedisConfigDTO configure) {@b@ this.configure = configure;@b@ }@b@@b@@b@ public Resource getConfigureResource() {@b@ return configureResource;@b@ }@b@@b@@b@ public void setConfigureResource(Resource configureResource) {@b@ this.configureResource = configureResource;@b@ }@b@@b@@b@ public 
String getConfigureURL() {@b@ return configureURL;@b@ }@b@@b@@b@ public void setConfigureURL(String configureURL) {@b@ this.configureURL = configureURL;@b@ }@b@@b@ @b@ @b@ @b@}
4. RedisSharding集群分片类 - 支持取余(modular)、一致性哈希(consistentHash)以及CRC32后取模(crcmodular)等Redis分区算法
public class ShardingFactory {@b@ @b@ private static ModularRedisSharding modularRedisSharding=new ModularRedisSharding();@b@@b@ public static RedisSharding getInstance(Redis redis,String type){@b@ if("consistentHash".equalsIgnoreCase(type)){@b@ return new ConsistentHashRedisSharding();@b@ }else if("crcmodular".equalsIgnoreCase(type)){@b@ return new CRCModularRedisSharding();@b@ }else if(type==null|| type.length()==0 || "modular".equalsIgnoreCase(type)){@b@ return modularRedisSharding;@b@ }else{@b@ return newSharding(type);@b@ }@b@ }@b@ @b@@b@ @SuppressWarnings("unchecked")@b@ protected static RedisSharding newSharding(String className){@b@ Class<RedisSharding> clazz=null;@b@ try {@b@ clazz = (Class<RedisSharding>) ClassUtils.forName(className@b@ ,ClassUtils.getDefaultClassLoader());@b@ return BeanUtils.instantiate(clazz);@b@ } catch (Exception e) {@b@ throw new RedisException(e.getMessage(),e);@b@ }@b@ }@b@}@b@public abstract class AbstractSharding implements RedisSharding {@b@@b@ @b@ protected Log logger=LogFactory.getLog(this.getClass());@b@ @b@ @Override@b@ public RedisNode select(List<RedisNode> nodes,byte[] key) {@b@ if (nodes == null || nodes.size() == 0)@b@ return null;@b@ if (nodes.size() == 1)@b@ return nodes.get(0);@b@ RedisNode node= doSelect(nodes,key);@b@ if(node==null){@b@ throw new RedisException("Sharding error,not found node by nodes:"+nodes);@b@ }@b@ return node;@b@ }@b@@b@ @b@ protected abstract RedisNode doSelect(List<RedisNode> nodes, byte[] key);@b@@b@@b@}@b@public class ModularRedisSharding extends AbstractSharding {@b@@b@ @Override@b@ protected RedisNode doSelect(List<RedisNode> nodes, byte[] key) {@b@ long hashcode=HashUtils.KETAMA_HASH.hash(key);@b@ int modular=(int)(hashcode%nodes.size());@b@ RedisNode node=nodes.get(modular);@b@ return node;@b@ }@b@@b@ @b@}
5. 分布式锁RedisLock - 利用jedis.setnx接口“仅首次赋值返回1、其余情况返回0”的特性,再加上Redis以单线程方式串行执行命令,客户端的并发请求会被排队依次处理,从而保证同一时刻只有一个客户端能够通过setnx获得锁,避免并发竞争锁的情况
/**
 * A lock whose state is kept in Redis so that it can be shared between processes.
 */
public interface RedisLock {

    /**
     * Makes a single attempt to acquire the lock.
     *
     * @return {@code true} if the lock was acquired
     */
    boolean tryLock();

    /**
     * Tries to acquire the lock, retrying for a limited time.
     *
     * @param waitMillsTime maximum time to keep trying, in milliseconds
     * @return {@code true} if the lock was acquired
     */
    boolean tryLock(int waitMillsTime);

    /** Releases the lock. */
    void unlock();

    /** Resets the lock state. */
    void reinit();

    /**
     * Runs {@code command} while holding the lock.
     *
     * @param command the work to run
     * @return {@code true} if the lock was acquired and the command was run
     */
    boolean execute(Runnable command);
}
public class RedisLockImpl implements RedisLock{@b@ @b@ protected Log logger=LogFactory.getLog(this.getClass());@b@ @b@ private RedisCommands commands;@b@ @b@ private Redis redis;@b@ @b@ private String id;@b@ @b@ private byte[] bytesId;@b@ @b@ private int expireTime;@b@ @b@ private int threadSleepTime=500;@b@ @b@ public RedisLockImpl(String id,Redis redis,int expireTime){@b@ this.id=id;@b@ this.bytesId=id.getBytes(redis.getDefCharset());@b@ this.redis=redis;@b@ this.expireTime=expireTime;@b@ this.commands=redis.shardingLocate(bytesId).getCommands();@b@ }@b@ @b@ @b@ @Override@b@ public boolean tryLock() {@b@ String value = "{ip="+PNetUtils.getLocalHost()@b@ +",time="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")@b@ .format(new Date())+"}";@b@ long r=commands.setnx(bytesId, value.getBytes());@b@ if(r==1){@b@ int expireTime=this.getExpireTime();@b@ if(expireTime>0){@b@ commands.expire(bytesId, expireTime);@b@ }@b@ logger.info("Locked<"+id+"> by "+value+" success.");@b@ return true;@b@ }else{@b@ if(logger.isDebugEnabled()){@b@ logger.debug("Try to Lock<"+id+"> fail.");@b@ }@b@ return false;@b@ }@b@ }@b@@b@ @b@ @b@ @Override@b@ public boolean execute(Runnable command) {@b@ if(this.tryLock()){@b@ try{@b@ command.run();@b@ return true;@b@ }finally{@b@ this.unlock();@b@ }@b@ }@b@ return false;@b@ }@b@@b@@b@@b@ @Override@b@ public boolean tryLock(int waitTime) {@b@ if(waitTime<1){@b@ return tryLock();@b@ }@b@ boolean result=false;@b@ long compare = System.nanoTime();@b@ do{@b@ if(tryLock()){@b@ result=true;@b@ break;@b@ }@b@ if(threadSleepTime>0){@b@ try {@b@ Thread.sleep(threadSleepTime);@b@ } catch (InterruptedException e1) {@b@ e1.printStackTrace();@b@ }@b@ }@b@ }while((System.nanoTime() - compare)/1000000 < waitTime);@b@ return result;@b@ }@b@@b@ @Override@b@ public void unlock() {@b@ commands.del(bytesId);@b@ if(logger.isInfoEnabled()){@b@ logger.info("Lock<"+id+"> unlock.");@b@ }@b@ }@b@@b@@b@@b@ public int getExpireTime() {@b@ return expireTime;@b@ }@b@@b@@b@@b@ 
public void setExpireTime(int expireTime) {@b@ this.expireTime = expireTime;@b@ }@b@@b@@b@@b@ public int getThreadSleepTime() {@b@ return threadSleepTime;@b@ }@b@@b@@b@@b@ public void setThreadSleepTime(int threadSleepTime) {@b@ this.threadSleepTime = threadSleepTime;@b@ }@b@@b@@b@@b@ public Redis getRedis() {@b@ return redis;@b@ }@b@@b@@b@@b@ public void setRedis(Redis redis) {@b@ this.redis = redis;@b@ }@b@@b@@b@@b@ @Override@b@ public void reinit() {@b@ commands.expire(bytesId, 0);@b@ if(logger.isInfoEnabled()){@b@ logger.info("Lock<"+id+"> reinit.");@b@ }@b@ }@b@@b@ @b@}
public class RedisLockFactoryImpl implements RedisLockFactory,InitializingBean{@b@ @b@ private static final int DEF_LOCK_EXPIRE_TIME=60*60;//1灏忔椂@b@ @b@ private String namespace;@b@ @b@ private int lockExpireTime=DEF_LOCK_EXPIRE_TIME;@b@ @b@ private Redis redis; @b@ @b@@b@ @Override@b@ public RedisLock getLock(String objId) {@b@ return getLock(objId,this.lockExpireTime);@b@ }@b@ @b@ @Override@b@ public RedisLock getLock(String objId,int lockExpireTime) {@b@ if(objId==null){@b@ throw new IllegalArgumentException("objId is null");@b@ }@b@ RedisLockImpl lock=new RedisLockImpl(namespace+objId,redis,lockExpireTime);@b@ return lock;@b@ }@b@@b@ @Override@b@ public void afterPropertiesSet() {@b@ if(namespace==null || (namespace=namespace.trim()).length()==0){@b@ throw new FatalBeanException("namespace is null");@b@ }@b@ if(redis==null){@b@ throw new FatalBeanException("redis is null");@b@ }@b@ }@b@@b@ public String getNamespace() {@b@ return namespace;@b@ }@b@@b@ public void setNamespace(String namespace) {@b@ this.namespace = namespace;@b@ }@b@@b@ public int getLockExpireTime() {@b@ return lockExpireTime;@b@ }@b@@b@ public void setLockExpireTime(int lockExpireTime) {@b@ this.lockExpireTime = lockExpireTime;@b@ }@b@@b@ public Redis getRedis() {@b@ return redis;@b@ }@b@@b@ public void setRedis(Redis redis) {@b@ this.redis = redis;@b@ }@b@@b@ @b@}