我们采用Etcd作为服务的注册中心, 后面workflow和网关traefik也会使用etcd作为存储, 因此我们先学习下etcd的基本使用
由于我们的测试使用,因此使用docker搭建单节点etcd:
# windows上注意不要使用绝对路径: /usr/local/bin/etcd
# --listen-client-urls, --advertise-client-urls 必须带上, 后面使用api是的时候需要, 不然client 访问不到
docker run \
-itd \
-p 2379:2379 \
-p 2380:2380 \
--name etcd quay.io/coreos/etcd:latest etcd \
--listen-client-urls http://0.0.0.0:2379 \
--advertise-client-urls http://0.0.0.0:2379
通过命令查看当前etcd的版本
$ docker exec -it etcd etcdctl -version
etcdctl version: 3.3.8
API version: 2
查看当前实例
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl member list -w table
+------------------+---------+---------+-----------------------+-----------------------+
| ID | STATUS | NAME | PEER ADDRS | CLIENT ADDRS |
+------------------+---------+---------+-----------------------+-----------------------+
| 8e9e05c52164694d | started | default | http://localhost:2380 | http://localhost:2379 |
+------------------+---------+---------+-----------------------+-----------------------+
注意: etcd容器没有shell, 你可以把他当做一个 二进制包来使用, 只是名字有点长而已
$ docker exec -it etcd etcdctl
NAME:
etcdctl - A simple command line client for etcd.
WARNING:
Environment variable ETCDCTL_API is not set; defaults to etcdctl v2.
Set environment variable ETCDCTL_API=3 to use v3 API or ETCDCTL_API=2 to use v2 API.
USAGE:
etcdctl [global options] command [command options] [arguments...]
VERSION:
3.3.8
COMMANDS:
backup backup an etcd directory
cluster-health check the health of the etcd cluster
mk make a new key with a given value
mkdir make a new directory
rm remove a key or a directory
rmdir removes the key if it is an empty directory or a key-value pair
get retrieve the value of a key
ls retrieve a directory
set set the value of a key
setdir create a new directory or update an existing directory TTL
update update an existing key with a given value
updatedir update an existing directory
watch watch a key for changes
...
etcd有2个版本, 默认是v2, ETCDCTL_API=3 来使用v3版本的
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl
NAME:
etcdctl - A simple command line client for etcd3.
USAGE:
etcdctl
VERSION:
3.3.8
API VERSION:
3.3
COMMANDS:
get Gets the key or a range of keys
put Puts the given key into the store
del Removes the specified key or range of keys [key, range_end)
txn Txn processes all the requests in one transaction
compaction Compacts the event history in etcd
...
etcd 是kv数据库, 我们的所有操作都是对存在数据库里面的key-value进行操作
下面我们演示一个简单的基于etcd的配置中心的流程:
通过put 可以往数据库里面添加一条数据
# 写入数据
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl put /registry/configs/default/cmdb "cmdb config object"
OK
# 读取数据
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get /registry/configs/default/cmdb
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object
## 注意, 这里由于是windows的原因, key多出了一部分: C:/Program Files/Git
## 估计和windows下的 git bash有关系, 直接使用etcd client是没有这个问题的
我们我们再次往这个key写入数据,会覆盖之前的值, 也就实现了修改的效果
# 写入数据
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl put /registry/configs/default/cmdb "cmdb config object v2"
OK
# 读取数据
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get /registry/configs/default/cmdb
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object v2
读取数据采用get指令, 最基本的方法就是 get key_name, 这种方法只能读取一个key, 如果我们想要读取多个key, 需要通过--prefix, 这样就能读取前缀为 key_prefix 的所有key, 也就实现了list功能
因此我们在添加一个key: workflow的配置
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl put /registry/configs/default/workflow "workflow config object v1"
OK
然后我们查看 当前注册的所有配置
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --prefix /registry/configs
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object v2
C:/Program Files/Git/registry/configs/default/workflow
workflow config object v1
# Order of results; ASCEND or DESCEND (ASCEND by default)
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --prefix --order="DESCEND" /registry/configs
C:/Program Files/Git/registry/configs/default/workflow
workflow config object v1
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object v2
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --prefix --print-value-only /registry/configs
cmdb config object v2
workflow config object v1
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --prefix --keys-only /registry/configs
C:/Program Files/Git/registry/configs/default/cmdb
C:/Program Files/Git/registry/configs/default/workflow
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --prefix --keys-only --limit 1 /registry/configs
C:/Program Files/Git/registry/configs/default/cmdb
# 当前的keys
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --prefix --keys-only /registry/configs
C:/Program Files/Git/registry/configs/default/cmdb
C:/Program Files/Git/registry/configs/default/keyauth
C:/Program Files/Git/registry/configs/default/workflow
## 我们要查询从keyauth开始的后面的key, 当然你也可以指定limit
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --from-key --keys-only --limit=2 /registry/configs/default/keyauth
C:/Program Files/Git/registry/configs/default/keyauth
C:/Program Files/Git/registry/configs/default/workflow
# 可以看出,如果你要把他当场景的mysql或者mongo使用 是比较蓝瘦的, 你并不能指望数据来给你处理很多业务(MySQL 存储过程, Mongo MapReduce和函数)
etcd的value是有版本概念的, 我们我们之前修改过一次cmdb的配置, 当前是v2的配置
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get /registry/configs/default/cmdb
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object v2
其实没次key有修改的时候 都会返回一个该key的版本号的, 但是需要-w json 才能查看到, 下面我么修改2次cmdb的配置
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl put /registry/configs/default/cmdb "cmdb config object v3" -w json
{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":6,"raft_term":2}}
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl put /registry/configs/default/cmdb "cmdb config object v4" -w json
{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":7,"raft_term":2}}
如果我们没指定获取那个版本, 默认获取最新版本, 如果要获取历史版本需要--rev指定版本号
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get /registry/configs/default/cmdb
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object v4
# 获取版本6
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --rev=6 /registry/configs/default/cmdb
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object v3
因此要使用etcd作为配置中心, 你需要保持好key的版本号
现在我们workflow服务下线了, 我需要删除他的配置怎么办? etcd 可以通过del 删除指定key, 也可以指定--prefix 删除一批key
# 删除后返回删除的个数
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl del /registry/configs/default/workflow
1
# 再次查看 workflow的配置已经删除了
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --prefix /registry/configs
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object v4
C:/Program Files/Git/registry/configs/default/keyauth
keyauth config object v1
像k8s里面的 watch list 就是使用的 该功能, 我们可以通过watch 一个prefix key, 当这个key有变化的时候 我们可以收到变化的数据
模拟node节点, watch 所有的config变化事件
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl watch --prefix /registry/configs
模拟API server 修改 cmdb的配置
docker exec -it -e "ETCDCTL_API=3" etcd etcdctl put /registry/configs/default/cmdb "cmdb config object v5"
此时node就会收到该事件
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl watch --prefix /registry/configs
PUT
C:/Program Files/Git/registry/configs/default/cmdb
cmdb config object v5
如果我们要为key 设置一个TTL喃?, 比如 这个key 存活时间为60s
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl lease grant 60
lease 694d7d5f3050ef39 granted with TTL(60s)
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl put /registry/configs/default/ttlkey --lease=694d7d5f3050ef39 "key with ttl"
OK
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get /registry/configs/default/ttlkey
C:/Program Files/Git/registry/configs/default/ttlkey
key with ttl
# 60s秒过后, 该key就查不到了
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get /registry/configs/default/ttlkey
比如我们一个服务启动了3个副本, 在修改数据A的时候需要 先获取锁才能修改, 不然修改顺序就乱了, 怎么办?
通过lock 来创建一个锁, 该锁未释放之前, 其他想要获取该锁的实例都会阻塞
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl lock /registry/configs/default/lockkey
C:/Program Files/Git/registry/configs/default/lockkey/694d7d5f3050ef43
# 直到node1释放后,node2才能获取到锁
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl lock /registry/configs/default/lockkey
C:/Program Files/Git/registry/configs/default/lockkey/694d7d5f3050ef55
etcd client的使用逻辑和cli基本一致
package basic
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
func NewClient() {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := client.MemberList(ctx)
if err != nil {
panic(err)
}
fmt.Println(resp)
}
正常的情况下, 我们可以打印出当前etcd的member节点信息
&{cluster_id:14841639068965178418 member_id:10276657743932975437 raft_term:2 [ID:10276657743932975437 name:"default" peerURLs:"http://localhost:2380" clientURLs:"http://0.0.0.0:2379" ] {} [] 0}
// put
key := "/registry/configs/default/cmdb"
putResp, err := client.Put(ctx, key, "cmdb config v1")
if err != nil {
panic(err)
}
fmt.Println(putResp)
// get
getResp, err := client.Get(ctx, key)
if err != nil {
panic(err)
}
fmt.Println(getResp)
// get with prefix
getResp, err = client.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
panic(err)
}
fmt.Println(getResp.Kvs)
get 支持多个参数, 和命令行的含义一样, 只是这里使用编程的opt语法传入
// delete
delResp, err := client.Delete(ctx, key, clientv3.WithPrevKV())
if err != nil {
panic(err)
}
fmt.Println(delResp.PrevKvs)
注意 这里添加一个参数WithPrevKV, 默认情况下 delete是无法获取到被删除的值得, 通过添加该参数可以 获取当前被删除的值
你也可以通过命令行确认结果
$ docker exec -it -e "ETCDCTL_API=3" etcd etcdctl get --prefix /registry/configs
先编写一个用于测试的包
package watch
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
client *clientv3.Client
ctx, _ = context.WithTimeout(context.Background(), 5*time.Second)
)
func WatchConfig(key string) {
wc := client.Watch(context.Background(), key, clientv3.WithPrefix())
for v := range wc {
fmt.Println(v)
}
}
func UpdateConfig(data string) {
// put
key := "/registry/configs/default/cmdb"
putResp, err := client.Put(ctx, key, data)
if err != nil {
panic(err)
}
fmt.Println(putResp)
}
func init() {
c, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
client = c
}
api server 负责修改对象
package main
import "gitee.com/infraboard/go-course/day24/etcd/watch"
func main() {
watch.UpdateConfig("cmdb v3")
}
controller 负责watch 对象变化
package main
import "gitee.com/infraboard/go-course/day24/etcd/watch"
func main() {
watch.WatchConfig("/registry/configs")
}
然后我们测试修改
$ go run controler/main.go
revision 12 PUT key:"/registry/configs/default/cmdb" create_revision:6 mod_revision:12 version:7 value:"cmdb v3"
revision 13 PUT key:"/registry/configs/default/cmdb" create_revision:6 mod_revision:13 version:8 value:"cmdb v3"
revision 14 PUT key:"/registry/configs/default/cmdb" create_revision:6 mod_revision:14 version:9 value:"cmdb v3"
revision 15 DELETE key:"/registry/configs/default/cmdb" mod_revision:15
之前 1000 goroutine 修改全局变量累加的例子:
package main
import (
"sync"
)
// 全局变量
var counter int
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++
}()
}
wg.Wait()
println(counter)
}
当在同一个进程的时候我们可以使用: 互斥锁,可以解决并行抢占的问题
func main() {
var wg sync.WaitGroup
var lock sync.Mutex
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
lock.Lock()
counter++
lock.Unlock()
}()
}
wg.Wait()
println(counter)
}
如果我们不是在一个进程内部, 比如 你起了3个进程, 这3个进程 可能分布在不同的主机上, 这个时候进程锁 就无法满足需求了, 需要使用分布式锁
可以实现分布式锁能力的服务主要有(主要看数据一致性模型)
当然我们选择使用etcd实现, etcd 提供的 concurrency 就是解决分布式并发问题的:
下面我们利用etcd 实现一个分布式互斥锁
package etcd
import (
"context"
"fmt"
"sync"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// 初始化sync.Locker对象.
func NewEtcdMutex(ctx context.Context, client *clientv3.Client, pfx string) (sync.Locker, error) {
// 创建Session对象.
// 在调用concurrency.NewSession时,会设置ttl,默认为60秒
// Session对象会持有对应的LeaseID,并会调用KeepAlive来续期
// 使得锁在Unlock之前一直是有效的,其它想抢占分布式锁的程序只能是等待
sess, err := concurrency.NewSession(client)
if err != nil {
return nil, err
}
// 创建Mutex对象. 需要指定锁的名称, 和命令行使用lock一样,就是key的prefix
m := concurrency.NewMutex(sess, "/registry/locks")
return &EtcdMutex{
sess: sess,
m: m,
pfx: pfx,
ctx: ctx,
}, nil
}
type EtcdMutex struct {
sess *concurrency.Session
m *concurrency.Mutex
pfx string
ctx context.Context
}
// 申请锁.
func (l *EtcdMutex) Lock() {
// 不是标准的sync.Locker接口,需要传入Context对象,在获取锁时可以设置超时时间,或主动取消请求.
err := l.m.Lock(l.ctx)
if err != nil {
panic(err)
}
fmt.Println("获取锁")
}
// 释放锁.
func (l *EtcdMutex) Unlock() {
err := l.m.Unlock(l.ctx)
if err != nil {
panic(err)
}
fmt.Println("释放锁")
}
写个node模拟多进程 获取互斥锁:
package main
import (
"context"
"time"
"gitee.com/infraboard/go-course/day24/etcd/lock/etcd"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
client *clientv3.Client
)
func main() {
lock, err := etcd.NewEtcdMutex(context.Background(), client, "/registry/locks")
if err != nil {
panic(err)
}
lock.Lock()
time.Sleep(10 * time.Second)
lock.Unlock()
}
func init() {
c, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
client = c
}
测试2个锁 是否能循环
如果你不想获取锁的时候阻塞, 比如当前有人持有锁,直接放弃抢占锁, 可以使用Trylock
// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error
因为etcd 是kv数据库, 所以没有where之类的操作, 如果你想要设置索引过滤对象, 只能设计好你的key
比如:
/registry/configs/namesapce/resource_name
历史版本越多,存储空间越大,性能越差,直到etcd到达空间配额限制的时候,etcd的写入将会被禁止变为只读,影响线上服务,因此这些历史版本需要进行压缩
数据压缩并不是清理现有数据,只是对给定版本之前的历史版本进行清理,清理后数据的历史版本将不能访问,但不会影响现有最新数据的访问
etcdctl compact 5
基础准备好了后,我们接下来 开始workflow的 API Server的开发
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。