应用 docker 创立的三个 Zookeeper 服务端组成的集群,其 ip 地址别离为:
- 172.17.0.2
- 172.17.0.3
- 172.17.0.4
一、增删改查
1 增 / create
创立新节点一共有四种:
- 长久节点
- 长期节点
- 长久时序节点
- 长期时序节点
代码:
package main
import (
...
"github.com/go-zookeeper/zk"
)
func main() {conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
if err != nil {panic(err)
}
defer conn.Close()
// 创立长久节点
path, err := conn.Create("/hello", []byte("world"), 0, zk.WorldACL(zk.PermAll))
if err != nil {panic(err)
}
println("Created", path)
// 创立长期节点,创立此节点的会话完结后立刻革除此节点
ephemeral, err := conn.Create("/ephemeral", []byte("1"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {panic(err)
}
println("Ephemeral node created:", ephemeral)
// 创立长久时序节点
sequence, err := conn.Create("/sequence", []byte("1"), zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {panic(err)
}
println("Sequence node created:", sequence)
// 创立长期时序节点,创立此节点的会话完结后立刻革除此节点
ephemeralSequence, err := conn.Create("/ephemeralSequence", []byte("1"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {panic(err)
}
println("Ephemeral-Sequence node created:", ephemeralSequence)
}
2 查 / get
增完了节点,接下来当然是查看一下节点信息:
package main
import (
...
"github.com/go-zookeeper/zk"
)
func main() {
...
result, state, err := conn.Get("/hello")
if err != nil {panic(err)
}
fmt.Println("result:", string(result))
fmt.Println("state ->")
fmt.Printf("cZxid=%d\nctime=%d\nmZxid=%d\nmtime=%d\npZxid=%d\ncversion=%d\ndataVersion=%d\naclVersion=%d\nephemeralOwner=%v\ndataLength=%d\nnumChildren=%d\n", state.Czxid, state.Ctime, state.Mzxid, state.Mtime, state.Pzxid, state.Cversion, state.Version, state.Aversion, state.EphemeralOwner, state.DataLength, state.NumChildren)
}
后果:
result: world
state ->
cZxid=4294967345
ctime=1618569545037
mZxid=4294967345
mtime=1618569545037
pZxid=4294967345
cversion=0
dataVersion=0
aclVersion=0
ephemeralOwner=0
dataLength=5
numChildren=0
3 改 / set
代码:
package main
import (
...
"github.com/go-zookeeper/zk"
)
func main() {
...
path := "/hello"
_, state, _ := conn.Get(path)
state, err = conn.Set(path, []byte("girl"), state.Version)
if err != nil {panic(err)
}
fmt.Println("state ->")
fmt.Printf("cZxid=%d\nctime=%d\nmZxid=%d\nmtime=%d\npZxid=%d\ncversion=%d\ndataVersion=%d\naclVersion=%d\nephemeralOwner=%v\ndataLength=%d\nnumChildren=%d\n", state.Czxid, state.Ctime, state.Mzxid, state.Mtime, state.Pzxid, state.Cversion, state.Version, state.Aversion, state.EphemeralOwner, state.DataLength, state.NumChildren)
data, _, err := conn.Get(path)
if err != nil {panic(err)
}
fmt.Println("\nnew value:", string(data))
}
后果:
state ->
cZxid=4294967345
ctime=1618569545037
mZxid=4294967372
mtime=1618575729297
pZxid=4294967345
cversion=0
dataVersion=1
aclVersion=0
ephemeralOwner=0
dataLength=4
numChildren=0
new value: girl
4 删 / delete
代码:
package main
import (
...
"github.com/go-zookeeper/zk"
)
func main() {
...
path := "/hello"
exists, state, err := conn.Exists(path)
fmt.Printf("\npath[%s] exists: %v\n", path, exists)
err = conn.Delete(path, state.Version)
if err != nil {panic(err)
}
fmt.Printf("path[%s] is deleted.", path)
exists, _, err = conn.Exists(path)
fmt.Printf("\npath[%s] exists: %v\n", path, exists)
}
后果:
path[/hello] exists: true
path[/hello] is deleted.
path[/hello] exists: false
二、权限 / ACL
zk 的节点有 5 种权限:CREATE、READ、WRITE、DELETE 和 ADMIN。
ACL 权限由 scheme:id:permissions
组成。
scheme
有 4 种形式:
- world
- auth
- digest
- ip
上面对这 4 种形式都测试一遍。
1 world
默认形式,相当于全世界都能拜访。
将 /test
节点的权限批改为 crwa
后尝试删除其子节点 /1
:
func main() {
...
// get acl
acl, state, err := conn.GetACL("/test")
if err != nil {panic(err)
}
fmt.Println("\nget acl:")
fmt.Println("scheme =", acl[0].Scheme)
fmt.Println("id =", acl[0].ID)
fmt.Println("permissions =", acl[0].Perms)
// set acl
perms := zk.PermCreate | zk.PermRead | zk.PermWrite | zk.PermAdmin // crwa 权限
state, err = conn.SetACL("/test", zk.WorldACL(int32(perms)), state.Version)
if err != nil {panic(err)
}
fmt.Println("SetAcl successful.")
// create child node
_, err = conn.Create("/test/1", []byte("1"), 0, zk.WorldACL(zk.PermAll))
if err != nil {panic(err)
}
// get child node
_, state, err = conn.Get("/test/1")
if err != nil {panic(err)
}
// delete child node /1
err = conn.Delete("/test/1", state.Version)
if err != nil {fmt.Println("delete failed:", err.Error())
os.Exit(1)
}
}
后果:
get acl:
scheme = world
id = anyone
permissions = 31
SetAcl successful.
delete failed: zk: not authenticated
exit status 1
能够看见,因为权限问题无奈删除子节点 /1
,即便子节点/1
赋予的是全副权限。
第 1 大节除了测试 world 外,还对 ACL 权限的设置办法进行了初步的摸索。
2 auth
auth 用来授予用户权限,所以须要先创立用户。
为不存在的用户受权
func main() {
...
_, state, err := conn.Get("/test")
if err != nil {panic(err)
}
acl := zk.ACL{
Perms: 31, // cdrwa
Scheme: "auth",
ID: "user:123456", // 不存在的用户
}
// 为不存在的用户受权
_, err = conn.SetACL("/test", []zk.ACL{acl}, state.Version)
if err != nil {panic(err)
}
}
$ go run main.go
panic: zk: invalid ACL specified
节点对指定用户受权
func main() {
...
_, state, err := conn.Get("/test")
if err != nil {panic(err)
}
// 用户受权,用户不存在的话会新建
err = conn.AddAuth("digest", []byte("user1:123456"))
if err != nil {panic(err)
}
acl := zk.ACL{
Perms: 31, // cdrwa,也能够用 zk 权限位或计算
Scheme: "auth",
ID: "user1:123456", // 用户名和明码,明码明文或密文皆可
}
// 为用户受权
_, err = conn.SetACL("/test", []zk.ACL{acl}, state.Version)
if err != nil {panic(err)
}
fmt.Println("节点 [/test] 已对用户 user1 受权")
}
$ go run main.go
节点 [/test] 已对用户 user1 受权
受权实现后,须要验证用户认证信息能力进行下一步操作。
应用未受权用户 (如 world) 拜访
func main() {data, _, err := conn.Get("/test")
if err != nil {panic(err)
}
fmt.Println(string(data))
}
$ go run main.go
panic: zk: not authenticated
如果应用不正确的用户名和明码,失去的会是同样的用户认证失败的后果。
应用指定用户拜访
func main() {
// 每个会话都要验证受权
err = conn.AddAuth("digest", []byte("user1:123456"))
if err != nil {panic(err)
}
// 查问节点内容
data, _, err := conn.Get("/test")
if err != nil {panic(err)
}
fmt.Println("节点[/test] 存储的内容:", string(data))
// 查问节点 acl
acl, _, err := conn.GetACL("/test")
if err != nil {panic(err)
}
fmt.Println("acl 信息:")
fmt.Println("scheme =", acl[0].Scheme)
fmt.Println("id =", acl[0].ID)
fmt.Println("permissions =", acl[0].Perms)
}
}
$ go run main.go
节点[/test] 存储的内容:test
acl 信息:scheme = digest
id = user1:HYGa7IZRm2PUBFiFFu8xY2pPP/s=
permissions = 31
能够看到,在查问节点的 acl 信息时返回的用户明码是加密过的密文。
3 digest
digest
与 auth
基本相同,惟一的区别在于设置权限时,明码须要应用密文。
zk golang 库中有专为 digest
结构的办法:
zk.DigestACL(perms int32, user, password string)
此办法传入的明码须要是明文,其外部逻辑会将明文转为密文再向 zookeeper 传递。
应用示例:
conn.SetACL("/test", zk.DigestACL(31, "user1", "123456"), 0)
4 ip
ip 权限顾名思义,就是限度 ip 地址的拜访权限。
把节点的权限设置给指定的 ip 地址后,其余 ip 将无法访问该节点。
设置指定 ip
func main() {conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
if err != nil {panic(err)
}
defer conn.Close()
_, err = conn.SetACL("/ip", ipACL(31, "172.17.0.1"), 0)
if err != nil {panic(err)
}
fmt.Println("节点[/ip] 已设置 ip 权限")
}
其余 ip 拜访此节点
新建了一个 docker 容器,ip 地址为 172.17.0.5,拜访此节点时:
data, _, err := conn.Get("/ip")
if err != nil {panic(err)
}
fmt.Println(string(data))
panic: zk: not authenticated
报认证谬误。
应用指定 ip 拜访此节点
data, _, err := conn.Get("/ip")
if err != nil {panic(err)
}
fmt.Println("节点[/ip] 存储的内容:", string(data))
acl, _, err := conn.GetACL("/ip")
if err != nil {panic(err)
}
fmt.Println("\nacl 信息:")
fmt.Println("scheme =", acl[0].Scheme)
fmt.Println("id =", acl[0].ID)
fmt.Println("permissions =", acl[0].Perms)
后果:
节点[/ip] 存储的内容:ip
acl 信息:scheme = ip
id = 172.17.0.1
permissions = 31
三、监听 / watch
watch 用来实现公布 / 订阅性能,可能让多个订阅者同时监听某一个主题对象,当这个主题对象本身状态发生变化时,会告诉所有订阅者。
每个 watch 仅有一次触发的机会,一旦触发会立刻生效,想要继续监听,就须要始终注册。
go-zookeeper 中的监听机制是通过事件 / Event 实现的。
1 监听一个节点
1.1 全局监听
将监听器放到 Connect
函数中,如果有监听事件产生,会始终执行监听器的回调函数。
示例代码:
func callback(e zk.Event) {fmt.Println("++++++++++++++++++++++++")
fmt.Println("path:", e.Path)
fmt.Println("type:", e.Type.String())
fmt.Println("state:", e.State.String())
fmt.Println("------------------------")
}
func main() {eventCallbackOption := zk.WithEventCallback(callback)
conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second, eventCallbackOption)
if err != nil {panic(err)
}
defer conn.Close()
// 注册一个 watch
exists, state, _, err := conn.ExistsW("/watch")
if err != nil {panic(err)
}
if !exists {
// 创立 /watch 时,触发监听事件,watch 生效
_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {panic(err)
}
// 再注册一个 watch
_, state, _, err = conn.ExistsW("/watch")
if err != nil {panic(err)
}
}
// 删除 /watch 时,触发监听事件,watch 生效
err = conn.Delete("/watch", state.Version)
if err != nil {panic(err)
}
}
后果:
++++++++++++++++++++++++
path:
type: EventSession
state: StateConnecting
------------------------
++++++++++++++++++++++++
path:
type: EventSession
state: StateConnected
------------------------
2021/04/18 16:57:11 connected to 172.17.0.2:2181
++++++++++++++++++++++++
path:
type: EventSession
state: StateHasSession
------------------------
2021/04/18 16:57:11 authenticated: id=72057651018596414, timeout=4000
2021/04/18 16:57:11 re-submitting `0` credentials after reconnect
++++++++++++++++++++++++
path: /watch
type: EventNodeCreated
state: Unknown
------------------------
++++++++++++++++++++++++
path: /watch
type: EventNodeDeleted
state: Unknown
------------------------
2021/04/18 16:57:11 recv loop terminated: EOF
2021/04/18 16:57:11 send loop terminated: <nil>
++++++++++++++++++++++++
path:
type: EventSession
state: StateDisconnected
------------------------
1.2 只监听局部事件
对于不须要设置全局监听器的场景,须要对事件 channel 进行操作,watch 应用一次就生效。
func main() {conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
if err != nil {panic(err)
}
defer conn.Close()
// 注册一个 watch
exists, _, eventChannel, err := conn.ExistsW("/watch")
if err != nil {panic(err)
}
go func() {
// 从事件 channel 中取出事件
e := <-eventChannel
fmt.Println("++++++++++++++++++++++++")
fmt.Println("path:", e.Path)
fmt.Println("type:", e.Type.String())
fmt.Println("state:", e.State.String())
fmt.Println("------------------------")
}()
if !exists {
// 创立 /watch 时,触发监听事件,watch 生效
_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {panic(err)
}
}
}
2 监听子节点
TODO: 监听子节点时服务端返回的事件类型可能不正确,待欠缺
四、客户端随机 host
func main() {hosts := []string{"172.17.0.2:2181", "172.17.0.3:2181", "172.17.0.4:2181"}
hostProvider := new(zk.DNSHostProvider)
err := hostProvider.Init(hosts)
if err != nil {panic(err)
}
host, retry := hostProvider.Next() // 取得 host
fmt.Println(host, retry)
time.Sleep(10 * time.Second) // 做一些事件
hostProvider.Connected() // 将应用过的 host 放到 host_list 最初}
zk.Connect
函数在执行时,conn
内的公有属性 hostProvider
曾经对传的 host 切片做了 conn.hostProvider.Init(srvs)
解决。
但同时,也能够通过 WithHostProvider
函数替换默认的hostProvider
。
五、分布式锁
go-zookeeper 增加分布式锁的办法为NewLock(c *Conn, path string, acl []ACL)
。
锁的构造体为:
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
}
这个构造体实现了三个办法:Lock()
,LockWithData(data []byte)
和Unlock()
。
Lock()
是 LockWithData(data []byte)
中传入了空参数:
func (l *Lock) Lock() error {return l.LockWithData([]byte{})
}
1 原理
zookeeper
的分布式锁能够利用每个节点的唯一性来实现,但所有服务监听一个节点对于分布式系统来说齐全是资源节约。
而 zookeeper
能够利用长期时序 / 长期程序节点来创立一个有序的长期节点列表来实现分布式锁:
服务 A 创立了节点 a,此时节点 a 的后面没有节点,所以服务 A 能够执行。此时服务 B 创立了节点 b,节点 b 是节点 a 的下一个节点,那么服务 B 只须要监听节点 a 即可。
也就是说,因为长期有序节点列表是有序的,所以每个服务只须要监听本人创立的节点的前一个节点即可。
2 示例代码
上面是 50 个 Gorouine 进行抢锁的示例:
func main() {conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
if err != nil {panic(err)
}
defer conn.Close()
var wg sync.WaitGroup
for i := 0; i < 50; i++ {wg.Add(1)
go func(n int) {defer wg.Done()
lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll))
err = lock.LockWithData([]byte("it is a lock"))
if err != nil {panic(err)
}
fmt.Println("第", n, "个 goroutine 获取到了锁")
time.Sleep(time.Second) // 1 秒后开释锁
lock.Unlock()}(i)
}
wg.Wait()}
后果:
第 49 个 goroutine 获取到了锁
第 12 个 goroutine 获取到了锁
第 13 个 goroutine 获取到了锁
第 11 个 goroutine 获取到了锁
...
第 28 个 goroutine 获取到了锁
第 30 个 goroutine 获取到了锁
第 26 个 goroutine 获取到了锁
第 31 个 goroutine 获取到了锁