侧边栏壁纸
博主头像
憨憨大头个人博客博主等级

心存希冀,目有繁星

  • 累计撰写 110 篇文章
  • 累计创建 13 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Zookeeper的搭建和基本使用

Administrator
2024-08-03 / 0 评论 / 0 点赞 / 11 阅读 / 58789 字
Zookeeper

一、简介

Zookeeper 是高可用、高性能且一致的开源的分布式应用程序协调服务,提供的服务:统一命名服务、分布式协调、统一配置管理、分布式锁等功能。

可以理解为 zookeeper = 文件系统 + 通知机制。

官网地址:https://zookeeper.apache.org/
源码地址:https://github.com/apache/zookeeper

二、分布式集群搭建

1、环境准备

  1. 三台服务器:192.168.1.21、192.168.1.22、192.168.1.23

Zookeeper 集群的工作是超过半数才能对外提供服务,一般集群数量为奇数台(2*n + 1台)。

  1. 安装 JDK 环境:Linux 安装 JDK 教程

Zookeeper 是用 Java 编写的,所以需要运行于 Java 环境。

2、下载 zookeeper

Zookeeper 官网下载,下载 apache-zookeeper-3.6.3-bin.tar.gz ,下载带 bin 的文件,表示已编译过的。

Zookeeper下载指示图

3、解压下载的 zookeeper 到 /usr/local/ 目录

tar -zxvf apache-zookeeper-3.6.3.tar.gz -C /usr/local/

4、复制配置文件

进入 zookeeper 的 conf 目录,会看到 zoo_sample.cfg,这个文件是官方给出的 zookeeper 配置文件的样板文件,复制 zoo_sample.cfg 并命名为 zoo.cfg 作为配置文件。
zoo.cfg 这是 zookeeper 官方指定的配置文件名称,因为 zkService.sh 文件引用 zkEnv.sh 文件,而 zkEnv.sh 文件里面引用 zoo.cfg 配置文件。

cd /usr/local/apache-zookeeper-3.6.3/conf/
cp zoo_sample.cfg zoo.cfg

5、修改配置文件

1)、修改或添加以下配置内容

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zkdatalog
clientPort=2181
server.1=192.168.1.21:2888:3888
server.2=192.168.1.22:2888:3888
server.3=192.168.1.23:2888:3888

配置说明

tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 102000=20 秒

syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10秒

dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。

clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

2)、zoo.cfg 配置参数说明

参数说明
tickTime=2000zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,
每个 tickTime 时间就会发送一个心跳。
initLimit=10leader 和 follower 初始连接的最长等待心跳时长
initLimit * tickTime
syncLimit=5leader 和 follower 之间的同步通讯时长
syncLimit * tickTime
dataDir=/opt/zookeeper/zkdata快照日志的存储路径
dataLogDir=/opt/zookeeper/zkdatalog事物日志的存储路径
clientPort=2181客户端连接 zookeeper 服务器的端口号
Server.A=B:C:DA:第几号服务器
B:服务器 IP
C:leader 和 follower 通讯端口
D:leader 选举的端口

3)、创建 data 目录和 log 目录

mkdir /opt/zookeeper/zkdata
mkdir /opt/zookeeper/zkdatalog

4)、分别创建每个节点自己的 myid 文件

# server.1
echo "1" > /opt/zookeeper/zkdata/myid
# server.2
echo "2" > /opt/zookeeper/zkdata/myid
# server.3
echo "3" > /opt/zookeeper/zkdata/myid

6、启动 zookeeper

进入 zookeeper 的 bin 目录,分别启动 zookeeper

cd /usr/local/apache-zookeeper-3.6.3/bin/
./zkServer.sh start

7、检测 zookeeper 是否启动成功

查看 zookeeper 状态

./zkServer.sh status
# 输出以下内容表示启动成功,Mode 表示是主节点 leader 或从节点 follower
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

8、使用命令连接 zookeeper

./zkCli.sh -server 192.168.1.21:2181,192.168.1.22:2181,192.168.1.23:2181

三、集群架构

集群架构图

1、Server

一台 zookeeper 服务器就是一个 server,zookeeper 集群是由多个 server 组成,每个 server 保存一份数据副本,每个 server 会有不同的角色。

Leader:处理事务请求,集群内部各服务器的调度者。

Follower:处理非事务请求,转发事务请求给 leader 服务器,参与 leader 选举投票。

Observer:处理非事务请求,转发事务请求给 leader 服务器,不参与 leader 选举投票。

2、Client

连接 zookeeper 服务的客户端。

四、Leader 选举机制

sid:服务器 id,用来标识一台 zookeeper 集群中的机器,每台机器不能重复,和 myid 一致。
zxid:事务id,用来标识一次服务器状态的变更,每次写操作都有事务 id。
epoch:每个 leader 任期的代号。

服务器状态:LOOKING、FOLLOWING、LEADING、OBSERVING

1、第一次启动

  1. 服务器 1 启动,发起一次选举,服务器 1 投自己一票,不够半数以上,选举无法完成,服务器 1 状态保持为 LOOKING。

  2. 服务器 2 启动,重新发起一次选举,服务器 1 和 2 分别投自己一票并交换选票信息,此时服务器 1 发现服务器 2 的 myid 比自己目前投票的(服务器 1)大,服务器 1 更改投票为服务器 2,此时服务器 2 的票数已经超过半数,服务器 2 当选 leader ,此时状态为 LEADING,服务器 1 更改状态为 FOLLOWING。

  3. 服务器 3 启动,重新发起一次选举,此时服务器 1 和 2 已经不是 LOOKING 状态,不会更改选票信息,经过交换选票,服务器 2 为 2 票,服务器 3 为 1 票,此时服务器 3 服从多数,更改投票为服务器 2,并更改状态为 FOLLOWING。

演示 1:服务 1、2、3 stop,服务 1 start,服务 2 start,服务 3 start。服务 2 是 leader。
演示 2:服务 1、2、3 stop,服务 3 start,服务 2 start,服务 1 start。服务 3 是 leader。

2、非第一次启动

1)集群中已经存在 leader

服务器运行期间 follower 无法和 leader 保持连接,follower 试图去选举 leader 时,会被告知当前服务器的 leader 信息,而 follower 仅需要和 leader 建立连接,并进行状态同步即可。

2)集群中不存在 leader

假设 zookeeper 由 3 台服务器组成,sid 分别为 1、2、3,zxid 分别为 8、7、8,此时 sid 为 2 的服务器是 leader。某一时刻 sid 为 2 的服务器出现故障,此时会开始进行 leader 选举。
此时选举 leader 规则:① epoch 大的直接胜出;② epoch相同,事务 id 大的胜出;③ 事务 id 相同,服务器 id 大的胜出。
此时 sid 为 1、3 的投票情况:(epoch=1,zxid=8,sid=1),(epoch=1,zxid=8,sid=3)

演示 1:服务 1、2、3 stop,服务 3 start,服务 2 start,服务 1 start,服务 3 restart。服务 2 是 leader。
演示 2:服务 1、2、3 stop,服务 3 start,服务 2 start,服务 1 start,服务 1 restart,服务 3 restart。服务 1 是 leader。
演示 3:服务 1、2、3 stop,服务 3 start,服务 2 start,服务 1 start,服务 1 restart,客户端发起事务请求,服务 3 restart。服务 2 是 leader。

五、特性

  1. Zookeeper 集群由一个 leader,多个 follower 组成。
  2. Zookeeper 集群只要有半数以上节点存活就能正常服务,所以适合安装奇数台服务器。
  3. 全局数据一致,每个 server 保存一份相同的数据副本,client 无论连接到哪个 server 得到的数据都是一致的。
  4. 更新请求顺序执行,来自同一个 client 的更新请求按其发送顺序依次执行。
  5. 数据更新原子性,一次数据更新要么成功,要么失败。
  6. 实时性,在一定时间范围内,client 能读到最新数据。

六、数据结构

1、数据模型

Zookeeper 数据模型的结构与 Unix 文件系统很类似,整体可以看作一棵树,每个节点称为 Znode,每个 Znode 默认能够存储 1MB 的数据,每个 Znode 都可以通过其路径唯一标识。

数据模型图

2、节点类型

3.5.3 版本之前四种类型:持久节点、持久有序节点、临时节点、临时有序节点。
3.5.3 版本后新增三种类型:容器节点、TTL 节点、TTL 有序节点。

TTL 节点需要在 zoo.cfg 配置 extendedTypesEnabled=true 开启才可使用。

顺序编号说明:创建 Znode 时设置顺序标识,Znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

1)、持久节点

PERSISTENT:客户端与 zookeeper 断开连接后,该节点依旧存在。

2)、持久有序节点

PERSISTENT_SEQUENTIAL:客户端与 zookeeper 断开连接后,该节点依旧存在,只是 zookeeper 给该节点名称进行顺序编号。

3)、临时节点

EPHEMERAL:客户端与 zookeeper 断开连接后,该节点被删除。

4)、临时有序节点

EPHEMERAL_SEQUENTIAL:客户端与 zookeeper 断开连接后,该节点被删除,只是 zookeeper 给该节点名称进行顺序编号。

5)、容器节点

CONTAINER:和持久节点表现形式一样,区别是 zookeeper 服务端启动后会去扫描所有容器节点,当发现容器节点的子节点数量为 0 时,会自动删除该节点。

6)、TTL 节点

PERSISTENT_WITH_TTL:指带有存活时间,超过指定时间后如果该节点下面没有子节点就会被自动删除。

7)、TTL 有序节点

PERSISTENT_SEQUENTIAL_WITH_TTL:指带有存活时间,超过指定时间后如果该节点下面没有子节点就会被自动删除,只是 zookeeper 给该节点名称进行顺序编号。

3、节点信息

属性说明
cZxid创建时的事务 id(zxid)
ctime创建时的时间戳
mZxid最后一次更新时的事务 id
mtime最后一次更新时的时间戳
pZxid最后一次更新的子节点的事务 id
cversion子节点的更新次数
dataVersion更新次数
aclVersionACL 的更新次数
ephemeralOwner如果是临时节点,则值为与该节点绑定的 sessionid
如果不是临时节点,则值为 0
dataLength数据的字节数
numChildren子节点数量

七、ACL 权限控制

1、概述

ACL(Access Control List)权限控制由授权策略、授权对象、权限(scheme:id:perm) 3 个方面来标识。

Zookeeper 的权限控制是基于每个节点的,需要对每个节点设置权限。

每个节点支持设置多种权限控制方案和多个权限。

子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点。

2、授权策略(scheme):采用何种方式授权

world:默认方式,相当于全部都能访问。

auth:代表已经认证通过的用户(通过命令 addauth digest user:pwd 来添加当前上下文的授权用户)。

digest:即用户名:密码这种方式认证,这也是业务系统中最常用的。用 user:pwd 字符串生成一个 base64 的 SHA1 编码的加密串,然后这加密串用来作为 id。

ip:使用客户端的主机 ip 认证。

3、授权对象(id):给谁授权

world:只有一个 id:anyone。例:world:anyone

auth:格式为 auth:user。例:auth:user1

digest:格式为 digest:user:BASE64(SHA1(user:pwd))。例:用户名为 user1、密码为 123 的结果为:digest:user1:Nv3cjIteQ1W3F6fZb+mYaNSs5rc=

ip:通常是一个 ip 或 ip 段。例:ip:192.168.1.21 或 ip:192.168.1.20/25

4、权限(perm):授予什么权限

CREATE(c):创建权限,可以创建子节点

READ(r):读权限,可以读取节点数据及显示子节点列表

WRITE(w):写权限,可以设置节点数据

DELETE(d):删除权限,可以删除子节点

ADMIN(a):管理权限,可以设置节点的权限

5、超级管理员设置

生成超级管理密钥:BASE64(SHA1(user:pwd)),这里以 super:admin 为例。

"-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="

修改 zkServer.sh 文件,在对应的位置添加上面的脚本。

超级管理员设置图

重启 zookeeper 服务,启动客户端,输入 addauth digest super:admin 即可进入超级管理员。

八、Watcher 监听器

1、概述

客户端注册监听它关心的节点,当节点发生变化(数据变化、状态变化、节点删除、子节点数量变化)时,Zookeeper 服务端会通知客户端。

监听机制保证 zookeeper 保存的任何数据、任何改变都能快速的响应到监听该节点的应用程序。

2、监听原理

  1. 首先要有一个 main() 线程。
  2. main() 线程中创建 zookeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
  3. 通过 connect 线程将注册的监听事件发送给 zookeeper。
  4. 在 zookeeper 的注册监听器列表中将注册的监听事件添加到列表中。
  5. zookeeper 监听到有数据或路径变化,就会将这个消息发送给 listener 线程。
  6. listener 线程内部调用了 process() 方法。

监听原理图

3、常见监听类型

  1. 监听节点数据的变化
# 在 192.168.1.22 主机上注册监听 /demo-w 节点数据变化
get -w /demo-w
# 在 192.168.1.23 主机上修改 /demo-w 节点的数据
set /demo-w "demo-w-test"
# 观察 192.168.1.22 主机上收到数据变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/demo-w
  1. 监听子节点增减的变化
# 在 192.168.1.22 主机上注册监听 /demo-w 节点的子节点变化
ls -w /demo-w
# 在 192.168.1.23 主机 /demo-w 节点上创建子节点
create /demo-w/w1 "w1"
# 观察 192.168.1.22 主机上收到子节点变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/demo-w

注意:注册一次,只能监听一次;想再次监听,需要再次注册。

九、常用命令

1、服务端命令

命令说明
sh zkServer.sh start启动服务
sh zkServer.sh status查看服务状态
sh zkServer.sh restart重启服务
sh zkServer.sh stop停止服务
sh zkCli.sh启动客户端并连接本地服务端
sh zkCli.sh -server ip:port启动客户端并连接指定服务端

2、客户端命令

命令说明
help显示所有操作命令
ls [-s] [-w] [-R] path显示当前节点的子节点
-s:查看当前节点信息
-w:监听器
-R:递归显示
create [-s] [-e] [-c] [-t ttl] path [data] [acl]创建节点
-s:加序列号
-e:临时节点
-c:容器节
-t:TTL 节点
ttl:过期时间(毫秒)
data:节点数据,默认为 null
acl:权限控制
get [-s] [-w] path获取节点值
-s:节点 信息
-w:监听器
getAllChildrenNumber path获取子节点数量
getEphemerals path获取临时节点列表
set [-s] [-v version] path data设置节点值
-s:节点信息
-v:指定数据版本号修改
version:数据版本号
data:节点值
stat [-w] path显示节点信息
-w:监听器
delete [-v version] path删除节点
-v:指定数据版本号删除
version:数据版本号
deleteall path递归删除节点
addauth scheme auth添加认证用户
eg:addauth digest username:password
getAcl [-s] path查看 ACL 权限
-s:节点信息
setAcl [-s] [-v version] [-R] path acl设置 ACL 权限
-s:节点信息
-v:指定数据版本号设置
version:数据版本号
-R:递归设置
acl:权限格式(scheme:id:perm
listquota path查看节点限额
`setquota -n-b val path`
`delquota [-n-b] path`
`removewatches path [-c-d
`printwatches onoff`
sync path强制同步节点
version查看 zookeeper 版本
history查看历史执行命令
redo cmdno重新执行历史命令
cmdno:历史执行命令编号
close关闭当前会话
connect host:port重新连接指定服务端
quit退出客户端

十、JavaAPI

1、引入 maven 依赖

<!--zookeeper-->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
</dependency>

2、常用 API 说明

1)创建 zookeeper 连接

new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);

connectString:zookeeper 地址
sessionTimeout:会话超时时间(毫秒)
watcher:注册监听器

2)创建节点

zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode);
zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode, Stat stat);
zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode, Stat stat, long ttl);
zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx);
zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode, Create2Callback cb, Object ctx);
zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode, Create2Callback cb, Object ctx, long ttl);

path:节点路径
data:节点值
acl:ACL 权限列表,ZooDefs.Ids 这个类提供了一些静态接口
createMode:节点类型,这是个枚举类 CreateMode
stat:节点信息对象
ttl:节点存活时间(毫秒),节点类型必须为 TTL 类型才可使用
StringCallback:异步回调接口,没有回调节点信息
Create2Callback:异步回调接口,回调节点信息对象
ctx:传递上下文参数

3)设置节点值

zk.setData(String path, byte[] data, int version);
zk.setData(String path, byte[] data, int version,StatCallback cb,Object ctx);

path:节点路径
data:节点值
version:节点当前版本,-1 表示忽略版本验证
StatCallback:异步回调接口,回调节点信息对象
ctx:传递上下文参数

4)查看节点

zk.getData(String path, boolean watch, Stat stat);
zk.getData(String path, Watcher watcher, Stat stat);
zk.getData(String path, boolean watch, DataCallback cb, Object ctx);
zk.getData(String path, Watcher watcher, DataCallback cb, Object ctx);

path:节点路径
watch:是否使用创建连接时注册的监听器
watcher:注册新的监听器
stat:节点信息对象
DataCallback:异步回调接口,回调节点值和节点信息
ctx:传递上下文参数

5)查看子节点

zk.getChildren(String path, boolean watch);
zk.getChildren(String path, Watcher watcher);
zk.getChildren(String path, boolean watch, Stat stat);
zk.getChildren(String path, Watcher watcher, Stat stat);
zk.getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx);
zk.getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx);
zk.getChildren(String path, boolean watch, Children2Callback cb, Object ctx);
zk.getChildren(String path, Watcher watcher, Children2Callback cb, Object ctx);

path:节点路径
watch:是否使用创建连接时注册的监听器
watcher:注册新的监听器
stat:节点信息对象
ChildrenCallback:异步回调接口,回调子节点
Children2Callback:异步回调接口,回调子节点和该节点信息
ctx:传递上下文参数

6)查看子节点数量

zk.getAllChildrenNumber(String path);
zk.getAllChildrenNumber(String path, AllChildrenNumberCallback cb, Object ctx);

path:节点路径
AllChildrenNumberCallback:异步回调接口,回调子节点数量
ctx:传递上下文参数

7)检查节点是否存在

zk.exists(String path, boolean watch);
zk.exists(String path, Watcher watcher);
zk.exists(String path, boolean watch,StatCallback cb, Object ctx);
zk.exists(String path, Watcher watcher,StatCallback cb, Object ctx);

path:节点路径
watch:是否使用创建连接时注册的监听器
watcher:注册新的监听器
StatCallback:异步回调接口,回调节点信息对象
ctx:传递上下文参数

8)删除节点

zk.delete(String path, int version);
zk.delete(String path, int version, VoidCallback cb, Object ctx);

path:节点路径
version:节点当前版本,-1 表示忽略版本验证
VoidCallback:异步回调接口
ctx:传递上下文参数

9)查看临时节点

zk.getEphemerals();
zk.getEphemerals(String prefixPath);
zk.getEphemerals(EphemeralsCallback cb, Object ctx);
zk.getEphemerals(String prefixPath, EphemeralsCallback cb, Object ctx);

prefixPath:前缀路径
EphemeralsCallback:异步回调接口,回调临时节点路径
ctx:传递上下文参数

10)添加认证用户

zk.addAuthInfo(String scheme, byte[] auth);
// 使用例子
zk.addAuthInfo("digest", "user1:123".getBytes());

scheme:授权策略
auth:认证内容

11)查看节点 ACL 权限

zk.getACL(String path, Stat stat);
zk.getACL(String path, Stat stat, ACLCallback cb, Object ctx);

path:节点路径
stat:节点信息对象
ACLCallback:异步回调接口,回调 ACL 权限和节点信息
ctx:传递上下文参数

12)设置节点 ACL 权限

zk.setACL(String path, List<ACL> acl, int aclVersion);
zk.setACL(String path, List<ACL> acl, int version, StatCallback cb, Object ctx);

path:节点路径
acl:ACL 权限列表,ZooDefs.Ids 这个类提供了一些静态接口
aclVersion:节点 acl 版本,-1 表示忽略版本验证
version:节点当前版本,-1 表示忽略版本验证
StatCallback:异步回调接口,回调节点信息对象
ctx:传递上下文参数

3、代码演示

详情看代码演示

十一、分布式锁

1、分布式锁定义

进程 1 在使用某资源的时候会先去获得锁,进程 1 获得锁后会对该资源保持独占,这样其他进程就无法访问该资源,进程 1 用完该资源后就将锁释放掉,让其他进程来获得锁,通过这个锁机制就能保证了分布式系统中多个进程能够有序的访问该资源。那么这个分布式环境下的这个锁叫作分布式锁。

2、分布式锁实现分析

  1. 接收到请求后,在 /locks 节点下创建一个临时有序的子节点
  2. 判断自己在 /locks 节点的子节点中是不是最小的节点,是就获得锁,否则对前一个节点进行节点删除的监听
  3. 获得锁处理完业务后,删除当前节点来释放锁,然后后一个节点将收到通知,然后获得锁

分布式锁实现分析图

3、分布式锁实现

/**
 * 使用zookeeper实现分布式锁
 *
 * @author lwf
 */
public class DistributedLock {

    private final ZooKeeper zooKeeper;
    private final CountDownLatch connectLatch = new CountDownLatch(1);
    private final CountDownLatch waitLatch = new CountDownLatch(1);
    /**
     * 根节点路径
     */
    private final String ROOT_PATH = "/locks";
    /**
     * 当前创建节点
     */
    private String currentNode;
    /**
     * 等待释放锁节点路径
     */
    private String waitPath;

    public DistributedLock() {
        // 创建zookeeper连接
        zooKeeper = ZookeeperUtils.getConnection(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    // 连接创建成功,唤醒线程
                    connectLatch.countDown();
                }
                if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    // 删除节点事件并且是等待释放锁节点路径,唤醒线程等待锁线程
                    waitLatch.countDown();
                }
            }
        });
        try {
            // 线程阻塞
            connectLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            // 判断根节点是否存在
            Stat stat = zooKeeper.exists(ROOT_PATH, false);
            if (stat == null) {
                // 创建根节点
                String rootNode = zooKeeper.create(ROOT_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("根节点不存在,重新创建:" + rootNode);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 加锁
     */
    public void lock() {
        try {
            // 创建临时有序的节点
            currentNode = zooKeeper.create(ROOT_PATH + "/seq-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 获取所有子节点列表
            List<String> children = zooKeeper.getChildren(ROOT_PATH, false);
            // 只有一个子节点,只能是当前创建的节点,可以获得锁
            if (children.size() == 1) {
                return;
            }
            // 对所有子节点进行升序排序
            Collections.sort(children);
            // 截取当前节点的名称 /locks/seq-000000001 -> seq-000000001
            String currentChild = currentNode.substring((ROOT_PATH + "/").length());
            // 获取当前节点在子节点列表中的位置
            int currentIndex = children.indexOf(currentChild);
            // 当前节点是子节点列表中最小,可以获得锁
            if (currentIndex == 0) {
                return;
            }
            // 当前节点不是最小,获取前节点路径
            waitPath = ROOT_PATH + "/" + children.get(currentIndex - 1);
            // 在等待释放锁节点注册监听器,监听节点删除时可以获得锁
            zooKeeper.getData(waitPath, true, new Stat());
            // 线程阻塞,等待释放锁
            waitLatch.await();
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 释放锁
     */
    public void unlock() {
        try {
            // 删除当前节点
            zooKeeper.delete(currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

}

4、分布式锁测试

1)、创建三个线程同时执行

@Test
public void testDistributedLock() {
    // 创建分布式锁1
    DistributedLock lock1 = new DistributedLock();
    // 创建分布式锁2
    DistributedLock lock2 = new DistributedLock();
    // 创建分布式锁3
    DistributedLock lock3 = new DistributedLock();

    Thread thread1 = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                lock1.lock();
                System.out.println("线程1-获得锁");
                Thread.sleep(5000);
                lock1.unlock();
                System.out.println("线程1-释放锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    Thread thread2 = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                lock2.lock();
                System.out.println("线程2-获得锁");
                Thread.sleep(5000);
                lock2.unlock();
                System.out.println("线程2-释放锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    Thread thread3 = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                lock3.lock();
                System.out.println("线程3-获得锁");
                Thread.sleep(5000);
                lock3.unlock();
                System.out.println("线程3-释放锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    thread1.start();
    thread2.start();
    thread3.start();
    try {
        thread1.join();
        thread2.join();
        thread3.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

2)、控制台输出

线程1-获得锁
线程1-释放锁
线程2-获得锁
线程2-释放锁
线程3-获得锁
线程3-释放锁

5、实现方式的对比

问题rediszookeeper
死锁如果没有设置失效时间,解锁之前断开连接,就会导致其他线程无法获得锁。有效的解决死锁问题。
因为获取锁是创建临时节点,临时节点在断开连接时会自动删除,即自动解锁。
失效时间失效时间不好控制。
如果设置失效时间太短,业务逻辑还没处理完就自动解锁,就会产生并发问题;
如果设置失效时间太长,解锁之前断开连接,那么需要等待失效时间才能解锁。
不需要考虑失效时间。
阻塞锁非阻塞锁。
无论获取锁成功或失败都直接返回,需要自己不断去尝试获取锁。
阻塞锁。
如果没有获取到锁,会在创建的临时节点上绑定监听器,线程阻塞等待,一旦节点有变化,恢复等待线程获取到锁。
不可重入一个线程获取到锁后,在解锁之前无法再次获取到锁。
因为使用的 key 已经存在,无法再执行 put 操作。
有效的解决不可重入问题。
在 curator 客户端中封装了一个可重入的锁服务。
性能方面性能较好。需要频繁创建删除节点,性能较差。
实现复杂性实现起来比较方便。实现起来较为复杂。

十二、zkclient & curator

1、zkclient

zkclient 是由 Datameer 的工程师开发的开源客户端,对 zookeeper 的原生 API 进行了包装,实现了超时重连、Watcher 反复注册等功能。

目前运用到的项目有:Dubbo、Kafka、Helix。

Github:https://github.com/sgroschupf/zkclient

Maven 依赖:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

1)创建连接

new ZkClient(String serverstring);
new ZkClient(String zkServers, int connectionTimeout);
new ZkClient(String zkServers, int sessionTimeout, int connectionTimeout);
new ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer);
new ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer, long operationRetryTimeout);
new ZkClient(IZkConnection connection);
new ZkClient(IZkConnection connection, int connectionTimeout);
new ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer);
new ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer, long operationRetryTimeout);

serverstring: zookeeper连接地址。
connectionTimeout:连接超时时间(毫秒)。
sessionTimeout:会话超时时间(毫秒),默认是 30000 毫秒。
zkSerializer:自定义的序列化器。
connection:是 IZkConnection 的接口实现类。
operationRetryTimeout:是在与服务连接断开后,进行重试操作的最长时间。如果断开时间未超过 operationRetryTimeout,则所有操作进行重试。operationRetryTimeout 单位也是毫秒,如果传入小于 0 的值,则一直进行操作重试,直至与服务的连接恢复。

2)创建节点

String create(String path, Object data, CreateMode mode);
String create(String path, Object data, List<ACL> acl, CreateMode mode);
void createPersistent(String path);
void createPersistent(String path, boolean createParents);
void createPersistent(String path, boolean createParents, List<ACL> acl);
void createPersistent(String path, Object data);
void createPersistent(String path, Object data, List<ACL> acl);
String createPersistentSequential(String path, Object data);
String createPersistentSequential(String path, Object data, List<ACL> acl);
void createEphemeral(String path);
void createEphemeral(String path, List<ACL> acl);
void createEphemeral(String path, Object data);
void createEphemeral(String path, Object data, List<ACL> acl);
String createEphemeralSequential(String path, Object data);
String createEphemeralSequential(String path, Object data, List<ACL> acl);

3)设置节点值

void writeData(String path, Object object);
void writeData(String path, Object datat, int expectedVersion);
Stat writeDataReturnStat(String path, Object datat, int expectedVersion)

4)获取节点值

<T extends Object> T readData(String path);
<T extends Object> T readData(String path, Stat stat)
<T extends Object> T readData(String path, boolean returnNullIfPathNotExists);

5)获取子节点

List<String> getChildren(String path);

6)获取子节点数量

int countChildren(String path);

7)判断节点是否存在

boolean exists(String path);

8)删除节点

boolean delete(String path);
boolean delete(String path, int version);
boolean deleteRecursive(String path);

9)监听机制

在原生 API 中,提供了 watcher 的机制监听节点,而 zkClient 将之转换成 Listener 的概念,就是订阅服务端的事件。

// 订阅节点子节点变化事件(创建节点、删除节点、创建子节点、删除子节点)
List<String> subscribeChildChanges(String path, IZkChildListener listener);
// 取消订阅节点子节点变化事件
void unsubscribeChildChanges(String path, IZkChildListener childListener);

// 订阅节点数据变化事件(创建节点、删除节点、节点数据变更)
void subscribeDataChanges(String path, IZkDataListener listener);
// 取消订阅节点数据变化事件
void unsubscribeDataChanges(String path, IZkDataListener dataListener);

// 订阅连接状态变化事件
void subscribeStateChanges(final IZkStateListener listener);
// 取消订阅连接状态变化事件
void unsubscribeStateChanges(IZkStateListener stateListener);

// 取消订阅全部事件
void unsubscribeAll();

2、curator

Curator 是 Netflix 公司开源的一个 zookeeper 客户端,与 zookeeper 的其他客户端相比,curator 的抽象层次更高,简化了 zookeeper 客户端的开发量。现在是 apache 的开源框架,使用 fluent 编程风格实现。

官网:https://curator.apache.org/

Curator 内部实现的几种重试策略:
① ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
② RetryNTimes:指定最大重试次数的重试策略
③ RetryOneTime:仅重试一次
④ RetryUntilElapsed:一直重试直到达到规定的时间

Maven 依赖:

<!--基础框架-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>
<!--高级特性,分布式锁、队列等-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>
<!--curator客户端重试策略-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>5.2.0</version>
</dependency>

1)创建连接,通过工厂类创建

// 简单方式创建实例
CuratorFrameworkFactory.newClient(String connectString, RetryPolicy retryPolicy);
CuratorFrameworkFactory.newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);
CuratorFrameworkFactory.newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig);
// 重试策略,初试时间3秒,重试3次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
// 使用Builder创建,提供更多参数控制
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(ZookeeperConstants.connectString)
        .sessionTimeoutMs(ZookeeperConstants.sessionTimeout)
        .connectionTimeoutMs(ZookeeperConstants.connectionTimeout)
        .retryPolicy(policy)
        .build();
// 开启连接
client.start();

2)创建节点

String path = client.create()
        .creatingParentsIfNeeded() // 递归创建所需父节点
        .withMode(CreateMode.PERSISTENT) // 节点类型
        .forPath(ROOT + "/demo1", "demo1".getBytes()); // 目录和值

3)设置节点值

Stat stat = client.setData().withVersion(-1)
        .forPath(ROOT + "/demo1", "data".getBytes()); // 目录和值

4)获取节点值

Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat)
        .forPath(ROOT + "/demo1");

5)获取子节点

Stat stat = new Stat();
List<String> children = client.getChildren().storingStatIn(stat)
        .forPath(ROOT);

6)判断节点是否存在

Stat stat = client.checkExists().forPath(ROOT + "/demo1");

7)删除节点

client.delete().guaranteed()  // 强制保证删除
        .deletingChildrenIfNeeded() // 递归删除子节点
        .withVersion(-1) // 指定删除的版本号
        .forPath(ROOT);

8)分布式锁

// 创建分布式锁
InterProcessLock lock = new InterProcessMutex(client, ROOT);
// 获得锁
lock.acquire();
// 释放锁
lock.release();

十三、主流注册中心简单对比

功能ZookeeperEurekaNacosConsul
CAP 理论CPAPCP + APCP
访问协议TCPHTTPHTTP/DNSHTTP/DNS
健康检查Keep AliveClient BeatTCP/HTTP/MySql/Client BeatTCP/HTTP/gRPC/Cmd
负载均衡策略-Ribbon权重/metadata/SelectorFabio
雪崩保护
自动注销实例支持支持支持支持
监听支持支持支持支持支持
多数据中心不支持支持支持支持
跨注册中心同步不支持不支持支持支持
SpringCloud 集成支持支持支持支持
Dubbo 集成支持不支持支持支持
k8s 集成不支持不支持支持支持

CAP 理论是分布式系统中一个很重要的理论,它描述的是一个分布式系统最多只能满足 CAP 中的两个条件,不可能同时满足三个条件。

C(Consistency):强一致性。保证在一定时间内,集群中的各个节点会达到较强的一致性,同时,为了达到这一点,一般会牺牲一点响应时间。而放弃 C 也不意味着放弃一致性,而是放弃强一致性。允许系统内有一定的数据不一致情况的存在。

A (Avalibility):可用性。意味着系统一直处于可用状态。个别节点的故障不会影响整个服务的运作。

P(Partition Tolerance):分区容错性。当系统出现网络分区等情况时,依然能对外提供服务。想到达到这一点,一般来说会把数据复制到多个分区里,来提高分区容错性。这个一般是不会被抛弃的。

0

评论区