0%

nio

回过头补点基础.

nio的不同

nio是面向缓冲区的.在io中,数据传输是”流”的模型,单向的,直接面向数据本身(通常意义的流,不是指那几个特别的).而在nio中,数据传输使用buffer,缓冲区.buffer类似于火车,而传输通道channel则是铁轨.这个通道是双向的,buffer在上面来回传输.

buffer

buffer底层就是一个数组,用于存储不同类型的数据.具体来说,基本类型中,除boolean之外,都有对应的buffer.但是常用的就是bytebuffer,因为这个大小传输数据正合适.

分配一个指定大小的缓冲区:ByteBuffer.allocate(1024)

buffer的几个关键属性:

  • capacity:容量
  • limit:界限,缓冲区中可操作的范围,limit之后的数组内容不可操作.
  • position:位置

当分配1024大小后,上面三个值为1024,1024,0.

接着使用put方法向数组放入五个字节数据,position变为5,其它不变.下一次写入从下标5开始.

如果要写数据,则要使用flip方法将position重置为0,即从0开始读取数据.limit变为5,表示从5开始的地方不可读取.

接着就可以使用get方法读取数据.

image-20200801104708362

get方法无参数,表示读取position位置的数据,然后position加一.

dst表示读取到哪个数组去.

读取方法会使得position相应增加.

使用rewind方法可以恢复position位置.可以从头开始读.

使用clear方法可以清空缓冲区,恢复最初状态.但是实际只是清空指针位置,数据本身还在里面.

还有另一个属性mark,mark方法用来标记position位置.当position发生变动后,可以使用reset方法使position回到mark标记的位置.

remaining方法返回缓冲区可操作数据的个数,就是limit减去position.

  • allocate分配的非直接缓冲区建立在JVM内存中
  • allocateDirect分配的直接缓冲区建立在物理内存中.

通道

通道channel只能与buffer交互.负责数据传输.

Channel接口有四个实现类:

  • FileChannel
  • SocketChannel
  • ServerSocketChannel
  • DatagramChannel

第一个是本地传输,第二第三个是tdp传输,第四个是udp传输.

获取通道:

1.针对支持通道的类提供了getChannel方法:

  • 本地IO
    • FileInputStream/FileOutputStream
    • RandomAccessFile
  • 网络IO
    • Socket
    • ServerSocket
    • DatagramSocket

2.NIO2中为各个通道提供了静态方法open

3.NIO2的工具类Files的newByteChannel方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
void test() throws IOException {
FileInputStream fis = new FileInputStream("F:\\迷路帖.jpg");
FileOutputStream fos = new FileOutputStream("F:\\迷路帖1.jpg");

FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();

ByteBuffer buf = ByteBuffer.allocate(1024);
// 从通道读入缓冲区
while (inChannel.read(buf) != -1) { // 切换为读数据模式(将position置为0)
buf.flip();
// 写入
outChannel.write(buf);
// 清空缓冲区
buf.clear();
}
outChannel.close();
inChannel.close();
fos.close();
fis.close();
}

上面演示使用通道完成的文件复制,用的是非直接缓冲区.实际和原来的IO没多大区别.保留这种对流的操作方式纯粹是为了兼容.

现在用新的NIO2的open方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
void test2() throws IOException {
// 搭建通道
FileChannel inChannel = FileChannel.open(Paths.get("F:\\迷路帖.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("F:\\迷路帖1.jpg"), StandardOpenOption.WRITE,StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);

// 内存映射文件(在物理内存中的缓冲区,只有ByteBuffer支持这种写法),与allocateDirect方法效果一样,MappedByteBuffer是ByteBuffer的子类
MappedByteBuffer inMapBuf = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
// 第一个参数是设置缓冲区的读写模式,这里有只读或者可读可写模式,而为了写,连带着要可读,所以outChannel要加一个可读
MappedByteBuffer outMapBuf = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());

// 用一个数组从内存映射文件中取出数据再写入另一个内存映射文件
byte[] dts = new byte[inMapBuf.limit()];
inMapBuf.get(dts);
outMapBuf.put(dts);

inChannel.close();
outChannel.close();
}

下面是更精简的写法:

1
2
3
4
5
6
7
8
9
10
@Test
void test3() throws IOException {
// 搭建通道
FileChannel inChannel = FileChannel.open(Paths.get("F:\\迷路帖.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("F:\\迷路帖1.jpg"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
// 从输入通道直接传到输出通道
inChannel.transferTo(0, inChannel.size(), outChannel);
inChannel.close();
outChannel.close();
}

这种方法本质与上面一样,只不过是transfer方法内部建立了直接缓冲区.传输

方法还可以换个写法:outChannel.transferFrom(inChannel, 0, inChannel.size());

分散读取:将通道中的数据依次分散到多个缓冲区中

聚集写入:将多个缓冲区的数据依次聚集到一个通道中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
void test4() throws IOException {
RandomAccessFile raf = new RandomAccessFile("1.txt", "rw");

//获得通道
FileChannel fileChannel = raf.getChannel();
//分配两个缓冲区
ByteBuffer buffer1 = ByteBuffer.allocate(100);
ByteBuffer buffer2 = ByteBuffer.allocate(1000);

ByteBuffer[] bufs = {buffer1, buffer2};
fileChannel.read(bufs);//分散读取,分别给每个缓冲区读取响应长度的数据

Arrays.stream(bufs).forEach(ByteBuffer::flip);
System.out.println(new String(buffer1.array(), 0, buffer1.limit()));
System.out.println(buffer2.toString());
}

原来的read或者write方法接受的是一个buffer,现在可以接收一个buffer数组.

NIO的非阻塞模式

非阻塞模式是在网络IO中才有.在服务端和客户端中间 加了一个选择器.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Test
void client() throws IOException {
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9090));
FileChannel inChannel = FileChannel.open(Paths.get("pom.xml"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (inChannel.read(buffer) != -1) {
buffer.flip();
sChannel.write(buffer);
buffer.clear();
}

int len = 0;
while ((len = sChannel.read(buffer)) != -1) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
inChannel.close();
sChannel.close();
}

@Test
void server() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
FileChannel outChannel = FileChannel.open(Paths.get("pom"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
serverSocketChannel.bind(new InetSocketAddress(9090));
SocketChannel socketChannel = serverSocketChannel.accept();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (socketChannel.read(buffer) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}

buffer.put("接收完成".getBytes());
buffer.flip();
socketChannel.write(buffer);

socketChannel.close();
outChannel.close();
serverSocketChannel.close();
}

ServerSocketChannel与SocketChannel:通过 ServerSocketChannel.accept() 方法监听新进来的连接。当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。

上面的代码是用inchannel读入文件到SocketChannel中,SocketChannel根据ip和端口号建立套接字连接,ServerSocketChannel监听端口,得到一个SocketChannel.然后将SocketChannel中的数据写入outChannel.接着在SocketChannel中放入buffer,buffer有回传数据.让客户端显示回传数据.

但是这里的问题是,客户端在发送完数据后没有关闭与服务端的连接.这条通道就一直占用着.即客户端做完该做的事后却一直连着服务端,没有正确关闭.

而现在使用选择器可以做到多路复用,非阻塞的传输.

如果客户端准备好连接,那选择器得到准备连接的状态,让服务器与之连接.如果准备好传输数据,那么就传输数据.中间这种状态的切换是通过轮询选择器的状态码实现.一旦状态码改变就执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Test
void server() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9090));
//获取一个选择器
Selector selector = Selector.open();
//将通道注册到选择器上,并且指定监听"OP_ACCEPT",即接收事件,表示服务器接收已经就绪
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

//当选择器中有"状态码"时
while (selector.select() > 0) {
//遍历这些"状态码"
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
//如果是表示已准备好接收数据
if (key.isAcceptable()) {
//则获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
//切换非阻塞模式
socketChannel.configureBlocking(false);
//将该通道注册到选择器,监听"读就绪"
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
//如果是读就绪,则通过key获得通道传输数据
SocketChannel sChannel = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
int len=0;
while ((len = sChannel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
//处理完当前这个状态码就要移出去.下次通道就绪再放到selector中
keyIterator.remove();
}
}

}

上面是一个不太完整规范的代码.其中selector.select()的返回值是表示selector中有多少个通道就绪(有几个”状态码”),它会一直阻塞到直到selector中有就绪的管道.如果方法里有参数,则表示最多阻塞多少毫秒.表示面是完整的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class WebServer {
public static void main(String[] args) {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000));
ssc.configureBlocking(false);

Selector selector = Selector.open();
// 注册 channel,并且指定感兴趣的事件是 Accept
ssc.register(selector, SelectionKey.OP_ACCEPT);

ByteBuffer readBuff = ByteBuffer.allocate(1024);
ByteBuffer writeBuff = ByteBuffer.allocate(128);
writeBuff.put("received".getBytes());
writeBuff.flip();

while (true) {
int nReady = selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();

while (it.hasNext()) {
SelectionKey key = it.next();

if (key.isAcceptable()) {
// 创建新的连接,并且把连接注册到selector上,而且,
// 声明这个channel只对读操作感兴趣。
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
readBuff.clear();
socketChannel.read(readBuff);

readBuff.flip();
System.out.println("received : " + new String(readBuff.array()));
key.interestOps(SelectionKey.OP_WRITE);
}
else if (key.isWritable()) {
writeBuff.rewind();
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(writeBuff);
key.interestOps(SelectionKey.OP_READ);
}
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

当服务器正常启动且有客户端连接上,流程是这样的 :

  1. 获取ServerSocketChannel这个通道
  2. 将这个通道在selector注册,并且监听接收就绪事件
  3. 进入while(true)循环,得到所有的selectedKey.如果此时服务端正常工作,那么显然已经”接收就绪”.则创建一个SocketChannel客户端通道.并且在selector注册这个通道监听”读就绪”.
  4. 此时来到it.remove();,在selector中去掉ServerSocketChannel通道接收就绪的事件,下次传输就又可以加这个事件.此时如果客户端准备发送还没就绪,selector的selectedKey就是空的.因为where(true),所以就会一直等.这里还可以写成上面说的selector.select()
  5. 当客户端终于发送就绪,触发了”读就绪”.进入else if (key.isReadable()).根据OP_READ这个key的key.channel得到之前注册的通道.

然后开始传输数据.

Pipe

用于线程之间单向的传输数据.

image-20200801215320731

通过Pipe.open()方法打开管道:Pipe pipe = Pipe.open();

要向管道写数据,需要访问sink通道:Pipe.SinkChannel sinkChannel = pipe.sink();

通过调用SinkChannel的write()方法,将数据写入SinkChannel,像这样

1
2
3
4
5
6
7
8
9
10
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
sinkChannel.write(buf);
}

从读取管道的数据,需要访问source通道,像这样:Pipe.SourceChannel sourceChannel = pipe.source();

调用source通道的read()方法来读取数据,像这样:

1
2
3
ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = sourceChannel.read(buf);