一、裸机中的IO
我们先看下计算机的组成部分:
从图中我们很清楚的看到Input/Output 即为 IO,也就是计算机主机和外部设备进行信息的交换。
这种交换可能是磁盘IO也有可能是网络IO。
二、操作系统中的IO
操作系统分为内核态和用户态,且默认认为应用程序直接和磁盘或网卡进行IO操作是不安全的。因此操作系统为上层提供了相应的API供应用程序调用。操作系统接收到IO调用然后去内核进行IO执行。如图所示:
操作系统内核完成IO操作还包括两个过程:
准备数据阶段:内核等待I/O设备准备好数据
拷贝数据阶段:将数据从内核缓冲区copy到用户进程缓冲区
一次IO的本质其实就是: 等待数据 + 拷贝
三、IO分类
一个完整的IO过程包括以下几个步骤
1、用户应用程序向操作系统发起IO请求
2、操作系统向磁盘或网卡或者外部设备发送IO请求(用户应用程序等待1)
3、操作系统等待磁盘或者网卡或者外部设备准备数据(用户应用程序等待2)
4、操作系统将将数据加载到内核缓冲区(用户应用程序等待3)
5、操作系统将数据copy到用户进程缓冲区(用户应用程序等待4)
6、用户应用程序使用IO资源
1、阻塞IO
阻塞IO即用户程序发出IO操作调用后,等待内核返回数据,内核发起IO执行等待网卡或磁盘返回数据,一直等到数据具备,内核将数据copy到用户程序使用。期间用户程序都是阻塞状态,效率低,CPU利用率差。
问题:用户程序在等待数据传输时无法执行其他任务,导致资源利用率低下。利用多线程来提高并发也会导致调度成本的增加,也会增加编程的难度
2、同步非阻塞IO
用户应用程序向操作系统发起IO请求后,立刻返回一个标识(此时IO资源还不可用),用户应用程序可以先做其他的事情(比如再次去接收客户端的连接请求)。用轮询机制查询IO资源状态等到资源具备后,再进行数据copy进行使用。
问题:如果有1万个并发会导致用户程序需要发起1万次系统调用,增加了成本(基本开销有:用户栈、内核栈切换,保存寄存器信息,由于用户空间和内核空间地址不同会导致cpu缓存失效,也有可能导致页表切换,因此可能导致cpu缓存失效,一般需要几百ns,而普通的函数调用只需要1ns)
3、多路复用IO
提供一种机制,可以监听多个文件描述符,一旦发现有就绪状态就通知用户应用程序执行读写操作
select
POSIX标准中的
有文件描述符个数限制(1024),linux/posix_types.h头文件有这样的声明:
#define __FD_SETSIZE 1024
比如有1千个并发,用户程序调用操作系统select函数,将1千个文件描述符传给内核区域,内核遍历这1千个文件描述符,如果没有就绪就阻塞当前进程,如果有数据具备的就给其打上标识返回给用户程序。用户程序遍历1千文件描述符,发现数据具备的文件描述符后进行读写操作(从linux帮助手册中可以看到select()还是有bug的,特殊情况下会将文件描述符阻塞状态当作非阻塞状态使用)
epoll
Linux特有的
没有文件描述符个数限制,它的限制是操作系统限制的最大的打开文件句柄数目(用户可以自己设定)
用mmap在调用进程的虚拟地址空间中创建新的映射,让内核和用户应用程序都可以访问这份地址空间,并用红黑树存放所有的文件描述符,再用一个链表存放已经就绪的文件描述符,用户应用程序只需要关注这个链表就可以了
4、异步IO
可以看到非阻塞IO解决了步骤2-步骤4的等待问题,但是步骤5还是需要等待。异步IO可以做到步骤5完成后再通知用户应用程序,这样用户应用程序就可以直接使用了。
目前只有Windows上支持AIO,linux上还不支持,因此AIO的使用并没有NIO广泛。
四、相关linux操作系统调用
1、read
ssize_t read(int fd, void *buf, size_t count);
描述:
传一个文件描述符、buf缓冲区。返回读取的字节数
从文件描述符fd向buf缓冲区读取多个字节
读取操作从当前文件偏移量开始,文件偏移量按读取的字节数递增,如果当前文件偏移位于或超过文件末尾,则不会读取任何字节,read()返回零。
2、write
ssize_t write(int fd, const void *buf, size_t count);
描述:
传一个文件描述符、buf缓冲区。返回写的字节数
从指向缓冲区的buf向文件描述符fd引用的文件写入计数字节
如果底层物理介质上的空间不足,或者遇到RLIMIT_FSIZE资源限制可参考函数 setrlimit(2),
如果在写入的字节数少于计数字节后,调用被信号处理程序中断,则写入的字节数量可能小于计数。可参考函数pipe(7)
对于可查找的文件可以调用lseek(2)
注意:
write()的成功返回并不能保证数据已提交到磁盘。事实上,在一些有缺陷的实现中,它甚至不能保证已成功为数据保留了空间。唯一可以确定的方法是在写入所有数据后调用fsync(2)
如果write()在写入任何字节之前被信号处理程序中断,则调用失败,并返回错误EINTR;如果在写入至少一个字节后中断,则调用成功,并返回写入的字节数。(所以上层必须根据返回的字节数判断文件是否写完了)
3、select
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
int pselect(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, const struct timespec *timeout,const sigset_t *sigmask);
描述:
select() 和pselect() 允许程序监视多个文件描述符,等待一个或多个文件描述符“ready”进行某类I/O操作(例如,可能输入)。如果可以在不阻塞的情况下执行相应的I/O操作(例如read(2)),则认为文件描述符已准备就绪。
select() 和pselect() 的操作是相同的,但有以下三个区别:
1、select() 使用一个超时,它是一个结构体timeval(带秒和微秒),而pselect() 使用结构体timespec(含秒和纳秒)。
2、select() 可以更新timeout参数,以指示还剩多少时间。pselect() 不会更改此参数。
3、select() 没有sigmask参数,其行为类似于使用NULL sigmask调用的pselect()。
提供了四个宏来操纵集合。FD_ZERO() 清除集合。FD_SET()和FD_CLR()分别在集合中添加和删除给定的文件描述符。FD_ISSET()测试文件描述符是否是集合的一部分;这在select()返回后很有用。
BUGS:
从2.1版本开始,glibc提供了使用sigprocmask(2) 和select() 实现的pselect() 的仿真。此实现仍然容易受到pselect() 旨在防止的竞争条件的影响。glibc的现代版本在提供它的内核上使用(无种族)pselect() 系统调用
在缺少pselect() 的系统上,可以使用自管道技巧实现可靠(且更便携)的信号捕获。在这种技术中,信号处理程序将一个字节写入管道,管道的另一端由主程序中的select()监视。(为了避免在写入可能已满的管道或从可能为空的管道读取时可能发生阻塞,在读取和写入管道时使用非阻塞I/O。)
在Linux下,select()可能会将套接字文件描述符报告为“准备读取”,但随后的读取仍会阻塞。例如,当数据已经到达,但在检查时具有错误的校验和并被丢弃时,可能会发生这种情况。可能还有其他情况,其中文件描述符被虚假地报告为就绪。因此,在不应该阻塞的套接字上使用O_NONBLOCK可能更安全。
在Linux上,如果调用被信号处理程序中断(即EINTR错误返回),select()也会修改超时。POSIX.1-2001不允许这样做。Linux pselect()系统调用具有相同的行为,但glibc包装器通过在内部将超时复制到本地变量并将该变量传递给系统调用来隐藏此行为
4、epoll
epoll - I/O event notification facility (epoll不是一个函数调用,而是一个IO事件通知工具)
描述:
epoll API执行与poll(2)类似的任务:监视多个文件描述符以查看是否可以对其中任何一个进行I/O。epoll API既可以用作边缘触发接口,也可以用作级别触发接口,并且可以很好地扩展到大量关注的文件描述符。提供以下系统调用来创建和管理epoll实例
epoll_create(2) : 创建一个epoll实例,并返回一个引用该实例的文件描述符
然后通过epoll_ctl(2) 注册、修改、删除特定文件描述符。当前在epoll实例上注册的文件描述符集有时称为epoll集
epoll_wait(2)等待I/O事件,如果当前没有可用的事件,则阻塞调用线程
epoll有两种工作模式ET(边缘触发)、LT(水平触发)(默认使用LT)
1、表示管道读取端(rfd)的文件描述符在epoll实例上注册。
2、管道写入器在管道的写入侧写入2kB的数据。
3、调用epoll_wait(2) 将返回rfd作为就绪文件描述符。
4、管道读取器从rfd读取1kB的数据。
5、对epoll_wait(2) 的调用已完成。
如果已使用EPOLLET ET模式,标志将rfd文件描述符添加到epoll接口,则步骤5中对epoll_wait(2)的调用可能会挂起,尽管文件输入缓冲区中仍存在可用数据;同时,远程对等体可能期望基于其已经发送的数据的响应。这样做的原因是,边缘触发模式仅在受监视的文件描述符发生更改时才传递事件。因此,在步骤5中,调用者可能会等待输入缓冲区中已经存在的一些数据。在上面的例子中,由于在2中完成了写入,将在rfd上生成一个事件,该事件在3中被消耗。由于4中完成的读取操作不会消耗整个缓冲区数据,因此步骤5中对epoll_wait(2)的调用可能会无限期阻塞。
5、mmap
void *mmap(void *addr, size_t length, int prot, int flags,int fd, off_t offset);
int munmap(void *addr, size_t length);
描述:
将文件或设备映射或取消映射到内存中
mmap() 在调用进程的虚拟地址空间中创建一个新的映射。新映射的起始地址在addr中指定。length参数指定映射的长度。
如果addr为NULL,则内核选择创建映射的地址;这是创建新映射的最便携的方法。如果addr不为NULL,则内核将其视为关于映射位置的提示;在Linux上,映射将在附近的页面边界处创建。新映射的地址作为调用的结果返回。
文件映射的内容使用从文件描述符fd引用的文件(或其他对象)中的偏移量开始的长度字节进行初始化。偏移量必须是sysconf(_SC_page_size)返回的页面大小的倍数。
prot参数描述了映射所需的内存保护(并且不得与文件的打开模式冲突)。它要么是PROT_NONE,要么是以下一个或多个标志的位(PROT_EXEC可执行、PROT_READ可读取、PROT_WRITE可写入、PROT_NONE无法访问)
flags参数确定映射的更新是否对映射同一区域的其他进程可见,以及更新是否会传递到底层文件。此行为是通过在标志中包含以下值之一来确定的:(MAP_SHARED、MAP_SHARED_VALIDATE等等)
mmap() 映射的内存在fork(2) 中保留,具有相同的属性。
文件以页面大小的倍数映射。对于不是页面大小倍数的文件,映射时剩余内存为零,对该区域的写入不会写入文件。未指定更改映射基础文件大小对与文件添加或删除区域对应的页面的影响
munmap()
munmap() 系统调用删除指定地址范围的映射,并导致对该范围内地址的进一步引用生成无效的内存引用。进程终止时,该区域也会自动取消映射。另一方面,关闭文件描述符并不会取消映射该区域。
注意:
不同的硬件PROT_WRITE、PROT_READ、PROT_EXEC状态可能表示方式不同
BUGS:
在Linux上,没有像上面MAP_NORESERVE下建议的那样的保证。默认情况下,当系统内存不足时,任何进程都可以随时被终止。
在2.6.7之前的内核中,只有当prot指定为prot_NONE时,MAP_POPULATE标志才有效。
指定系统应始终对对象末尾的任何部分页面进行零填充,并且系统永远不会在对象末尾之后写入对对象的任何修改。在Linux上,当您在对象结束后将数据写入此类部分页面时,即使在文件关闭并取消映射后,数据也会留在页面缓存中,即使数据从未写入文件本身,后续映射也可能会看到修改后的内容。在某些情况下,这可以通过在取消映射之前调用msync(2) 来修复;然而,这在tmpfs上不起作用(例如,在使用shm_overview(7)中记录的POSIX共享内存接口时)。
五、代码示例
1、BIO
Client
import java.io.*; import java.net.Socket; public class BIOClient { public static void main(String[] args) throws Exception { //构建客户端Socket,并于服务端建立连接 Socket socket = new Socket("127.0.0.1", 8888); //构建输出流向服务端发送信息 OutputStream outputStream = socket.getOutputStream(); PrintStream ps = new PrintStream(outputStream); ps.println("hello"); ps.flush(); //构建输入流获取客户端的信息 InputStream inputStream = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(inputStream)); String s = null; //阻塞等待服务端发送给客户端的数据 while ((s = br.readLine()) != null) { System.out.println(Thread.currentThread().getName()+" 接收到服务端的数据:" + s); } outputStream.close(); socket.close(); } }
Server
import java.io.*; import java.net.ServerSocket; import java.net.Socket; public class BIOServer { public static void main(String[] args) throws Exception { int port = 8888; // 服务器端口 ServerSocket serverSocket = new ServerSocket(port); while (true) { // 阻塞等待客户端连接 Socket socket = serverSocket.accept(); System.out.println("客户端连接成功:" + socket.getRemoteSocketAddress()); // 每个连接创建一个线程处理 ,且每个线程同一时刻只能处理一个连接 new Thread(() -> { try { // 输入流读取数据 输入流是客户端发的信息,输出流是自己发出去给客户端的信息 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); String inputLine; OutputStream outputStream = socket.getOutputStream(); PrintWriter printWriter = new PrintWriter(outputStream); //阻塞状态,等待输入信息完全接收才给客户端发送信息 while ((inputLine = in.readLine()) != null) { printWriter.println("给客户端的消息:收到你的消息:"+inputLine); printWriter.flush(); System.out.println("客户端消息:" + inputLine); } printWriter.close(); in.close(); } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
2、NIO
Client
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class NIOClient { public static void main(String[] args) { try { // 创建 SocketChannel SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("localhost", 8080)); // 发送数据到服务器 String message = "Hello, Server!"; ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); socketChannel.write(buffer); socketChannel.close(); // 关闭连接 } catch (IOException e) { e.printStackTrace(); } } }
Server
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; public class NIOServer { public static void main(String[] args) { try { // 创建 ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8080)); while (true) { // 等待客户端连接 SocketChannel clientSocketChannel = serverSocketChannel.accept(); System.out.println("Client connected: " + clientSocketChannel.getRemoteAddress()); // 创建 ByteBuffer 用于读取数据 ByteBuffer buffer = ByteBuffer.allocate(1024); // 从客户端读取数据 int bytesRead; while ((bytesRead = clientSocketChannel.read(buffer)) != -1) { buffer.flip(); // 切换至读模式 while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); // 打印接收到的数据 } buffer.clear(); // 清空缓冲区,准备下一次读取 } clientSocketChannel.close(); // 关闭客户端连接 } } catch (IOException e) { e.printStackTrace(); } } }
3、NIO epoll
Client
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NioEpollClient { private final Selector selector; private final SocketChannel socketChannel; public NioEpollClient(String host, int port) throws IOException { socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); // 连接到服务器 if (!socketChannel.connect(new InetSocketAddress(host, port))) { while (!socketChannel.finishConnect()) { // 等待连接完成,可以在这里添加超时逻辑 } } // 注册连接完成事件 selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); } public void send(String message) throws IOException { byte[] data = message.getBytes(); socketChannel.write(java.nio.ByteBuffer.wrap(data)); } public void close() throws IOException { socketChannel.close(); selector.close(); } public static void main(String[] args) throws IOException { NioEpollClient client = new NioEpollClient("localhost", 8888); client.send("Hello Server"); client.close(); }
Server
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; public class NioEpollServer { private Selector selector; private ServerSocketChannel serverSocket; public NioEpollServer(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.configureBlocking(false); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.register(selector, SelectionKey.OP_ACCEPT); } public void listen() throws IOException { while (true) { selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel socket = ssc.accept(); socket.configureBlocking(false); socket.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socket = (SocketChannel) key.channel(); int count; StringBuilder buffer = new StringBuilder(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while ((count = socket.read(byteBuffer)) > 0) { byteBuffer.flip(); buffer.append(Charset.forName("UTF-8").decode(byteBuffer)); byteBuffer.clear(); } // Handle received data if (buffer.length() > 0) { System.out.println("Received data: " + buffer.toString()); } // Prepare response String response = "Hello Client!"; ByteBuffer outBuffer = ByteBuffer.wrap(response.getBytes()); socket.write(outBuffer); }else if(key.isWritable()){ SocketChannel socket = (SocketChannel) key.channel(); String response = "Hello Client 2 !"; ByteBuffer outBuffer = ByteBuffer.wrap(response.getBytes()); socket.write(outBuffer); } } } } public static void main(String[] args) throws IOException { NioEpollServer server = new NioEpollServer(8888); server.listen(); } }