这一节我们编写一个监视器:EventLogMonitor ,也就是用来接收事件的程序,用来代替 netcat 。EventLogMonitor 做下面事情:
* 接收 LogEventBroadcaster 广播的 UDP DatagramPacket
* 解码 LogEvent 消息
* 输出 LogEvent 消息
和之前一样,将实现自定义 ChannelHandler 的逻辑。图13.4描述了LogEventMonitor 的 ChannelPipeline 并表明了 LogEvent 的流经情况。
[![](https://box.kancloud.cn/2015-08-19_55d47aa7cd6d5.jpg)](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2013.4%20LogEventMonitor.jpg)
Figure 13.4 LogEventMonitor
图中显示我们的两个自定义 ChannelHandlers,LogEventDecoder 和 LogEventHandler。首先是负责将网络上接收到的 DatagramPacket 解码到 LogEvent 消息。清单13.6显示了实现。
Listing 13.6 LogEventDecoder
~~~
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
ByteBuf data = datagramPacket.content(); //1
int i = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR); //2
String filename = data.slice(0, i).toString(CharsetUtil.UTF_8); //3
String logMsg = data.slice(i + 1, data.readableBytes()).toString(CharsetUtil.UTF_8); //4
LogEvent event = new LogEvent(datagramPacket.recipient(), System.currentTimeMillis(),
filename,logMsg); //5
out.add(event);
}
}
~~~
1. 获取 DatagramPacket 中数据的引用
2. 获取 SEPARATOR 的索引
3. 从数据中读取文件名
4. 读取数据中的日志消息
5. 构造新的 LogEvent 对象并将其添加到列表中
第二个 ChannelHandler 将执行一些首先创建的 LogEvent 消息。在这种情况下,我们只会写入 system.out。在真实的应用程序可能用到一个单独的日志文件或放到数据库。
下面的清单显示了 LogEventHandler。
Listing 13.7 LogEventHandler
~~~
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //1
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); //2
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception {
StringBuilder builder = new StringBuilder(); //3
builder.append(event.getReceivedTimestamp());
builder.append(" [");
builder.append(event.getSource().toString());
builder.append("] [");
builder.append(event.getLogfile());
builder.append("] : ");
builder.append(event.getMsg());
System.out.println(builder.toString()); //4
}
}
~~~
1. 继承 SimpleChannelInboundHandler 用于处理 LogEvent 消息
2. 在异常时,输出消息并关闭 channel
3. 建立一个 StringBuilder 并构建输出
4. 打印出 LogEvent 的数据
LogEventHandler 打印出 LogEvent 的一个易读的格式,包括以下:
* 收到时间戳以毫秒为单位
* 发送方的 InetSocketAddress,包括IP地址和端口
* LogEvent 生成绝对文件名
* 实际的日志消息,代表在日志文件中一行
现在我们需要安装处理程序到 ChannelPipeline ,如图13.4所示。下一个清单显示了这是如何实现 LogEventMonitor 类的一部分。
Listing 13.8 LogEventMonitor
~~~
public class LogEventMonitor {
private final Bootstrap bootstrap;
private final EventLoopGroup group;
public LogEventMonitor(InetSocketAddress address) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group) //1
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LogEventDecoder()); //2
pipeline.addLast(new LogEventHandler());
}
}).localAddress(address);
}
public Channel bind() {
return bootstrap.bind().syncUninterruptibly().channel(); //3
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
}
LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0]))); //4
try {
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().await();
} finally {
monitor.stop();
}
}
}
~~~
1. 引导 NioDatagramChannel。设置 SO_BROADCAST socket 选项。
2. 添加 ChannelHandler 到 ChannelPipeline
3. 绑定的通道。注意,在使用 DatagramChannel 是没有连接,因为这些 无连接
4. 构建一个新的 LogEventMonitor
- Introduction
- 开始
- Netty-异步和数据驱动
- Netty 介绍
- 构成部分
- 关于本书
- 第一个 Netty 应用
- 设置开发环境
- Netty 客户端/服务端 总览
- 写一个 echo 服务器
- 写一个 echo 客户端
- 编译和运行 Echo 服务器和客户端
- 总结
- Netty 总览
- Netty 快速入门
- Channel, Event 和 I/O
- 什么是 Bootstrapping 为什么要用
- ChannelHandler 和 ChannelPipeline
- 近距离观察 ChannelHandler
- 总结
- 核心功能
- Transport(传输)
- 案例研究:Transport 的迁移
- Transport API
- 包含的 Transport
- Transport 使用情况
- 总结
- Buffer(缓冲)
- Buffer API
- ByteBuf - 字节数据的容器
- 字节级别的操作
- ByteBufHolder
- ByteBuf 分配
- 总结
- ChannelHandler 和 ChannelPipeline
- ChannelHandler 家族
- ChannelPipeline
- ChannelHandlerContext
- 总结
- Codec 框架
- 什么是 Codec
- Decoder(解码器)
- Encoder(编码器)
- 抽象 Codec(编解码器)类
- 总结
- 提供了的 ChannelHandler 和 Codec
- 使用 SSL/TLS 加密 Netty 程序
- 构建 Netty HTTP/HTTPS 应用
- 空闲连接以及超时
- 解码分隔符和基于长度的协议
- 编写大型数据
- 序列化数据
- 总结
- Bootstrap 类型
- 引导客户端和无连接协议
- 引导服务器
- 从 Channel 引导客户端
- 在一个引导中添加多个 ChannelHandler
- 使用Netty 的 ChannelOption 和属性
- 关闭之前已经引导的客户端或服务器
- 总结
- 引导
- Bootstrap 类型
- 引导客户端和无连接协议
- 引导服务器
- 从 Channel 引导客户端
- 在一个引导中添加多个 ChannelHandler
- 使用Netty 的 ChannelOption 和属性
- 关闭之前已经引导的客户端或服务器
- 总结
- NETTY BY EXAMPLE
- 单元测试
- 总览
- 测试 ChannelHandler
- 测试异常处理
- 总结
- WebSocket
- WebSocket 程序示例
- 添加 WebSocket 支持
- 测试程序
- 总结
- SPDY
- SPDY 背景
- 示例程序
- 实现
- 启动 SpdyServer 并测试
- 总结
- 通过 UDP 广播事件
- UDP 基础
- UDP 广播
- UDP 示例
- EventLog 的 POJO
- 写广播器
- 写监视器
- 运行 LogEventBroadcaster 和 LogEventMonitor
- 总结
- 高级主题
- 实现自定义的编解码器
- 编解码器的范围
- 实现 Memcached 编解码器
- 了解 Memcached 二进制协议
- Netty 编码器和解码器
- 测试编解码器
- EventLoop 和线程模型
- 线程模型的总览
- EventLoop
- EventLoop
- I/O EventLoop/Thread 分配细节
- 总结
- 用例1:Droplr Firebase 和 Urban Airship
- 用例2:Facebook 和 Twitter