Handler在netty中,無疑占據著非常重要的地位。Handler與Servlet中的filter很像,通過Handler可以完成通訊報文的解碼編碼、攔截指定的報文、統壹對日誌錯誤進行處理、統壹對請求進行計數、控制Handler執行與否。壹句話,沒有它做不到的只有妳想不到的。
Netty中的所有handler都實現自ChannelHandler接口。按照輸出輸出來分,分為ChannelInboundHandler、ChannelOutboundHandler兩大類。ChannelInboundHandler對從客戶端發往服務器的報文進行處理,壹般用來執行解碼、讀取客戶端數據、進行業務處理等;ChannelOutboundHandler對從服務器發往客戶端的報文進行處理,壹般用來進行編碼、發送報文到客戶端。
Netty中,可以註冊多個handler。ChannelInboundHandler按照註冊的先後順序執行;ChannelOutboundHandler按照註冊的先後順序逆序執行,如下圖所示,按照註冊的先後順序對Handler進行排序,request進入Netty後的執行順序為:
2、demo案例
如上即為本測試demo的例子:
2.1、服務器端
EchoServer.java
package com.example.nettynew.server;
import com.example.nettynew.hander.EchoInHandler1;
import com.example.nettynew.hander.EchoInHandler2;
import com.example.nettynew.hander.EchoOutHandler2;
import com.example.nettynew.hander.EchoOuteHandler1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* ? 配置服務器功能,如線程、端口 ? 實現服務器處理程序,它包含業務邏輯,決定當有壹個請求連接或接收數據時該做什麽
*/
@Component
public class EchoServer {
@Autowired
EchoInHandler1 echoInHandler1;
@Autowired
EchoInHandler2 echoInHandler2;
@Autowired
EchoOuteHandler1 echoOuteHandler1;
@Autowired
EchoOutHandler2 echoOutHandler2;
public void start() throws Exception {
EventLoopGroup eventLoopGroup = null;
try {
//server端引導類
ServerBootstrap serverBootstrap = new ServerBootstrap();
//連接池處理數據
eventLoopGroup = new NioEventLoopGroup();
serverBootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)//指定通道類型為NioServerSocketChannel,壹種異步模式,OIO阻塞模式為OioServerSocketChannel
.localAddress("localhost", 8099)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接。
.childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行所有的連接請求
@Override
protected void initChannel(Channel ch) throws Exception {
// 註冊兩個InboundHandler,執行順序為註冊順序,所以應該是InboundHandler1 InboundHandler2
// 註冊兩個OutboundHandler,執行順序為註冊順序的逆序,所以應該是OutboundHandler2 OutboundHandler1
ch.pipeline().addLast(echoInHandler1);
ch.pipeline().addLast(echoOuteHandler1);
ch.pipeline().addLast(echoOutHandler2);
ch.pipeline().addLast(echoInHandler2);
}
});
// 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,然後服務器等待通道關閉,因為使用sync(),所以關閉操作也會被阻塞。
ChannelFuture channelFuture = serverBootstrap.bind().sync();
System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress());
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully().sync();
}
}
}
EchoInHandler1.java
package com.example.nettynew.hander;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.stereotype.Component;
@Component
public class EchoInHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("in1");
// 通知執行下壹個InboundHandler
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();//刷新後才將數據發出到SocketChannel
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoInHandler2.java
package com.example.nettynew.hander;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class EchoInHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("in2");
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("接收客戶端數據:" + body);
//向客戶端寫數據
System.out.println("server向client發送數據");
String currentTime = new Date(System.currentTimeMillis()).toString();
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();//刷新後才將數據發出到SocketChannel
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoOuteHandler1.java
package com.example.nettynew.hander;
import java.util.Date;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.springframework.stereotype.Component;
@Component
public class EchoOuteHandler1 extends ChannelOutboundHandlerAdapter {
@Override
// 向client發送消息
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("out1");
/*System.out.println(msg);*/
String currentTime = new Date(System.currentTimeMillis()).toString();
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
ctx.flush();
}
}
EchoOutHandler2.java
package com.example.nettynew.hander;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.springframework.stereotype.Component;
@Component
public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("out2");
// 執行下壹個OutboundHandler
/*System.out.println("at first..msg = "+msg);
msg = "hi newed in out2";*/
super.write(ctx, msg, promise);
}
}
2.2、客戶端
EchoClient.java
package send_order.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* ? 連接服務器 ? 寫數據到服務器 ? 等待接受服務器返回相同的數據 ? 關閉連接
*
*
*
*/
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup nioEventLoopGroup = null;
try {
// 客戶端引導類
Bootstrap bootstrap = new Bootstrap();
// EventLoopGroup可以理解為是壹個線程池,這個線程池用來處理連接、接受數據、發送數據
nioEventLoopGroup = new NioEventLoopGroup();
bootstrap.group(nioEventLoopGroup)//多線程處理
.channel(NioSocketChannel.class)//指定通道類型為NioServerSocketChannel,壹種異步模式,OIO阻塞模式為OioServerSocketChannel
.remoteAddress(new InetSocketAddress(host, port))//地址
.handler(new ChannelInitializer<SocketChannel>() {//業務處理類
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new EchoClientHandler());//註冊handler
}
});
// 鏈接服務器
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture().sync();
} finally {
nioEventLoopGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new EchoClient("localhost", 20000).start();
}
}
EchoClientHandler.java
package send_order.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 客戶端連接服務器後被調用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客戶端連接服務器,開始發送數據……");
byte[] req = "QUERY TIME ORDER".getBytes();//消息
ByteBuf firstMessage = Unpooled.buffer(req.length);//發送類
firstMessage.writeBytes(req);//發送
ctx.writeAndFlush(firstMessage);//flush
}
// ? 從服務器接收到數據後調用
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
System.out.println("client 讀取server數據..");
// 服務端返回消息後
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("服務端數據為 :" + body);
}
// ? 發生異常時被調用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println("client exceptionCaught..");
// 釋放資源
ctx.close();
}
}
3、結果展示
但是大家主要註意到我把源碼中的:
ch.pipeline().addLast(echoInHandler1);
ch.pipeline().addLast(echoOuteHandler1);
ch.pipeline().addLast(echoOutHandler2);
ch.pipeline().addLast(echoInHandler2);
改為:
ch.pipeline().addLast(echoInHandler1);
ch.pipeline().addLast(echoInHandler2);
ch.pipeline().addLast(echoOuteHandler1);
ch.pipeline().addLast(echoOutHandler2);
大家可以觀察到下面的結果:outhandler沒有起作用
服務器端
4、總結
在使用Handler的過程中,需要註意:
1、ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現;調用ctx.write(msg) 將傳遞到ChannelOutboundHandler。
2、ctx.write()方法執行後,需要調用flush()方法才能令它立即執行。
3、流水線pipeline中outhandler不能放在最後,否則不生效
4、Handler的消費處理放在最後壹個處理。
在此我向大家推薦壹個架構學習交流群。交流學習群號:938837867 暗號:555 裏面會分享壹些資深架構師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高並發、高性能、分布式、微服務架構的原理,JVM性能優化、分布式架構等這些成為架構師必備