首页

基于netty源码包ClientBootstrap/ChannelFuture/Channels/ChannelPipeline自定义客户端NettyHttpClient实现类代码示例

标签:netty,ClientBootstrap,ChannelFuture,Channels,ChannelPipeline,自定义netty客户端,NettyHttpClient     发布时间:2018-08-06   

一、前言

关于netty源码包org.jboss.netty.bootstrap.ClientBootstrap、org.jboss.netty.channel.Channel、org.jboss.netty.channel.ChannelPipeline等基类自定义netty客户端NettyHttpClient实现类(对应服务端NettyServer类参见其他文章),详情参见代码示例。

二、代码示例

import java.net.InetSocketAddress;@b@import java.nio.channels.ClosedChannelException;@b@import java.util.concurrent.Executors;@b@@b@import org.jboss.netty.bootstrap.ClientBootstrap;@b@import org.jboss.netty.channel.Channel;@b@import org.jboss.netty.channel.ChannelFuture;@b@import org.jboss.netty.channel.ChannelHandlerContext;@b@import org.jboss.netty.channel.ChannelPipeline;@b@import org.jboss.netty.channel.ChannelPipelineFactory;@b@import org.jboss.netty.channel.ChannelStateEvent;@b@import org.jboss.netty.channel.Channels;@b@import org.jboss.netty.channel.ExceptionEvent;@b@import org.jboss.netty.channel.MessageEvent;@b@import org.jboss.netty.channel.SimpleChannelUpstreamHandler;@b@import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;@b@import org.jboss.netty.handler.codec.http.HttpChunkAggregator;@b@import org.jboss.netty.handler.codec.http.HttpRequestEncoder;@b@import org.jboss.netty.handler.codec.http.HttpResponse;@b@import org.jboss.netty.handler.codec.http.HttpResponseDecoder;@b@@b@public class NettyHttpClient {@b@@b@	private final int workerCount;@b@	private ClientBootstrap bootstrap;@b@@b@	public NettyHttpClient(int workerCount) {@b@		this.workerCount = workerCount;@b@		this.bootstrap = null;@b@	}@b@@b@	public void init() {@b@		bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(@b@				Executors.newCachedThreadPool(), @b@				Executors.newCachedThreadPool(), 1, workerCount));@b@		bootstrap.setOption("child.keepAlive", true); @b@		bootstrap.setOption("child.tcpNoDelay", true);  @b@		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@b@			@Override@b@			public ChannelPipeline getPipeline() throws Exception {@b@				ChannelPipeline pipeline = Channels.pipeline();@b@				pipeline.addLast("decoder", new HttpResponseDecoder());@b@				pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));@b@				pipeline.addLast("encode", new HttpRequestEncoder());@b@				return pipeline;@b@			}@b@		});@b@	}@b@@b@	public void destory() {@b@		try {@b@			if (bootstrap != null) {@b@				bootstrap.releaseExternalResources();@b@				bootstrap = null;@b@			}@b@		} catch (Exception e) {@b@		}@b@	}@b@@b@	public void sendRequest(NettyHttpRequest request) {@b@		String uri = request.getUri();@b@		String host = null;@b@		Handler handler = new Handler(request);@b@		int port = 80;@b@		int host_start = uri.indexOf("://");@b@		if (host_start < 0) {@b@			handler.onException(new Exception("Invalid request uri!"));@b@			return ;@b@		}@b@		try {@b@			host_start += 3;@b@			int port_start = uri.indexOf(':', host_start);@b@			int port_end = uri.indexOf('/', host_start);@b@			int host_end = port_start > 0 ? port_start : port_end > 0 ? port_end : uri.length();@b@			host = uri.substring(host_start, host_end);@b@			if (port_start > 0) {@b@				port_start += 1;@b@				if (port_end > 0) {@b@					port = Integer.parseInt(uri.substring(port_start, port_end));@b@				} else {@b@					port = Integer.parseInt(uri.substring(port_start));@b@				}@b@			}@b@//			request.setUri(port_end > 0 ? uri.substring(port_end) : "/");@b@		} catch (Exception e) {@b@			handler.onException(new Exception("Invalid request uri!", e));@b@			return ;@b@		}@b@@b@		try {@b@			if (bootstrap == null) init();@b@			ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));@b@			Channel channel = future.getChannel();@b@			channel.getPipeline().addLast("handler", handler);@b@		} catch (Exception e) {@b@			handler.onException(new Exception("Connection refused!", e));@b@		}@b@	}@b@	@b@	/**@b@	 * handler@b@	 */@b@@b@	private class Handler extends SimpleChannelUpstreamHandler {@b@@b@		private final NettyHttpRequest request;@b@		private volatile boolean finished = false;@b@@b@		public Handler(NettyHttpRequest request) {@b@			this.request = request;@b@		}@b@@b@		@Override@b@		public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {@b@			e.getChannel().write(request.getOriginRequest());@b@		}@b@@b@		@Override@b@		public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {@b@			onException(new Exception("Connection refused!"));@b@		}@b@		@b@		public void onException(Throwable e) {@b@			if (finished == false) {@b@				finished = true;@b@				if (request.getOnError() != null) {@b@					request.getOnError().onException(e);@b@				}@b@			}@b@		}@b@@b@		@Override@b@		public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {@b@			if ((e.getCause() instanceof ClosedChannelException) == false) {@b@				if (e.getChannel().isOpen()) {@b@					try { e.getChannel().close(); } catch (Exception ex)  { }@b@				}@b@			}@b@			onException(e.getCause());@b@		}@b@@b@		@Override@b@		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {@b@			Object message = e.getMessage();@b@			if (message instanceof HttpResponse) {@b@				if (finished == false) {@b@					finished = true;@b@					if (request.getOnSuccess() != null) {@b@						request.getOnSuccess().onResponse(new NettyHttpResponse(request, (HttpResponse)message));@b@					}@b@				}@b@			}@b@			ctx.getChannel().close();@b@		}@b@	}@b@ @b@}