据我所知,所有开源负载均衡都会提供至少一种扩展机制,BFE也不例外,BFE通过模块的选择可以更精细的控制BFE在处理请求中的各个阶段。如果内置模块不能满足自己需求,那么可以自己开发模块,而BFE是用Golang写的,所以开发效率很高。
下面是BFE各个回调点的位置。

BFE的模块和其他开源产品的一个很大的不同点是,BFE基于Product(租户)维度控制,而其他大多数产品都是基于Host(域名)。
首先看看BFE是怎么加载和启用模块的。
func StartUp(cfg bfe_conf.BfeConfig, version string, confRoot string) error {
var err error
// 1.
bfe_modules.SetModules()
// 2.
if err = bfeServer.RegisterModules(cfg.Server.Modules); err != nil {
log.Logger.Error("StartUp(): RegisterModules():%s", err.Error())
return err
}
// 3.
if err = bfeServer.InitModules(); err != nil {
log.Logger.Error("StartUp(): bfeServer.InitModules():%s",
err.Error())
return err
}
}
// 4.
func SetModules() {
for _, module := range moduleList {
bfe_module.AddModule(module)
}
}
// 5.
func (srv *BfeServer) RegisterModules(modules []string) error {
for _, moduleName := range modules {
moduleName = strings.TrimSpace(moduleName)
if err := srv.Modules.RegisterModule(moduleName); err != nil {
return err
}
}
return nil
}
// 6.
func (bm *BfeModules) RegisterModule(name string) error {
module, ok := moduleMap[name]
bm.workModules[name] = module
return nil
}
// 7.
func (srv *BfeServer) InitModules() error {
return srv.Modules.Init(srv.CallBacks, srv.Monitor.WebHandlers, srv.ConfRoot)
}
func (bm *BfeModules) Init(cbs *BfeCallbacks, whs *web_monitor.WebHandlers, cr string) error {
// 8.
for _, name := range modulesAll {
module, ok := bm.workModules[name]
if ok {
err := module.Init(cbs, whs, cr)
modulesEnabled = append(modulesEnabled, name)
}
}
return nil
}
代码分解如下:
BFE代码中所有集成的模块Modules参数依次尝试注册模块BfeServer对象上从上面的的代码可以看到,模块的初始化主要分为三个部分,加载所有可用的模块,注册用户配置的模块,将注册的模块依次初始化,也就是调用其对应的Init方法,BFE的模块初始化流程还是比较简单明了的。
下面开始看常见模块的初始化逻辑。
根据前面的代码,我们可以直接找模块的Init方法。
这个模块用于设置请求的客户端IP是否是可信的,可信的IP段由用户配置。
// 1.
func (m *ModuleTrustClientIP) Init() error {
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleTrustClientIP) init(cfg *ConfModTrustClientIP, cbs *bfe_module.BfeCallbacks,
whs *web_monitor.WebHandlers) error {
m.configPath = cfg.Basic.DataPath
// 2.
m.trustTable = ipdict.NewIPTable()
// 3.
if err := m.loadConfData(nil); err != nil {
return fmt.Errorf("err in loadConfData(): %s", err.Error())
}
// 4.
err := cbs.AddFilter(bfe_module.HandleAccept, m.acceptHandler)
// 5.
err = web_monitor.RegisterHandlers(whs, web_monitor.WebHandleMonitor, m.monitorHandlers())
// 6.
err = whs.RegisterHandler(web_monitor.WebHandleReload, m.name, m.loadConfData)
return nil
}
func (m *ModuleTrustClientIP) acceptHandler(session *bfe_basic.Session) int {
m.state.ConnTotal.Inc(1)
trusted := m.trustTable.Search(session.RemoteAddr.IP)
if trusted {
m.state.ConnTrustClientip.Inc(1)
}
// 7.
session.SetTrustSource(trusted)
// state for internal remote ip
if session.RemoteAddr.IP.IsPrivate() {
m.state.ConnAddrInternal.Inc(1)
if !trusted {
m.state.ConnAddrInternalNotTrust.Inc(1)
}
}
return bfe_module.BfeHandlerGoOn
}
代码分解如下:
m.acceptHandler注册到回调函数中reload时被调用的函数true。这个模块并不会拒绝请求,它只是在请求的Session(会话)对象的isTrustSourcez字段设置一个true。这个布尔值只有在条件匹配时使用条件原语req_cip_trusted才会应用。
这个模块主要用于屏蔽指定的IP段,以及根据条件原语来屏蔽。为了更好理解看看它的测试配置文件
{
"Version": "init version",
"Config": {
"example_product": [
{
"action": {
"cmd": "CLOSE",
"params": []
},
"name": "example rule",
"cond": "req_path_in(\"/limit\", false)"
}
]
}
}
这个配置文件用于屏蔽匹配的规则
192.168.1.253 192.168.1.254
192.168.1.250
192.168.1.250/20
这个配置文件用于指定要屏蔽的IP段
最后看看代码
func (m *ModuleBlock) Init() error {
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
// 2.
if err = m.loadGlobalIPTable(nil); err != nil {
return fmt.Errorf("%s: loadGlobalIPTable() err %s", m.name, err.Error())
}
// 3.
if err = m.loadProductRuleConf(nil); err != nil {
return fmt.Errorf("%s: loadProductRuleConf() err %s", m.name, err.Error())
}
// 4.
err = cbs.AddFilter(bfe_module.HandleAccept, m.globalBlockHandler)
// 5.
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.productBlockHandler)
return nil
}
// 6.
func (m *ModuleBlock) globalBlockHandler(session *bfe_basic.Session) int {
clientIP := session.RemoteAddr.IP
if m.ipTable.Search(clientIP) {
session.SetError(ErrBlock, "connection blocked")
return bfe_module.BfeHandlerClose
}
return bfe_module.BfeHandlerGoOn
}
// 7.
func (m *ModuleBlock) productBlockHandler() {
// 8.
rules, ok := m.ruleTable.Search(bfe_basic.GlobalProduct)
if ok { // rules found
retVal, isMatch, resp := m.productRulesProcess(request, rules)
if isMatch {
return retVal, resp
}
}
// 9.
rules, ok = m.ruleTable.Search(request.Route.Product)
if !ok {
return bfe_module.BfeHandlerGoOn, nil
}
retVal, isMatch, resp := m.productRulesProcess(request, rules)
if !isMatch {
m.state.ReqAccept.Inc(1)
}
return retVal, resp
}
代码分解如下:
Product租户分类的屏蔽的规则globalBlockHandlerproductBlockHandlerglobal的product租户名,这个特殊的租户名会全局生效这个模块主要使用用来修改头信息,为了更好理解看看它的测试配置文件
{
"Version": "init version",
"Config": {
"example_product": [
{
"cond": "req_path_prefix_in(\"/header\", false)",
"actions": [
{
"cmd": "RSP_HEADER_SET",
"params": [
"X-Proxied-By",
"bfe"
]
}
],
"last": true
}
]
}
}
配置的意思就是, 当请求的路径前缀包含/header, 就设置X-Proxied-By: bfe这样的HTTP头信息。
func (m *ModuleHeader) Init(cbs *bfe_module.BfeCallbacks, whs *web_monitor.WebHandlers,
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleHeader) init() error {
// 2.
if err := m.loadConfData(nil); err != nil {
return fmt.Errorf("err in loadConfData(): %s", err.Error())
}
// 3.
err := cbs.AddFilter(bfe_module.HandleAfterLocation, m.reqHeaderHandler)
err = cbs.AddFilter(bfe_module.HandleReadResponse, m.rspHeaderHandler)
return nil
}
代码分解如下:
这个模块的功能逻辑比较直观,具体的实现代码就不看了。
这个模块用来改写请求,可以修改三部分的数据,源码如下:
var allowActions = map[string]interface{}{
// host actions
action.ActionHostSetFromPathPrefix: nil, // set host from path prefix
action.ActionHostSet: nil, //set host
action.ActionHostSuffixReplace: nil, // replace host suffix
// path actions
action.ActionPathSet: nil, // set path
action.ActionPathPrefixAdd: nil, // add path prefix
action.ActionPathPrefixTrim: nil, // trim path prefix
// query actions
action.ActionQueryAdd: nil, // add query
action.ActionQueryDel: nil, // del query
action.ActionQueryRename: nil, // rename query
action.ActionQueryDelAllExcept: nil, // del query except given query key
}
根据名字应该就能看出来, Host, Path, Query三个字段的值。
下面是一个示例配置文件
{
"Version": "init version",
"Config": {
"youerning_product": [
{
"Cond": "req_path_prefix_in(\"/rewrite\", false)",
"Actions": [
{
"Cmd": "PATH_PREFIX_ADD",
"Params": [
"/bfe/"
]
}
],
"Last": true
}
]
}
}
上面配置的意思是,在URL路径上加上/bfe/, 如果路径的前缀是/rewrite的话。
测试如下:
curl "http://127.0.0.1:8080/rewrite" -H "Host: youerning.top"
ClusterB Backend1 /bfe/rewrite
其实模块的初始化代码都差不多,后面都会简单的过一下初始化过程,主要是看看它的回调点在哪。
func (m *ModuleReWrite) Init() error {
// 加载配置文件
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleReWrite) init() error {
// HandleAfterLocation的回调会被调用
err := cbs.AddFilter(bfe_module.HandleAfterLocation, m.rewriteHandler)
return nil
}
这个模块的名字也很直观,就是重定向,我们通过配置文件来学习。
{
"Version": "init version",
"Config": {
"youerning_product": [
{
"Cond": "req_path_prefix_in(\"/redirect\", false)",
"Actions": [
{
"Cmd": "URL_SET",
"Params": ["http://127.0.0.1:18002"]
}
],
"Status": 301
}
]
}
}
上面配置的意思是,重定向到http://127.0.0.1:18002, 如果路径的前缀是/redirect的话。
测试如下:
curl "http://127.0.0.1:8080/redirect" -H "Host: youerning.top" -i -L
HTTP/1.1 301 Moved Permanently
Location: http://127.0.0.1:18002
Server: bfe
Date: Tue, 05 Sep 2023 01:40:53 GMT
Content-Length: 57
Content-Type: text/html; charset=utf-8
HTTP/1.1 200 OK
Date: Tue, 05 Sep 2023 01:40:53 GMT
Content-Length: 20
Content-Type: text/plain; charset=utf-8
ClusterA Backend2 /
可以看到,最先得到的响应时301,被重定向的位置是http://127.0.0.1:18002
这个模块用来生成一个随机的日志ID, 不需要配置文件,暂时不知道干啥的。
func (m *ModuleLogId) Init(cbs *bfe_module.BfeCallbacks, whs *web_monitor.WebHandlers,
cr string) error {
// 分别在HandleAccept, HandleBeforeLocation创建logid
err := cbs.AddFilter(bfe_module.HandleAccept, m.sessionIdHandler)
err = cbs.AddFilter(bfe_module.HandleBeforeLocation, m.requestIdHandler)
return nil
}
func (m *ModuleLogId) sessionIdHandler(session *bfe_basic.Session) int {
// 在session上设置logid
session.SessionId = genLogId()
return bfe_module.BfeHandlerGoOn
}
func (m *ModuleLogId) requestIdHandler() {
// 在HTTP头 X-Bfe-Log-Id 上设置logid
req.LogId = genLogId()
req.HttpRequest.Header.Set(bfe_basic.HeaderBfeLogId, req.LogId)
return bfe_module.BfeHandlerGoOn, nil
}
// 随机生成logid
func genLogId() string {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
return ""
}
return hex.EncodeToString(b)
}
这个也不知道是干啥的,在请求上添加了一个tag
func (m *ModuleTag) init() error {
// 加载数据文件
_, err = m.loadRuleData(nil)
// 注册回调点
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.tagHandler)
return nil
}
func (m *ModuleTag) tagHandler(request *bfe_basic.Request) (int, *bfe_http.Response) {
rules, ok := m.ruleTable.Search(request.Route.Product)
for _, rule := range rules {
if rule.Cond.Match(request) {
//在请求对象上加上tags
request.AddTags(rule.Param.TagName, []string{rule.Param.TagValue})
}
}
return bfe_module.BfeHandlerGoOn, nil
}
// 具体的实现逻辑
func (req *Request) AddTags(name string, ntags []string) {
tags := req.Tags.TagTable[name]
tags = append(tags, ntags...)
req.Tags.TagTable[name] = tags
}
这个还是比较有用的,用来追踪某个Product(租户)的耗时,但是需要配置额外的trace agent ,比如Zipkin,Jaeger, elasticsearch等服务,我没配置过,所以只过一下代码主流程。
func (m *ModuleTrace) init() error {
// 初始化trace对象
globalTrace, err = trace.NewTrace(conf.Basic.ServiceName, conf.GetTraceConfig())
//
_, err = m.loadRuleData(nil)
// 分别在发现产品后和结束请求后
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.startTrace)
err = cbs.AddFilter(bfe_module.HandleRequestFinish, m.finishTrace)
return nil
}
这个是用来配置请求日志的,也略过。
这个模块比较有用,但是官方的配置参数有一些错误,下面是一个正确的配置文件
{
"Version": "20190101000000",
"Config": {
"youerning_product": [{
"Name": "youerning_prison",
"Cond": "req_path_prefix_in(\"/prison\", false)",
"accessSignConf": {
"UseClientIP": false,
"UseSocketIP": false,
"UseConnectID": false,
"UseUrl": false,
"UseHost": false,
"UsePath": false,
"UseHeaders": false,
"UrlRegexp": false,
"Query": [],
"Header": [],
"Cookie": []
},
"action": {
"cmd": "CLOSE",
"params": []
},
"checkPeriod": 1,
"stayPeriod": 10,
"threshold": 2,
"accessDictSize": 1000,
"prisonDictSize": 1000
}]
}
}
官方代码仓库里的配置文件使用的字段是url, path等,这些参数不会解析成功。
测试如下:
for i in `seq 1 4`;do curl "http://127.0.0.1:8080/prison" -H "Host: youerning.top" -i ;done
HTTP/1.1 200 OK
Content-Length: 26
Content-Type: text/plain; charset=utf-8
Date: Tue, 05 Sep 2023 03:19:41 GMT
ClusterB Backend1 /prison
HTTP/1.1 200 OK
Content-Length: 26
Content-Type: text/plain; charset=utf-8
Date: Tue, 05 Sep 2023 03:19:42 GMT
ClusterA Backend1 /prison
curl: (52) Empty reply from server
curl: (52) Empty reply from server
可以看到,处理前面两个请求成功之外,其他的请求都失败了,这是因为上面的配置参数是checkPeriod: 1, threshold:2, 意思是一秒钟超过两次请求就是直接关闭连接,关闭的持续时间是stayPeriod: 10, 也就是10秒。
一般来说,我们不会对某个地址直接限流,而是基于客户端IP, 所以我们可以配置clientIP: true启用基于客户端IP限流的功能。
值得注意的是, bfe 的mod_prison模块使用的不是漏桶算法,所以checkPeriod设置的太大可能导致后端服务崩溃,即使是1秒,其实也有可能会崩溃,假设1秒分为5个区间,然后设置的阈值是10,当第一个区间请求来了1,那么第5个区间的最大请求量能到9,而下一秒开始的时候又能最大请求到10,也就说,最坏的情况下一秒的请求是可以超过设置的阈值的,并且接近阈值的两倍!!!。示意图如下
第一秒
[1,2,3,4,5]
1,0,0,0,9
第二秒
[1, 2, 3, 4, 5]
10,0, 0, 0, 0
所以在不自己开发一个其他的限流模块的情况下,不建议这个checkPeriod设置的太大。
最后来看看代码吧
func (m *ModulePrison) Init() error {
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
conf, err := ConfLoad(confPath, cr)
m.productConfPath = conf.Basic.ProductRulePath
openDebug = conf.Log.OpenDebug
// 2.
if _, err := m.loadProductRuleTable(nil); err != nil {
return fmt.Errorf("%s.Init():loadProductRuleTable(): %s", m.name, err.Error())
}
// 3.
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.prisonHandler)
return nil
}
func (m *ModulePrison) prisonHandler(req *bfe_basic.Request) (
int, *bfe_http.Response) {
// 4.
product := bfe_basic.GlobalProduct
ret, res := m.processProductRules(req, product)
if ret != bfe_module.BfeHandlerGoOn {
return ret, res
}
// 5.
product = req.Route.Product
ret, res = m.processProductRules(req, product)
return ret, res
}
func (m *ModulePrison) processProductRules(req *bfe_basic.Request, product string) (int, *bfe_http.Response) {
rules, ok := m.productTable.getRules(product)
if !ok {
if openDebug {
log.Logger.Debug("product[%s] without prison rules, pass", product)
}
return bfe_module.BfeHandlerGoOn, nil
}
// 6.
return m.processRules(req, rules)
}
func (m *ModulePrison) processRules(req *bfe_basic.Request, rules *prisonRules) (int, *bfe_http.Response) {
for _, rule := range rules.ruleList {
if !rule.cond.Match(req) {
continue
}
// 7.
if !rule.recordAndCheck(req) {
continue
}
// 如果被屏蔽了,就根据设置的action进行返回
switch rule.action.Cmd {
case action.ActionClose:
req.ErrCode = ErrPrison
return bfe_module.BfeHandlerClose, nil
case action.ActionFinish:
req.ErrCode = ErrPrison
return bfe_module.BfeHandlerFinish, nil
default:
rule.action.Do(req)
}
}
return bfe_module.BfeHandlerGoOn, nil
}
func (r *prisonRule) recordAndCheck(req *bfe_basic.Request) bool {
// 8.
sign, err := r.accessSigner.Sign(r.condStr, req)
// 9.
if deny := r.shouldDeny(sign, req); deny {
return deny
}
// 10
r.recordAccess(sign)
return r.shouldDeny(sign, req)
}
代码分解如下:
Product租户为维度,所以放在找到Product租户之后,倒是没有问题,但是限流感觉放在HandleAccept阶段效率更高。Product租户对应的规则下面看看签名的逻辑
func (s *AccessSigner) Sign(label string, req *bfe_basic.Request) (AccessSign, error) {
// 1.从请求中生成要作为签名的数据
data, err := s.prepareData(label, req)
// 2. 基于上一步的数据使用md5算法计算出一个唯一标识
return AccessSign(md5.Sum(data)), nil
}
func (s *AccessSigner) prepareData(label string, req *bfe_basic.Request) ([]byte, error) {
var buf bytes.Buffer
// label就是配置的Cond字段的值
buildKeyValue(&buf, "label", label)
// 如果启用clientIP签名,就将数据追加到这个buf里面
if s.UseClientIP {
if req.ClientAddr == nil {
return nil, errors.New("request without client ip")
}
buildKeyValue(&buf, "clientIP", req.ClientAddr.IP.String())
}
// 和clientIP类似
if s.UseUrl {
buildKeyValue(&buf, "url", req.HttpRequest.RequestURI)
}
// 省略其他字段的设置
return buf.Bytes(), nil
}
从上面的代码知道,如果我们的accessSignConf不启用任何其他标识的话,那么就是作用整个规则,也就是说不同的用户都受制于这个限流规则,一般情况下我们会至少加个UseClientIP, 然后可以根据自己的应用需求增加其他的字段标识用户。
当我们得到了标识请求的签名,就可以基于这个KEY来判断用户是否可以继续访问了。
func (r *prisonRule) recordAndCheck(req *bfe_basic.Request) bool {
sign, err := r.accessSigner.Sign(r.condStr, req)
if deny := r.shouldDeny(sign, req); deny {
return deny
}
r.recordAccess(sign)
return r.shouldDeny(sign, req)
}
func (r *prisonRule) shouldDeny(sign AccessSign, req *bfe_basic.Request) bool {
// 1.
freeTimeNs, ok := r.prisonDict.Get(sign)
if !ok {
return false
}
// 2.
if time.Now().UnixNano() < freeTimeNs.(int64) {
prisonInfo := &PrisonInfo{
PrisonType: ModPrison,
PrisonName: r.name,
FreeTime: time.Unix(0, freeTimeNs.(int64)),
IsExpired: false,
Action: r.action.Cmd,
}
req.SetContext(ReqCtxPrisonInfo, prisonInfo)
return true
}
// remove prison record if expired
// 3.
r.prisonDict.Del(sign)
return false
}
func (r *prisonRule) recordAccess(sign AccessSign) {
var f *AccessCounter
// 4.
value, ok := r.accessDict.Get(sign)
if !ok {
f = NewAccessCounter()
r.accessDict.Add(sign, f)
} else {
f = value.(*AccessCounter)
}
// 5.
if block, restTimeNs := f.IncAndCheck(r.checkPeriodNs, r.threshold); block {
freeTimeNs := r.stayPeriodNs + restTimeNs + time.Now().UnixNano()
r.prisonDict.Add(sign, freeTimeNs)
r.accessDict.Del(sign)
}
}
func (c *AccessCounter) IncAndCheck(checkPeriodNs int64, threshold int32) (bool, int64) {
// 6.
now := time.Now().UnixNano()
stime := atomic.LoadInt64(&c.startTime)
if stime+checkPeriodNs < now { // reset count
c.reset()
}
// 7.
count := atomic.AddInt32(&c.count, 1)
// 8.
stime = atomic.LoadInt64(&c.startTime)
return count > threshold, stime + checkPeriodNs - now
}
代码分解如下:
atomic.AddInt32**方法,这样能保证原子性。mod_prison有两个LRUCache对象, accessDict用于存储所有请求信息,prisonDict用于存储被限流的信息。
为什么用LRUCache这样的缓存对象呢?
因为随着请求的不断积累,内存会无限膨胀,所以需要限制记录的数据大小,官方代码仓库的配置是1000, 即"accessDictSize": 1000,"prisonDictSize": 1000两个配置参数,大家可以根据自己的实际情况配置,一个AccessCounter占用内存14字节, 1000 * 14差不多13KB 左右, 如果算上底层存储的interface{}, list.Element的抽象可能差不多56字节一条记录,10MB内存可以记录差不多1872457条记录,所以配置的时候可以适当设置大一点,不用担心。
很多模块都用到了两个通用的部分Cond和Action, 前者用于匹配请求或者响应中的条件,在BFE的官方文档称之为原语,这个需要查阅官方文档或者源代码,大多数原语的命名都是比较直观的,所以这个不会过于深入,Action则是一个动作,当条件匹配之后可以执行,比如拒绝。
还有就是很多模块都支持global的设置,即对所有产品生效,这个做全局配置很有用。
据我所知,所有开源负载均衡都会提供至少一种扩展机制,BFE也不例外,BFE通过模块的选择可以更精细的控制BFE在处理请求中的各个阶段。如果内置模块不能满足自己需求,那么可以自己开发模块,而BFE是用Golang写的,所以开发效率很高。
下面是BFE各个回调点的位置。

BFE的模块和其他开源产品的一个很大的不同点是,BFE基于Product(租户)维度控制,而其他大多数产品都是基于Host(域名)。
首先看看BFE是怎么加载和启用模块的。
func StartUp(cfg bfe_conf.BfeConfig, version string, confRoot string) error {
var err error
// 1.
bfe_modules.SetModules()
// 2.
if err = bfeServer.RegisterModules(cfg.Server.Modules); err != nil {
log.Logger.Error("StartUp(): RegisterModules():%s", err.Error())
return err
}
// 3.
if err = bfeServer.InitModules(); err != nil {
log.Logger.Error("StartUp(): bfeServer.InitModules():%s",
err.Error())
return err
}
}
// 4.
func SetModules() {
for _, module := range moduleList {
bfe_module.AddModule(module)
}
}
// 5.
func (srv *BfeServer) RegisterModules(modules []string) error {
for _, moduleName := range modules {
moduleName = strings.TrimSpace(moduleName)
if err := srv.Modules.RegisterModule(moduleName); err != nil {
return err
}
}
return nil
}
// 6.
func (bm *BfeModules) RegisterModule(name string) error {
module, ok := moduleMap[name]
bm.workModules[name] = module
return nil
}
// 7.
func (srv *BfeServer) InitModules() error {
return srv.Modules.Init(srv.CallBacks, srv.Monitor.WebHandlers, srv.ConfRoot)
}
func (bm *BfeModules) Init(cbs *BfeCallbacks, whs *web_monitor.WebHandlers, cr string) error {
// 8.
for _, name := range modulesAll {
module, ok := bm.workModules[name]
if ok {
err := module.Init(cbs, whs, cr)
modulesEnabled = append(modulesEnabled, name)
}
}
return nil
}
代码分解如下:
BFE代码中所有集成的模块Modules参数依次尝试注册模块BfeServer对象上从上面的的代码可以看到,模块的初始化主要分为三个部分,加载所有可用的模块,注册用户配置的模块,将注册的模块依次初始化,也就是调用其对应的Init方法,BFE的模块初始化流程还是比较简单明了的。
下面开始看常见模块的初始化逻辑。
根据前面的代码,我们可以直接找模块的Init方法。
这个模块用于设置请求的客户端IP是否是可信的,可信的IP段由用户配置。
// 1.
func (m *ModuleTrustClientIP) Init() error {
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleTrustClientIP) init(cfg *ConfModTrustClientIP, cbs *bfe_module.BfeCallbacks,
whs *web_monitor.WebHandlers) error {
m.configPath = cfg.Basic.DataPath
// 2.
m.trustTable = ipdict.NewIPTable()
// 3.
if err := m.loadConfData(nil); err != nil {
return fmt.Errorf("err in loadConfData(): %s", err.Error())
}
// 4.
err := cbs.AddFilter(bfe_module.HandleAccept, m.acceptHandler)
// 5.
err = web_monitor.RegisterHandlers(whs, web_monitor.WebHandleMonitor, m.monitorHandlers())
// 6.
err = whs.RegisterHandler(web_monitor.WebHandleReload, m.name, m.loadConfData)
return nil
}
func (m *ModuleTrustClientIP) acceptHandler(session *bfe_basic.Session) int {
m.state.ConnTotal.Inc(1)
trusted := m.trustTable.Search(session.RemoteAddr.IP)
if trusted {
m.state.ConnTrustClientip.Inc(1)
}
// 7.
session.SetTrustSource(trusted)
// state for internal remote ip
if session.RemoteAddr.IP.IsPrivate() {
m.state.ConnAddrInternal.Inc(1)
if !trusted {
m.state.ConnAddrInternalNotTrust.Inc(1)
}
}
return bfe_module.BfeHandlerGoOn
}
代码分解如下:
m.acceptHandler注册到回调函数中reload时被调用的函数true。这个模块并不会拒绝请求,它只是在请求的Session(会话)对象的isTrustSourcez字段设置一个true。这个布尔值只有在条件匹配时使用条件原语req_cip_trusted才会应用。
这个模块主要用于屏蔽指定的IP段,以及根据条件原语来屏蔽。为了更好理解看看它的测试配置文件
{
"Version": "init version",
"Config": {
"example_product": [
{
"action": {
"cmd": "CLOSE",
"params": []
},
"name": "example rule",
"cond": "req_path_in(\"/limit\", false)"
}
]
}
}
这个配置文件用于屏蔽匹配的规则
192.168.1.253 192.168.1.254
192.168.1.250
192.168.1.250/20
这个配置文件用于指定要屏蔽的IP段
最后看看代码
func (m *ModuleBlock) Init() error {
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
// 2.
if err = m.loadGlobalIPTable(nil); err != nil {
return fmt.Errorf("%s: loadGlobalIPTable() err %s", m.name, err.Error())
}
// 3.
if err = m.loadProductRuleConf(nil); err != nil {
return fmt.Errorf("%s: loadProductRuleConf() err %s", m.name, err.Error())
}
// 4.
err = cbs.AddFilter(bfe_module.HandleAccept, m.globalBlockHandler)
// 5.
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.productBlockHandler)
return nil
}
// 6.
func (m *ModuleBlock) globalBlockHandler(session *bfe_basic.Session) int {
clientIP := session.RemoteAddr.IP
if m.ipTable.Search(clientIP) {
session.SetError(ErrBlock, "connection blocked")
return bfe_module.BfeHandlerClose
}
return bfe_module.BfeHandlerGoOn
}
// 7.
func (m *ModuleBlock) productBlockHandler() {
// 8.
rules, ok := m.ruleTable.Search(bfe_basic.GlobalProduct)
if ok { // rules found
retVal, isMatch, resp := m.productRulesProcess(request, rules)
if isMatch {
return retVal, resp
}
}
// 9.
rules, ok = m.ruleTable.Search(request.Route.Product)
if !ok {
return bfe_module.BfeHandlerGoOn, nil
}
retVal, isMatch, resp := m.productRulesProcess(request, rules)
if !isMatch {
m.state.ReqAccept.Inc(1)
}
return retVal, resp
}
代码分解如下:
Product租户分类的屏蔽的规则globalBlockHandlerproductBlockHandlerglobal的product租户名,这个特殊的租户名会全局生效这个模块主要使用用来修改头信息,为了更好理解看看它的测试配置文件
{
"Version": "init version",
"Config": {
"example_product": [
{
"cond": "req_path_prefix_in(\"/header\", false)",
"actions": [
{
"cmd": "RSP_HEADER_SET",
"params": [
"X-Proxied-By",
"bfe"
]
}
],
"last": true
}
]
}
}
配置的意思就是, 当请求的路径前缀包含/header, 就设置X-Proxied-By: bfe这样的HTTP头信息。
func (m *ModuleHeader) Init(cbs *bfe_module.BfeCallbacks, whs *web_monitor.WebHandlers,
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleHeader) init() error {
// 2.
if err := m.loadConfData(nil); err != nil {
return fmt.Errorf("err in loadConfData(): %s", err.Error())
}
// 3.
err := cbs.AddFilter(bfe_module.HandleAfterLocation, m.reqHeaderHandler)
err = cbs.AddFilter(bfe_module.HandleReadResponse, m.rspHeaderHandler)
return nil
}
代码分解如下:
这个模块的功能逻辑比较直观,具体的实现代码就不看了。
这个模块用来改写请求,可以修改三部分的数据,源码如下:
var allowActions = map[string]interface{}{
// host actions
action.ActionHostSetFromPathPrefix: nil, // set host from path prefix
action.ActionHostSet: nil, //set host
action.ActionHostSuffixReplace: nil, // replace host suffix
// path actions
action.ActionPathSet: nil, // set path
action.ActionPathPrefixAdd: nil, // add path prefix
action.ActionPathPrefixTrim: nil, // trim path prefix
// query actions
action.ActionQueryAdd: nil, // add query
action.ActionQueryDel: nil, // del query
action.ActionQueryRename: nil, // rename query
action.ActionQueryDelAllExcept: nil, // del query except given query key
}
根据名字应该就能看出来, Host, Path, Query三个字段的值。
下面是一个示例配置文件
{
"Version": "init version",
"Config": {
"youerning_product": [
{
"Cond": "req_path_prefix_in(\"/rewrite\", false)",
"Actions": [
{
"Cmd": "PATH_PREFIX_ADD",
"Params": [
"/bfe/"
]
}
],
"Last": true
}
]
}
}
上面配置的意思是,在URL路径上加上/bfe/, 如果路径的前缀是/rewrite的话。
测试如下:
curl "http://127.0.0.1:8080/rewrite" -H "Host: youerning.top"
ClusterB Backend1 /bfe/rewrite
其实模块的初始化代码都差不多,后面都会简单的过一下初始化过程,主要是看看它的回调点在哪。
func (m *ModuleReWrite) Init() error {
// 加载配置文件
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleReWrite) init() error {
// HandleAfterLocation的回调会被调用
err := cbs.AddFilter(bfe_module.HandleAfterLocation, m.rewriteHandler)
return nil
}
这个模块的名字也很直观,就是重定向,我们通过配置文件来学习。
{
"Version": "init version",
"Config": {
"youerning_product": [
{
"Cond": "req_path_prefix_in(\"/redirect\", false)",
"Actions": [
{
"Cmd": "URL_SET",
"Params": ["http://127.0.0.1:18002"]
}
],
"Status": 301
}
]
}
}
上面配置的意思是,重定向到http://127.0.0.1:18002, 如果路径的前缀是/redirect的话。
测试如下:
curl "http://127.0.0.1:8080/redirect" -H "Host: youerning.top" -i -L
HTTP/1.1 301 Moved Permanently
Location: http://127.0.0.1:18002
Server: bfe
Date: Tue, 05 Sep 2023 01:40:53 GMT
Content-Length: 57
Content-Type: text/html; charset=utf-8
HTTP/1.1 200 OK
Date: Tue, 05 Sep 2023 01:40:53 GMT
Content-Length: 20
Content-Type: text/plain; charset=utf-8
ClusterA Backend2 /
可以看到,最先得到的响应时301,被重定向的位置是http://127.0.0.1:18002
这个模块用来生成一个随机的日志ID, 不需要配置文件,暂时不知道干啥的。
func (m *ModuleLogId) Init(cbs *bfe_module.BfeCallbacks, whs *web_monitor.WebHandlers,
cr string) error {
// 分别在HandleAccept, HandleBeforeLocation创建logid
err := cbs.AddFilter(bfe_module.HandleAccept, m.sessionIdHandler)
err = cbs.AddFilter(bfe_module.HandleBeforeLocation, m.requestIdHandler)
return nil
}
func (m *ModuleLogId) sessionIdHandler(session *bfe_basic.Session) int {
// 在session上设置logid
session.SessionId = genLogId()
return bfe_module.BfeHandlerGoOn
}
func (m *ModuleLogId) requestIdHandler() {
// 在HTTP头 X-Bfe-Log-Id 上设置logid
req.LogId = genLogId()
req.HttpRequest.Header.Set(bfe_basic.HeaderBfeLogId, req.LogId)
return bfe_module.BfeHandlerGoOn, nil
}
// 随机生成logid
func genLogId() string {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
return ""
}
return hex.EncodeToString(b)
}
这个也不知道是干啥的,在请求上添加了一个tag
func (m *ModuleTag) init() error {
// 加载数据文件
_, err = m.loadRuleData(nil)
// 注册回调点
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.tagHandler)
return nil
}
func (m *ModuleTag) tagHandler(request *bfe_basic.Request) (int, *bfe_http.Response) {
rules, ok := m.ruleTable.Search(request.Route.Product)
for _, rule := range rules {
if rule.Cond.Match(request) {
//在请求对象上加上tags
request.AddTags(rule.Param.TagName, []string{rule.Param.TagValue})
}
}
return bfe_module.BfeHandlerGoOn, nil
}
// 具体的实现逻辑
func (req *Request) AddTags(name string, ntags []string) {
tags := req.Tags.TagTable[name]
tags = append(tags, ntags...)
req.Tags.TagTable[name] = tags
}
这个还是比较有用的,用来追踪某个Product(租户)的耗时,但是需要配置额外的trace agent ,比如Zipkin,Jaeger, elasticsearch等服务,我没配置过,所以只过一下代码主流程。
func (m *ModuleTrace) init() error {
// 初始化trace对象
globalTrace, err = trace.NewTrace(conf.Basic.ServiceName, conf.GetTraceConfig())
//
_, err = m.loadRuleData(nil)
// 分别在发现产品后和结束请求后
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.startTrace)
err = cbs.AddFilter(bfe_module.HandleRequestFinish, m.finishTrace)
return nil
}
这个是用来配置请求日志的,也略过。
这个模块比较有用,但是官方的配置参数有一些错误,下面是一个正确的配置文件
{
"Version": "20190101000000",
"Config": {
"youerning_product": [{
"Name": "youerning_prison",
"Cond": "req_path_prefix_in(\"/prison\", false)",
"accessSignConf": {
"UseClientIP": false,
"UseSocketIP": false,
"UseConnectID": false,
"UseUrl": false,
"UseHost": false,
"UsePath": false,
"UseHeaders": false,
"UrlRegexp": false,
"Query": [],
"Header": [],
"Cookie": []
},
"action": {
"cmd": "CLOSE",
"params": []
},
"checkPeriod": 1,
"stayPeriod": 10,
"threshold": 2,
"accessDictSize": 1000,
"prisonDictSize": 1000
}]
}
}
官方代码仓库里的配置文件使用的字段是url, path等,这些参数不会解析成功。
测试如下:
for i in `seq 1 4`;do curl "http://127.0.0.1:8080/prison" -H "Host: youerning.top" -i ;done
HTTP/1.1 200 OK
Content-Length: 26
Content-Type: text/plain; charset=utf-8
Date: Tue, 05 Sep 2023 03:19:41 GMT
ClusterB Backend1 /prison
HTTP/1.1 200 OK
Content-Length: 26
Content-Type: text/plain; charset=utf-8
Date: Tue, 05 Sep 2023 03:19:42 GMT
ClusterA Backend1 /prison
curl: (52) Empty reply from server
curl: (52) Empty reply from server
可以看到,处理前面两个请求成功之外,其他的请求都失败了,这是因为上面的配置参数是checkPeriod: 1, threshold:2, 意思是一秒钟超过两次请求就是直接关闭连接,关闭的持续时间是stayPeriod: 10, 也就是10秒。
一般来说,我们不会对某个地址直接限流,而是基于客户端IP, 所以我们可以配置clientIP: true启用基于客户端IP限流的功能。
值得注意的是, bfe 的mod_prison模块使用的不是漏桶算法,所以checkPeriod设置的太大可能导致后端服务崩溃,即使是1秒,其实也有可能会崩溃,假设1秒分为5个区间,然后设置的阈值是10,当第一个区间请求来了1,那么第5个区间的最大请求量能到9,而下一秒开始的时候又能最大请求到10,也就说,最坏的情况下一秒的请求是可以超过设置的阈值的,并且接近阈值的两倍!!!。示意图如下
第一秒
[1,2,3,4,5]
1,0,0,0,9
第二秒
[1, 2, 3, 4, 5]
10,0, 0, 0, 0
所以在不自己开发一个其他的限流模块的情况下,不建议这个checkPeriod设置的太大。
最后来看看代码吧
func (m *ModulePrison) Init() error {
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
conf, err := ConfLoad(confPath, cr)
m.productConfPath = conf.Basic.ProductRulePath
openDebug = conf.Log.OpenDebug
// 2.
if _, err := m.loadProductRuleTable(nil); err != nil {
return fmt.Errorf("%s.Init():loadProductRuleTable(): %s", m.name, err.Error())
}
// 3.
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.prisonHandler)
return nil
}
func (m *ModulePrison) prisonHandler(req *bfe_basic.Request) (
int, *bfe_http.Response) {
// 4.
product := bfe_basic.GlobalProduct
ret, res := m.processProductRules(req, product)
if ret != bfe_module.BfeHandlerGoOn {
return ret, res
}
// 5.
product = req.Route.Product
ret, res = m.processProductRules(req, product)
return ret, res
}
func (m *ModulePrison) processProductRules(req *bfe_basic.Request, product string) (int, *bfe_http.Response) {
rules, ok := m.productTable.getRules(product)
if !ok {
if openDebug {
log.Logger.Debug("product[%s] without prison rules, pass", product)
}
return bfe_module.BfeHandlerGoOn, nil
}
// 6.
return m.processRules(req, rules)
}
func (m *ModulePrison) processRules(req *bfe_basic.Request, rules *prisonRules) (int, *bfe_http.Response) {
for _, rule := range rules.ruleList {
if !rule.cond.Match(req) {
continue
}
// 7.
if !rule.recordAndCheck(req) {
continue
}
// 如果被屏蔽了,就根据设置的action进行返回
switch rule.action.Cmd {
case action.ActionClose:
req.ErrCode = ErrPrison
return bfe_module.BfeHandlerClose, nil
case action.ActionFinish:
req.ErrCode = ErrPrison
return bfe_module.BfeHandlerFinish, nil
default:
rule.action.Do(req)
}
}
return bfe_module.BfeHandlerGoOn, nil
}
func (r *prisonRule) recordAndCheck(req *bfe_basic.Request) bool {
// 8.
sign, err := r.accessSigner.Sign(r.condStr, req)
// 9.
if deny := r.shouldDeny(sign, req); deny {
return deny
}
// 10
r.recordAccess(sign)
return r.shouldDeny(sign, req)
}
代码分解如下:
Product租户为维度,所以放在找到Product租户之后,倒是没有问题,但是限流感觉放在HandleAccept阶段效率更高。Product租户对应的规则下面看看签名的逻辑
func (s *AccessSigner) Sign(label string, req *bfe_basic.Request) (AccessSign, error) {
// 1.从请求中生成要作为签名的数据
data, err := s.prepareData(label, req)
// 2. 基于上一步的数据使用md5算法计算出一个唯一标识
return AccessSign(md5.Sum(data)), nil
}
func (s *AccessSigner) prepareData(label string, req *bfe_basic.Request) ([]byte, error) {
var buf bytes.Buffer
// label就是配置的Cond字段的值
buildKeyValue(&buf, "label", label)
// 如果启用clientIP签名,就将数据追加到这个buf里面
if s.UseClientIP {
if req.ClientAddr == nil {
return nil, errors.New("request without client ip")
}
buildKeyValue(&buf, "clientIP", req.ClientAddr.IP.String())
}
// 和clientIP类似
if s.UseUrl {
buildKeyValue(&buf, "url", req.HttpRequest.RequestURI)
}
// 省略其他字段的设置
return buf.Bytes(), nil
}
从上面的代码知道,如果我们的accessSignConf不启用任何其他标识的话,那么就是作用整个规则,也就是说不同的用户都受制于这个限流规则,一般情况下我们会至少加个UseClientIP, 然后可以根据自己的应用需求增加其他的字段标识用户。
当我们得到了标识请求的签名,就可以基于这个KEY来判断用户是否可以继续访问了。
func (r *prisonRule) recordAndCheck(req *bfe_basic.Request) bool {
sign, err := r.accessSigner.Sign(r.condStr, req)
if deny := r.shouldDeny(sign, req); deny {
return deny
}
r.recordAccess(sign)
return r.shouldDeny(sign, req)
}
func (r *prisonRule) shouldDeny(sign AccessSign, req *bfe_basic.Request) bool {
// 1.
freeTimeNs, ok := r.prisonDict.Get(sign)
if !ok {
return false
}
// 2.
if time.Now().UnixNano() < freeTimeNs.(int64) {
prisonInfo := &PrisonInfo{
PrisonType: ModPrison,
PrisonName: r.name,
FreeTime: time.Unix(0, freeTimeNs.(int64)),
IsExpired: false,
Action: r.action.Cmd,
}
req.SetContext(ReqCtxPrisonInfo, prisonInfo)
return true
}
// remove prison record if expired
// 3.
r.prisonDict.Del(sign)
return false
}
func (r *prisonRule) recordAccess(sign AccessSign) {
var f *AccessCounter
// 4.
value, ok := r.accessDict.Get(sign)
if !ok {
f = NewAccessCounter()
r.accessDict.Add(sign, f)
} else {
f = value.(*AccessCounter)
}
// 5.
if block, restTimeNs := f.IncAndCheck(r.checkPeriodNs, r.threshold); block {
freeTimeNs := r.stayPeriodNs + restTimeNs + time.Now().UnixNano()
r.prisonDict.Add(sign, freeTimeNs)
r.accessDict.Del(sign)
}
}
func (c *AccessCounter) IncAndCheck(checkPeriodNs int64, threshold int32) (bool, int64) {
// 6.
now := time.Now().UnixNano()
stime := atomic.LoadInt64(&c.startTime)
if stime+checkPeriodNs < now { // reset count
c.reset()
}
// 7.
count := atomic.AddInt32(&c.count, 1)
// 8.
stime = atomic.LoadInt64(&c.startTime)
return count > threshold, stime + checkPeriodNs - now
}
代码分解如下:
atomic.AddInt32**方法,这样能保证原子性。mod_prison有两个LRUCache对象, accessDict用于存储所有请求信息,prisonDict用于存储被限流的信息。
为什么用LRUCache这样的缓存对象呢?
因为随着请求的不断积累,内存会无限膨胀,所以需要限制记录的数据大小,官方代码仓库的配置是1000, 即"accessDictSize": 1000,"prisonDictSize": 1000两个配置参数,大家可以根据自己的实际情况配置,一个AccessCounter占用内存14字节, 1000 * 14差不多13KB 左右, 如果算上底层存储的interface{}, list.Element的抽象可能差不多56字节一条记录,10MB内存可以记录差不多1872457条记录,所以配置的时候可以适当设置大一点,不用担心。
很多模块都用到了两个通用的部分Cond和Action, 前者用于匹配请求或者响应中的条件,在BFE的官方文档称之为原语,这个需要查阅官方文档或者源代码,大多数原语的命名都是比较直观的,所以这个不会过于深入,Action则是一个动作,当条件匹配之后可以执行,比如拒绝。
还有就是很多模块都支持global的设置,即对所有产品生效,这个做全局配置很有用。