首页

通过Java的Semaphore控制协调线程资源的示例(线程同步及数据一致性)

标签:数据安全,同步,多线程,acquire,AbstractQueuedSynchronizer,Sync,concurrent,locks,redisson,分布式锁     发布时间:2017-02-20   

通过java的concurrent包对线程数量的控制,在分布式系统下控制数据的一致性或分布式事务起到重要的作用,下面是关于使用Semaphore(通过acquire()和release()获取和释放访问许可)线程控制示例

import java.util.concurrent.ExecutorService;@b@import java.util.concurrent.Executors;@b@import java.util.concurrent.Semaphore;@b@@b@public class SemaphoreTest {@b@	@b@	 public static void main(String[] args) {  @b@	        // 线程池 @b@	        ExecutorService exec = Executors.newCachedThreadPool();  @b@	        // 只能5个线程同时访问 @b@	        final Semaphore semp = new Semaphore(5);  @b@	        // 模拟20个客户端访问 @b@	        for (int index = 0; index < 20; index++) {@b@	            final int NO = index;  @b@	            Runnable run = new Runnable() {  @b@	                public void run() {  @b@	                    try {  @b@	                        // 获取许可 @b@	                        semp.acquire();  @b@	                        System.out.println(Thread.currentThread().getName()+" Accessing: " + NO);  @b@	                        Thread.sleep((long) (Math.random() * 10000));  @b@	                        // 访问完后,释放 ,如果屏蔽下面的语句,则在控制台只能打印5条记录,之后线程一直阻塞@b@	                        semp.release();  @b@	                    } catch (InterruptedException e) {  @b@	                    }  @b@	                }  @b@	            };  @b@	            exec.execute(run);  @b@	        }  @b@	        // 退出线程池 @b@	        exec.shutdown();  @b@	    }  @b@	@b@	@b@	 @b@}

控制台(5个线程释放完后,释放完后再继续打印....)

pool-1-thread-1 Accessing: 0@b@pool-1-thread-3 Accessing: 2@b@pool-1-thread-2 Accessing: 1@b@pool-1-thread-5 Accessing: 4@b@pool-1-thread-7 Accessing: 6

源码如下

import java.util.*;@b@import java.util.concurrent.locks.*;@b@import java.util.concurrent.atomic.*;@b@@b@@b@public class Semaphore implements java.io.Serializable {@b@	@b@    private static final long serialVersionUID = -3222578661600680210L;@b@    @b@    private final Sync sync;@b@@b@    abstract static class Sync extends AbstractQueuedSynchronizer {@b@        private static final long serialVersionUID = 1192457210091910933L;@b@@b@        Sync(int permits) {@b@            setState(permits);@b@        }@b@@b@        final int getPermits() {@b@            return getState();@b@        }@b@@b@        final int nonfairTryAcquireShared(int acquires) {@b@            for (;;) {@b@                int available = getState();@b@                int remaining = available - acquires;@b@                if (remaining < 0 ||@b@                    compareAndSetState(available, remaining))@b@                    return remaining;@b@            }@b@        }@b@@b@        protected final boolean tryReleaseShared(int releases) {@b@            for (;;) {@b@                int p = getState();@b@                if (compareAndSetState(p, p + releases))@b@                    return true;@b@            }@b@        }@b@@b@        final void reducePermits(int reductions) {@b@            for (;;) {@b@                int current = getState();@b@                int next = current - reductions;@b@                if (compareAndSetState(current, next))@b@                    return;@b@            }@b@        }@b@@b@        final int drainPermits() {@b@            for (;;) {@b@                int current = getState();@b@                if (current == 0 || compareAndSetState(current, 0))@b@                    return current;@b@            }@b@        }@b@    }@b@@b@    final static class NonfairSync extends Sync {@b@        private static final long serialVersionUID = -2694183684443567898L;@b@@b@        NonfairSync(int permits) {@b@            super(permits);@b@        }@b@@b@        protected int tryAcquireShared(int acquires) {@b@            return nonfairTryAcquireShared(acquires);@b@        }@b@    }@b@@b@    final static class FairSync extends Sync {@b@        private static final long serialVersionUID = 2014338818796000944L;@b@@b@        FairSync(int permits) {@b@            super(permits);@b@        }@b@@b@        protected int tryAcquireShared(int acquires) {@b@            Thread current = Thread.currentThread();@b@            for (;;) {@b@                Thread first = getFirstQueuedThread();@b@                if (first != null && first != current)@b@                    return -1;@b@                int available = getState();@b@                int remaining = available - acquires;@b@                if (remaining < 0 ||@b@                    compareAndSetState(available, remaining))@b@                    return remaining;@b@            }@b@        }@b@    }@b@@b@    public Semaphore(int permits) {@b@        sync = new NonfairSync(permits);@b@    }@b@@b@    public Semaphore(int permits, boolean fair) {@b@        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);@b@    }@b@@b@    public void acquire() throws InterruptedException {@b@        sync.acquireSharedInterruptibly(1);@b@    }@b@@b@    public void acquireUninterruptibly() {@b@        sync.acquireShared(1);@b@    }@b@@b@    public boolean tryAcquire() {@b@        return sync.nonfairTryAcquireShared(1) >= 0;@b@    }@b@@b@    public boolean tryAcquire(long timeout, TimeUnit unit)@b@        throws InterruptedException {@b@        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));@b@    }@b@@b@    public void release() {@b@        sync.releaseShared(1);@b@    }@b@@b@    public void acquire(int permits) throws InterruptedException {@b@        if (permits < 0) throw new IllegalArgumentException();@b@        sync.acquireSharedInterruptibly(permits);@b@    }@b@@b@    public void acquireUninterruptibly(int permits) {@b@        if (permits < 0) throw new IllegalArgumentException();@b@        sync.acquireShared(permits);@b@    }@b@@b@    public boolean tryAcquire(int permits) {@b@        if (permits < 0) throw new IllegalArgumentException();@b@        return sync.nonfairTryAcquireShared(permits) >= 0;@b@    }@b@@b@   @b@    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)@b@        throws InterruptedException {@b@        if (permits < 0) throw new IllegalArgumentException();@b@        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));@b@    }@b@@b@   @b@    public void release(int permits) {@b@        if (permits < 0) throw new IllegalArgumentException();@b@        sync.releaseShared(permits);@b@    }@b@@b@    @b@    public int availablePermits() {@b@        return sync.getPermits();@b@    }@b@@b@    public int drainPermits() {@b@        return sync.drainPermits();@b@    }@b@@b@    protected void reducePermits(int reduction) {@b@	if (reduction < 0) throw new IllegalArgumentException();@b@        sync.reducePermits(reduction);@b@    }@b@@b@    public boolean isFair() {@b@        return sync instanceof FairSync;@b@    }@b@@b@    public final boolean hasQueuedThreads() {@b@        return sync.hasQueuedThreads();@b@    }@b@@b@    public final int getQueueLength() {@b@        return sync.getQueueLength();@b@    }@b@@b@    protected Collection<Thread> getQueuedThreads() {@b@        return sync.getQueuedThreads();@b@    }@b@@b@    public String toString() {@b@        return super.toString() + "[Permits = " + sync.getPermits() + "]";@b@    }@b@}