一、前言
基于redis的客户端jedis分别基于其setnx(首次赋值返回1,其余的情况返回0的方式,且redis服务器端操作都是单线程队列操作的)、multi事务、watch监控器三种不同方式实现乐观锁,应用于在分布式高并发处理等相关场景。
二、代码示例
1. RedisLock类 - 其中 lock是基于setnx实现加锁、lock_2是基于multi事务的方式、lock_3是watch加Transaction,具体代码如下
import java.util.Random;@b@import redis.clients.jedis.Jedis;@b@import redis.clients.jedis.JedisPool;@b@import redis.clients.jedis.Transaction;@b@@b@@b@public class RedisLock {@b@ @b@ //加锁标志@b@ public static final String LOCKED = "TRUE";@b@ public static final long ONE_MILLI_NANOS = 1000000L;@b@ //默认超时时间(毫秒)@b@ public static final long DEFAULT_TIME_OUT = 3000;@b@ public static JedisPool pool;@b@ public static final Random r = new Random();@b@ //锁的超时时间(秒),过期删除@b@ public static final int EXPIRE = 5 * 60;@b@ @b@ private Jedis jedis;@b@ private String key;@b@ //锁状态标志@b@ private boolean locked = false;@b@@b@ public RedisLock(String key) {@b@ this.key = key;@b@ this.jedis= new Jedis("127.0.0.1",6379,60000);@b@ }@b@@b@ /**@b@ * 通过jedis.setnx实现锁@b@ * @param timeout@b@ * @return@b@ */@b@ public boolean lock(long timeout) {@b@ long nano = System.nanoTime();@b@ timeout *= ONE_MILLI_NANOS;@b@ try {@b@ while ((System.nanoTime() - nano) < timeout) {@b@ if (jedis.setnx(key, LOCKED) == 1) {@b@ jedis.expire(key, EXPIRE);@b@ locked = true;@b@ return locked;@b@ }@b@ // 短暂休眠,nano避免出现活锁@b@ Thread.sleep(3, r.nextInt(500));@b@ }@b@ } catch (Exception e) {@b@ }@b@ return false;@b@ }@b@ @b@ /**@b@ * 事务和管道都是异步模式。在事务和管道中不能同步查询结果,因此下面 t.getSet(key, LOCKED);只能被一个线程查询@b@ * 否则线程获取不到@b@ * @param timeout@b@ * @return@b@ */@b@ public boolean lock_2(long timeout) {@b@ long nano = System.nanoTime();@b@ timeout *= ONE_MILLI_NANOS;@b@ try {@b@ while ((System.nanoTime() - nano) < timeout) {@b@ Transaction t = jedis.multi();@b@ // 开启事务,当server端收到multi指令@b@ // 会将该client的命令放入一个队列,然后依次执行,知道收到exec指令@b@ t.getSet(key, LOCKED);@b@ t.expire(key, EXPIRE);@b@ String ret = (String) t.exec().get(0);@b@ System.out.println(Thread.currentThread()+" ========== "+ret);@b@ if(ret!=null&&ret.equalsIgnoreCase("TRUE"))@b@ return true;@b@// if (ret == null || ret.equals("UNLOCK")) {@b@// return true;@b@// }@b@ // 短暂休眠,nano避免出现活锁@b@ Thread.sleep(3, r.nextInt(500));@b@ }@b@ } catch (Exception e) {@b@ }@b@ return false;@b@ }@b@ @b@ public boolean lock_3(long timeout) {@b@ long nano = System.nanoTime();@b@ timeout *= ONE_MILLI_NANOS;@b@ try {@b@ while ((System.nanoTime() - nano) < timeout) {@b@ jedis.watch(key);@b@ // 开启watch之后,如果key的值被修改,则事务失败,exec方法返回null@b@ String value = jedis.get(key);@b@ if (value == null || value.equals("UNLOCK")) {@b@ Transaction t = jedis.multi();@b@ t.setex(key, EXPIRE, LOCKED);@b@ if (t.exec() != null) {@b@ return true;@b@ }@b@ }@b@ jedis.unwatch();@b@ // 短暂休眠,nano避免出现活锁@b@ Thread.sleep(3, r.nextInt(500));@b@ }@b@ } catch (Exception e) {@b@ }@b@ return false;@b@ }@b@ @b@ public boolean lock() {@b@ return lock(DEFAULT_TIME_OUT);@b@ }@b@@b@ // 无论是否加锁成功,必须调用@b@ public void unlock() {@b@ if (locked)@b@ jedis.del(key);@b@ }@b@@b@}
2. RedisLockTest类 - 通过三个线程同时进行add的递减,通过加锁可以控制add按照顺序递减10,9,8,7..3,2,1,否则在多线程上下文切换的情况下无法正常打印
2.1 - 如果没有加锁测试类,代码如下
public class RedisLockTest {@b@@b@ private static volatile int add=10;@b@ @b@ public static void main(String[] args) {@b@ @b@ Runnable handler=new Runnable(){@b@ @Override@b@ public void run() {@b@ while(add>0){@b@ System.out.println(Thread.currentThread().toString()+" ———————— add@"+add);@b@ add--;@b@ try {@b@ Thread.sleep(500);@b@ } catch (InterruptedException e) {@b@ }@b@ }@b@ }};@b@ @b@ new Thread(handler).start();@b@ new Thread(handler).start();@b@ new Thread(handler).start();@b@@b@ }@b@}
控制台打印结果
Thread[Thread-0,5,main] ———————— add@10@b@Thread[Thread-1,5,main] ———————— add@10@b@Thread[Thread-2,5,main] ———————— add@8@b@Thread[Thread-0,5,main] ———————— add@7@b@Thread[Thread-1,5,main] ———————— add@6@b@Thread[Thread-2,5,main] ———————— add@6@b@Thread[Thread-0,5,main] ———————— add@4@b@Thread[Thread-1,5,main] ———————— add@4@b@Thread[Thread-2,5,main] ———————— add@4@b@Thread[Thread-1,5,main] ———————— add@1@b@Thread[Thread-0,5,main] ———————— add@1
2.2 setnx方式配置lock测试类,代码如下
public class RedisLockTest {@b@@b@ private static volatile int add=10;@b@ @b@ public static void main(String[] args) {@b@ @b@ Runnable handler=new Runnable(){@b@ @Override@b@ public void run() {@b@ RedisLock mylock=new RedisLock("testlock1");@b@ if(mylock.lock(300000)){@b@ while(add>0){@b@ System.out.println(Thread.currentThread().toString()+" ———————— add@"+add);@b@ add--;@b@ try {@b@ Thread.sleep(500);@b@ } catch (InterruptedException e) {@b@ }@b@ }@b@ }@b@ mylock.unlock();@b@ }};@b@ @b@ new Thread(handler).start();@b@ new Thread(handler).start();@b@ new Thread(handler).start();@b@@b@ }@b@@b@}
控制台打印结果如下
Thread[Thread-0,5,main] ———————— add@10@b@Thread[Thread-0,5,main] ———————— add@9@b@Thread[Thread-0,5,main] ———————— add@8@b@Thread[Thread-0,5,main] ———————— add@7@b@Thread[Thread-0,5,main] ———————— add@6@b@Thread[Thread-0,5,main] ———————— add@5@b@Thread[Thread-0,5,main] ———————— add@4@b@Thread[Thread-0,5,main] ———————— add@3@b@Thread[Thread-0,5,main] ———————— add@2@b@Thread[Thread-0,5,main] ———————— add@1
2.3 lock_3是watch加Transaction的方式测试类
public class RedisLockTest {@b@@b@ private static volatile int add=10;@b@ @b@ public static void main(String[] args) {@b@ @b@ Runnable handler=new Runnable(){@b@ @Override@b@ public void run() {@b@ RedisLock mylock=new RedisLock("testlock3");@b@ if(mylock.lock_3(300000)){@b@ while(add>0){@b@ System.out.println(Thread.currentThread().toString()+" ———————— add@"+add);@b@ add--;@b@ try {@b@ Thread.sleep(500);@b@ } catch (InterruptedException e) {@b@ }@b@ }@b@ }@b@ mylock.unlock();@b@ }};@b@ @b@ new Thread(handler).start();@b@ new Thread(handler).start();@b@ new Thread(handler).start();@b@@b@ }@b@@b@}
控制台结果
Thread[Thread-2,5,main] ———————— add@10@b@Thread[Thread-2,5,main] ———————— add@9@b@Thread[Thread-2,5,main] ———————— add@8@b@Thread[Thread-2,5,main] ———————— add@7@b@Thread[Thread-2,5,main] ———————— add@6@b@Thread[Thread-2,5,main] ———————— add@5@b@Thread[Thread-2,5,main] ———————— add@4@b@Thread[Thread-2,5,main] ———————— add@3@b@Thread[Thread-2,5,main] ———————— add@2@b@Thread[Thread-2,5,main] ———————— add@1