一、对比说明
BIO (Blocking I/O):同步阻塞I/O模式,传统的 java.io 包。@b@NIO (New I/O):同步非阻塞模式 - 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序。@b@AIO (Asynchronous I/O):异步非阻塞I/O模型,是 Java 1.7 之后引入的包,是 NIO 的升级版本,提供了异步非堵塞的 IO 操作方式
完整代码示例参加《亿级流量Jαva高并发与网络编程实战》的ch06章节目录。
二、代码说明
1) io示例
package com.xwood.demo.chat.io;@b@@b@import java.io.FileOutputStream;@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.io.OutputStream;@b@import java.net.Socket;@b@import java.net.UnknownHostException;@b@@b@public class MyClient {@b@ public static void main(String[] args) throws UnknownHostException, IOException {@b@ //客户端 连接服务端发布的服务@b@ Socket socket = new Socket("127.0.0.1",8882);@b@ //接受服务端发来到文件@b@ InputStream in = socket.getInputStream() ;@b@ byte[] bs = new byte[64] ;@b@ int len = -1 ;@b@ OutputStream fileOut = new FileOutputStream("d:\\temp\\xwood_back.gif") ;@b@ while( (len =in.read(bs))!=-1 ) {@b@ fileOut.write(bs,0,len);@b@ }@b@ System.out.println("文件接收成功!");@b@ fileOut.close();@b@ in.close();@b@ socket.close();@b@ }@b@@b@}
package com.xwood.demo.chat.io;@b@@b@import java.io.IOException;@b@import java.net.ServerSocket;@b@import java.net.Socket;@b@@b@public class MyServer {@b@ public static void main(String[] args) throws IOException {@b@ //服务的地址: 本机ip:8888@b@ ServerSocket server = new ServerSocket(8882);@b@ //允许接收多个客户端连接@b@ while (true) {@b@ //一直阻塞,直到有客户端发来连接@b@ Socket socket = server.accept();@b@ //创建一个线程,用于给该客户端发送一个文件@b@ new Thread(new SendFile(socket)).start();@b@ }@b@ }@b@}
package com.xwood.demo.chat.io;@b@@b@import java.io.File;@b@import java.io.FileInputStream;@b@import java.io.InputStream;@b@import java.io.OutputStream;@b@import java.net.Socket;@b@@b@//服务端向客户端发送文件@b@public class SendFile implements Runnable{@b@ private Socket socket ;@b@ public SendFile(Socket socket) {@b@ this.socket = socket ;@b@ }@b@ @Override@b@ public void run() {@b@ try {@b@ System.out.println("连接成功!");@b@ OutputStream out = socket.getOutputStream() ;@b@ File file = new File("d:\\xwood_back.gif");@b@ InputStream fileIn = new FileInputStream(file) ;@b@ byte[] bs = new byte[64] ;@b@ int len = -1 ;@b@ while( (len=fileIn.read(bs)) !=-1 ) {@b@ out.write(bs,0,len);@b@ }@b@ fileIn.close();@b@ out.close();@b@ socket.close();@b@ }catch(Exception e) {@b@ e.printStackTrace();@b@ }@b@ }@b@}
2)nio示例
package com.xwood.demo.chat.nio;@b@@b@@b@import java.io.IOException;@b@import java.net.InetSocketAddress;@b@import java.net.ServerSocket;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.Selector;@b@import java.nio.channels.ServerSocketChannel;@b@import java.nio.channels.SocketChannel;@b@import java.nio.charset.Charset;@b@import java.util.HashMap;@b@import java.util.Iterator;@b@import java.util.Map;@b@import java.util.Set;@b@@b@@b@public class ChatServer {@b@ /*@b@ clientsMap:保存所有的客户端@b@ key:客户端的名字@b@ value:客户端连接服务端的Channel@b@ */@b@ private static Map<String, SocketChannel> clientsMap = new HashMap();@b@@b@ public static void main(String[] args) throws IOException {@b@ int[] ports = new int[]{7777,8889,9999};@b@ Selector selector = Selector.open();@b@@b@ for(int port:ports){@b@ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();@b@ serverSocketChannel.configureBlocking(false);@b@@b@ ServerSocket serverSocket = serverSocketChannel.socket();@b@@b@ //将聊天服务绑定到7777、8888和9999三个端口上@b@ serverSocket.bind(new InetSocketAddress(port));@b@ System.out.println("服务端启动成功,端口"+port);@b@@b@ //在服务端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:接收客户端连接(接收就绪)@b@ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);@b@@b@ }@b@@b@ while (true) {@b@ //一直阻塞,直到选择器上存在已经就绪的通道(包含感兴趣的事件)@b@ selector.select();@b@ //selectionKeys包含了所有通道与选择器之间的关系(接收连接、读、写)@b@ Set<SelectionKey> selectionKeys = selector.selectedKeys();@b@ Iterator<SelectionKey> keyIterator = selectionKeys.iterator();@b@ //如果selector中有多个就绪通道(接收就绪、读就绪、写就绪等),则遍历这些通道@b@ while (keyIterator.hasNext()) {@b@ SelectionKey selectedKey = keyIterator.next();@b@ String receive = null;@b@ //与客户端交互的通道@b@ SocketChannel clientChannel;@b@ try {@b@ //接收就绪(已经可以接收客户端的连接了)@b@ if (selectedKey.isAcceptable()) {@b@ ServerSocketChannel server = (ServerSocketChannel) selectedKey.channel();@b@ clientChannel = server.accept();@b@ //切换到非阻塞模式@b@ clientChannel.configureBlocking(false);@b@ //再在服务端的选择器上,注册第二个通道,并标识该通道所感兴趣的事件是:接收客户端发来的消息(读就绪)@b@ clientChannel.register(selector, SelectionKey.OP_READ);@b@ //用“key四位随机数”的形式模拟客户端的key值@b@ String key = "key" + (int) (Math.random() * 9000 + 1000);@b@ //将该建立完毕连接的 通道 保存到clientsMap中@b@ clientsMap.put(key, clientChannel);@b@ //读就绪(已经可以读取客户端发来的信息了)@b@ } else if (selectedKey.isReadable()) {@b@ clientChannel = (SocketChannel) selectedKey.channel();@b@ ByteBuffer readBuffer = ByteBuffer.allocate(1024);@b@ int result = -1 ;@b@ try {@b@ //将服务端读取到的客户端消息,放入readBuffer中@b@ result = clientChannel.read(readBuffer);@b@ //如果终止客户端,则read()会抛出IOException异常,可以依次判断是否有客户端退出。@b@ }catch (IOException e){@b@ //获取退出连接的client对应的key@b@ String clientKey = getClientKey(clientChannel);@b@ System.out.println("客户端"+clientKey+"退出聊天室");@b@ clientsMap.remove(clientKey);@b@ clientChannel.close();@b@ selectedKey.cancel();@b@@b@ continue;@b@ }@b@ if (result > 0) {@b@ readBuffer.flip();@b@ Charset charset = Charset.forName("utf-8");@b@ receive = String.valueOf(charset.decode(readBuffer).array());@b@ //将读取到的客户端消息,打印在服务端的控制台,格式: “客户端key,客户端消息”@b@ System.out.println(clientChannel + ":" + receive);@b@ //处理客户端第一次发来的连接测试信息@b@ if ("connecting".equals(receive)) {@b@ receive = "新客户端加入聊天!";@b@ }@b@ //将读取到的客户消息保存在attachment中,用于后续向所有客户端转发此消息@b@ selectedKey.attach(receive);@b@ //将通道所感兴趣的事件标识为:向客户端发送消息(写就绪)@b@ selectedKey.interestOps(SelectionKey.OP_WRITE);@b@ }@b@ //写就绪@b@ } else if (selectedKey.isWritable()) {@b@ clientChannel = (SocketChannel) selectedKey.channel();@b@ //获取发送消息从client对应的key@b@ String sendKey = getClientKey(clientChannel);@b@ //将接收到的消息,拼接成“发送消息的客户端Key:消息”的形式,再广播给所有client@b@ for (Map.Entry<String, SocketChannel> entry : clientsMap.entrySet()) {@b@ SocketChannel eachClient = entry.getValue();@b@ ByteBuffer broadcastMsg = ByteBuffer.allocate(1024);@b@ broadcastMsg.put((sendKey + ":" + selectedKey.attachment()).getBytes());@b@ broadcastMsg.flip();@b@ eachClient.write(broadcastMsg);@b@@b@ }@b@ selectedKey.interestOps(SelectionKey.OP_READ);@b@ }@b@ } catch (Exception e) {@b@ e.printStackTrace();@b@ }@b@ }@b@ selectionKeys.clear();@b@ }@b@ }@b@@b@ public static String getClientKey(SocketChannel clientChannel){@b@ String sendKey = null;@b@ //很多client在发下消息,通过for找到是哪个client在发消息,找到该client的key@b@ for (Map.Entry<String, SocketChannel> entry : clientsMap.entrySet()) {@b@ if (clientChannel == entry.getValue()) {@b@ //找到发送消息的client所对应的key@b@ sendKey = entry.getKey();@b@ break;@b@ }@b@ }@b@ return sendKey ;@b@ }@b@}
package com.xwood.demo.chat.nio;@b@@b@@b@import java.io.BufferedReader;@b@import java.io.IOException;@b@import java.io.InputStreamReader;@b@import java.net.InetSocketAddress;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.Selector;@b@import java.nio.channels.SocketChannel;@b@import java.util.Iterator;@b@import java.util.Set;@b@@b@@b@public class ChatClient {@b@ public static void main(String[] args) {@b@ try {@b@ SocketChannel socketChannel = SocketChannel.open();@b@ //切换到非阻塞模式@b@ socketChannel.configureBlocking(false);@b@ Selector selector = Selector.open();@b@ //在客户端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:向服务端发送连接(连接就绪)。对应于服务端的OP_ACCEPT事件@b@ socketChannel.register(selector, SelectionKey.OP_CONNECT);@b@ //随机连接到服务端提供的一个端口上@b@ //int[] ports = {7777,8889,9999};@b@ // int port = ports[(int)(Math.random()*3)] ;@b@ socketChannel.connect(new InetSocketAddress("127.0.0.1", 8889));@b@ while (true) {@b@ selector.select();@b@ //selectionKeys包含了所有通道与选择器之间的关系(请求连接、读、写)@b@ Set<SelectionKey> selectionKeys = selector.selectedKeys();@b@ Iterator<SelectionKey> keyIterator = selectionKeys.iterator();@b@ while (keyIterator.hasNext()) {@b@ SelectionKey selectedKey = keyIterator.next();@b@ //判断是否连接成功@b@ if (selectedKey.isConnectable()) {@b@ ByteBuffer sendBuffer = ByteBuffer.allocate(1024);@b@ //创建一个用于和服务端交互的Channel@b@ SocketChannel client = (SocketChannel) selectedKey.channel();@b@ //如果状态是:正在连接中...@b@ if (client.isConnectionPending()) {@b@ boolean isConnected = client.finishConnect();@b@ if (isConnected) {@b@ System.out.println("连接成功!访问的端口是:"+8889);@b@ //向服务端发送一条测试消息@b@ sendBuffer.put("connecting".getBytes());@b@ sendBuffer.flip();@b@ client.write(sendBuffer);@b@ }@b@@b@ //在“聊天室”中,对于客户端而言,可以随时向服务端发送消息(写操作),因此,需要建立一个单独写线程@b@ new Thread(() -> {@b@ while (true) {@b@ try {@b@ sendBuffer.clear();@b@ //接收用户从控制台输入的内容,并发送给服务端@b@ InputStreamReader reader = new InputStreamReader(System.in);@b@ BufferedReader bReader = new BufferedReader(reader);@b@ String message = bReader.readLine();@b@@b@ sendBuffer.put(message.getBytes());@b@ sendBuffer.flip();@b@ client.write(sendBuffer);@b@ } catch (Exception e) {@b@ e.printStackTrace();@b@ }@b@ }@b@ }).start();@b@ }@b@ //标记通道感兴趣的事件是:读取服务端消息(读就绪)@b@ client.register(selector, SelectionKey.OP_READ);@b@ //客户端读取服务端的反馈消息@b@ } else if (selectedKey.isReadable()) {@b@ SocketChannel client = (SocketChannel) selectedKey.channel();@b@ ByteBuffer readBuffer = ByteBuffer.allocate(1024);@b@ //将服务端的反馈消息放入readBuffer中@b@ int len = client.read(readBuffer);@b@ if (len > 0) {@b@ String receive = new String(readBuffer.array(), 0, len);@b@ System.out.println(receive);@b@ }@b@ }@b@ }@b@ selectionKeys.clear();@b@ }@b@@b@ } catch (IOException e) {@b@ e.printStackTrace();@b@ }@b@ }@b@}
3)aio示例
package com.xwood.demo.chat.aio;@b@@b@import java.net.InetSocketAddress;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.AsynchronousServerSocketChannel;@b@import java.nio.channels.AsynchronousSocketChannel;@b@import java.nio.channels.CompletionHandler;@b@@b@public class AIOServer {@b@@b@ public static void main(String[] args) throws Exception {@b@ final AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel@b@ .open()@b@ .bind(new InetSocketAddress("127.0.0.1", 8881));@b@@b@ while (true) {@b@ //接收客户端请求的连接@b@ channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {@b@ //当接收到连接时,触发completed()@b@ @Override@b@ public void completed(final AsynchronousSocketChannel client, Void attachment) {@b@ channel.accept(null, this);@b@ ByteBuffer buffer = ByteBuffer.allocate(1024);@b@ //开接收客户端发来的消息@b@ client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {@b@ //当接收到消息时,触发completed()@b@ @Override@b@ public void completed(Integer result_num, ByteBuffer dataBuffer) {@b@ dataBuffer.flip();@b@ String receive = new String(dataBuffer.array(), 0, dataBuffer.limit());@b@ System.out.println("接收到的客户端消息:" + receive);@b@ try {@b@ client.close();@b@ } catch (Exception e) {@b@ e.printStackTrace();//打印异常@b@ }@b@ }@b@ @Override@b@ public void failed(Throwable e, ByteBuffer attachment) {@b@ e.printStackTrace();@b@ }@b@ });@b@ }@b@@b@ @Override@b@ public void failed(Throwable e, Void attachment) {@b@ e.printStackTrace();@b@ }@b@ });@b@ for (; ; ) {@b@ System.out.println("main线程和用于读取客户端消息的线程是异步执行的...");@b@ Thread.sleep(1000);@b@ }@b@ }@b@ }@b@@b@}
package com.xwood.demo.chat.aio;@b@@b@import java.net.InetSocketAddress;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.AsynchronousSocketChannel;@b@import java.util.concurrent.Future;@b@@b@public class AIOClient {@b@ public static void main(String[] args) throws Exception {@b@ AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();@b@ channel.connect(new InetSocketAddress("127.0.0.1", 8881)).get();@b@ ByteBuffer buffer = ByteBuffer.wrap("Hello Server".getBytes());@b@ //向服务端发送消息@b@ Future<Integer> future = channel.write(buffer);@b@ while (!future.isDone()) {@b@ System.out.println("在channel将消息发送完毕以前,main可以异步处理其他事情..");@b@ Thread.sleep(1000);@b@ }@b@ Integer len = future.get();@b@ System.out.println("发送完毕!共发送字节数:"+len);@b@@b@ }@b@}