首页

分享基于jedis客户端实现redis集群cluster组件的实现完整源码类图及主要代码示例

标签:RedisCluster,分布式锁,JedisCluster,RedisSharding,集群,算法,setnx,原理,DubboSerialization,MethodInterceptor,拦截器,RedisLockImpl,BinaryJedisCommands     发布时间:2017-06-19   

一、前言

基于Jedis客户端连接命令操作redis服务器集群,通过定义集群节点共享RedisSharding、redis节点RedisNode、redis的生成RedisBean及工厂RedisFactoryBean、RedisLock分布式锁及实现类RedisLockImpl及工厂类RedisLockFactoryImpl、业务对象序列化方式FastJsonSerialization类及其常见命令操作类RedisCommands及其RedisCommandsInterceptor拦截器等,具体如下图所示

分享基于jedis客户端实现redis集群cluster组件的实现完整源码类图及主要代码示例

二、主要代码

1. Redis类 - 定义实际操作redis的常用类型 - RedisCommands命令方式、RedisNode节点方式(jedis客户端连接)、依赖的序列化数据Serialization..

public interface Redis {@b@@b@	RedisCommands getCommands();@b@	@b@	RedisNode shardingLocate(String key);@b@@b@	RedisNode shardingLocate(byte[] key);@b@	@b@	RedisNode getFirstNode();@b@	@b@	<T> T accessFirstNode(RedisCallback<T> callback);@b@	@b@	Serialization getDefSerialization();@b@	@b@	Charset getDefCharset();@b@	@b@	<K,V> RedisMap<K,V> loadMap(String mapId,Class<K> keyClazz@b@			,Class<V> valueClazz); @b@			@b@	<V> RedisCache<V> loadCache(String namespace@b@			,Class<V> valueClazz);@b@	 @b@	@b@	<E> RedisQueue<E> loadQueue(String queueId@b@			,Class<E> elementClazz); @b@	@b@	RedisInteger loadIntegerCounter(String counterId);@b@	@b@	RedisLong loadLongCounter(String counterId);@b@	@b@	Lock getThreadLock();@b@	@b@	void destroy();@b@}

2. RedisBean类 - redis实现类

public class RedisBean  implements Redis,DisposableBean {@b@	@b@	@b@	protected Log logger=LogFactory.getLog(this.getClass());@b@	@b@	private List<RedisNode> nodes;@b@	@b@	private RedisSharding sharding;@b@	@b@	private final Lock lock = new ReentrantLock();@b@	@b@	private RedisCommands commands;@b@	@b@	private Charset charset;@b@	@b@	private Serialization defSerialization;@b@	@b@	public RedisBean(List<RedisNode> nodes){@b@		//-----@b@		this.nodes=nodes;@b@		//@b@		this.charset=Charset.forName("UTF-8");@b@	}@b@@b@	@Override@b@	public RedisCommands getCommands() {@b@		if(commands==null){@b@			lock.lock();@b@			try{@b@				if(commands==null){@b@					RedisCommandsInterceptor interceptor=new RedisCommandsInterceptor(this);@b@					commands=ProxyFactory.getProxy(RedisCommands.class, interceptor);@b@				}@b@			}finally{@b@				lock.unlock();@b@			}@b@		}@b@		return commands;@b@	}@b@@b@	@b@	public List<RedisNode> getAllNodes() {@b@		return nodes;@b@	}@b@	@b@@b@	@Override@b@	public RedisNode shardingLocate(String key) {@b@		if(key==null){@b@			throw new java.lang.IllegalArgumentException("key be null.");@b@		}@b@		return shardingLocate(key.getBytes());@b@	}@b@@b@	@b@	@Override@b@	public RedisNode getFirstNode() {@b@		if(nodes==null || nodes.size()==0){@b@			throw new RedisException("not found any node.");@b@		}@b@		return nodes.get(0);@b@	}@b@@b@	@Override@b@	public <T> T accessFirstNode(RedisCallback<T> callback) {@b@		return getFirstNode().access(callback);@b@	}@b@@b@	public RedisNode shardingLocate(byte[] key) {@b@		if(key==null){@b@			throw new java.lang.IllegalArgumentException("key be null.");@b@		}@b@		RedisNode node=sharding.select(nodes, key);@b@		if(logger.isDebugEnabled()){@b@			logger.debug("Sharding node="+node+" for key="+new String(key));@b@		}@b@		return node;@b@	}@b@@b@	public void destroy()  {@b@		if(nodes!=null && nodes.size()>0){@b@			for(int i=0;i<nodes.size();i++){@b@				RedisNode node=nodes.get(i);@b@				try {@b@					if(node!=null)node.destroy();@b@					if(logger.isInfoEnabled()){@b@						logger.info("Destroy the redis  client success. registry: " + node );@b@					}@b@	            } catch (Throwable t) {@b@	            	logger.error("Failured to destroy the redis  client. registry: " + node + ", cause: " + t.getMessage(),t);@b@	            }@b@			}@b@		}@b@		commands=null;@b@	}@b@@b@	@Override@b@	public String toString() {@b@		return nodes+"";@b@	}@b@@b@	@b@	public Charset getDefCharset() {@b@		return charset;@b@	}@b@@b@	@b@	@b@@b@	@b@@b@	public Serialization getDefSerialization() {@b@		return defSerialization;@b@	}@b@@b@	public void setDefSerialization(Serialization defSerialization) {@b@		this.defSerialization = defSerialization;@b@	}@b@@b@	@Override@b@	public Lock getThreadLock() {@b@		return lock;@b@	}@b@	@b@@b@	@Override@b@	public <K, V> RedisMap<K, V> loadMap(String id, Class<K> keyClazz,@b@			Class<V> valueClazz) {@b@		RedisMapBean<K,V> bean= new RedisMapBean<K,V>(id,this@b@				,keyClazz,valueClazz);@b@		bean.afterPropertiesSet();@b@		return bean;@b@	}@b@@b@@b@	@Override@b@	public <V> RedisCache<V> loadCache(String namespace, Class<V> valueClazz) {@b@		RedisCacheBean<V> bean= new RedisCacheBean<V>(namespace,this@b@				,valueClazz);@b@		bean.afterPropertiesSet();@b@		return bean;@b@	}@b@@b@	@b@@b@	@Override@b@	public <E> RedisQueue<E> loadQueue(String queueId, Class<E> elementClazz) {@b@		RedisQueueBean<E> bean=new RedisQueueBean<E>(queueId,this,elementClazz);@b@		bean.afterPropertiesSet();@b@		return bean;@b@	}@b@@b@	@b@@b@	@Override@b@	public RedisInteger loadIntegerCounter(String counterId) {@b@		RedisIntegerBean ri=new RedisIntegerBean(counterId,this);@b@		ri.afterPropertiesSet();@b@		return ri;@b@	}@b@	@b@	@b@@b@	@Override@b@	public RedisLong loadLongCounter(String counterId) {@b@		RedisLongBean ri=new RedisLongBean(counterId,this);@b@		ri.afterPropertiesSet();@b@		return ri;@b@	}@b@@b@	public RedisSharding getSharding() {@b@		return sharding;@b@	}@b@@b@	public void setSharding(RedisSharding sharding) {@b@		this.sharding = sharding;@b@	}@b@@b@	public Charset getCharset() {@b@		return charset;@b@	}@b@@b@	public void setCharset(Charset charset) {@b@		this.charset = charset;@b@	}@b@ @b@	@b@}

3.RedisFactoryBean类 - RedisBean创建工厂

public class RedisFactoryBean extends DataSerializeHandler implements FactoryBean<Redis>,DisposableBean{@b@	@b@	private static final int DEFAULT_REDIS_PORT = 6379;@b@	@b@	protected Log logger=LogFactory.getLog(this.getClass());@b@	@b@	private Properties properties;@b@	@b@	private RedisBean redis;@b@	@b@	private RedisConfigDTO configure;@b@	@b@	private Resource configureResource;@b@	@b@	private String configureURL;@b@	@b@	@b@	@Override@b@	public Redis getObject() throws Exception {@b@		if(redis==null){@b@			redis=get();@b@		}@b@		return redis;@b@	}@b@	@b@@b@	@Override@b@	public void destroy() throws Exception {@b@		if(redis!=null){@b@			redis.destroy();@b@		}@b@	}@b@@b@	@Override@b@	public Class<?> getObjectType() {@b@		return Redis.class;@b@	}@b@@b@	@Override@b@	public boolean isSingleton() {@b@		return true;@b@	}@b@@b@	protected synchronized RedisBean get(){@b@		try{@b@			if(configure==null){@b@				configure=new RedisConfigDTO();@b@			}@b@			if(configureResource!=null){@b@				InputStream input=configureResource.getInputStream();@b@				if(input!=null){@b@					if(properties==null){@b@						properties=new Properties();@b@					}@b@					try{@b@						properties.load(input);@b@						if(logger.isInfoEnabled()){@b@							logger.info("Load properties by resource:"+configureResource+",properties="+properties);@b@						}@b@					}finally{@b@						try{input.close();}catch(IOException ex){}@b@					}@b@					@b@				}@b@			}@b@			if(this.configureURL!=null && this.configureURL.length()>0){@b@				try {@b@					if(logger.isInfoEnabled()){@b@						logger.info("configureURL="+configureURL);@b@					}@b@					PURL purl=PURL.valueOf(configureURL);@b@					if(properties==null){@b@						properties=new Properties();@b@					}@b@					properties.putAll(purl.getParameters());@b@					properties.put("servers",purl.getAddress());@b@				} catch (Exception e) {@b@					throw new RedisException("Process configureURL:"+configureURL+" error,cause:"+e.getMessage(),e);@b@				}@b@			}@b@			if(properties!=null){@b@				try {@b@					if(logger.isInfoEnabled()){@b@						logger.info("properties="+properties);@b@					}@b@					BeanUtils.populate(configure, properties);@b@				} catch (Exception e) {@b@					throw new RedisException("Process properties to class<"+this.getClass().getName()+"> error:"+e.getMessage(),e);@b@				}@b@			}@b@			if(logger.isInfoEnabled()){@b@				logger.info("configure="+JSONObject.toJSONString(configure));@b@			}@b@			//@b@			List<RedisNode> nodes=getNodes(configure);@b@			//@b@			RedisBean redis= new RedisBean(nodes);@b@			redis.setSharding(ShardingFactory.getInstance(redis, configure.getSharding()));@b@			redis.setDefSerialization(loadSerialization(configure.getDefSerialization(),"java"));@b@			//--------------@b@			String charset=configure.getCharset();@b@			if(charset!=null && charset.length()!=0){@b@				redis.setCharset(Charset.forName(charset));@b@			}@b@			return redis;@b@		}catch(Throwable th){@b@			throw new FatalBeanException("Redis config error:"+th.getMessage()+",Properties:\n"+properties,th);@b@		}@b@	}@b@	@b@@b@	protected JedisPool newJedisPool(RedisConfigDTO configure,PURL serverURL){@b@		GenericObjectPoolConfig config=new GenericObjectPoolConfig(); @b@		@b@		config.setJmxEnabled(configure.isJmxEnabled());@b@		//@b@		if(configure.isTestOnBorrow())config.setTestOnBorrow(true);@b@		if(configure.isTestOnReturn())config.setTestOnReturn(true);@b@		if(configure.isTestWhileIdle())config.setTestWhileIdle(true);;@b@		if(configure.getMaxIdle()>0)config.setMaxIdle(configure.getMaxIdle());@b@		if(configure.getMinIdle()>0)config.setMinIdle(configure.getMinIdle());@b@		@b@		if(configure.getMaxActive()>0)config.setMaxTotal(configure.getMaxActive());@b@		if(configure.getMaxTotal()>0)config.setMaxTotal(configure.getMaxTotal());@b@		if(configure.getMaxWait()>0)config.setMaxWaitMillis(configure.getMaxWait());@b@		@b@		@b@		if(configure.getNumTestsPerEvictionRun()>0)config.setNumTestsPerEvictionRun(configure.getNumTestsPerEvictionRun());@b@		if(configure.getTimeBetweenEvictionRunsMillis()>0)config.setTimeBetweenEvictionRunsMillis(configure.getTimeBetweenEvictionRunsMillis());@b@		if(configure.getMinEvictableIdleTimeMillis()>0)config.setMinEvictableIdleTimeMillis(configure.getTimeBetweenEvictionRunsMillis());@b@		String password=serverURL.getPassword();@b@		if(password==null){@b@			password=configure.getPassword();@b@		}@b@		String user=serverURL.getUsername();@b@		if(user==null){@b@			user=configure.getUser();@b@		}@b@		if(user!=null && password==null ){@b@			password=readPassword(user,configure.getPasswordKey(),configure.getPasswordProvider());@b@		}@b@		return  new JedisPool(config, serverURL.getHost(), serverURL.getPort(DEFAULT_REDIS_PORT)@b@				, configure.getTimeout(),password);@b@	}@b@	@b@	protected List<RedisNode> getNodes(RedisConfigDTO configure){@b@		String servers= configure.getServers();@b@		if(servers==null || (servers=servers.trim()).length()==0){@b@			throw new IllegalArgumentException("property<servers> requried.");@b@		}@b@		Set<String> addresses=Pafa5ConfigUtils.split(servers);@b@		List<RedisNode> nodes=new ArrayList<RedisNode>();@b@		for (String address : addresses) {@b@			PURL serverURL=PURL.valueOf(address);@b@            JedisPool jp= newJedisPool(configure,serverURL);@b@            nodes.add(new RedisNodeImpl(address,jp));@b@	    }@b@		return nodes;@b@	}@b@	@b@	protected String readPassword(String user,String passwordKey,String passwordProvider){@b@		if(passwordKey==null)passwordKey=user;@b@		return PasswordProviderFactory.getProvider(passwordProvider).getPassword(@b@				new PasswordContext(passwordKey).setRequired(true));@b@	}@b@@b@	public Properties getProperties() {@b@		return properties;@b@	}@b@@b@	public void setProperties(Properties properties) {@b@		this.properties = properties;@b@	}@b@@b@@b@	public RedisConfigDTO getConfigure() {@b@		return configure;@b@	}@b@@b@@b@	public void setConfigure(RedisConfigDTO configure) {@b@		this.configure = configure;@b@	}@b@@b@@b@	public Resource getConfigureResource() {@b@		return configureResource;@b@	}@b@@b@@b@	public void setConfigureResource(Resource configureResource) {@b@		this.configureResource = configureResource;@b@	}@b@@b@@b@	public String getConfigureURL() {@b@		return configureURL;@b@	}@b@@b@@b@	public void setConfigureURL(String configureURL) {@b@		this.configureURL = configureURL;@b@	}@b@@b@	@b@	@b@	@b@}

4. RedisSharding集群资源共享类 - 取余、hash关系表及基于CRC32后取模的Redis分区等算法方式

public class ShardingFactory {@b@	@b@	private static ModularRedisSharding modularRedisSharding=new ModularRedisSharding();@b@@b@	public static RedisSharding getInstance(Redis redis,String type){@b@		if("consistentHash".equalsIgnoreCase(type)){@b@			return new ConsistentHashRedisSharding();@b@		}else if("crcmodular".equalsIgnoreCase(type)){@b@			return new CRCModularRedisSharding();@b@		}else if(type==null|| type.length()==0 || "modular".equalsIgnoreCase(type)){@b@			return modularRedisSharding;@b@		}else{@b@			return  newSharding(type);@b@		}@b@	}@b@	@b@@b@	@SuppressWarnings("unchecked")@b@	protected static RedisSharding newSharding(String className){@b@		Class<RedisSharding> clazz=null;@b@		try {@b@			clazz = (Class<RedisSharding>) ClassUtils.forName(className@b@					,ClassUtils.getDefaultClassLoader());@b@			return BeanUtils.instantiate(clazz);@b@		} catch (Exception e) {@b@			throw new RedisException(e.getMessage(),e);@b@		}@b@	}@b@}@b@public abstract class AbstractSharding implements RedisSharding {@b@@b@	@b@	protected Log logger=LogFactory.getLog(this.getClass());@b@	@b@    @Override@b@	public RedisNode select(List<RedisNode> nodes,byte[] key) {@b@    	 if (nodes == null || nodes.size() == 0)@b@             return null;@b@         if (nodes.size() == 1)@b@             return nodes.get(0);@b@         RedisNode node= doSelect(nodes,key);@b@         if(node==null){@b@        	 throw new RedisException("Sharding error,not found node by nodes:"+nodes);@b@         }@b@         return node;@b@	}@b@@b@	@b@    protected abstract RedisNode doSelect(List<RedisNode> nodes, byte[] key);@b@@b@@b@}@b@public class ModularRedisSharding extends AbstractSharding {@b@@b@	@Override@b@	protected RedisNode doSelect(List<RedisNode> nodes, byte[] key) {@b@		long hashcode=HashUtils.KETAMA_HASH.hash(key);@b@		int modular=(int)(hashcode%nodes.size());@b@		RedisNode node=nodes.get(modular);@b@		return node;@b@	}@b@@b@	@b@}

5.分布式锁RedisLock - 通过jedis.setnx的接口将首次赋值返回1,其余的情况返回0的方式,加上Redis内部机制单进程单线程模式,将客户端的并发访问通过队列存储转换为串行访问,避免首次setnx并发竞争锁的情况

public interface RedisLock  {@b@@b@	@b@	 boolean tryLock();@b@@b@	 boolean tryLock(int waitMillsTime);@b@	 @b@	 void unlock();@b@@b@	 void reinit();@b@	 @b@	 boolean execute(Runnable command);@b@}
public class RedisLockImpl implements RedisLock{@b@	@b@	protected Log logger=LogFactory.getLog(this.getClass());@b@	@b@	private RedisCommands commands;@b@	@b@	private Redis redis;@b@	@b@	private String id;@b@	@b@	private byte[] bytesId;@b@	@b@	private int expireTime;@b@	@b@	private int threadSleepTime=500;@b@	@b@	public RedisLockImpl(String id,Redis redis,int expireTime){@b@		this.id=id;@b@		this.bytesId=id.getBytes(redis.getDefCharset());@b@		this.redis=redis;@b@		this.expireTime=expireTime;@b@		this.commands=redis.shardingLocate(bytesId).getCommands();@b@	}@b@	@b@	@b@	@Override@b@	public boolean tryLock() {@b@		String value =  "{ip="+PNetUtils.getLocalHost()@b@				+",time="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")@b@				.format(new Date())+"}";@b@		long r=commands.setnx(bytesId, value.getBytes());@b@		if(r==1){@b@			int expireTime=this.getExpireTime();@b@			if(expireTime>0){@b@				commands.expire(bytesId, expireTime);@b@			}@b@			logger.info("Locked<"+id+"> by "+value+"  success.");@b@			return true;@b@		}else{@b@			if(logger.isDebugEnabled()){@b@				logger.debug("Try to Lock<"+id+">  fail.");@b@			}@b@			return false;@b@		}@b@	}@b@@b@	@b@	@b@	@Override@b@	public boolean execute(Runnable command) {@b@		if(this.tryLock()){@b@			try{@b@				command.run();@b@				return true;@b@			}finally{@b@				this.unlock();@b@			}@b@		}@b@		return false;@b@	}@b@@b@@b@@b@	@Override@b@	public boolean tryLock(int waitTime) {@b@		if(waitTime<1){@b@			return tryLock();@b@		}@b@		boolean result=false;@b@		long compare = System.nanoTime();@b@		do{@b@			if(tryLock()){@b@				result=true;@b@				break;@b@			}@b@			if(threadSleepTime>0){@b@				try {@b@					Thread.sleep(threadSleepTime);@b@				} catch (InterruptedException e1) {@b@					e1.printStackTrace();@b@				}@b@			}@b@		}while((System.nanoTime() - compare)/1000000 < waitTime);@b@		return result;@b@	}@b@@b@	@Override@b@	public void unlock() {@b@		commands.del(bytesId);@b@		if(logger.isInfoEnabled()){@b@			logger.info("Lock<"+id+"> unlock.");@b@		}@b@	}@b@@b@@b@@b@	public int getExpireTime() {@b@		return expireTime;@b@	}@b@@b@@b@@b@	public void setExpireTime(int expireTime) {@b@		this.expireTime = expireTime;@b@	}@b@@b@@b@@b@	public int getThreadSleepTime() {@b@		return threadSleepTime;@b@	}@b@@b@@b@@b@	public void setThreadSleepTime(int threadSleepTime) {@b@		this.threadSleepTime = threadSleepTime;@b@	}@b@@b@@b@@b@	public Redis getRedis() {@b@		return redis;@b@	}@b@@b@@b@@b@	public void setRedis(Redis redis) {@b@		this.redis = redis;@b@	}@b@@b@@b@@b@	@Override@b@	public void reinit() {@b@		commands.expire(bytesId, 0);@b@		if(logger.isInfoEnabled()){@b@			logger.info("Lock<"+id+"> reinit.");@b@		}@b@	}@b@@b@	@b@}
public class RedisLockFactoryImpl implements RedisLockFactory,InitializingBean{@b@	@b@	private static final int DEF_LOCK_EXPIRE_TIME=60*60;//1灏忔椂@b@	@b@	private String namespace;@b@	@b@	private int lockExpireTime=DEF_LOCK_EXPIRE_TIME;@b@	@b@	private Redis redis; @b@	@b@@b@	@Override@b@	public RedisLock getLock(String objId) {@b@		return getLock(objId,this.lockExpireTime);@b@	}@b@	@b@	@Override@b@	public RedisLock getLock(String objId,int lockExpireTime) {@b@		if(objId==null){@b@			throw  new IllegalArgumentException("objId is null");@b@		}@b@		RedisLockImpl lock=new RedisLockImpl(namespace+objId,redis,lockExpireTime);@b@		return lock;@b@	}@b@@b@	@Override@b@	public void afterPropertiesSet()  {@b@		if(namespace==null || (namespace=namespace.trim()).length()==0){@b@			throw new FatalBeanException("namespace is null");@b@		}@b@		if(redis==null){@b@			throw new FatalBeanException("redis is null");@b@		}@b@	}@b@@b@	public String getNamespace() {@b@		return namespace;@b@	}@b@@b@	public void setNamespace(String namespace) {@b@		this.namespace = namespace;@b@	}@b@@b@	public int getLockExpireTime() {@b@		return lockExpireTime;@b@	}@b@@b@	public void setLockExpireTime(int lockExpireTime) {@b@		this.lockExpireTime = lockExpireTime;@b@	}@b@@b@	public Redis getRedis() {@b@		return redis;@b@	}@b@@b@	public void setRedis(Redis redis) {@b@		this.redis = redis;@b@	}@b@@b@	@b@}