企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] > [参考](https://chai2010.cn/advanced-go-programming-book/ch6-cloud/ch6-02-lock.html) ## 概述 与单机并发下需要加锁一样,在分布式场景下,我们也需要这种“抢占”的逻辑 ## 基于Redis的setnx 我们可以使用Redis提供的`setnx`命令(只有在 key 不存在时设置 key 的值) ``` package main import ( "fmt" "sync" "time" "github.com/go-redis/redis" ) func incr() { client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // no password set DB: 0, // use default DB }) var lockKey = "counter_lock" var counterKey = "counter" // lock resp := client.SetNX(lockKey, 1, time.Second*5) lockSuccess, err := resp.Result() if err != nil || !lockSuccess { fmt.Println(err, "lock result: ", lockSuccess) return } // counter ++ getResp := client.Get(counterKey) cntValue, err := getResp.Int64() if err == nil || err == redis.Nil { cntValue++ resp := client.Set(counterKey, cntValue, 0) _, err := resp.Result() if err != nil { // log err println("set value error!") } } println("current counter is ", cntValue) delResp := client.Del(lockKey) unlockSuccess, err := delResp.Result() if err == nil && unlockSuccess > 0 { println("unlock success!") } else { println("unlock failed", err) } } func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() incr() }() } wg.Wait() } ``` 看看运行结果: ``` ❯❯❯ go run redis_setnx.go <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false current counter is 2028 unlock success! ``` ## 基于ZooKeeper ``` package main import ( "time" "github.com/samuel/go-zookeeper/zk" ) func main() { c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10) if err != nil { panic(err) } l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll)) err = l.Lock() if err != nil { panic(err) } println("lock succ, do your business logic") time.Sleep(time.Second * 10) // do some thing l.Unlock() println("unlock succ, finish business logic") } ``` - 基于ZooKeeper的锁与基于Redis的锁的不同之处在于Lock成功之前会一直阻塞,这与我们单机场景中的`mutex.Lock`很相似 - 其原理也是基于临时Sequence节点和watch API,例如我们这里使用的是`/lock`节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致。如果一致,说明加锁成功了 - 这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景 ## 基于etcd ``` package main import ( "log" "github.com/zieckey/etcdsync" ) func main() { m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"}) if m == nil || err != nil { log.Printf("etcdsync.New failed") return } err = m.Lock() if err != nil { log.Printf("etcdsync.Lock failed") return } log.Printf("etcdsync.Lock OK") log.Printf("Get the lock. Do something here.") err = m.Unlock() if err != nil { log.Printf("etcdsync.Unlock failed") } else { log.Printf("etcdsync.Unlock OK") } } ``` 1. 先检查`/lock`路径下是否有值,如果有值,说明锁已经被别人抢了 2. 如果没有值,那么写入自己的值。写入成功返回,说明加锁成功。写入时如果节点被其它节点写入过了,那么会导致加锁失败,这时候到 3 3. watch`/lock`下的事件,此时陷入阻塞 4. 当`/lock`路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动unlock),或者过期事件(说明锁过期失效)。如果是的话,那么回到 1,走抢锁流程。 值得一提的是,在**etcdv3**的API中官方已经提供了可以直接使用的**锁API**,读者可以查阅etcd的文档做进一步的学习。