Curator的使用
admin
2023-03-20 19:41:10
0


Curator



为了更好的实现Java操作zookeeper服务器,后来出现了Curator框架,非常的强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,例如session超时重连、主从选举、分布式计数器、分布式锁等等适用于各种复杂的zookeeper场景的API封装




1 Curator框架使用(一)

Curator框架中使用链式编程风格,易读性更强,使用工厂方法创建连接对象。

1.使用CuratorFrameworkFactory的两个静态工厂方法(参数不同)来实现

1.1 connectString:连接串

1.2 retryPolicy:重试连接策略。有四种实现,分别是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed

1.3sessionTimeoutMs:会话超时时间,默认为60000ms

1.4connectionTimeoutMs连接超时时间,默认为15000ms

 

注意对于retryPolicy策略通过一个接口来让用户自定义实现




2 Curator框架使用(二)

2.1创建连接

/** 重试策略: 初始时间为1s, 重试10次 */

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);

 

/** 通过工厂创建连接 */

CuratorFramework cf = CuratorFrameworkFactory.builder()

.connectString(ZK_ADDR)

.sessionTimeoutMs(SESSION_TIMEOUT)

.retryPolicy(retryPolicy)

.build();

 

/** 开启连接 */

cf.start();



2.2 新增节点

/**

 * 新增节点:指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容

 * 1.creatingParentsIfNeeded() 递归创建父目录

 * 2.withMode() 节点类型(持久|临时)

 * 3.forPath() 指定路径

 */

cf.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.PERSISTENT)

.forPath("/super/c1", "c1内容".getBytes());



2.3 删除节点

/**

 * 删除节点

 * 1.deletingChildrenIfNeeded() 递归删除

 * 2.guaranteed() 确保节点被删除

 * 3. withVersion(int version) //特定版本号  

 */

cf.delete().deletingChildrenIfNeeded().forPath("/super");



2.4 读取和修改数据

/**

 * 读取和修改数据 : getData()和setData()

 */

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes());

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2内容".getBytes());

 

/** 读取节点内容 */

String c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("c2_data-->"+c2_data);

 

/** 修改节点内容 */

cf.setData().forPath("/super/c2", "修改c2的内容".getBytes());

String update_c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("update_c2_data-->"+update_c2_data);



2.5 绑定回调函数

ExecutorService pool = Executors.newCachedThreadPool();

 

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)

.inBackground(new BackgroundCallback() {


@Override

public void proce***esult(CuratorFramework cf, CuratorEvent event)

throws Exception {

System.out.println("code-->" + event.getResultCode());

System.out.println("type-->" + event.getType());

System.out.println("线程为-->" + Thread.currentThread().getName());

}

}, pool).forPath("/super/c3", "c2的内容".getBytes());

 

System.out.println("主线程-->" + Thread.currentThread().getName());

 

Thread.sleep(Integer.MAX_VALUE);



2.6 读取子节点和判断节点是否存在

/**

 * 读取子节点的方法: getChildren()

 * 判断节点是否存在: checkExists()

 */

List list = cf.getChildren().forPath("/super");

for (String p: list) {

System.out.println(p);

}

 

//如果为null标识不存在

Stat stat = cf.checkExists().forPath("/super/c4");

System.out.println(stat);



3 Curator框架使用(三)

如果要使用类似Wather的监听功能Curator必须依赖一个jar包,Maven依赖

org.apache.curator

curator-recipes

2.4.2

有了这个依赖包,使用NodeCache的方式去客户端实例中注册一个监听缓存,然后实现对应的监听方法即可,这里主要有两种监听方式

NodeCacheListener:监听节点的新增、修改操作

PathChildrenCacheListener:监听子节点的新增、修改、删除操作



4 Curator使用场景

4.1 分布式锁

在分布式场景中,为了保证数据的一致性,经常在程序运行的某一个点需要进行同步操作(java提供了synchronized或者Reentrantlock实现)比如看一个小示例,这个示例出现分布式不同步的问题

比如:之前是在高并发下访问一个程序,现在则是在高并发下访问多个服务器节点(分布式)

 

使用Curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,之前实现的时候遇到过,这里强烈推荐使用Curator分布式锁

public class Lock2 {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超时时间 */
private static final int SESSION_TIMEOUT = 5000; //MS
static int count = 10;
public static CuratorFramework createCuratorFramework(){
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
return cf;
}
public static void main(String[] args) throws Exception {
final CountDownLatch countDown = new CountDownLatch(1);
for (int i =0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework cf = createCuratorFramework();
cf.start();
//锁对象 client 锁节点
final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
try {
countDown.await();
lock.acquire(); //获得锁
number();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();//释放锁
} catch (Exception e) {
e.printStackTrace();
}
}
}
},"t" + i).start();;
}
Thread.sleep(2000);
countDown.countDown();
}
 
public static void number() {
count--;
System.out.println(Thread.currentThread().getName() + "-->" + count);
}
}




4.2 分布式计数器功能

一说到分布式计数器,可能脑海里想到AtomicInteger(原子累加)这种经典方式,如果针对一个JVM的场景当然没问题,但是现在是在分布式场景下,就需要利用Curator框架的DistributedAtomicInteger了

public class CuratorAtomicInteger {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超时时间 */
private static final int SESSION_TIMEOUT = 5000; //MS
public static void main(String[] args) throws Exception {
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
//使用DistributedAtomicInteger
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000));
//atomicInteger.increment();
atomicInteger.add(1);
AtomicValue atomicValue = atomicInteger.get();
System.out.println("atomicValue.succeeded()-->" + atomicValue.succeeded());
System.out.println("atomicValue.postValue()-->" + atomicValue.postValue());
System.out.println("atomicValue.preValue()-->" + atomicValue.preValue());
}
}





4.3 Barrier

4.3.1 DistributedDoubleBarrier

分布式Barrier 类DistributedDoubleBarrier: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点同时开始,中间谁先运行完毕,谁后运行完毕不关心,但是最终一定是一块退出运行的

 

public class CuratorBarrier {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超时时间 */
private static final int SESSION_TIMEOUT = 5000; //MS
public static void main(String[] args) throws Exception{
for (int i =0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
/** 实例化5个客户端对象 */
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5);
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println(Thread.currentThread().getName() + " 已准备好!");
barrier.enter();
System.out.println("同时开始运行...");
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println("运行完毕...");
barrier.leave();
System.out.println("同时退出运行...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();;
}
}
}





4.3.2 DistributedBarrier

分布式Barrier 类DistributedBarrier: 它会阻塞所有节点上的等待进程(所有节点进入待执行状态),直到“某一个人吹哨”说开始执行, 然后所有的节点同时开始

public class CuratorBarrier2 {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超时时间 */
private static final int SESSION_TIMEOUT = 5000; //MS
static DistributedBarrier barrier = null;
public static void main(String[] args) throws Exception{
for (int i =0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
/** 实例化5个客户端对象 */
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
barrier = new DistributedBarrier(cf, "/superBarrier");
System.out.println(Thread.currentThread().getName() + " 设置barrier");
barrier.setBarrier(); //设置
barrier.waitOnBarrier(); //等待
System.out.println("开始执行程序...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();;
}
Thread.sleep(5000);
barrier.removeBarrier(); //释放
}
}





5 Curator重试策略

Curator内部实现的几种重试策略:

1.ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.

2.RetryNTimes:指定最大重试次数的重试策略   

3.RetryOneTime:仅重试一次

4.RetryUntilElapsed:一直重试直到达到规定的时间



5.1 ExponentialBackoffRetry

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

 

参数说明

   1.baseSleepTimeMs 初始sleep时间

   2.maxRetries 最大重试次数

   3.maxSleepMs 最大重试时间 




5.2 RetryNTimes

RetryNTimes(int n, int sleepMsBetweenRetries)


参数说明

   1.n 最大重试次数

   2.sleepMsBetweenRetries 每次重试的间隔时间

 



5.3 RetryOneTime

RetryOneTime(int sleepMsBetweenRetry)

 

参数说明

   1.sleepMsBetweenRetry为重试间隔的时间

 

5.4 RetryUntilElapsed

RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

 

参数说明

   1.maxElapsedTimeMs 最大重试时间

   2.sleepMsBetweenRetries 每次重试的间隔时间

 

 

 













相关内容

热门资讯

学习新语·政绩观|“愚公”治山... 统筹:郭洁宇 朱旭东设计:殷哲伦新华社新媒体中心新华社出品
请问瓷砖外墙做防水多少钱 已有2条回答 回复者:小休维 瓷砖外墙防水涂料60元一平方。外墙透明防水涂料无色、透明、涂覆...
旧房翻新可以直接贴壁纸吗? 不可以的。首先墙纸就不能贴在乳胶漆墙面上,因为乳胶漆的工艺、质量等都会影响到墙皮的附着力,所以在乳胶...
怎样翻新厨房瓷砖墙面?墙面瓷砖... 厨房理不管是哪一种的瓷砖,都可以在旧瓷砖上贴新的瓷砖。个别严重起鼓或者破坏的瓷砖需要革除,革除后用马...
无缝瓷砖真的可以做到无缝吗 无缝瓷砖之间的缝隙确实比较小,但是想要做到真正的无缝应该是不太可能的,据说这种无缝瓷砖每个瓷砖之间的...
瓷砖可以翻新吗 瓷砖可以翻新。瓷砖翻新的方法有多种,比如可以重新进行上色,用瓷漆在瓷砖上重新进行涂色和美化,在施工过...
庄瑞雄称沈伯洋当市长2天就能解... 海峡导报综合报道 台民意机构民进党团干事长庄瑞雄日前称,鼠患是城市治理的问题,蒋万安必须要展现市长的...
双层大巴撞上限高架,车头嵌入架... 5月9日,广东湛江一双层大巴撞上限高架,造成车身和限高架受损,无人受伤。相关视频显示,一辆白色双层大...
五一票房冠军是惊悚片,“下沉市... 【文/新潮观鱼】今年“五一”档,有一个有意思的现象:一部看起来没有“爆款相”,演员和导演都没有很大票...
“几轮博弈后,特朗普发现:中国... 【文/观察者网 王一】当地时间5月9日,英国《金融时报》发长文分析称,在美国与中国围绕贸易、科技、地...