企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] > [github.com](https://learnku.com/articles/37343#c23c9b) ## 安装 `go get github.com/coreos/etcd/clientv3 ` ## clientv3.New( 连接 ``` client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379"}, // Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"} DialTimeout: 5 * time.Second, }) type Client struct { Cluster //向集群里增加 etcd 服务端节点之类,属于管理员操作。 KV //我们主要使用的功能,即 K-V 键值库的操作 Lease //租约相关操作,比如申请一个 TTL=10 秒的租约(应用给 key 可以实现键值的自动过期) Watcher //观察订阅,从而监听最新的数据变化 Auth //管理 etcd 的用户和权限,属于管理员操作 Maintenance //维护 etcd,比如主动迁移 etcd 的 leader 节点,属于管理员操作 Username string Password string } ``` ## clientv3.NewKV 操作kv ``` kv := clientv3.NewKV(cli) ``` ### Put ``` putResp, err := kv.Put(context.TODO(),"/test/key1", "hello") fmt.Printf("PutResponse: %v, err: %v", putResp, err) // PutResponse: &{cluster\_id:14841639068965178418 member\_id:10276657743932975437 revision:3 raft\_term:7 }, err: % ``` ### Get #### 获取一条 ``` getResp, _ := kv.Get(context.TODO(), "/test/key1") fmt.Printf("%+v\n", getResp) fmt.Println(getResp.Header.GetClusterId())//17237436991929493444 fmt.Println(getResp.Header.GetMemberId()) //18249187646912138824 fmt.Println(string(getResp.Kvs[0].Value)) //hello ``` #### 获取目录下的内容 根据路径识别本质就是类似 sql 的like 语句 "foo%" ``` getResp, _ := kv.Get(context.TODO(), "/test/",clientv3.WithPrefix()) fmt.Println(string(getResp.Kvs[0].Value)) //hello fmt.Println(string(getResp.Kvs[1].Value)) //Hello World! ``` #### Lease 设置自动过期key ``` Grant:分配一个租约。 Revoke:释放一个租约。 TimeToLive:获取剩余 TTL 时间。 Leases:列举所有 etcd 中的租约。 KeepAlive:自动定时的续约某个租约。 KeepAliveOnce:为某个租约续约一次。 Close:释放当前客户端建立的所有租约。 ``` ``` // 设置一个 10s 过期的 lease lease := clientv3.NewLease(client) response, e := lease.Grant(context.TODO(), 10) // Put 之前 Lease 已经过期了,返回error kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(response.ID)) getResponse, _ := kv.Get(context.TODO(), "/test/vanish") fmt.Println(string(getResponse.Kvs[0].Value)) // vanish in 10s time.Sleep(11*time.Second) getResponse, _= kv.Get(context.TODO(), "/test/vanish") fmt.Println(string(getResponse.Kvs[0].Value)) // 报错,没有值 ``` ### Op ``` kv.Do(context.TODO(),clientv3.OpPut("/test/key3","Hello World!")) //等于 kv.Put(context.TODO(),"/test/key3", "Hello World!") ``` ### Txn 事务 ``` kv.Txn(context.TODO()).If( clientv3.Compare(clientv3.Value(k1), ">", v1), clientv3.Compare(clientv3.Version(k1), "=", 2) ).Then( clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3) ).Else( clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5) ).Commit() ``` ### Watch ``` kv := clientv3.NewKV(client) kv.Put(context.TODO(), "/test/key1", "hello") watcher := clientv3.NewWatcher(client) watch := watcher.Watch(context.TODO(), "/test/key1") go func() { for{ select { case <- watch: response, _ := kv.Get(context.TODO(), "/test/key1") fmt.Printf("%+v\n", response) default: // fmt.Println("sleep 1s ") time.Sleep(1*time.Second) } } }() go func() { time.Sleep(1*time.Second) kv.Put(context.TODO(),"/test/key1","hello2") }() ``` ### 使用 etcd 简化版 embed 用于单元测试 ``` package main import ( "context" "fmt" "log" "net/http" "time" "go.etcd.io/etcd/v3/embed" ) func main() { cfg := embed.NewConfig() e, err := embed.StartEtcd(cfg) if err != nil { log.Fatal(err) } defer e.Close() select { case <-e.Server.ReadyNotify(): fmt.Println("Server is ready!") case <-time.After(60 * time.Second): e.Server.Stop() // trigger a shutdown fmt.Println("Server took too long to start!") } // Example: put a key, get a key cli, err := e.Client() if err != nil { log.Fatal(err) } key := "exampleKey" value := "exampleValue" ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err = cli.Put(ctx, key, value) cancel() if err != nil { log.Fatal(err) } getResp, err := cli.Get(context.Background(), key) if err != nil { log.Fatal(err) } for _, kv := range getResp.Kvs { fmt.Printf("Key: %s, Value: %s\n", kv.Key, kv.Value) } // Example: HTTP API go func() { if err := http.ListenAndServe(":2379", e.Server); err != nil { log.Fatal(err) } }() // Your application logic goes here... select {} } ```