一、基于Zookeeper实现分布式锁
通过ZooKeeper的节点创建与删除来实现分布式锁,使用临时节点(防止死锁)和顺序节点(可以利用锁释放的事件监听机制,来实现阻塞监听式的分布式锁)来管理锁的获取与释放。
也可通过使用ZooKeeper的一个客户端Curator,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。
实现原理:Zookeeper是一个分布式协调服务,分布式协调主要是来解决分布式系统中多个应用之间的数据一致性,Zookeeper内部的数据存储方式类似于文件目录形式的存储结构。在Zookeeper中的指定路径下创建临时节点,然后客户端根据当前路径下的节点状态来判断是否加锁成功。
获取锁:
- 创建一个临时序列节点,代表当前请求的锁。
- 获取当前根节点下的所有子节点并进行排序。
- 检查当前节点是否是最小的节点,如果是,则获取锁成功。
- 如果不是,则设置一个观察者,等待前一个节点的删除。
释放锁: 删除当前的临时节点,从而释放锁。
Java代码示例:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.1</version> <!-- 根据需要选择合适的版本 -->
</dependency>
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class ZkDistributedLock {
private static final String LOCK_ROOT = "/locks"; // 锁的根节点路径
private final ZooKeeper zooKeeper; // ZooKeeper客户端
private String lockNode; // 当前锁节点
// 构造函数,初始化ZooKeeper连接并创建锁根节点
public ZkDistributedLock(String zkAddress) throws IOException {
this.zooKeeper = new ZooKeeper(zkAddress, 3000, null);
// 创建根节点(持久节点),如果不存在则创建
try {
if (zooKeeper.exists(LOCK_ROOT, false) == null) {
zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
// 获取锁
public void lock() throws KeeperException, InterruptedException {
// 创建一个临时序列节点,表示当前锁
lockNode = zooKeeper.create(LOCK_ROOT + "/lock-", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 竞争锁
while (true) {
// 获取所有子节点并排序
List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);
Collections.sort(children);
// 如果当前节点是最小的节点,则获取锁成功
if (lockNode.equals(LOCK_ROOT + "/" + children.get(0))) {
System.out.println("Lock acquired: " + lockNode);
return; // 获取锁成功
}
// 否则,等待前一个节点被删除
int index = Collections.binarySearch(children, lockNode.substring(LOCK_ROOT.length() + 1));
String prevNode = children.get(index - 1);
Stat stat = zooKeeper.exists(LOCK_ROOT + "/" + prevNode, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (ZkDistributedLock.this) {
ZkDistributedLock.this.notify(); // 通知等待的线程
}
}
}
});
// 如果前一个节点仍然存在,则等待它被删除
if (stat != null) {
synchronized (this) {
wait(); // 等待锁释放
}
}
}
}
// 释放锁
public void unlock() {
try {
zooKeeper.delete(lockNode, -1); // 删除临时节点
System.out.println("Lock released: " + lockNode);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
// 创建ZooKeeper分布式锁实例
ZkDistributedLock lock = new ZkDistributedLock("localhost:2181");
// 获取锁
lock.lock();
// 进行临界区操作
System.out.println("Performing critical section operation...");
Thread.sleep(2000); // 模拟操作
// 释放锁
lock.unlock();
}
}