首页

关于redis主从写模式读写动态分离、故障心跳检测自动切换等实现系统HA高可用java代码示例

标签:redis高可用,Redis读写分离代码,redis主从demo     发布时间:2019-04-12   

一、示例说明

本章节通过java代码实现连接redis主从(配置教程参考其他文章)高可用示例代码,该代码实现redis主从模式下读写分离、故障检测切换、自动切换主master节点写入、自动切换slave备用节点读取、主从节点自动同步等功能。该代码主要依赖包:jedis-2.1.0.jar、commons-pool.jar、commons-logging-1.1.jar、cglib-2.1.3.jar、asm.jar,完整的项目点击"关于redis四种不同模式(单例/主从/哨兵/集群)java示例代码项目下载(含完整依赖包)"进行下载

二、代码示例

1. HAJedisInfo.java、HAJedis.java依赖类

package com.xwood.redis.slave;@b@@b@import redis.clients.jedis.Jedis;@b@@b@class HAJedis extends Jedis{@b@@b@	public HAJedis(String host, int port, int timeout) {@b@		super(host, port, timeout);@b@		// TODO Auto-generated constructor stub@b@	}@b@@b@	public HAJedis(String host, int port) {@b@		super(host, port);@b@		// TODO Auto-generated constructor stub@b@	}@b@@b@	public HAJedis(String host) {@b@		super(host);@b@		// TODO Auto-generated constructor stub@b@	}@b@@b@	@Override@b@	public boolean equals(Object obj) {@b@		// TODO Auto-generated method stub@b@		if (obj == null) return false;@b@		Jedis jedis = (Jedis)obj;@b@		return this.getClient().getHost().equals(jedis.getClient().getHost())@b@		&& this.getClient().getPort()==jedis.getClient().getPort();@b@	}@b@	@b@	public boolean equals(String ip,int port) {@b@		// TODO Auto-generated method stub@b@		return this.getClient().getHost().equals(ip)@b@		&& this.getClient().getPort()==port;@b@	}@b@@b@	@Override@b@	public String toString() {@b@		// TODO Auto-generated method stub@b@		return this.getClient().getHost()+":"+this.getClient().getPort();@b@	}@b@	@b@	public String getIp(){@b@		return this.getClient().getHost();@b@	}@b@	public int getPort(){@b@		return this.getClient().getPort();@b@	}@b@	public int getTimeout(){@b@		return this.getClient().getTimeout();@b@	}@b@	@b@}
package com.xwood.redis.slave;@b@@b@public class HAJedisInfo {@b@	@b@	private String ip;@b@	private int port;@b@	private int timeout;@b@	private boolean ismaster;@b@@b@	public HAJedisInfo(String ip, int port, int timeout) {@b@		super();@b@		this.ip = ip;@b@		this.port = port;@b@		this.timeout = timeout;@b@	}@b@@b@	public HAJedisInfo(String ip, int port) {@b@		super();@b@		this.ip = ip;@b@		this.port = port;@b@		this.timeout = 5000;@b@	}@b@@b@	public boolean isIsmaster() {@b@		return ismaster;@b@	}@b@@b@	public void setIsmaster(boolean ismaster) {@b@		this.ismaster = ismaster;@b@	}@b@@b@	public boolean testisConnection() {@b@		return true;@b@	}@b@@b@	public int getTimeout() {@b@		return timeout;@b@	}@b@@b@	public void setTimeout(int timeout) {@b@		this.timeout = timeout;@b@	}@b@@b@	public String getIp() {@b@		return ip;@b@	}@b@@b@	public void setIp(String ip) {@b@		this.ip = ip;@b@	}@b@@b@	public int getPort() {@b@		return port;@b@	}@b@@b@	public void setPort(int port) {@b@		this.port = port;@b@	}@b@@b@}

2. RedisSlaveManager.java运行测试类

package com.xwood.redis.slave;@b@@b@import java.lang.reflect.InvocationTargetException;@b@import java.lang.reflect.Method;@b@import java.util.ArrayList;@b@import java.util.List;@b@@b@import net.sf.cglib.proxy.Enhancer;@b@import net.sf.cglib.proxy.MethodInterceptor;@b@import net.sf.cglib.proxy.MethodProxy;@b@@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@import org.apache.commons.pool.BasePoolableObjectFactory;@b@import org.apache.commons.pool.impl.GenericObjectPool;@b@import org.apache.commons.pool.impl.GenericObjectPool.Config;@b@@b@import redis.clients.jedis.Jedis;@b@import redis.clients.jedis.exceptions.JedisConnectionException;@b@import redis.clients.jedis.exceptions.JedisException;@b@@b@/**@b@ * redis主从架构的客户端管理类,依赖包cglib-2.1.3.jar、commons-loggin-1.1.jar、asm.jar、jedis-2.0.jar@b@ * .0.jar、commons-pool.jar 1@b@ * 支持主从复制key/value,pop/push,pub/sub,读/写分离等功能的灾难失败自动选择和恢复 2@b@ * 可以选择读写分离功能,主写从读,默认不启用,都使用主服务进行所有操作@b@ */@b@public class RedisSlaveManager{  @b@	 @b@	@b@	RedisManagerProxy proxy;@b@	@b@	/**@b@	 * 连接ip格式,默认第一个为主服务器,后面的为从服务器,若只设置一个,则为主服务器@b@	 */@b@	public static final String DEFAULTIPFORMAT = "127.0.0.1:6379,127.0.0.1:6380"; @b@	@b@	static String ips = null;@b@	@b@	boolean enableReadWriteSeparation;@b@	@b@	ThreadLocal<RedisReadOrWriteEnum> cop = new ThreadLocal<RedisReadOrWriteEnum>();@b@	@b@	final static Log logger = LogFactory.getLog("RedisSlaveManager");@b@	@b@	/**@b@	 * 安全创建单例对象@b@	 */@b@	private static class StaticHolder {@b@	  static final RedisManagerProxy proxy = new RedisManagerProxy(ips);@b@	  static final RedisSlaveManager instance = (RedisSlaveManager) proxy.getProxy(new RedisSlaveManager(proxy));@b@	}@b@	  @b@	public static RedisSlaveManager getInstance(String ips){@b@	   RedisSlaveManager.ips = ips;@b@	   return StaticHolder.instance;   @b@	}   @b@  @b@	@b@    public RedisSlaveManager() {@b@		super();@b@	}@b@@b@@b@	private RedisSlaveManager(RedisManagerProxy proxy){@b@    	this.proxy = proxy;@b@    }@b@    @b@    private void initOP(RedisReadOrWriteEnum op){@b@    	if(enableReadWriteSeparation){@b@    		cop.set(op);@b@    	}else{@b@    		cop.set(RedisReadOrWriteEnum.READORWRITE);@b@    	}@b@    }@b@    @b@    public void enableReadWriteSeparation(boolean enable){@b@    	enableReadWriteSeparation = enable;@b@    }@b@    @b@  //************************以下针对单字符串的操作***********************//  @b@ @b@    public void put(String key, String value) {@b@    	initOP(RedisReadOrWriteEnum.WRITE);@b@    	proxy.getCurrJedis().set(key, value);@b@    }   @b@  @b@    public void put(String key, String value, int timeout) {@b@    	initOP(RedisReadOrWriteEnum.WRITE);@b@    	Jedis jedis = proxy.getCurrJedis();@b@    	jedis.set(key, value);@b@    	jedis.expire(key, timeout);@b@    }@b@    @b@    public void put(String key, byte[] object) {@b@    	initOP(RedisReadOrWriteEnum.WRITE);@b@    	proxy.getCurrJedis().set(key.getBytes(), object);@b@    }   @b@  @b@    public void put(String key, byte[] object, int timeout) {@b@    	initOP(RedisReadOrWriteEnum.WRITE);@b@    	Jedis jedis = proxy.getCurrJedis();@b@    	jedis.set(key.getBytes(), object);@b@    	jedis.expire(key.getBytes(), timeout);@b@    }@b@    @b@    public void flush(String... keys) {@b@    	initOP(RedisReadOrWriteEnum.WRITE);@b@    	proxy.getCurrJedis().del(keys); @b@    }@b@    @b@    public void flushByte(String... keys) {@b@    	initOP(RedisReadOrWriteEnum.WRITE);@b@    	byte[][] b = new byte[keys.length][];@b@    	for (int i=0;i<b.length;i++){@b@    		b[i] = keys[i].getBytes();@b@    	}@b@    	proxy.getCurrJedis().del(b); @b@    }@b@    @b@    public String get(String key) {@b@    	initOP(RedisReadOrWriteEnum.READ);@b@        return  proxy.getCurrJedis().get(key);@b@    }@b@    @b@    public byte[] getByte(String key) {@b@    	initOP(RedisReadOrWriteEnum.READ);@b@        return  proxy.getCurrJedis().get(key.getBytes());@b@    }@b@    @b@    public boolean exists(String key){@b@    	initOP(RedisReadOrWriteEnum.READ);@b@    	return proxy.getCurrJedis().exists(key);	@b@    } @b@    @b@    public boolean existsByte(String key){@b@    	initOP(RedisReadOrWriteEnum.READ);@b@    	return proxy.getCurrJedis().exists(key.getBytes());	@b@    } @b@    @b@    @b@	public static void main(String[] arg){@b@		@b@        final RedisSlaveManager rm = RedisSlaveManager.getInstance("192.168.1.202:6379,127.0.0.1:6379,127.0.0.1:6380");@b@        rm.enableReadWriteSeparation(true);@b@        rm.put("zhang", "test");@b@     	System.out.println(rm.get("zhang"));@b@     	@b@     	for (int i=0;i<5;i++){@b@     		System.out.println(rm.get("zhang"));@b@     		try {@b@				Thread.sleep(1000);@b@			} catch (InterruptedException e) {@b@				e.printStackTrace();@b@			}@b@     	}@b@     	@b@     }@b@	@b@}@b@@b@//操作类型@b@enum RedisReadOrWriteEnum {READORWRITE,READ,WRITE};@b@@b@class RedisManagerProxy implements MethodInterceptor{@b@@b@	final static Log logger = LogFactory.getLog("redisManager");@b@	HAJedisPool  jedisPool;@b@    private static final ThreadLocal<Jedis> currJedis = new ThreadLocal<Jedis>();@b@    List<ChangeMasterListener> obserers = new ArrayList<ChangeMasterListener>();@b@    @b@	public RedisManagerProxy(String ips) {@b@		Config config = new Config();@b@    	config.maxActive = 50;@b@    	config.maxIdle = 5;@b@    	config.maxWait = 5000;@b@    	config.testOnBorrow=true;@b@    	List<HAJedisInfo> list = new ArrayList<HAJedisInfo>();@b@    	try{@b@    	String[] ip = ips.split(",");@b@    	for (int i=0;i<ip.length;i++){@b@    		String[] ipinfo = ip[i].split(":");@b@    		if (ipinfo.length == 2){@b@    			HAJedisInfo jedisInfo = new HAJedisInfo(ipinfo[0],Integer.valueOf(ipinfo[1]));@b@        		if (i == 0) jedisInfo.setIsmaster(true);@b@    			list.add(jedisInfo);@b@    		}@b@    		@b@    	}}catch(Exception e){@b@    		logger.error("ip格式不对,示例:"+RedisSlaveManager.DEFAULTIPFORMAT);@b@    		System.exit(-1);@b@    	}@b@    	if (list.size()==0){@b@	    	logger.error("ip格式不对,示例:"+RedisSlaveManager.DEFAULTIPFORMAT);@b@	    	System.exit(-1);@b@	    }@b@    	jedisPool  = new HAJedisPool(config,list);@b@	}@b@	public void registerChangeMaster(ChangeMasterListener lis){@b@		this.obserers.add(lis);@b@	}@b@	public void gcJedis(Jedis jedis) {@b@		jedisPool.returnResource(jedis);@b@	}@b@	public Jedis createJedis(){@b@		return jedisPool.getResource();@b@	}@b@	public Jedis getCurrJedis(){@b@		return currJedis.get();@b@	}@b@	RedisSlaveManager target;@b@	public Object getProxy(Object target){@b@		 this.target = (RedisSlaveManager) target;@b@		 Enhancer enhancer = new Enhancer();  @b@		 enhancer.setSuperclass(target.getClass());  @b@		 enhancer.setCallback(this); @b@	     return enhancer.create(); 	@b@	}@b@	@b@	@Override@b@	public Object intercept(Object arg0, Method arg1, Object[] arg2,@b@			MethodProxy arg3) throws Throwable {@b@		// TODO Auto-generated method stub@b@		try{@b@		    if (target.cop.get()==RedisReadOrWriteEnum.READ){@b@		    	currJedis.set(jedisPool.getSlaveResource());@b@		    	logger.info("now get read jedis from:"+currJedis.get());@b@		    }else{@b@		    	currJedis.set(jedisPool.getResource());@b@		    	logger.info("now get jedis from:"+currJedis.get());@b@		    }@b@			Object oj = arg3.invoke(target, arg2);@b@			return oj;@b@		}catch(Throwable e){@b@			if (e instanceof InvocationTargetException){@b@				e = ((InvocationTargetException) e).getTargetException();@b@			}@b@			logger.error(e.getMessage(),e);@b@		}finally{@b@			if (target.cop.get()==RedisReadOrWriteEnum.READ){@b@				jedisPool.returnSlaveResource(currJedis.get());@b@		    }else{@b@				jedisPool.returnResource(currJedis.get());@b@		    }@b@@b@			currJedis.remove();@b@		}@b@		return null;@b@	}@b@	@b@	//支持双机主从复制的失败自动选择和自动master恢复的连接池@b@	private  class HAJedisPool{@b@		@b@		//读写分离支持@b@		private final GenericObjectPool slavePool;@b@		@b@		private final GenericObjectPool masterPool;@b@		@b@		 public HAJedisPool(Config config,String ip,int port,int timeout) {@b@			 List<HAJedisInfo> HAJedisInfo = new ArrayList<HAJedisInfo>();@b@			 HAJedisInfo ha = new HAJedisInfo(ip,port,timeout);@b@			 ha.setIsmaster(true);@b@			 HAJedisInfo.add(ha);@b@			 HAJedisFactory hafc = new HAJedisFactory(HAJedisInfo);@b@			 this.masterPool = new GenericObjectPool( hafc, config);@b@			 this.slavePool = new GenericObjectPool(new HAJedisSlaveFactory(HAJedisInfo,hafc) , config);@b@			@b@		}@b@		 public HAJedisPool(Config config,List<HAJedisInfo> HAJedisInfo) {@b@			 HAJedisFactory hafc = new HAJedisFactory(HAJedisInfo);@b@			 this.masterPool = new GenericObjectPool( hafc, config);@b@			 this.slavePool = new GenericObjectPool(new HAJedisSlaveFactory(HAJedisInfo,hafc) , config);@b@		}@b@		@b@		 @SuppressWarnings("unchecked")@b@		 public Jedis getResource() {@b@		        try {@b@		            return (Jedis) masterPool.borrowObject();@b@		        } catch (Exception e) {@b@		            throw new JedisConnectionException(@b@		                    "获取jedis连接失败", e);@b@		        }@b@		 }@b@		 @b@		 @SuppressWarnings("unchecked")@b@		 public Jedis getSlaveResource() {@b@		        try {@b@		            return (Jedis) slavePool.borrowObject();@b@		        } catch (Exception e) {@b@		            throw new JedisConnectionException(@b@		                    "获取jedis连接失败", e);@b@		        }@b@		 }@b@		        @b@		public void returnResource(final Object resource) {@b@		        try {@b@		        	masterPool.returnObject(resource);@b@		        } catch (Exception e) {@b@		            throw new JedisException(@b@		            		  "回收jedis连接失败", e);@b@		        }@b@		    }@b@		@b@		public void returnSlaveResource(final Object resource) {@b@	        try {@b@	        	slavePool.returnObject(resource);@b@	        } catch (Exception e) {@b@	            throw new JedisException(@b@	            		  "回收jedis连接失败", e);@b@	        }@b@	    }@b@		@b@		private void destoryMasterAll(){@b@			if (masterPool != null){@b@				masterPool.clear();@b@			}	@b@		}@b@		private void destorySlaveAll(){@b@			if (slavePool != null){@b@				slavePool.clear();@b@			}	@b@		}@b@@b@        private class HAJedisSlaveFactory extends BasePoolableObjectFactory implements ChangeMasterListener{@b@            private HAJedisFactory masterFactory;@b@            private List<HAJedisInfo> HAJedisList;@b@            private HAJedis currentSlave;@b@            Log logger = LogFactory.getLog("RedisSlaveManager");@b@			public HAJedisSlaveFactory(List<HAJedisInfo> HAJedisInfo,@b@					HAJedisFactory masterFactory) {@b@				this.HAJedisList = HAJedisInfo;@b@				this.masterFactory = masterFactory;@b@				obserers.add(this);@b@			}@b@			private void switchIp(){@b@				destorySlaveAll();@b@				findBaseSlave();@b@				logger.info("switchToSlave:"+currentSlave);@b@			}@b@			@Override@b@			public Object makeObject() throws Exception {@b@				// TODO Auto-generated method stub@b@				if (currentSlave == null || !checkIsAlive(currentSlave)){@b@					switchIp();@b@				}@b@				HAJedis jedis = new HAJedis(currentSlave.getIp(),currentSlave.getPort(),currentSlave.getTimeout());@b@				return jedis;@b@			}@b@			@Override@b@		    public void destroyObject(final Object obj) {@b@				masterFactory.destroyObject(obj);@b@		    }@b@			@Override@b@			public boolean validateObject(final Object obj) {@b@				try {@b@	        		HAJedis jedis = (HAJedis) obj;@b@	                return (currentSlave.equals(jedis)@b@	                        )@b@	                		&&checkIsAlive(jedis);@b@	            } catch (Exception ex) {@b@	                return false;@b@	            }@b@		    }@b@			@b@			private boolean checkIsAlive(HAJedis jedis){@b@				return  masterFactory.checkIsAlive(jedis);@b@			}@b@			private boolean checkIsAlive(String ip,int port){@b@				return  masterFactory.checkIsAlive(ip,port);@b@			}@b@			private synchronized void findBaseSlave(){@b@				try{@b@	        	boolean finded = false;@b@	        	for (HAJedisInfo ha : HAJedisList){@b@	        		//第一次选时@b@	        		if (currentSlave == null){@b@	        			if (!ha.isIsmaster() @b@	        					&& !masterFactory.currentMaster.equals(ha.getIp(), ha.getPort())@b@	        					&& checkIsAlive(ha.getIp(),ha.getPort())){@b@		        			currentSlave = new HAJedis(ha.getIp(),ha.getPort(),ha.getTimeout());@b@		        			finded = true;@b@		        			break;@b@		        		}@b@	        		}@b@	        		//后继选择时@b@	        		if (!masterFactory.currentMaster.equals(ha.getIp(), ha.getPort())@b@	        				&& checkIsAlive(ha.getIp(),ha.getPort())){@b@	        			this.destroyObject(currentSlave);@b@	        			currentSlave = new HAJedis(ha.getIp(),ha.getPort(),ha.getTimeout());@b@	        			finded = true;@b@	        			break;@b@	        		}@b@	        	}@b@	        	if (!finded) {@b@	        		//一次未找到可用的后,则不再尝试检测从服务器的的状态,直接使用主服务器@b@	        		logger.warn("未找到可用的从服务器,切换到主服务器服务!");@b@	        		currentSlave = masterFactory.currentMaster;@b@	        		logger.warn("切换到的主服务器:"+currentSlave);@b@	        	}@b@				}catch(Exception e){@b@					logger.error(e.getMessage(),e);@b@				}@b@	        }@b@			@Override@b@			public void changeMaster(String ip, int port) {@b@				// TODO Auto-generated method stub@b@				switchIp();@b@			}@b@        }@b@		private  class HAJedisFactory extends BasePoolableObjectFactory{@b@		        private List<HAJedisInfo> HAJedisList;@b@		        private HAJedisInfo currentActiveHAJedis;@b@		        private volatile boolean isBaseMaster;@b@		        private HAJedis baseMaster;@b@		        private HAJedis currentMaster;@b@		        Log logger = LogFactory.getLog("RedisSlaveManager");@b@		        //private final ScheduledExecutorService  schedule = Executors.newScheduledThreadPool(2);@b@		        public HAJedisFactory(List<HAJedisInfo> HAJedisInfo) { @b@		            this.HAJedisList = HAJedisInfo;@b@		            findBaseMaster();   @b@		            Thread heart = new Thread(new HeartBeat());@b@		            heart.setDaemon(true);@b@		            heart.start();@b@		        }@b@		        @b@		        public synchronized Object makeObject() throws Exception {@b@		        	checkConnection();@b@		        	HAJedis jedis = new HAJedis(currentActiveHAJedis.getIp(),currentActiveHAJedis.getPort(),currentActiveHAJedis.getTimeout());@b@		        	return jedis;@b@		        }@b@		        @b@		        private synchronized void switchIp(){@b@		        	destoryMasterAll();@b@		        	findOneSavleAsMaster();@b@		        }@b@		        @b@		        private synchronized void findOneSavleAsMaster(){@b@		        	for (HAJedisInfo ha : HAJedisList){@b@		        		String currIP = currentActiveHAJedis.getIp()+":"+currentActiveHAJedis.getPort();@b@		        		String haIP = ha.getIp()+":"+ha.getPort();@b@		        		if (!ha.isIsmaster() @b@		        				&&!currIP.equals(haIP)){@b@		        			HAJedis jedis = null;@b@		        			try{@b@		        			jedis = new HAJedis(ha.getIp(),ha.getPort(),ha.getTimeout());@b@		        			if (checkIsAlive(jedis))@b@		        				{@b@		        				currentActiveHAJedis = ha;@b@		        				isBaseMaster = false;@b@		        				changeMasterTo(ha.getIp(),ha.getPort());@b@		        				break;@b@		        				}@b@		        			}finally{@b@		        				this.destroyObject(jedis);@b@		        			}@b@		        			@b@		        		}@b@		        	}@b@		        }@b@		        @b@		        private synchronized void findBaseMaster(){@b@		        	boolean finded = false;@b@		        	for (HAJedisInfo ha : HAJedisList){@b@		        		if (ha.isIsmaster()){@b@		        			baseMaster = new HAJedis(ha.getIp(),ha.getPort(),ha.getTimeout());@b@		        			currentMaster = baseMaster;@b@		        			isBaseMaster = true;@b@		        			currentActiveHAJedis = ha;@b@		        			finded = true;@b@		        			break;@b@		        		}@b@		        	}@b@		        	if (!finded || !checkIsAlive(baseMaster)) {@b@		        		logger.warn("未设置主服务器或主服务器当前不可用,选择其它slave作为主服务器!");@b@		        		switchIp();@b@		        	}@b@		        }@b@		        @b@		        //始终保持当前活动的服务为主服务状态@b@		        private synchronized void changeMasterTo(String toip,int toport){@b@		        	destoryMasterAll();@b@		        	for (HAJedisInfo ha : HAJedisList){@b@		        		if (toip.equals(ha.getIp()) && (toport == ha.getPort())){@b@		        			if (!baseMaster.equals(currentMaster))destroyObject(currentMaster);@b@		        			this.currentMaster = new HAJedis(ha.getIp(),ha.getPort(),ha.getTimeout());@b@		        			if (baseMaster.getIp().equals(ha.getIp())@b@		        				&&baseMaster.getPort() == ha.getPort())@b@		        				{@b@		        				isBaseMaster = true;@b@		        				}else{@b@		        				isBaseMaster = false;	@b@		        				}@b@		        			currentActiveHAJedis = ha;@b@		        			@b@		        		}@b@		        	}@b@		        	HAJedis jedis = new HAJedis(toip,toport);@b@		        	jedis.slaveofNoOne();@b@		        	destroyObject(jedis);@b@		        	//其它所有节点都设置从节点@b@		        	Jedis other = null;@b@		        	for (HAJedisInfo ha : HAJedisList){@b@		        		try{@b@		        			if (!toip.equals(ha.getIp()) && (toport != ha.getPort())){@b@		        				other = new HAJedis(ha.getIp(),ha.getPort());@b@		        				other.slaveof(toip, toport);@b@		        			}@b@							@b@		        		}catch(Exception e){}finally{@b@		        			destroyObject(other);@b@		        		}@b@		        	}@b@		        	for (ChangeMasterListener cm : obserers){@b@		        		cm.changeMaster(currentActiveHAJedis.getIp(), currentActiveHAJedis.getPort());@b@		        	}@b@					logger.warn("switchToMaster-->"+currentActiveHAJedis.getIp()+":"+currentActiveHAJedis.getPort());@b@		        }@b@		        @b@		        private boolean checkIsAlive(Jedis jedis){@b@		        	try{@b@		        		return jedis.ping().equals("PONG");@b@		        	}catch(Exception e){@b@		        		logger.error(e.getMessage());@b@		        		try{@b@		        		jedis.disconnect();@b@		        		}catch(Exception e1){};@b@		        	}@b@		        	return false;@b@		        }@b@		        @b@		        private boolean checkIsAlive(String ip,int port){@b@		        	HAJedis jedis = null;@b@		        	try{@b@		        		jedis = new HAJedis(ip,port);@b@		        		return jedis.ping().equals("PONG");@b@		        	}catch(Exception e){@b@		        		logger.error(e.getMessage());@b@		        	}finally{@b@		        		this.destroyObject(jedis);@b@		        	}@b@		        	return false;@b@		        }@b@		        @b@		        public void destroyObject(final Object obj) {@b@		            if ((obj != null) && (obj instanceof Jedis)) {@b@		            	Jedis jedis = (Jedis) obj;@b@		            	 try {@b@		                   		try {@b@		                   			jedis.quit();@b@		                        } catch (Exception e) {@b@@b@		                        }@b@		                        jedis.disconnect();@b@		                    } catch (Exception e) {@b@@b@		                    }@b@@b@		                }@b@		        }@b@		        //必须是主节点且是活动状态@b@		        public boolean validateObject(final Object obj) {@b@		        	try {@b@		        		Jedis jedis = (Jedis) obj;@b@		                return (currentMaster.equals(jedis)@b@		                        )@b@		                		&&checkIsAlive(jedis);@b@		            } catch (Exception ex) {@b@		                return false;@b@		            }@b@		        }@b@		        @b@		        private void checkConnection(){@b@		        	try{@b@						if (isBaseMaster && !checkIsAlive(baseMaster)){@b@							logger.warn("当前是basemaster,心跳检测到其不可用,选择一个slave替换为master");@b@			        		switchIp();@b@			        	}@b@						//检测可用时,如果当前不是master,则恢复到master@b@						//规则,把当前从的设置成主的,恢复的主的设置成从的,保证数据一致性@b@						if (!isBaseMaster && checkIsAlive(baseMaster)){@b@							logger.warn("检测basemaster可用,恢复到basemaster");@b@							String ip = baseMaster.getIp();@b@							int port = baseMaster.getPort();@b@							//同步数据@b@							logger.warn("同步新数据到master");@b@							changeMasterTo(ip,port);@b@							isBaseMaster = true;@b@						}@b@						//如果当前savle不可用,则换另一个savle@b@						if (!isBaseMaster && !checkIsAlive(currentMaster)){@b@							logger.warn("当前currentmaster不可用,换另一个slave为currentmaster");@b@							switchIp();@b@						}@b@						}catch(Exception e){@b@							logger.error(e.getMessage(),e);@b@						}@b@						@b@		        }@b@@b@		        class HeartBeat implements Runnable{@b@		        	@b@		        	Log logger = LogFactory.getLog("RedisSlaveManager");@b@		        	//心跳检测@b@					@Override@b@					public void run() {@b@						// TODO Auto-generated method stub@b@					    while(true){@b@					    checkConnection();@b@						 try {@b@							Thread.sleep(1000);@b@						} catch (InterruptedException e) {@b@							// TODO Auto-generated catch block@b@							e.printStackTrace();@b@						}@b@					  }@b@					   @b@					}@b@		        }@b@		        @b@				}@b@	  }@b@}@b@@b@interface ChangeMasterListener{@b@	public abstract void changeMaster(String ip,int port);@b@}

控制台打印结果

四月 12, 2019 12:48:11 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get jedis from:192.168.1.202:6379@b@@b@@b@四月 12, 2019 12:48:12 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get jedis from:192.168.1.202:6379@b@四月 12, 2019 12:48:12 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get jedis from:192.168.1.202:6379@b@test@b@四月 12, 2019 12:48:12 上午 com.xwood.redis.slave.RedisManagerProxy$HAJedisPool$HAJedisSlaveFactory switchIp@b@信息: switchToSlave:127.0.0.1:6379@b@四月 12, 2019 12:48:12 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:13 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:14 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:15 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:16 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:17 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:18 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:19 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:20 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:21 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:22 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get jedis from:192.168.1.202:6379@b@四月 12, 2019 12:48:27 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@...
<<热门下载>>