1 Zookeeper API
1.1 引入Zookeeper依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.1</version> </dependency>
|
1.2 日志输出
(1)引入依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
|
(2)定义log4j.properties文件
1 2 3 4 5 6 7
| log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy HH:mm:ss:SSS z}] %t%5p %c{2}: %m%n
|
1.3 连接服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class ZkConnUtil{ private static ZooKeeper zookeeper; private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static ZooKeeper getZkConn(String zkServer) throws Exception { zookeeper = new ZooKeeper(zkServer, 30000, new Watcher() { @Override public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); if (Event.KeeperState.SyncConnected == state) { System.out.println("连接zkServer成功."); countDownLatch.countDown(); } } }); countDownLatch.await(); return zookeeper; } public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = getZkConn("192.168.0.8:2181"); } }
|
1.4 创建ZNode
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class CreateZNode { private ZooKeeper zooKeeper; public CreateZNode(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } public void createZNodeWithSync() throws Exception { String znode = zooKeeper.create("/zookeeper-api-sync", "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("创建节点成功: "+znode); } public void createZNodeWithAsync(){ zooKeeper.create("/zookeeper-api-async","111".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback(){ @Override public void processResult(int rc, String path, Object ctx, String name) { System.out.println("rc: "+rc); System.out.println("path: "+path); System.out.println("ctx: "+ctx); System.out.println("name: "+name); } },"create-asyn"); } public static void main(String[] args) throws Exception { CreateZNode createZNode = new CreateZNode(ZkConnUtil.getZkConn("192.168.0.8:2181")); createZNode.createZNodeWithAsync(); System.in.read(); } }
|
1.5 查询ZNode数据并设置监听
通过getData等方法注册的watch是一次性的,不是永久的:触发一次后即失效,如需继续监听必须重新注册。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class GetZNodeData { private ZooKeeper zooKeeper; public GetZNodeData(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } public void getDataSync(){ Stat stat = new Stat(); try { byte[] data = zooKeeper.getData("/zookeeper-api-sync", new Watcher(){ @Override public void process(WatchedEvent event) { System.out.println("event: "+event); } }, stat); String s = new String(data); System.out.println("data: "+s); System.out.println("stat: "+stat); } catch (Exception e){ e.printStackTrace(); } } public void getDataAsync(){ zooKeeper.getData("/zookeeper-api-async",false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { System.out.println("rc: "+rc); System.out.println("path: "+path); System.out.println("ctx: "+ctx); System.out.println("data: "+new String(data)); System.out.println("stat: "+stat); } },"get-data-async"); } public static void main(String[] args) throws Exception { GetZNodeData getZNodeData = new GetZNodeData(ZkConnUtil.getZkConn("192.168.0.8:2181")); getZNodeData.getDataSync(); getZNodeData.getDataAsync(); System.in.read(); } }
|
1.6 修改ZNode数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class UpdateZNodeData { private ZooKeeper zooKeeper; public UpdateZNodeData(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } public void setDataSync() throws Exception { zooKeeper.setData("/zookeeper-api-sync", "222".getBytes(), -1); } public void setDataAsync(){ zooKeeper.setData("/zookeeper-api-async", "222".getBytes(), -1, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println("rc: "+rc); System.out.println("path: "+path); System.out.println("ctx: "+ctx); System.out.println("stat: "+stat); } },"set-data-async"); } public void setDataSyncWithVersion() throws Exception { Stat stat = new Stat(); zooKeeper.getData("/zookeeper-api-sync", false, stat); zooKeeper.setData("/zookeeper-api-sync", "555".getBytes(), stat.getVersion()); } public static void main(String[] args) throws Exception { UpdateZNodeData updateZNodeData = new UpdateZNodeData(ZkConnUtil.getZkConn("192.168.0.8:2181")); updateZNodeData.setDataSyncWithVersion(); System.in.read(); } }
|
1.7 删除ZNode
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class DeleteZNode { private ZooKeeper zooKeeper; public DeleteZNode(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } public void deleteZNodeSync() throws Exception { zooKeeper.delete("/zookeeper-api-sync",-1); } public void deleteZNodeAsync(){ zooKeeper.delete("/zookeeper-api-async", -1, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { System.out.println("rc: "+rc); System.out.println("path: "+path); System.out.println("ctx: "+ctx); } },"delete-znode-async"); } public static void main(String[] args) throws Exception { DeleteZNode deleteZNode = new DeleteZNode(ZkConnUtil.getZkConn("192.168.0.8:2181")); deleteZNode.deleteZNodeAsync(); deleteZNode.deleteZNodeSync(); System.in.read(); } }
|
2 Apache Curator
官网:https://curator.apache.org/
最初是由Netflix团队开发的,后来捐献给了Apache,目前是Apache的顶级项目。Curator是对Zookeeper客户端的封装,主要目的就是简化Zookeeper客户端的使用,不需要自己手动处理ConnectionLossException、NodeExistsException等异常,提供了连接重连以及watch永久注册等解决方案。
2.1 引入依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.1</version> </dependency>
|
2.2 Curator对节点的增删改查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class CuratorApi { public static void main(String[] args) { String connectStr = "192.168.0.8:2181"; CuratorFramework curatorFramework = CuratorFrameworkFactory .builder() .connectionTimeoutMs(20000) .connectString(connectStr) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); curatorFramework.start(); try { String znode = curatorFramework .create() .withMode(CreateMode.PERSISTENT) .forPath("/curator-api", "666".getBytes()); System.out.println("创建节点成功: " + znode); byte[] bytes = curatorFramework.getData().forPath(znode); System.out.println("节点curator-api 数据查询成功: " + new String(bytes)); curatorFramework.setData().forPath(znode, "888".getBytes()); System.out.println("节点curator-api 数据修改成功."); curatorFramework.delete().forPath(znode); System.out.println("节点curator-api 已被删除."); } catch (Exception e) { e.printStackTrace(); } } }
|
2.3 Curator设置监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| public class CuratorWatch { public static void main(String[] args) { curatorWatchPersistent(); } private static void curatorWatchOnce() { String connectStr = "192.168.0.8:2181"; CuratorFramework curatorFramework = CuratorFrameworkFactory .builder() .connectionTimeoutMs(20000) .connectString(connectStr) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); curatorFramework.start(); try { String znode = curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curatorwatch-once", "".getBytes()); System.out.println("节点创建成功: " + znode); curatorFramework.getData().usingWatcher(new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { System.out.println("节点发生变化: " + event); } }).forPath(znode); System.out.println("给节点curator-watch-once 添加watch成功."); System.in.read(); } catch (Exception e) { e.printStackTrace(); } } private static void curatorWatchPersistent() { String connectStr = "192.168.0.8:2181"; CuratorFramework curatorFramework = CuratorFrameworkFactory .builder() .connectionTimeoutMs(20000) .connectString(connectStr) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); curatorFramework.start(); try { String znode = curatorFramework.create().forPath("/curator-watchpersistent", "".getBytes()); System.out.println("节点创建成功: " + znode); CuratorCache curatorCache = CuratorCache.build(curatorFramework,znode, CuratorCache.Options.SINGLE_NODE_CACHE); CuratorCacheListener listener = CuratorCacheListener.builder().forAll(new CuratorCacheListener() { @Override public void event(Type type, ChildData oldData, ChildData data){ System.out.println("节点 "+data.getPath()+" 发生改变, 事件类型为: " + type); } }).build(); curatorCache.listenable().addListener(listener); curatorCache.start(); System.out.println("给节点curator-watch-persistent 添加watch成功."); System.in.read(); } catch (Exception e) { e.printStackTrace(); } } }
|