1. 下载ZooKeeper。官网:
下载后解压,假定zookeeper程序目录为/home/test/zookeeper,为陈述方便此目录记为 $ZOOKEEPER_HOME
可以直接启动:
[bin]$./zkServer.sh startZooKeeper JMX enabled by defaultUsing config: /Users/beef_in_jp/Documents/ebuyProjects/courier/tool/zookeeper-3.4.7/bin/../conf/zoo.cfgStarting zookeeper ... STARTED
没有配置过多服务器的话,此时是standalone模式:
[bin]$./zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /Users/beef_in_jp/Documents/ebuyProjects/courier/tool/zookeeper-3.4.7/bin/../conf/zoo.cfgMode: standalone
默认的监听端口:
[bin]$netstat -an | grep LISTENtcp46 0 0 *.2181 *.* LISTEN
2. Java代码测试Watcher
2.1 代码(编译需引入这些jar包:jline-0.9.94.jar,log4j-1.2.16.jar,slf4j-api-1.6.1.jar,slf4j-log4j12-1.6.1.jar,zookeeper-3.4.7.jar):
package com.ebuy.courier.testzookeeper.client;import org.apache.log4j.Logger;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import com.beef.util.HexUtil;import java.io.IOException;import java.util.List;import java.util.UUID;import java.util.concurrent.atomic.AtomicBoolean;public class TestZooKeeperWatcher implements Runnable { private final static Logger logger = Logger.getLogger(TestZooKeeperWatcher.class); private String _pathToWatch; private ZooKeeper _zooKeeper; private Watcher _watcher; private AsyncCallback.StatCallback _existsStatCallback; private AsyncCallback.ChildrenCallback _childrenCallback; private AtomicBoolean _runFlg = new AtomicBoolean(false); private String _threadId; public static void main(String[] args) { try { //{host}:{port},{host}:{port},{host}:{port} String connectString = null; if(args.length > 0) { connectString = args[0]; } else { //default test connectString = "127.0.0.1:2181"; } TestZooKeeperWatcher watcher = new TestZooKeeperWatcher(connectString, "/test"); watcher.run(); } catch(Throwable e) { logger.error(null, e); } } public TestZooKeeperWatcher(String connectString, String pathToWatch) throws IOException, KeeperException, InterruptedException { int sessionTimeout = 30*1000; _pathToWatch = pathToWatch; _watcher = new MyWatcher(); _zooKeeper = new ZooKeeper( connectString, sessionTimeout, _watcher ); _existsStatCallback = new MyExistsStatCallBack(); _childrenCallback = new MyChildrenCallback(); UUID uuid = UUID.randomUUID(); _threadId = HexUtil.toHexString(uuid.getMostSignificantBits()) + HexUtil.toHexString(uuid.getLeastSignificantBits()); } @Override public void run() { _runFlg.set(true); logger.info("run() start -----> " + _threadId); try { synchronized (this) { while (_runFlg.get()) { logger.info("run() wait..."); wait(); } } } catch (InterruptedException e) { logger.info("TestZooKeeperWatcher interrupted"); } try { _zooKeeper.close(); logger.info("_zooKeeper closed"); } catch (Throwable e) { e.printStackTrace(); } logger.info("run() end -----> " + _threadId); } public void stop() { synchronized (this) { _runFlg.set(false); notifyAll(); } } private class MyWatcher implements Watcher { @Override public void process(WatchedEvent event) { try { logger.debug("process() event." + " type:" + event.getType() + " state:" + event.getState() + " path:" + event.getPath() ); Event.EventType eventType = event.getType(); Event.KeeperState keeperState = event.getState(); String path = event.getPath(); if(eventType == Event.EventType.None) { if(keeperState == Event.KeeperState.Expired) { stop(); } else if(keeperState == Event.KeeperState.SyncConnected) { //_zooKeeper.exists("/", _watcher); //_zooKeeper.exists(_pathToWatch, true, _existsStatCallback, null); _zooKeeper.exists(_pathToWatch, true); logger.debug("add async callback of exists to " + _pathToWatch); // _zooKeeper.getChildren(_pathToWatch, true, _childrenCallback, null); _zooKeeper.getChildren(_pathToWatch, true); logger.debug("add async callback of getChildren to " + _pathToWatch); } } else { //Node changed //_zooKeeper.exists(event.getPath(), ); if(eventType == Event.EventType.NodeCreated) { } else if(eventType == Event.EventType.NodeChildrenChanged) { } else if(eventType == Event.EventType.NodeDataChanged) { } else if(eventType == Event.EventType.NodeDeleted) { } } } catch(Throwable e) { logger.error(null, e); } } } private class MyExistsStatCallBack implements AsyncCallback.StatCallback { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { logger.debug("MyExistsStatCallBack processResult()" + " rc:" + rc + " path:" + path ); if(rc == KeeperException.Code.OK.intValue()) { } else if(rc == KeeperException.Code.NONODE.intValue()) { } else if(rc == KeeperException.Code.SESSIONEXPIRED.intValue()) { //stop the main loop stop(); } else { } } } private class MyChildrenCallback implements AsyncCallback.ChildrenCallback { @Override public void processResult(int rc, String path, Object ctx, Listchildren) { logger.debug("MyChildrenCallback processResult()" + " rc:" + rc + " path:" + path + " children:[" + toCommaStrings(children) + "]" ); } } private static String toCommaStrings(List list) { StringBuilder sb = new StringBuilder(); int size = list.size(); for(int i = 0; i < size; i++) { if(i != 0) { sb.append(","); } sb.append(list.get(i)); } return sb.toString(); } }
2.2 测试自动重连
zookeeper启动后,停止,再启动,测试代码的日志:
2015-12-30 09:20:25,684 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:zookeeper.version=3.4.7-1713338, built on 11/09/2015 04:32 GMT2015-12-30 09:20:25,686 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:host.name=10.10.10.2422015-12-30 09:20:25,686 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:java.version=1.6.0_652015-12-30 09:20:25,686 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:java.vendor=Apple Inc.2015-12-30 09:20:25,689 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:java.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home2015-12-30 09:20:25,689 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:java.class.path=/Users/beef_in_jp/Documents/ebuyProjects/courier/srcCourier/TestZooKeeperClient/bin:/Users/beef_in_jp/Documents/ebuyProjects/courier/srcCourier/TestZooKeeperClient/lib/log4j-1.2.16.jar:/Users/beef_in_jp/Documents/ebuyProjects/courier/srcCourier/TestZooKeeperClient/lib/zookeeper-3.4.7.jar:/Users/beef_in_jp/Documents/ebuyProjects/courier/srcCourier/TestZooKeeperClient/lib/jline-0.9.94.jar:/Users/beef_in_jp/Documents/ebuyProjects/courier/srcCourier/TestZooKeeperClient/lib/slf4j-api-1.6.1.jar:/Users/beef_in_jp/Documents/ebuyProjects/courier/srcCourier/TestZooKeeperClient/lib/slf4j-log4j12-1.6.1.jar2015-12-30 09:20:25,689 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java2015-12-30 09:20:25,689 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:java.io.tmpdir=/var/folders/c1/r1htsxps7r5f0rgjysszxd7r0000gn/T/2015-12-30 09:20:25,689 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:java.compiler=2015-12-30 09:20:25,689 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:os.name=Mac OS X2015-12-30 09:20:25,689 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:os.arch=x86_642015-12-30 09:20:25,689 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:os.version=10.10.52015-12-30 09:20:25,690 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:user.name=beef_in_jp2015-12-30 09:20:25,690 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:user.home=/Users/beef_in_jp2015-12-30 09:20:25,690 INFO (org.apache.zookeeper.ZooKeeper:100) - Client environment:user.dir=/Users/beef_in_jp/Documents/ebuyProjects/courier/srcCourier/TestZooKeeperClient2015-12-30 09:20:25,691 INFO (org.apache.zookeeper.ZooKeeper:438) - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=com.ebuy.courier.testzookeeper.client.TestZooKeeperWatcher$MyWatcher@2c641e9a2015-12-30 09:20:25,720 DEBUG (com.ebuy.courier.testzookeeper.client.TestZooKeeperWatcher:52) - run() wait...2015-12-30 09:20:25,726 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 09:20:25,738 INFO (org.apache.zookeeper.ClientCnxn:876) - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session2015-12-30 09:20:25,752 INFO (org.apache.zookeeper.ClientCnxn:1299) - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x151f0701a870001, negotiated timeout = 300002015-12-30 09:20:25,754 DEBUG (com.ebuy.courier.testzookeeper.client.TestZooKeeperWatcher:78) - process() event. type:None state:SyncConnected path:null2015-12-30 10:41:41,124 INFO (org.apache.zookeeper.ClientCnxn:1158) - Unable to read additional data from server sessionid 0x151f0701a870001, likely server has closed socket, closing socket connection and attempting reconnect2015-12-30 10:41:41,227 DEBUG (com.ebuy.courier.testzookeeper.client.TestZooKeeperWatcher:78) - process() event. type:None state:Disconnected path:null2015-12-30 10:41:42,255 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:42,259 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:44,206 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:44,207 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:46,023 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:46,024 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:47,441 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:47,441 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:49,454 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:49,455 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:51,245 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:51,246 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:52,944 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:52,945 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:54,048 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:54,049 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:55,816 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:55,817 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:57,349 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:57,349 WARN (org.apache.zookeeper.ClientCnxn:1162) - Session 0x151f0701a870001 for server null, unexpected error, closing socket connection and attempting reconnectjava.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)2015-12-30 10:41:58,758 INFO (org.apache.zookeeper.ClientCnxn:1032) - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 无法定位登录配置)2015-12-30 10:41:58,758 INFO (org.apache.zookeeper.ClientCnxn:876) - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session2015-12-30 10:41:58,776 INFO (org.apache.zookeeper.ClientCnxn:1299) - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x151f0701a870001, negotiated timeout = 300002015-12-30 10:41:58,776 DEBUG (com.ebuy.courier.testzookeeper.client.TestZooKeeperWatcher:78) - process() event. type:None state:SyncConnected path:null
可以看到,zookeeper的客户端自带了自动重连功能。
在连接成功时,会触发SyncConnected事件,日志:process() event. type:None state:SyncConnected path:null
在连接断开时,会触发Disconnected事件,日志:process() event. type:None state:Disconnected path:null
2.3 测试节点创建、变更内容、添加子节点,变更自节点内容,删除子节点,删除父节点
zookeeper客户端启动:
[bin]$./zkCli.sh -server 127.0.0.1:2181
经过测试,ZooKeeper的exists, getChildren调用后,只会触发一次。
exists只在指定的路径触发。
getChildren只触发子节点的增加,子节点的内容变更和删除不会触发。