多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] # 服务器上下线动态感知 ## 需求描述 某分布式系统中,主节点可以有多台,可以动态上下线 任意一台客户端都能实时感知到主节点服务器的上下线 ## 设计思路 ![](https://box.kancloud.cn/1b8395769c8511e98364098e7d3abe68_928x376.png) ![](https://box.kancloud.cn/466142d55436101464a9747287aa4289_632x627.png) ## 代码开发 ### 服务端实现 ~~~ package hello; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class DistributedServer { private static final String CONNECT_STRING = "192.168.33.12:2181,192.168.3.33:2181"; //超过2秒就认为挂了 private static final int SESSION_TIME_OUT = 2000; //父节点 private static final String PARENT_NODE = "/servers"; private ZooKeeper zk = null; //latch就相当于一个对象,当latch.await()方法执行时,线程就会等待 //当latch的count减为0的时候,将会唤醒等待的线程 //让主线程阻塞掉 private CountDownLatch countDownLatch = new CountDownLatch(1); //创建到zk客户端的连接 private void getConnect() throws IOException, InterruptedException { //一new完就往下走,但是这时候客户端还没完成连接,所以我们要等他创建好 //一旦成功握手这边的process就会回调一次 zk = new ZooKeeper(CONNECT_STRING, SESSION_TIME_OUT, new Watcher() { public void process(WatchedEvent watchedEvent) { //SyncConnected同步连接 //回调了并且事件等于连接成功 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //把计数减少,然后主线程就可以往下走了 countDownLatch.countDown(); } //收到事件通知后的回调函数(应该是我们自己的事件处理逻辑) System.out.println(watchedEvent.getType() + "---" + watchedEvent.getPath()); try { //再次注册监听,监听/下面的 zk.getChildren("/", true); } catch (Exception e) { e.printStackTrace(); } } }); countDownLatch.await(); } //向zk集群注册服务信息 private void registerServer(String hostname) throws KeeperException, InterruptedException { //判断这个父节点是否存在 Stat exists = zk.exists(PARENT_NODE, false); /* * Ids.OPEN_ACL_UNSAFE 创建开放节点,允许任意操作 * Ids.READ_ACL_UNSAFE 创建只读节点 * Ids.CREATOR_ALL_AC /创建者有全部权限 * */ if (exists == null) { //如果这个父节点/servers不存在就把他创建出来,父节点需要持久的,其他注册过来的就用短暂的 zk.create(PARENT_NODE, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //创建子节点,并且这个子节点后面是跟序号的,在子节点的名称后面加上一串数字后缀,避免冲突,而且子节点是短暂的 String path = zk.create(PARENT_NODE + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //打印下表示这台服务器上线了 System.out.println(hostname + "is online..." + path); } //业务功能 public void handleBussiness(String hostname) throws InterruptedException { //这边业务就是打印一句话,那个ip开始工作了 System.out.println(hostname + "start working ..."); //让主线程sleep Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributedServer server = new DistributedServer(); //获取zk连接 server.getConnect(); //利用zk连接注册服务器信息(主机名),这边参数可能用args[0]来替代 server.registerServer("192.168.3.33"); //启动业务功能 server.handleBussiness("192.168.3.33"); } } ~~~ ### 客户端实现 ~~~ package hello; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; public class DistributedClient { private static final String connectString = "192.168.33.12:2181,192.168.3.33:2181"; //超过2秒就认为挂了 private static final int sessionTimeOut = 2000; //父节点 private static final String parentNode = "/servers"; //提供给各业务线程使用的服务器列表 private volatile List<String> serverList; private ZooKeeper zk = null; //latch就相当于一个对象,当latch.await()方法执行时,线程就会等待 //当latch的count减为0的时候,将会唤醒等待的线程 //让主线程阻塞掉 private CountDownLatch countDownLatch = new CountDownLatch(1); //创建到zk的客户端连接 public void getConnect() throws IOException, InterruptedException { //一new完就往下走,但是这时候客户端还没完成连接,所以我们要等他创建好 //一旦成功握手这边的process就会回调一次 zk = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { public void process(WatchedEvent watchedEvent) { //SyncConnected同步连接 //回调了并且事件等于连接成功 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //把计数减少,然后主线程就可以往下走了 countDownLatch.countDown(); } //收到事件通知后的回调函数(应该是我们自己的事件处理逻辑) System.out.println(watchedEvent.getType() + "---" + watchedEvent.getPath()); try { //重新更新服务器列表,并且注册了监听 getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); countDownLatch.await(); } //获取服务器信息列表 private void getServerList() throws Exception { //获取服务器子节点信息,并且对父节点进行监听 List<String> children = zk.getChildren(parentNode, true); //先创建一个局部的list来存服务器信息 List<String> servers = new ArrayList<String>(); for (String child : children) { //child只是子节点的节点名,我们获取数据要给全路径 //结果是服务器的名字 byte[] data = zk.getData(parentNode + "/" + child, false, null); servers.add(new String(data)); } //把servers赋值给成员变量serverList,已提供给各业务线程使用 //避免别的线程正在看的时候,这边正在写,所以弄个赋值 serverList = servers; //打印服务器列表 System.out.println(serverList); } //业务线程 public void handleBussiness() throws InterruptedException { //这边业务就是打印一句话,那个ip开始工作了 System.out.println("client start working ..."); //让主线程sleep Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { //获取zk连接 DistributedClient client = new DistributedClient(); client.getConnect(); //获取servers的子节点信息(并监听),从中获取服务器信息列表 client.getServerList(); //业务线程启动 client.handleBussiness(); } } ~~~ 然后可以打成jar包放到别的电脑上