一、示例说明
本章节通过 Java 代码演示如何连接 Redis 主从(配置教程参考其他文章)并实现高可用。该示例实现了 Redis 主从模式下的读写分离、故障检测与切换、自动切换主(master)节点写入、自动切换备用从(slave)节点读取、主从节点自动同步等功能。代码主要依赖以下 jar 包:jedis-2.1.0.jar、commons-pool.jar、commons-logging-1.1.jar、cglib-2.1.3.jar、asm.jar。完整项目可点击"关于redis四种不同模式(单例/主从/哨兵/集群)java示例代码项目下载(含完整依赖包)"进行下载。
二、代码示例
1. HAJedisInfo.java、HAJedis.java依赖类
package com.xwood.redis.slave;@b@@b@import redis.clients.jedis.Jedis;@b@@b@class HAJedis extends Jedis{@b@@b@ public HAJedis(String host, int port, int timeout) {@b@ super(host, port, timeout);@b@ // TODO Auto-generated constructor stub@b@ }@b@@b@ public HAJedis(String host, int port) {@b@ super(host, port);@b@ // TODO Auto-generated constructor stub@b@ }@b@@b@ public HAJedis(String host) {@b@ super(host);@b@ // TODO Auto-generated constructor stub@b@ }@b@@b@ @Override@b@ public boolean equals(Object obj) {@b@ // TODO Auto-generated method stub@b@ if (obj == null) return false;@b@ Jedis jedis = (Jedis)obj;@b@ return this.getClient().getHost().equals(jedis.getClient().getHost())@b@ && this.getClient().getPort()==jedis.getClient().getPort();@b@ }@b@ @b@ public boolean equals(String ip,int port) {@b@ // TODO Auto-generated method stub@b@ return this.getClient().getHost().equals(ip)@b@ && this.getClient().getPort()==port;@b@ }@b@@b@ @Override@b@ public String toString() {@b@ // TODO Auto-generated method stub@b@ return this.getClient().getHost()+":"+this.getClient().getPort();@b@ }@b@ @b@ public String getIp(){@b@ return this.getClient().getHost();@b@ }@b@ public int getPort(){@b@ return this.getClient().getPort();@b@ }@b@ public int getTimeout(){@b@ return this.getClient().getTimeout();@b@ }@b@ @b@}
package com.xwood.redis.slave;@b@@b@public class HAJedisInfo {@b@ @b@ private String ip;@b@ private int port;@b@ private int timeout;@b@ private boolean ismaster;@b@@b@ public HAJedisInfo(String ip, int port, int timeout) {@b@ super();@b@ this.ip = ip;@b@ this.port = port;@b@ this.timeout = timeout;@b@ }@b@@b@ public HAJedisInfo(String ip, int port) {@b@ super();@b@ this.ip = ip;@b@ this.port = port;@b@ this.timeout = 5000;@b@ }@b@@b@ public boolean isIsmaster() {@b@ return ismaster;@b@ }@b@@b@ public void setIsmaster(boolean ismaster) {@b@ this.ismaster = ismaster;@b@ }@b@@b@ public boolean testisConnection() {@b@ return true;@b@ }@b@@b@ public int getTimeout() {@b@ return timeout;@b@ }@b@@b@ public void setTimeout(int timeout) {@b@ this.timeout = timeout;@b@ }@b@@b@ public String getIp() {@b@ return ip;@b@ }@b@@b@ public void setIp(String ip) {@b@ this.ip = ip;@b@ }@b@@b@ public int getPort() {@b@ return port;@b@ }@b@@b@ public void setPort(int port) {@b@ this.port = port;@b@ }@b@@b@}
2. RedisSlaveManager.java运行测试类
package com.xwood.redis.slave;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;

/**
 * Client-side manager for a Redis master/slave deployment.
 *
 * <p>Requires cglib, commons-logging, asm, jedis and commons-pool on the class path.
 *
 * <ul>
 *   <li>Detects a failed master, promotes a slave, and switches back once the
 *       originally configured master is reachable again.</li>
 *   <li>Optionally separates reads and writes (writes go to the master, reads to a
 *       slave). Disabled by default, in which case every operation uses the master.</li>
 * </ul>
 *
 * <p>Instances must be obtained through {@link #getInstance(String)}. The returned
 * object is a cglib proxy, and every public call on it is routed through
 * {@link RedisManagerProxy#intercept}, which releases the connection used by the
 * call and logs (rather than rethrows) any failure. A failed call therefore returns
 * {@code null}, or {@code false} for {@code boolean} methods.
 */
public class RedisSlaveManager {

    /**
     * Example of the address list format: comma-separated {@code host:port} entries.
     * The first valid entry is the master, the rest are slaves. A single entry means
     * master only.
     */
    public static final String DEFAULTIPFORMAT = "127.0.0.1:6379,127.0.0.1:6380";

    /** Address list captured by the first {@link #getInstance(String)} call. */
    static String ips = null;

    final static Log logger = LogFactory.getLog("RedisSlaveManager");

    /** Connection provider; only set on the real target, not on the cglib proxy instance. */
    RedisManagerProxy proxy;

    boolean enableReadWriteSeparation;

    /** Operation type of the call currently running on this thread (set by {@link #initOP}). */
    ThreadLocal<RedisReadOrWriteEnum> cop = new ThreadLocal<RedisReadOrWriteEnum>();

    /**
     * Lazy holder: the proxy and the singleton are created the first time
     * {@link #getInstance(String)} touches this class.
     */
    private static class StaticHolder {
        static final RedisManagerProxy proxy = new RedisManagerProxy(ips);
        static final RedisSlaveManager instance =
                (RedisSlaveManager) proxy.getProxy(new RedisSlaveManager(proxy));
    }

    /**
     * Returns the singleton manager.
     *
     * @param ips comma-separated {@code host:port} list, see {@link #DEFAULTIPFORMAT}.
     *            Only the value passed on the FIRST call is used. A different value
     *            on later calls is ignored and a warning is logged.
     * @return the proxied singleton
     */
    public static RedisSlaveManager getInstance(String ips) {
        if (RedisSlaveManager.ips == null) {
            RedisSlaveManager.ips = ips;
        } else if (!RedisSlaveManager.ips.equals(ips)) {
            logger.warn("RedisSlaveManager already initialised with '" + RedisSlaveManager.ips
                    + "'; ignoring '" + ips + "'");
        }
        return StaticHolder.instance;
    }

    /** Required by cglib, which instantiates the generated subclass through a no-arg constructor. */
    public RedisSlaveManager() {
        super();
    }

    private RedisSlaveManager(RedisManagerProxy proxy) {
        this.proxy = proxy;
    }

    /**
     * Records the operation type for the current call. When read/write separation is
     * off, every call is recorded as {@code READORWRITE} and is served by the master.
     */
    private void initOP(RedisReadOrWriteEnum op) {
        cop.set(enableReadWriteSeparation ? op : RedisReadOrWriteEnum.READORWRITE);
    }

    /**
     * Turns read/write separation on or off.
     *
     * @param enable {@code true} to send reads to a slave and writes to the master
     */
    public void enableReadWriteSeparation(boolean enable) {
        enableReadWriteSeparation = enable;
    }

    // ---------------------- single-value string operations ----------------------

    /** Stores {@code value} under {@code key}. */
    public void put(String key, String value) {
        initOP(RedisReadOrWriteEnum.WRITE);
        proxy.getCurrJedis().set(key, value);
    }

    /**
     * Stores {@code value} under {@code key} and sets its expiry.
     *
     * @param timeout expiry passed to {@code EXPIRE} (seconds in the Redis protocol)
     */
    public void put(String key, String value, int timeout) {
        initOP(RedisReadOrWriteEnum.WRITE);
        Jedis jedis = proxy.getCurrJedis();
        jedis.set(key, value);
        jedis.expire(key, timeout);
    }

    /**
     * Stores a binary value. The key is encoded with the platform default charset
     * ({@code String.getBytes()}).
     */
    public void put(String key, byte[] object) {
        initOP(RedisReadOrWriteEnum.WRITE);
        proxy.getCurrJedis().set(key.getBytes(), object);
    }

    /**
     * Stores a binary value and sets its expiry.
     *
     * @param timeout expiry passed to {@code EXPIRE} (seconds in the Redis protocol)
     */
    public void put(String key, byte[] object, int timeout) {
        initOP(RedisReadOrWriteEnum.WRITE);
        Jedis jedis = proxy.getCurrJedis();
        byte[] rawKey = key.getBytes();
        jedis.set(rawKey, object);
        jedis.expire(rawKey, timeout);
    }

    /** Deletes the given keys. */
    public void flush(String... keys) {
        initOP(RedisReadOrWriteEnum.WRITE);
        proxy.getCurrJedis().del(keys);
    }

    /** Deletes the given keys using their byte encoding (platform default charset). */
    public void flushByte(String... keys) {
        initOP(RedisReadOrWriteEnum.WRITE);
        byte[][] rawKeys = new byte[keys.length][];
        for (int i = 0; i < rawKeys.length; i++) {
            rawKeys[i] = keys[i].getBytes();
        }
        proxy.getCurrJedis().del(rawKeys);
    }

    /** @return the string stored under {@code key}, as returned by Jedis */
    public String get(String key) {
        initOP(RedisReadOrWriteEnum.READ);
        return proxy.getCurrJedis().get(key);
    }

    /** @return the bytes stored under {@code key}, as returned by Jedis */
    public byte[] getByte(String key) {
        initOP(RedisReadOrWriteEnum.READ);
        return proxy.getCurrJedis().get(key.getBytes());
    }

    /** @return whether {@code key} exists */
    public boolean exists(String key) {
        initOP(RedisReadOrWriteEnum.READ);
        return proxy.getCurrJedis().exists(key);
    }

    /** @return whether the byte-encoded {@code key} exists */
    public boolean existsByte(String key) {
        initOP(RedisReadOrWriteEnum.READ);
        return proxy.getCurrJedis().exists(key.getBytes());
    }

    /** Demo: writes one key to the master and then reads it back several times. */
    public static void main(String[] arg) {
        final RedisSlaveManager rm =
                RedisSlaveManager.getInstance("192.168.1.202:6379,127.0.0.1:6379,127.0.0.1:6380");
        rm.enableReadWriteSeparation(true);
        rm.put("zhang", "test");
        System.out.println(rm.get("zhang"));

        for (int i = 0; i < 5; i++) {
            System.out.println(rm.get("zhang"));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread and stop the demo loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

/** Operation type of a manager call; decides which pool serves it. */
enum RedisReadOrWriteEnum { READORWRITE, READ, WRITE }

/**
 * cglib interceptor that wraps every {@link RedisSlaveManager} call and owns the
 * master and slave connection pools.
 *
 * <p>A connection is borrowed lazily by {@link #getCurrJedis()}, that is, after the
 * target method has declared its operation type. It is returned by
 * {@link #intercept} to the same pool it was borrowed from.
 */
class RedisManagerProxy implements MethodInterceptor {

    final static Log logger = LogFactory.getLog("redisManager");

    /** Connection borrowed for the call currently running on this thread, if any. */
    private static final ThreadLocal<Jedis> currJedis = new ThreadLocal<Jedis>();

    /** Whether {@link #currJedis} was borrowed from the slave pool. */
    private static final ThreadLocal<Boolean> currFromSlave = new ThreadLocal<Boolean>();

    HAJedisPool jedisPool;
    List<ChangeMasterListener> obserers = new ArrayList<ChangeMasterListener>();
    RedisSlaveManager target;

    /**
     * Parses the address list and builds the pools.
     *
     * <p>Entries that are not of the form {@code host:port} are skipped. If the list
     * cannot be parsed or contains no valid entry, the error is logged and the JVM
     * exits with status -1.
     *
     * @param ips comma-separated {@code host:port} list; the first valid entry is the master
     */
    public RedisManagerProxy(String ips) {
        Config config = new Config();
        config.maxActive = 50;
        config.maxIdle = 5;
        config.maxWait = 5000;
        config.testOnBorrow = true;

        List<HAJedisInfo> list = new ArrayList<HAJedisInfo>();
        try {
            for (String entry : ips.split(",")) {
                String[] ipinfo = entry.trim().split(":");
                if (ipinfo.length != 2) {
                    continue;
                }
                HAJedisInfo jedisInfo =
                        new HAJedisInfo(ipinfo[0].trim(), Integer.parseInt(ipinfo[1].trim()));
                // Flag the first VALID entry as master; tying this to index 0 would leave
                // no master at all when the first entry is malformed.
                if (list.isEmpty()) {
                    jedisInfo.setIsmaster(true);
                }
                list.add(jedisInfo);
            }
        } catch (Exception e) {
            logger.error("ip格式不对,示例:" + RedisSlaveManager.DEFAULTIPFORMAT, e);
            System.exit(-1);
        }
        if (list.size() == 0) {
            logger.error("ip格式不对,示例:" + RedisSlaveManager.DEFAULTIPFORMAT);
            System.exit(-1);
        }
        jedisPool = new HAJedisPool(config, list);
    }

    /** Registers a listener that is notified whenever the active master changes. */
    public void registerChangeMaster(ChangeMasterListener lis) {
        this.obserers.add(lis);
    }

    /** Returns a connection obtained through {@link #createJedis()} to the master pool. */
    public void gcJedis(Jedis jedis) {
        jedisPool.returnResource(jedis);
    }

    /** Borrows a connection from the master pool; release it with {@link #gcJedis(Jedis)}. */
    public Jedis createJedis() {
        return jedisPool.getResource();
    }

    /**
     * Returns the connection bound to the current call, borrowing one on first use.
     *
     * <p>The pool is chosen from the operation type the target method has just
     * recorded: {@code READ} uses the slave pool, anything else the master pool.
     * Borrowing here, rather than before the target method runs, is what makes the
     * choice reflect the CURRENT call instead of the previous one on this thread.
     *
     * @return the connection for the current call
     * @throws JedisConnectionException if no connection can be borrowed
     */
    public Jedis getCurrJedis() {
        Jedis jedis = currJedis.get();
        if (jedis != null) {
            return jedis;
        }
        boolean read = target != null && target.cop.get() == RedisReadOrWriteEnum.READ;
        if (read) {
            jedis = jedisPool.getSlaveResource();
            logger.info("now get read jedis from:" + jedis);
        } else {
            jedis = jedisPool.getResource();
            logger.info("now get jedis from:" + jedis);
        }
        currJedis.set(jedis);
        currFromSlave.set(Boolean.valueOf(read));
        return jedis;
    }

    /**
     * Creates the cglib proxy for {@code target}.
     *
     * @param target the real {@link RedisSlaveManager} that intercepted calls are forwarded to
     * @return an instance of a generated subclass of {@code target}'s class
     */
    public Object getProxy(Object target) {
        this.target = (RedisSlaveManager) target;
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(target.getClass());
        enhancer.setCallback(this);
        return enhancer.create();
    }

    /**
     * Forwards the call to the real target, then releases whatever connection the call
     * borrowed. Failures are logged and swallowed, and the caller receives {@code null}.
     */
    @Override
    public Object intercept(Object proxyObject, Method method, Object[] args,
            MethodProxy methodProxy) throws Throwable {
        try {
            return methodProxy.invoke(target, args);
        } catch (Throwable e) {
            if (e instanceof InvocationTargetException) {
                e = ((InvocationTargetException) e).getTargetException();
            }
            logger.error(e.getMessage(), e);
        } finally {
            releaseCurrJedis();
        }
        return null;
    }

    /** Returns the current call's connection to the pool it came from and clears per-call state. */
    private void releaseCurrJedis() {
        Jedis jedis = currJedis.get();
        try {
            if (jedis != null) {
                if (Boolean.TRUE.equals(currFromSlave.get())) {
                    jedisPool.returnSlaveResource(jedis);
                } else {
                    jedisPool.returnResource(jedis);
                }
            }
        } finally {
            currJedis.remove();
            currFromSlave.remove();
            target.cop.remove();
        }
    }

    /**
     * Pair of pools (master and slave) with automatic failover and automatic recovery
     * of the originally configured master.
     */
    private class HAJedisPool {

        /** Serves reads when read/write separation is enabled. */
        private final GenericObjectPool slavePool;

        private final GenericObjectPool masterPool;

        /** Single-node variant: the given node is the master and the only candidate. */
        public HAJedisPool(Config config, String ip, int port, int timeout) {
            List<HAJedisInfo> nodes = new ArrayList<HAJedisInfo>();
            HAJedisInfo ha = new HAJedisInfo(ip, port, timeout);
            ha.setIsmaster(true);
            nodes.add(ha);
            HAJedisFactory hafc = new HAJedisFactory(nodes);
            this.masterPool = new GenericObjectPool(hafc, config);
            this.slavePool = new GenericObjectPool(new HAJedisSlaveFactory(nodes, hafc), config);
        }

        /**
         * @param config      pool settings shared by both pools
         * @param HAJedisInfo all configured nodes; one of them is flagged as master
         */
        public HAJedisPool(Config config, List<HAJedisInfo> HAJedisInfo) {
            HAJedisFactory hafc = new HAJedisFactory(HAJedisInfo);
            this.masterPool = new GenericObjectPool(hafc, config);
            this.slavePool = new GenericObjectPool(new HAJedisSlaveFactory(HAJedisInfo, hafc), config);
        }

        /**
         * @return a connection from the master pool
         * @throws JedisConnectionException if borrowing fails
         */
        public Jedis getResource() {
            try {
                return (Jedis) masterPool.borrowObject();
            } catch (Exception e) {
                throw new JedisConnectionException("获取jedis连接失败", e);
            }
        }

        /**
         * @return a connection from the slave pool
         * @throws JedisConnectionException if borrowing fails
         */
        public Jedis getSlaveResource() {
            try {
                return (Jedis) slavePool.borrowObject();
            } catch (Exception e) {
                throw new JedisConnectionException("获取jedis连接失败", e);
            }
        }

        /**
         * Returns a connection to the master pool.
         *
         * @throws JedisException if the pool rejects the object
         */
        public void returnResource(final Object resource) {
            try {
                masterPool.returnObject(resource);
            } catch (Exception e) {
                throw new JedisException("回收jedis连接失败", e);
            }
        }

        /**
         * Returns a connection to the slave pool.
         *
         * @throws JedisException if the pool rejects the object
         */
        public void returnSlaveResource(final Object resource) {
            try {
                slavePool.returnObject(resource);
            } catch (Exception e) {
                throw new JedisException("回收jedis连接失败", e);
            }
        }

        /** Drops all idle master connections (the field is still null during construction). */
        private void destoryMasterAll() {
            if (masterPool != null) {
                masterPool.clear();
            }
        }

        /** Drops all idle slave connections (the field is still null during construction). */
        private void destorySlaveAll() {
            if (slavePool != null) {
                slavePool.clear();
            }
        }

        /**
         * Factory for the slave pool. Picks a reachable node other than the current
         * master and re-selects whenever the master changes or the slave stops answering.
         */
        private class HAJedisSlaveFactory extends BasePoolableObjectFactory
                implements ChangeMasterListener {

            private final Log logger = LogFactory.getLog("RedisSlaveManager");
            private final HAJedisFactory masterFactory;
            private final List<HAJedisInfo> HAJedisList;

            /** Node currently used for reads; may be the master when no slave is reachable. */
            private HAJedis currentSlave;

            public HAJedisSlaveFactory(List<HAJedisInfo> HAJedisInfo, HAJedisFactory masterFactory) {
                this.HAJedisList = HAJedisInfo;
                this.masterFactory = masterFactory;
                obserers.add(this);
            }

            /** Discards pooled slave connections and selects a new read node. */
            private void switchIp() {
                destorySlaveAll();
                findBaseSlave();
                logger.info("switchToSlave:" + currentSlave);
            }

            /** Creates a connection to the current read node, re-selecting it first if needed. */
            @Override
            public Object makeObject() throws Exception {
                if (currentSlave == null || !checkIsAlive(currentSlave)) {
                    switchIp();
                }
                return new HAJedis(currentSlave.getIp(), currentSlave.getPort(),
                        currentSlave.getTimeout());
            }

            @Override
            public void destroyObject(final Object obj) {
                masterFactory.destroyObject(obj);
            }

            /** A pooled connection is valid only if it targets the current read node and answers PING. */
            @Override
            public boolean validateObject(final Object obj) {
                try {
                    HAJedis jedis = (HAJedis) obj;
                    return currentSlave.equals(jedis) && checkIsAlive(jedis);
                } catch (Exception ex) {
                    return false;
                }
            }

            private boolean checkIsAlive(HAJedis jedis) {
                return masterFactory.checkIsAlive(jedis);
            }

            private boolean checkIsAlive(String ip, int port) {
                return masterFactory.checkIsAlive(ip, port);
            }

            /**
             * Selects the first configured node that is not the current master and answers
             * PING. Falls back to the current master when none qualifies.
             */
            private synchronized void findBaseSlave() {
                try {
                    HAJedis master = masterFactory.currentMaster;
                    for (HAJedisInfo ha : HAJedisList) {
                        if (master.equals(ha.getIp(), ha.getPort())
                                || !checkIsAlive(ha.getIp(), ha.getPort())) {
                            continue;
                        }
                        // Close the previous descriptor connection, unless it is the master's
                        // own descriptor left over from an earlier fallback.
                        if (currentSlave != null && currentSlave != master) {
                            this.destroyObject(currentSlave);
                        }
                        currentSlave = new HAJedis(ha.getIp(), ha.getPort(), ha.getTimeout());
                        return;
                    }
                    // No slave reachable: serve reads from the master.
                    logger.warn("未找到可用的从服务器,切换到主服务器服务!");
                    currentSlave = master;
                    logger.warn("切换到的主服务器:" + currentSlave);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

            /** Master changed: the read node must be re-selected so it never equals the new master. */
            @Override
            public void changeMaster(String ip, int port) {
                switchIp();
            }
        }

        /**
         * Factory for the master pool. Tracks which node is currently acting as master,
         * promotes a slave when the master fails, and restores the configured master
         * once it is reachable again. A daemon heartbeat thread re-checks every second.
         */
        private class HAJedisFactory extends BasePoolableObjectFactory {

            private final Log logger = LogFactory.getLog("RedisSlaveManager");
            private final List<HAJedisInfo> HAJedisList;

            /** Descriptor of the node new master connections are created for. */
            private HAJedisInfo currentActiveHAJedis;

            /** {@code true} while the configured master is the acting master. */
            private volatile boolean isBaseMaster;

            /** The node configured as master. */
            private HAJedis baseMaster;

            /** The node currently acting as master. */
            private HAJedis currentMaster;

            public HAJedisFactory(List<HAJedisInfo> HAJedisInfo) {
                this.HAJedisList = HAJedisInfo;
                findBaseMaster();
                Thread heart = new Thread(new HeartBeat());
                heart.setDaemon(true);
                heart.start();
            }

            /** Creates a connection to the acting master after re-checking the topology. */
            @Override
            public synchronized Object makeObject() throws Exception {
                checkConnection();
                return new HAJedis(currentActiveHAJedis.getIp(), currentActiveHAJedis.getPort(),
                        currentActiveHAJedis.getTimeout());
            }

            /** Discards pooled master connections and promotes a reachable slave. */
            private synchronized void switchIp() {
                destoryMasterAll();
                findOneSavleAsMaster();
            }

            /**
             * Promotes the first reachable node that is neither the configured master nor
             * the node currently in use.
             */
            private synchronized void findOneSavleAsMaster() {
                String currIP = currentActiveHAJedis.getIp() + ":" + currentActiveHAJedis.getPort();
                for (HAJedisInfo ha : HAJedisList) {
                    String haIP = ha.getIp() + ":" + ha.getPort();
                    if (ha.isIsmaster() || currIP.equals(haIP)) {
                        continue;
                    }
                    HAJedis probe = null;
                    try {
                        probe = new HAJedis(ha.getIp(), ha.getPort(), ha.getTimeout());
                        if (checkIsAlive(probe)) {
                            currentActiveHAJedis = ha;
                            isBaseMaster = false;
                            changeMasterTo(ha.getIp(), ha.getPort());
                            return;
                        }
                    } finally {
                        this.destroyObject(probe);
                    }
                }
                logger.warn("no reachable slave found to promote; keeping " + currIP);
            }

            /** Locates the configured master; promotes a slave if it is missing or unreachable. */
            private synchronized void findBaseMaster() {
                boolean finded = false;
                for (HAJedisInfo ha : HAJedisList) {
                    if (ha.isIsmaster()) {
                        baseMaster = new HAJedis(ha.getIp(), ha.getPort(), ha.getTimeout());
                        currentMaster = baseMaster;
                        isBaseMaster = true;
                        currentActiveHAJedis = ha;
                        finded = true;
                        break;
                    }
                }
                if (!finded || !checkIsAlive(baseMaster)) {
                    logger.warn("未设置主服务器或主服务器当前不可用,选择其它slave作为主服务器!");
                    switchIp();
                }
            }

            /**
             * Makes {@code toip:toport} the acting master: updates local bookkeeping, issues
             * {@code SLAVEOF NO ONE} on that node, points every other configured node at it
             * with {@code SLAVEOF}, and notifies the registered listeners.
             */
            private synchronized void changeMasterTo(String toip, int toport) {
                destoryMasterAll();
                for (HAJedisInfo ha : HAJedisList) {
                    if (toip.equals(ha.getIp()) && toport == ha.getPort()) {
                        if (!baseMaster.equals(currentMaster)) {
                            destroyObject(currentMaster);
                        }
                        this.currentMaster = new HAJedis(ha.getIp(), ha.getPort(), ha.getTimeout());
                        isBaseMaster = baseMaster.equals(ha.getIp(), ha.getPort());
                        currentActiveHAJedis = ha;
                    }
                }

                HAJedis promoted = new HAJedis(toip, toport);
                try {
                    promoted.slaveofNoOne();
                } finally {
                    destroyObject(promoted);
                }

                // Every node except the new master becomes its slave. A node is "the new
                // master" only when BOTH host and port match, so nodes that merely share the
                // host or the port must still be re-pointed.
                for (HAJedisInfo ha : HAJedisList) {
                    if (toip.equals(ha.getIp()) && toport == ha.getPort()) {
                        continue;
                    }
                    Jedis other = new HAJedis(ha.getIp(), ha.getPort());
                    try {
                        other.slaveof(toip, toport);
                    } catch (Exception e) {
                        // Best effort: an unreachable node cannot be re-pointed right now.
                        logger.warn("slaveof " + toip + ":" + toport + " failed on "
                                + ha.getIp() + ":" + ha.getPort() + ": " + e.getMessage());
                    } finally {
                        destroyObject(other);
                    }
                }

                for (ChangeMasterListener cm : obserers) {
                    cm.changeMaster(currentActiveHAJedis.getIp(), currentActiveHAJedis.getPort());
                }
                logger.warn("switchToMaster-->" + currentActiveHAJedis.getIp() + ":"
                        + currentActiveHAJedis.getPort());
            }

            /** @return {@code true} when the connection answers PING; on failure it is disconnected */
            private boolean checkIsAlive(Jedis jedis) {
                if (jedis == null) {
                    return false;
                }
                try {
                    return "PONG".equals(jedis.ping());
                } catch (Exception e) {
                    logger.error(e.getMessage());
                    try {
                        jedis.disconnect();
                    } catch (Exception ignored) {
                        // Already broken; nothing more to clean up.
                    }
                }
                return false;
            }

            /** @return {@code true} when a fresh connection to {@code ip:port} answers PING */
            private boolean checkIsAlive(String ip, int port) {
                HAJedis probe = null;
                try {
                    probe = new HAJedis(ip, port);
                    return "PONG".equals(probe.ping());
                } catch (Exception e) {
                    logger.error(e.getMessage());
                } finally {
                    this.destroyObject(probe);
                }
                return false;
            }

            /** Sends QUIT and disconnects; all failures are ignored because the connection is being discarded. */
            @Override
            public void destroyObject(final Object obj) {
                if (!(obj instanceof Jedis)) {
                    return;
                }
                Jedis jedis = (Jedis) obj;
                try {
                    try {
                        jedis.quit();
                    } catch (Exception ignored) {
                        // The peer may already be gone; still try to disconnect below.
                    }
                    jedis.disconnect();
                } catch (Exception ignored) {
                    // Best-effort cleanup of a connection that is being thrown away.
                }
            }

            /** A pooled connection is valid only if it targets the acting master and answers PING. */
            @Override
            public boolean validateObject(final Object obj) {
                try {
                    Jedis jedis = (Jedis) obj;
                    return currentMaster.equals(jedis) && checkIsAlive(jedis);
                } catch (Exception ex) {
                    return false;
                }
            }

            /**
             * Re-evaluates the topology:
             * <ol>
             *   <li>configured master is acting master but unreachable: promote a slave;</li>
             *   <li>a slave is acting master and the configured master is reachable again:
             *       switch back to the configured master;</li>
             *   <li>a slave is acting master and is itself unreachable: promote another slave.</li>
             * </ol>
             */
            private void checkConnection() {
                try {
                    if (isBaseMaster && !checkIsAlive(baseMaster)) {
                        logger.warn("当前是basemaster,心跳检测到其不可用,选择一个slave替换为master");
                        switchIp();
                    }
                    if (!isBaseMaster && checkIsAlive(baseMaster)) {
                        logger.warn("检测basemaster可用,恢复到basemaster");
                        String ip = baseMaster.getIp();
                        int port = baseMaster.getPort();
                        logger.warn("同步新数据到master");
                        changeMasterTo(ip, port);
                        isBaseMaster = true;
                    }
                    if (!isBaseMaster && !checkIsAlive(currentMaster)) {
                        logger.warn("当前currentmaster不可用,换另一个slave为currentmaster");
                        switchIp();
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

            /** Heartbeat loop: re-checks the topology once per second until interrupted. */
            class HeartBeat implements Runnable {

                @Override
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        checkConnection();
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // Honour the interrupt instead of spinning on forever.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }
        }
    }
}

/** Callback invoked after the acting master has changed. */
interface ChangeMasterListener {

    /**
     * @param ip   host of the new acting master
     * @param port port of the new acting master
     */
    public abstract void changeMaster(String ip, int port);
}
控制台打印结果
四月 12, 2019 12:48:11 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get jedis from:192.168.1.202:6379@b@@b@@b@四月 12, 2019 12:48:12 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get jedis from:192.168.1.202:6379@b@四月 12, 2019 12:48:12 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get jedis from:192.168.1.202:6379@b@test@b@四月 12, 2019 12:48:12 上午 com.xwood.redis.slave.RedisManagerProxy$HAJedisPool$HAJedisSlaveFactory switchIp@b@信息: switchToSlave:127.0.0.1:6379@b@四月 12, 2019 12:48:12 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:13 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:14 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:15 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:16 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:17 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:18 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:19 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:20 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:21 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get read jedis from:127.0.0.1:6379@b@test@b@四月 12, 2019 12:48:22 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now get jedis from:192.168.1.202:6379@b@四月 12, 2019 12:48:27 上午 com.xwood.redis.slave.RedisManagerProxy intercept@b@信息: now 
get read jedis from:127.0.0.1:6379@b@...