client-go提供了许多有用的工具,其中就包括选举,通过选举的功能,我们可以同时运行多个业务实例,多个实例之间互为备份,并且同一时刻只有一个实例运行。
选举的逻辑并不复杂,复杂的是如何在一个分布式架构选举,由于etcd应该实现了共识算法,所以分布式的一些问题就不需要考虑了(比如脑裂)。选举的逻辑大致可以分为以下几步。
这个例子来自client-go官方的例子, 简单的改了一下
源代码地址: https://github.com/kubernetes/client-go/blob/v0.20.2/examples/leader-election/main.go
// main wires up a leader-election demo: parse flags, build a Kubernetes
// client, translate shutdown signals into context cancellation, and then
// block inside leaderelection.RunOrDie until leadership is lost.
func main() {
	var kubeconfig *string
	var leaseLockName string
	var leaseLockNamespace string
	var id string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "client-go-election", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "default", "the lease lock resource namespace")
	flag.Parse()
	// 1. Build a rest.Config from the kubeconfig and create the typed clientset.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		// FIX: the excerpt dropped this check, leaving err unused and a
		// potentially nil config in use. Mirrors the official example.
		klog.Fatal(err)
	}
	client := clientset.NewForConfigOrDie(config)
	// 2. The business logic that runs once this instance becomes leader.
	// It blocks forever; shutdown happens via context cancellation.
	run := func(ctx context.Context) {
		klog.Info("现在我是leader了")
		select {}
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// 3. Turn SIGINT/SIGTERM into a context cancel so leader election can
	// run its shutdown callbacks and (optionally) release the lease.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()
	// 4. The Lease object acting as the distributed lock; Identity is what
	// distinguishes this instance from its peers.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}
	// 5. Run the election loop; this call blocks for the process lifetime.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// Release the lease on context cancel so a peer can take over
		// immediately instead of waiting for the lease to expire.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		// 6. Lifecycle callbacks.
		Callbacks: leaderelection.LeaderCallbacks{
			// Invoked once after the lease is acquired.
			OnStartedLeading: func(ctx context.Context) {
				run(ctx)
			},
			// Invoked when leadership is lost; exit so the process is
			// restarted (e.g. by Kubernetes) and rejoins the election.
			OnStoppedLeading: func() {
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			// Invoked whenever a new leader is observed, ourselves included.
			OnNewLeader: func(identity string) {
				// 7. Skip the notification about ourselves.
				if identity == id {
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}
代码分解如下:
1. 通过kubeconfig构建rest.Config, 并以此构建静态客户端集合; 2. 定义成为Leader之后要运行的业务逻辑; 3. 监听ctrl + c或者kill -15 pid这样的退出信号, 当收到信号后就可以关闭context, 而context的关闭可以通知leaderelection, 这样leaderelection就会执行一些应该收尾的回调函数, 比如用户注册的业务逻辑, 或者释放租约; 4. 构建基于Lease资源的分布式锁; 5. 运行选举逻辑。如果你执行了上述代码, 你可以在k8s集群查询到配置的租约lease:
kubectl get leases client-go-election
# 输出如下
NAME                 HOLDER                                 AGE
client-go-election   ea0a3e2a-d8ec-4fa8-8581-02a0507337c9   16s
holder对应的是生成的uuid, 即
Identity对应的值
选举并没有太多初始化的操作,通过传入的LeaderElectionConfig直接运行。
// Entry point of the election: all behavior is driven by the passed-in config.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	// configuration fields omitted
}) // FIX: the excerpt was missing the closing parenthesis
// RunOrDie builds a LeaderElector from lec and runs it, panicking if the
// configuration is invalid (hence the "OrDie").
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	// 1. Validate the config (durations, callbacks, lock) and build the elector.
	le, err := NewLeaderElector(lec)
	if err != nil {
		// FIX: err was silently dropped in the excerpt; an invalid
		// configuration must abort, as in the upstream implementation.
		panic(err)
	}
	// 2. Block in the acquire/renew loop until ctx is cancelled.
	le.Run(ctx)
}
// Run executes the leader-election loop: block until the lease is acquired,
// start the user callback, then keep renewing until ctx ends or renewal fails.
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() {
		// 3. Always fire OnStoppedLeading on the way out — whether we lost
		// the lease or never acquired it before ctx was cancelled.
		le.config.Callbacks.OnStoppedLeading()
	}()
	// 4. Block until we become leader; returns false only when ctx is done.
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// 5. We are leader now: run the user's business logic asynchronously.
	go le.config.Callbacks.OnStartedLeading(ctx)
	// 6. Renew the lease until renewal fails or ctx is cancelled.
	le.renew(ctx)
}
代码分解如下:
1. 通过NewLeaderElector创建LeaderElector, 创建的过程并不复杂, 主要是校验各个参数是否合法; 2. 运行选举逻辑。根据上面的代码我们知道, 选举的代码存在两个状态:
获取租约状态le.acquire(ctx)与续期状态le.renew(ctx)。acquire这个状态会不断的尝试获取租约以成为Leader.
// acquire loops (with jitter) trying to obtain the lease. It returns true
// once the lease is held, or false when ctx is cancelled first.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	// 1. A local cancel lets us stop the JitterUntil loop from inside it.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	// FIX: desc was referenced below but never defined in this excerpt;
	// the upstream code derives it from the lock before the loop.
	desc := le.config.Lock.Describe()
	wait.JitterUntil(func() {
		// 2. One attempt to create or take over the lease.
		succeeded = le.tryAcquireOrRenew(ctx)
		// 3. Fire OnNewLeader if the observed holder changed.
		le.maybeReportTransition()
		// 4. Not leader yet: return and let JitterUntil retry after RetryPeriod.
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		// 5. We hold the lease: cancel the loop so acquire can return true.
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
代码分解如下:
1. 基于传入的context创建可取消的context, 用于停止wait.JitterUntil; 2. 尝试获取或者续期租约; 3. 如果Leader发生了变化, 触发OnNewLeader回调; 4. 获取失败时直接返回, 由于该函数在wait.JitterUntil里执行, 所以退出之后还会不断的重试; 5. 获取成功则通过context的cancel来停止wait.JitterUntil。client-go里面的k8s.io/apimachinery/pkg/util/wait 有各种重试逻辑的帮助函数, 比如这里的wait.JitterUntil。
而尝试获取租约的代码如下:
// tryAcquireOrRenew makes a single attempt to create the lease (if absent),
// take it over (if expired), or renew it (if we already hold it). It returns
// true iff this instance holds the lease afterwards.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now()
	// The record we would like to write: ourselves as holder, renewed now.
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity: le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime: now,
		AcquireTime: now,
	}
	// 1. Read the current lease record from the API server.
	// NOTE(review): oldLeaderElectionRawRecord is unused in this excerpt;
	// upstream compares it against the previously observed raw record to
	// refresh observedTime — confirm against the full source.
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		// 2. Only a missing lease is recoverable: create it and become leader.
		if !errors.IsNotFound(err) {
			return false
		}
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}
	// 3. Someone else holds an unexpired lease: give up this round.
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
	le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
	!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}
	// 4. A renewal keeps the original AcquireTime and transition count; a
	// takeover bumps LeaderTransitions.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}
	// 5. Write our record back; success means we now hold (or kept) the lease.
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	return true
}
// Get fetches the Lease object backing the lock and converts its spec into a
// LeaderElectionRecord plus the record's JSON encoding.
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
	var err error
	// 6. Read the Lease resource; the lock state lives entirely in its spec.
	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
	if err != nil {
		// FIX: without this check a failed Get (e.g. NotFound) would
		// nil-dereference ll.lease below; upstream returns the error here.
		return nil, nil, err
	}
	record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
	recordByte, err := json.Marshal(*record)
	if err != nil {
		return nil, nil, err
	}
	return record, recordByte, nil
}
代码分解如下:
tryAcquireOrRenew这段逻辑会在成为Leader之前不断的被重试。
如果获取到了租约,那么就会成为Leader,成为了Leader也不是一劳永逸,而是要不断的刷新租约,告诉其他选举者自己还在线,你们这些家伙休想取而代之。
// Run (abridged): once the lease has been acquired, the elector spends the
// rest of its life inside renew.
func (le *LeaderElector) Run(ctx context.Context) {
	// lease acquisition and callback wiring omitted here
	le.renew(ctx)
}
// renew refreshes the lease every RetryPeriod until one renewal cycle fails
// to complete within RenewDeadline, or until ctx is cancelled.
func (le *LeaderElector) renew(ctx context.Context) {
	// 1. A local cancel lets a failed renewal stop the Until loop.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		// 2. Each renewal cycle must finish within RenewDeadline.
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		// 3. Retry the renewal every RetryPeriod until it succeeds or the
		// deadline context expires.
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			return le.tryAcquireOrRenew(timeoutCtx), nil
		}, timeoutCtx.Done())
		// 4. Report a holder change, if any.
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		// 5. Renewal failed: record the event and cancel to stop the loop —
		// this instance is no longer leader.
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())
	// 6. Optionally give up the lease on shutdown so a peer can take over
	// immediately instead of waiting for expiry.
	if le.config.ReleaseOnCancel {
		le.release()
	}
}
代码分解如下:
如果不熟悉k8s的部署,可能会有个问题,Leader掉线之后岂不是少了一个参选者? 如果网络环境差的话,岂不是参选者越来越少?那么最后岂不是一个选举者都没了,怎么做到高可用?
为了避免这种情况,应该有两个前提
一是每个实例都交给k8s的控制器(比如Deployment)管理, 进程退出后会被自动重新拉起; 二是在OnStoppedLeading回调里终止程序, 这样从Leader状态掉线后就会终止程序并重新加入选举。通过client-go和k8s环境我们可以很容易写一个高可用的应用,这样的高可用应用可以保证总是有一个实例处理业务,因为只有一个实例处理业务,那么可以避免一些并发冲突的问题。
client-go提供了许多有用的工具,其中就包括选举,通过选举的功能,我们可以同时运行多个业务实例,多个实例之间互为备份,并且同一时刻只有一个实例运行。
选举的逻辑并不复杂,复杂的是如何在一个分布式架构选举,由于etcd应该实现了共识算法,所以分布式的一些问题就不需要考虑了(比如脑裂)。选举的逻辑大致可以分为以下几步。
这个例子来自client-go官方的例子, 简单的改了一下
源代码地址: https://github.com/kubernetes/client-go/blob/v0.20.2/examples/leader-election/main.go
// main wires up a leader-election demo: parse flags, build a Kubernetes
// client, translate shutdown signals into context cancellation, and then
// block inside leaderelection.RunOrDie until leadership is lost.
func main() {
	var kubeconfig *string
	var leaseLockName string
	var leaseLockNamespace string
	var id string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "client-go-election", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "default", "the lease lock resource namespace")
	flag.Parse()
	// 1. Build a rest.Config from the kubeconfig and create the typed clientset.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		// FIX: the excerpt dropped this check, leaving err unused and a
		// potentially nil config in use. Mirrors the official example.
		klog.Fatal(err)
	}
	client := clientset.NewForConfigOrDie(config)
	// 2. The business logic that runs once this instance becomes leader.
	// It blocks forever; shutdown happens via context cancellation.
	run := func(ctx context.Context) {
		klog.Info("现在我是leader了")
		select {}
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// 3. Turn SIGINT/SIGTERM into a context cancel so leader election can
	// run its shutdown callbacks and (optionally) release the lease.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()
	// 4. The Lease object acting as the distributed lock; Identity is what
	// distinguishes this instance from its peers.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}
	// 5. Run the election loop; this call blocks for the process lifetime.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// Release the lease on context cancel so a peer can take over
		// immediately instead of waiting for the lease to expire.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		// 6. Lifecycle callbacks.
		Callbacks: leaderelection.LeaderCallbacks{
			// Invoked once after the lease is acquired.
			OnStartedLeading: func(ctx context.Context) {
				run(ctx)
			},
			// Invoked when leadership is lost; exit so the process is
			// restarted (e.g. by Kubernetes) and rejoins the election.
			OnStoppedLeading: func() {
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			// Invoked whenever a new leader is observed, ourselves included.
			OnNewLeader: func(identity string) {
				// 7. Skip the notification about ourselves.
				if identity == id {
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}
代码分解如下:
1. 通过kubeconfig构建rest.Config, 并以此构建静态客户端集合; 2. 定义成为Leader之后要运行的业务逻辑; 3. 监听ctrl + c或者kill -15 pid这样的退出信号, 当收到信号后就可以关闭context, 而context的关闭可以通知leaderelection, 这样leaderelection就会执行一些应该收尾的回调函数, 比如用户注册的业务逻辑, 或者释放租约; 4. 构建基于Lease资源的分布式锁; 5. 运行选举逻辑。如果你执行了上述代码, 你可以在k8s集群查询到配置的租约lease:
kubectl get leases client-go-election
# 输出如下
NAME                 HOLDER                                 AGE
client-go-election   ea0a3e2a-d8ec-4fa8-8581-02a0507337c9   16s
holder对应的是生成的uuid, 即
Identity对应的值
选举并没有太多初始化的操作,通过传入的LeaderElectionConfig直接运行。
// Entry point of the election: all behavior is driven by the passed-in config.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	// configuration fields omitted
}) // FIX: the excerpt was missing the closing parenthesis
// RunOrDie builds a LeaderElector from lec and runs it, panicking if the
// configuration is invalid (hence the "OrDie").
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	// 1. Validate the config (durations, callbacks, lock) and build the elector.
	le, err := NewLeaderElector(lec)
	if err != nil {
		// FIX: err was silently dropped in the excerpt; an invalid
		// configuration must abort, as in the upstream implementation.
		panic(err)
	}
	// 2. Block in the acquire/renew loop until ctx is cancelled.
	le.Run(ctx)
}
// Run executes the leader-election loop: block until the lease is acquired,
// start the user callback, then keep renewing until ctx ends or renewal fails.
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() {
		// 3. Always fire OnStoppedLeading on the way out — whether we lost
		// the lease or never acquired it before ctx was cancelled.
		le.config.Callbacks.OnStoppedLeading()
	}()
	// 4. Block until we become leader; returns false only when ctx is done.
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// 5. We are leader now: run the user's business logic asynchronously.
	go le.config.Callbacks.OnStartedLeading(ctx)
	// 6. Renew the lease until renewal fails or ctx is cancelled.
	le.renew(ctx)
}
代码分解如下:
1. 通过NewLeaderElector创建LeaderElector, 创建的过程并不复杂, 主要是校验各个参数是否合法; 2. 运行选举逻辑。根据上面的代码我们知道, 选举的代码存在两个状态:
获取租约状态le.acquire(ctx)与续期状态le.renew(ctx)。acquire这个状态会不断的尝试获取租约以成为Leader.
// acquire loops (with jitter) trying to obtain the lease. It returns true
// once the lease is held, or false when ctx is cancelled first.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	// 1. A local cancel lets us stop the JitterUntil loop from inside it.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	// FIX: desc was referenced below but never defined in this excerpt;
	// the upstream code derives it from the lock before the loop.
	desc := le.config.Lock.Describe()
	wait.JitterUntil(func() {
		// 2. One attempt to create or take over the lease.
		succeeded = le.tryAcquireOrRenew(ctx)
		// 3. Fire OnNewLeader if the observed holder changed.
		le.maybeReportTransition()
		// 4. Not leader yet: return and let JitterUntil retry after RetryPeriod.
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		// 5. We hold the lease: cancel the loop so acquire can return true.
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
代码分解如下:
1. 基于传入的context创建可取消的context, 用于停止wait.JitterUntil; 2. 尝试获取或者续期租约; 3. 如果Leader发生了变化, 触发OnNewLeader回调; 4. 获取失败时直接返回, 由于该函数在wait.JitterUntil里执行, 所以退出之后还会不断的重试; 5. 获取成功则通过context的cancel来停止wait.JitterUntil。client-go里面的k8s.io/apimachinery/pkg/util/wait 有各种重试逻辑的帮助函数, 比如这里的wait.JitterUntil。
而尝试获取租约的代码如下:
// tryAcquireOrRenew makes a single attempt to create the lease (if absent),
// take it over (if expired), or renew it (if we already hold it). It returns
// true iff this instance holds the lease afterwards.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now()
	// The record we would like to write: ourselves as holder, renewed now.
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity: le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime: now,
		AcquireTime: now,
	}
	// 1. Read the current lease record from the API server.
	// NOTE(review): oldLeaderElectionRawRecord is unused in this excerpt;
	// upstream compares it against the previously observed raw record to
	// refresh observedTime — confirm against the full source.
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		// 2. Only a missing lease is recoverable: create it and become leader.
		if !errors.IsNotFound(err) {
			return false
		}
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}
	// 3. Someone else holds an unexpired lease: give up this round.
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
	le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
	!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}
	// 4. A renewal keeps the original AcquireTime and transition count; a
	// takeover bumps LeaderTransitions.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}
	// 5. Write our record back; success means we now hold (or kept) the lease.
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	return true
}
// Get fetches the Lease object backing the lock and converts its spec into a
// LeaderElectionRecord plus the record's JSON encoding.
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
	var err error
	// 6. Read the Lease resource; the lock state lives entirely in its spec.
	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
	if err != nil {
		// FIX: without this check a failed Get (e.g. NotFound) would
		// nil-dereference ll.lease below; upstream returns the error here.
		return nil, nil, err
	}
	record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
	recordByte, err := json.Marshal(*record)
	if err != nil {
		return nil, nil, err
	}
	return record, recordByte, nil
}
代码分解如下:
tryAcquireOrRenew这段逻辑会在成为Leader之前不断的被重试。
如果获取到了租约,那么就会成为Leader,成为了Leader也不是一劳永逸,而是要不断的刷新租约,告诉其他选举者自己还在线,你们这些家伙休想取而代之。
// Run (abridged): once the lease has been acquired, the elector spends the
// rest of its life inside renew.
func (le *LeaderElector) Run(ctx context.Context) {
	// lease acquisition and callback wiring omitted here
	le.renew(ctx)
}
// renew refreshes the lease every RetryPeriod until one renewal cycle fails
// to complete within RenewDeadline, or until ctx is cancelled.
func (le *LeaderElector) renew(ctx context.Context) {
	// 1. A local cancel lets a failed renewal stop the Until loop.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		// 2. Each renewal cycle must finish within RenewDeadline.
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		// 3. Retry the renewal every RetryPeriod until it succeeds or the
		// deadline context expires.
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			return le.tryAcquireOrRenew(timeoutCtx), nil
		}, timeoutCtx.Done())
		// 4. Report a holder change, if any.
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		// 5. Renewal failed: record the event and cancel to stop the loop —
		// this instance is no longer leader.
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())
	// 6. Optionally give up the lease on shutdown so a peer can take over
	// immediately instead of waiting for expiry.
	if le.config.ReleaseOnCancel {
		le.release()
	}
}
代码分解如下:
如果不熟悉k8s的部署,可能会有个问题,Leader掉线之后岂不是少了一个参选者? 如果网络环境差的话,岂不是参选者越来越少?那么最后岂不是一个选举者都没了,怎么做到高可用?
为了避免这种情况,应该有两个前提
一是每个实例都交给k8s的控制器(比如Deployment)管理, 进程退出后会被自动重新拉起; 二是在OnStoppedLeading回调里终止程序, 这样从Leader状态掉线后就会终止程序并重新加入选举。通过client-go和k8s环境我们可以很容易写一个高可用的应用,这样的高可用应用可以保证总是有一个实例处理业务,因为只有一个实例处理业务,那么可以避免一些并发冲突的问题。