一、前言
本文基于netty源码包中的org.jboss.netty.bootstrap.ClientBootstrap、org.jboss.netty.channel.Channel、org.jboss.netty.channel.ChannelPipeline等基类,自定义实现netty客户端NettyHttpClient类(对应的服务端NettyServer类参见其他文章),详情参见下面的代码示例。
二、代码示例
import java.net.InetSocketAddress;@b@import java.nio.channels.ClosedChannelException;@b@import java.util.concurrent.Executors;@b@@b@import org.jboss.netty.bootstrap.ClientBootstrap;@b@import org.jboss.netty.channel.Channel;@b@import org.jboss.netty.channel.ChannelFuture;@b@import org.jboss.netty.channel.ChannelHandlerContext;@b@import org.jboss.netty.channel.ChannelPipeline;@b@import org.jboss.netty.channel.ChannelPipelineFactory;@b@import org.jboss.netty.channel.ChannelStateEvent;@b@import org.jboss.netty.channel.Channels;@b@import org.jboss.netty.channel.ExceptionEvent;@b@import org.jboss.netty.channel.MessageEvent;@b@import org.jboss.netty.channel.SimpleChannelUpstreamHandler;@b@import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;@b@import org.jboss.netty.handler.codec.http.HttpChunkAggregator;@b@import org.jboss.netty.handler.codec.http.HttpRequestEncoder;@b@import org.jboss.netty.handler.codec.http.HttpResponse;@b@import org.jboss.netty.handler.codec.http.HttpResponseDecoder;@b@@b@public class NettyHttpClient {@b@@b@ private final int workerCount;@b@ private ClientBootstrap bootstrap;@b@@b@ public NettyHttpClient(int workerCount) {@b@ this.workerCount = workerCount;@b@ this.bootstrap = null;@b@ }@b@@b@ public void init() {@b@ bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(@b@ Executors.newCachedThreadPool(), @b@ Executors.newCachedThreadPool(), 1, workerCount));@b@ bootstrap.setOption("child.keepAlive", true); @b@ bootstrap.setOption("child.tcpNoDelay", true); @b@ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@b@ @Override@b@ public ChannelPipeline getPipeline() throws Exception {@b@ ChannelPipeline pipeline = Channels.pipeline();@b@ pipeline.addLast("decoder", new HttpResponseDecoder());@b@ pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));@b@ pipeline.addLast("encode", new HttpRequestEncoder());@b@ return pipeline;@b@ }@b@ });@b@ }@b@@b@ public void destory() {@b@ try {@b@ if (bootstrap != 
null) {@b@ bootstrap.releaseExternalResources();@b@ bootstrap = null;@b@ }@b@ } catch (Exception e) {@b@ }@b@ }@b@@b@ public void sendRequest(NettyHttpRequest request) {@b@ String uri = request.getUri();@b@ String host = null;@b@ Handler handler = new Handler(request);@b@ int port = 80;@b@ int host_start = uri.indexOf("://");@b@ if (host_start < 0) {@b@ handler.onException(new Exception("Invalid request uri!"));@b@ return ;@b@ }@b@ try {@b@ host_start += 3;@b@ int port_start = uri.indexOf(':', host_start);@b@ int port_end = uri.indexOf('/', host_start);@b@ int host_end = port_start > 0 ? port_start : port_end > 0 ? port_end : uri.length();@b@ host = uri.substring(host_start, host_end);@b@ if (port_start > 0) {@b@ port_start += 1;@b@ if (port_end > 0) {@b@ port = Integer.parseInt(uri.substring(port_start, port_end));@b@ } else {@b@ port = Integer.parseInt(uri.substring(port_start));@b@ }@b@ }@b@// request.setUri(port_end > 0 ? uri.substring(port_end) : "/");@b@ } catch (Exception e) {@b@ handler.onException(new Exception("Invalid request uri!", e));@b@ return ;@b@ }@b@@b@ try {@b@ if (bootstrap == null) init();@b@ ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));@b@ Channel channel = future.getChannel();@b@ channel.getPipeline().addLast("handler", handler);@b@ } catch (Exception e) {@b@ handler.onException(new Exception("Connection refused!", e));@b@ }@b@ }@b@ @b@ /**@b@ * handler@b@ */@b@@b@ private class Handler extends SimpleChannelUpstreamHandler {@b@@b@ private final NettyHttpRequest request;@b@ private volatile boolean finished = false;@b@@b@ public Handler(NettyHttpRequest request) {@b@ this.request = request;@b@ }@b@@b@ @Override@b@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {@b@ e.getChannel().write(request.getOriginRequest());@b@ }@b@@b@ @Override@b@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {@b@ onException(new 
Exception("Connection refused!"));@b@ }@b@ @b@ public void onException(Throwable e) {@b@ if (finished == false) {@b@ finished = true;@b@ if (request.getOnError() != null) {@b@ request.getOnError().onException(e);@b@ }@b@ }@b@ }@b@@b@ @Override@b@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {@b@ if ((e.getCause() instanceof ClosedChannelException) == false) {@b@ if (e.getChannel().isOpen()) {@b@ try { e.getChannel().close(); } catch (Exception ex) { }@b@ }@b@ }@b@ onException(e.getCause());@b@ }@b@@b@ @Override@b@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {@b@ Object message = e.getMessage();@b@ if (message instanceof HttpResponse) {@b@ if (finished == false) {@b@ finished = true;@b@ if (request.getOnSuccess() != null) {@b@ request.getOnSuccess().onResponse(new NettyHttpResponse(request, (HttpResponse)message));@b@ }@b@ }@b@ }@b@ ctx.getChannel().close();@b@ }@b@ }@b@ @b@}