[TOC]
# Zookeeper命令行以及API使用
## 1 Zookeeper命令行操作
### 1 客户端连接
~~~
运行 zkCli.sh –server <ip>进入命令行工具
~~~
![](https://box.kancloud.cn/a089d9212a512f1370a29a0e7e85255c_213x229.png)
### 2 查看znode路径
~~~
ls /mygirls
~~~
### 3 获取znode数据
~~~
get /mygirls
~~~
### 4 监听znode事件
~~~
ls /mygirls watch ## 就对一个节点的子节点变化事件注册了监听
get /mygirls watch ## 就对一个节点的数据内容变化事件注册了监听
~~~
> 注意: 监听器只生效一次
> 监听器的工作机制,其实是在客户端会专门创建一个监听线程,在本机的一个端口上等待zk集群发送过来事件
## 2 Zookeeper 客户端API
### 1 基本使用
> org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话
> 它提供以下几类主要方法 :
| 功能 | 描述 |
| --- | --- |
| create | 在本地目录树中创建一个节点 |
| delete | 删除一个节点 |
| exists | 测试本地是否存在目标节点 |
| get/set data | 从目标节点上读取 / 写数据 |
| get/set ACL| | 获取 / 设置目标节点访问控制列表信息 |
| get children | 检索一个子节点上的列表 |
| sync | 等待要被传送的数据 |
### 2 增删改查znode数据
~~~
public class SimpleDemo {
// 会话超时时间,设置为与系统默认时间一致
private static final int SESSION_TIMEOUT = 30000;
// 创建 ZooKeeper 实例
ZooKeeper zk;
// 创建 Watcher 实例
Watcher wh = new Watcher() {
public void process(org.apache.zookeeper.WatchedEvent event)
{
System.out.println(event.toString());
}
};
// 初始化 ZooKeeper 实例
private void createZKInstance() throws IOException
{
zk = new ZooKeeper("weekend01:2181", SimpleDemo.SESSION_TIMEOUT, this.wh);
}
private void ZKOperations() throws IOException, InterruptedException, KeeperException
{
System.out.println("/n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent");
zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("/n2. 查看是否创建成功: ");
System.out.println(new String(zk.getData("/zoo2", false, null)));
System.out.println("/n3. 修改节点数据 ");
zk.setData("/zoo2", "shenlan211314".getBytes(), -1);
System.out.println("/n4. 查看是否修改成功: ");
System.out.println(new String(zk.getData("/zoo2", false, null)));
System.out.println("/n5. 删除节点 ");
zk.delete("/zoo2", -1);
System.out.println("/n6. 查看节点是否被删除: ");
System.out.println(" 节点状态: [" + zk.exists("/zoo2", false) + "]");
}
private void ZKClose() throws InterruptedException
{
zk.close();
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
SimpleDemo dm = new SimpleDemo();
dm.createZKInstance();
dm.ZKOperations();
dm.ZKClose();
}
}
~~~
### 3 监听znode
> Zookeeper的监听器工作机制
![](https://box.kancloud.cn/0e85360b0f0cf0ea8f126251398d2fd0_528x142.png)
> 监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑
![](https://box.kancloud.cn/6a3770fc843e7d1cdce2a7f54b87df37_431x216.png)
> 监听器的注册是在获取数据的操作中实现:
> getData(path,watch?)监听的事件是:节点数据变化事件
> getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件
## 3 Zookeeper 应用案例
### 1 案例1——服务器上下线动态感知
#### 1.1 需求描述
> 某分布式系统中,主节点可以有多台,可以动态上下线
> 任意一台客户端都能实时感知到主节点服务器的上下线
#### 1.2 设计思路
![](https://box.kancloud.cn/d289dfceb651c9edf2676e24afc99de3_349x319.png)
#### 1.3 代码开发
> 1、客户端实现
~~~
public class AppClient {
private String groupNode = "sgroup";
private ZooKeeper zk;
private Stat stat = new Stat();
private volatile List<String> serverList;
/**
* 连接zookeeper
*/
public void connectZookeeper() throws Exception {
zk
= new ZooKeeper("localhost:4180,localhost:4181,localhost:4182", 5000, new Watcher() {
public void process(WatchedEvent event) {
// 如果发生了"/sgroup"节点下的子节点变化事件, 更新server列表, 并重新注册监听
if (event.getType() == EventType.NodeChildrenChanged
&& ("/" + groupNode).equals(event.getPath())) {
try {
updateServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
updateServerList();
}
/**
* 更新server列表
*/
private void updateServerList() throws Exception {
List<String> newServerList = new ArrayList<String>();
// 获取并监听groupNode的子节点变化
// watch参数为true, 表示监听子节点变化事件.
// 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册
List<String> subList = zk.getChildren("/" + groupNode, true);
for (String subNode : subList) {
// 获取每个子节点下关联的server地址
byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);
newServerList.add(new String(data, "utf-8"));
}
// 替换server列表
serverList = newServerList;
System.out.println("server list updated: " + serverList);
}
/**
* client的工作逻辑写在这个方法中
* 此处不做任何处理, 只让client sleep
*/
public void handle() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
AppClient ac = new AppClient();
ac.connectZookeeper();
ac.handle();
}
}
~~~
> 2、服务器端实现
~~~
public class AppServer {
private String groupNode = "sgroup";
private String subNode = "sub";
/**
* 连接zookeeper
* @param address server的地址
*/
public void connectZookeeper(String address) throws Exception {
ZooKeeper zk = new ZooKeeper(
"localhost:4180,localhost:4181,localhost:4182",
5000, new Watcher() {
public void process(WatchedEvent event) {
// 不做处理
}
});
// 在"/sgroup"下创建子节点
// 子节点的类型设置为EPHEMERAL_SEQUENTIAL, 表明这是一个临时节点, 且在子节点的名称后面加上一串数字后缀
// 将server的地址数据关联到新创建的子节点上
String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("create: " + createdPath);
}
/**
* server的工作逻辑写在这个方法中
* 此处不做任何处理, 只让server sleep
*/
public void handle() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 在参数中指定server的地址
if (args.length == 0) {
System.err.println("The first argument must be server address");
System.exit(1);
}
AppServer as = new AppServer();
as.connectZookeeper(args[0]);
as.handle();
}
}
~~~
### 2 案例2——分布式共享锁
#### 1、需求描述
> 在我们自己的分布式业务系统中,可能会存在某种资源,需要被整个系统的各台服务器共享访问,但是只允许一台服务器同时访问
#### 2、设计思路
![](https://box.kancloud.cn/0e7437888f2b3a8806dbb2dc34a412d3_553x171.png)
#### 3、代码开发
~~~
public class DistributedClientMy {
// 超时时间
private static final int SESSION_TIMEOUT = 5000;
// zookeeper server列表
private String hosts = "spark01:2181,spark02:2181,spark03:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 当前client创建的子节点
private volatile String thisPath;
/**
* 连接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper("spark01:2181", SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 子节点发生变化
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
// thisPath是否是列表中的最小节点
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 排序
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 创建子节点
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小会, 让结果更清晰一些
Thread.sleep(new Random().nextInt(1000));
// 监听子节点的变化
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
// 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 共享资源的访问逻辑写在这个方法中
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
// 将thisPath删除, 监听thisPath的client将获得通知
// 相当于释放锁
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedClientMy dl = new DistributedClientMy();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}
~~~
- hadoop
- linux基础
- Linux入门
- Linux进阶
- shell
- Zookeeper
- Zookeeper简介及部署
- Zookeeper使用及API
- Redis
- Redis简介安装部署
- Redis使用及API
- Java高级增强
- Java多线程增强
- Maven简介及搭建
- Hive
- Hive简介及安装
- Hive操作
- HIve常用函数
- Hive数据类型
- Flume
- Flume简介及安装
- flume 拦截器(interceptor)
- azkaban
- azKaban简介及安装
- Sqoop
- Sqoop简介及安装
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE图片资源
- MAPREDUCE加强
- HBASE
- HBASE简介及安装
- HBASE操作及API
- HBASE内部原理
- Storm
- Storm简介及安装
- Storm原理
- kafka
- kafka简介及安装
- kafka常用操作及API
- kafka原理
- kafka配置详解
- Scala
- Scala简介及安装
- Scala基础语法
- Scala实战