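Interestingly, although Pingora is usually thought of as a "reverse proxy" tool, it also contains the pieces needed to implement an HTTP server. It works much like most HTTP servers, but with a distinctly Pingora flavor. Enough talk, let's serve it up :)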

Example: Logging

The official examples include a logging section. The code looks like this:

pub struct MyGateway {
    req_metric: prometheus::IntCounter,
}

#[async_trait]
impl ProxyHttp for MyGateway {
    ...
    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora::Error>,
        ctx: &mut Self::CTX,
    ) {
        let response_code = session
            .response_written()
            .map_or(0, |resp| resp.status.as_u16());
        // access log
        info!(
            "{} response code: {response_code}",
            self.request_summary(session, ctx)
        );

        self.req_metric.inc();
    }
}

fn main() {
    ...
    let mut prometheus_service_http =
        pingora::services::listening::Service::prometheus_http_service();
    prometheus_service_http.add_tcp("127.0.0.1:6192");
    my_server.add_service(prometheus_service_http);
    my_server.run_forever();
}

As you can see, the prometheus_http_service() lines in main() start a Prometheus metric server on 127.0.0.1:6192. So what logic is this server built on?

trait ServeHttp

Let's take it apart from the bottom up, starting with ServeHttp:

/// This trait defines how to map a request to a response
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ServeHttp {
    /// Define the mapping from a request to a response.
    /// Note that the request header is already read, but the implementation needs to read the
    /// request body if any.
    ///
    /// # Limitation
    /// In this API, the entire response has to be generated before the end of this call.
    /// So it is not suitable for streaming response or interactive communications.
    /// Users need to implement their own [`super::HttpServerApp`] for those use cases.
    async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
}

This is a standard, simple HTTP server: it maps a Request to a Response. The trait definition is just as simple. It does not support streaming, and the whole body has to be returned in one go.
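
To make the "whole body in one call" limitation concrete, here is a minimal std-only sketch. ToyRequest, ToyResponse, ToyServeHttp and Hello are hypothetical stand-ins made up for illustration; the real trait is async and works on a ServerSession.

```rust
// Toy model of the ServeHttp contract; none of these types are Pingora's.
pub struct ToyRequest {
    pub path: String,
}

pub struct ToyResponse {
    pub status: u16,
    pub headers: Vec<(String, String)>,
    pub body: Vec<u8>,
}

/// Synchronous stand-in for `ServeHttp`: one request in, one complete response out.
pub trait ToyServeHttp {
    fn response(&self, req: &ToyRequest) -> ToyResponse;
}

pub struct Hello;

impl ToyServeHttp for Hello {
    fn response(&self, req: &ToyRequest) -> ToyResponse {
        // The full body must exist before we return, so there is no room for streaming.
        let body = format!("hello from {}", req.path).into_bytes();
        ToyResponse {
            status: 200,
            headers: vec![("content-length".to_string(), body.len().to_string())],
            body,
        }
    }
}
```

Because the return type owns a complete Vec<u8>, the caller can only start writing once the handler is done, which is exactly the limitation the doc comment warns about.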

struct HttpServer

HttpServer, in turn, bundles a ServeHttp together with a set of HttpModules middleware:

/// A helper struct for HTTP server with http modules embedded
pub struct HttpServer<SV> {
    app: SV,
    modules: HttpModules,
}

For SV: ServeHttp, it implements HttpServerApp. In effect, this also constrains app to be an implementation of ServeHttp:

#[cfg_attr(not(doc_async_trait), async_trait)]
impl<SV> HttpServerApp for HttpServer<SV>
where
    SV: ServeHttp + Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        mut http: ServerSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        // ...

HttpModule

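An HttpModule can be thought of as middleware wrapped around a ServeHttp: it modifies the incoming request and the outgoing response. Pingora provides several pieces for building and using HttpModules.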

trait HttpModule

Let's start with the most basic trait definition:

pub trait HttpModule {
    fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
        Ok(())
    }

    fn request_body_filter(&mut self, body: Option<Bytes>) -> Result<Option<Bytes>> {
        Ok(body)
    }

    fn response_filter(&mut self, _t: &mut HttpTask) -> Result<()> {
        Ok(())
    }

    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

type Module = Box<dyn HttpModule + 'static + Send + Sync>;

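As you can see, an HttpModule can modify both the request and the response. For the request, the trait defines request_header_filter() to modify the header and request_body_filter() to modify the body. For the response, every part carried by HttpTask can be changed. HttpTask is defined as follows: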

/// An enum to hold all possible HTTP response events.
#[derive(Debug)]
pub enum HttpTask {
    /// the response header and the boolean end of response flag
    Header(Box<pingora_http::ResponseHeader>, bool),
    /// A piece of response header and the end of response boolean flag
    Body(Option<bytes::Bytes>, bool),
    /// HTTP response trailer
    Trailer(Option<Box<http::HeaderMap>>),
    /// Signal that the response is already finished
    Done,
    /// Signal that the reading of the response encounters errors.
    Failed(pingora_error::BError),
}

That covers essentially everything a response can carry.
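
A response filter typically matches on the event it cares about and lets the rest pass through. The sketch below shows that shape using only std; ToyTask and ServerTag are hypothetical names, and the enum is a cut-down imitation of HttpTask rather than the real type.

```rust
// Toy model of a response filter; `ToyTask` and `ServerTag` are hypothetical, not Pingora types.
#[derive(Debug, PartialEq)]
pub enum ToyTask {
    /// Response headers plus the end-of-response flag.
    Header(Vec<(String, String)>, bool),
    /// A body chunk plus the end-of-response flag.
    Body(Option<Vec<u8>>, bool),
    /// The response is finished.
    Done,
}

pub struct ServerTag;

impl ServerTag {
    /// Mirrors the shape of `HttpModule::response_filter`: mutate the event in place.
    pub fn response_filter(&mut self, task: &mut ToyTask) -> Result<(), String> {
        match task {
            // Only the header event is touched.
            ToyTask::Header(headers, _end) => {
                headers.push(("server".to_string(), "toy".to_string()));
            }
            // Body chunks and Done pass through unchanged.
            ToyTask::Body(_, _) | ToyTask::Done => {}
        }
        Ok(())
    }
}
```

Since the filter receives one event at a time, a module that needs to see the whole response has to keep its own state across calls, which is why the filters take &mut self.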

trait HttpModuleBuilder

Defining a module is not enough to use it. We also need to create a Builder for the module that satisfies the following trait:

/// Trait to init the http module ctx for each request
pub trait HttpModuleBuilder {
    /// The order the module will run
    ///
    /// The lower the value, the later it runs relative to other filters.
    /// If the order of the filter is not important, leave it to the default 0.
    fn order(&self) -> i16 {
        0
    }

    /// Initialize and return the per request module context
    fn init(&self) -> Module;
}

pub type ModuleBuilder = Box<dyn HttpModuleBuilder + 'static + Send + Sync>;

Through HttpModuleBuilder::order(), Pingora defines the priority with which HttpModules are loaded and applied.

HttpModules

Finally, back to the HttpModules type used in our HttpServer. It is a structure that looks a lot like a Builder, defined as follows:

/// The object to hold multiple http modules
pub struct HttpModules {
    modules: Vec<ModuleBuilder>,
    module_index: OnceCell<Arc<HashMap<TypeId, usize>>>,
}

impl HttpModules {
    /// Create a new [HttpModules]
    pub fn new() -> Self {
        HttpModules {
            modules: vec![],
            module_index: OnceCell::new(),
        }
    }

    /// Add a new [ModuleBuilder] to [HttpModules]
    ///
    /// Each type of [HttpModule] can be only added once.
    /// # Panic
    /// Panic if any [HttpModule] is added more than once.
    pub fn add_module(&mut self, builder: ModuleBuilder) {
        if self.module_index.get().is_some() {
            // We use a shared module_index the index would be out of sync if we
            // add more modules.
            panic!("cannot add module after ctx is already built")
        }
        self.modules.push(builder);
        // not the most efficient way but should be fine
        // largest order first
        self.modules.sort_by_key(|m| -m.order());
    }

    /// Build the contexts of all the modules added to this [HttpModules]
    pub fn build_ctx(&self) -> HttpModuleCtx {
        let module_ctx: Vec<_> = self.modules.iter().map(|b| b.init()).collect();
        let module_index = self
            .module_index
            .get_or_init(|| {
                let mut module_index = HashMap::with_capacity(self.modules.len());
                for (i, c) in module_ctx.iter().enumerate() {
                    let exist = module_index.insert(c.as_any().type_id(), i);
                    if exist.is_some() {
                        panic!("duplicated filters found")
                    }
                }
                Arc::new(module_index)
            })
            .clone();
        HttpModuleCtx {
            module_ctx,
            module_index,
        }
    }
}

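During construction, new ModuleBuilders are added through HttpModules::add_module. On every addition the list is re-sorted by HttpModuleBuilder::order() (the sort_by_key call), which fixes the final execution order of the modules.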
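
The "largest order first" rule is easy to check in isolation. This is a small std-only sketch, not Pingora code: execution_order and the module names in the usage note are hypothetical, and it sorts with std::cmp::Reverse rather than the -m.order() negation used upstream, which also sidesteps the corner case where negating i16::MIN would overflow.

```rust
use std::cmp::Reverse;

/// Sort (name, order) pairs so that the largest `order` runs first.
/// Hypothetical helper for illustration; not part of Pingora.
pub fn execution_order(mut builders: Vec<(&'static str, i16)>) -> Vec<&'static str> {
    // sort_by_key is stable, so builders with equal order keep their insertion order.
    builders.sort_by_key(|&(_, order)| Reverse(order));
    builders.into_iter().map(|(name, _)| name).collect()
}
```

Fed with ("compression", i16::MIN / 2), ("auth", 10) and two modules at the default 0, this returns auth first, then the two defaults in insertion order, then compression last.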

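Every time an HTTP request arrives, build_ctx() is called. In this step Pingora creates the corresponding HttpModule instances through HttpModuleBuilder::init(), and builds an index, stored as a HashMap, that maps each module's TypeId to its position in the list. The index is only created once; after that it lives in module_index behind the OnceCell and is reused by every later call.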
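
The "fresh modules per request, shared index built once" pattern can be sketched with std alone. Everything here is a toy: ToyModules, ToyModule, ToyBuilder, Counter and Tagger are hypothetical names, modules are plain dyn Any values, and std::sync::OnceLock stands in for the OnceCell used by Pingora.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

// Toy model; none of these names exist in Pingora.
pub type ToyModule = Box<dyn Any + Send + Sync>;
pub type ToyBuilder = Box<dyn Fn() -> ToyModule + Send + Sync>;

#[derive(Default)]
pub struct Counter(pub u32);
pub struct Tagger;

pub struct ToyModules {
    builders: Vec<ToyBuilder>,
    index: OnceLock<Arc<HashMap<TypeId, usize>>>,
}

impl ToyModules {
    pub fn new(builders: Vec<ToyBuilder>) -> Self {
        ToyModules { builders, index: OnceLock::new() }
    }

    /// Called once per request: new module instances every time, one shared index.
    pub fn build_ctx(&self) -> (Vec<ToyModule>, Arc<HashMap<TypeId, usize>>) {
        let modules: Vec<ToyModule> = self.builders.iter().map(|b| b()).collect();
        let index = self
            .index
            .get_or_init(|| {
                // Runs only on the first call; later calls get the cached Arc.
                let map: HashMap<TypeId, usize> = modules
                    .iter()
                    .enumerate()
                    // Deref through the Box so we record the concrete type, not the Box itself.
                    .map(|(i, m)| ((**m).type_id(), i))
                    .collect();
                Arc::new(map)
            })
            .clone();
        (modules, index)
    }
}
```

Cloning the Arc per request is cheap, while the module instances themselves are rebuilt each time so that per-request state never leaks between requests.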

HttpModuleCtx

The last link in the HttpModule chain is HttpModuleCtx. There is not much left to explain here: it simply runs the modules in order, and the code speaks for itself :)

/// The Contexts of multiple modules
///
/// This is the object that will apply all the included modules to a certain HTTP request.
/// The modules are ordered according to their `order()`.
pub struct HttpModuleCtx {
    // the modules in the order of execution
    module_ctx: Vec<Module>,
    // find the module in the vec with its type ID
    module_index: Arc<HashMap<TypeId, usize>>,
}

impl HttpModuleCtx {
    /// Create a placeholder empty [HttpModuleCtx].
    ///
    /// [HttpModules] should be used to create nonempty [HttpModuleCtx].
    pub fn empty() -> Self {
        HttpModuleCtx {
            module_ctx: vec![],
            module_index: Arc::new(HashMap::new()),
        }
    }

    /// Get a ref to [HttpModule] if any.
    pub fn get<T: 'static>(&self) -> Option<&T> {
        let idx = self.module_index.get(&TypeId::of::<T>())?;
        let ctx = &self.module_ctx[*idx];
        Some(
            ctx.as_any()
                .downcast_ref::<T>()
                .expect("type should always match"),
        )
    }

    /// Get a mut ref to [HttpModule] if any.
    pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
        let idx = self.module_index.get(&TypeId::of::<T>())?;
        let ctx = &mut self.module_ctx[*idx];
        Some(
            ctx.as_any_mut()
                .downcast_mut::<T>()
                .expect("type should always match"),
        )
    }

    /// Run the `request_header_filter` for all the modules according to their orders.
    pub fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
        for filter in self.module_ctx.iter_mut() {
            filter.request_header_filter(req)?;
        }
        Ok(())
    }

    /// Run the `request_body_filter` for all the modules according to their orders.
    pub fn request_body_filter(&mut self, mut body: Option<Bytes>) -> Result<Option<Bytes>> {
        for filter in self.module_ctx.iter_mut() {
            body = filter.request_body_filter(body)?;
        }
        Ok(body)
    }

    /// Run the `response_filter` for all the modules according to their orders.
    pub fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
        for filter in self.module_ctx.iter_mut() {
            filter.response_filter(t)?;
        }
        Ok(())
    }
}
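
The one part that deserves a second look is get::<T>() / get_mut::<T>(), which is the reason HttpModule requires as_any() and as_any_mut(). Below is a minimal std-only sketch of that lookup; ToyModule, ToyCtx, Counter and Tagger are hypothetical names, not Pingora types.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Toy model of the typed lookup; none of these names exist in Pingora.
pub trait ToyModule {
    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

pub struct Counter(pub u32);
pub struct Tagger;

impl ToyModule for Counter {
    fn as_any(&self) -> &dyn Any { self }
    fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

impl ToyModule for Tagger {
    fn as_any(&self) -> &dyn Any { self }
    fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

pub struct ToyCtx {
    modules: Vec<Box<dyn ToyModule>>,
    index: HashMap<TypeId, usize>,
}

impl ToyCtx {
    pub fn new(modules: Vec<Box<dyn ToyModule>>) -> Self {
        // as_any() dispatches to the concrete type, so this records each module's own TypeId.
        let index: HashMap<TypeId, usize> = modules
            .iter()
            .enumerate()
            .map(|(i, m)| (m.as_any().type_id(), i))
            .collect();
        ToyCtx { modules, index }
    }

    /// Find a module by its concrete type; None if no such module was registered.
    pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
        let idx = *self.index.get(&TypeId::of::<T>())?;
        self.modules[idx].as_any_mut().downcast_mut::<T>()
    }
}
```

With a ToyCtx holding a Counter and a Tagger, ctx.get_mut::<Counter>() hands back the concrete Counter so its state can be read or changed, while asking for a type that was never registered yields None.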

Back to Prometheus

Having toured the underlying implementation, let's return to Prometheus. The PrometheusServer implementation now reads very simply and clearly:

/// A HTTP application that reports Prometheus metrics.
///
/// This application will report all the [static metrics](https://docs.rs/prometheus/latest/prometheus/index.html#static-metrics)
/// collected via the [Prometheus](https://docs.rs/prometheus/) crate;
pub struct PrometheusHttpApp;

#[cfg_attr(not(doc_async_trait), async_trait)]
impl ServeHttp for PrometheusHttpApp {
    async fn response(&self, _http_session: &mut ServerSession) -> Response<Vec<u8>> {
        let encoder = TextEncoder::new();
        let metric_families = prometheus::gather();
        let mut buffer = vec![];
        encoder.encode(&metric_families, &mut buffer).unwrap();
        Response::builder()
            .status(200)
            .header(http::header::CONTENT_TYPE, encoder.format_type())
            .header(http::header::CONTENT_LENGTH, buffer.len())
            .body(buffer)
            .unwrap()
    }
}

/// The [HttpServer] for [PrometheusHttpApp]
///
/// This type provides the functionality of [PrometheusHttpApp] with compression enabled
pub type PrometheusServer = HttpServer<PrometheusHttpApp>;

impl PrometheusServer {
    pub fn new() -> Self {
        let mut server = Self::new_app(PrometheusHttpApp);
        // enable gzip level 7 compression
        server.add_module(ResponseCompressionBuilder::enable(7));
        server
    }
}

The whole process boils down to just two main steps:

  1. PrometheusHttpApp implements ServeHttp
  2. Define a PrometheusServer that wraps PrometheusHttpApp and adds the HttpModuleBuilders it needs
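
The two steps can be imitated with a std-only toy. All names below (ToyApp, ToyServer, BodyFilter, MetricsApp, new_metrics_server) are hypothetical, and the way handle() pipes the body through the filters is only an assumption about the general shape; how the real HttpServer drives its modules is not shown in the excerpts above.

```rust
// Toy model of "an app wrapped by a server with modules"; all names are hypothetical.
pub trait ToyApp {
    fn response(&self, path: &str) -> Vec<u8>;
}

/// A body filter standing in for a response module such as compression.
pub type BodyFilter = Box<dyn Fn(Vec<u8>) -> Vec<u8> + Send + Sync>;

pub struct ToyServer<SV> {
    app: SV,
    filters: Vec<BodyFilter>,
}

impl<SV: ToyApp> ToyServer<SV> {
    pub fn new_app(app: SV) -> Self {
        ToyServer { app, filters: Vec::new() }
    }

    pub fn add_module(&mut self, filter: BodyFilter) {
        self.filters.push(filter);
    }

    /// Ask the app for the full body, then pass it through every filter in order.
    pub fn handle(&self, path: &str) -> Vec<u8> {
        self.filters
            .iter()
            .fold(self.app.response(path), |body, f| f(body))
    }
}

/// Step 1: the application only knows how to produce a body.
pub struct MetricsApp;

impl ToyApp for MetricsApp {
    fn response(&self, _path: &str) -> Vec<u8> {
        b"requests_total 1\n".to_vec()
    }
}

/// Step 2: a constructor that wires the app together with the modules it needs.
pub fn new_metrics_server() -> ToyServer<MetricsApp> {
    let mut server = ToyServer::new_app(MetricsApp);
    // Upper-casing stands in for a real transformation such as gzip.
    server.add_module(Box::new(|body: Vec<u8>| body.to_ascii_uppercase()));
    server
}
```

The point of the split is that MetricsApp stays unaware of any post-processing: swapping or adding filters only touches the constructor.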

Honestly, even ⑨ could follow this, so I won't belabor it.

Finally, a look at... ResponseCompression

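PrometheusServer uses ResponseCompressionBuilder, which, at the time of writing, is also the only HttpModule implemented in Pingora's own repository. Here we take a quick look at its basic implementation in pingora-core and leave the details for a later post (no promises):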

/// HTTP response compression module
pub struct ResponseCompression(ResponseCompressionCtx);

impl HttpModule for ResponseCompression {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
        self.0.request_filter(req);
        Ok(())
    }

    fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
        self.0.response_filter(t);
        Ok(())
    }
}

/// The builder for HTTP response compression module
pub struct ResponseCompressionBuilder {
    level: u32,
}

impl ResponseCompressionBuilder {
    /// Return a [ModuleBuilder] for [ResponseCompression] with the given compression level
    pub fn enable(level: u32) -> ModuleBuilder {
        Box::new(ResponseCompressionBuilder { level })
    }
}

impl HttpModuleBuilder for ResponseCompressionBuilder {
    fn init(&self) -> Module {
        Box::new(ResponseCompression(ResponseCompressionCtx::new(
            self.level, false,
        )))
    }

    fn order(&self) -> i16 {
        // run the response filter later than most others filters
        i16::MIN / 2
    }
}

Well, that's about it (x