一、前言
本文介绍event源码包中定义的事件相关接口与抽象类:event.Callback、event.Event接口以及event.Loop抽象类,并结合超时事件(Event.Timeout)和TCP服务端/客户端连接,说明其源码的设计与实现,具体参见下面的源码说明部分。
二、源码说明
1.Callback、Event接口及Loop抽象类
package event;

import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Root type of every callback understood by the event loops in this package.
 *
 * <p>The nested types provide the error-reporting contract plus adapter base
 * classes for TCP servers and TCP clients.
 */
public interface Callback {

    /** Shared instance of {@link DefaultErrorCallback}. */
    ErrorCallback DEFAULT_ERROR_CB = new DefaultErrorCallback();

    /** Receives errors raised while a {@link Loop} is running. */
    interface ErrorCallback extends Callback {

        /** Reports a failure described by a throwable. */
        void onError(Loop paramLoop, Throwable paramThrowable);

        /** Reports a failure described by a plain message. */
        void onError(Loop paramLoop, String paramString);
    }

    /**
     * Error callback of last resort: prints the problem and terminates the JVM
     * with exit status 1.
     */
    class DefaultErrorCallback implements Callback.ErrorCallback {

        @Override
        public void onError(Loop l, Throwable t) {
            t.printStackTrace();
            System.exit(1);
        }

        @Override
        public void onError(Loop l, String msg) {
            System.err.println(msg);
            Thread.dumpStack();
            System.exit(1);
        }
    }

    /**
     * Base class for TCP client callbacks. Only {@link #onData} is mandatory;
     * connect/close notifications default to no-ops and errors are forwarded to
     * the loop's own error handling.
     */
    abstract class TCPClientCB implements Callback.ErrorCallback {

        /** Invoked when data read from the channel is available in the buffer. */
        public abstract void onData(TCPClientLoop paramTCPClientLoop, SocketChannel paramSocketChannel, ByteBuffer paramByteBuffer);

        /** Connect notification; does nothing by default. */
        public void onConnect(TCPClientLoop l, SocketChannel c) {
        }

        /** Close notification; does nothing by default. */
        public void onClose(TCPClientLoop l, SocketChannel c) {
        }

        /** Channel-specific error; delegates to {@link #onError(Loop, Throwable)}. */
        public void onError(TCPClientLoop l, SocketChannel c, Throwable t) {
            onError(l, t);
        }

        @Override
        public void onError(Loop l, Throwable t) {
            l.onError(t);
        }

        @Override
        public void onError(Loop l, String msg) {
            l.onError(msg);
        }
    }

    /**
     * Base class for TCP server callbacks. Only {@link #onAccept} is mandatory;
     * connect/close notifications default to no-ops and errors are forwarded to
     * the loop's own error handling.
     */
    abstract class TCPServerCB implements Callback.ErrorCallback {

        /** Invoked with the server channel and the channel returned by accept. */
        public abstract void onAccept(TCPServerLoop paramTCPServerLoop, ServerSocketChannel paramServerSocketChannel, SocketChannel paramSocketChannel);

        /** Connect notification for the server channel; does nothing by default. */
        public void onConnect(TCPServerLoop l, ServerSocketChannel ssc) {
        }

        /** Close notification for the server channel; does nothing by default. */
        public void onClose(TCPServerLoop l, ServerSocketChannel ssc) {
        }

        /** Channel-specific error; delegates to {@link #onError(Loop, Throwable)}. */
        public void onError(TCPServerLoop l, ServerSocketChannel ssc, Throwable t) {
            onError(l, t);
        }

        @Override
        public void onError(Loop l, Throwable t) {
            l.onError(t);
        }

        @Override
        public void onError(Loop l, String msg) {
            l.onError(msg);
        }
    }
}
package event;@b@@b@public abstract interface Event@b@{@b@ public static abstract class Timeout@b@ implements Event@b@ {@b@ long timeout;@b@@b@ public Timeout()@b@ {@b@ }@b@@b@ public Timeout(long ms)@b@ {@b@ this.timeout = ms;@b@ }@b@@b@ public long getTimeout() {@b@ return this.timeout;@b@ }@b@@b@ public abstract void go(TimeoutLoop paramTimeoutLoop);@b@ }@b@}
package event;

import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

/**
 * Base class of the selector-driven event loops. The loop runs on its own
 * thread: each iteration blocks in {@link Selector#select(long)} and then calls
 * {@link #go()} so subclasses can dispatch ready keys and timers.
 */
public abstract class Loop extends Thread {

    /**
     * Upper bound, in milliseconds, for the next {@code select} call. A value of
     * 0 makes {@code select} block until a key is ready or {@link #wake()} is
     * called. It is reset to 0 after every select.
     */
    protected long maxSleep;

    /** Set by {@link #stopLoop()}; checked at the top of every iteration. */
    volatile boolean stopped;

    /** Thread executing {@link #run()}; null until the loop has started. */
    protected Thread loopThread;

    /** Optional error sink; {@link Callback#DEFAULT_ERROR_CB} is used when null. */
    protected Callback.ErrorCallback errCB;

    protected Selector selector;

    /**
     * Creates a loop with a freshly opened selector.
     *
     * @throws RuntimeException wrapping the IOException if the selector cannot be opened
     */
    public Loop() {
        this.maxSleep = 0L;
        try {
            this.selector = SelectorProvider.provider().openSelector();
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Creates a loop with a freshly opened selector and the given error callback.
     *
     * @param cb error callback, may be null
     */
    public Loop(Callback.ErrorCallback cb) {
        // Chain to the no-arg constructor: without it the selector is never
        // opened and the first select()/wake() fails with a NullPointerException.
        this();
        // Assign directly rather than calling the overridable setter from a constructor.
        this.errCB = cb;
    }

    /**
     * Runs the loop until {@link #stopLoop()} is called. Anything thrown by the
     * selector or by {@link #go()} is routed to {@link #onError(Throwable)} and
     * the loop continues with the next iteration.
     */
    @Override
    public void run() {
        this.loopThread = Thread.currentThread();
        // Must be a loop: a single pass would process at most one batch of
        // events and then let the thread die.
        while (!this.stopped) {
            try {
                this.selector.select(this.maxSleep);
                this.maxSleep = 0L;
                go();
            } catch (Throwable t) {
                onError(t);
            }
        }
    }

    /** @return true when the caller is running on the loop's own thread */
    public boolean isLoopThread() {
        return this.loopThread != null && Thread.currentThread().equals(this.loopThread);
    }

    /** One iteration of work, invoked on the loop thread after each select. */
    protected abstract void go() throws Throwable;

    /** Reports a throwable to the configured error callback, or to the default one. */
    protected void onError(Throwable t) {
        if (null != this.errCB) {
            this.errCB.onError(this, t);
        } else {
            Callback.DEFAULT_ERROR_CB.onError(this, t);
        }
    }

    /** Reports a message to the configured error callback, or to the default one. */
    protected void onError(String msg) {
        if (null != this.errCB) {
            this.errCB.onError(this, msg);
        } else {
            Callback.DEFAULT_ERROR_CB.onError(this, msg);
        }
    }

    /** Makes a blocked (or the next) select call return immediately. */
    public void wake() {
        this.selector.wakeup();
    }

    /** Requests termination of the loop and wakes it so the request is noticed. */
    public void stopLoop() {
        this.stopped = true;
        wake();
    }

    public void setErrCB(Callback.ErrorCallback errCB) {
        this.errCB = errCB;
    }

    /** Debug helper: prints the object to standard output. */
    static void p(Object o) {
        System.out.println(o);
    }
}
2.TimeoutLoop超时实现类
package event;

import java.io.PrintStream;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Queue;

/**
 * Event loop that adds one-shot timeouts and repeating intervals on top of
 * {@link Loop}. Deadlines are kept as {@link System#nanoTime()} values in a
 * priority queue that is touched only by the loop thread; other threads hand
 * over new timeouts through a separate, synchronized list.
 */
public class TimeoutLoop extends Loop {

    private static final long NANOS_PER_MILLI = 1000000L;

    /** Pending timeouts ordered by deadline; accessed by the loop thread only. */
    private final Queue<T> timeouts;

    /** Timeouts added since the last iteration; guarded by its own monitor. */
    private final LinkedList<T> newTimeouts;

    /** {@code System.nanoTime()} sampled at the start of the current iteration. */
    long loopTime;

    public TimeoutLoop() {
        this.timeouts = new PriorityQueue<T>();
        this.newTimeouts = new LinkedList<T>();
    }

    /** Fires every due timeout, then merges newly added ones and computes the next sleep. */
    @Override
    protected void go() {
        assert isLoopThread();

        this.loopTime = System.nanoTime();
        handleTimeouts();

        handleNewTimeouts();
    }

    /** Moves hand-over entries into the deadline queue and refreshes {@code maxSleep}. */
    private void handleNewTimeouts() {
        assert isLoopThread();

        synchronized (this.newTimeouts) {
            this.timeouts.addAll(this.newTimeouts);
            setMaxSleep();
            this.newTimeouts.clear();
        }
    }

    /**
     * Sets {@code maxSleep} (milliseconds) to the time remaining until the
     * earliest deadline, with a floor of 1 ms, or to 0 when nothing is queued.
     */
    private void setMaxSleep() {
        assert isLoopThread();

        T next = this.timeouts.peek();
        if (null == next) {
            this.maxSleep = 0L;
            return;
        }
        // Floor of one millisecond: a sleep of 0 would mean "block without timeout".
        this.maxSleep = Math.max(NANOS_PER_MILLI, next.time - this.loopTime) / NANOS_PER_MILLI;
    }

    /**
     * Fires all timeouts whose deadline is not after {@code loopTime}.
     *
     * @return number of timeouts fired
     */
    private int handleTimeouts() {
        assert isLoopThread();

        int count = 0;
        // Intervals are re-queued only after the pass, so an interval of 0 ms
        // fires once per iteration instead of spinning in this loop forever.
        LinkedList<T> rearmed = new LinkedList<T>();
        T next;
        // nanoTime values may wrap; compare through the sign of the difference.
        while (null != (next = this.timeouts.peek()) && this.loopTime - next.time >= 0) {
            // Dequeue before running the callback so a throwing callback is not
            // left at the head of the queue to fail again on every iteration.
            this.timeouts.poll();
            next.ev.go(this);
            ++count;
            if (next.interval) {
                next.time = this.loopTime + next.ev.getTimeout() * NANOS_PER_MILLI;
                rearmed.add(next);
            }
        }
        this.timeouts.addAll(rearmed);
        return count;
    }

    /** Schedules {@code ev} to fire once after {@code ev.getTimeout()} milliseconds. */
    public void addTimeout(Event.Timeout ev) {
        addTimeout(ev, false);
    }

    /** Schedules {@code ev} to fire repeatedly every {@code ev.getTimeout()} milliseconds. */
    public void addInterval(Event.Timeout ev) {
        addTimeout(ev, true);
    }

    /** Queues the event for the loop thread and wakes the loop when called from another thread. */
    private void addTimeout(Event.Timeout ev, boolean interval) {
        long timesOutOn = System.nanoTime() + ev.getTimeout() * NANOS_PER_MILLI;
        T t = new T(timesOutOn, ev, interval);

        synchronized (this.newTimeouts) {
            this.newTimeouts.add(t);
        }
        if (!isLoopThread()) {
            wake();
        }
    }

    /** Small demo: schedules a few timeouts and an interval, then stops after one second. */
    public static void main(String[] args) throws Throwable {
        TimeoutLoop loop = new TimeoutLoop();

        loop.start();
        loop.addTimeout(new Event.Timeout(750L) {
            public void go(TimeoutLoop l) {
                TimeoutLoop.p("timeout");
            }
        });
        loop.addTimeout(new Event.Timeout() {
            public void go(TimeoutLoop l) {
                TimeoutLoop.p("timeout-1");
            }
        });
        loop.addTimeout(new Event.Timeout(850L) {
            public void go(TimeoutLoop l) {
                TimeoutLoop.p("timeout2");
            }
        });
        loop.addTimeout(new Event.Timeout(150L) {
            int i;

            public void go(TimeoutLoop l) {
                TimeoutLoop.p("timeout0");
                this.i += 1;
                if (this.i > 3) {
                    return;
                }
                l.addTimeout(this);
            }
        });
        loop.addInterval(new Event.Timeout(100L) {
            public void go(TimeoutLoop l) {
                TimeoutLoop.p("interval");
            }
        });
        Thread.sleep(1000L);
        loop.stopLoop();
    }

    /** Debug helper: prints the object to standard output. */
    static void p(Object o) {
        System.out.println(o);
    }

    /** Queue entry: an event together with its absolute deadline. */
    static class T implements Comparable<T> {
        Event.Timeout ev;
        /** Deadline as a {@code System.nanoTime()} value. */
        long time;
        /** True when the event is re-queued after firing. */
        boolean interval;

        T(long time, Event.Timeout ev, boolean interval) {
            this.time = time;
            this.ev = ev;
            this.interval = interval;
        }

        /** Orders entries by deadline, earliest first. */
        @Override
        public int compareTo(T o) {
            // Casting the nanosecond difference to int overflows once deadlines
            // are more than ~2.1 s apart; take the sign of the long difference.
            return Long.signum(this.time - o.time);
        }
    }
}
3.TCPClientLoop、TCPServerLoop类
package event;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;

/**
 * Event loop for non-blocking TCP client channels. Channels are registered with
 * the loop's selector together with a {@link Callback.TCPClientCB}; connect,
 * read and write readiness is dispatched on the loop thread. Operations invoked
 * from other threads are handed to the loop thread as zero-delay timeouts.
 */
public class TCPClientLoop extends TimeoutLoop {

    /** Read buffer shared by all channels; its content is only valid during {@code onData}. */
    private final ByteBuffer buf = ByteBuffer.allocateDirect(65535);

    /**
     * Opens a non-blocking channel, registers it for connect events and starts
     * connecting to {@code host:port}.
     *
     * @return the new channel, or null when setup failed (the failure is reported
     *         through {@code cb.onError})
     */
    public SocketChannel createTCPClient(Callback.TCPClientCB cb, String host, int port) {
        try {
            SocketChannel sc = SocketChannel.open();
            SocketAddress remote = new InetSocketAddress(InetAddress.getByName(host), port);

            sc.configureBlocking(false);
            registerChannel(sc, SelectionKey.OP_CONNECT, cb);

            sc.connect(remote);
            return sc;
        } catch (Throwable t) {
            cb.onError(this, t);
        }
        return null;
    }

    /**
     * Adopts an already connected channel: switches it to non-blocking mode if
     * necessary and registers it for read events. Problems are reported through
     * {@code cb.onError}.
     */
    public void createTCPClient(Callback.TCPClientCB cb, SocketChannel sc) {
        try {
            if (null == sc) {
                cb.onError(this, "channel is null");
                return;
            }
            if (!sc.isConnected()) {
                cb.onError(this, "channel not connected!");
                return;
            }
            if (sc.isBlocking()) {
                sc.configureBlocking(false);
                if (sc.isBlocking()) {
                    cb.onError(this, "can't make channel non-blocking");
                    return;
                }
            }
            registerChannel(sc, SelectionKey.OP_READ, cb);
        } catch (Throwable t) {
            cb.onError(this, t);
        }
    }

    /**
     * Registers {@code sc} with the selector for {@code ops}. On the loop thread
     * this happens immediately; from any other thread the registration is queued
     * as a zero-delay timeout so that it is performed by the loop thread.
     */
    private void registerChannel(final SocketChannel sc, final int ops, final Callback.TCPClientCB cb)
            throws ClosedChannelException {
        if (isLoopThread()) {
            sc.register(this.selector, ops, new R(sc, cb));
            return;
        }
        addTimeout(new Event.Timeout() {
            public void go(TimeoutLoop l) {
                TCPClientLoop loop = (TCPClientLoop) l;
                try {
                    sc.register(loop.selector, ops, new R(sc, cb));
                } catch (ClosedChannelException cce) {
                    cb.onError(loop, sc, cce);
                }
            }
        });
    }

    /** Queues {@code bytes} for writing; see {@link #write(SocketChannel, Callback.TCPClientCB, ByteBuffer)}. */
    public void write(SocketChannel sc, Callback.TCPClientCB cb, byte[] bytes) {
        write(sc, cb, ByteBuffer.wrap(bytes));
    }

    /**
     * Queues {@code buffer} for writing on {@code sc} and enables write interest.
     * Calls from other threads are re-dispatched to the loop thread.
     */
    public void write(final SocketChannel sc, final Callback.TCPClientCB cb, final ByteBuffer buffer) {
        if (!isLoopThread()) {
            addTimeout(new Event.Timeout() {
                public void go(TimeoutLoop l) {
                    ((TCPClientLoop) l).write(sc, cb, buffer);
                }
            });
            return;
        }

        SelectionKey key = sc.keyFor(this.selector);
        if (null == key) {
            cb.onError(this, sc, new RuntimeException("not a previously configured channel!"));
            return;
        }
        if (!key.isValid()) {
            // Channel closed or key cancelled: touching interestOps would throw
            // CancelledKeyException, so report through the channel's callback.
            cb.onError(this, sc, new RuntimeException("channel is closed or its key was cancelled"));
            return;
        }
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        ((R) key.attachment()).push(buffer);
    }

    /** Closes {@code sc} on the loop thread; a failure is reported through {@code client.onError}. */
    public void close(final SocketChannel sc, final Callback.TCPClientCB client) {
        addTimeout(new Event.Timeout() {
            public void go(TimeoutLoop l) {
                try {
                    sc.close();
                } catch (Throwable t) {
                    client.onError(TCPClientLoop.this, sc, t);
                }
            }
        });
    }

    /**
     * Dispatches every selected key (connect, then read, then write) and finally
     * lets {@link TimeoutLoop#go()} process timers. Validity is re-checked before
     * each step because a callback may have closed the channel.
     */
    @Override
    public void go() {
        assert isLoopThread();
        Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
        while (keys.hasNext()) {
            SelectionKey key = keys.next();
            keys.remove();
            if (key.isValid() && key.isConnectable()) {
                handleConnect(key);
            }
            if (key.isValid() && key.isReadable()) {
                handleRead(key);
            }
            if (key.isValid() && key.isWritable()) {
                handleWrite(key);
            }
        }
        super.go();
    }

    /** Reads once into the shared buffer and reports data, end-of-stream or an error. */
    private void handleRead(SelectionKey key) {
        assert isLoopThread();

        SocketChannel sc = (SocketChannel) key.channel();
        Callback.TCPClientCB cb = ((R) key.attachment()).cb;

        this.buf.clear();
        int read;
        try {
            read = sc.read(this.buf);
        } catch (IOException ioe) {
            cb.onError(this, sc, ioe);
            return;
        }
        if (-1 == read) {
            cb.onClose(this, sc);
            // Stop selecting for reads at end-of-stream. The callback may have
            // closed the channel itself, which cancels the key.
            if (key.isValid()) {
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
            }
            return;
        }
        this.buf.flip();
        cb.onData(this, sc, this.buf);
    }

    /** Writes queued buffers in order until one cannot be fully written or the queue is empty. */
    private void handleWrite(SelectionKey key) {
        assert isLoopThread();
        SocketChannel sc = (SocketChannel) key.channel();
        R r = (R) key.attachment();

        Queue<ByteBuffer> data = r.bufferList;
        ByteBuffer buffer;
        while (null != (buffer = data.peek())) {
            try {
                sc.write(buffer);
            } catch (IOException ioe) {
                r.cb.onError(this, sc, ioe);
                return;
            }
            if (buffer.hasRemaining()) {
                // The channel accepted only part of the buffer; wait for the next write event.
                break;
            }
            data.remove();
        }

        if (data.isEmpty()) {
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }

    /** Completes a pending connect, notifies the callback and switches from connect to read interest. */
    private void handleConnect(SelectionKey key) {
        assert isLoopThread();

        SocketChannel sc = (SocketChannel) key.channel();
        Callback.TCPClientCB cb = ((R) key.attachment()).cb;
        try {
            if (!sc.finishConnect()) {
                // Not connected yet; keep connect interest and wait for the next event.
                return;
            }
        } catch (IOException ioe) {
            cb.onError(this, sc, ioe);
            return;
        }
        cb.onConnect(this, sc);

        // The callback may have closed the channel, which cancels the key.
        if (key.isValid()) {
            int ops = key.interestOps();
            ops |= SelectionKey.OP_READ;
            ops &= ~SelectionKey.OP_CONNECT;
            key.interestOps(ops);
        }
    }

    /** Shuts down the output side of the channel's socket; a failure is reported through {@code cb.onError}. */
    public void shutdownOutput(SocketChannel sc, Callback.TCPClientCB cb) {
        try {
            sc.socket().shutdownOutput();
        } catch (IOException ioe) {
            cb.onError(this, sc, ioe);
        }
    }

    static String bin(int num) {
        return Integer.toBinaryString(num);
    }

    /** Demo: connects to {@code args[0]} on port 8000 and sends a GET request on connect and after each read. */
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("usage: TCPClientLoop <host>");
            return;
        }
        TCPClientLoop loop = new TCPClientLoop();
        loop.start();
        Callback.TCPClientCB cb = new Callback.TCPClientCB() {
            public void onConnect(TCPClientLoop l, SocketChannel ch) {
                TimeoutLoop.p("onConnect: " + ch);
                byte[] bs = "GET / HTTP/1.1\r\n\r\n".getBytes();
                l.write(ch, this, ByteBuffer.wrap(bs));
            }

            public void onData(TCPClientLoop l, SocketChannel ch, ByteBuffer b) {
                TimeoutLoop.p("onData: " + b);

                byte[] bs = "GET / HTTP/1.1\r\n\r\n".getBytes();
                l.write(ch, this, ByteBuffer.wrap(bs));
            }

            public void onClose(TCPClientLoop l, SocketChannel ch) {
                TimeoutLoop.p("closed: " + ch);
                SelectionKey key = ch.keyFor(l.selector);
                TimeoutLoop.p(key);
            }
        };
        loop.createTCPClient(cb, args[0], 8000);
    }

    /** Selection-key attachment: the channel, its callback and the queue of buffers awaiting write. */
    static class R {
        SocketChannel channel;
        Callback.TCPClientCB cb;
        ByteBuffer buffer;
        Queue<ByteBuffer> bufferList;

        R(SocketChannel channel, Callback.TCPClientCB cb) {
            this.channel = channel;
            this.cb = cb;
            this.bufferList = new LinkedList<ByteBuffer>();
        }

        /** Appends a buffer to the pending-write queue. */
        void push(ByteBuffer buffer) {
            this.bufferList.add(buffer);
        }
    }
}
package event;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Event loop that additionally accepts TCP connections. Listening channels are
 * registered for accept events with their {@link Callback.TCPServerCB} as the
 * key attachment; every other key is left to {@link TCPClientLoop#go()}.
 */
public class TCPServerLoop extends TCPClientLoop {

    /**
     * Opens a non-blocking server channel bound to {@code sa}, notifies
     * {@code cb.onConnect} and registers the channel for accept events (directly
     * on the loop thread, otherwise via a zero-delay timeout). An IOException
     * during setup is reported through {@code cb.onError}.
     */
    public void createTCPServer(final Callback.TCPServerCB cb, SocketAddress sa) {
        ServerSocketChannel opened = null;
        try {
            opened = ServerSocketChannel.open();
            final ServerSocketChannel ssc = opened;
            ssc.configureBlocking(false);
            ssc.socket().bind(sa);
            cb.onConnect(this, ssc);
            if (isLoopThread()) {
                ssc.register(this.selector, SelectionKey.OP_ACCEPT, cb);
            } else {
                addTimeout(new Event.Timeout() {
                    public void go(TimeoutLoop l) {
                        try {
                            ssc.register(l.selector, SelectionKey.OP_ACCEPT, cb);
                        } catch (ClosedChannelException cce) {
                            cb.onError((TCPServerLoop) l, ssc, cce);
                        }
                    }
                });
            }
        } catch (IOException ioe) {
            // Setup failed (e.g. the address is already in use): release the
            // channel instead of leaking it.
            if (null != opened) {
                try {
                    opened.close();
                } catch (IOException ignored) {
                    // The original failure is the one reported below.
                }
            }
            cb.onError(this, ioe);
        }
    }

    /**
     * Handles and removes the acceptable keys, then delegates to
     * {@link TCPClientLoop#go()}, which processes the keys left in the selected set.
     */
    @Override
    public void go() {
        assert isLoopThread();
        Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
        while (keys.hasNext()) {
            SelectionKey key = keys.next();
            if (key.isValid() && key.isAcceptable()) {
                keys.remove();
                handleAccept(key);
            }
        }
        super.go();
    }

    /** Accepts one pending connection and passes it to the server callback. */
    public void handleAccept(SelectionKey key) {
        assert isLoopThread();
        assert key.isAcceptable();

        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        Callback.TCPServerCB cb = (Callback.TCPServerCB) key.attachment();
        SocketChannel sc;
        try {
            sc = ssc.accept();
        } catch (IOException ioe) {
            cb.onError(this, ssc, ioe);
            return;
        }
        if (null == sc) {
            // A non-blocking accept returns null when no connection is pending.
            return;
        }
        cb.onAccept(this, ssc, sc);
    }
}