[TOC]
# go操作etcd
~~~
go get -u go.etcd.io/etcd
~~~
在import的时候 应该import “go.etcd.io/etcd/clientv3” 而不是 "github.com/coreos/etcd/clientv3"
## 连接
~~~
import (
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client
err error
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
defer client.Close()
fmt.Println("连接成功")
}
~~~
## kv设置
~~~
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client //客户端连接对象
err error
kv clientv3.KV //操作kv的对象
putResp *clientv3.PutResponse //设置kv的对象
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//关闭连接
defer client.Close()
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//这里面有上下文,可以用来取消他
//第三个参数可选,clientv3.WithPrevKV()表示可以查到以前的kv
if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "hello", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
} else {
//每次操作有个唯一的Revision,单调递增
fmt.Println("Revision: ", putResp.Header.Revision)
//打印之前的值
if putResp.PrevKv != nil {
fmt.Println("之前的值: ", string(putResp.PrevKv.Key), "---", string(putResp.PrevKv.Value))
}
}
}
~~~
## kv读取
~~~
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client //客户端连接对象
err error
kv clientv3.KV //操作kv的对象
getResp *clientv3.GetResponse //获取kv的对象
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//关闭连接
defer client.Close()
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//第三个参数也是可选的 clientv3.
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1"); err != nil {
fmt.Println(err)
} else {
//打印出来的create_revision是创建版本,mod_revision是修改版本, version是修改的次数
fmt.Println(getResp.Kvs)
}
}
~~~
## 以什么为前缀查找
~~~
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client //客户端连接对象
err error
kv clientv3.KV //操作kv的对象
getResp *clientv3.GetResponse //获取kv的对象
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//关闭连接
defer client.Close()
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//第三个参数也是可选的 clientv3.
//以什么为前缀的
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
fmt.Println(err)
} else {
//数组
//打印出来的create_revision是创建版本,mod_revision是修改版本, version是修改的次数
fmt.Println(getResp.Kvs)
for k, v := range getResp.Kvs {
fmt.Println(k)
fmt.Println(v)
}
}
}
~~~
## 删除key
~~~
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client //客户端连接对象
err error
kv clientv3.KV //操作kv的对象
deleteResp *clientv3.DeleteResponse //删除的对象
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//关闭连接
defer client.Close()
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//删除
//第三个参数可选,clientv3.WithPrevKV()表示会赋值deleteResp.PrevKvs
if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
//被删除之前的k和v,上面第三个参数要设置,否则是没有的
if len(deleteResp.PrevKvs) != 0 {
for _, v := range deleteResp.PrevKvs {
fmt.Println(string(v.Key))
fmt.Println(string(v.Value))
}
}
}
~~~
## 删除多个key,以什么为前缀
~~~
if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
fmt.Println(err)
return
}
~~~
## 删除连续的2个key
~~~
if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey(), clientv3.WithLimit(2)); err != nil {
fmt.Println(err)
return
}
~~~
## 租约
~~~
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client //客户端连接对象
err error
lease clientv3.Lease //租约对象
leaseGrantResp *clientv3.LeaseGrantResponse //申请到的租约
leaseId clientv3.LeaseID //租约id
putResp *clientv3.PutResponse //PUT对象
getResp *clientv3.GetResponse //get对象
kv clientv3.KV //kv操作对象
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//关闭连接
defer client.Close()
//申请一个租约(lease)
lease = clientv3.NewLease(client)
//申请一个3秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 3); err != nil {
fmt.Println(err)
return
}
//拿到租约的id
leaseId = leaseGrantResp.ID
fmt.Println("租约id: ", leaseId)
//获得kv对象
kv = clientv3.NewKV(client)
//put一个kv,让他与租约关联起来,从而实现10秒后自动过期
//第三个参数是具体的值
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "hello", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}
fmt.Println("写入成功: ", putResp.Header.Revision)
//time.Sleep(5 * time.Second)
//获取下kv
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv过期了")
return
}
fmt.Println("读取成功: ", getResp.Kvs)
}
~~~
## 续租
~~~
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client //客户端连接对象
err error
lease clientv3.Lease //租约对象
leaseGrantResp *clientv3.LeaseGrantResponse //申请到的租约
leaseId clientv3.LeaseID //租约id
putResp *clientv3.PutResponse //PUT对象
getResp *clientv3.GetResponse //get对象
kv clientv3.KV //kv操作对象
keepResp *clientv3.LeaseKeepAliveResponse //续租channel中的对象
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse //续租的channel
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//关闭连接
defer client.Close()
//申请一个租约(lease)
lease = clientv3.NewLease(client)
//申请一个10秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 3); err != nil {
fmt.Println(err)
return
}
//拿到租约的id
leaseId = leaseGrantResp.ID
fmt.Println("租约id: ", leaseId)
//自动续租 KeepAliveOnce只续租一次 KeepAlive是一直续租里面有个协程维护着
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
fmt.Println(err)
return
}
//启动协程,消费这个租约
go func() {
for {
select {
//每秒续租一次
case keepResp = <-keepRespChan:
//如果维护租约中发生异常,网络重新连接后发现租约过期的话,或者我主动把context取消
if keepResp == nil {
fmt.Println("租约已经失效了")
//退出循环
goto END
} else {
//续租一切正常,打印租约id
fmt.Println("收到自动续租应答: ", keepResp.ID)
}
}
}
END:
}()
//获得kv对象
kv = clientv3.NewKV(client)
//put一个kv,让他与租约关联起来,从而实现10秒后自动过期
//第三个参数是具体的值
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "hello", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}
fmt.Println("写入成功: ", putResp.Header.Revision)
//time.Sleep(5 * time.Second)
//获取下kv
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv过期了")
return
}
fmt.Println("读取成功: ", getResp.Kvs)
for {
;
}
}
~~~
## op封装get和put
~~~
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client //客户端连接对象
err error
kv clientv3.KV //kv操作对象
putOp clientv3.Op //op对象,赋值的
getOp clientv3.Op //op对象, 获取值的
opResp clientv3.OpResponse //op执行的返回结果
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//关闭连接
defer client.Close()
//获得kv对象
kv = clientv3.NewKV(client)
//Op:opeartion 代表一个操作,具体操作封装在里面
putOp = clientv3.OpPut("/cron/jobs/job8", "111")
//执行op
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
fmt.Println(err)
return
}
//把opeartion变为put的 opResp.Put()
fmt.Println("写入Revision: ", opResp.Put().Header.Revision)
//get的
getOp = clientv3.OpGet("/cron/jobs/job8")
//执行op
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
fmt.Println(err)
return
}
fmt.Println("读取数据: ", opResp.Get().Kvs)
}
~~~
# 分布式锁
~~~
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服务器配置
client *clientv3.Client //客户端连接对象
err error
kv clientv3.KV //kv操作对象
lease clientv3.Lease //租约对象
leaseGrantResp *clientv3.LeaseGrantResponse //申请到的租约
leaseId clientv3.LeaseID //租约id
keepResp *clientv3.LeaseKeepAliveResponse //续租channel中的对象
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse //续租的channel
ctx context.Context //创建一个用于取消租约的context
cancelFunc context.CancelFunc //取消上下文
txn clientv3.Txn //事务
txnResp *clientv3.TxnResponse //事务提交的返回值
)
//客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//关闭连接
defer client.Close()
//获得kv对象
kv = clientv3.NewKV(client)
//lease实现锁自动过期
//op操作
//txn事务: if else then
//1.上锁(创建租约,自动续租,拿着租约去抢占一个key)
//申请一个租约(lease)
lease = clientv3.NewLease(client)
//申请一个5秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}
//拿到租约的id
leaseId = leaseGrantResp.ID
//准备一个用于取消自动续租的context
ctx, cancelFunc = context.WithCancel(context.TODO())
//确保函数退出后,自动续租会停止
defer cancelFunc()
//确保租约释放
defer lease.Revoke(context.TODO(), leaseId)
//自动续租 KeepAliveOnce只续租一次 KeepAlive是一直续租里面有个协程维护着
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
fmt.Println(err)
return
}
//启动协程,消费这个租约
go func() {
for {
select {
//每秒续租一次
case keepResp = <-keepRespChan:
//如果维护租约中发生异常,网络重新连接后发现租约过期的话,或者我主动把context取消
if keepResp == nil {
fmt.Println("租约已经失效了")
//退出循环
goto END
} else {
//续租一切正常,打印租约id
fmt.Println("收到自动续租应答: ", keepResp.ID)
}
}
}
END:
}()
//if 不存在key,then设置他,else抢锁失败
//创建事务
txn = kv.Txn(context.TODO())
//定义事务
//job9的创建版本=0,满足了说明key不存在.满足就走then,不满足就走else
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
Then(clientv3.OpPut("/cron/lock/job9", "", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/cron/lock/job9"))
//提交事务
if txnResp, err = txn.Commit(); err != nil {
fmt.Println(err)
return
}
//判断是否抢到了锁
if !txnResp.Succeeded {
//没抢到锁的话,取else部分的返回值
fmt.Println("锁被占用: ", txnResp.Responses[0].GetResponseRange().Kvs)
}
//2. 处理业务
//在锁内很安全
fmt.Println("----------处理任务")
time.Sleep(5 * time.Second)
//3. 释放锁(取消自动续租,释放租约,一释放与租约关联的key就被删除了)
//defer会把租约释放掉,关联的kv就被删除了
}
~~~
- 基础
- 简介
- 主要特征
- 变量和常量
- 编码转换
- 数组
- byte与rune
- big
- sort接口
- 和mysql类型对应
- 函数
- 闭包
- 工作区
- 复合类型
- 指针
- 切片
- map
- 结构体
- sync.Map
- 随机数
- 面向对象
- 匿名组合
- 方法
- 接口
- 权限
- 类型查询
- 异常处理
- error
- panic
- recover
- 自定义错误
- 字符串处理
- 正则表达式
- json
- 文件操作
- os
- 文件读写
- 目录
- bufio
- ioutil
- gob
- 栈帧的内存布局
- shell
- 时间处理
- time详情
- time使用
- new和make的区别
- container
- list
- heap
- ring
- 测试
- 单元测试
- Mock依赖
- delve
- 命令
- TestMain
- path和filepath包
- log日志
- 反射
- 详解
- plugin包
- 信号
- goto
- 协程
- 简介
- 创建
- 协程退出
- runtime
- channel
- select
- 死锁
- 互斥锁
- 读写锁
- 条件变量
- 嵌套
- 计算单个协程占用内存
- 执行规则
- 原子操作
- WaitGroup
- 定时器
- 对象池
- sync.once
- 网络编程
- 分层模型
- socket
- tcp
- udp
- 服务端
- 客户端
- 并发服务器
- Http
- 简介
- http服务器
- http客户端
- 爬虫
- 平滑重启
- context
- httptest
- 优雅中止
- web服务平滑重启
- beego
- 安装
- 路由器
- orm
- 单表增删改查
- 多级表
- orm使用
- 高级查询
- 关系查询
- SQL查询
- 元数据二次定义
- 控制器
- 参数解析
- 过滤器
- 数据输出
- 表单数据验证
- 错误处理
- 日志
- 模块
- cache
- task
- 调试模块
- config
- 部署
- 一些包
- gjson
- goredis
- collection
- sjson
- redigo
- aliyunoss
- 密码
- 对称加密
- 非对称加密
- 单向散列函数
- 消息认证
- 数字签名
- mysql优化
- 常见错误
- go run的错误
- 新手常见错误
- 中级错误
- 高级错误
- 常用工具
- 协程-泄露
- go env
- gometalinter代码检查
- go build
- go clean
- go test
- 包管理器
- go mod
- gopm
- go fmt
- pprof
- 提高编译
- go get
- 代理
- 其他的知识
- go内存对齐
- 细节总结
- nginx路由匹配
- 一些博客
- redis为什么快
- cpu高速缓存
- 常用命令
- Go 永久阻塞的方法
- 常用技巧
- 密码加密解密
- for 循环迭代变量
- 备注
- 垃圾回收
- 协程和纤程
- tar-gz
- 红包算法
- 解决golang.org/x 下载失败
- 逃逸分析
- docker
- 镜像
- 容器
- 数据卷
- 网络管理
- 网络模式
- dockerfile
- docker-composer
- 微服务
- protoBuf
- GRPC
- tls
- consul
- micro
- crontab
- shell调用
- gorhill/cronexpr
- raft
- go操作etcd
- mongodb