异步网络编程asio库, 不过效率上可能还不如libevent, 还是说一说
以前在说epoll和libevent的时候, 就谈到了异步IO的封装, 现在专门说说异步网络编程, 主要结合ASIO来说.
Boost的ASIO是一个异步IO库,封装了对Socket的常用操作,简化了基于socket程序的开发。支持跨平台。
实际上, 根据老外所说:(比较奇怪, 明明是异步IO库…)
Library also allows to use different strategies — synchronous and asynchronous to work with sockets.
Asio provides the basic building blocks for C++ networking, concurrency and other kinds of I/O.
补充一句: ASIO库效率上可能赶不上Libevent, 有大佬测试过; 但是! 这货 跨平台 && 兼容 std::iostream.
概述
异步 IO (Async)
上面事件驱动的 select,poll,epoll 机制已经很大的提升了性能,但是在数据的读写操作上还是同步的。而异步 IO 的出现进一步提升了 Server 的处理能力(全称无阻塞),应用程序发起一个异步读写操作,并提供相关参数(如用于存放数据的缓冲区、读写数据的大小、以及请求完成后的回调函数等),操作系统在自身的内核线程中执行实际的读或者写操作,并将结果存入程序制定的缓冲区中,然后把事件和缓冲区回调给应用程序。
2.6内核提出了epoll, 这个时候正好也提出了异步IO的实现aio_read
, aio_write
, 但是Linux实际上用的很少.
在当前 C10XXK 问题的主流背景下,epoll 和异步 IO 这种事件驱动模型正逐渐变为人们的首选方案,这也是 Nginx 能不断从 Apache 中抢占市场的一个重要原因。然而从 Linux 社区来看对填平 aio(异步 IO)这个大坑并没有太大兴趣(内核2.6后支持aio_read, write),那么为了异步 IO 的统一只能从应用层进行兼容,免不了多次内核态与用户态的交互,这对程序性能自然会有损失。当然随着时间的发展单机并发性能的解决办法越来越高效,但是对应的程序开发复杂度也越来越高,我们要做的就是在这两者之间做出最优权衡。
比如 epoll + ThreadPool 就是 muduo 这个高效 C++ 网络库采取的方案。
当然也有语言&库已经封装了各自的异步 IO 库,如 boost 的 asio
, C++11的async, promise, future等一系列机制(这已经说过了, 见我的文章<C++11并发库>
).
实际上, asio是独立的库, 但渐渐作为 boost 的一部分, 大概就这个样子:
这个库, 说大也大, 说小也小; 如果你项目中实际用到了, 那么可以好好研究一下, 否则抓住
核心机制
探讨一下.
(btw: 在没有接触过这个库以前, 腾讯后端的同学告诉我, 这个库就是简化了socket的操作; 而接触之后, 我觉得, 这个库带来了一次革新, 全新的开发模式)
先来一个直观的例子, 以下代码实现一个简单的tcp服务,访问http://localhost:6688可得到字符.
(如果浏览器说响应的字符无效, 可以直接用nc命令代替nc localhost 6688
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
using namespace boost;
using namespace std;
int main(int argc, const char * argv[]) {
using namespace boost::asio;
try {
io_service ios;
//endpoint 涵盖了所有的监听地址信息, 它要绑定到io_service上
ip::tcp::acceptor acceptor(ios, ip::tcp::endpoint(ip::tcp::v4(), 6688));
cout << acceptor.local_endpoint().address() << endl;
while (true) {
//socket代表响应远端接收对象(accept后新创建的实例)
ip::tcp::socket sock(ios);
acceptor.accept(sock);
cout << "client:" ;
cout << sock.remote_endpoint().address() << endl;
sock.write_some(buffer("hello asio\n"));
}
} catch (std::exception& e) {
cout << e.what() << endl;
}
return 0;
}
编译的时候需要-lboost_system
.
区分对比
对比同步IO
异步的代码难于编写&调试还是同步的代码? 这个很难说, 一定要说的话, 那应该是
异步
.
在同步编程中,所有的操作都是顺序执行的,比如从socket中读取( 请求) ,然后写入( 回应) 到socket中。
每一个操作都是阻塞的。因为操作是阻塞的,所以为了不影响主程序,当在socket上读写时,通常会创建一个或多个线程来处理socket的输入/输出。
因此,同步的服务端/客户端通常是多线程的。
同步代码中, 往往要自己处理线程同步共享;
异步编程是事件驱动的。虽然启动了一个操作,但是你不知道它何时会结束;它只是提供一个回调给你,当操作结束时,它会调用这个API,并返回操作结果。
异步中线程不是必须的, 但是一般也需要, 只是需要更少的线程; 但是需要提供事件响应 handler.
简单说, 就是阻塞
与否的问题, posix给出了这样的定义:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;
阻塞IO和非阻塞IO的区别就在于:应用程序的调用是否立即返回!
区分同步和异步, 一个很重要的维度, 就是阻塞
, 而且是全程阻塞
: 内核准备数据的过程, 数据从内核态到用户态内存的过程.
同步IO和异步IO的区别就在于:数据拷贝的时候进程是否阻塞!
通常所说的阻塞IO, 非阻塞IO, 包括事件IO(即多路IO), 全部都是同步IO.
他们总有一个过程是被阻塞的, 要么是在kernel准备数据的时候(例如从网络缓存数据), 比如阻塞IO; 要么是从内核拷贝数据到应用程序buffer的过程, 比如non-blocking IO.
异步IO就很纯粹:
当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。
下面给个图, 你就知道了(大概了解), 干说没用, 要自己去体会:(把阻塞IO拿出来当做同步IO的代表)
或者还有一个图更加完整的说明了问题:(图片源自百度)
对比事件IO
也就是和 select/poll/epoll对比一下(抱歉,我只熟悉linux平台), 虽然都是事件驱动(functor, functional handler), 但是同步IO和异步IO还是有着本质的区别.
这其中 epoll 比较特殊, 因为2.6内核提出了epoll, 这个时候正好也提出了异步IO的实现aio_read
, aio_write
, 如下:
但是怎么说呢, 我看到的现象, Linux下大家对epoll + 非阻塞IO
乐此不疲.
我还是把相关的事件IO, 在重复啰嗦一下:1
2
3
4
5
6
7
8
9
10select:是最初解决IO阻塞问题的方法。用结构体fd_set来告诉内核监听多个文件描述符,该结构体被称为描述符集。由数组来维持哪些描述符被置位了。对结构体的操作封装在三个宏定义中。通过轮寻来查找是否有描述符要被处理,如果没有则返回.
存在的问题:
1. 内置数组的形式使得select的最大文件数受限与FD_SIZE;
2. 每次调用select前都要重新初始化描述符集(以及来回拷贝),将fd从用户态拷贝到内核态,每次调用select后,都需要将fd从内核态拷贝到用户态;
3. 轮寻排查当文件描述符个数很多时,效率很低;
poll:通过一个可变长度的数组解决了select文件描述符受限的问题。数组中元素是结构体,该结构体保存描述符的信息,每增加一个文件描述符就向数组中加入一个结构体,结构体只需要拷贝一次到内核态。poll解决了select重复初始化的问题。轮寻排查的问题未解决。
epoll:轮寻排查所有文件描述符的效率不高,使服务器并发能力受限。因此,epoll采用只返回状态发生变化的文件描述符,便解决了轮寻的瓶颈。(文件描述符受限, 来回拷贝等问题也解决了)
轮训问题, 拷贝问题(重复初始化), 最大fd问题等, 可以参考我的文章 Linux网络IO模型
对比libevent
下面是业内人士的总结, 我摘录如下:
ASIO只涉及到Socket,提供简单的线程操作。
libevent只提供了简单的网络API的封装, 线程池, 内存池, 递归锁等均需要自己实现。
ASIO主要应用了Proactor。
libevent为Reactor模式
ASIO支持单线程与多线程调度。
libevent的线程调度需要自己来注册不同的事件句柄。
ASIO是基于函数对象的hanlder事件分派。
libevent基于注册的事件回调函数来实现事件分发。
ASIO支持多种平台,可移植性不存在问题。
libevent主要支持linux平台,freebsd平台, 其他平台下通过select模型进行支持, 效率不是太高。
基于ASIO开发应用,要求程序员熟悉函数对象,函数指针,熟悉boost库中的boost::bind。内存管理控制方面。
基于libevent开发应用,相对容易, 具体大家可以参考memcached这个开源的应用,里面使用了libevent这个库。
具体可参考我另外一篇文章 libevnet
boost asio Vs asio
下面是从它官网摘抄的一些区别:
- Asio is in a namespace called asio::, whereas Boost.Asio puts everything under boost::asio::.
- The main Asio header file is called asio.hpp. The corresponding header in Boost.Asio is boost/asio.hpp. All other headers are similarly changed.
- Any macros used by or defined in Asio are prefixed with ASIO_. In Boost.Asio they are prefixed with BOOSTASIO.
- Asio includes a class for launching threads: asio::thread. Boost.Asio does not include this class, to avoid overlap with the Boost.Thread library
- Boost.Asio uses the Boost.System library to provide support for error codes ( boost::system::error_code and boost::system::system_error). Asio includes these under its own namespace ( asio::error_code and asio::system_error). For C++11, Asio uses the std::error_code and std::system_error classes shipped with the compiler. When not using C++11, the Boost.System version of these classes currently supports better extensibility for user-defined error codes.
- Asio is header-file-only and for most uses does not require linking against any Boost library. When using C++11 with recent versions of gcc, clang or MSVC, Asio can be used independently of Boost by defining ASIO_STANDALONE when you compile. Boost.Asio always requires that you link against the Boost.System library, and also against Boost.Thread if you want to launch threads using boost::thread.
(如非必要, 可以单独使用 asio库, 而非boost.asio库)
核心概述
概述
io_service : 每个 Asio 程序都至少有一个 io_service 对象,它代表了操作系统的 I/O 服务,并把你的程序和这些服务链接起来。Boost.Asio使用io_service同操作系
统的输入/输出服务进行交互。
有了 io_service 还不足以完成 I/O 操作,用户一般也不跟 io_service 直接交互。根据 I/O 操作的不同,Asio 提供了不同的 I/O 对象,比如 timer(定时器),socket,等等。
timer : Timer 可以用来实现 watcher,是最简单的一种 I/O 对象。
socket : 支持网络编程的IO对象.
下面是一个简单的例子(同步服务端模型), 一个io_service, 建立socket, 把socket连接到目标地址和端口.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20using boost::asio;
//线程执行体
void client_session(socket_ptr sock) {
while ( true) {
char data[512];
size_t len = sock->read_some(buffer(data));
if ( len > 0)
write(*sock, buffer("ok", 2)); //同步IO
}
}
//
typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
io_service service;
ip::tcp::endpoint ep( ip::tcp::v4(), 2001)); // listen on 2001
ip::tcp::acceptor acc(service, ep);
while ( true) {
socket_ptr sock(new ip::tcp::socket(service));
acc.accept(*sock); //接受请求
boost::thread( boost::bind(client_session, sock)); //建立新线程处理请求
}
上面的write
是同步IO, 异步的话应该是异步的话async_write(stream, buffer [, extra options], handler)
.
并且使用的是ip::tcp::acceptor
的accept方法, 异步一般使用async_accept方法.
异步服务端典型的, 如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20using boost::asio;
typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
io_service service;
ip::tcp::endpoint ep( ip::tcp::v4(), 2001)); // 监听端口2001
ip::tcp::acceptor acc(service, ep);
socket_ptr sock(new ip::tcp::socket(service));
start_accept(sock);
service.run(); //只有调用了io_service::run()方法的线程才会调用回调函数
void start_accept(socket_ptr sock) {
acc.async_accept(*sock, boost::bind( handle_accept, sock, _1) );
}
void handle_accept(socket_ptr sock, const boost::system::error_code &err) {
if ( err) return;
// 从这里开始IO操作, 你可以从socket读取或者写入
socket_ptr sock(new ip::tcp::socket(service));
start_accept(sock);
}
上面的代码 async_accept
内部handler的IO读写逻辑是, 再创建了一个新的socket, 然后再次调用start_accept().
用来创建另外一个“等待客户端连接”的异步操作,从而使service.run()循环一直保持忙碌状态.
库依赖
Boost.asio库依赖于下面这些库:
- Boost.System:这个库(必须)为Boost库提供操作系统支持
- Boost.Regex:使用这个库( 可选的) 以便你重载read_until()或者async_read_until()时使用boost::regex参数。
- Boost.DateTime:使用这个库( 可选的) 以便你使用Boost.Asio中的计时器
- OpenSSL:使用这个库( 可选的) 以便你使用Boost.Asio提供的SSL支持。
编译库
一般需要预先编译boost, 即使你是使用 boost.asio 的头文件.1
bjam –with-system –with-regex stage
上面的-with
部分就是依赖部分, 可以根据需要进行编译, 具体的编译参数参考:1
2
3
4
5
6
7
8
9
10
11--build-dir=<builddir> 编译的临时文件会放在builddir里(这样比较好管理,编译完就可以把它删除了)
--stagedir=<stagedir> 存放编译后库文件的路径,默认是stage
--build-type=complete
编译所有版本,不然只会编译一小部分版本,确切地说是相当于:
variant=release, threading=multi;link=shared|static;runtime-link=shared
variant=debug|release 决定编译什么版本(Debug or Release?)
link=static|shared 决定使用静态库还是动态库
threading=single|multi 决定使用单线程还是多线程库
runtime-link=static|shared 决定是静态还是动态链接C/C++标准库
--with-<library> 只编译指定的库,如输入--with-regex就只编译regex库了
--show-libraries 显示需要编译的库名称
(运行时, 少补了-I
和-L
选项)
异常问题
异步函数出错, 既可以返回error code, 也可以抛出异常(boost::system::system_error
); 两种方式任选(最好保持一致).
例如下面的代码:1
2
3
4
5
6
7
8
9
10
11try {
sock.connect(ep);
} catch(boost::system::system_error e) {
std::cout << e.code() << std::endl;
}
//或者
boost::system::error_code err;
sock.connect(ep, err);
if (err) {
std::cout << err << std::endl;
}
具体使用哪种方式没有太多要求, 但是没有太多折中选择给你:
- 如果你的代码复杂, 则可以使用异常机制(试想每个异步函数都检查返回值, 很累的), 但是这样做势必会在编译时产生很多监视代码, 降低运行效率
- 如果使用error code, 则可能存在大量的返回值检查代码逻辑.
我个人一般使用error code
, 可以具体的检查原因, 比如:1
2
3
4
5
6char data[512];
boost::system::error_code error;
size_t length = sock.read_some(buffer(data), error);
if (error == error::eof) {
return; // 连接关闭
}
(也会在一些场合使用异常处理机制, 毕竟有时候要书写简单)
线程安全
异步编程中, 也会使用线程(boost::thread), 但不会像同步代码中那么多.
异步代码往往最关心的不是是否使用线程问题, 而是线程安全问题
:
- io_service 线程安全
- socket 非线程安全
- utility 非线程安全
主要注意事项也很简单, 非要线程安全的类实例, 最好不要在多个线程中使用; 如果要用, 自己注意加锁控制.
超时处理
一些I/O操作需要一个超时时间, 这只能应用在异步操作上(同步意味着阻塞, 因此没有超时时间), 这就好像一个事件来了一样(实际上是时间事件), 可以开始回调了. 超时处理, 通常是用计时器
, 即deadline_timer
, 通常用法如下:1
2
3deadline_timer t(service, boost::posix_time::milliseconds(500));
t.wait(); // 同步等待(阻塞)
t.async_wait(&deadline_handler_func); //异步等待(不阻塞)
当同步等待的时候, 其实和boost::this_thread::sleep(500);
没有区别.
下面有一个示例代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16bool read = false;
void deadline_handler(const boost::system::error_code &) {
std::cout << (read ? "read successfully" : "read failed") <<
std::endl;
}
void read_handler(const boost::system::error_code &) {
read = true;
}
ip::tcp::socket sock(service);
//…
read = false;
char data[512];
sock.async_read_some(buffer(data, 512), read_handler);//异步读取buffer内容
deadline_timer t(service, boost::posix_time::milliseconds(100)); //设置等待时间100毫秒
t.async_wait(&deadline_handler); //设置异步等待(异步操作同时可以干其他的)
service.run(); //进行异步操作
在上述代码片段中,如果你在超时之前读完了数据,read则被设置成true,这样我们的伙伴就及时地通知了我们。
否则,当deadline_handler被调用时,read还是false,也就意味着我们的操作超时了。
计时器的使用, 在TCP编程中, 再常见不过了, 可以参考我的其他文章TCP中的计时器, 主要有:
- 建立连接定时器(connection-establishment timer)
- 重传定时器(retransmission timer)
- 延迟应答定时器(delayed ACK timer)
- 坚持定时器(persist timer)
- 保活定时器(keepalive timer)
- FIN_WAIT_2定时器(FIN_WAIT_2 timer)
- TIME_WAIT定时器 (TIME_WAIT timer, 也叫2MSL timer)
信号处理
可以创建一个signal_set
实例,指定异步等待的信号量,然后当这些信号量产生时,就会调用你的异步处理程序.
例如下面的代码, 处理软终止和中断信号:
1 | void signal_handler(const boost::system::error_code & err, int s |
io_service
如果要使用异步操作, 一般都会用到ioservice
的run
方法, 它处理异步操作, 和操作系统打交道, 并且调用回调函数.
没有异步操作的话, service.run()
直接返回(先还是会调用相关的回调函数).
这个类中有很多成员方法, 下面再细说.
在整个boost.asio中该类占有很重要地位.
普通文件IO
也可以连接到普通文件IO, 比如linux下可以这么做:
1 | posix::stream_descriptor sd_in(service, ::dup(STDIN_FILENO)); |
windows可以这么做:
1 | HANDLE h = ::OpenFile(...); |
实用性探究
看到 Boost.Asio 的文档有 1000 多页, 所以根本没有必要全文谈, 也谈不完.
下面的所有探究和注解, 全部都是以实用为主, 去理解这个库, 使用这个库, 而不是谈论API都有哪些重载.
参考
<Boost.Asio C++ Network Programming-2013>
这本书在git-book上有翻译版本(谢谢群里的小伙伴)- What is Boost.Asio, and why we should use it
- http://think-async.com/Asio