以前写过文章聊过brpc访问下游和发送数据的机制,但和下游连接的相关处理细节没有过多涉及,这里来聊下brpc client对于连接的相关处理。
为了避免歧义,这里再贴一下官方文档对于socket的定义:
和fd相关的数据均在socket中,是rpc最复杂的结构之一,这个结构的独特之处在于用64位的socketid指代socket对象以方便在多线程环境下使用fd。
为了和我们常见的socket区分,本文中socket均指代brpc的socket,传统的网络socket概念均用套接字指代。
首先,brpc支持多种协议,每种协议有其支持的连接方式,基本的连接方式分别为单连接,连接池和短连接,定义在options.proto里面。
在使用中会利用按位与和或来节省开销,对于支持多连接方式的协议,有两种复合的连接方式供使用:
注册协议的时候会确定协议支持的连接方式:
在访问下游的时候,如果不手动指定,channel 的initchanneloptions函数会在协议支持的连接方式里帮我们选择最优的。如下:
可以看到,考虑到性能和开销,假设协议支持,会优先选择单连接,其次是连接池,然后才是短连接。
3.1 获取指代具体下游服务的tmp_socket
对于一次下游访问,会调用以前介绍过的issuerpc函数进行实际的rpc访问,首先要确定的是要访问的具体的下游,这里分两类,一是单个服务实例的下游,另一个则是通过nameservice进行访问的下游,实际生产环境中绝大部分都是后者,因为单实例的可靠性和容量都没有保证。
首先会声明一个socketuniqueptr类型的tmp_sock,之所以叫tmp_sock我理解是因为这个socket仅仅是一个用于获取实际发送用的socket的临时socket,而不是真正发送使用,虽然在单连接场景下二者是一致的。
然后会判断是不是single server,如果是,下游的serverid只有一个,直接根据对应server的socketid 去获取对应的socket,否则则通过loadbalancer去选取具体的下游服务器,这个步骤之后,tmp_sock里面就是实际要访问的下游服务器对应的socketid了。
3.2 根据tmp_socket获取用于发送数据的socket
接下来就是根据连接方式通过tmp_sock获取真正用于发送数据的socket,如下:
对于单连接,很简单,因为只会有一个连接,直接把tmp_sock赋值给实际用于发送的sending_sock即可。
对于连接池,我们上面拿到的socket是main_socket,会对应一个池对象,由这个socket发起的访问均是从这个池子里取,如下:
对于短连接,则直接根据远端地址新建一个用于发送的socket,如下:
3.3 连接以准备发送
随后调用sending_sock的write进行数据写入,对于连接不存在时的访问,层层调用过后会进入到socket的connectifnot函数,具体过程以前介绍过,不再赘述。
这里_conn是用于自定义的连接建立,目前只有stream方式用到了,其余的都是走的else分支,也就是调用
int socket::connect(const timespec* abstime, int (on_connect)(int, int, void), void* data) 函数。
这里面也就是真正的发起连接的地方,会调用系统函数进行连接,核心部分如下:
先调用socket系统函数创建套接字,sock_stream表明是tcp的套接字,并且为了异步设置成非阻塞:
用当前socket对象保存的远端地址调用系统函数connect进行连接,这里的remote_side也就是上述获取实际发送用的socket时确定下来的。因为是非阻塞套接字所以会立即返回,根据返回值判断是否成功,返回值为0表示连接成功,einprogress表示正在连接,在非阻塞模式下这两种情况都认为是正常的,其余情况则为失败直接返回:
随后则根据是否传入了非null的on_connect回调函数来进行下一步操作,如果有,说明是异步调用,则需要添加epoll out事件。先是新建了一个epolloutrequest类型的req变量来保存fd、要发送的数据和回调函数等信息,当前场景下的回调函数是keepwriteifconnected,也就是连接成功后就写入,epolloutrequest继承自socketuser,一个给即将回收的socket用的closure,会定期进行健康检查之类的,这里不展开讨论。随后将req赋值给options中的一个变量并create socket,用于添加epollout事件。如下:
然后根据fd获取对应的dispatcher,并调用其addepollout函数,第一个参数connect_id是用于事件发生后获取对应的socket进行后续处理,第二个参数sockfd则是epoll真正要监听的fd,关于dispatcher相关的内容可以参考以前的博文。如下:
如果没有非空的on_connect,则需要阻塞等待:
waitepollout里面仍然是添加了epollout事件的监听,区别在于随后通过butex_wait阻塞等待事件发生,butex可以理解为bthread上的futex,利用的是_epollout_butex变量,事件发生后调用的handleepollout函数里会对这个变量执行原子加并执行wakeup,阻塞的bthread从而被唤醒。
而当一次访问下游的调用结束后,会根据连接的类型对其进行后续处理,主要逻辑如下:
如果是单连接的流式请求,出错且使用的socket和main socket 不一致, setfailed方便重用。
如果是连接池,并且正常发送了数据,则根据是否持续读取下游返回的数据的选项判断如何将socket归还到连接池,对于持续读取的socket,会判断是否读完并进行相应处理,也就是void socket::onprogressivereadcompleted()函数,这个函数在首次调用的时候会将_controller_released_socket置为true,并在后面协议的parse函数里再次被调用从而执行归还或者setfailed的逻辑,如下:
对于短连接,和连接池处理方式类似,和连接池的区别在于一个是return,一个是setfailed,如下: