通过java的concurrent包对线程数量的控制,在分布式系统下控制数据的一致性或分布式事务起到重要的作用,下面是关于使用Semaphore(通过acquire()和release()获取和释放访问许可)线程控制示例
import java.util.concurrent.ExecutorService;@b@import java.util.concurrent.Executors;@b@import java.util.concurrent.Semaphore;@b@@b@public class SemaphoreTest {@b@ @b@ public static void main(String[] args) { @b@ // 线程池 @b@ ExecutorService exec = Executors.newCachedThreadPool(); @b@ // 只能5个线程同时访问 @b@ final Semaphore semp = new Semaphore(5); @b@ // 模拟20个客户端访问 @b@ for (int index = 0; index < 20; index++) {@b@ final int NO = index; @b@ Runnable run = new Runnable() { @b@ public void run() { @b@ try { @b@ // 获取许可 @b@ semp.acquire(); @b@ System.out.println(Thread.currentThread().getName()+" Accessing: " + NO); @b@ Thread.sleep((long) (Math.random() * 10000)); @b@ // 访问完后,释放 ,如果屏蔽下面的语句,则在控制台只能打印5条记录,之后线程一直阻塞@b@ semp.release(); @b@ } catch (InterruptedException e) { @b@ } @b@ } @b@ }; @b@ exec.execute(run); @b@ } @b@ // 退出线程池 @b@ exec.shutdown(); @b@ } @b@ @b@ @b@ @b@}
控制台(5个线程释放完后,释放完后再继续打印....)
pool-1-thread-1 Accessing: 0@b@pool-1-thread-3 Accessing: 2@b@pool-1-thread-2 Accessing: 1@b@pool-1-thread-5 Accessing: 4@b@pool-1-thread-7 Accessing: 6
源码如下
import java.util.*;@b@import java.util.concurrent.locks.*;@b@import java.util.concurrent.atomic.*;@b@@b@@b@public class Semaphore implements java.io.Serializable {@b@ @b@ private static final long serialVersionUID = -3222578661600680210L;@b@ @b@ private final Sync sync;@b@@b@ abstract static class Sync extends AbstractQueuedSynchronizer {@b@ private static final long serialVersionUID = 1192457210091910933L;@b@@b@ Sync(int permits) {@b@ setState(permits);@b@ }@b@@b@ final int getPermits() {@b@ return getState();@b@ }@b@@b@ final int nonfairTryAcquireShared(int acquires) {@b@ for (;;) {@b@ int available = getState();@b@ int remaining = available - acquires;@b@ if (remaining < 0 ||@b@ compareAndSetState(available, remaining))@b@ return remaining;@b@ }@b@ }@b@@b@ protected final boolean tryReleaseShared(int releases) {@b@ for (;;) {@b@ int p = getState();@b@ if (compareAndSetState(p, p + releases))@b@ return true;@b@ }@b@ }@b@@b@ final void reducePermits(int reductions) {@b@ for (;;) {@b@ int current = getState();@b@ int next = current - reductions;@b@ if (compareAndSetState(current, next))@b@ return;@b@ }@b@ }@b@@b@ final int drainPermits() {@b@ for (;;) {@b@ int current = getState();@b@ if (current == 0 || compareAndSetState(current, 0))@b@ return current;@b@ }@b@ }@b@ }@b@@b@ final static class NonfairSync extends Sync {@b@ private static final long serialVersionUID = -2694183684443567898L;@b@@b@ NonfairSync(int permits) {@b@ super(permits);@b@ }@b@@b@ protected int tryAcquireShared(int acquires) {@b@ return nonfairTryAcquireShared(acquires);@b@ }@b@ }@b@@b@ final static class FairSync extends Sync {@b@ private static final long serialVersionUID = 2014338818796000944L;@b@@b@ FairSync(int permits) {@b@ super(permits);@b@ }@b@@b@ protected int tryAcquireShared(int acquires) {@b@ Thread current = Thread.currentThread();@b@ for (;;) {@b@ Thread first = getFirstQueuedThread();@b@ if (first != null && first != current)@b@ return -1;@b@ int available = getState();@b@ int remaining = available - acquires;@b@ if (remaining < 0 ||@b@ compareAndSetState(available, remaining))@b@ return remaining;@b@ }@b@ }@b@ }@b@@b@ public Semaphore(int permits) {@b@ sync = new NonfairSync(permits);@b@ }@b@@b@ public Semaphore(int permits, boolean fair) {@b@ sync = (fair)? new FairSync(permits) : new NonfairSync(permits);@b@ }@b@@b@ public void acquire() throws InterruptedException {@b@ sync.acquireSharedInterruptibly(1);@b@ }@b@@b@ public void acquireUninterruptibly() {@b@ sync.acquireShared(1);@b@ }@b@@b@ public boolean tryAcquire() {@b@ return sync.nonfairTryAcquireShared(1) >= 0;@b@ }@b@@b@ public boolean tryAcquire(long timeout, TimeUnit unit)@b@ throws InterruptedException {@b@ return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));@b@ }@b@@b@ public void release() {@b@ sync.releaseShared(1);@b@ }@b@@b@ public void acquire(int permits) throws InterruptedException {@b@ if (permits < 0) throw new IllegalArgumentException();@b@ sync.acquireSharedInterruptibly(permits);@b@ }@b@@b@ public void acquireUninterruptibly(int permits) {@b@ if (permits < 0) throw new IllegalArgumentException();@b@ sync.acquireShared(permits);@b@ }@b@@b@ public boolean tryAcquire(int permits) {@b@ if (permits < 0) throw new IllegalArgumentException();@b@ return sync.nonfairTryAcquireShared(permits) >= 0;@b@ }@b@@b@ @b@ public boolean tryAcquire(int permits, long timeout, TimeUnit unit)@b@ throws InterruptedException {@b@ if (permits < 0) throw new IllegalArgumentException();@b@ return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));@b@ }@b@@b@ @b@ public void release(int permits) {@b@ if (permits < 0) throw new IllegalArgumentException();@b@ sync.releaseShared(permits);@b@ }@b@@b@ @b@ public int availablePermits() {@b@ return sync.getPermits();@b@ }@b@@b@ public int drainPermits() {@b@ return sync.drainPermits();@b@ }@b@@b@ protected void reducePermits(int reduction) {@b@ if (reduction < 0) throw new IllegalArgumentException();@b@ sync.reducePermits(reduction);@b@ }@b@@b@ public boolean isFair() {@b@ return sync instanceof FairSync;@b@ }@b@@b@ public final boolean hasQueuedThreads() {@b@ return sync.hasQueuedThreads();@b@ }@b@@b@ public final int getQueueLength() {@b@ return sync.getQueueLength();@b@ }@b@@b@ protected Collection<Thread> getQueuedThreads() {@b@ return sync.getQueuedThreads();@b@ }@b@@b@ public String toString() {@b@ return super.toString() + "[Permits = " + sync.getPermits() + "]";@b@ }@b@}