这篇文章主要是阅读reqwest的源代码, 而底层关于hyper的实现这篇文章浅尝辄止, 仅仅只是过一下hyper请求的大致流程,以后对rust的掌握程度更高之后在看hyper的源代码。
reqwest代码版本: 0.11.22
个人水平有限,如有错误还请指正。
一般来说,一个客户端的实现至少会包含两个部分,协议解析和连接池管理, 这点无论是python的requests还是golang的net/http都是差不多的,但是reqwest并不需要处理这两个部分,因为reqwest是对hyper的封装, 脏活累活都被hyper承包了,reqwest要做的就是提供尽可能优雅和便于使用的用户体验。
如果对reqwest的使用不太熟悉,可以参考我的这篇文章: https://youerning.top/post/reqwest-tutorial
无论是写代码还是阅读代码,我们都需要一个入口,本篇文章的代码入口是以下代码。
use reqwest::Result;
#[tokio::main]
async fn main() -> Result<()>{
let body = reqwest::get("https://youerning.top")
.await?
.text()
.await?;
println!("body: {}", body);
Ok(())
}
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
在深入代码前,需要先对上面最简单的这部分代码做一些解析, 上面的代码可以分为三个部分
然后我们可以按照这三个部分分别阅读代码。
首先来看看reqwest是怎么构造client的。
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
impl ClientBuilder {
pub fn new() -> ClientBuilder {
// 1.
let mut headers: HeaderMap<HeaderValue> = HeaderMap::with_capacity(2);
headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
// 2.
ClientBuilder {
// 内部的config对象保存所有clientBuilder的配置项
config: Config {
error: None,
accepts: Accepts::default(),
// 省略其他配置项
}
}
}
pub fn build(self) -> crate::Result<Client> {
// 3.
let config = self.config;
if let Some(err) = config.error {
return Err(err);
}
// connector和builder的构造过程后文再说明
// 4.
Ok(Client {
inner: Arc::new(ClientRef {
// 省略其他参数
hyper: builder.build(connector),
}),
})
}
代码分解如下:
accept请求体为"*/*"ClientBuilder对象, client的默认参数都可以在这里看到connector和builder,这部分后续展开ClientBuilder的参数都是私有的,所以想要设置它的参数需要调用它提供的公开函数或者说接口,比较常用的有以下方法
connector正如其名字表示的那样,它是一个连接器,负责连接远端,然后返回一个Connection, 这Connection后续可以放在连接池中复用。
根据依赖选项中的启用的特性不同,rust会编译不同的connector, 我们这里只看开启默认特性的代码。
pub fn build(self) -> crate::Result<Client> {
let mut connector = {
// http connector 处理http协议
let mut http = HttpConnector::new_with_resolver(DynResolver::new(resolver.clone()));
#[cfg(feature = "__tls")]
match config.tls {
#[cfg(feature = "default-tls")]
TlsBackend::Default => {
// TlsConnectorBuilder
let mut tls = TlsConnector::builder();
// 设置tls相关属性
tls.danger_accept_invalid_certs(!config.certs_verification);
tls.use_sni(config.tls_sni);
tls.disable_built_in_roots(!config.tls_built_in_root_certs);
}
// 在http connector外部包一层tls,用于处理tls连接
Connector::new_default_tls(
http,
tls,
// 其他参数省略
)?
}
pub(crate) fn new_default_tls<T>() -> crate::Result<Connector> {
// 省略tls connector的构造过程
}
connector的底层构造过程这里不做深入,仅仅粗略的过一遍。
无论是reqwest还是hyper的构造逻辑都是类似的,在请求前通过一个builder对象来设置属性,最后build出最终的目标对象。
pub fn build(self) -> crate::Result<Client> {
let mut builder = hyper::Client::builder();
pub fn builder() -> Builder {
Builder::default()
}
// 省略其他的参数设置
builder.pool_idle_timeout(config.pool_idle_timeout);
builder.pool_max_idle_per_host(config.pool_max_idle_per_host);
connector.set_keepalive(config.tcp_keepalive);
Ok(Client {
inner: Arc::new(ClientRef {
hyper: builder.build(connector),
// 省略其他参数
}
}
}
}
pub fn build<C, B>(&self, connector: C) -> Client<C, B>{
Client {
config: self.client_config,
conn_builder: self.conn_builder.clone(),
connector,
// 创建连接池,用于复用连接
pool: Pool::new(self.pool_config, &self.conn_builder.exec),
}
}
Client::builder().build()方法最终构建出了一个包含hyper Client的Client,至此我们已经有Client了,可以开始构造请求。
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::GET, url)
}
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
// 默认使用的http协议是http/1.1
let req = url.into_url().map(move |url| Request::new(method, url));
RequestBuilder::new(self.clone(), req)
}
Client::builder().build()?.get(url)最后生成了一个RequestBuilder对象,当我们得到builder对象之后可以通过它暴露的接口设置各种属性,当设置完毕后才发送请求。
RequestBuilder提供了很多有用的接口,这里简单的列举一些比较常用的方法
ClientBuilder::timeout()的设置。当请求构造好之后就可以发送请求了。
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
pub fn send(self) -> impl Future<Output = Result<Response, crate::Error>> {
// 因为在设置request参数的时候可能出错,所以这里需要判断一下request是否正确
match self.request {
Ok(req) => self.client.execute_request(req),
Err(err) => Pending::new_err(err),
}
}
pub(super) fn execute_request(&self, req: Request) -> Pending {
// 1.
let (method, url, mut headers, body, timeout, version) = req.pieces();
// 2.
let builder = hyper::Request::builder()
.method(method.clone())
.uri(uri)
.version(version);
// 基于http协议版本构造不同的ResponseFuture
let in_flight = match version {
_ => {
// 3.
let mut req = builder
.body(body.into_stream())
.expect("valid request parts");
*req.headers_mut() = headers.clone();
// 4.
ResponseFuture::Default(self.inner.hyper.request(req))
}
};
// 将responseFuture包一层
Pending {
inner: PendingInner::Request(PendingRequest {
method,
url,
headers,
body: reusable,
urls: Vec::new(),
retry_count: 0,
client: self.inner.clone(),
in_flight,
timeout,
}),
}
}
代码分解如下:
reqwest构造的请求分解,便于后续将参数传给底层的hyperhyper的builder对象,这个对象和reqwest的requestbuilder对象类似,用于构造请求hyper的request对象hyper client请求,注意,这里返回的是Future, 所以不会立即执行至此请求已经全部准备好也已经生成了ResponseFuture,就差最后的发送(.await)了。
rust的异步编程在于Future这个trait, 只要实现了这个trait就是Future, 也就可以.await, 而.await的最终调用就是调用实现Future trait的poll方法,所以我们可以查看对应的poll方法来查看其对应的逻辑。
再次之前,我们简单的看下send()方法返回的对象的调用链
Pending -> PendingRequest -> ResponseFuture
impl Future for Pending {
type Output = Result<Response, crate::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.inner();
// 之所以包一层是为了避免被多次调用
// 不知道什么情况下会触发这种情况....
match inner.get_mut() {
// poll PendingRequest
PendingInner::Request(ref mut req) => Pin::new(req).poll(cx),
PendingInner::Error(ref mut err) => Poll::Ready(Err(err
.take()
.expect("Pending error polled more than once"))),
}
}
}
Pending的作用就是简单的封装, 避免多次调用。
什么情况会触发多次调用?
PendingRequest开始真正的调用hyper之前返回的Future对象,并处理不同的响应码。
impl Future for PendingRequest {
type Output = Result<Response, crate::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 1.
if let Some(delay) = self.as_mut().timeout().as_mut().as_pin_mut() {
if let Poll::Ready(()) = delay.poll(cx) {
return Poll::Ready(Err(
crate::error::request(crate::error::TimedOut).with_url(self.url.clone())
));
}
}
// 2.
loop {
let res = match self.as_mut().in_flight().get_mut() {
// 3.
ResponseFuture::Default(r) => match Pin::new(r).poll(cx) {
// 请求完成但是失败
Poll::Ready(Err(e)) => {
if self.as_mut().retry_error(&e) {
continue;
}
return Poll::Ready(Err(
crate::error::request(e).with_url(self.url.clone())
));
}
// 请求完成并成功
Poll::Ready(Ok(res)) => res,
// 还未完成响应
Poll::Pending => return Poll::Pending,
},
// http3的情况
}
// 重定向的处理
// 4.
// 返回结果
let res = Response::new(
res,
self.url.clone(),
self.client.accepts,
self.timeout.take(),
);
return Poll::Ready(Ok(res));
}
}
代码分解如下:
ResponseFuture的poll方法至此请求已经通过底层的hyper发送,只等其返回结果了。
hyper的代码我就简单的过一下了,因为我也不是看得太懂T_T。
首先回到之前的调用路径, 上文的ResponseFuture就是这里的in_flight,也就是ResponseFuture::Default(self.inner.hyper.request(req))
let in_flight = match version {
// 忽略http3情况的处理代码
_ => {
// 1.
ResponseFuture::Default(self.inner.hyper.request(req))
}
};
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
// 处理connect方法, 这在代理https时有用
// 2.
let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
Ok(s) => s,
Err(err) => {
return ResponseFuture::new(future::err(err));
}
};
// 3.
ResponseFuture::new(self.clone().retryably_send_request(req, pool_key))
}
async fn retryably_send_request() -> crate::Result<Response<Body>> {
loop {
req = match self.send_request(req, pool_key.clone()).await {
Ok(resp) => return Ok(resp),
// 省略错误处理代码
}
}
}
}
async fn send_request() -> Result<Response<Body>, ClientError<B>> {
//
let mut pooled = match self.connection_for(pool_key).await {
Ok(pooled) => pooled,
// 省略错误处理代码
};
//
let mut res = match pooled.send_request_retryable(req).await {
// 省略错误处理代码
Ok(res) => res,
};
Ok(res)
}
代码分解如下:
ResponseFuture对象pool_key, pool_key用来区分连接池中的连接retryably_send_request异步函数调用后生成的Future对象包装起来, 也就是说,当responseFuture对象被poll的时候就会调用retryably_send_request函数里的代码reqwest和hyper的代码都用到了构造者(builder)设计模式, 所以两者的请求步骤是差不多的,首先创建一个builder, 基于这个builder暴露的接口可以设置各种参数,最后调用builder方法生成对应的对象,比如Client对象或者Request对象。
个人认为,如果要尽可能的掌握一个编程库,查看其源代码是必不可少的,再者为了学习rust, 多看一下对应语言的编程库也是有必要的。
这篇文章主要是阅读reqwest的源代码, 而底层关于hyper的实现这篇文章浅尝辄止, 仅仅只是过一下hyper请求的大致流程,以后对rust的掌握程度更高之后在看hyper的源代码。
reqwest代码版本: 0.11.22
个人水平有限,如有错误还请指正。
一般来说,一个客户端的实现至少会包含两个部分,协议解析和连接池管理, 这点无论是python的requests还是golang的net/http都是差不多的,但是reqwest并不需要处理这两个部分,因为reqwest是对hyper的封装, 脏活累活都被hyper承包了,reqwest要做的就是提供尽可能优雅和便于使用的用户体验。
如果对reqwest的使用不太熟悉,可以参考我的这篇文章: https://youerning.top/post/reqwest-tutorial
无论是写代码还是阅读代码,我们都需要一个入口,本篇文章的代码入口是以下代码。
use reqwest::Result;
#[tokio::main]
async fn main() -> Result<()>{
let body = reqwest::get("https://youerning.top")
.await?
.text()
.await?;
println!("body: {}", body);
Ok(())
}
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
在深入代码前,需要先对上面最简单的这部分代码做一些解析, 上面的代码可以分为三个部分
然后我们可以按照这三个部分分别阅读代码。
首先来看看reqwest是怎么构造client的。
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
impl ClientBuilder {
pub fn new() -> ClientBuilder {
// 1.
let mut headers: HeaderMap<HeaderValue> = HeaderMap::with_capacity(2);
headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
// 2.
ClientBuilder {
// 内部的config对象保存所有clientBuilder的配置项
config: Config {
error: None,
accepts: Accepts::default(),
// 省略其他配置项
}
}
}
pub fn build(self) -> crate::Result<Client> {
// 3.
let config = self.config;
if let Some(err) = config.error {
return Err(err);
}
// connector和builder的构造过程后文再说明
// 4.
Ok(Client {
inner: Arc::new(ClientRef {
// 省略其他参数
hyper: builder.build(connector),
}),
})
}
代码分解如下:
accept请求体为"*/*"ClientBuilder对象, client的默认参数都可以在这里看到connector和builder,这部分后续展开ClientBuilder的参数都是私有的,所以想要设置它的参数需要调用它提供的公开函数或者说接口,比较常用的有以下方法
connector正如其名字表示的那样,它是一个连接器,负责连接远端,然后返回一个Connection, 这Connection后续可以放在连接池中复用。
根据依赖选项中的启用的特性不同,rust会编译不同的connector, 我们这里只看开启默认特性的代码。
pub fn build(self) -> crate::Result<Client> {
let mut connector = {
// http connector 处理http协议
let mut http = HttpConnector::new_with_resolver(DynResolver::new(resolver.clone()));
#[cfg(feature = "__tls")]
match config.tls {
#[cfg(feature = "default-tls")]
TlsBackend::Default => {
// TlsConnectorBuilder
let mut tls = TlsConnector::builder();
// 设置tls相关属性
tls.danger_accept_invalid_certs(!config.certs_verification);
tls.use_sni(config.tls_sni);
tls.disable_built_in_roots(!config.tls_built_in_root_certs);
}
// 在http connector外部包一层tls,用于处理tls连接
Connector::new_default_tls(
http,
tls,
// 其他参数省略
)?
}
pub(crate) fn new_default_tls<T>() -> crate::Result<Connector> {
// 省略tls connector的构造过程
}
connector的底层构造过程这里不做深入,仅仅粗略的过一遍。
无论是reqwest还是hyper的构造逻辑都是类似的,在请求前通过一个builder对象来设置属性,最后build出最终的目标对象。
pub fn build(self) -> crate::Result<Client> {
let mut builder = hyper::Client::builder();
pub fn builder() -> Builder {
Builder::default()
}
// 省略其他的参数设置
builder.pool_idle_timeout(config.pool_idle_timeout);
builder.pool_max_idle_per_host(config.pool_max_idle_per_host);
connector.set_keepalive(config.tcp_keepalive);
Ok(Client {
inner: Arc::new(ClientRef {
hyper: builder.build(connector),
// 省略其他参数
}
}
}
}
pub fn build<C, B>(&self, connector: C) -> Client<C, B>{
Client {
config: self.client_config,
conn_builder: self.conn_builder.clone(),
connector,
// 创建连接池,用于复用连接
pool: Pool::new(self.pool_config, &self.conn_builder.exec),
}
}
Client::builder().build()方法最终构建出了一个包含hyper Client的Client,至此我们已经有Client了,可以开始构造请求。
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::GET, url)
}
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
// 默认使用的http协议是http/1.1
let req = url.into_url().map(move |url| Request::new(method, url));
RequestBuilder::new(self.clone(), req)
}
Client::builder().build()?.get(url)最后生成了一个RequestBuilder对象,当我们得到builder对象之后可以通过它暴露的接口设置各种属性,当设置完毕后才发送请求。
RequestBuilder提供了很多有用的接口,这里简单的列举一些比较常用的方法
ClientBuilder::timeout()的设置。当请求构造好之后就可以发送请求了。
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
pub fn send(self) -> impl Future<Output = Result<Response, crate::Error>> {
// 因为在设置request参数的时候可能出错,所以这里需要判断一下request是否正确
match self.request {
Ok(req) => self.client.execute_request(req),
Err(err) => Pending::new_err(err),
}
}
pub(super) fn execute_request(&self, req: Request) -> Pending {
// 1.
let (method, url, mut headers, body, timeout, version) = req.pieces();
// 2.
let builder = hyper::Request::builder()
.method(method.clone())
.uri(uri)
.version(version);
// 基于http协议版本构造不同的ResponseFuture
let in_flight = match version {
_ => {
// 3.
let mut req = builder
.body(body.into_stream())
.expect("valid request parts");
*req.headers_mut() = headers.clone();
// 4.
ResponseFuture::Default(self.inner.hyper.request(req))
}
};
// 将responseFuture包一层
Pending {
inner: PendingInner::Request(PendingRequest {
method,
url,
headers,
body: reusable,
urls: Vec::new(),
retry_count: 0,
client: self.inner.clone(),
in_flight,
timeout,
}),
}
}
代码分解如下:
reqwest构造的请求分解,便于后续将参数传给底层的hyperhyper的builder对象,这个对象和reqwest的requestbuilder对象类似,用于构造请求hyper的request对象hyper client请求,注意,这里返回的是Future, 所以不会立即执行至此请求已经全部准备好也已经生成了ResponseFuture,就差最后的发送(.await)了。
rust的异步编程在于Future这个trait, 只要实现了这个trait就是Future, 也就可以.await, 而.await的最终调用就是调用实现Future trait的poll方法,所以我们可以查看对应的poll方法来查看其对应的逻辑。
再次之前,我们简单的看下send()方法返回的对象的调用链
Pending -> PendingRequest -> ResponseFuture
impl Future for Pending {
type Output = Result<Response, crate::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.inner();
// 之所以包一层是为了避免被多次调用
// 不知道什么情况下会触发这种情况....
match inner.get_mut() {
// poll PendingRequest
PendingInner::Request(ref mut req) => Pin::new(req).poll(cx),
PendingInner::Error(ref mut err) => Poll::Ready(Err(err
.take()
.expect("Pending error polled more than once"))),
}
}
}
Pending的作用就是简单的封装, 避免多次调用。
什么情况会触发多次调用?
PendingRequest开始真正的调用hyper之前返回的Future对象,并处理不同的响应码。
impl Future for PendingRequest {
type Output = Result<Response, crate::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 1.
if let Some(delay) = self.as_mut().timeout().as_mut().as_pin_mut() {
if let Poll::Ready(()) = delay.poll(cx) {
return Poll::Ready(Err(
crate::error::request(crate::error::TimedOut).with_url(self.url.clone())
));
}
}
// 2.
loop {
let res = match self.as_mut().in_flight().get_mut() {
// 3.
ResponseFuture::Default(r) => match Pin::new(r).poll(cx) {
// 请求完成但是失败
Poll::Ready(Err(e)) => {
if self.as_mut().retry_error(&e) {
continue;
}
return Poll::Ready(Err(
crate::error::request(e).with_url(self.url.clone())
));
}
// 请求完成并成功
Poll::Ready(Ok(res)) => res,
// 还未完成响应
Poll::Pending => return Poll::Pending,
},
// http3的情况
}
// 重定向的处理
// 4.
// 返回结果
let res = Response::new(
res,
self.url.clone(),
self.client.accepts,
self.timeout.take(),
);
return Poll::Ready(Ok(res));
}
}
代码分解如下:
ResponseFuture的poll方法至此请求已经通过底层的hyper发送,只等其返回结果了。
hyper的代码我就简单的过一下了,因为我也不是看得太懂T_T。
首先回到之前的调用路径, 上文的ResponseFuture就是这里的in_flight,也就是ResponseFuture::Default(self.inner.hyper.request(req))
let in_flight = match version {
// 忽略http3情况的处理代码
_ => {
// 1.
ResponseFuture::Default(self.inner.hyper.request(req))
}
};
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
// 处理connect方法, 这在代理https时有用
// 2.
let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
Ok(s) => s,
Err(err) => {
return ResponseFuture::new(future::err(err));
}
};
// 3.
ResponseFuture::new(self.clone().retryably_send_request(req, pool_key))
}
async fn retryably_send_request() -> crate::Result<Response<Body>> {
loop {
req = match self.send_request(req, pool_key.clone()).await {
Ok(resp) => return Ok(resp),
// 省略错误处理代码
}
}
}
}
async fn send_request() -> Result<Response<Body>, ClientError<B>> {
//
let mut pooled = match self.connection_for(pool_key).await {
Ok(pooled) => pooled,
// 省略错误处理代码
};
//
let mut res = match pooled.send_request_retryable(req).await {
// 省略错误处理代码
Ok(res) => res,
};
Ok(res)
}
代码分解如下:
ResponseFuture对象pool_key, pool_key用来区分连接池中的连接retryably_send_request异步函数调用后生成的Future对象包装起来, 也就是说,当responseFuture对象被poll的时候就会调用retryably_send_request函数里的代码reqwest和hyper的代码都用到了构造者(builder)设计模式, 所以两者的请求步骤是差不多的,首先创建一个builder, 基于这个builder暴露的接口可以设置各种参数,最后调用builder方法生成对应的对象,比如Client对象或者Request对象。
个人认为,如果要尽可能的掌握一个编程库,查看其源代码是必不可少的,再者为了学习rust, 多看一下对应语言的编程库也是有必要的。