After a few digressions (?), we return to the main thread of Pingora. In this post we look at Peer, that is, the structures and implementations in pingora-core::upstreams.

Continuing from the Example

Let's go back to the example at the start of Getting Started. This time our attention is on HttpPeer:

use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {
    /// For this small example, we don't need context storage
    type CTX = ();
    fn new_ctx(&self) -> () {
        ()
    }

    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = self
            .0
            .select(b"", 256) // hash doesn't matter for round robin
            .unwrap();

        println!("upstream peer is: {upstream:?}");

        // Set SNI to one.one.one.one
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }
}

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();

    let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();

    let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));
    lb.add_tcp("0.0.0.0:6188");

    my_server.add_service(lb);
    my_server.run_forever();
}

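As you can see, while implementing ProxyHttp we have to return an HttpPeer that is used for the connection. Both HttpPeer and the trait Peer it implements are defined in pingora-core. Let's take a closer look.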

trait Peer

Peer is the structure a Connector uses to carry the connection parameters when it connects. Its trait definition is as follows:

/// [`Peer`] defines the interface to communicate with the [`crate::connectors`] regarding where to
/// connect to and how to connect to it.
pub trait Peer: Display + Clone {
    /// The remote address to connect to
    fn address(&self) -> &SocketAddr;
    /// If TLS should be used;
    fn tls(&self) -> bool;
    /// The SNI to send, if TLS is used
    fn sni(&self) -> &str;
    /// To decide whether a [`Peer`] can use the connection established by another [`Peer`].
    ///
    /// The connection to two peers are considered reusable to each other if their reuse hashes are
    /// the same
    fn reuse_hash(&self) -> u64;
    /// Get the proxy setting to connect to the remote server
    fn get_proxy(&self) -> Option<&Proxy> {
        None
    }
    /// Get the additional options to connect to the peer.
    ///
    /// See [`PeerOptions`] for more details
    fn get_peer_options(&self) -> Option<&PeerOptions> {
        None
    }
    /// Get the additional options for modification.
    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
        None
    }
    /// Whether the TLS handshake should validate the cert of the server.
    fn verify_cert(&self) -> bool {
        match self.get_peer_options() {
            Some(opt) => opt.verify_cert,
            None => false,
        }
    }
    /// Whether the TLS handshake should verify that the server cert matches the SNI.
    fn verify_hostname(&self) -> bool {
        match self.get_peer_options() {
            Some(opt) => opt.verify_hostname,
            None => false,
        }
    }
    /// The alternative common name to use to verify the server cert.
    ///
    /// If the server cert doesn't match the SNI, this name will be used to
    /// verify the cert.
    fn alternative_cn(&self) -> Option<&String> {
        match self.get_peer_options() {
            Some(opt) => opt.alternative_cn.as_ref(),
            None => None,
        }
    }
    /// Which local source address this connection should be bind to.
    fn bind_to(&self) -> Option<&InetSocketAddr> {
        match self.get_peer_options() {
            Some(opt) => opt.bind_to.as_ref(),
            None => None,
        }
    }
    /// How long connect() call should be wait before it returns a timeout error.
    fn connection_timeout(&self) -> Option<Duration> {
        match self.get_peer_options() {
            Some(opt) => opt.connection_timeout,
            None => None,
        }
    }
    /// How long the overall connection establishment should take before a timeout error is returned.
    fn total_connection_timeout(&self) -> Option<Duration> {
        match self.get_peer_options() {
            Some(opt) => opt.total_connection_timeout,
            None => None,
        }
    }
    /// If the connection can be reused, how long the connection should wait to be reused before it
    /// shuts down.
    fn idle_timeout(&self) -> Option<Duration> {
        self.get_peer_options().and_then(|o| o.idle_timeout)
    }

    /// Get the ALPN preference.
    fn get_alpn(&self) -> Option<&ALPN> {
        self.get_peer_options().map(|opt| &opt.alpn)
    }

    /// Get the CA cert to use to validate the server cert.
    ///
    /// If not set, the default CAs will be used.
    fn get_ca(&self) -> Option<&Arc<Box<[X509]>>> {
        match self.get_peer_options() {
            Some(opt) => opt.ca.as_ref(),
            None => None,
        }
    }

    /// Get the client cert and key for mutual TLS if any
    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
        None
    }

    /// The TCP keepalive setting that should be applied to this connection
    fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
        self.get_peer_options()
            .and_then(|o| o.tcp_keepalive.as_ref())
    }

    /// The interval H2 pings to send to the server if any
    fn h2_ping_interval(&self) -> Option<Duration> {
        self.get_peer_options().and_then(|o| o.h2_ping_interval)
    }

    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
        self.address().check_fd_match(fd)
    }

    fn get_tracer(&self) -> Option<Tracer> {
        None
    }
}

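The listing is long because most of the trait methods come with default implementations; skim them as needed. At its core, what a Peer must implement, that is, the properties it has to tell the Connector when connecting, are: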

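Method          Description
address()       The address to connect to.
tls()           Whether to connect over TLS.
sni()           The SNI to send when TLS is used.
reuse_hash()    Decides connection reuse. Connections to two peers with equal reuse hashes can be reused for each other.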
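
To make the role of reuse_hash() more concrete, here is a minimal sketch of an idle-connection pool keyed by that hash. MiniPeer and MiniPool are hypothetical names made up for this illustration, a connection is just a number, and std's DefaultHasher stands in for the AHasher that Pingora uses. It is not Pingora's actual connection pool.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a peer: only the fields that decide reuse.
#[derive(Hash)]
struct MiniPeer {
    address: String,
    tls: bool,
    sni: String,
}

impl MiniPeer {
    fn new(address: &str, tls: bool, sni: &str) -> Self {
        MiniPeer {
            address: address.to_string(),
            tls,
            sni: sni.to_string(),
        }
    }

    /// Hash every field that affects how the connection was established.
    fn reuse_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.hash(&mut hasher);
        hasher.finish()
    }
}

/// Hypothetical idle-connection pool keyed by reuse hash.
/// A connection is represented by a plain id here.
#[derive(Default)]
struct MiniPool {
    idle: HashMap<u64, Vec<u32>>,
}

impl MiniPool {
    /// Park an idle connection under the hash of the peer that opened it.
    fn put(&mut self, peer: &MiniPeer, conn_id: u32) {
        self.idle.entry(peer.reuse_hash()).or_default().push(conn_id);
    }

    /// Take an idle connection opened by any peer with the same hash.
    fn get(&mut self, peer: &MiniPeer) -> Option<u32> {
        self.idle.get_mut(&peer.reuse_hash())?.pop()
    }
}

fn main() {
    let mut pool = MiniPool::default();
    let a = MiniPeer::new("1.1.1.1:443", true, "one.one.one.one");
    pool.put(&a, 1);

    // Same address, TLS flag and SNI: the idle connection can be reused.
    let same = MiniPeer::new("1.1.1.1:443", true, "one.one.one.one");
    println!("same peer reuses: {:?}", pool.get(&same));

    // Different SNI: a different key, so there is nothing to reuse.
    let other = MiniPeer::new("1.1.1.1:443", true, "example.org");
    println!("other peer reuses: {:?}", pool.get(&other));
}
```

The point of the design is that the hash has to cover everything that changes how the connection was set up. Otherwise a request could be handed a connection that was negotiated with the wrong SNI or without TLS.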

Beyond these, there are a few more useful properties, including:

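  • get_proxy(): the proxy settings to connect through, if any
  • verify_cert(): whether to validate the server certificate
  • verify_hostname(): whether to check that the server certificate matches the hostname (SNI)
  • connection_timeout(): the timeout for the connect() call that establishes the connection
  • total_connection_timeout(): the timeout for the overall connection establishment
  • matches_fd(): whether a given FD matches this connection
  • get_tracer(): the Tracer associated with this connection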
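
The pattern behind these accessors (a handful of required methods, with everything else derived from an optional options bag) can be sketched in isolation. MiniPeer, MiniOptions, Bare and Configured below are hypothetical names for this illustration only; the real trait and PeerOptions carry many more fields.

```rust
use std::time::Duration;

/// Hypothetical, trimmed-down options bag (the real one is `PeerOptions`).
#[derive(Default)]
struct MiniOptions {
    verify_cert: bool,
    connection_timeout: Option<Duration>,
}

/// Hypothetical miniature of the trait shape: a few required methods,
/// everything else derived from an optional options bag.
trait MiniPeer {
    fn address(&self) -> &str;
    fn tls(&self) -> bool;

    /// Implementors without options keep this default.
    fn options(&self) -> Option<&MiniOptions> {
        None
    }

    /// Mirrors the `match` style of `verify_cert()` in the listing.
    fn verify_cert(&self) -> bool {
        match self.options() {
            Some(opt) => opt.verify_cert,
            None => false,
        }
    }

    /// Mirrors the `and_then` style of `idle_timeout()` in the listing.
    fn connection_timeout(&self) -> Option<Duration> {
        self.options().and_then(|o| o.connection_timeout)
    }
}

/// A peer that only supplies the required methods.
struct Bare(String);

impl MiniPeer for Bare {
    fn address(&self) -> &str {
        &self.0
    }
    fn tls(&self) -> bool {
        false
    }
}

/// A peer that also exposes an options bag.
struct Configured {
    addr: String,
    opts: MiniOptions,
}

impl MiniPeer for Configured {
    fn address(&self) -> &str {
        &self.addr
    }
    fn tls(&self) -> bool {
        true
    }
    fn options(&self) -> Option<&MiniOptions> {
        Some(&self.opts)
    }
}

fn main() {
    let bare = Bare("127.0.0.1:80".to_string());
    let configured = Configured {
        addr: "127.0.0.1:443".to_string(),
        opts: MiniOptions {
            verify_cert: true,
            connection_timeout: Some(Duration::from_secs(1)),
        },
    };
    println!("{} tls={} verify_cert={}", bare.address(), bare.tls(), bare.verify_cert());
    println!(
        "{} tls={} timeout={:?}",
        configured.address(),
        configured.tls(),
        configured.connection_timeout()
    );
}
```

An implementor that overrides only options() gets all the derived accessors for free, which is the same trick BasicPeer and HttpPeer rely on through get_peer_options().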

Tracing and Tracer

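trait Peer refers to a Tracer type through get_tracer(); this is the means by which a Peer traces its connections. A Tracer wraps an object that implements trait Tracing, whose methods are called when the connection is established and when it is disconnected. The definitions are as follows: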

/// The interface to trace the connection
pub trait Tracing: Send + Sync + std::fmt::Debug {
    /// This method is called when successfully connected to a remote server
    fn on_connected(&self);
    /// This method is called when the connection is disconnected.
    fn on_disconnected(&self);
    /// A way to clone itself
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

/// An object-safe version of Tracing object that can use Clone
#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);
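
As a rough illustration of how these two pieces fit together, the sketch below redefines the trait locally (so it compiles on its own) and adds a hypothetical OpenConnections tracer that counts live connections. The Clone impl that delegates to boxed_clone() is my assumption about how a Tracer gets cloned, based on the doc comment above; treat it as a sketch rather than Pingora's exact code.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Local copy of the trait shape from the listing, so the sketch is standalone.
trait Tracing: Send + Sync + Debug {
    fn on_connected(&self);
    fn on_disconnected(&self);
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

#[derive(Debug)]
struct Tracer(Box<dyn Tracing>);

/// `Box<dyn Tracing>` is not `Clone` by itself; delegate to `boxed_clone()`.
impl Clone for Tracer {
    fn clone(&self) -> Self {
        Tracer(self.0.boxed_clone())
    }
}

/// Hypothetical tracer that counts currently open connections.
/// Clones share the same counter through the `Arc`.
#[derive(Debug, Clone, Default)]
struct OpenConnections(Arc<AtomicUsize>);

impl Tracing for OpenConnections {
    fn on_connected(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn on_disconnected(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
    fn boxed_clone(&self) -> Box<dyn Tracing> {
        Box::new(self.clone())
    }
}

fn main() {
    let counter = OpenConnections::default();
    let tracer = Tracer(Box::new(counter.clone()));
    let cloned = tracer.clone();

    // Two connections open, one of them closes again.
    tracer.0.on_connected();
    cloned.0.on_connected();
    cloned.0.on_disconnected();

    println!("open connections: {}", counter.0.load(Ordering::Relaxed));
}
```

boxed_clone() exists because a trait with Clone as a supertrait could not be used as dyn Tracing; returning a fresh Box keeps the trait object-safe while still letting the wrapper be cloned.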

BasicPeer

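Now that we know the definition of trait Peer, let's look at a simple Peer implementation: BasicPeer. Pingora uses BasicPeer in a few simple scenarios, such as TcpHealthCheck and unit tests. The implementation is fairly rough: a peer built with new() has an empty sni, so tls() returns false and it only makes a non-TLS connection to address. The source is very simple and basically just fills in the fields: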

/// A simple TCP or TLS peer without many complicated settings.
#[derive(Debug, Clone)]
pub struct BasicPeer {
    pub _address: SocketAddr,
    pub sni: String,
    pub options: PeerOptions,
}

impl BasicPeer {
    /// Create a new [`BasicPeer`]
    pub fn new(address: &str) -> Self {
        BasicPeer {
            _address: SocketAddr::Inet(address.parse().unwrap()), // TODO: check error, add support
            // for UDS
            sni: "".to_string(), // TODO: add support for SNI
            options: PeerOptions::new(),
        }
    }
}

impl Display for BasicPeer {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "{:?}", self)
    }
}

impl Peer for BasicPeer {
    fn address(&self) -> &SocketAddr {
        &self._address
    }

    fn tls(&self) -> bool {
        !self.sni.is_empty()
    }

    fn bind_to(&self) -> Option<&InetSocketAddr> {
        None
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    // TODO: change connection pool to accept u64 instead of String
    fn reuse_hash(&self) -> u64 {
        let mut hasher = AHasher::default();
        self._address.hash(&mut hasher);
        hasher.finish()
    }

    fn get_peer_options(&self) -> Option<&PeerOptions> {
        Some(&self.options)
    }
}
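
The "TODO: check error" in new() hints at one obvious improvement. As a sketch only, here is what a fallible constructor could look like on a hypothetical MiniBasicPeer that uses std's SocketAddr in place of Pingora's own address type; try_new() is an invented name, not an existing Pingora API.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical stand-in for `BasicPeer`, reduced to address and SNI.
#[derive(Debug)]
struct MiniBasicPeer {
    address: SocketAddr,
    sni: String,
}

impl MiniBasicPeer {
    /// Fallible variant of `new()`: a malformed address becomes an `Err`
    /// instead of a panic from `unwrap()`.
    fn try_new(address: &str) -> Result<Self, AddrParseError> {
        Ok(MiniBasicPeer {
            address: address.parse()?,
            sni: String::new(),
        })
    }

    /// Same rule as `BasicPeer::tls()`: TLS is implied by a non-empty SNI.
    fn tls(&self) -> bool {
        !self.sni.is_empty()
    }
}

fn main() {
    match MiniBasicPeer::try_new("127.0.0.1:8080") {
        Ok(peer) => println!("peer {} tls={}", peer.address, peer.tls()),
        Err(e) => println!("bad address: {e}"),
    }

    // Parsing does not resolve hostnames, so this is an error, not a lookup.
    if let Err(e) = MiniBasicPeer::try_new("localhost:8080") {
        println!("bad address: {e}");
    }
}
```

Note that parsing a string into a socket address only accepts a literal IP and port, which is also why BasicPeer::new() panics on anything that looks like a hostname.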

HttpPeer

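Having seen the simple one, let's move on to something more involved: HttpPeer, the structure we used in the example. Compared with BasicPeer, it additionally supports HTTPS, SNI, proxies, client certificates, and things like UDS (Unix Domain Socket).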

/// A peer representing the remote HTTP server to connect to
#[derive(Debug, Clone)]
pub struct HttpPeer {
    pub _address: SocketAddr,
    pub scheme: Scheme,
    pub sni: String,
    pub proxy: Option<Proxy>,
    pub client_cert_key: Option<Arc<CertKey>>,
    pub options: PeerOptions,
}

impl HttpPeer {
    // These methods are pretty ad-hoc
    pub fn is_tls(&self) -> bool {
        match self.scheme {
            Scheme::HTTP => false,
            Scheme::HTTPS => true,
        }
    }

    fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
        HttpPeer {
            _address: address,
            scheme: Scheme::from_tls_bool(tls),
            sni,
            proxy: None,
            client_cert_key: None,
            options: PeerOptions::new(),
        }
    }

    /// Create a new [`HttpPeer`] with the given socket address and TLS settings.
    pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
        let mut addrs_iter = address.to_socket_addrs().unwrap(); //TODO: handle error
        let addr = addrs_iter.next().unwrap();
        Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
    }

    /// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings.
    pub fn new_uds(path: &str, tls: bool, sni: String) -> Self {
        let addr = SocketAddr::Unix(UnixSocketAddr::from_pathname(Path::new(path)).unwrap()); //TODO: handle error
        Self::new_from_sockaddr(addr, tls, sni)
    }

    /// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port
    /// combination.
    pub fn new_proxy(
        next_hop: &str,
        ip_addr: IpAddr,
        port: u16,
        tls: bool,
        sni: &str,
        headers: BTreeMap<String, Vec<u8>>,
    ) -> Self {
        HttpPeer {
            _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
            scheme: Scheme::from_tls_bool(tls),
            sni: sni.to_string(),
            proxy: Some(Proxy {
                next_hop: PathBuf::from(next_hop).into(),
                host: ip_addr.to_string(),
                port,
                headers,
            }),
            client_cert_key: None,
            options: PeerOptions::new(),
        }
    }

    fn peer_hash(&self) -> u64 {
        let mut hasher = AHasher::default();
        self.hash(&mut hasher);
        hasher.finish()
    }
}

impl Hash for HttpPeer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self._address.hash(state);
        self.scheme.hash(state);
        self.proxy.hash(state);
        self.sni.hash(state);
        // client cert serial
        self.client_cert_key.hash(state);
        // origin server cert verification
        self.verify_cert().hash(state);
        self.verify_hostname().hash(state);
        self.alternative_cn().hash(state);
    }
}

impl Display for HttpPeer {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
        if !self.sni.is_empty() {
            write!(f, "sni: {},", self.sni)?;
        }
        if let Some(p) = self.proxy.as_ref() {
            write!(f, "proxy: {p},")?;
        }
        if let Some(cert) = &self.client_cert_key {
            write!(f, "client cert: {},", cert)?;
        }
        Ok(())
    }
}

impl Peer for HttpPeer {
    fn address(&self) -> &SocketAddr {
        &self._address
    }

    fn tls(&self) -> bool {
        self.is_tls()
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    // TODO: change connection pool to accept u64 instead of String
    fn reuse_hash(&self) -> u64 {
        self.peer_hash()
    }

    fn get_peer_options(&self) -> Option<&PeerOptions> {
        Some(&self.options)
    }

    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
        Some(&mut self.options)
    }

    fn get_proxy(&self) -> Option<&Proxy> {
        self.proxy.as_ref()
    }

    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
        if let Some(proxy) = self.get_proxy() {
            proxy.next_hop.check_fd_match(fd)
        } else {
            self.address().check_fd_match(fd)
        }
    }

    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
        self.client_cert_key.as_ref()
    }

    fn get_tracer(&self) -> Option<Tracer> {
        self.options.tracer.clone()
    }
}

HttpPeer can be constructed in three ways:

  • new(): connects to a SocketAddr.
  • new_uds(): connects through the path of a Unix Domain Socket.
  • new_proxy(): connects onward through an HTTP proxy used as a hop.
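
Both new() and new_uds() carry a "TODO: handle error" next to an unwrap(). As a hedged sketch of what handling it could look like, the helpers below resolve the two address shapes without panicking. MiniAddr, inet_addr() and uds_addr() are hypothetical names, and I am assuming that ToInetSocketAddrs behaves like std's ToSocketAddrs.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::PathBuf;

/// Hypothetical address type with the two shapes the constructors produce.
#[derive(Debug, PartialEq)]
enum MiniAddr {
    Inet(SocketAddr),
    Unix(PathBuf),
}

/// Like `new()`: take the first resolved address, but report failure
/// instead of unwrapping.
fn inet_addr<A: ToSocketAddrs>(address: A) -> io::Result<MiniAddr> {
    let addr = address
        .to_socket_addrs()?
        .next()
        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no address resolved"))?;
    Ok(MiniAddr::Inet(addr))
}

/// Like `new_uds()`: the "address" is a filesystem path.
fn uds_addr(path: &str) -> MiniAddr {
    MiniAddr::Unix(PathBuf::from(path))
}

fn main() {
    // A literal IP and port resolves without any DNS lookup.
    println!("{:?}", inet_addr("127.0.0.1:443"));
    // A tuple works as well, since it also implements ToSocketAddrs.
    println!("{:?}", inet_addr(("127.0.0.1", 8080)));
    // A string without a port is rejected with an error.
    println!("{:?}", inet_addr("no-port"));
    println!("{:?}", uds_addr("/tmp/upstream.sock"));
}
```

Only the first resolved address is used, matching what new() does with addrs_iter.next(); a name that resolves to several addresses silently drops the rest.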

Proxy

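The Proxy used in HttpPeer is reached through next_hop (for now, the path to a UDS), can proxy to an arbitrary host:port, and can carry additional HTTP headers for the CONNECT request. It is defined as follows: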

/// The proxy settings to connect to the remote server, CONNECT only for now
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
    pub next_hop: Box<Path>, // for now this will be the path to the UDS
    pub host: String,        // the proxied host. Could be either IP addr or hostname.
    pub port: u16,           // the port to proxy to
    pub headers: BTreeMap<String, Vec<u8>>, // the additional headers to add to CONNECT
}

impl Display for Proxy {
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(
            f,
            "next_hop: {}, host: {}, port: {}",
            self.next_hop.display(),
            self.host,
            self.port
        )
    }
}
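
To give a feel for what the host, port and headers fields end up describing, here is a minimal sketch that renders an HTTP/1.1 CONNECT request from them. connect_request() is a hypothetical helper written for this post; it is not how Pingora builds the request internally, and it does not bracket IPv6 hosts.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: render the CONNECT preamble for a proxied host:port,
/// followed by any additional headers and the terminating blank line.
fn connect_request(host: &str, port: u16, headers: &BTreeMap<String, Vec<u8>>) -> Vec<u8> {
    let authority = format!("{host}:{port}");
    let mut req =
        format!("CONNECT {authority} HTTP/1.1\r\nHost: {authority}\r\n").into_bytes();
    // BTreeMap iterates in key order, so the output is deterministic.
    for (name, value) in headers {
        req.extend_from_slice(name.as_bytes());
        req.extend_from_slice(b": ");
        req.extend_from_slice(value);
        req.extend_from_slice(b"\r\n");
    }
    req.extend_from_slice(b"\r\n");
    req
}

fn main() {
    let mut headers = BTreeMap::new();
    headers.insert("X-Trace".to_string(), b"abc".to_vec());

    let req = connect_request("1.1.1.1", 443, &headers);
    print!("{}", String::from_utf8_lossy(&req));
}
```

Header values are Vec<u8> rather than String because HTTP header values are not required to be valid UTF-8, which is presumably why the Proxy struct stores them as bytes too.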