4. ZooKeeper's Three Java Clients: Curator (CRUD, transactions, listeners, distributed counters, distributed locks)

ZooKeeper series

1. Installing and verifying ZooKeeper 3.7.1
2. Basic ZooKeeper operations and usage examples (shell, Java API, application-scenario examples)
3. ZooKeeper leader election - verified against reality; many online accounts are wrong
4. ZooKeeper's three Java clients: Curator (CRUD, transactions, listeners, distributed counters, distributed locks)
5. ZooKeeper with Java Curator (service registration and discovery)


Table of contents

  • ZooKeeper series
  • I. The native ZooKeeper Java API
  • II. ZkClient
  • III. Apache Curator
    • 1. pom.xml
    • 2. Defining a constants class
    • 3. Instantiating a connection
    • 4. Transaction example
    • 5. CRUD example
    • 6. Listener examples
    • 7. Counter examples
      • 1) Single-machine atomic increment
        • 1. synchronized example
        • 2. Lock example
        • 3. AtomicInteger example
      • 2) Distributed thread-safe atomic increment
    • 8. Distributed lock example
      • 1) How it works
      • 2) Advantages
      • 3) Disadvantages
      • 4) Implementation


The three commonly used ZooKeeper Java clients are:

  • the native ZooKeeper Java API
  • ZkClient
  • Apache Curator

This article introduces each of them in turn.

I. The native ZooKeeper Java API

The ZooKeeper client provides the basic operations: creating a session, creating nodes, reading nodes, updating data, deleting nodes, checking whether a node exists, and so on. For developers, however, this raw API has several shortcomings:
(1) there is no reconnection mechanism after a session timeout; you must reconnect by hand;
(2) Watcher registration is one-shot; after every trigger the watcher must be registered again;
(3) recursive node creation is not supported;
(4) exception handling is tedious; ZooKeeper declares many exceptions, and developers often have no idea how to handle them;
(5) only a raw byte[] interface is provided, with no object-level serialization;
(6) creating a node that already exists throws an exception, so you must check for existence yourself;
(7) deleting a node cannot cascade to its children.
For these reasons, few people use the native ZooKeeper API directly.

For concrete usage, see the article on basic ZooKeeper operations and usage examples earlier in this series.
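As a quick, hedged illustration of shortcomings (1) and (2) above, here is a minimal sketch of the raw API (not from the original article); it assumes a reachable ensemble and an existing /testNode, and shows that the constructor connects asynchronously and that a watcher fires only once.

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class NativeApiSketch {
	public static void main(String[] args) throws Exception {
		CountDownLatch connected = new CountDownLatch(1);
		// The constructor returns immediately; the session is established
		// asynchronously, so we block until SyncConnected arrives.
		ZooKeeper zk = new ZooKeeper("192.168.10.41:2118", 10 * 1000, event -> {
			if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
				connected.countDown();
			}
		});
		connected.await();

		// This watcher fires at most once; to keep watching, it would have to
		// re-register itself inside process() - exactly what Curator automates.
		byte[] data = zk.getData("/testNode", new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				System.out.println("fired once: " + event);
			}
		}, null);
		System.out.println(new String(data));

		zk.setData("/testNode", "first".getBytes(), -1);  // watcher fires
		zk.setData("/testNode", "second".getBytes(), -1); // silently missed
		Thread.sleep(1000);
		zk.close();
	}
}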

II. ZkClient

ZkClient is an open-source client that wraps the native ZooKeeper API to make it easier for developers to use. It solves:
1) reconnection after session timeout;
2) repeated Watcher registration;
3) simplifying API development.
Although ZkClient wraps the native API, it has weaknesses of its own:

  • almost no reference documentation;
  • over-simplified exception handling (everything is rethrown as RuntimeException);
  • a retry mechanism that is hard to use;
  • no ready-made implementations for common usage scenarios.

No full example is given here; to try ZkClient, add the following to pom.xml:


<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
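Since the original omits a ZkClient example, here is a minimal hedged sketch (the node path and server address are illustrative assumptions); note how the subscription keeps firing across changes because ZkClient re-registers the underlying watcher:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class ZkClientSketch {
	public static void main(String[] args) throws Exception {
		// Session handling and reconnection are managed internally.
		ZkClient zkClient = new ZkClient("192.168.10.41:2118", 10 * 1000);

		if (!zkClient.exists("/zkclient-demo")) {
			// Data goes through ZkClient's serializer (Java serialization by default).
			zkClient.createPersistent("/zkclient-demo", "hello");
		}

		// Unlike a raw watcher, this subscription stays active across events.
		zkClient.subscribeDataChanges("/zkclient-demo", new IZkDataListener() {
			@Override
			public void handleDataChange(String dataPath, Object data) {
				System.out.println(dataPath + " changed to " + data);
			}

			@Override
			public void handleDataDeleted(String dataPath) {
				System.out.println(dataPath + " deleted");
			}
		});

		zkClient.writeData("/zkclient-demo", "world");
		Thread.sleep(1000); // give the notification a moment to arrive
		zkClient.close();
	}
}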

III. Apache Curator

Curator is a ZooKeeper client framework open-sourced by Netflix. Like ZkClient, it handles the very low-level details: reconnection, re-registering watchers, the NodeExistsException problem, and so on. It is now a top-level Apache project.
Official site: http://curator.apache.org/index.html
Apache Curator is a Java/JVM client library for Apache ZooKeeper, the distributed coordination service. It includes a high-level API framework and utilities that make Apache ZooKeeper easier and more reliable to use. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
Its features:

  • an Apache open-source project;
  • solves the problem that a watch registration fires only once;
  • provides a fluent-style API that is simpler to use;
  • provides more ready-made solutions with simple implementations, e.g. distributed locks;
  • provides common ZooKeeper utility classes;
  • a more comfortable programming style.

Beyond this, Curator also provides recipe-level abstractions (Recipes) of common ZooKeeper scenarios, such as shared locks, leader election, and distributed counters.

Curator project components
[Figure omitted: overview of the Curator project components]

Maven dependencies of the components
[Figure omitted: Maven artifact for each component]
Source: https://github.com/apache/curator
If you only want Curator for ZooKeeper create/read/update/delete operations, the curator-client and curator-framework artifacts are enough. The ZooKeeper version used here is 3.7.1.

1. pom.xml


<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
	<version>5.3.0</version>
</dependency>

<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-client</artifactId>
	<version>5.3.0</version>
</dependency>

2. Defining a constants class

import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author alanchan
 *
 */
public class Constant {
	// Session timeout: 10000 ms
	public final static int SESSION_TIMEOUT = 10 * 1000;

	// Connection timeout: 50000 ms
	public final static int CONNECTION_TIMEOUT = 50 * 1000;

	// ZooKeeper server addresses
	public static String zkServerAddress = "192.168.10.41:2118,192.168.10.42:2118,192.168.10.43:2118";

	// Retry policy: initial wait 1 s, retry up to 3 times
	public static ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
}

3. Instantiating a connection


import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.ZooKeeper.States;


public class App {

	public static void main(String[] args) {
		// Example 1
//		test1();

		// Example 2
//		CuratorFramework zkClient = createSimple(Constant.zkServerAddress);
//		zkClient.start();

		// Example 3
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		CuratorFramework zkClient = createWithOptions(Constant.zkServerAddress, retryPolicy, 5000, 3000);
		zkClient.start();

		System.out.println(States.CONNECTED);
		System.out.println(zkClient.getState());
	}

	public static void test1() {
//    * @param baseSleepTimeMs initial amount of time to wait between retries
//    * @param maxRetries      max number of times to retry
//    * @param maxSleepMs      max time in ms to sleep on each retry
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);

		CuratorFramework zkClient = CuratorFrameworkFactory.builder().connectString(Constant.zkServerAddress)
				.sessionTimeoutMs(3000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();
		// Important: start() must be called to establish the session.
		zkClient.start();

		CloseableUtils.closeQuietly(zkClient);
	}

	public static CuratorFramework createSimple(String connectionString) {
		// these are reasonable arguments for the ExponentialBackoffRetry. The first
		// retry will wait 1 second - the second will wait up to 2 seconds - the
		// third will wait up to 4 seconds.
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

		// The simplest way to get a CuratorFramework instance. This will use default
		// values.
		// The only required arguments are the connection string and the retry policy
		return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
	}

	// int connectionTimeoutMs: 50000
	// int sessionTimeoutMs: 10000
	// The connection timeout should be larger than the session timeout; if they are
	// set badly you may see KeeperErrorCode = ConnectionLoss.
	public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy,
			int connectionTimeoutMs, int sessionTimeoutMs) {
		// using the CuratorFrameworkFactory.builder() gives fine grained control
		// over creation options. See the CuratorFrameworkFactory.Builder javadoc
		// details
		return CuratorFrameworkFactory.builder().connectString(connectionString).retryPolicy(retryPolicy)
				.connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs)
				// etc. etc.
				.build();
	}
}


4. Transaction example

import java.util.Collection;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author alanchan
 * Curator transaction operations
 */
public class TransactionExamples {

	public static void main(String[] args) throws Exception {

		transaction(getCuratorFramework());

	}

	private static CuratorFramework getCuratorFramework() {
		CuratorFramework client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy,
				Constant.CONNECTION_TIMEOUT, Constant.SESSION_TIMEOUT);
		client.start();
		return client;
	}

	public static Collection<CuratorTransactionResult> transaction(CuratorFramework client) throws Exception {
		// this example shows how to use ZooKeeper's transactions

		CuratorOp createOp = client.transactionOp().create().forPath("/a/path", "some data".getBytes());
		CuratorOp setDataOp = client.transactionOp().setData().forPath("/another/path", "other data".getBytes());
		CuratorOp deleteOp = client.transactionOp().delete().forPath("/yet/another/path");

		// Bundle the basic operations above into a single transaction.
		// If none of the paths involved exist beforehand, no operation succeeds.
		// To verify: create /a first. Without transaction control, create /a/path would
		// succeed on its own; with transaction control, all three should fail together,
		// because /another/path was never created, so the setData cannot succeed and the
		// whole transaction is rolled back.
		Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp, setDataOp,
				deleteOp);

		for (CuratorTransactionResult result : results) {
			System.out.println(result.getForPath() + " - " + result.getType());
		}

		return results;
	}
}
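As the comments in the example note, the transaction only succeeds when every path it touches already exists. A hedged sketch of the setup one might run first against the same client (the paths mirror the example; creatingParentsIfNeeded() is Curator's helper for building intermediate nodes):

		// Pre-create the znodes the transaction expects. /a must exist (but not /a/path),
		// /another/path must exist for the setData, and /yet/another/path for the delete.
		client.create().creatingParentsIfNeeded().forPath("/a");
		client.create().creatingParentsIfNeeded().forPath("/another/path", "old data".getBytes());
		client.create().creatingParentsIfNeeded().forPath("/yet/another/path");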

5. CRUD example

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author alanchan
 *
 */
public class CrudExamples {
	// Connection instance
	private static CuratorFramework zkClient = null;

	public static void main(String[] args) throws Exception {

		// int connectionTimeoutMs: 50000
		// int sessionTimeoutMs: 10000
		// Create the connection via the factory
		zkClient = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy, Constant.CONNECTION_TIMEOUT,
				Constant.SESSION_TIMEOUT);

		zkClient.start();

//		zkClient.create().forPath("/testcurator", "testing".getBytes());

//		create(zkClient, "/testcurator", "testing".getBytes());
//		createEphemeral(zkClient, "/testcurator", "testing".getBytes());

//		createEphemeralSequential(zkClient, "/testcurator", "testing".getBytes());
//		createIdempotent(zkClient, "/testcurator", "testing".getBytes());

//		setData(zkClient, "/testcurator", "testing2".getBytes());

//		setDataAsync(zkClient, "/testcurator", "testing32".getBytes());

		BackgroundCallback callback = new BackgroundCallback() {

			@Override
			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
				System.out.println("监听事件触发,event内容为:" + event);
			}
		};
		setDataAsyncWithCallback(zkClient, callback, "/testcurator", "testing".getBytes());

		System.out.println(zkClient.getState());
		CloseableUtils.closeQuietly(zkClient);
	}

	// Create an ordinary persistent node
	public static void create(CuratorFramework client, String path, byte[] payload) throws Exception {
		// this will create the given ZNode with the given data
		client.create().forPath(path, payload);
	}

	// Create an ephemeral node
	public static void createEphemeral(CuratorFramework client, String path, byte[] payload) throws Exception {
		// this will create the given EPHEMERAL ZNode with the given data
		client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
	}

	// Create an ephemeral sequential node
	public static String createEphemeralSequential(CuratorFramework client, String path, byte[] payload)
			throws Exception {
		// this will create the given EPHEMERAL-SEQUENTIAL ZNode with the given data
		// using Curator protection.
		return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
	}

	// Create a node idempotently
	public static void createIdempotent(CuratorFramework client, String path, byte[] payload) throws Exception {
		/*
		 * This will create the given ZNode with the given data idempotently, meaning
		 * that if the initial create failed transiently, it will be retried and behave
		 * as if the first create never happened, even if the first create actually
		 * succeeded on the server but the client didn't know it.
		 */
		client.create().idempotent().forPath(path, payload);
	}

	// Set a node's data
	public static void setData(CuratorFramework client, String path, byte[] payload) throws Exception {
		// set data for the given node
		client.setData().forPath(path, payload);
	}

	// This listener targets background notifications and errors. With it installed,
	// calls made with inBackground() report asynchronously through the listener;
	// plain synchronous creates or updates do not trigger it.
	public static void setDataAsync(CuratorFramework client, String path, byte[] payload) throws Exception {
		// this is one method of getting event/async notifications
		CuratorListener listener = new CuratorListener() {
			@Override
			public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
				// examine event for details
				System.out.println("监听事件触发,event内容为:" + event);
			}
		};
		client.getCuratorListenable().addListener(listener);

		// Fetch the node data asynchronously
		client.getData().inBackground().forPath(path);

		// Update the node data
		// set data for the given node asynchronously. The completion notification
		// is done via the CuratorListener.
		client.setData().inBackground().forPath(path, payload);
		Thread.sleep(Integer.MAX_VALUE);
	}

	// Receive the async completion via a callback
	public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path,
			byte[] payload) throws Exception {
		// this is another method of getting notification of an async completion
		client.setData().inBackground(callback).forPath(path, payload);

	}

	// Set data idempotently
	public static void setDataIdempotent(CuratorFramework client, String path, byte[] payload, int currentVersion)
			throws Exception {
		/*
		 * This will set the given ZNode with the given data idempotently, meaning that
		 * if the initial setData failed transiently, it will be retried and behave as
		 * if the first setData never happened, even if the first setData actually
		 * succeeded on the server but the client didn't know it. In other words, if
		 * currentVersion == X and payload = P, this will return success if the znode
		 * ends up in the state (version == X+1 && data == P). If withVersion is not
		 * specified, it will end up with success so long as the data == P, no matter
		 * the znode version.
		 */
		client.setData().idempotent().withVersion(currentVersion).forPath(path, payload);
		client.setData().idempotent().forPath(path, payload);
	}

	// Delete a node
	public static void delete(CuratorFramework client, String path) throws Exception {
		// delete the given node
		client.delete().forPath(path);
	}

	// Guaranteed delete
	public static void guaranteedDelete(CuratorFramework client, String path) throws Exception {
		// delete the given node and guarantee that it completes

		/*
		 * Guaranteed Delete
		 * 
		 * Solves this edge case: deleting a node can fail due to connection issues.
		 * Further, if the node was ephemeral, the node will not get auto-deleted as the
		 * session is still valid. This can wreak havoc with lock implementations.
		 * 
		 * 
		 * When guaranteed is set, Curator will record failed node deletions and attempt
		 * to delete them in the background until successful. NOTE: you will still get
		 * an exception when the deletion fails. But, you can be assured that as long as
		 * the CuratorFramework instance is open attempts will be made to delete the
		 * node.
		 */

		client.delete().guaranteed().forPath(path);
	}

	// Delete idempotently
	public static void deleteIdempotent(CuratorFramework client, String path, int currentVersion) throws Exception {
		/*
		 * This will delete the given ZNode with the given data idempotently, meaning
		 * that if the initial delete failed transiently, it will be retried and behave
		 * as if the first delete never happened, even if the first delete actually
		 * succeeded on the server but the client didn't know it. In other words, if
		 * currentVersion == X, this will return success if the znode ends up deleted,
		 * and will retry after connection loss if the version the znode's version is
		 * still X. If withVersion is not specified, it will end up successful so long
		 * as the node is deleted eventually. Kind of like guaranteed but not in the
		 * background. For deletes this is equivalent to the older quietly() behavior,
		 * but it is also provided under idempotent() for compatibility with
		 * Create/SetData.
		 */
		client.delete().idempotent().withVersion(currentVersion).forPath(path);
		client.delete().idempotent().forPath(path);
		client.delete().quietly().withVersion(currentVersion).forPath(path);
		client.delete().quietly().forPath(path);
	}

	// Get children and set a default watcher
	public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
		/**
		 * Get children and set a watcher on the node. The watcher notification will
		 * come through the CuratorListener (see setDataAsync() above).
		 */
		return client.getChildren().watched().forPath(path);
	}

	// Get children and set an explicit watcher
	public static List<String> watchedGetChildren(CuratorFramework client, String path, Watcher watcher)
			throws Exception {
		/**
		 * Get children and set the given watcher on the node.
		 */
		return client.getChildren().usingWatcher(watcher).forPath(path);
	}

	public static boolean isExists(CuratorFramework zkClient, String path) throws Exception {
		// checkExists() returns a Stat when the node exists, null otherwise.
		return zkClient.checkExists().forPath(path) != null;
	}
}

6. Listener examples

This section covers the native-API watch as well as CuratorCache and CuratorListener.
The curator-recipes module provides, among others:
atomic - distributed counters (DistributedAtomicLong) for atomic increments in a distributed environment
barriers - distributed barrier (DistributedBarrier), blocking processes across the cluster until a condition is met
cache - the watch mechanism: NodeCache (data changes on a node), PathChildrenCache (changes to a node's children), TreeCache (both its own data changes and its children's)
leader - leader election
locks - distributed locks
nodes - persistent nodes (PersistentNode) that survive connection or session interruptions
queue - distributed queues (including the priority queue DistributedPriorityQueue, the delay queue DistributedDelayQueue, and more)
shared - the SharedCount distributed counter
To use these recipes, add the following to pom.xml:


<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.3.0</version>
</dependency>

Example code:

import java.util.function.Consumer;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCacheListenerBuilder.ChangeListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author chenw
 * TreeCacheListener, PathChildrenCacheListener and NodeCacheListener are all
 * deprecated; CuratorCache replaces them.
 */
public class ListenerDemo {
	// Connection instance
	private static CuratorFramework zkClient = null;

	public static void main(String[] args) throws Exception {
		zkClient = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy, Constant.CONNECTION_TIMEOUT,
				Constant.SESSION_TIMEOUT);
		zkClient.start();

		String path = "/testNode";

		//
		if (!isExists(zkClient, path)) {
			zkClient.create().forPath(path, "testing".getBytes());
		}

//		watchEvent(zkClient, path);

//		curatorListener(zkClient, path);

//		curatorNodeCache(zkClient, path);

		curatorNodeCacheByLambda(zkClient, path);

		CloseableUtils.closeQuietly(zkClient);
	}

	// Watch a path the raw ZooKeeper way; the watch fires only once.
//	Sample run:
//	node data: testing
//	watchedEvent: WatchedEvent state:SyncConnected type:NodeDataChanged path:/testNode
// Actual data afterwards:
//	[zk: server1:2118(CONNECTED) 39] get /testNode 
//	second data	
	private static void watchEvent(CuratorFramework zkClient, String path) throws Exception {

		byte[] data = zkClient.getData().usingWatcher(new Watcher() {
			@Override
			public void process(WatchedEvent watchedEvent) {
				System.out.println("监听器watchedEvent:" + watchedEvent);
			}
		}).forPath(path);
		System.out.println("监听节点内容:" + new String(data));

		// 第一次变更节点数据
		zkClient.setData().forPath(path, "new data".getBytes());

		// 第二次变更节点数据
		zkClient.setData().forPath(path, "second data".getBytes());
		Thread.sleep(Integer.MAX_VALUE);
	}

	// CuratorListener: targets background notifications and errors. With this listener,
	// inBackground() calls report asynchronously; synchronous node creation or updates
	// do not trigger it.
	// The listener fires twice here: once when it is registered, and once when getData's
	// async result comes back. The setData call does not fire it.
	// In testing, only the getData call produced a result.

//	Sample run:
//	Listener fired, event: testing   CuratorEventImpl{type=GET_DATA, resultCode=0, path='/testNode', name='null', children=null, context=null, stat=55834578087,55834578087,1659404526966,1659404526966,0,0,0,0,7,0,55834578087
//			, data=[116, 101, 115, 116, 105, 110, 103], watchedEvent=null, aclList=null, opResults=null}

	// Actual data afterwards:
//	[zk: server1:2118(CONNECTED) 56] get /testNode 
//	testing123
	private static void curatorListener(CuratorFramework zkClient, String path) throws Exception {

		zkClient.getCuratorListenable().addListener(new CuratorListener() {
			// Listener callback
			public void eventReceived(CuratorFramework client, CuratorEvent event) {
				System.out.println("Listener fired, event: " + new String(event.getData()) + "   " + event);

			}
		});

		// Fetch the node data asynchronously
		zkClient.getData().inBackground().forPath(path);

		// Update the node data
		zkClient.setData().forPath(path, "testing123".getBytes());

		Thread.sleep(Integer.MAX_VALUE);
	}

	// Requires ZooKeeper 3.6.0+ on both the client and the server.
	// CuratorCache keeps a node's data in a local cache. It can cache a single node or
	// an entire subtree rooted at the node (the default). Listeners registered on a
	// CuratorCache instance are notified when the watched nodes change, responding to
	// update, create, delete and similar events.

	// Operations performed from the zk CLI:
//	[zk: server2:2118(CONNECTED) 138] delete /testNode 
//	[zk: server2:2118(CONNECTED) 140] ls /
//	[zookeeper]
//	[zk: server2:2118(CONNECTED) 141] create /testNode
//	Created /testNode
//	[zk: server2:2118(CONNECTED) 142] ls /
//	[testNode, zookeeper]
//	[zk: server2:2118(CONNECTED) 143] set /testNode "testing1234"
//	[zk: server2:2118(CONNECTED) 144] get /testNode 
//	testing1234

//	Program output:
//	   create:ChildData{path='/testNode', stat=55834578106,55834578106,1659419377672,1659419377672,0,0,0,0,7,0,55834578106, data=[116, 101, 115, 116, 105, 110, 103]}
//	   initialized...
//	   delete:ChildData{path='/testNode', stat=55834578106,55834578106,1659419377672,1659419377672,0,0,0,0,7,0,55834578106, data=[116, 101, 115, 116, 105, 110, 103]}
//	   create:ChildData{path='/testNode', stat=55834578109,55834578109,1659419426777,1659419426777,0,0,0,0,0,0,55834578109, data=null}
//	   ChildData{path='/testNode', stat=55834578109,55834578109,1659419426777,1659419426777,0,0,0,0,0,0,55834578109 , data=null}   change  to ChildData{path='/testNode', stat=55834578109,55834578110,1659419426777,1659419448002,1,0,0,0,11,0,55834578109, data=[116, 101, 115, 116, 105, 110, 103, 49, 50, 51, 52]}
	private static void curatorNodeCache(CuratorFramework zkClient, String path) throws Exception {
		CuratorCache cache = CuratorCache.build(zkClient, path);

		CuratorCacheListener listener = CuratorCacheListener.builder().forCreates(new Consumer<ChildData>() {// create events

			@Override
			public void accept(ChildData t) {
				System.out.println("   create:" + t);
			}
		}).forChanges(new ChangeListener() {// change events

			@Override
			public void event(ChildData oldNode, ChildData node) {
				System.out.println(oldNode + "   change  to " + node);
			}
		}).forDeletes(new Consumer<ChildData>() {// delete events

			@Override
			public void accept(ChildData t) {
				System.out.println("   delete:" + t);
			}
		}).forInitialized(new Runnable() {// cache initialized

			@Override
			public void run() {
				System.out.println("   initialized...");
			}
		}).build();
		// register the listener
		cache.listenable().addListener(listener);

		// the cache must be started
		cache.start();

		Thread.sleep(Integer.MAX_VALUE);
	}

	// Functionally identical to curatorNodeCache, just written with lambdas
	private static void curatorNodeCacheByLambda(CuratorFramework zkClient, String path) throws Exception {

		CuratorCache cache = CuratorCache.build(zkClient, path);

		CuratorCacheListener listener = CuratorCacheListener.builder()
				.forCreates(node -> System.out.println("   create:" + node))
				.forChanges((oldNode, node) -> System.out.println(oldNode + "   change  to " + node))
				.forDeletes(node -> System.out.println("   delete:" + node))
				.forInitialized(() -> System.out.println("   initialized...")).build();

		// register the listener
		cache.listenable().addListener(listener);

		// the cache must be started
		cache.start();

		Thread.sleep(Integer.MAX_VALUE);
	}

	private static boolean isExists(CuratorFramework zkClient, String path) throws Exception {
		// checkExists() returns a Stat when the node exists, null otherwise.
		return zkClient.checkExists().forPath(path) != null;
	}
}

7. Counter examples

1) Single-machine atomic increment

Atomicity means an operation is indivisible: it either executes completely or not at all.
In Java, atomicity can be guaranteed by:

  • synchronized and Lock: locking ensures that only one thread at a time can access the shared variable, so the operation is naturally atomic;
  • the atomic classes under java.util.concurrent.atomic, such as AtomicInteger and AtomicReference, which use CAS to provide optimistic, version-style atomic updates.

The sections below show how each of these approaches solves the i++ atomicity problem.
Note that the volatile keyword cannot guarantee atomicity; for that you still need synchronized, Lock, or the java.util.concurrent.atomic classes.
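To see why volatile alone is not enough, here is a small sketch (not in the original) using the same 10-threads-by-100000-increments pattern as the examples below; count++ is a read-modify-write of three steps, so the printed total is usually less than 1000000 even with volatile:

import java.util.concurrent.CountDownLatch;

public class VolatileNotAtomicDemo {
	static volatile int count = 0; // volatile gives visibility, not atomicity
	static CountDownLatch countDownLatch = new CountDownLatch(10);

	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 10; i++) {
			new Thread(() -> {
				for (int j = 0; j < 100000; j++) {
					count++; // read, add, write: three steps, not atomic
				}
				countDownLatch.countDown();
			}).start();
		}
		countDownLatch.await();
		System.out.println(count); // usually < 1000000
	}
}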

1. synchronized example

import java.util.concurrent.CountDownLatch;

public class SynchronizedDemo {
    static int count = 0;
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    static Object lock = new Object();

    public static void main(String[] args) {
        // Start 10 threads, each incrementing 100000 times
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100000; j++) {
                    // Lock on the lock object, which is shared by all threads
                    synchronized (lock) {
                        count++;
                    }
                }
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }
}

2. Lock example

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    static int count = 0;
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        // Start 10 threads, each incrementing 100000 times
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100000; j++) {
                    lock.lock();
                    try {
                        count++;
                    } finally {
                        lock.unlock(); // always release in finally
                    }
                }
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }
}

Compared with synchronized, ReentrantLock differs mainly in that it must be locked and unlocked explicitly, supports optional fair ordering, offers interruptible and timed acquisition (tryLock), and can manage multiple Condition wait queues. (The comparison figure from the original is not reproduced here.)

3. AtomicInteger example

Making count an AtomicInteger and using incrementAndGet achieves the same atomic increment:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerDemo {
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        // Start 10 threads, each incrementing 100000 times
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100000; j++) {
                    count.incrementAndGet();
                }
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count.get());
    }
}

2) Distributed thread-safe atomic increment

Curator implements distributed counters on top of ZooKeeper: the curator-recipes package provides DistributedAtomicInteger, DistributedAtomicLong and other distributed atomic counters.

import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author alanchan
 *
 */
public class DistributedAtomicIntegerDemo {
	static CountDownLatch countDownLatch = new CountDownLatch(10);
	static int count = 0;
	// Connection instance
	private static CuratorFramework zkClient = null;

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		zkClient = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy, Constant.CONNECTION_TIMEOUT,
				Constant.SESSION_TIMEOUT);
		zkClient.start();

		String path = "/countdown";
//		testIDoublePlus();
		distributedAtomicInteger(zkClient, path);
		CloseableUtils.closeQuietly(zkClient);
	}

	private static void distributedAtomicInteger(CuratorFramework zkClient, String path) throws Exception {
		DistributedAtomicInteger distributedAtomicInteger = new DistributedAtomicInteger(zkClient, path,
				new ExponentialBackoffRetry(1000, 3));
//		1. start()
//		start() launches a new thread. Once start() is called the thread enters the
//		runnable state and can be scheduled by the JVM; run() only executes when the
//		thread is given CPU time. start() must not be called twice; it is what causes
//		run() to execute on the new thread.
//		2. run()
//		run() is an ordinary method and can be called repeatedly. Calling run() directly
//		does NOT start a new thread: the program still has only the main thread, executes
//		sequentially, and must wait for run() to finish, so nothing runs concurrently.
//		Only start() launches a thread; run() is just a normal method call on the
//		calling thread.
//		The commented-out loop below calls thread.run() directly, so its result is the
//		sequential 10 x 100 - see the start/run explanation above.
//		for (int i = 0; i < 10; i++) {
//
//			Runnable thread = new Runnable() {
//				@Override
//				public void run() {
//					for (int j = 0; j < 100; j++) {
//						try {
//							AtomicValue result = distributedAtomicInteger.increment();
//						} catch (Exception e) {
//							e.printStackTrace();
//						}
//					}
//					countDownLatch.countDown();
//				}
//			};
//			thread.run();
//		}

		for (int i = 0; i < 10; i++) {
			new Thread(() -> {
				for (int j = 0; j < 100; j++) {
					try {
						// Increment via add(delta); it returns the new value information.
						// Remember to always check AtomicValue#succeeded().
						AtomicValue<Integer> result = distributedAtomicInteger.add(1);
//						if (result.succeeded()) {
//							System.out.println("current value: " + distributedAtomicInteger.get().postValue());
//						}
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				countDownLatch.countDown();

			}).start();
		}
		try {
			countDownLatch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		// Print the result
		System.out.println("Result after concurrent increments: " + distributedAtomicInteger.get().postValue());
	}

	/**
	 * Tests whether i++ is thread-safe.
	 * A thread that calls await() on a CountDownLatch blocks until the latch's count
	 * reaches zero (each countDown() call decrements it). Here each worker increments
	 * 100000 times and then calls countDown(); with the initial count of 10, once all
	 * 10 threads finish, the main thread's await() returns and count is printed.
	 */
	private static void testIDoublePlus() {
		// Start 10 threads, each incrementing 100000 times
		for (int i = 0; i < 10; i++) {
			new Thread(() -> {
				for (int j = 0; j < 100000; j++) {
					count++;
				}
				countDownLatch.countDown();
			}).start();
		}
		try {
			countDownLatch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(count);
	}

}

8. Distributed lock example

1) How it works

The lock is built on ZooKeeper's ephemeral sequential nodes. The rough idea: whenever a client wants to lock some function, it creates a unique ephemeral sequential node under the directory designated for that function. Deciding who holds the lock is simple: the client whose node carries the smallest sequence number holds it (in practice each of the other clients watches the node just before its own and waits its turn). Releasing the lock is just deleting the ephemeral node. Because the nodes are ephemeral, a crashed client's session expires and its node is removed automatically, which avoids deadlocks caused by a holder dying without releasing the lock.

2) Advantages

High lock safety; ZooKeeper persists its state.

3) Disadvantages

Relatively high performance overhead, because the lock is implemented by dynamically creating and destroying ephemeral nodes.

4) Implementation

The third-party Curator library makes it easy to implement a distributed lock; a minimal usage sketch follows the lists below.
1. Problems with using the native API directly:

  • the session connection is asynchronous and must be handled yourself, e.g. with a CountDownLatch in the watcher;
  • watchers must be registered repeatedly or they stop working;
  • development complexity is fairly high;
  • multi-node delete/create is not supported; you must recurse yourself.

2. Curator mainly implements the following four locks:

  • InterProcessMutex: distributed re-entrant exclusive lock
  • InterProcessSemaphoreMutex: distributed exclusive (non-re-entrant) lock
  • InterProcessReadWriteLock: distributed read-write lock
  • InterProcessMultiLock: a container that manages multiple locks as a single entity
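As referenced above, here is a minimal hedged sketch of the canonical acquire/release pattern with InterProcessMutex, reusing this article's App and Constant helpers (the /locks/demo path is an illustrative assumption):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

public class MinimalLockSketch {
	public static void main(String[] args) throws Exception {
		CuratorFramework client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy,
				Constant.CONNECTION_TIMEOUT, Constant.SESSION_TIMEOUT);
		client.start();

		InterProcessMutex mutex = new InterProcessMutex(client, "/locks/demo");
		mutex.acquire(); // blocks until this client holds the lock
		try {
			System.out.println("doing work while holding the lock");
		} finally {
			mutex.release(); // always release in a finally block
		}
		client.close();
	}
}

The fuller demo class below exercises the other lock types:
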
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author alanchan
 * Tests distributed locks
 */
public class DistributedLockDemo {

	private static Integer testData = 0;
	private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

	public static void main(String[] args) throws Exception {
		String path = "/locks";
		// InterProcessMutex: distributed re-entrant exclusive lock
//		testInterProcessMutex(path);
		// InterProcessSemaphoreMutex: distributed exclusive lock
//		testInterProcessSemaphoreMutex(path); // does not produce the expected result, see below
		// InterProcessReadWriteLock: distributed read-write lock
		// Sample output:
//		pool-1-thread-4 acquired read lock
//		pool-1-thread-4 released read lock
//		pool-1-thread-2 acquired read lock
//		pool-1-thread-2 released read lock
//		pool-1-thread-5 acquired write lock
//		pool-1-thread-5 released write lock
//		pool-1-thread-3 acquired read lock
//		pool-1-thread-1 acquired read lock
//		pool-1-thread-3 released read lock
//		pool-1-thread-1 released read lock
		// Exercise the read-write lock
		for (int i = 0; i < 5; i++) {
			EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable());
		}

		// Sample output for lock downgrading:
//		pool-1-thread-5 acquired write lock
//		pool-1-thread-5 acquired read lock, downgrade succeeded
//		pool-1-thread-5 released read lock
//		pool-1-thread-5 released write lock
//		(threads 4, 2, 1 and 3 each repeat the same four-line pattern)
		// Exercise lock downgrading
//		for (int i = 0; i < 5; i++) {
//			EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable1());
//		}
	}

	// Write data
	private static void write(String path) throws Exception {
		// Create the read-write lock; Curator implements it as a fair lock
		InterProcessReadWriteLock lock = new InterProcessReadWriteLock(getCuratorFramework(), path);
		// Acquire the write lock (backed by InterProcessMutex, hence re-entrant)
		InterProcessLock writeLock = lock.writeLock();
		writeLock.acquire();
		try {
			Thread.sleep(10);
			testData++;
			System.out.println("wrote data, test value: " + testData);
		} finally {
			writeLock.release();
		}
	}

	// Read data
	private void read(CuratorFramework zkClient, String path) throws Exception {
		// Create the read-write lock; Curator implements it as a fair lock
		InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zkClient, path);
		// Acquire the read lock (backed by InterProcessMutex, hence re-entrant)
		InterProcessLock readLock = lock.readLock();
		readLock.acquire();
		try {
			Thread.sleep(10);
			System.out.println("read data, test value: " + testData);
		} finally {
			readLock.release();
		}
	}

	// Result:
//	Thread 2 acquired the lock
//	Thread 2 acquired the lock again
//	Thread 2 released the lock
//	Thread 2 released the lock again
//	Thread 1 acquired the lock
//	Thread 1 acquired the lock again
//	Thread 1 released the lock
//	Thread 1 released the lock again
	// Shared re-entrant lock
	public static void testInterProcessMutex(String path) {
		// Distributed lock 1
		InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), path);
		// Distributed lock 2
		InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), path);
		new Thread(new Runnable() {

			@Override
			public void run() {

				try {
					lock1.acquire();
					System.out.println("Thread 1 acquired the lock");
					lock1.acquire();
					System.out.println("Thread 1 acquired the lock again");

					Thread.sleep(5 * 1000);

					lock1.release();
					System.out.println("Thread 1 released the lock");
					Thread.sleep(5 * 1000);
					lock1.release();
					System.out.println("Thread 1 released the lock again");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {

				try {
					lock2.acquire();
					System.out.println("Thread 2 acquired the lock");
					lock2.acquire();
					System.out.println("Thread 2 acquired the lock again");

					Thread.sleep(5 * 1000);

					lock2.release();
					System.out.println("Thread 2 released the lock");
					Thread.sleep(5 * 1000);
					lock2.release();
					System.out.println("Thread 2 released the lock again");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();
	}

	// Result:
	// Thread 2 acquired the lock
	// ...then everything blocks: no other thread can acquire or release the lock
	// (the second acquire() on a non-re-entrant lock deadlocks the holder itself)

	// Shared non-re-entrant lock
	public static void testInterProcessSemaphoreMutex(String path) {
		// Distributed lock 1
		InterProcessLock lock1 = new InterProcessSemaphoreMutex(getCuratorFramework(), path);
		// Distributed lock 2
		InterProcessLock lock2 = new InterProcessSemaphoreMutex(getCuratorFramework(), path);
		new Thread(new Runnable() {

			@Override
			public void run() {

				try {
					lock1.acquire();
					System.out.println("Thread 1 acquired the lock");
					lock1.acquire();
					System.out.println("Thread 1 acquired the lock again");

					Thread.sleep(5 * 1000);

					lock1.release();
					System.out.println("Thread 1 released the lock");
					Thread.sleep(5 * 1000);
					lock1.release();
					System.out.println("Thread 1 released the lock again");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {

				try {
					lock2.acquire();
					System.out.println("Thread 2 acquired the lock");
					lock2.acquire();
					System.out.println("Thread 2 acquired the lock again");

					Thread.sleep(5 * 1000);

					lock2.release();
					System.out.println("Thread 2 released the lock");
					Thread.sleep(5 * 1000);
					lock2.release();
					System.out.println("Thread 2 released the lock again");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();
	}

	// Shared non-re-entrant lock
	public InterProcessLock getInterProcessSemaphoreMutex(CuratorFramework zkClient, String path) {
		InterProcessLock ipsmLock = new InterProcessSemaphoreMutex(zkClient, path);
		return ipsmLock;
	}

	// Shared re-entrant lock
	public InterProcessLock getInterProcessMutex(CuratorFramework zkClient, String path) {
		InterProcessLock ipLock = new InterProcessMutex(zkClient, path);
		return ipLock;
	}

	// Shared re-entrant read lock
	public InterProcessLock getInterProcessReadLock(CuratorFramework zkClient, String path) {
		InterProcessReadWriteLock ipReadWriteLock = new InterProcessReadWriteLock(zkClient, path);
		InterProcessLock readLock = ipReadWriteLock.readLock();
		return readLock;
	}

	// Shared re-entrant write lock
	public InterProcessLock getInterProcessWriteLock(CuratorFramework zkClient, String path) {
		InterProcessReadWriteLock ipReadWriteLock = new InterProcessReadWriteLock(zkClient, path);
		InterProcessLock writeLock = ipReadWriteLock.writeLock();
		return writeLock;
	}

	private static CuratorFramework getCuratorFramework() {
		CuratorFramework client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy,
				Constant.CONNECTION_TIMEOUT, Constant.SESSION_TIMEOUT);
		// Start the client
		client.start();
		System.out.println("ZooKeeper client started");
		return client;
	}
}
Read-write lock

import java.nio.charset.StandardCharsets;
import java.util.Random;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author alanchan
 * Tests the read-write lock
 */
public class InterProcessReadWriteLockRunnable implements Runnable {

	@Override
	public void run() {
		CuratorFramework zkClient = getCuratorFramework();
		String path = "/locks";
		// Create the InterProcessReadWriteLock that provides the distributed lock
		InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(zkClient, path,
				"distributed read-write lock".getBytes(StandardCharsets.UTF_8));

		// Randomly decide between the write lock and the read lock
		Random random = new Random();
		try {
			if (random.nextInt(10000) > 5000) {
				// Acquire the write lock
				readWriteLock.writeLock().acquire();
				System.out.println(Thread.currentThread().getName() + " acquired write lock");
				Thread.sleep(2);
				System.out.println(Thread.currentThread().getName() + " released write lock");
				// Release the write lock
				readWriteLock.writeLock().release();
			} else {
				// Acquire the read lock
				readWriteLock.readLock().acquire();
				System.out.println(Thread.currentThread().getName() + " acquired read lock");
				Thread.sleep(2);
				System.out.println(Thread.currentThread().getName() + " released read lock");
				// Release the read lock
				readWriteLock.readLock().release();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	private static CuratorFramework getCuratorFramework() {
		CuratorFramework client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy,
				Constant.CONNECTION_TIMEOUT, Constant.SESSION_TIMEOUT);
		// Start the client
		client.start();
		System.out.println("ZooKeeper client started");
		return client;
	}
}

Lock downgrading

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author alanchan
 * Lock downgrading
 */
public class InterProcessReadWriteLockRunnable1 implements Runnable {

	private static CuratorFramework getCuratorFramework() {
		CuratorFramework client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy,
				Constant.CONNECTION_TIMEOUT, Constant.SESSION_TIMEOUT);
		// Start the client
		client.start();
		System.out.println("ZooKeeper client started");
		return client;
	}

	@Override
	public void run() {
		CuratorFramework zkClient = getCuratorFramework();
		String path = "/locks";
		// Create the InterProcessReadWriteLock that provides the distributed lock
		InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(zkClient, path,
				"distributed read-write lock".getBytes(StandardCharsets.UTF_8));
		try {
			// Acquire the write lock
			readWriteLock.writeLock().acquire();
			System.out.println(Thread.currentThread().getName() + " acquired write lock");
			Thread.sleep(2000);
			// Downgrade: acquire the read lock while still holding the write lock
			readWriteLock.readLock().acquire();
			System.out.println(Thread.currentThread().getName() + " acquired read lock, downgrade succeeded");
			Thread.sleep(2000);
			// Release the read lock
			System.out.println(Thread.currentThread().getName() + " released read lock");
			readWriteLock.readLock().release();
			// Release the write lock
			System.out.println(Thread.currentThread().getName() + " released write lock");
			readWriteLock.writeLock().release();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

This has been a brief introduction to ZooKeeper's three Java clients, with Curator examples covering CRUD, transactions, listeners, distributed counters, and distributed locks. Deeper use will require further study.

Note: this article is reposted from the blog.csdn.net article by alanchan, "https://blog.csdn.net/chenwewi520feng/article/details/130288718". Copyright belongs to the original author.