brpc作为一个rpc框架,除了支持作为server当然也需要对调用其他server有良好的支持,这篇将着重介绍一下brpc访问其他server的具体过程以及与之相关的往socket写response的机制。
brpc里发送数据都是调用channel类,关键在于channel::callmethod函数,不管是基于protobuf的正统rpc调用,还是http请求,都是依赖这个函数,定义如下:
注释解释的很清楚,一共有五个参数
- method:要调用的远端服务
- controller:包含附加数据和各种选项
- request:调用服务的请求
- response:调用服务后的返回
- done:用于回调,如果非null,发请求后会立即返回,调用结束后会通过done-run执行回调函数,属于异步调用,如果为null,则会阻塞直到调用完成。
其中1、3、4属于基于protobuf的正统rpc调用需要的参数,如果是普通的http请求则不需要,
1.1 基于proto的rpc请求示例
调用的echo函数是pb生成的,如下:
1.2 普通http请求示例:
而在callmethod里面,最核心的就是issuerpc,也就是真正向远端发起调用的部分。
在issuerpc里面,先是拿了一个临时的socket并根据选服务器的策略设置好远端地址之类的,
然后根据连接类型获取合适的socket,注意tmp_socket的使用
如果是单连接直接把tmp_sock传给sending_sock,如果是连接池或短连接,则基于tmp_socket的一些信息去得到相应的连接池socket或者短连接socket,然后释放掉tmp_socket,注意拿到的用于发送的socket里都是注册了epoll in事件的,事件触发后调用 以前讲过的的onnewmessage。
然后就是打包数据发送,首先调用对应的协议打包:
调用socket的write发送:
而在上面说到的write函数里,则是调用startwrite开始写,为了提高性能和吞吐,发消息有点复杂,官方文档是这么描述的:
"消息”指向连接写出的有边界的二进制串,可能是发向上游client的response或下游server的request。多个线程可能会同时向一个fd发送消息,而写fd又是非原子的,所以如何高效率地排队不同线程写出的数据包是这里的关键。baidu-rpc使用一种wait-free mpsc链表来实现这个功能。所有待写出的数据都放在一个单链表节点中,next指针初始化为一个特殊值(socket::writerequest::unconnected)。当一个线程想写出数据前,它先尝试和对应的链表头(socket::_write_head)做原子交换,返回值是交换前的链表头。如果返回值为空,说明它获得了写出的权利,它会在原地写一次数据。否则说明有另一个线程在写,它把next指针指向返回的头,那样正在写的线程之后会看到并写出这块数据。这套方法可以让写竞争是wait-free的,而获得写权利的线程虽然在原理上不是wait-free也不是lock-free,可能会被一个值仍为unconnected的节点锁定(这需要发起写的线程正好在原子交换后,在设置next指针前,仅仅一条指令的时间内被os换出),但在实践中很少出现。在当前的实现中,如果获得写权利的线程一下子无法写出所有的数据,会启动一个keepwrite线程继续写,直到所有的数据都被写出。这套逻辑非常复杂,大致原理如下图,细节请阅读socket.cpp。
startwrite里,如上述,首先是判断是否有其他线程在写,如果有其他线程在写则把自己挂进链表后直接返回,因为正在写的线程写完自己的部分后会接着写链表里的其他部分。否则置req->next为null,开始就地写:
一个fd来了三个request的示意图如下(整个过程第一个request没写完,一直在写第一个request),注意在当前状态下,链表其实是反的,writehead里存的是最近添加的
写数据时,首先connectifnot,该函数有三种返回值,-1 0 和1,0标识原来已连接,-1标识连接失败,1表示正在连接,连接好后会执行keepwriteifconnected回调开始写,里面会调用keepwrite:
注意在connect的时候会用keepwriteifconnected作为回调函数注册epoll out事件,也就如果没连接,则进行连接操作并注册epollout,connectifnot返回1,startwrite会直接返回,epoll out保证一旦连接上会接续写。
如果connectifnot返回0,则继续往下走:
当前线程内就地写一次,如果写完了结束,没写完另起线程开始写:
iswritecomplete用于判断是否写完,当前request没写完和有新来的request都算没写完:
函数里首先判断当前request是否写完,return_when_no_more是没有新request下的返回值,如果当前写完了就是true,也就是整个写完了,否则是false,还是需要启动新的bthread来keepwrite。desired是想要赋给_write_head的值,也就是接下来要写的部分,初始化为null,如果当前的old_head还没写完,那么赋值成old_head。接下来调用_write_head.compare_exchange_strong,也就是如果new_head和_write_head相等,那么把desired放进write_head,返回true,否则赋值把new_head置成_write_head的值,返回false。compare_exchange_strong返回true表明没有新req过来,新的尾部也就是老的头部,返回return_when_no_more,也就是当前节点是否写完。compare_exchange_strong返回失败则表明有新的request替换进了write_head里,new_head被置成新的_write_head的值了,继续往下走。
前面提到了,在写的过程如果有新req过来,链表其实是反的,上图就是翻转的过程,new_head经过前面的过程存的是前面时刻的_write_head,假设req1写完一次后进入了iswritecomplete函数,拿到的new_head是3,old_head是1,随后又来了一个4,那么从3开始翻转到1结束,翻转后如下,4此时并没有什么存在感,本轮不处理:
这种情况下iswritecomplete返回false,所以startwrite函数来到下面部分:
就地写没写完,拿当前req作为参数启动bt执行keepwrite继续写,
keepwrite主要进行如下操作:优先写完当前req。 只有当前req写完了再写下一个,因为前面执行iswritecomplete函数的时候对链表的已知部分进行了翻转,所以req->next就是下一个过来的req。
批量释放掉已完成的request:
找到当前链表的尾部,比如在上面那种情况下尾部就是3,并用这个尾部做参数去调用上面说过的那个iswritecomplete
这里再详细解释下keepwrite调用iswritecomplete的参数的过程,可以结合上面的链表示意图来理解,
keepwrite是用cur_tail和req做参数去调用的,cur_tail作为old_head,这里调用后如果尾部发生了更新则用新值更新到cur_tail里,注意iswritecomplete后,撇开新入的req,原来的req们都是被翻转过的,也就是说cur_tail是新尾部。keepwrite调用iswritecomplete大概可以分为四种情况,重点在于如果有新来的req则会触发针对后来的一批的翻转:
1.req == cur_tail ,有新来的req
原本只有一个要写,但有新来的所以compare_exchange_strong失败,做翻转后新尾更新到cur_tail,返回false,未写完
2.req == cur_tail ,没有新来的req
原本只有一个要写,而且没有新来的,如果这一个没写完返回false,写完了返回true
3.req != cur_tail ,有新来的req
因为!singular_node成立,所以return_when_no_more = false,但有新来的所以compare_exchange_strong失败,做后面的翻转操作,比如上面图的例子,再次调用iswritecomplete后4会成为翻转后的新尾,更新到cur_tail,保证后面的循环回写到。
4.req != cur_tail , 没有新来的req:
因为!singular_node成立,所以return_when_no_more = false,且compare_exchange_strong成功,直接返回return_when_no_more,也就是false。
整个keepwrtie在一个do whilie(1)结构体里,也就是没写完就不断地一直写,顺序就是先写完当前req的,再写翻转过的链表的下一个req,保证顺序。
总的来说这套写的机制比较复杂,对于一次往socket上写数据,总结如下:
- 原子操作判断是否有其他线程在写这个socket了,如果有,把这次写和原写队列连接好后直接返回,正在写的在写完自己的数据后会把同一socket上写队列里的其余的数据也写完。
- 如果没人写,判断是否已经连接,已连接的话当前线程直接发起写以尽快完成写入,避免上下文切换,而且如果数据量比较大,一次没写完后也会马上返回,起其他线程来继续写入,以避免阻塞,因为当前发起写的就是就是普通线程,brpc是没有专门的io线程的。
- 还没连接则发起连接并用“继续写入的函数”作为回调注册epollout后直接返回,让回调完成后续的写操作。
之所以用这套复杂的机制,最主要的目的还是追求效率、避免阻塞,提高吞吐。