菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于parkinglot的bthread调度brpc源码解析(八)—— 基础类eventdispatcher详解brpc源码解析(九)—— 基础类workstealingqueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— reducer类和adder类解析brpc源码解析(十二)—— 核心组件bvar详解 agentgroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— mpsc队列executionqueue详解brpc源码解析(十九)—— 双buffer数据结构doublybuffereddata详解brpc源码解析(二十)—— 用于访问下游的channel类详解

brpc源码解析(十六)—— 作为client的连接建立和处理详解-ag真人游戏

阅读 : 201

以前写过文章聊过brpc访问下游和发送数据的机制,但和下游连接的相关处理细节没有过多涉及,这里来聊下brpc client对于连接的相关处理。

为了避免歧义,这里再贴一下官方文档对于socket的定义:
和fd相关的数据均在socket中,是rpc最复杂的结构之一,这个结构的独特之处在于用64位的socketid指代socket对象以方便在多线程环境下使用fd。
为了和我们常见的socket区分,本文中socket均指代brpc的socket,传统的网络socket概念均用套接字指代。

首先,brpc支持多种协议,每种协议有其支持的连接方式,基本的连接方式分别为单连接,连接池和短连接,定义在options.proto里面。

在使用中会利用按位与和或来节省开销,对于支持多连接方式的协议,有两种复合的连接方式供使用:

注册协议的时候会确定协议支持的连接方式:

在访问下游的时候,如果不手动指定,channel 的initchanneloptions函数会在协议支持的连接方式里帮我们选择最优的。如下:

可以看到,考虑到性能和开销,假设协议支持,会优先选择单连接,其次是连接池,然后才是短连接。

3.1 获取指代具体下游服务的tmp_socket

对于一次下游访问,会调用以前介绍过的issuerpc函数进行实际的rpc访问,首先要确定的是要访问的具体的下游,这里分两类,一是单个服务实例的下游,另一个则是通过nameservice进行访问的下游,实际生产环境中绝大部分都是后者,因为单实例的可靠性和容量都没有保证。

首先会声明一个socketuniqueptr类型的tmp_sock,之所以叫tmp_sock我理解是因为这个socket仅仅是一个用于获取实际发送用的socket的临时socket,而不是真正发送使用,虽然在单连接场景下二者是一致的。

然后会判断是不是single server,如果是,下游的serverid只有一个,直接根据对应server的socketid 去获取对应的socket,否则则通过loadbalancer去选取具体的下游服务器,这个步骤之后,tmp_sock里面就是实际要访问的下游服务器对应的socketid了。

3.2 根据tmp_socket获取用于发送数据的socket

接下来就是根据连接方式通过tmp_sock获取真正用于发送数据的socket,如下:

对于单连接,很简单,因为只会有一个连接,直接把tmp_sock赋值给实际用于发送的sending_sock即可。

对于连接池,我们上面拿到的socket是main_socket,会对应一个池对象,由这个socket发起的访问均是从这个池子里取,如下:

对于短连接,则直接根据远端地址新建一个用于发送的socket,如下:

3.3 连接以准备发送

随后调用sending_sock的write进行数据写入,对于连接不存在时的访问,层层调用过后会进入到socket的connectifnot函数,具体过程以前介绍过,不再赘述。


这里_conn是用于自定义的连接建立,目前只有stream方式用到了,其余的都是走的else分支,也就是调用
int socket::connect(const timespec* abstime, int (on_connect)(int, int, void), void* data) 函数。

这里面也就是真正的发起连接的地方,会调用系统函数进行连接,核心部分如下:

先调用socket系统函数创建套接字,sock_stream表明是tcp的套接字,并且为了异步设置成非阻塞:

用当前socket对象保存的远端地址调用系统函数connect进行连接,这里的remote_side也就是上述获取实际发送用的socket时确定下来的。因为是非阻塞套接字所以会立即返回,根据返回值判断是否成功,返回值为0表示连接成功,einprogress表示正在连接,在非阻塞模式下这两种情况都认为是正常的,其余情况则为失败直接返回:

随后则根据是否传入了非null的on_connect回调函数来进行下一步操作,如果有,说明是异步调用,则需要添加epoll out事件。先是新建了一个epolloutrequest类型的req变量来保存fd、要发送的数据和回调函数等信息,当前场景下的回调函数是keepwriteifconnected,也就是连接成功后就写入,epolloutrequest继承自socketuser,一个给即将回收的socket用的closure,会定期进行健康检查之类的,这里不展开讨论。随后将req赋值给options中的一个变量并create socket,用于添加epollout事件。如下:

然后根据fd获取对应的dispatcher,并调用其addepollout函数,第一个参数connect_id是用于事件发生后获取对应的socket进行后续处理,第二个参数sockfd则是epoll真正要监听的fd,关于dispatcher相关的内容可以参考以前的博文。如下:

如果没有非空的on_connect,则需要阻塞等待:

waitepollout里面仍然是添加了epollout事件的监听,区别在于随后通过butex_wait阻塞等待事件发生,butex可以理解为bthread上的futex,利用的是_epollout_butex变量,事件发生后调用的handleepollout函数里会对这个变量执行原子加并执行wakeup,阻塞的bthread从而被唤醒。

而当一次访问下游的调用结束后,会根据连接的类型对其进行后续处理,主要逻辑如下:

如果是单连接的流式请求,出错且使用的socket和main socket 不一致, setfailed方便重用。

如果是连接池,并且正常发送了数据,则根据是否持续读取下游返回的数据的选项判断如何将socket归还到连接池,对于持续读取的socket,会判断是否读完并进行相应处理,也就是void socket::onprogressivereadcompleted()函数,这个函数在首次调用的时候会将_controller_released_socket置为true,并在后面协议的parse函数里再次被调用从而执行归还或者setfailed的逻辑,如下:

对于短连接,和连接池处理方式类似,和连接池的区别在于一个是return,一个是setfailed,如下:

网站地图