Now that we have a Peer we want to connect to, the next step is actually establishing the connection. Pingora currently implements three protocols:

  • TCP (L4)
  • SSL
  • HTTP

We will cover them in that order, so in this post we look at the lowest layer of the three: the TCP (L4) connection.


Directory Structure

In Pingora, protocol-level code is defined under `pingora-core::protocols`, while the connectors for each protocol live under `pingora-core::connectors`. For L4 connections, the file structure we will mainly study is:

  • connectors/
    • l4.rs
  • protocols/l4/
    • ext.rs
    • socket.rs
    • stream.rs

The `connect` method in `connectors/l4.rs` takes two parameters, a `Peer` and an optional `bind_to` `SocketAddr`, and eventually returns an L4 `Stream`.

`connect` supports both `Inet` and Unix domain socket peers. We will start from the `Inet` path to trace the basic connection flow; the Unix domain socket implementation is largely the same and on the whole simpler than `Inet`, so we won't walk through it separately.

The connection code is shown below:

```rust
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
        SocketAddr::Inet(addr) => {
            // [1] establish the TCP connection itself (with an optional timeout)
            let connect_future = tcp_connect(addr, bind_to.as_ref());
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    // [2] apply TCP keepalive if the peer asks for it
                    if let Some(ka) = peer.tcp_keepalive() {
                        debug!("Setting tcp keepalive");
                        set_tcp_keepalive(&socket, ka)?;
                    }
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
        SocketAddr::Unix(addr) => {
            let connect_future = connect_uds(
                addr.as_pathname()
                    .expect("non-pathname unix sockets not supported as peer"),
            );
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    // no SO_KEEPALIVE for UDS
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
    }?;
    let tracer = peer.get_tracer();
    if let Some(t) = tracer {
        t.0.on_connected();
        stream.tracer = Some(t);
    }
    // [3] disable Nagle's algorithm; Pingora buffers writes in user space instead
    stream.set_nodelay()?;
    let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
    digest
        .peer_addr
        .set(Some(peer_addr.clone()))
        .expect("newly created OnceCell must be empty");
    stream.set_socket_digest(digest);
    Ok(stream)
}
```

Following the numbered markers in the code above, the logic for establishing an L4 connection is:

  1. First, the connection itself: `tcp_connect`, i.e. `l4::ext::connect`, is called to establish the connection.
  2. Then `tcp_keepalive`: if the `Peer` asks for this option, it is applied via `l4::ext::set_tcp_keepalive`.
  3. Finally `set_nodelay`: since Pingora implements write buffering in user space via Tokio, Nagle's algorithm must be disabled here.
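
As a quick orientation before diving into the pieces, here is a minimal usage sketch of this entry point. The `BasicPeer` type and the exact module paths are assumptions about pingora-core's public API, not something shown above:

```rust
use pingora_core::connectors::l4::connect;
use pingora_core::protocols::l4::stream::Stream;
use pingora_core::upstreams::peer::BasicPeer;

// a bare-bones peer: address only, no proxy and no keepalive settings
async fn open_upstream() -> pingora_core::Result<Stream> {
    let peer = BasicPeer::new("127.0.0.1:8080");
    // bind_to = None: let the OS pick the source address and port
    connect(&peer, None).await
}
```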

tcp_connect

The connection process in `l4::ext::connect` is basically the same as connecting in Tokio. The difference is the added `bind_to` support.

```rust
/*
 * this extension is needed until the following are addressed
 * https://github.com/tokio-rs/tokio/issues/1543
 * https://github.com/tokio-rs/mio/issues/1257
 * https://github.com/tokio-rs/mio/issues/1211
 */
/// connect() to the given address while optionally bind to the specific source address
///
/// `IP_BIND_ADDRESS_NO_PORT` is used.
pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result<TcpStream> {
    let socket = if addr.is_ipv4() {
        TcpSocket::new_v4()
    } else {
        TcpSocket::new_v6()
    }
    .or_err(SocketError, "failed to create socket")?;

    if cfg!(target_os = "linux") {
        // [1] tell the OS not to pick a local port yet
        ip_bind_addr_no_port(socket.as_raw_fd(), true)
            .or_err(SocketError, "failed to set socket opts")?;

        if let Some(baddr) = bind_to {
            // [2] bind to the requested source address
            socket
                .bind(*baddr)
                .or_err_with(BindError, || format!("failed to bind to socket {}", *baddr))?;
        };
    }
    // TODO: add support for bind on other platforms
    socket
        .connect(*addr)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", *addr)))
}
```

First, at [1], Pingora calls `ip_bind_addr_no_port` to ask the operating system not to assign a local port automatically:

```rust
#[cfg(target_os = "linux")]
fn ip_bind_addr_no_port(fd: RawFd, val: bool) -> io::Result<()> {
    const IP_BIND_ADDRESS_NO_PORT: i32 = 24;
    set_opt(fd, libc::IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT, val as c_int)
}

#[cfg(not(target_os = "linux"))]
fn ip_bind_addr_no_port(_fd: RawFd, _val: bool) -> io::Result<()> {
    Ok(())
}
```

Pingora currently implements `ip_bind_addr_no_port` properly only on Linux; other systems get a dummy implementation. On Linux, Pingora sets the `IP_BIND_ADDRESS_NO_PORT` option via `setsockopt`. Without this option, `bind()` would also grab an ephemeral port at bind time, which can exhaust the local port range when many connections share one source IP; with it, port selection is deferred to `connect()`, when the full 4-tuple is known.

Having made sure the OS does not auto-assign a port, at [2] Pingora manually binds the `SocketAddr` requested by `bind_to` via `bind`.
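
To make the effect concrete, here is a small sketch that calls this extension directly, pinning only the source IP (the module path is an assumption):

```rust
use std::net::SocketAddr;

use pingora_core::protocols::l4::ext::connect;
use tokio::net::TcpStream;

async fn connect_from(src_ip: &str, dst: &str) -> pingora_core::Result<TcpStream> {
    // port 0 is fine here: IP_BIND_ADDRESS_NO_PORT defers the port choice
    // to connect(), so bind() only pins the source IP
    let bind_to: SocketAddr = format!("{src_ip}:0").parse().expect("valid source IP");
    let dst_addr: SocketAddr = dst.parse().expect("valid destination address");
    connect(&dst_addr, Some(&bind_to)).await
}
```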

Stream

Once `l4::ext::connect` succeeds, what we have is a `tokio::net::TcpStream`, while `connect` needs to return a `Stream`. The conversion from `TcpStream` to `Stream` is defined in `protocols::l4::stream`.

The `l4::Stream` Pingora defines is a `BufStream` with read and write buffers, where the write buffer can optionally be disabled. As the code below shows:

```rust
/// A concrete type for transport layer connection + extra fields for logging
#[derive(Debug)]
pub struct Stream {
    stream: BufStream<RawStream>,
    buffer_write: bool,
    proxy_digest: Option<Arc<ProxyDigest>>,
    socket_digest: Option<Arc<SocketDigest>>,
    /// When this connection is established
    pub established_ts: SystemTime,
    /// The distributed tracing object for this stream
    pub tracer: Option<Tracer>,
}

impl Stream {
    /// set TCP nodelay for this connection if `self` is TCP
    pub fn set_nodelay(&mut self) -> Result<()> {
        if let RawStream::Tcp(s) = &self.stream.get_ref() {
            s.set_nodelay(true)
                .or_err(ConnectError, "failed to set_nodelay")?;
        }
        Ok(())
    }
}
```

TcpStream -> Stream

Pingora implements `From`, so both `TcpStream` and `UnixStream` can be turned into a `Stream` with `into()`:

```rust
// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
// Small write buf to match MSS. Too large write buf delays real time communication.
// This buffering effectively implements something similar to Nagle's algorithm.
// The benefit is that user space can control when to flush, where Nagle's can't be controlled.
// And userspace buffering reduce both syscalls and small packets.
const BUF_WRITE_SIZE: usize = 1460;

// NOTE: with writer buffering, users need to call flush() to make sure the data is actually
// sent. Otherwise data could be stuck in the buffer forever or get lost when stream is closed.

// (the Stream struct and its set_nodelay impl are as shown above)

impl From<TcpStream> for Stream {
    fn from(s: TcpStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Tcp(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}

impl From<UnixStream> for Stream {
    fn from(s: UnixStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Unix(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}
```

The key here is the choice of the two constants:

  1. `BUF_READ_SIZE`: 64 KiB. Since the TLS record size is 16 KiB and the SSL layer always does "small" 16 KiB reads, a 64 KiB read buffer aligned to the record size cuts down the number of syscalls.
  2. `BUF_WRITE_SIZE`: 1460, matching the TCP MSS. Through Tokio's user-space write buffering this effectively replaces the kernel's Nagle's algorithm, with the advantage that user space controls when to flush (see the sketch below).
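
One practical consequence, spelled out in the NOTE above: with write buffering on, data may sit in the 1460-byte user-space buffer until `flush()` is called. A minimal sketch (the `Stream` path is an assumption):

```rust
use tokio::io::AsyncWriteExt;

use pingora_core::protocols::l4::stream::Stream;

async fn send_all(stream: &mut Stream, payload: &[u8]) -> std::io::Result<()> {
    // this may only land in the user-space write buffer...
    stream.write_all(payload).await?;
    // ...so flush explicitly to push the bytes to the socket
    stream.flush().await
}
```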

Stream::drop

`Stream`'s `Drop` implementation is also quite interesting. Let's take a look:

```rust
impl Drop for Stream {
    fn drop(&mut self) {
        if let Some(t) = self.tracer.as_ref() {
            t.0.on_disconnected();
        }
        /* [1] use nodelay/local_addr function to detect socket status */
        let ret = match &self.stream.get_ref() {
            RawStream::Tcp(s) => s.nodelay().err(),
            RawStream::Unix(s) => s.local_addr().err(),
        };
        if let Some(e) = ret {
            match e.kind() {
                tokio::io::ErrorKind::Other => {
                    if let Some(ecode) = e.raw_os_error() {
                        if ecode == 9 {
                            // Or we could panic here
                            error!("Crit: socket {:?} is being double closed", self.stream);
                        }
                    }
                }
                _ => {
                    debug!("Socket is already broken {:?}", e);
                }
            }
        } else {
            // [2] try flush the write buffer. We use now_or_never() because
            // 1. Drop cannot be async
            // 2. write should usually be ready, unless the buf is full.
            let _ = self.flush().now_or_never();
        }
        debug!("Dropping socket {:?}", self.stream);
    }
}
```

[1]Pingora 分别通过 nodelaylocal_addr 尝试获取 Stream 的状态。而在 [2],当 Stream 仍然可写时,则调用了自身的 flush(),并且通过 now_or_never() 尝试立即将缓存清空。

Stream::poll_write

Another interesting point is how Pingora bypasses the write buffer:

```rust
impl AsyncWrite for Stream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write(cx, buf)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write(cx, buf)
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_shutdown(cx)
    }

    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write_vectored(cx, bufs)
        }
    }

    fn is_write_vectored(&self) -> bool {
        if self.buffer_write {
            self.stream.is_write_vectored() // it is true
        } else {
            self.stream.get_ref().is_write_vectored()
        }
    }
}
```

When `buffer_write` is enabled, `poll_write` simply goes through `self.stream`. When buffering is not wanted, `get_mut()` grabs the underlying I/O object directly, and writing to the underlying stream naturally skips `BufStream`'s buffer.
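
This is plain tokio `BufStream` behavior, which we can sketch independently of Pingora: `get_mut()` exposes the wrapped I/O object, and writes through it skip the buffer. Flushing first keeps the byte order intact if buffered data is still pending:

```rust
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

async fn write_unbuffered(io: &mut BufStream<TcpStream>, data: &[u8]) -> std::io::Result<()> {
    // drain anything still sitting in BufStream's write buffer first
    io.flush().await?;
    // then bypass the buffer and write to the socket directly
    io.get_mut().write_all(data).await
}
```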

L4 proxying over an HTTP tunnel

In the walkthrough of `l4::connect` above, we skipped over the `proxy_connect` part. The proxy path looks like this:

```rust
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
        // ... (as shown earlier)
```

In Pingora, L4 proxying is implemented through an HTTP tunnel. In short, the proxy server runs an HTTP/1.1 HTTP proxy over a Unix domain socket, and the proxy client (i.e. Pingora) issues a CONNECT request through it to reach the given remote `ip:port`. The full code is shown below:

```rust
async fn proxy_connect<P: Peer>(peer: &P) -> Result<Stream> {
    // safe to unwrap
    let proxy = peer.get_proxy().unwrap();
    let options = peer.get_peer_options().unwrap();

    // combine required and optional headers
    let mut headers = proxy
        .headers
        .iter()
        .chain(options.extra_proxy_headers.iter());

    // [1] connect() to the proxy's Unix domain socket
    // not likely to timeout during connect() to UDS
    let stream: Box<Stream> = Box::new(
        connect_uds(&proxy.next_hop)
            .await
            .or_err_with(ConnectError, || {
                format!("CONNECT proxy connect() error to {:?}", &proxy.next_hop)
            })?
            .into(),
    );

    // [2] build the CONNECT request header
    let req_header = raw_connect::generate_connect_header(&proxy.host, proxy.port, &mut headers)?;
    // [3] establish the CONNECT tunnel
    let fut = raw_connect::connect(stream, &req_header);
    let (mut stream, digest) = match peer.connection_timeout() {
        Some(t) => pingora_timeout::timeout(t, fut)
            .await
            .explain_err(ConnectTimedout, |_| "establishing CONNECT proxy")?,
        None => fut.await,
    }
    .map_err(|mut e| {
        // http protocol may ask to retry if reused client
        e.retry.decide_reuse(false);
        e
    })?;
    debug!("CONNECT proxy established: {:?}", proxy);
    stream.set_proxy_digest(digest);
    let stream = stream.into_any().downcast::<Stream>().unwrap(); // safe, it is Stream from above
    Ok(*stream)
}
```

The whole process splits into three basic parts:

  1. First, at [1], Pingora connects to the given Unix domain socket path via `connect_uds()`.
  2. Then, at [2], it generates the `req_header` via `raw_connect::generate_connect_header()`.
  3. Finally, at [3], it establishes the tunnel via `raw_connect::connect()`.

Let's go through these one by one.

connect_uds

Connecting to a Unix domain socket is simple; it just calls tokio's `connect()`:

```rust
/// connect() to the given Unix domain socket
pub async fn connect_uds(path: &std::path::Path) -> Result<UnixStream> {
    UnixStream::connect(path)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", path.display())))
}
```

Next is generating the `ReqHeader`:

```rust
/// Generate the CONNECT header for the given destination
pub fn generate_connect_header<'a, H, S>(
    host: &str,
    port: u16,
    headers: H,
) -> Result<Box<ReqHeader>>
where
    S: AsRef<[u8]>,
    H: Iterator<Item = (S, &'a Vec<u8>)>,
{
    // TODO: valid that host doesn't have port
    // TODO: support adding ad-hoc headers

    let authority = format!("{host}:{port}");
    let req = http::request::Builder::new()
        .version(http::Version::HTTP_11)
        .method(http::method::Method::CONNECT)
        .uri(format!("https://{authority}/")) // scheme doesn't matter
        .header(http::header::HOST, &authority);

    let (mut req, _) = match req.body(()) {
        Ok(r) => r.into_parts(),
        Err(e) => {
            return Err(e).or_err(InvalidHTTPHeader, "Invalid CONNECT request");
        }
    };

    for (k, v) in headers {
        let header_name = http::header::HeaderName::from_bytes(k.as_ref())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        let header_value = http::header::HeaderValue::from_bytes(v.as_slice())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        req.headers.insert(header_name, header_value);
    }

    Ok(Box::new(req))
}
```

Here Pingora constructs a valid HTTP CONNECT request with the http crate's `Builder` and fills in the proxy headers specified on the Pingora `Peer`.
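
A hedged usage sketch of this builder (the `raw_connect` module path is an assumption): the `headers` argument is any iterator of name/value pairs whose values are `&Vec<u8>`:

```rust
use pingora_core::protocols::raw_connect;

fn build_header() -> pingora_core::Result<()> {
    let extra: Vec<(String, Vec<u8>)> =
        vec![("proxy-authorization".to_string(), b"Basic ...".to_vec())];
    let req_header = raw_connect::generate_connect_header(
        "example.org",
        443,
        extra.iter().map(|(k, v)| (k.as_str(), v)),
    )?;
    assert_eq!(req_header.method, http::Method::CONNECT);
    Ok(())
}
```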

raw_connect::connect

With everything in place, the last step is to establish the tunnel.

```rust
/// Try to establish a CONNECT proxy via the given `stream`.
///
/// `request_header` should include the necessary request headers for the CONNECT protocol.
///
/// When successful, a [`Stream`] will be returned which is the established CONNECT proxy connection.
pub async fn connect(stream: Stream, request_header: &ReqHeader) -> Result<(Stream, ProxyDigest)> {
    let mut http = HttpSession::new(stream);

    // [1] serialize the request header and write it to the wire
    // We write to stream directly because HttpSession doesn't write req header in auth form
    let to_wire = http_req_header_to_wire_auth_form(request_header);
    http.underlying_stream
        .write_all(to_wire.as_ref())
        .await
        .or_err(WriteError, "while writing request headers")?;
    http.underlying_stream
        .flush()
        .await
        .or_err(WriteError, "while flushing request headers")?;

    // TODO: set http.read_timeout
    // [2] read and validate the response
    let resp_header = http.read_resp_header_parts().await?;

    // [3] hand back the underlying stream
    Ok((
        http.underlying_stream,
        validate_connect_response(resp_header)?,
    ))
}
```

The connect routine has three parts. First, at [1], the `request_header` is converted into a wire buffer by `http_req_header_to_wire_auth_form`:

```rust
#[inline]
fn http_req_header_to_wire_auth_form(req: &ReqHeader) -> BytesMut {
    let mut buf = BytesMut::with_capacity(512);

    // Request-Line
    let method = req.method.as_str().as_bytes();
    buf.put_slice(method);
    buf.put_u8(b' ');
    // NOTE: CONNECT doesn't need URI path so we just skip that
    if let Some(path) = req.uri.authority() {
        buf.put_slice(path.as_str().as_bytes());
    }
    buf.put_u8(b' ');

    let version = match req.version {
        Version::HTTP_09 => "HTTP/0.9",
        Version::HTTP_10 => "HTTP/1.0",
        Version::HTTP_11 => "HTTP/1.1",
        _ => "HTTP/0.9",
    };
    buf.put_slice(version.as_bytes());
    buf.put_slice(CRLF);

    // headers
    let headers = &req.headers;
    for (key, value) in headers.iter() {
        buf.put_slice(key.as_ref());
        buf.put_slice(HEADER_KV_DELIMITER);
        buf.put_slice(value.as_ref());
        buf.put_slice(CRLF);
    }

    buf.put_slice(CRLF);
    buf
}
```
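
For example, a header built by `generate_connect_header("example.org", 443, ...)` serializes to roughly the following bytes (the http crate stores header names lowercased, and the request line carries the authority rather than a path):

```
CONNECT example.org:443 HTTP/1.1
host: example.org:443

```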

Then, at [2], the HTTP server's response is read, and its status and headers are validated:

```rust
#[inline]
fn validate_connect_response(resp: Box<ResponseHeader>) -> Result<ProxyDigest> {
    if !resp.status.is_success() {
        return Error::e_because(
            ConnectProxyFailure,
            "None 2xx code",
            ConnectProxyError::boxed_new(resp),
        );
    }

    // Checking Content-Length and Transfer-Encoding is optional because we already ignore them.
    // We choose to do so because we want to be strict for internal use of CONNECT.
    // Ignore Content-Length header because our internal CONNECT server is coded to send it.
    if resp.headers.get(http::header::TRANSFER_ENCODING).is_some() {
        return Error::e_because(
            ConnectProxyFailure,
            "Invalid Transfer-Encoding presents",
            ConnectProxyError::boxed_new(resp),
        );
    }
    Ok(ProxyDigest::new(resp))
}
```

Finally, at [3], `http`'s `underlying_stream` is returned. At this point the TCP tunnel is established, and Pingora can carry on its TCP conversation through this channel.