Zookeeper-API

1 Zookeeper API

1.1 引入Zookeeper依赖

1
2
3
4
5
<!-- ZooKeeper Java client library; supplies the org.apache.zookeeper classes used by the examples in section 1. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.1</version>
</dependency>

1.2 日志输出

(1)引入依赖

1
2
3
4
5
6
7
8
9
10
<!-- SLF4J binding that forwards SLF4J log calls to log4j 1.2. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- log4j 1.2 backend, configured by the log4j.properties file shown below. -->
<!-- NOTE(review): log4j 1.x is end-of-life; confirm whether a maintained backend should be used instead. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

(2)定义log4j.properties文件

1
2
3
4
5
6
7
###set log levels###
# Root logger: level INFO, output goes to the appender named "stdout".
log4j.rootLogger=info, stdout
###output to the console###
# "stdout" is a console appender writing to System.out.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern: [date time zone] thread name, level padded to at least 5 characters,
# last two components of the logger name, message, line separator.
log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy HH:mm:ss:SSS z}] %t%5p %c{2}: %m%n

1.3 连接服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Helper that opens a ZooKeeper client session and waits until it is connected.
 */
public class ZkConnUtil {
    /** Session timeout passed to the ZooKeeper constructor, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 30000;

    /**
     * Opens a session to the given server and blocks until the client reports
     * {@code SyncConnected}.
     *
     * <p>The wait has no timeout: if the server is never reached, this call does not return.
     *
     * @param zkServer connect string, e.g. {@code "host:port"}
     * @return the connected ZooKeeper handle; the caller is responsible for closing it
     * @throws Exception if the client cannot be created or the waiting thread is interrupted
     */
    public static ZooKeeper getZkConn(String zkServer) throws Exception {
        // One latch per call. A latch shared in a static field stays at zero after the
        // first connection, so every later call would return before its own session
        // is connected; a static handle field could also be overwritten by a concurrent call.
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(zkServer, SESSION_TIMEOUT_MS, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    System.out.println("连接zkServer成功.");
                    connected.countDown();
                }
            }
        });
        connected.await();
        return zooKeeper;
    }

    /**
     * Demo entry point: connects to the sample server, then closes the session.
     */
    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = getZkConn("192.168.0.8:2181");
        // Release the session explicitly instead of leaving it to expire on the server.
        zooKeeper.close();
    }
}

1.4 创建ZNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Demonstrates creating persistent znodes through the synchronous and the
 * asynchronous variants of {@code ZooKeeper#create}.
 */
public class CreateZNode {
    private ZooKeeper zooKeeper;

    /**
     * @param zooKeeper the client handle used for all create calls
     */
    public CreateZNode(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * Creates {@code /zookeeper-api-sync} synchronously and prints the path returned by the call.
     *
     * @throws Exception if the create call fails
     */
    public void createZNodeWithSync() throws Exception {
        String createdPath = zooKeeper.create(
                "/zookeeper-api-sync",
                "111".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        System.out.println("创建节点成功: " + createdPath);
    }

    /**
     * Submits an asynchronous create for {@code /zookeeper-api-async}; the callback
     * prints every argument it receives.
     */
    public void createZNodeWithAsync() {
        AsyncCallback.StringCallback onCreated = (rc, path, ctx, name) -> {
            System.out.println("rc: " + rc);
            System.out.println("path: " + path);
            System.out.println("ctx: " + ctx);
            System.out.println("name: " + name);
        };
        zooKeeper.create(
                "/zookeeper-api-async",
                "111".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT,
                onCreated,
                "create-asyn");
    }

    /**
     * Demo entry point: runs the asynchronous create and then blocks on stdin so the
     * process stays alive until the callback has run.
     */
    public static void main(String[] args) throws Exception {
        ZooKeeper client = ZkConnUtil.getZkConn("192.168.0.8:2181");
        CreateZNode demo = new CreateZNode(client);
        // demo.createZNodeWithSync();
        demo.createZNodeWithAsync();
        System.in.read();
    }
}

1.5 查询ZNode数据并设置监听

通过getData等原生API注册的watch是一次性的,不是永久的:触发一次后即失效,如需继续监听必须重新注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Demonstrates reading znode data synchronously (with a watch) and asynchronously.
 */
public class GetZNodeData {
    private ZooKeeper zooKeeper;

    /**
     * @param zooKeeper the client handle used for all reads
     */
    public GetZNodeData(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * Reads {@code /zookeeper-api-sync} synchronously, registering a watcher that prints
     * the event it receives, then prints the data and the stat. Failures are printed,
     * not rethrown.
     */
    public void getDataSync() {
        Stat stat = new Stat();
        try {
            byte[] data = zooKeeper.getData("/zookeeper-api-sync", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // Invoked when the watched node changes.
                    System.out.println("event: " + event);
                }
            }, stat);
            System.out.println("data: " + describe(data));
            System.out.println("stat: " + stat);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads {@code /zookeeper-api-async} asynchronously without a watch; the callback
     * prints every argument it receives.
     */
    public void getDataAsync() {
        zooKeeper.getData("/zookeeper-api-async", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("rc: " + rc);
                System.out.println("path: " + path);
                System.out.println("ctx: " + ctx);
                // data is null when the read failed (e.g. the node does not exist);
                // decoding it unconditionally would throw on the callback thread.
                System.out.println("data: " + describe(data));
                System.out.println("stat: " + stat);
            }
        }, "get-data-async");
    }

    /** Decodes znode bytes for printing; yields the text "null" when there is no data. */
    private static String describe(byte[] data) {
        return data == null ? "null" : new String(data);
    }

    /**
     * Demo entry point: runs both reads, then blocks on stdin so the process stays
     * alive for the watcher and the asynchronous callback.
     */
    public static void main(String[] args) throws Exception {
        GetZNodeData getZNodeData = new GetZNodeData(ZkConnUtil.getZkConn("192.168.0.8:2181"));
        getZNodeData.getDataSync();
        getZNodeData.getDataAsync();
        System.in.read();
    }
}

1.6 修改ZNode数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * Demonstrates updating znode data: unconditionally (sync and async) and
 * conditionally on the node's current version.
 */
public class UpdateZNodeData {
    private ZooKeeper zooKeeper;

    /**
     * @param zooKeeper the client handle used for all updates
     */
    public UpdateZNodeData(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * Overwrites the data of {@code /zookeeper-api-sync} synchronously.
     *
     * @throws Exception if the update fails
     */
    public void setDataSync() throws Exception {
        // Version -1 means the update is applied regardless of the node's current version.
        final int anyVersion = -1;
        zooKeeper.setData("/zookeeper-api-sync", "222".getBytes(), anyVersion);
    }

    /**
     * Overwrites the data of {@code /zookeeper-api-async} asynchronously; the callback
     * prints every argument it receives.
     */
    public void setDataAsync() {
        AsyncCallback.StatCallback onUpdated = (rc, path, ctx, stat) -> {
            System.out.println("rc: " + rc);
            System.out.println("path: " + path);
            System.out.println("ctx: " + ctx);
            System.out.println("stat: " + stat);
        };
        zooKeeper.setData("/zookeeper-api-async", "222".getBytes(), -1, onUpdated, "set-data-async");
    }

    /**
     * Reads the current stat of {@code /zookeeper-api-sync} and then updates the node
     * using the version taken from that stat.
     *
     * @throws Exception if the read or the update fails
     */
    public void setDataSyncWithVersion() throws Exception {
        Stat current = new Stat();
        zooKeeper.getData("/zookeeper-api-sync", false, current);
        int expectedVersion = current.getVersion();
        zooKeeper.setData("/zookeeper-api-sync", "555".getBytes(), expectedVersion);
    }

    /**
     * Demo entry point: runs the versioned update, then blocks on stdin.
     */
    public static void main(String[] args) throws Exception {
        ZooKeeper client = ZkConnUtil.getZkConn("192.168.0.8:2181");
        UpdateZNodeData demo = new UpdateZNodeData(client);
        // demo.setDataSync();
        // demo.setDataAsync();
        demo.setDataSyncWithVersion();
        System.in.read();
    }
}

1.7 删除ZNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Demonstrates deleting znodes through the synchronous and the asynchronous
 * variants of {@code ZooKeeper#delete}.
 */
public class DeleteZNode {
    private ZooKeeper zooKeeper;

    /**
     * @param zooKeeper the client handle used for all deletes
     */
    public DeleteZNode(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * Deletes {@code /zookeeper-api-sync} synchronously, passing version -1.
     *
     * @throws Exception if the delete fails
     */
    public void deleteZNodeSync() throws Exception {
        final int anyVersion = -1;
        zooKeeper.delete("/zookeeper-api-sync", anyVersion);
    }

    /**
     * Deletes {@code /zookeeper-api-async} asynchronously, passing version -1; the
     * callback prints every argument it receives.
     */
    public void deleteZNodeAsync() {
        AsyncCallback.VoidCallback onDeleted = (rc, path, ctx) -> {
            System.out.println("rc: " + rc);
            System.out.println("path: " + path);
            System.out.println("ctx: " + ctx);
        };
        zooKeeper.delete("/zookeeper-api-async", -1, onDeleted, "delete-znode-async");
    }

    /**
     * Demo entry point: submits the asynchronous delete, performs the synchronous
     * delete, then blocks on stdin so the process stays alive for the callback.
     */
    public static void main(String[] args) throws Exception {
        ZooKeeper client = ZkConnUtil.getZkConn("192.168.0.8:2181");
        DeleteZNode demo = new DeleteZNode(client);
        demo.deleteZNodeAsync();
        demo.deleteZNodeSync();
        System.in.read();
    }
}

2 Apache Curator

官网:https://curator.apache.org/

Curator最初由Netflix团队开发,后来捐献给了Apache,目前是Apache的顶级项目。Curator是对Zookeeper客户端的封装,主要目的是简化Zookeeper客户端的使用:使用者不需要自己手动处理ConnectionLossException、NodeExistsException等异常,并且Curator提供了连接重连以及watch永久注册等解决方案。

2.1 引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.1</version>
</dependency>

2.2 Curator对节点的增删改查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Demonstrates create / read / update / delete of a single znode with the
 * Curator fluent API.
 */
public class CuratorApi {
    /**
     * Demo entry point: creates {@code /curator-api}, reads it, updates it and deletes
     * it, printing a message after each step. Failures are printed, not rethrown.
     */
    public static void main(String[] args) {
        String connectStr = "192.168.0.8:2181";
        CuratorFramework curatorFramework = CuratorFrameworkFactory // fluent builder
                .builder()
                .connectionTimeoutMs(20000)
                .connectString(connectStr)
                // Retry policy: exponential backoff starting from a 1000 ms base sleep
                // time, at most 3 retries (not a fixed interval).
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        try {
            // Create node curator-api.
            String znode = curatorFramework
                    .create()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath("/curator-api", "666".getBytes());
            System.out.println("创建节点成功: " + znode);
            // Read the data of node curator-api.
            byte[] bytes = curatorFramework.getData().forPath(znode);
            System.out.println("节点curator-api 数据查询成功: " + new String(bytes));
            // Update the data of node curator-api.
            curatorFramework.setData().forPath(znode, "888".getBytes());
            System.out.println("节点curator-api 数据修改成功.");
            // Delete node curator-api.
            curatorFramework.delete().forPath(znode);
            System.out.println("节点curator-api 已被删除.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the client; it was previously left open after the demo finished.
            curatorFramework.close();
        }
    }
}

2.3 Curator设置监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
 * Demonstrates two ways of watching a znode with Curator: a one-shot watcher
 * attached to a read, and a {@code CuratorCache} listener that keeps receiving events.
 */
public class CuratorWatch {
    /** Connect string of the sample server used by both demos. */
    private static final String CONNECT_STRING = "192.168.0.8:2181";

    /**
     * Demo entry point: runs the persistent-watch demo.
     */
    public static void main(String[] args) {
        curatorWatchPersistent();
    }

    /** Builds (without starting) a client with the settings shared by both demos. */
    private static CuratorFramework newClient() {
        return CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString(CONNECT_STRING)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
    }

    /**
     * One-shot watch: creates a node, attaches a watcher through {@code getData()},
     * then blocks on stdin. Failures are printed, not rethrown.
     */
    private static void curatorWatchOnce() {
        CuratorFramework curatorFramework = newClient();
        curatorFramework.start();
        try {
            // Create node curator-watch-once.
            // NOTE(review): the path below has no hyphen between "curator" and "watch",
            // unlike the name used in the messages; confirm which spelling is intended.
            String znode = curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curatorwatch-once", "".getBytes());
            System.out.println("节点创建成功: " + znode);
            // Attach a one-shot watch to the node.
            curatorFramework.getData().usingWatcher(new CuratorWatcher() {
                @Override
                public void process(WatchedEvent event) throws Exception {
                    System.out.println("节点发生变化: " + event);
                }
            }).forPath(znode);
            System.out.println("给节点curator-watch-once 添加watch成功.");
            // Keep the process alive so the watcher can fire.
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the client once the demo ends.
            curatorFramework.close();
        }
    }

    /**
     * Persistent watch: creates a node, registers a {@code CuratorCache} listener on
     * it, then blocks on stdin. Failures are printed, not rethrown.
     */
    private static void curatorWatchPersistent() {
        CuratorFramework curatorFramework = newClient();
        curatorFramework.start();
        try {
            // Create node curator-watch-persistent.
            // NOTE(review): the path below has no hyphen between "watch" and "persistent",
            // unlike the name used in the messages; confirm which spelling is intended.
            final String znode = curatorFramework.create().forPath("/curator-watchpersistent", "".getBytes());
            System.out.println("节点创建成功: " + znode);
            // Persistent listening through a single-node cache; the cache is closed
            // before the client when the demo ends.
            try (CuratorCache curatorCache = CuratorCache.build(curatorFramework, znode, CuratorCache.Options.SINGLE_NODE_CACHE)) {
                CuratorCacheListener listener = CuratorCacheListener.builder().forAll(new CuratorCacheListener() {
                    @Override
                    public void event(Type type, ChildData oldData, ChildData data) {
                        // Plays the role of the Watcher#process callback.
                        // data is null for a deletion and oldData is null for a creation,
                        // so pick whichever is present and fall back to the watched path.
                        ChildData present = (data != null) ? data : oldData;
                        String path = (present != null) ? present.getPath() : znode;
                        System.out.println("节点 " + path + " 发生改变, 事件类型为: " + type);
                    }
                }).build();
                curatorCache.listenable().addListener(listener);
                curatorCache.start();
                System.out.println("给节点curator-watch-persistent 添加watch成功.");
                // Keep the process alive so the listener can receive events.
                System.in.read();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the client once the demo ends.
            curatorFramework.close();
        }
    }
}

Zookeeper-API
http://www.zivjie.cn/2023/03/11/中间件/zookeeper/Zookeeper-API/
作者
Francis
发布于
2023年3月11日
许可协议