分布式锁

1 分布式锁基础

1.1、分布式锁

在并发场景中,为了保证临界资源的数据一致性,我们进厂使用到锁这个工具对临界资源进行保护,让混乱的并发访问行为退化为秩序的串行访问行为

在本地环境中,由于多线程之间能够共享进程的数据,因此可以比较简单地实现进程内的互斥锁;然而分布式场景中,有时我们需要跨域多个物理节点执行加锁操作,因此我们就需要依赖到类似于redis、mysql这样的储存组件,在此从基础上实现所谓的“分布式锁”技术。

1.2、核心性质

分布式锁应当具备如下几个核心性质:

  • 独占性:对于同一把锁,同一个时刻只能被一个取锁方占有,这是作为锁工具的最基础的一项心智
  • 健壮性:即不能产生死锁。假如某个占有锁的使用方因宕机而无法主动执行解锁动作,锁也应按能够被正常传递下去,被其他使用方所延续使用
  • 对称性:加锁和解锁的使用方法必须为同一身份。不允许非法释放他人持有的分布式锁
  • 高可用:当提供分布式锁服务的基础组件中存在少量节点发生故障事,不应该影响到分布式锁的稳定性

1.3、实现类型

分布式锁根据其实现模型,可以被分为两大类:

  • 主动轮询型:该模型类似于单机锁中的主动轮询+cas乐观锁,取锁方会持续对分布式锁发出尝试获取动作,如果锁已经被占用则会不断发起重试,知道取锁成功为止
  • watch回调型:在取锁方发现锁已经被他人占用时,会创建watcher监视器订阅锁的释放事件,随后不在发起主动取锁的尝试;当锁被释放后,取锁方能通过之前的watcher感知到这一变化,然后重新发起取锁的尝试动作

1.4、一些个人理解

在单机环境中,主动轮询和watch回调两种锁模型各有优劣,所谓“优”和“劣”也就相对而言,需要对cpu空转以及阻塞协程两种损耗做出权衡

然而,在分布式场景中,我个人觉得又是的天平略微朝着watch回调型的实现策略倾斜。这是因为分布式场景中“轮询”这一动作的成本相比单机锁而言要高很多,背后存在的行为可能是一次甚至多次网络io请求,这种情况下,取锁方局域watch回调的方式,在确保锁被释放、自身有机会取锁的情况下,才会重新发出尝试取锁的请求,这样就能在很大程度上避免无语的轮询消耗

当然,主动轮询的分布式锁能够保证使用方始终占据流程的主动权,整个流程可以更加轻便灵活;此外,watch机制在实现过程中需要简历长连接完成watch监听动作,也会存在一定的资源损耗,因此这个问题没有标准答案,应该结合实际的需求背景采取不同的应对策略:在并发激烈程度较高是倾向于watch回调型分布式锁;反之,主动轮询型分布式锁可能会是更好的选择。

除此之外,基于watch回到模型实现的分布式锁背后可能还存在其他问题,比如:当多个尝试取锁的使用方watch监听同一把锁是,一次锁的释放事件可能会引发“惊群效应”。这个问题以及对应的解决方案将会在本文探讨

2、主动轮询型

2.1、实现思路

主动轮询型分布式锁的实现思路为:

  • 针对同一把分布式锁,使用同一条数据进行标识(以redis为例,则为同一个key对应的kv数据记录)
  • 假如在存储介质成功插入了该条数据(要求之前该key对应的数据不存在),则被认定为加锁成功
  • 把从存储介质中删除该条数据这一行为理解为释放锁操作
  • 倘若在插入该数据时,已经发现数据已经存在(锁已经被他人持有),则持续轮询,知道锁被他人删除(他人释放锁),并有自身完成数据插入为止(取锁成功)
  • 由于是并发场景,需保证【(1)检查数据是否已插入(2)数据不存在则插入数据】这两个步骤之间是原子化的不可拆分的(在redis中是set only if no exist -- SETNX操作)

2.2 技术选型

实现主动轮询实现分布式锁时,我们常用的组件包括redis和mysql

redis

在实现主动轮询分布式锁时,redis算得上是大家最常用的组件,在第三章中,本文会以redis为例,进行主动轮询分布式锁的实践介绍

redis官方文档

redis基于内存实现数据的存储,因此足够高效轻便。此外,redis基于单线程模型完成数据处理工作,支持SETNX原子操作指令,能够很方便支持分布式锁的加锁操作

setnx使用文档,(事实上redis 2.6.12版本之后,setnx操作已经被弃置,官方推荐大家使用set指令并附加上nx参数来实现setnx指令相同的效果)

此外,redis还支持使用lua脚本指定组装同一个redis节点下的多笔操作形成一个具备原子性的事务

redis lua脚本使用文档
在通过redis实现分布式锁时,我们可以将key对应的value设置为使用方的身份表示。在解锁流程中,通过lua脚本组装步骤【(1)检查释放锁动作执行着的身份;(2)身份合法时才进行解锁】。如此一来,分布式锁的对称性也就得以保证了。

mysql

mysql官方文档
通过经典的关系型数据库mysql也能实现和redis类似的效果

  • 建立一张用于储存分布式锁记录的数据表
  • 以分布式锁的标识键作为表的唯一键(类比于redis的key)
  • 基于唯一键的特性,把同以把锁只能被插入一条数据,因此也就只能由一个使用方持有锁
  • 当锁被占有是,其他取锁方尝试插入数据时,会被mysql的唯一键所拦截,进而感知到锁已被占用这一情报。
  • 在表中可以新增一个字段标识使用方的身份,完整的解锁动作可以基于mysql事务(使用innodb引擎)保证原子性:【(1)检查释放锁动作的执行者身份;(2)身份合法才会进行解锁】。基于此,分布式锁的对称性能够得到保证。

2.3、死锁的问题

下一个问题是,我们在设计主动轮询型分布式锁时,如果避免出现死锁的问题而导致分布式锁不能用呢?

这项能力在mysql中显得捉襟见肘,不过在使用redis时,我们可以通过过期时间expire time机制得以保证,我们通常会在插入分布式锁对应的kv数据时设置一个过期时间expire time,这样即使使用方因异常导致无法正常解锁,锁对应的数据项也会在达到过时时间阈值后被自动删除,实现分布式锁的效果。

值得一提的是,这种过期机制也带来的新得问题,因为锁的持有者并不能精确的预判到自己持有锁后处理业务逻辑的实际耗时,因此此处设置的过期时间只能是一个偏向保守的值,假如因为一些异常情况导致占有锁的使用方在业务流程中耗时超过了设置的过期时间的阈值,就会导致锁被提前释放,其他取锁方可能取锁成功,最终导致数据不一致的并发问题。

针对这个问题,在分布式锁工具redisson中给出了解决方案--看门狗策略(watch dog strategy):在所得持有方为完成业务逻辑的处理事,会持续对分布式锁的过期阈值进行延期操作。

2.4、弱一致性问题

回顾 redis 的设计思路,为避免单点故障问题,redis 会基于主从复制的方式实现数据备份. (以哨兵机制为例,哨兵会持续监听 master 节点的健康状况,倘若 master 节点发生故障,哨兵会负责扶持 slave 节点上位,以保证整个集群能够正常对外提供服务). 此外,在 CAP 体系中,redis 走的是 AP 路线,为保证服务的吞吐性能,主从节点之间的数据同步是异步延迟进行的.

到这里问题就来了,试想一种场景:倘若 使用方 A 在 redis master 节点加锁成功,但是对应的 kv 记录在同步到 slave 之前,master 节点就宕机了. 此时未同步到这项数据的 slave 节点升为 master,这样分布式锁被 A 持有的“凭证” 就这样凭空消失了. 于是不知情的使用方 B C D 都可能加锁成功,于是就出现了一把锁被多方同时持有的问题,导致分布式锁最基本的独占性遭到破坏.

关于这个问题,一个比较经典的解决方案是:redis 红锁(redlock,全称 redis distribution lock),本文仅仅抛出一个引子,具体内容我们后续单开一篇再聊.

3 redis分布式锁

3.1、sdk介绍

首先,本文使用到基于golang编写的redis客户端sdk:redigo,用于和redis组件进行交互。

redigo开源地址

本文使用到的redigo源码版本为v1.8.9

3.2、源码介绍

redis客户端

  • 在redigo的基础上,封装实现了redis客户端client,内置了一个连接池 reids.pool进行redis的连接复用
  • 客户端client对外暴露了setNEX方法,语义是set with expire time only if key not exist,用于支持分布式锁加锁操作
  • 客户端client对外暴露了eval方法,用以执行lua脚本,后续用来支持分布式解锁操作
package redis_lock
import (
    "context"
    "errors"
    "time"


    "github.com/gomodule/redigo/redis"
)

// Client Redis 客户端.
type Client struct {
    ClientOptions
    pool *redis.Pool
}

func NewClient(network, address, password string, opts ...ClientOption) *Client {
    c := Client{
        ClientOptions: ClientOptions{
            network:  network,
            address:  address,
            password: password,
        },
    }


    for _, opt := range opts {
        opt(&c.ClientOptions)
    }


    repairClient(&c.ClientOptions)


    pool := c.getRedisPool()
    return &Client{
        pool: pool,
    }
}

func (c *Client) getRedisPool() *redis.Pool {
    return &redis.Pool{
        MaxIdle:     c.maxIdle,
        IdleTimeout: time.Duration(c.idleTimeoutSeconds) * time.Second,
        Dial: func() (redis.Conn, error) {
            c, err := c.getRedisConn()
            if err != nil {
                return nil, err
            }
            return c, nil
        },
        MaxActive: c.maxActive,
        Wait:      c.wait,
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err
        },
    }
}

func (c *Client) getRedisConn() (redis.Conn, error) {
    if c.address == "" {
        panic("Cannot get redis address from config")
    }


    var dialOpts []redis.DialOption
    if len(c.password) > 0 {
        dialOpts = append(dialOpts, redis.DialPassword(c.password))
    }
    conn, err := redis.DialContext(context.Background(),
        c.network, c.address, dialOpts...)
    if err != nil {
        return nil, err
    }
    return conn, nil
}

func (c *Client) GetConn(ctx context.Context) (redis.Conn, error) {
    return c.pool.GetContext(ctx)
}

// 只有 key 不存在时,能够 set 成功. set 时携带上超时时间,单位秒.
func (c *Client) SetNEX(ctx context.Context, key, value string, expireSeconds int64) (int64, error) {
    if key == "" || value == "" {
        return -1, errors.New("redis SET keyNX or value can't be empty")
    }


    conn, err := c.pool.GetContext(ctx)
    if err != nil {
        return -1, err
    }
    defer conn.Close()


    reply, err := conn.Do("SET", key, value, "EX", expireSeconds, "NX")
    if err != nil {
        return -1, nil
    }


    r, _ := reply.(int64)
    return r, nil
}


// Eval 支持使用 lua 脚本.
func (c *Client) Eval(ctx context.Context, src string, keyCount int, keysAndArgs []interface{}) (interface{}, error) {
    args := make([]interface{}, 2+len(keysAndArgs))
    args[0] = src
    args[1] = keyCount
    copy(args[2:], keysAndArgs)


    conn, err := c.pool.GetContext(ctx)
    if err != nil {
        return -1, err
    }
    defer conn.Close()


    return conn.Do("EVAL", args...)
}

redis分布式锁

  • 定义了redis分布式锁的类型:RedisLock
  • 锁RedisLock中需要内置一个redis客户端Client,用于后续请求交互
  • 锁实例被创建使,需要显示指定锁的标识键key
  • 锁被创建时,会取创建者的进程id+协程id,拼接生成token,作为使用方的身份标识
  • 用户可以使用option配置项,声明创建的锁是否是阻塞模式,锁对应的过期时间阈值已经等锁超时阈值等配置
package redis_lock

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/xiaoxuxiansheng/redis_lock/utils"
)

var ErrLockAcquiredByOthers = errors.New("lock is acquired by others")

func IsRetryableErr(err error) bool {
    return errors.Is(err, ErrLockAcquiredByOthers)
}

// 基于 redis 实现的分布式锁,不可重入,但保证了对称性
type RedisLock struct {
    LockOptions
    key    string
    token  string
    client *Client
}

func NewRedisLock(key string, client *Client, opts ...LockOption) *RedisLock {
    r := RedisLock{
        key:    key,
        token:  utils.GetProcessAndGoroutineIDStr(),
        client: client,
    }

    for _, opt := range opts {
        opt(&r.LockOptions)
    }

    repairLock(&r.LockOptions)
    return &r
}
package utils

import (
    "fmt"
    "os"
    "runtime"
    "strconv"
    "strings"
)

func GetCurrentProcessID() string {
    return strconv.Itoa(os.Getpid())
}

// GetCurrentGoroutineID 获取当前的协程ID
func GetCurrentGoroutineID() string {
    buf := make([]byte, 128)
    buf = buf[:runtime.Stack(buf, false)]
    stackInfo := string(buf)
    return strings.TrimSpace(strings.Split(strings.Split(stackInfo, "[running]")[0], "goroutine")[1])
}

func GetProcessAndGoroutineIDStr() string {
    return fmt.Sprintf("%s_%s", GetCurrentProcessID(), GetCurrentGoroutineID())
}
package redis_lock

const (
    // 默认连接池超过 10 s 释放连接
    DefaultIdleTimeoutSeconds = 10
    // 默认最大激活连接数
    DefaultMaxActive = 100
    // 默认最大空闲连接数
    DefaultMaxIdle = 20
)

type ClientOptions struct {
    maxIdle            int
    idleTimeoutSeconds int
    maxActive          int
    wait               bool
    // 必填参数
    network  string
    address  string
    password string
}

type ClientOption func(c *ClientOptions)

func WithMaxIdle(maxIdle int) ClientOption {
    return func(c *ClientOptions) {
        c.maxIdle = maxIdle
    }
}

func WithIdleTimeoutSeconds(idleTimeoutSeconds int) ClientOption {
    return func(c *ClientOptions) {
        c.idleTimeoutSeconds = idleTimeoutSeconds
    }
}

func WithMaxActive(maxActive int) ClientOption {
    return func(c *ClientOptions) {
        c.maxActive = maxActive
    }
}

func WithWaitMode() ClientOption {
    return func(c *ClientOptions) {
        c.wait = true
    }
}

func repairClient(c *ClientOptions) {
    if c.maxIdle < 0 {
        c.maxIdle = DefaultMaxIdle
    }

    if c.idleTimeoutSeconds < 0 {
        c.idleTimeoutSeconds = DefaultIdleTimeoutSeconds
    }

    if c.maxActive < 0 {
        c.maxActive = DefaultMaxActive
    }
}

type LockOption func(*LockOptions)

func WithBlock() LockOption {
    return func(o *LockOptions) {
        o.isBlock = true
    }
}

func WithBlockWaitingSeconds(waitingSeconds int64) LockOption {
    return func(o *LockOptions) {
        o.blockWaitingSeconds = waitingSeconds
    }
}

func WithExpireSeconds(expireSeeconds int64) LockOption {
    return func(o *LockOptions) {
        o.expireSeconds = expireSeeconds
    }
}

func repairLock(o *LockOptions) {
    if o.isBlock && o.blockWaitingSeconds <= 0 {
        // 默认阻塞等待时间上限为 5 秒
        o.blockWaitingSeconds = 5
    }

    // 分布式锁默认超时时间为 30 秒
    if o.expireSeconds <= 0 {
        o.expireSeconds = 30
    }
}

type LockOptions struct {
    isBlock             bool
    blockWaitingSeconds int64
    expireSeconds       int64
}

非阻塞模式加锁

  • 倘若锁处于非阻塞模式,则只会执行一次trylock方法进行尝试加锁操作,倘若失败,就会直接返回错误
  • trylock 操作基于redis的setNEX操作实现,即基于原子操作实现set with expire time only if key not exist 的语言
const RedisLockKeyPrefix = "REDIS_LOCK_PREFIX_"

// Lock 加锁.
func (r *RedisLock) Lock(ctx context.Context) error {
    // 不管是不是阻塞模式,都要先获取一次锁
    err := r.tryLock(ctx)
    if err == nil {
        return nil
    }

    // 非阻塞模式加锁失败直接返回错误
    if !r.isBlock {
        return err
    }

    // 判断错误是否可以允许重试,不可允许的类型则直接返回错误
    if !IsRetryableErr(err) {
        return err
    }

    // 基于阻塞模式持续轮询取锁
    return r.blockingLock(ctx)
}

func (r *RedisLock) tryLock(ctx context.Context) error {
    // 首先查询锁是否属于自己
    reply, err := r.client.SetNEX(ctx, r.getLockKey(), r.token, r.expireSeconds)
    if err != nil {
        return err
    }
    if reply != 1 {
        return fmt.Errorf("reply: %d, err: %w", reply, ErrLockAcquiredByOthers)
    }
    return nil
}

func (r *RedisLock) getLockKey() string {
    return RedisLockKeyPrefix + r.key
}

阻塞模式加锁

  • 当锁处在阻塞模式下,会通过ticker,每隔50ms执行一次尝试取锁的请求(tryLock:setNEX)
  • 倘若某次请求取锁成功,则直接返回
  • 倘若达到等锁超时的阈值或者中途发生预期之外的错误,则会终止流程
func (r *RedisLock) blockingLock(ctx context.Context) error {
    // 阻塞模式等锁时间上限
    timeoutCh := time.After(time.Duration(r.blockWaitingSeconds) * time.Second)
    // 轮询 ticker,每隔 50 ms 尝试取锁一次
    ticker := time.NewTicker(time.Duration(50) * time.Millisecond)
    defer ticker.Stop()

    for range ticker.C {
        select {
        // ctx 终止了
        case <-ctx.Done():
            return fmt.Errorf("lock failed, ctx timeout, err: %w", ctx.Err())
            // 阻塞等锁达到上限时间
        case <-timeoutCh:
            return fmt.Errorf("block waiting time out, err: %w", ErrLockAcquiredByOthers)
        // 放行
        default:
        }

        // 尝试取锁
        err := r.tryLock(ctx)
        if err == nil {
            // 加锁成功,返回结果
            return nil
        }

        // 不可重试类型的错误,直接返回
        if !IsRetryableErr(err) {
            return err
        }
    }

    return nil
} 

解锁

  • 解锁动作基于lua脚本执行
  • lua脚本执行内容分为两部分:【(1)校验当前操作这是否拥有锁的所有权(2)倘若是,则释放锁】
// Unlock 解锁. 基于 lua 脚本实现操作原子性.
func (r *RedisLock) Unlock(ctx context.Context) error {
    keysAndArgs := []interface{}{r.getLockKey(), r.token}
    reply, err := r.client.Eval(ctx, LuaCheckAndDeleteDistributionLock, 1, keysAndArgs)
    if err != nil {
        return err
    }

    if ret, _ := reply.(int64); ret != 1 {
        return errors.New("can not unlock without ownership of lock)
    }
    return nil
}   
// LuaCheckAndDeleteDistributionLock 判断是否拥有分布式锁的归属权,是则删除
const LuaCheckAndDeleteDistributionLock = `
  local lockerKey = KEYS[1]
  local targetToken = ARGV[1]
  local getToken = redis.call('get',lockerKey)
  if (not getToken or getToken ~= targetToken) then
    return 0
  else
    return redis.call('del',lockerKey)
  end
`

4 watch回调型

4.1、实现思路

对于实现 watch 回调型分布式锁,一些基本要点和 2.1 小节中聊到的主动轮询型分布式锁类似:

  • 针对于同一把分布式锁,使用一条相同的数据进行标识(唯一、明确的 key)
  • 倘若在存储介质内成功插入该条数据(要求 key 对应的数据不存在),则这一行为被认定为加锁成功
  • 把从存储介质中删除该条数据这行为理解为解锁操作

与主动轮询型分布式锁不同的是,在取锁失败时,watch 回调型分布式锁不会持续轮询,而是会 watch 监听锁的删除事件:

  • 倘若在插入数据时,发现该条记录已经存在,说明锁已被他人持有,此时选择监听这条数据记录的删除事件,当对应事件发生时说明锁被释放了,此时才继续尝试取锁

4.2 技术选型

在实现上,我们需要依赖于提供了 watch 机制的状态存储组件,不仅能支持数据的存储和去重,还需要利用到其中的 watch 监听回调功能进行锁释放事件的订阅感知.

为满足上述诉求,我们常用的技术组件包括 etcd 和 zookeeper.

(1)etcd

etcd 官方文档

etcd 是一款适合用于共享配置和服务发现的分布式 kv 存储组件,底层基于分布式共识算法 raft 协议保证了存储服务的强一致和高可用.

在 etcd 中提供了watch 监听器的功能,即针对于指定范围的数据,通过与 etcd 服务端节点创建 grpc 长连接的方式持续监听变更事件. 关于 watch 机制的详细介绍,可以参见我上一周发表的两篇文章—— etcd watch 机制源码解析——客户端篇/服务端篇.

此外,etcd 中写入数据时,还支持通过版本 revision 机制进行取锁秩序的统筹协调,是一款很适合用于实现分布式锁的组件.

etcd 是本文在介绍 watch 回调型分布式锁时选取的工程实践案例,在本文第 5 章会结合实现源码展开介绍.

(2)zookeeper

zookeeper 官方文档

ZooKeeper是一款开源的分布式应用协调服务,底层基于分布式共识算法 zab 协议保证了数据的强一致性和高可用性.

zookeeper 中提供了临时顺序节点(EPHEMERAL_SEQUENTIAL)类型以及 watch 监听器机制,能够满足实现 watch 回调型分布式锁所需要具备的一切核心能力.

不过在本文中,zk 部分我们不多作展开,介绍内容以 etcd 为核心.

4.3 死锁问题

为避免死锁问题的产生,etcd 中提供了租约 lease 机制. 租约,顾名思义,是一份具有时效性的协议,一旦达到租约上规定的截止时间,租约就会失去效力. 同时,etcd 中还提供了续约机制(keepAlive),用户可以通过续约操作来延迟租约的过期时间.

那么,我们如何来利用租约 lease 机制解决分布式锁中可能存在的死锁问题呢?实现思路如下:

  • 用户可以先申请一份租约,设定好租约的截止时间
  • 异步启动一个续约协程,负责在业务逻辑处理完成前,按照一定的时间节奏持续进行续约操作
  • 在执行取锁动作,将对应于锁的 kv 数据和租约进行关联绑定,使得锁数据和租约拥有相同的过期时间属性

在这样的设定之下,倘若分布式锁的持有者出现异常状况导致无法正常解锁,则可以通过租约的过期机制完成对分布式锁的释放,死锁问题因此得以规避. 此外,锁的使用方可以将租约的初始过期时间设定为一个偏小的值,并通过续约机制来对租约的生效周期进行动态延长. 可以看到,此处 etcd 中的租约及续约机制,实现了与 redisson 中 watch dog 机制类似的效果.

4.4 惊群效应

惊群效应又称为羊群效应:羊群是一种纪律性很差的组织,平时就处在一种散漫无秩序地移动模式之下. 需要注意的是,在羊群中一旦有某只羊出现异动,其他的羊也会不假思索地一哄而上跑动起来,全然不估计附近可能有狼或者何处有更好的草源等客观问题.

在 watch 回调型分布式锁的实现过程中,可能也会存在类似于惊群效应的问题. 这里指的是:倘若一把分布式锁的竞争比较激烈,那么锁的释放事件可能同时被多个的取锁方所监听,一旦锁真的被释放了,所有的取锁方都会一拥而上尝试取锁,然而我们知道,一个轮次中真正能够取锁成功的只会有一名角色,因此这个过程中会存在大量无意义的性能损耗,且释放锁时刻瞬间激增的请求流量也可能会对系统稳定性产生负面效应.

为规避惊群效应,etcd 中提供了前缀 prefix 机制以及版本 revision 机制,和 zookeeper 的临时顺序节点功能有些类似:

  • 对于同一把分布式锁,锁记录数据的 key 拥有共同的前缀 prefix,作为锁的标识
  • 每个取锁方取锁时,会以锁前缀 prefix 拼接上自身的身份标识(租约 id),生成完整的 lock key. 因此各取锁方完整的 lock key 都是互不相同的(只是有着相同的前缀),理论上所有取锁方都能成功把锁记录数据插入到 etcd 中
  • 每个取锁方插入锁记录数据时,会获得自身 lock key 处在锁前缀 prefix 范围下唯一且递增的版本号 revision
  • 取锁方插入加锁记录数据不意味着加锁成功,而是需要在插入数据后查询一次锁前缀 prefix 下的记录列表,判定自身 lock key 对应的 revision 是不是其中最小的,如果是的话,才表示加锁成功
  • 如果锁被他人占用,取锁方会 watch 监听 revision 小于自己但最接近自己的那个 lock key 的删除事件.
  • 这样所有的取锁方就会在 revision 机制的协调下,根据取锁序号(revision)的先后顺序排成一条队列,每当锁被释放,只会惊动到下一顺位的取锁方,惊群问题得以避免.

5 etcd 分布式锁

5.1 sdk 介绍

etcd 开源地址

本文使用到的 etcd 源码版本为 v3.5.8.

etcd 作者在 etcd 的 concurrency 包下,基于 watch 机制结合 revision 机制实现了一款通用的 etcd 分布式锁,因此这部分代码我不再手写,而是会基于官方的实现示范进行源码讲解.

5.2 实现源码

(1)数据结构

Session

session 指的是一次访问会话,背后对应的是一笔租约 lease. 用户调用 NewSession 方法构造 session 实例时,执行的步骤包括:

  • 通过 client.Grant 方法申请到一个 lease id
  • 调用 client.KeepAlive 方法持续对租约进行续期
  • 构造一个会话 session 实例
  • 异步开启一个守护协程,进行租约续期响应参数的处理(keepAlive)
const defaultSessionTTL = 60

// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
type Session struct {
    client *v3.Client
    opts   *sessionOptions
    id     v3.LeaseID

    cancel context.CancelFunc
    donec  <-chan struct{}
}

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
    lg := client.GetLogger()
    ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
    for _, opt := range opts {
        opt(ops, lg)
    }

    id := ops.leaseID
    if id == v3.NoLease {
        resp, err := client.Grant(ops.ctx, int64(ops.ttl))
        if err != nil {
            return nil, err
        }
        id = resp.ID
    }

    ctx, cancel := context.WithCancel(ops.ctx)
    keepAlive, err := client.KeepAlive(ctx, id)
    if err != nil || keepAlive == nil {
        cancel()
        return nil, err
    }

    donec := make(chan struct{})
    s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}

    // keep the lease alive until client error or cancelled context
    go func() {
        defer close(donec)
        for range keepAlive {
            // eat messages until keep alive channel closes
        }
    }()

    return s,nil
}

假如用户处理完成业务逻辑之后,可以通过 session.Close 方法完成会话的关闭,在方法中会通过 context 的 cancel 动作,停止对租约的续期行为.

// Close orphans the session and revokes the session lease.
func (s *Session) Close() error {
    s.Orphan()
    // if revoke takes longer than the ttl, lease is expired anyway
    ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
    _, err := s.client.Revoke(ctx, s.id)
    cancel()
    return err
}

// Orphan ends the refresh for the session lease. This is useful
// in case the state of the client connection is indeterminate (revoke
// would fail) or when transferring lease ownership.
func (s *Session) Orphan() {
    s.cancel()
    <-s.donec
}
Mutex

Mutex 是 etcd 分布式锁的类型,其中核心字段包括:

  • s:内置了一个会话 session
  • pfx:分布式锁的公共前缀
  • myKey:当前锁使用方完整的 lock key,由 pfx 和 lease id 两部分拼接而成
  • myRev:当前锁使用方 lock key 在公共锁前缀 pfx 下对应的版本 revision
// Mutex implements the sync Locker interface with etcd
type Mutex struct {
    s *Session
    
    pfx   string
    myKey string
    myRev int64
    hdr   *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
    return &Mutex{s, pfx + "/", "", -1, nil}
}

(2)方法链路

TryLock

Mutex.TryLock 方法会执行一次尝试加锁的动作,倘若锁已经被其他人占有,则会直接返回错误,不会阻塞:

  • 调用 Mutex.tryAcquire 方法插入 my key(已存在则查询),获取到 my key 对应的 revision 以及当前锁的实际持有者
  • 倘若锁 pfx 从未被占用过,或者锁 pfx 下存在的 revision 中,自身的 revision 是其中最小的一个,则说明自己加锁成功
  • 倘若锁已经被其他人占用,则删除自己加锁时创建的 kv 对记录,然后返回锁已被他人占用的错误
// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {
    resp, err := m.tryAcquire(ctx)
    if err != nil {
        return err
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    client := m.s.Client()
    // Cannot lock, so delete the key
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return ErrLocked
}
lock

Mutex.Lock 方法采用的是阻塞加锁的处理模式,倘若分布式锁已经被其他人占用,则会持续阻塞等待时机,直到自己取锁成功:

  • 调用 Mutex.tryAcquire 方法插入 my key(已存在则查询),获取到 my key 对应的 revision 以及当前锁的实际持有者
  • 倘若锁 pfx 从未被占用过,或者锁 pfx 下存在的 revision 中,自身的 revision 是其中最小的一个,则说明自己加锁成功
  • 倘若锁已被他人占用,调用 waitDeletes 方法,watch 监听 revision 小于自己且最接近于自己的锁记录数据的删除事件
  • 当接收到解锁事件后,会再检查一下自身的租约有没有过期,如果没有,则说明加锁成功
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    resp, err := m.tryAcquire(ctx)
    if err != nil {
        return err
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    client := m.s.Client()
    // wait for deletion revisions prior to myKey
    // TODO: early termination if the session key is deleted before other session keys with smaller revisions.
    _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
        return werr
    }


    // make sure the session is not expired, and the owner key still exists.
    gresp, werr := client.Get(ctx, m.myKey)
    if werr != nil {
        m.Unlock(client.Ctx())
        return werr
    }


    if len(gresp.Kvs) == 0 { // is the session key lost?
        return ErrSessionExpired
    }
    m.hdr = gresp.Header


    return nil
}
tryAcquire

Mutex.tryAcquire 方法,使用方会完成锁数据的插入以及 revision 的获取:

  • 基于 etcd 的事务操作,判定假如当前 my key 还没创建过锁的 kv 记录,则创建 kv 记录并执行 getOwner 操作获取当前锁的持有者;倘若已经创建过,则查询对应的 kv 记录,并调用 getOwner 获取当前锁的持有者
  • 返回 my key 对应的 revision 和当前锁的 owner(锁 pfx 中最小 revision 的归属方),供上层的 Lock 或者 TryLock 方法使用
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
    s := m.s
    client := m.s.Client()


    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return nil, err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    return resp, nil
}
waitDeletes
  • 基于一个 for 循环实现自旋
  • 每轮处理中,会获取 revision 小于自己且最接近于自己的取锁方的 key
  • 倘若 key 不存在,则说明自己的 revision 已经是最小的,直接取锁成功
  • 倘若 key 存在,则调用 waitDelete 方法阻塞监听这个 key 的删除事件
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision are deleted.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()


    var wr v3.WatchResponse
    wch := client.Watch(cctx, key, v3.WithRev(rev))
    for wr = range wch {
        for _, ev := range wr.Events {
            if ev.Type == mvccpb.DELETE {
                return nil
            }
        }
    }
    if err := wr.Err(); err != nil {
        return err
    }
    if err := ctx.Err(); err != nil {
        return err
    }
    return errors.New("lost watcher waiting for delete")
}
unlock

解锁时直接删除自己的 kv 对记录即可,假如自己是持有锁的角色,那么删除 kv 对记录就是真正意义上的解锁动作;即便自己并无持有锁,删除 kv 对就代表自己退出了抢锁流程,也不会对流程产生负面影响.

这里大家可能会存在一个疑问,就是假如执行 unlock 操作的角色本身只是处在等锁队列中,并未真正持有锁,那么执行删除 kv 对记录时是否会误将队列中的下一个取锁方误唤醒,引起秩序混乱?

答案是不会的,大家可以回过头观察 waitDeletes 方法的实现逻辑,取锁方在从 waitDelete 方法中接收到前一笔 kv 记录的删除事件而被唤醒后,它会接着查询一轮比它小且最接近的 revision 对应的 kv 对记录,如果存在则继续进行监听,直到这样的 kv 数据不存在时才会取锁成功(my revision 已经是锁 pfx 下最小的 revision).

func (m *Mutex) Unlock(ctx context.Context) error {
    if m.myKey == "" || m.myRev <= 0 || m.myKey == "\x00" {
        return ErrLockReleased
    }


    if !strings.HasPrefix(m.myKey, m.pfx) {
        return fmt.Errorf("invalid key %q, it should have prefix %q", m.myKey, m.pfx)
    }


    client := m.s.Client()
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return nil
}

6 总结

本篇和大家一起探讨了如何基于 Golang 实现主动轮询和 watch 回调两种模式的分布式锁. 本文分别以redis 和 etcd 两个组件为例进行了分布式锁的原理介绍及源码展示.

redis 可以算是我们最常用于实现分布式锁的组件,但是由于其中缺少续约机制以及存在数据弱一致性的问题,导致分布式锁的独占性并不能够得到保证. 后续我会单独开一个篇章,和大家一起聊聊如何通过 watch dog 和 redlock 机制解决 redis 分布式锁可能存在的安全隐患.

Last modification:April 12, 2024
如果觉得我的文章对你有用,请收藏本站