當前位置:編程學習大全網 - 源碼下載 - Netty--handler的執行順序

Netty--handler的執行順序

1. 簡介

Handler在netty中,無疑占據著非常重要的地位。Handler與Servlet中的filter很像,通過Handler可以完成通訊報文的解碼編碼、攔截指定的報文、統壹對日誌錯誤進行處理、統壹對請求進行計數、控制Handler執行與否。壹句話,沒有它做不到的只有妳想不到的。

Netty中的所有handler都實現自ChannelHandler接口。按照輸出輸出來分,分為ChannelInboundHandler、ChannelOutboundHandler兩大類。ChannelInboundHandler對從客戶端發往服務器的報文進行處理,壹般用來執行解碼、讀取客戶端數據、進行業務處理等;ChannelOutboundHandler對從服務器發往客戶端的報文進行處理,壹般用來進行編碼、發送報文到客戶端。

Netty中,可以註冊多個handler。ChannelInboundHandler按照註冊的先後順序執行;ChannelOutboundHandler按照註冊的先後順序逆序執行,如下圖所示,按照註冊的先後順序對Handler進行排序,request進入Netty後的執行順序為:

2、demo案例

如上即為本測試demo的例子:

2.1、服務器端

EchoServer.java

package com.example.nettynew.server;

import com.example.nettynew.hander.EchoInHandler1;

import com.example.nettynew.hander.EchoInHandler2;

import com.example.nettynew.hander.EchoOutHandler2;

import com.example.nettynew.hander.EchoOuteHandler1;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

/**

* ? 配置服務器功能,如線程、端口 ? 實現服務器處理程序,它包含業務邏輯,決定當有壹個請求連接或接收數據時該做什麽

*/

@Component

public class EchoServer {

@Autowired

EchoInHandler1 echoInHandler1;

@Autowired

EchoInHandler2 echoInHandler2;

@Autowired

EchoOuteHandler1 echoOuteHandler1;

@Autowired

EchoOutHandler2 echoOutHandler2;

public void start() throws Exception {

EventLoopGroup eventLoopGroup = null;

try {

//server端引導類

ServerBootstrap serverBootstrap = new ServerBootstrap();

//連接池處理數據

eventLoopGroup = new NioEventLoopGroup();

serverBootstrap.group(eventLoopGroup)

.channel(NioServerSocketChannel.class)//指定通道類型為NioServerSocketChannel,壹種異步模式,OIO阻塞模式為OioServerSocketChannel

.localAddress("localhost", 8099)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接。

.childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行所有的連接請求

@Override

protected void initChannel(Channel ch) throws Exception {

// 註冊兩個InboundHandler,執行順序為註冊順序,所以應該是InboundHandler1 InboundHandler2

// 註冊兩個OutboundHandler,執行順序為註冊順序的逆序,所以應該是OutboundHandler2 OutboundHandler1

ch.pipeline().addLast(echoInHandler1);

ch.pipeline().addLast(echoOuteHandler1);

ch.pipeline().addLast(echoOutHandler2);

ch.pipeline().addLast(echoInHandler2);

}

});

// 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,然後服務器等待通道關閉,因為使用sync(),所以關閉操作也會被阻塞。

ChannelFuture channelFuture = serverBootstrap.bind().sync();

System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress());

channelFuture.channel().closeFuture().sync();

} finally {

eventLoopGroup.shutdownGracefully().sync();

}

}

}

EchoInHandler1.java

package com.example.nettynew.hander;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import org.springframework.stereotype.Component;

@Component

public class EchoInHandler1 extends ChannelInboundHandlerAdapter {

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) {

System.out.println("in1");

// 通知執行下壹個InboundHandler

ctx.fireChannelRead(msg);

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();//刷新後才將數據發出到SocketChannel

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

cause.printStackTrace();

ctx.close();

}

}

EchoInHandler2.java

package com.example.nettynew.hander;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import org.springframework.stereotype.Component;

import java.util.Date;

@Component

public class EchoInHandler2 extends ChannelInboundHandlerAdapter {

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

System.out.println("in2");

ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

String body = new String(req, "UTF-8");

System.out.println("接收客戶端數據:" + body);

//向客戶端寫數據

System.out.println("server向client發送數據");

String currentTime = new Date(System.currentTimeMillis()).toString();

ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());

ctx.write(resp);

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();//刷新後才將數據發出到SocketChannel

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

cause.printStackTrace();

ctx.close();

}

}

EchoOuteHandler1.java

package com.example.nettynew.hander;

import java.util.Date;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelOutboundHandlerAdapter;

import io.netty.channel.ChannelPromise;

import org.springframework.stereotype.Component;

@Component

public class EchoOuteHandler1 extends ChannelOutboundHandlerAdapter {

@Override

// 向client發送消息

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

System.out.println("out1");

/*System.out.println(msg);*/

String currentTime = new Date(System.currentTimeMillis()).toString();

ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());

ctx.write(resp);

ctx.flush();

}

}

EchoOutHandler2.java

package com.example.nettynew.hander;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelOutboundHandlerAdapter;

import io.netty.channel.ChannelPromise;

import org.springframework.stereotype.Component;

@Component

public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {

@Override

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

System.out.println("out2");

// 執行下壹個OutboundHandler

/*System.out.println("at first..msg = "+msg);

msg = "hi newed in out2";*/

super.write(ctx, msg, promise);

}

}

2.2、客戶端

EchoClient.java

package send_order.client;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**

* ? 連接服務器 ? 寫數據到服務器 ? 等待接受服務器返回相同的數據 ? 關閉連接

*

*

*

*/

public class EchoClient {

private final String host;

private final int port;

public EchoClient(String host, int port) {

this.host = host;

this.port = port;

}

public void start() throws Exception {

EventLoopGroup nioEventLoopGroup = null;

try {

// 客戶端引導類

Bootstrap bootstrap = new Bootstrap();

// EventLoopGroup可以理解為是壹個線程池,這個線程池用來處理連接、接受數據、發送數據

nioEventLoopGroup = new NioEventLoopGroup();

bootstrap.group(nioEventLoopGroup)//多線程處理

.channel(NioSocketChannel.class)//指定通道類型為NioServerSocketChannel,壹種異步模式,OIO阻塞模式為OioServerSocketChannel

.remoteAddress(new InetSocketAddress(host, port))//地址

.handler(new ChannelInitializer<SocketChannel>() {//業務處理類

@Override

protected void initChannel(SocketChannel ch)

throws Exception {

ch.pipeline().addLast(new EchoClientHandler());//註冊handler

}

});

// 鏈接服務器

ChannelFuture channelFuture = bootstrap.connect().sync();

channelFuture.channel().closeFuture().sync();

} finally {

nioEventLoopGroup.shutdownGracefully().sync();

}

}

public static void main(String[] args) throws Exception {

new EchoClient("localhost", 20000).start();

}

}

EchoClientHandler.java

package send_order.client;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

// 客戶端連接服務器後被調用

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println("客戶端連接服務器,開始發送數據……");

byte[] req = "QUERY TIME ORDER".getBytes();//消息

ByteBuf firstMessage = Unpooled.buffer(req.length);//發送類

firstMessage.writeBytes(req);//發送

ctx.writeAndFlush(firstMessage);//flush

}

// ? 從服務器接收到數據後調用

@Override

protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)

throws Exception {

System.out.println("client 讀取server數據..");

// 服務端返回消息後

ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

String body = new String(req, "UTF-8");

System.out.println("服務端數據為 :" + body);

}

// ? 發生異常時被調用

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

System.out.println("client exceptionCaught..");

// 釋放資源

ctx.close();

}

}

3、結果展示

但是大家主要註意到我把源碼中的:

ch.pipeline().addLast(echoInHandler1);

ch.pipeline().addLast(echoOuteHandler1);

ch.pipeline().addLast(echoOutHandler2);

ch.pipeline().addLast(echoInHandler2);

改為:

ch.pipeline().addLast(echoInHandler1);

ch.pipeline().addLast(echoInHandler2);

ch.pipeline().addLast(echoOuteHandler1);

ch.pipeline().addLast(echoOutHandler2);

大家可以觀察到下面的結果:outhandler沒有起作用

服務器端

4、總結

在使用Handler的過程中,需要註意:

1、ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現;調用ctx.write(msg) 將傳遞到ChannelOutboundHandler。

2、ctx.write()方法執行後,需要調用flush()方法才能令它立即執行。

3、流水線pipeline中outhandler不能放在最後,否則不生效

4、Handler的消費處理放在最後壹個處理。

在此我向大家推薦壹個架構學習交流群。交流學習群號:938837867 暗號:555 裏面會分享壹些資深架構師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高並發、高性能、分布式、微服務架構的原理,JVM性能優化、分布式架構等這些成為架構師必備

  • 上一篇:電腦系統有哪些?
  • 下一篇:wp出現 Error 404 - Not Found怎麽辦
  • copyright 2024編程學習大全網