Netty高级
本博客根据黑马netty教程学习而做的笔记(高级),链接如下:黑马程序员Netty全套教程,全网最全Netty深入浅出教程,Java网络编程的王者
# 一、概述
# 1.什么是Netty
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
# 2.Netty的优势
如果使用传统NIO,其工作量大,bug 多
- 需要自己构建协议
- 解决 TCP 传输问题,如粘包、半包
- 因为bug的存在,epoll 空轮询导致 CPU 100%
Netty 对 API 进行增强,使之更易用,如
- FastThreadLocal => ThreadLocal
- ByteBuf => ByteBuffer
# 二、入门案例
客户端
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
// 1、启动器,负责组装netty组件,启动服务
new Bootstrap()
// 2、BossEvenLoop,WorkerEventLoop(selector,thread),group 组
.group(new NioEventLoopGroup())
// 3、选择客户端的 NioSocketChannel 类型
.channel(NioSocketChannel.class)
// 4、boss 负责处理连接worker(child) 负责处理器读写,决定worker能执行那些操作
.handler(
// 5、Channel代表和客户端进行数据读写的通道,初始化,负责添加别的handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
// 添加编码 handler ,ByteBuf
sc.pipeline().addLast(new StringEncoder());
}
})
// connect异步非阻塞,正在执行连接的是nio线程
.connect(new InetSocketAddress("localhost",8080))
.sync()
.channel()
.writeAndFlush("hello, world");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
服务器
public class HelloServer {
public static void main(String[] args) {
// 1、启动器,负责组装netty组件,启动服务
new ServerBootstrap()
// 2、BossEvenLoop,WorkerEventLoop(selector,thread),group 组
.group(new NioEventLoopGroup())
// 3、选择服务器的 ServerSocketChannel 类型
.channel(NioServerSocketChannel.class)
// 4、boss 负责处理连接worker(child) 负责处理器读写,决定worker能执行那些操作
.childHandler(
// 5、Channel代表和客户端进行数据读写的通道,初始化,负责添加别的handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
// 添加解码handler ,ByteBuf
sc.pipeline().addLast(new StringDecoder());
// 添加自定义的handler
sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
.bind(8080);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
运行流程
# 三、组件
# 1.EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件
事件循环组 EventLoopGroup EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
切换线程
由上面的图可以看出,当handler中绑定的Group不同时,需要切换Group来执行不同的任务
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获得下一个EventLoop, excutor 即为 EventLoopGroup
EventExecutor executor = next.executor();
// 如果下一个EventLoop 在当前的 EventLoopGroup中
if (executor.inEventLoop()) {
// 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
next.invokeChannelRead(m);
} else {
// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用
# 2.Channel
Channel 的常用方法
- close() 可以用来关闭Channel
- closeFuture() 用来处理 Channel 的关闭
- sync 方法作用是同步等待 Channel 关闭
- addListener 方法是异步等待 Channel 关闭
- pipeline() 方法用于添加处理器
- write() 方法将数据写入
- 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
- 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
- writeAndFlush() 方法将数据写入并立即发送(刷出)
当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作
如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现
- 第一种是通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法同步阻塞执行操作的线程,
- 第二种调用closeFuture.addListener方法,异步添加任务 close的后续操作
# 3.Future & Promise
netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口 netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
Future:可以同步获取异步获取结果
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("子线程");
Thread.sleep(1000);
return 70;
}
});
// 1.同步阻塞获取
Integer integer = future.get();
log.debug("主线程:{}",integer);
// 2.异步非阻塞获取
/* future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
Object now = future.getNow();
log.debug("主线程:{}",now);
}
});*/
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Promise:可以用于存放各个线程中的结果,然后让其他线程去获取该结果
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
DefaultPromise promise = new DefaultPromise<>(eventLoop);
new Thread(()->{
log.debug("开始计算");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
promise.setSuccess(80);
}).start();
// 1.同步阻塞获取
/* Object o = promise.get();
log.debug("主线程:{}",o);*/
// 2.异步非阻塞获取
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
Object now = future.getNow();
log.debug("主线程:{}",now);
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 4.Handler & Pipeline
Pipeline:相当于流水线 Handler:相当于流水线上的工序
- ChannelInboundHandlerAdapter 入栈,读取客户端数据,写会结果
- ChannelOutboundHandlerAdapter 出栈,对写回的结果进行加工
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
// 默认会加一个head 和 tail
// 自己手动加的是在两个之间
// head -> h1 -> h2 -> h3 -> h6 -> h5 -> h4 -> tail
pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
// 将数据传给下一个 pipeline
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
super.channelRead(ctx, msg);
// 执行 writeAndFlush方法,才能触发出栈的方法
sc.writeAndFlush(ctx.alloc().buffer().writeBytes("=========".getBytes()));
}
});
pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
handler需要放入通道的pipeline中,才能根据放入顺序来使用handler
pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止
具体结构如下
调用顺序
# 5.ByteBuf
创建
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
ByteBufUtil.log(buffer);
// 向buffer中写入数据
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 20; i++) {
sb.append("a");
}
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
// 查看写入结果
ByteBufUtil.log(buffer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
直接内存与堆内存 优缺点:
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);// 默认系统内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);// 堆内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);// 系统内存
2
3
池化vs非池化 池化的最大意义在于可以重用 ByteBuf
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
-Dio.netty.allocator.type={unpooled|pooled}
组成
- 读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针和写指针两个指针控制。进行读写操作时,无需进行模式的切换
- 读指针前的部分被称为废弃部分,是已经读过的内容
- 读指针与写指针之间的空间称为可读部分
- 写指针与当前容量之间的空间称为可写部分
扩容 当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作
扩容规则
- 如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
- 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
- 如果写入后数据大小超过 512 字节,则选择下一个 2n
- 例如写入后大小为 513 字节,则扩容后 capacity 是 210=1024 字节(29=512 已经不够了)
- 扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常
释放
由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存 Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
每个 ByteBuf 对象的初始计数为 1
调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递) 基本规则是,谁是最后使用者,谁负责 release
入站 ByteBuf 处理原则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
- 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
异常处理原则
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
切片
- ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用 修改原ByteBuf中的值,也会影响切片后得到的ByteBuf
优势
- 池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如
- slice、duplicate、CompositeByteBuf
# 四、应用
# 黏包,半包
粘包
- 现象
- 发送 abc def,接收 abcdef
- 原因
- 应用层
- 接收方 ByteBuf 设置太大(Netty 默认 1024)
- 传输层-网络层
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大(大于256 bytes),这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
- Nagle 算法:会造成粘包
- 应用层
- 现象
半包
- 现象
- 发送 abcdef,接收 abc def
- 原因
- 应用层
- 接收方 ByteBuf 小于实际发送数据量
- 传输层-网络层
- 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
- 数据链路层
- MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
- 应用层
- 现象
发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界
解决方案
定长解码器
- 客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度 服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码,具体使用方法如下
行解码器
- 行解码器的是通过分隔符对数据进行拆分来解决粘包半包问题的
长度字段解码器(类型http协议)
- 在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的
参数解析
- maxFrameLength 最大长度
- lengthFieldOffset 长度字段偏移量
- lengthFieldLength 长度字段的字节数
- lengthAdjustment 长度调整
- initialBytesToStrip 切除前面部分
# 2、协议设计与解析
TCP/IP 中消息传输基于流的方式,没有边界 协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
Redis协议
@Slf4j
public class TestRedisClient {
public static void main(String[] args) {
final byte[] LINE = {13,10};
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("#3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$8".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("zhangsan".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
String s = buf.toString(Charset.defaultCharset());
log.debug(s);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("vm.syyo.com", 6379).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
log.debug("error,{}",e);
}finally {
worker.shutdownGracefully();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
HTTP协议
@Slf4j
public class TestHttpServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss,worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// http 解码器
ch.pipeline().addLast(new HttpServerCodec());
// 1.读的处理器
/* ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("{}",msg.getClass());
if (msg instanceof HttpRequest){// 请求头
}else if (msg instanceof HttpContent){// 请求体
}
}
});*/
// 2.
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
// 获取请求
log.debug("{}",msg.uri());
byte[] bytes = "<h1>hello world!</h1>".getBytes();
// 响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
response.headers().setInt(CONTENT_LENGTH,bytes.length);
response.content().writeBytes(bytes);
// 写入channel
ctx.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind( 8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
log.debug("error,{}",e);
}finally {
worker.shutdownGracefully();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# Sharable
组件复用注解 有线程安全问题的就不能用
# 五、优化
# 六、源码
// netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();
// 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();
// 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 启动 nio boss 线程执行接下来的操作
//注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
// head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23







