首页

基于java的nio消息实现方式优缺点分析及示例代码说明

标签:java,nio,消息,源码,io对比,阻塞iO,非阻塞IO,nio远离     发布时间:2015-05-29   

java nio在jdk1.4版本中加入新io的功能,所有涉及类都在java.nio包下,改写了原有客户端服务器一对一线程阻塞等待的方式,通过事件驱动的方式监听消息,实际上通常采用Reactor模式2,提高系统整体资源利用率,下面具体分析对比io及nio同样实现消息方式的优缺点,同时对nio的实现方式具体分析及代码示例

一、阻塞I/O缺点

原i/o操作都是以字符为单位进行读写(虽然上层使用高级流进行封装不去直接处理字节流,但最终都是调用底层处理字符流),该操作是一次一个字节的处理数据,速度有点慢,通过InputStream的read()阻塞方法从流中读取数据,如数据源没数据,就会一直等到下去,每一个客户端服务器都会建立一个线程等到上面的等到阻塞操作,其他程序占用,所以造成系统资源无效浪费,不能有效的利用,这是新io非阻塞的io需要解决的问题,具体缺点如下:

1.系统资源利用率低,浪费硬件资源(见图1-1)-大部分线程都是在阻塞等到中(详细代码见“”)

ServerSocket ss=new ServerSocket(8080);@b@System.out.println("服务器已启动,等到客户端连接");@b@while(true){@b@   //监听并接受客户端连接,注意该方法有阻塞性@b@   Socket s=ss.accept();@b@   //将客户端的Socket连接添加到一个Vector集合@b@   vector.add(s);@b@   System.out.println("客户端连接成功!"+s);@b@   new MySocketOpt(s).start();@b@}

2.系统稳定度不够,服务端响应随着客户端的增加延时增加,每一个客户端需要建立一个线程,当到达一定的限制时,会是系统无法响应

1111111111111.jpg

图1-1

二、非阻塞NIO分析

NIO基于原有的I/O的改进和扩展,NIO与原有I/O不同,它是基于特殊缓冲区块(Buffer)进行高效的I/O操作。NIO的缓存区块与普通的缓冲区不同,它是一块连续的空间,它内存的分配不在Java的堆栈中,不受Java内存回收的影响;它的实现不是存Java的代码,而是本地代码,这样操作系统可以直接与缓冲区进行交互,Java程序只需要完成对缓冲区的读写,而后续操作由操作系统完成。异步通道Channel(又称频道)是NIO的新特征,通过Channel,Java应用程序能够更好地与操作系统的I/O服务结合起来,充分利用Buffer缓冲区,更高效地完成I/O操作。NIO新特征具体体现方面如下:

  1. 更灵活的可伸缩的I/O接口(scalable I/O),包括I/O抽象Channels的出现以及新的多元(multiplexed),非阻塞(non-blocking)的I/O机制。可以更灵活方便的构建产品级的应用服务,对于成千上万的客户端连接变动更可用,从而有效地利用多个处理器资

  2. 快速缓冲(fast buffered)的二进制和字符I/O接口。快速缓冲的二进制I/O API使得用户可以容易的编写出操作文件流或者二进制数据流的高性能代码。而快速缓存的字符I/O API使得用户可以高效地处理字符流和文件,此外它还将正则表达式引入到Java平台中来格式化用户的输入与输出

  3. 字符集的编码器和解码器。这些字符集转换API使得用户可以直接访问操作系统内置的字符集转换器,同时还支持那些外来的转换器

  4. 基于Perl风格正则表达式的匹配机制(A pattern-matching facility based on  Perl-style regular expressions)

  5. 改良的文件系统接口,支持锁定和内存映射(locks and memory mapping).此特征使得用户可以更加容易地处理各种文件系统操作中出现的问题,同时使得用户可以更加高效地访问大量的文件属性集。此外如果用户确实需要,还可以访问与平台相关的一些特征。最后,它还提供对非本地文件系统的支持,例如网络文件系统(network filesystems)。

  6. 新的I/O违例类可以使用户更加有针对性地来处理各种I/O错误,让用户能够在各种平台上一致地对待这些错误。

  7. 增加了对并发的支持,NIO类中的大部分方法都支持多个并发的线程

三、NIO具体工作原理

服务端专门有一个线程负责处理所有注册事件,并负责分发(wait/notify),但事件被触发时,分配独立线程完成响应过程(读取数据、解码、计算处理、编码、发送相应),如图3-1所示。java NIO采用双向通道(channel)进行数据传输,不是像之前I/O采用单向流(stream),并在通道上注册我们需要的事件:服务端接收客户端连接事件:SelectionKey.OP_ACCEPT(16)、客户端连接服务端事件:SelectionKey.OP_CONNECT(8)、读事件:SelectionKey.OP_READ(1)、写事件:SelectionKey.OP_WRITE(4)。

222222.jpg图3-1

服务器端和客户端各次有自己的时间选择对象Selector,它们通过在通道对象channel上注册想要的事件,进行建立会话通道,到事件发生时,将进行处理响应结果给对方,如下图3-2

33333.gif

图3-1

四、代码示例

客户端部分

客户端部分由client.java、ClientThread.java类组成,具体如下:

import java.io.*;@b@@b@public class Client {@b@@b@    public static void main(String[] args) {@b@        ClientThread client = new ClientThread();@b@        client.start();@b@        BufferedReader sin = new BufferedReader(@b@                new InputStreamReader(System.in));@b@        try {@b@            String readline;@b@            while ((readline = sin.readLine()) != null) {@b@                if (readline.equals("bye")) {@b@                    client.close();@b@                    System.exit(0);@b@                }@b@                client.send(readline);@b@            }@b@        } catch (IOException e) {@b@            e.printStackTrace();@b@        }@b@    }@b@}

服务端部分

服务端部分包括Server.java,具体如下:

import java.io.IOException;@b@import java.net.InetSocketAddress;@b@import java.nio.ByteBuffer;@b@import java.nio.CharBuffer;@b@import java.nio.channels.Selector;@b@import java.nio.channels.ServerSocketChannel;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.SocketChannel;@b@import java.nio.charset.Charset;@b@import java.nio.charset.CharsetDecoder;@b@import java.nio.charset.CharsetEncoder;@b@import java.util.Iterator;@b@@b@public class Server {@b@    private static final int SERVERPORT=8890;@b@    public static void main(String[] args) {@b@    Selector selector = null;@b@    ServerSocketChannel server = null;@b@    try{@b@        selector = Selector.open();@b@        server = ServerSocketChannel.open();@b@        InetSocketAddress ip = new InetSocketAddress(SERVERPORT);@b@        server.socket().bind(ip);@b@        server.configureBlocking(false);@b@        server.register(selector,SelectionKey.OP_ACCEPT);@b@        System.out.println("服务器开始接受客户端连接");@b@        while(true){@b@            try {@b@               selector.select();@b@             } catch (Exception e) {@b@ @b@        }@b@        Iterator<SelectionKey>it = selector.selectedKeys().iterator();@b@        while(it.hasNext()){@b@            SelectionKey key = it.next();@b@            it.remove();@b@            if(key.isAcceptable()){@b@               ServerSocketChannel server2 = (ServerSocketChannel)key.channel();@b@               SocketChannel channel = server2.accept();@b@               channel.configureBlocking(false);@b@               channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);@b@               System.out.println("客户端:"@b@               + channel.socket().getInetAddress().getHostName() + ":"@b@               + channel.socket().getPort() + " 连接上了");@b@           }@b@           if(key.isReadable()){@b@            SocketChannel channel = (SocketChannel)key.channel();@b@            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();@b@            ByteBuffer buffer = ByteBuffer.allocate(50);@b@             try{@b@                  channel.read(buffer);@b@             }catch(IOException e){@b@    //            客户端异常断开连接@b@                 System.out.println("客户端:"@b@                  + channel.socket().getInetAddress().getHostName() + ":"@b@                  + channel.socket().getPort() + " 已断开连接");@b@                channel.close();@b@                continue;@b@            }@b@            buffer.flip();@b@            String msg = decoder.decode(buffer).toString();@b@            if (msg.equals("退出!@#$%")) {@b@//           客户端主动断开连接@b@             System.out.println("客户端:"@b@                  + channel.socket().getInetAddress().getHostName() + ":"@b@                   + channel.socket().getPort() + " 已断开连接");@b@                channel.close();@b@                continue;@b@            }@b@            System.out.println(@b@               channel.socket().getInetAddress().getHostName() + ":"@b@                 + channel.socket().getPort() + ":" + msg);@b@            if (key.isWritable()) {@b@//         System.out.println("可写");@b@              CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();@b@              try {@b@                channel.write(encoder.encode(CharBuffer.wrap("server receive your message ")));@b@             } catch (IOException e) {@b@                                // TODO: handle exception@b@                System.out.println("写入io错误");@b@                            }@b@                        }@b@                    }@b@                }@b@            }@b@        }catch(IOException e){@b@            e.printStackTrace();@b@        }finally{@b@            try{@b@                selector.close();@b@                server.close();@b@            }catch(IOException e){}@b@        }@b@    }@b@}