1、我们在使用netty的是有都会存在将channelBuffer的数据处理成相应的String或者自定义数据。而这里主要是介绍管道里面存在的上行和下行的数据处理方式
2、通过一张图片来看一下具体管道中的实现过程
一个Channel中包括一个Socket、一个ChannelPipeline。一个ChannelPipeline中有一个ChannelSink和多个ChannelHandler。ChannelHandler分为两种:UpstremHandler、DownstreamHandler。
不论是读数据还是写数据都要经过Channel中的ChannelPipeline。读数据的过程是从Socket到ChannelPipeline,由ChannelPipeline交给里面的UpstreamHandler(或者叫做InBoundHandler)从下到上依次处理 。写数据时,由要经过ChannelPipeline里面在DownStreamHandler(或者是OutBoundHandler)由上到下依次处理。
3、因为UpstremHandler与DownstreamHandler的实现方式大同小异,我这里写的例子是UpstremHandler的例子
package com.troy.application.upstream;import org.jboss.netty.bootstrap.ServerBootstrap;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;import org.jboss.netty.handler.timeout.IdleStateHandler;import org.jboss.netty.util.HashedWheelTimer;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Server { public static void main(String[] args) { //声明服务类 ServerBootstrap serverBootstrap = new ServerBootstrap(); //设定线程池 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService work = Executors.newCachedThreadPool(); //设置工厂 serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss,work)); //设置管道流 serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); //添加处理方式 channelPipeline.addLast("idle",new IdleStateHandler(new HashedWheelTimer(),5,5,10)); channelPipeline.addLast("handler1",new Handler1()); channelPipeline.addLast("handler2",new Handler2()); return channelPipeline; } }); //设置端口 serverBootstrap.bind(new InetSocketAddress(9000)); }}
package com.troy.application.upstream;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;import org.jboss.netty.channel.UpstreamMessageEvent;public class Handler1 extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer) e.getMessage(); byte[] array = channelBuffer.array(); String message = new String(array); System.out.println("handler1"+message); ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(),message,e.getRemoteAddress())); }}
package com.troy.application.upstream;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;public class Handler2 extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String message = (String) e.getMessage(); System.out.println("handler2"+message); }}
说明:这里最重要的两个方法是sendUpstream和sendDownstream。这两个方式在上行和下行的处理基本上是一样的。在源码里面handler的处理都会存在sendUpstream和sendDownstream。这个两个方法也是多重处理的基础。