应用 docker 创立的三个 Zookeeper 服务端组成的集群,其 ip 地址别离为:

  • 172.17.0.2
  • 172.17.0.3
  • 172.17.0.4

一、增删改查

1 增 / create

创立新节点一共有四种:

  • 长久节点
  • 长期节点
  • 长久时序节点
  • 长期时序节点

代码:

package mainimport (  ...    "github.com/go-zookeeper/zk")func main() {    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)    if err != nil {        panic(err)    }    defer conn.Close()    // 创立长久节点    path, err := conn.Create("/hello", []byte("world"), 0, zk.WorldACL(zk.PermAll))    if err != nil {        panic(err)    }    println("Created", path)    // 创立长期节点,创立此节点的会话完结后立刻革除此节点    ephemeral, err := conn.Create("/ephemeral", []byte("1"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))    if err != nil {        panic(err)    }    println("Ephemeral node created:", ephemeral)    // 创立长久时序节点    sequence, err := conn.Create("/sequence", []byte("1"), zk.FlagSequence, zk.WorldACL(zk.PermAll))    if err != nil {        panic(err)    }    println("Sequence node created:", sequence)    // 创立长期时序节点,创立此节点的会话完结后立刻革除此节点    ephemeralSequence, err := conn.Create("/ephemeralSequence", []byte("1"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))    if err != nil {        panic(err)    }    println("Ephemeral-Sequence node created:", ephemeralSequence)}

2 查 / get

增完了节点,接下来当然是查看一下节点信息:

package mainimport (    ...  "github.com/go-zookeeper/zk")func main() {    ...    result, state, err := conn.Get("/hello")    if err != nil {        panic(err)    }    fmt.Println("result: ", string(result))    fmt.Println("state ->")    fmt.Printf("cZxid=%d\nctime=%d\nmZxid=%d\nmtime=%d\npZxid=%d\ncversion=%d\ndataVersion=%d\naclVersion=%d\nephemeralOwner=%v\ndataLength=%d\nnumChildren=%d\n", state.Czxid, state.Ctime, state.Mzxid, state.Mtime, state.Pzxid, state.Cversion, state.Version, state.Aversion, state.EphemeralOwner, state.DataLength, state.NumChildren)}

后果:

result:  worldstate ->cZxid=4294967345ctime=1618569545037mZxid=4294967345mtime=1618569545037pZxid=4294967345cversion=0dataVersion=0aclVersion=0ephemeralOwner=0dataLength=5numChildren=0

3 改 / set

代码:

package mainimport (    ...    "github.com/go-zookeeper/zk")func main() {    ...        path := "/hello"    _, state, _ := conn.Get(path)    state, err = conn.Set(path, []byte("girl"), state.Version)    if err != nil {        panic(err)    }    fmt.Println("state ->")    fmt.Printf("cZxid=%d\nctime=%d\nmZxid=%d\nmtime=%d\npZxid=%d\ncversion=%d\ndataVersion=%d\naclVersion=%d\nephemeralOwner=%v\ndataLength=%d\nnumChildren=%d\n", state.Czxid, state.Ctime, state.Mzxid, state.Mtime, state.Pzxid, state.Cversion, state.Version, state.Aversion, state.EphemeralOwner, state.DataLength, state.NumChildren)      data, _, err := conn.Get(path)    if err != nil {        panic(err)    }    fmt.Println("\nnew value: ", string(data))}

后果:

state ->cZxid=4294967345ctime=1618569545037mZxid=4294967372mtime=1618575729297pZxid=4294967345cversion=0dataVersion=1aclVersion=0ephemeralOwner=0dataLength=4numChildren=0new value:  girl

4 删 / delete

代码:

package mainimport (    ...    "github.com/go-zookeeper/zk")func main() {    ...    path := "/hello"    exists, state, err := conn.Exists(path)    fmt.Printf("\npath[%s] exists: %v\n", path, exists)    err = conn.Delete(path, state.Version)    if err != nil {        panic(err)    }    fmt.Printf("path[%s] is deleted.", path)    exists, _, err = conn.Exists(path)    fmt.Printf("\npath[%s] exists: %v\n", path, exists)}

后果:

path[/hello] exists: truepath[/hello] is deleted.path[/hello] exists: false

二、权限 / ACL

zk 的节点有 5 种权限:CREATE、READ、WRITE、DELETE 和 ADMIN。

ACL 权限由 scheme:id:permissions 组成。

scheme有 4 种形式:

  • world
  • auth
  • digest
  • ip

上面对这 4 种形式都测试一遍。

1 world

默认形式,相当于全世界都能拜访。

/test节点的权限批改为 crwa 后尝试删除其子节点 /1

func main() {    ...    // get acl    acl, state, err := conn.GetACL("/test")    if err != nil {        panic(err)    }    fmt.Println("\nget acl:")    fmt.Println("scheme =", acl[0].Scheme)    fmt.Println("id =", acl[0].ID)    fmt.Println("permissions =", acl[0].Perms)    // set acl    perms := zk.PermCreate | zk.PermRead | zk.PermWrite | zk.PermAdmin // crwa 权限    state, err = conn.SetACL("/test", zk.WorldACL(int32(perms)), state.Version)    if err != nil {        panic(err)    }    fmt.Println("SetAcl successful.")    // create child node    _, err = conn.Create("/test/1", []byte("1"), 0, zk.WorldACL(zk.PermAll))    if err != nil {        panic(err)    }    // get child node    _, state, err = conn.Get("/test/1")    if err != nil {        panic(err)    }    // delete child node /1    err = conn.Delete("/test/1", state.Version)    if err != nil {        fmt.Println("delete failed: ", err.Error())        os.Exit(1)    }}

后果:

get acl:scheme = worldid = anyonepermissions = 31SetAcl successful.delete failed:  zk: not authenticatedexit status 1

能够看见,因为权限问题无奈删除子节点/1,即便子节点/1赋予的是全副权限。

第 1 大节除了测试 world 外,还对 ACL 权限的设置办法进行了初步的摸索。

2 auth

auth 用来授予用户权限,所以须要先创立用户。

为不存在的用户受权

func main() {    ...    _, state, err := conn.Get("/test")    if err != nil {        panic(err)    }    acl := zk.ACL{        Perms:  31, // cdrwa        Scheme: "auth",        ID:     "user:123456", // 不存在的用户    }    // 为不存在的用户受权    _, err = conn.SetACL("/test", []zk.ACL{acl}, state.Version)    if err != nil {        panic(err)    }}$ go run main.gopanic: zk: invalid ACL specified

节点对指定用户受权

func main() {    ...    _, state, err := conn.Get("/test")    if err != nil {        panic(err)    }    // 用户受权,用户不存在的话会新建    err = conn.AddAuth("digest", []byte("user1:123456"))    if err != nil {        panic(err)    }        acl := zk.ACL{        Perms:  31, // cdrwa,也能够用zk权限位或计算        Scheme: "auth",        ID:     "user1:123456", // 用户名和明码,明码明文或密文皆可    }        // 为用户受权    _, err = conn.SetACL("/test", []zk.ACL{acl}, state.Version)    if err != nil {        panic(err)    }    fmt.Println("节点[/test]已对用户 user1 受权")}$ go run main.go节点[/test]已对用户 user1 受权

受权实现后,须要验证用户认证信息能力进行下一步操作。

应用未受权用户(如 world)拜访

func main() {    data, _, err := conn.Get("/test")    if err != nil {        panic(err)    }    fmt.Println(string(data))}$ go run main.gopanic: zk: not authenticated

如果应用不正确的用户名和明码,失去的会是同样的用户认证失败的后果。

应用指定用户拜访

func main() {    // 每个会话都要验证受权    err = conn.AddAuth("digest", []byte("user1:123456"))    if err != nil {        panic(err)    }        // 查问节点内容    data, _, err := conn.Get("/test")    if err != nil {        panic(err)    }    fmt.Println("节点[/test] 存储的内容:", string(data))    // 查问节点 acl    acl, _, err := conn.GetACL("/test")    if err != nil {        panic(err)    }    fmt.Println("acl 信息:")    fmt.Println("scheme =", acl[0].Scheme)    fmt.Println("id =", acl[0].ID)    fmt.Println("permissions =", acl[0].Perms)}}$ go run main.go节点[/test] 存储的内容: testacl 信息:scheme = digestid = user1:HYGa7IZRm2PUBFiFFu8xY2pPP/s=permissions = 31

能够看到,在查问节点的 acl 信息时返回的用户明码是加密过的密文。

3 digest

digestauth基本相同,惟一的区别在于设置权限时,明码须要应用密文。

zk golang 库中有专为digest结构的办法:

zk.DigestACL(perms int32, user, password string)

此办法传入的明码须要是明文,其外部逻辑会将明文转为密文再向 zookeeper 传递。

应用示例:

conn.SetACL("/test", zk.DigestACL(31, "user1", "123456"), 0)

4 ip

ip 权限顾名思义,就是限度 ip 地址的拜访权限。

把节点的权限设置给指定的 ip 地址后,其余 ip 将无法访问该节点。

设置指定 ip

func main() {    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)    if err != nil {        panic(err)    }    defer conn.Close()    _, err = conn.SetACL("/ip", ipACL(31, "172.17.0.1"), 0)    if err != nil {        panic(err)    }    fmt.Println("节点[/ip] 已设置 ip 权限")}

其余 ip 拜访此节点

新建了一个 docker 容器,ip 地址为 172.17.0.5,拜访此节点时:

    data, _, err := conn.Get("/ip")    if err != nil {        panic(err)    }    fmt.Println(string(data))
panic: zk: not authenticated

报认证谬误。

应用指定 ip 拜访此节点

    data, _, err := conn.Get("/ip")    if err != nil {        panic(err)    }    fmt.Println("节点[/ip] 存储的内容:", string(data))    acl, _, err := conn.GetACL("/ip")    if err != nil {        panic(err)    }    fmt.Println("\nacl 信息:")    fmt.Println("scheme =", acl[0].Scheme)    fmt.Println("id =", acl[0].ID)    fmt.Println("permissions =", acl[0].Perms)

后果:

节点[/ip] 存储的内容:ipacl 信息:scheme = ipid = 172.17.0.1permissions = 31

三、监听 / watch

watch 用来实现公布/订阅性能,可能让多个订阅者同时监听某一个主题对象,当这个主题对象本身状态发生变化时,会告诉所有订阅者。

每个 watch 仅有一次触发的机会,一旦触发会立刻生效,想要继续监听,就须要始终注册。

go-zookeeper 中的监听机制是通过事件/ Event 实现的。

1 监听一个节点

1.1 全局监听

将监听器放到Connect函数中,如果有监听事件产生,会始终执行监听器的回调函数。

示例代码:

func callback(e zk.Event) {    fmt.Println("++++++++++++++++++++++++")    fmt.Println("path:", e.Path)    fmt.Println("type:", e.Type.String())    fmt.Println("state:", e.State.String())    fmt.Println("------------------------")}func main() {    eventCallbackOption := zk.WithEventCallback(callback)    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second, eventCallbackOption)    if err != nil {        panic(err)    }    defer conn.Close()    // 注册一个 watch    exists, state, _, err := conn.ExistsW("/watch")    if err != nil {        panic(err)    }    if !exists {        // 创立 /watch 时,触发监听事件,watch 生效        _, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))        if err != nil {            panic(err)        }        // 再注册一个 watch        _, state, _, err = conn.ExistsW("/watch")        if err != nil {            panic(err)        }    }    // 删除 /watch 时,触发监听事件,watch 生效    err = conn.Delete("/watch", state.Version)    if err != nil {        panic(err)    }}

后果:

++++++++++++++++++++++++path: type: EventSessionstate: StateConnecting------------------------++++++++++++++++++++++++path: type: EventSessionstate: StateConnected------------------------2021/04/18 16:57:11 connected to 172.17.0.2:2181++++++++++++++++++++++++path: type: EventSessionstate: StateHasSession------------------------2021/04/18 16:57:11 authenticated: id=72057651018596414, timeout=40002021/04/18 16:57:11 re-submitting `0` credentials after reconnect++++++++++++++++++++++++path: /watchtype: EventNodeCreatedstate: Unknown------------------------++++++++++++++++++++++++path: /watchtype: EventNodeDeletedstate: Unknown------------------------2021/04/18 16:57:11 recv loop terminated: EOF2021/04/18 16:57:11 send loop terminated: <nil>++++++++++++++++++++++++path: type: EventSessionstate: StateDisconnected------------------------

1.2 只监听局部事件

对于不须要设置全局监听器的场景,须要对事件 channel 进行操作,watch 应用一次就生效。

func main() {    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)    if err != nil {        panic(err)    }    defer conn.Close()    // 注册一个 watch    exists, _, eventChannel, err := conn.ExistsW("/watch")    if err != nil {        panic(err)    }    go func() {        // 从事件 channel 中取出事件        e := <-eventChannel        fmt.Println("++++++++++++++++++++++++")        fmt.Println("path:", e.Path)        fmt.Println("type:", e.Type.String())        fmt.Println("state:", e.State.String())        fmt.Println("------------------------")    }()    if !exists {        // 创立 /watch 时,触发监听事件,watch 生效        _, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))        if err != nil {            panic(err)        }    }}

2 监听子节点

TODO: 监听子节点时服务端返回的事件类型可能不正确,待欠缺

四、客户端随机 host

func main() {    hosts := []string{"172.17.0.2:2181", "172.17.0.3:2181", "172.17.0.4:2181"}    hostProvider := new(zk.DNSHostProvider)    err := hostProvider.Init(hosts)    if err != nil {        panic(err)    }    host, retry := hostProvider.Next() // 取得host    fmt.Println(host, retry)    time.Sleep(10 * time.Second) // 做一些事件    hostProvider.Connected() // 将应用过的 host 放到 host_list 最初}

zk.Connect函数在执行时,conn内的公有属性hostProvider曾经对传的 host 切片做了 conn.hostProvider.Init(srvs) 解决。

但同时,也能够通过WithHostProvider函数替换默认的hostProvider

五、分布式锁

go-zookeeper 增加分布式锁的办法为NewLock(c *Conn, path string, acl []ACL)

锁的构造体为:

type Lock struct {    c        *Conn    path     string    acl      []ACL    lockPath string    seq      int}

这个构造体实现了三个办法:Lock()LockWithData(data []byte)Unlock()

Lock()LockWithData(data []byte)中传入了空参数:

func (l *Lock) Lock() error {    return l.LockWithData([]byte{})}

1 原理

zookeeper的分布式锁能够利用每个节点的唯一性来实现,但所有服务监听一个节点对于分布式系统来说齐全是资源节约。

zookeeper能够利用长期时序/长期程序节点来创立一个有序的长期节点列表来实现分布式锁:

服务 A 创立了节点 a,此时节点 a 的后面没有节点,所以服务 A 能够执行。此时服务 B 创立了节点 b,节点 b 是节点 a 的下一个节点,那么服务 B 只须要监听节点 a 即可。

也就是说,因为长期有序节点列表是有序的,所以每个服务只须要监听本人创立的节点的前一个节点即可。

2 示例代码

上面是 50 个 Gorouine 进行抢锁的示例:

func main() {    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)    if err != nil {        panic(err)    }    defer conn.Close()    var wg sync.WaitGroup    for i := 0; i < 50; i++ {        wg.Add(1)        go func(n int) {            defer wg.Done()            lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll))            err = lock.LockWithData([]byte("it is a lock"))            if err != nil {                panic(err)            }            fmt.Println("第", n, "个 goroutine 获取到了锁")            time.Sleep(time.Second) // 1 秒后开释锁            lock.Unlock()        }(i)    }    wg.Wait()}

后果:

第 49 个 goroutine 获取到了锁第 12 个 goroutine 获取到了锁第 13 个 goroutine 获取到了锁第 11 个 goroutine 获取到了锁...第 28 个 goroutine 获取到了锁第 30 个 goroutine 获取到了锁第 26 个 goroutine 获取到了锁第 31 个 goroutine 获取到了锁