熟悉使用netty,写博客备忘
1.首先写服务端代码,依赖的类后面体现
public class MessageServer {
public static void main(String[] args) {
ConstantStep.getInsatnce().printStep();
System.out.println("MessageServer start..");
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// 与下面的addLast操作选择一个即可
bootstrap.setPipelineFactory(new MessageServerPipelineFactory());
//bootstrap.getPipeline().addLast("decoder", new MessageDecoder());
//bootstrap.getPipeline().addLast("encoder", new MessageEncoder());
//bootstrap.getPipeline().addLast("handler", new MessageServerHandler());
// Bind and start to accept incoming connections
bootstrap.bind(new InetSocketAddress("localhost",9550));
}
}
2.相应的客户端:
public class MessageClient {
public static void main(String[] args) {
ConstantStep.getInsatnce().printStep();
System.out.println("MessageClient start..");
start();
}
private static void start() {
String host = "127.0.0.1";
int port = 9550;
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the event pipeline factory.
// 不能既有MessageServerPipelineFactory,又有bootstrap.getPipeline().addLast()这个方法
bootstrap.setPipelineFactory(new MessageClientPipelineFactory());
//bootstrap.getPipeline().addLast("decoder", new MessageDecoder());
//bootstrap.getPipeline().addLast("encoder", new MessageEncoder());
//bootstrap.getPipeline().addLast("handler", new MessageClientHandler());
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is closed or the connection attempt fails
future.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit
future.getChannel().write("--------Shut down thread pools to exit: future getChannel");
//release resource
bootstrap.releaseExternalResources();
}
}
3.服务端的handler
public class MessageServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(MessageServerHandler.class);
private final AtomicLong transferredBytes = new AtomicLong();
@Override
public void messageReceived(ChannelHandlerContext cxt, MessageEvent event){
ConstantStep.getInsatnce().printStep();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// /ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
transferredBytes.addAndGet(((ChannelBuffer) event.getMessage()).readableBytes());
System.out.println("MessageServerHandler : messageReceived, length = "
+ ((ChannelBuffer) event.getMessage()).readableBytes()
+ transferredBytes.byteValue());
event.getChannel().write(event.getMessage());
System.out.println("Client: messageReceived ," + event.getMessage()
+ ",time = " + System.currentTimeMillis());
}
@Override
public void exceptionCaught(ChannelHandlerContext cxt, ExceptionEvent event) {
ConstantStep.getInsatnce().printStep();
logger.log(Level.WARNING, "******ServerHandler: exceptionCaught", event.getCause());
System.out.println("ServerHandler: exceptionCaught ******" + event.getCause().getLocalizedMessage());
event.getChannel().close();
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event)
throws Exception {
ConstantStep.getInsatnce().printStep();
System.out.println("ServerHandler........channelOpen, channel id = " + event.getChannel().getId());
super.channelOpen(ctx, event);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
ConstantStep.getInsatnce().printStep();
System.out.println("ServerHandler........channelConnected, context name = " + ctx.getName());
super.channelConnected(ctx, e);
}
}
4.客户端的handler
public class MessageClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(MessageClientHandler.class);
//private ChannelBuffer responseBuffer = ChannelBuffers.dynamicBuffer();
private static byte[] content = {'A','2','3','4','5','7','8','9','e','a','b','c','d','f','g','h','i','j'};
@Override
public void channelConnected(ChannelHandlerContext cxt, ChannelStateEvent event)
throws Exception {
ConstantStep.getInsatnce().printStep();
//System.out.println("当连接到服务器的时候,就开始发送256字节的字符串");
logger.info("channelConnected: client try to connect server...");
//当messageReceived接收到服务器的消息时,又把消息发送给服务器
event.getChannel().write(nextMessage());//this.getTestString(256)
super.channelConnected(cxt, event);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
ConstantStep.getInsatnce().printStep();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
//System.out.println("Client: messageReceived , buffer.readableBytes = " + buffer.readableBytes());
System.out.println("Client: messageReceived ," + event.getMessage()
+ ",time = " + System.currentTimeMillis());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
event.getChannel().write(event.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
ConstantStep.getInsatnce().printStep();
//logger.log(Level.WARNING, "CLIENT: exceptionCaught " , event.getCause());
System.out.println(event.getCause().getMessage());
ctx.getChannel().close();
}
public static String getTestString(int size) {
ConstantStep.getInsatnce().printStep();
StringBuilder sb = new StringBuilder(20);
for(int i=0; i<size; i++) {
sb.append("A");
}
return sb.toString();
}
public static ChannelBuffer nextMessage() {
ConstantStep.getInsatnce().printStep();
return ChannelBuffers.wrappedBuffer(content);
}
}
5.MessageServerPipelineFactory:
public class MessageServerPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
//在链条 加上各种处理器 ,注意顺序
pipeline.addLast("handler", new MessageServerHandler());
pipeline.addLast("decoder", new ServerMessageDecoder());
pipeline.addLast("encoder", new ServerMessageEncoder());
return pipeline;
}
}
6.类似的MessageClientPipelineFactory
public class MessageClientPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("handler", new MessageClientHandler());
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
return pipeline;
}
}
7.ServerMessageDecoder:
public class ServerMessageDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("ServerDDDDD: MessageDecoder: decode() ,msg : "
+ buffer.getClass()
+ ",time = " + System.currentTimeMillis());
//对应前面的编码,先读取4字节的int.如发现不够4字节,则直接返回null,累积到下一次读取。
//if(buffer.readableBytes()<4) return null;
//int dataLength = buffer.getInt(buffer.readerIndex());
//如果发现读取的数据长度不够,则累积到下一次读取
//if(buffer.readableBytes() < (dataLength + 4)) return null;
//buffer.skipBytes(4);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
return new String(bytes);
}
}
8.ServerMessageEncoder:
public class ServerMessageEncoder extends OneToOneDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("Server EEEEE: MessageEncoder: decode() ,msg : "
+ msg.getClass() + ", info = " + msg
+ ",time = " + System.currentTimeMillis());
if(!(msg instanceof String)) {
return msg;
}
//开始编码:将要发送的字符串转换成字节,数据包头是一个4字节的int,后面就是字符串byte
String str = (String)msg;
byte[] bytes = str.getBytes();
int length = bytes.length;
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeInt(length);
cb.writeBytes(bytes);
return cb;
//return msg;
}
}
9.MessageEncoder:
public class MessageEncoder extends OneToOneDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("EEEEE: MessageEncoder: decode() ,msg : "
+ msg.getClass() + ", info = " + msg
+ ",time = " + System.currentTimeMillis());
if(!(msg instanceof String)) {
return msg;
}
//开始编码:将要发送的字符串转换成字节,数据包头是一个4字节的int,后面就是字符串byte
String str = (String)msg;
byte[] bytes = str.getBytes();
int length = bytes.length;
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeInt(length);
cb.writeBytes(bytes);
return cb;
//return msg;
}
}
10:MessageDecoder:
public class MessageDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);//测试用
System.out.println("DDDDD: MessageDecoder: decode() ,msg : "
+ buffer.getClass()
+ ",time = " + System.currentTimeMillis());
//对应前面的编码,先读取4字节的int.如发现不够4字节,则直接返回null,累积到下一次读取。
//if(buffer.readableBytes()<4) return null;
//int dataLength = buffer.getInt(buffer.readerIndex());
//如果发现读取的数据长度不够,则累积到下一次读取
//if(buffer.readableBytes() < (dataLength + 4)) return null;
//buffer.skipBytes(4);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
return new String(bytes);
}
}
11:ConstantStep
public class ConstantStep {
private int step = 0;
private static ConstantStep instance = new ConstantStep();
private ConstantStep() {
}
public synchronized static ConstantStep getInsatnce(){//
return instance;
}
public synchronized void printStep() {
System.out.println(" now step : " + step++);
}
}
相关推荐
netty基础类
Netty基础,用于学习Netty,参考黑马程序员的netty教程
java-netty图,分析选择器 java-netty图,分析IO java-netty图,分析管道 java-netty图,分析缓存
Netty学习资料,基础应用实战
设计基础知识,初学者可以参照
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
netty权威指南 第二版是一本Netty架构学习指南,由李林锋...通过阅读netty权威指南 第二版,读者不仅能够掌握Netty基础功能的使用和开发,更能够掌握Netty核心类库的原理和使用约束,从而在实际工作中更好地使用Netty。
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的...下面这篇文章主要给大家介绍了关于Netty基础使用的相关资料,需要的朋友可以参考下。
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
netty案例,netty4.1基础入门篇六《NettyServer群发消息》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724778&idx=1&sn=72e4b1ea5323475b16e99c6720c7069d&scene=19#wechat_redirect
netty案例,netty4.1基础入门篇七《嗨!NettyClient》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724783&idx=1&sn=bc827e680ebd533fe67720fd695257be&scene=19#wechat_redirect
netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724927&idx=1&sn=a16bc8e98d6a27816da0896adcc83778&scene=19#wechat_redirect
netty案例,netty4.1基础入门篇十二《简单实现一个Netty的Http服务》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724932&idx=1&sn=eb1631ddbd0e7a0dcb3a655374631f48&scene=19#wechat_redirect
netty案例,netty4.1基础入门篇四《NettyServer收发数据》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724766&idx=1&sn=a39a05c550f43467a3c3cedd070f8226&scene=19#wechat_redirect
基础篇:java的IO演进之路; BIO ;NIO;伪异步;NIO类库 ; 入门篇:Jetty简单应用入门;TCP粘包拆包;定位符和定长解码器; 中级篇:编解码技术和常用的序列化框架(protobuf /java/Marshalling) 高级篇:Http协议...
netty案例,netty4.1基础入门篇十《关于ChannelOutboundHandlerAdapter简单使用》源码 ...