菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于parkinglot的bthread调度brpc源码解析(八)—— 基础类eventdispatcher详解brpc源码解析(九)—— 基础类workstealingqueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— reducer类和adder类解析brpc源码解析(十二)—— 核心组件bvar详解 agentgroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— mpsc队列executionqueue详解brpc源码解析(十九)—— 双buffer数据结构doublybuffereddata详解brpc源码解析(二十)—— 用于访问下游的channel类详解

brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制-ag真人游戏

阅读 : 234

brpc作为一个rpc框架,除了支持作为server当然也需要对调用其他server有良好的支持,这篇将着重介绍一下brpc访问其他server的具体过程以及与之相关的往socket写response的机制。

brpc里发送数据都是调用channel类,关键在于channel::callmethod函数,不管是基于protobuf的正统rpc调用,还是http请求,都是依赖这个函数,定义如下:

注释解释的很清楚,一共有五个参数

  1. method:要调用的远端服务
  2. controller:包含附加数据和各种选项
  3. request:调用服务的请求
  4. response:调用服务后的返回
  5. done:用于回调,如果非null,发请求后会立即返回,调用结束后会通过done-run执行回调函数,属于异步调用,如果为null,则会阻塞直到调用完成。
    其中1、3、4属于基于protobuf的正统rpc调用需要的参数,如果是普通的http请求则不需要,

1.1 基于proto的rpc请求示例



调用的echo函数是pb生成的,如下:

1.2 普通http请求示例:

而在callmethod里面,最核心的就是issuerpc,也就是真正向远端发起调用的部分。

在issuerpc里面,先是拿了一个临时的socket并根据选服务器的策略设置好远端地址之类的,

然后根据连接类型获取合适的socket,注意tmp_socket的使用

如果是单连接直接把tmp_sock传给sending_sock,如果是连接池或短连接,则基于tmp_socket的一些信息去得到相应的连接池socket或者短连接socket,然后释放掉tmp_socket,注意拿到的用于发送的socket里都是注册了epoll in事件的,事件触发后调用 以前讲过的的onnewmessage。

然后就是打包数据发送,首先调用对应的协议打包:

调用socket的write发送:

而在上面说到的write函数里,则是调用startwrite开始写,为了提高性能和吞吐,发消息有点复杂,官方文档是这么描述的:

"消息”指向连接写出的有边界的二进制串,可能是发向上游client的response或下游server的request。多个线程可能会同时向一个fd发送消息,而写fd又是非原子的,所以如何高效率地排队不同线程写出的数据包是这里的关键。baidu-rpc使用一种wait-free mpsc链表来实现这个功能。所有待写出的数据都放在一个单链表节点中,next指针初始化为一个特殊值(socket::writerequest::unconnected)。当一个线程想写出数据前,它先尝试和对应的链表头(socket::_write_head)做原子交换,返回值是交换前的链表头。如果返回值为空,说明它获得了写出的权利,它会在原地写一次数据。否则说明有另一个线程在写,它把next指针指向返回的头,那样正在写的线程之后会看到并写出这块数据。这套方法可以让写竞争是wait-free的,而获得写权利的线程虽然在原理上不是wait-free也不是lock-free,可能会被一个值仍为unconnected的节点锁定(这需要发起写的线程正好在原子交换后,在设置next指针前,仅仅一条指令的时间内被os换出),但在实践中很少出现。在当前的实现中,如果获得写权利的线程一下子无法写出所有的数据,会启动一个keepwrite线程继续写,直到所有的数据都被写出。这套逻辑非常复杂,大致原理如下图,细节请阅读socket.cpp。

startwrite里,如上述,首先是判断是否有其他线程在写,如果有其他线程在写则把自己挂进链表后直接返回,因为正在写的线程写完自己的部分后会接着写链表里的其他部分。否则置req->next为null,开始就地写:

一个fd来了三个request的示意图如下(整个过程第一个request没写完,一直在写第一个request),注意在当前状态下,链表其实是反的,writehead里存的是最近添加的

写数据时,首先connectifnot,该函数有三种返回值,-1 0 和1,0标识原来已连接,-1标识连接失败,1表示正在连接,连接好后会执行keepwriteifconnected回调开始写,里面会调用keepwrite:

注意在connect的时候会用keepwriteifconnected作为回调函数注册epoll out事件,也就如果没连接,则进行连接操作并注册epollout,connectifnot返回1,startwrite会直接返回,epoll out保证一旦连接上会接续写。

如果connectifnot返回0,则继续往下走:

当前线程内就地写一次,如果写完了结束,没写完另起线程开始写:

iswritecomplete用于判断是否写完,当前request没写完和有新来的request都算没写完:

函数里首先判断当前request是否写完,return_when_no_more是没有新request下的返回值,如果当前写完了就是true,也就是整个写完了,否则是false,还是需要启动新的bthread来keepwrite。desired是想要赋给_write_head的值,也就是接下来要写的部分,初始化为null,如果当前的old_head还没写完,那么赋值成old_head。接下来调用_write_head.compare_exchange_strong,也就是如果new_head和_write_head相等,那么把desired放进write_head,返回true,否则赋值把new_head置成_write_head的值,返回false。compare_exchange_strong返回true表明没有新req过来,新的尾部也就是老的头部,返回return_when_no_more,也就是当前节点是否写完。compare_exchange_strong返回失败则表明有新的request替换进了write_head里,new_head被置成新的_write_head的值了,继续往下走。

前面提到了,在写的过程如果有新req过来,链表其实是反的,上图就是翻转的过程,new_head经过前面的过程存的是前面时刻的_write_head,假设req1写完一次后进入了iswritecomplete函数,拿到的new_head是3,old_head是1,随后又来了一个4,那么从3开始翻转到1结束,翻转后如下,4此时并没有什么存在感,本轮不处理:

这种情况下iswritecomplete返回false,所以startwrite函数来到下面部分:

就地写没写完,拿当前req作为参数启动bt执行keepwrite继续写,

keepwrite主要进行如下操作:优先写完当前req。 只有当前req写完了再写下一个,因为前面执行iswritecomplete函数的时候对链表的已知部分进行了翻转,所以req->next就是下一个过来的req。

批量释放掉已完成的request:

找到当前链表的尾部,比如在上面那种情况下尾部就是3,并用这个尾部做参数去调用上面说过的那个iswritecomplete

这里再详细解释下keepwrite调用iswritecomplete的参数的过程,可以结合上面的链表示意图来理解,

keepwrite是用cur_tail和req做参数去调用的,cur_tail作为old_head,这里调用后如果尾部发生了更新则用新值更新到cur_tail里,注意iswritecomplete后,撇开新入的req,原来的req们都是被翻转过的,也就是说cur_tail是新尾部。keepwrite调用iswritecomplete大概可以分为四种情况,重点在于如果有新来的req则会触发针对后来的一批的翻转:
1.req == cur_tail ,有新来的req
原本只有一个要写,但有新来的所以compare_exchange_strong失败,做翻转后新尾更新到cur_tail,返回false,未写完
2.req == cur_tail ,没有新来的req
原本只有一个要写,而且没有新来的,如果这一个没写完返回false,写完了返回true
3.req != cur_tail ,有新来的req
因为!singular_node成立,所以return_when_no_more = false,但有新来的所以compare_exchange_strong失败,做后面的翻转操作,比如上面图的例子,再次调用iswritecomplete后4会成为翻转后的新尾,更新到cur_tail,保证后面的循环回写到。
4.req != cur_tail , 没有新来的req:
因为!singular_node成立,所以return_when_no_more = false,且compare_exchange_strong成功,直接返回return_when_no_more,也就是false。

整个keepwrtie在一个do whilie(1)结构体里,也就是没写完就不断地一直写,顺序就是先写完当前req的,再写翻转过的链表的下一个req,保证顺序。

总的来说这套写的机制比较复杂,对于一次往socket上写数据,总结如下:

  1. 原子操作判断是否有其他线程在写这个socket了,如果有,把这次写和原写队列连接好后直接返回,正在写的在写完自己的数据后会把同一socket上写队列里的其余的数据也写完。
  2. 如果没人写,判断是否已经连接,已连接的话当前线程直接发起写以尽快完成写入,避免上下文切换,而且如果数据量比较大,一次没写完后也会马上返回,起其他线程来继续写入,以避免阻塞,因为当前发起写的就是就是普通线程,brpc是没有专门的io线程的。
  3. 还没连接则发起连接并用“继续写入的函数”作为回调注册epollout后直接返回,让回调完成后续的写操作。

之所以用这套复杂的机制,最主要的目的还是追求效率、避免阻塞,提高吞吐。

网站地图