简介

CAP理论

在理论计算机科学中,CAP定理(CAP theorem),又被称作布鲁尔定理(Brewer’s theorem),它指出对于一个分布式计算系统来说,不可能同时满足以下三点:

  • 一致性(Consistency) (等同于所有节点访问同一份最新的数据副本)
  • 可用性(Availability)(每次请求都能获取到非错的响应——但是不保证获取的数据为最新数据)
  • 分区容错性(Partition tolerance)(以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。)

目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。基于 CAP理论,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证最终一致性。

问题引入

对于锁大家肯定不会陌生,在Java中synchronized关键字和ReentrantLock可重入锁在我们的代码中是经常见的,一般我们用其在多线程环境中控制对资源的并发访问,但是随着分布式的快速发展,本地的加锁往往不能满足我们的需要,在我们的分布式环境中上面加锁的方法就会失去作用。

在分布式环境下,一个方法在同一个时间如果只能被一个节点的一个线程运行的话,节点与节点之间由于之间互相无法得知对方的相关信息,如果没有进行相关的同步策略的话,非常容易得到错误的结果。

在许多的场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务分布式锁等。很多时候我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,通过 Java 提供的并发 API 我们可以解决,但是在分布式环境下,就没有那么简单啦。

  • 分布式与单机情况下最大的不同在于其不是多线程而是多进程
  • 多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。

例如:

  • 效率:不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
  • 正确性:如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。

介绍

  • 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
  • 与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。主要就是需要考虑到网络的延时和不可靠
  • 分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。

要求

  • 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
  • 这把锁要是一把可重入锁(避免死锁)
  • 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
  • 这把锁最好是一把公平锁(根据业务需求考虑要不要这条)
  • 有高可用的获取锁和释放锁功能
  • 获取锁和释放锁的性能要好

常见的分布式锁

我们一般实现分布式锁有以下几个方式:

  • MySql
  • ZooKeeper
  • Redis
  • 自研分布式锁:如谷歌的Chubby。

使用MySQL实现分布式锁

建立数据库

MySQL分布式锁的数据库实现原理较为简单,简单实现逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE `resourceLock` (
`id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
`resource_name` VARCHAR(128) NOT NULL DEFAULT '' COMMENT '资源名字',
`node_info` VARCHAR(128) DEFAULT NULL COMMENT '机器信息',
`count` INT(11) NOT NULL DEFAULT '0' COMMENT '锁的次数,统计可重入锁',
`desc` VARCHAR(128) DEFAULT NULL COMMENT '额外的描述信息',
`update_time` TIMESTAMP NULL DEFAULT NULL COMMENT '更新时间',
`create_time` TIMESTAMP NULL DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE KEY `unq_resource` (`resource_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

当我们想要执行某个方法时,我们使用当前方法名向表中插入数据:

1
INSERT INTO resourceLock (method_name, desc) VALUES ('methodName', '测试的methodName');

阻塞式获取锁

lock一般是阻塞式的获取锁,意思就是不获取到锁誓不罢休,那么我们可以写一个死循环来执行其操作:

1
2
3
4
5
6
7
8
9
public void lock(){
while(true){
if(mysqlLock.lock(resource)){
return;
}
//休眠3ms后重试
LockSupport.parkNanos(1000 * 1000 * 3);
}
}

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

mysqlLock.lock内部是一个sql,为了达到可重入锁的效果那么我们应该先进行查询,如果有值,那么需要比较node_info是否一致,这里的node_info可以用机器IP和线程名字来表示,如果一致那么就加可重入锁count的值,如果不一致那么就返回false。如果没有值那么直接插入一条数据。伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//添加事务
@Transaction
public void lock() {
if(select * from resourceLock where resource_name = 'xxx' for update; 查看是否有对应数据){
if(currentNodeInfo == resultNodeInfo) {
//如果锁的持有者信息一致,认为是可重入的
update resourceLock set count = count + 1 where resource_name = 'xxx';
return true;
//将count加一,成功获取锁。
}
else {
return false;
//否则说明有其他节点其他线程已经持有了锁,获取失败
} else {
insert into resourceLock;
//说明当前数据库中没有锁的相关信息,暂无节点线程拥有锁,此时直接插入记录获取锁
}
}
}

需要注意的是这一段代码需要加事务,必须要保证这一系列操作的原子性。

非阻塞式获取锁

tryLock()是非阻塞获取锁,如果获取不到那么就会马上返回,代码可以如下:

1
2
3
public boolean tryLock(){
return mysqlLock.lock();
}

此外我们还可以设置超时时间,尝试获取指定的timeout时如果还不能获取锁,我们才认为获取失败,在此之前我们通过自旋的方式不断尝试获取锁。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean tryLock(long timeout) {
long endTimeout = System.currentTimeMillis() + timeout;
//直接确认截止时间,超出截止时间还未获取到锁就认为获取失败
while(true) {
if(mysqlLock.lock()){
return true;
//获取到锁了,直接返回true
}
if(System.currentTimeMillis() > endTimeout) {
return false;
//超时,获取失败
}
}
}

mysqlLock.lock和上面一样,但是要注意的是select … for update这个是阻塞的获取行锁,如果同一个资源并发量较大还是有可能会退化成阻塞的获取锁。

释放锁

根据可重入锁的规则,当相同节点线程访问方法时count计数加一,当其离开方法时,count计数减一,count计数减一的操作只有当前持有锁的节点才能执行,当count归零的时候我们认为目前已经没有相关节点线程在持有着锁,也就是说我们应当从数据库中删除对应的记录,将锁释放。如下为unlock方法的伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Transaction
public boolean unlock() {
if(select * from resourceLock where resource_name = 'xxx' for update; 查看是否有对应数据) {
if(currentNodeInfo == resultInfo) {
//如果当前操作者就是锁的持有者
if(count > 1) {
update count = count - 1; //将count减一处理
} else {
delete; //如果减一后归零了,我们需要释放锁,即将当前记录删除
}
} else {
return false; //如果解锁操作不是锁的持有者执行的,直接返回false,表示解锁失败
}
} else {
return false; //如果当前锁未被任何节点持有,直接返回false,没有查询到锁
}
}

锁超时

我们有可能会遇到我们的机器节点挂了,那么这个锁就不会得到释放,我们可以启动一个定时任务,通过计算一般我们处理任务的一般的时间,比如是5ms,那么我们可以稍微扩大一点,当这个锁超过20ms没有被释放我们就可以认定是节点挂了然后将其直接释放。

PS: 但是一般任务的处理时间也是很模糊的,处理锁超时等情况需要使用守护线程等方式来完成,稍后在Redis块会提及

存在问题

  • 因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;

  • 在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。

使用Redis实现分布式锁

实现命令

在Redis中我们使用setNx(set if not exist)方法,如果不存在则更新,其可以很好的用来实现我们的分布式锁。对于某个资源加锁我们只需要

1
setNx resourceName value

但是由于考虑到分布式锁的运行过程中存在锁超时的问题,如果运行过程中节点发生宕机,但是其所持有的锁还存在数据库中没有来得及释放,就会发生严重的死锁问题,其他需要获取锁的线程都会被挂起等待,而锁由于节点的意外退出已经无法释放了,因此我们使用Redis的过期机制也就是Expire操作,为每个键值对加上过期时间,这样即使线程意外退出,也不会影响锁的最终释放,这个问题我们稍后再说。

所以最终的操作就会是:

1
set resourceName value ex 5 nx

PS: 加入过期时间需要和setNx同一个原子操作,在Redis2.8之前我们需要使用Lua脚本达到我们的目的,但是redis2.8之后redis支持nx和ex操作是同一原子操作。

如果需要释放锁的话我们也只需要进行delete操作将对应的键值对删除即可:

1
delete resourceName

锁超时和误解除

如上文提及的,当我们处理对应分布式锁时,我们可以设置超时时间的方式来确保不会发生死锁,由于节点意外宕机没有及时释放锁导致其他节点无法获取锁。但是节点的超时时间如果设置的不合适会带来更多的问题,如果超时时间设置的太短,可能当前的节点线程仍然持有着锁,却由于超时时间已到导致锁被误解除,极有可能导致其他节点线程的进入。如果超时时间设置的太长了,虽然我们可以保证节点线程当前持有的锁能够正常的由自身释放,却可能导致节点宕机后锁超时的恢复时间过长,影响系统的性能。

如图,就是锁误解除的情况示意图,A节点在持有锁的过程中仍然在执行业务,但是在Redis中的记录却因为超时时间已经到了而直接解除,如果这个时候B节点尝试获取锁,那B节点完全是有能力直接拿到锁的,这样的话会发生不一致的问题,导致数据的错误。

那么我们应该使用怎样的方式来解决呢?在这里我们引入看门狗Watchdog的机制,当我们Redis数据库内的记录即将到达过期时间的时候,我们使用看门狗机制给这条记录续上新的过期时间,可以认为是续租,如果节点线程发生了宕机也没有相关的问题,当记录到期之后Redis也会很顺利的删除掉对应的记录,不会影响到系统的正常运行。

例如在Redission中就直接集成了看门狗的机制:

  1. 只有在未指定锁超时时间时才会使用看门狗;
  2. 看门狗默认续租时间是 10s 左右,internalLockLeaseTime / 3
  3. 可以通过 Config 统一设置看门狗的时间,设置 lockWatchdogTimeout 参数即可。

机制如图:

这样的话,我们的锁超时和锁误解除问题都得到了解决:

可重入锁

除了在Redis内部我们通过<K, V>键值对的方式我们可以存储可重入锁的count计数器相关信息,此外我们也未必需要将数据存储在数据库内,由于Redis的分布式锁一次只能被一个确定的节点线程拥有,所以我们完全可以将计数器等相关信息存储在服务器本地,由于多线程的关系,我们可以直接使用ThreadLocal等线程隔离的容器来进行存储。

在本地记录记录重入次数,如 Java 中使用 ThreadLocal 进行重入次数统计,简单示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static ThreadLocal<Map<String, Integer>> LOCKERS = ThreadLocal.withInitial(HashMap::new);
// 加锁
public boolean lock(String key) {
Map<String, Integer> lockers = LOCKERS.get();
if (lockers.containsKey(key)) {
lockers.put(key, lockers.get(key) + 1);
return true;
} else {
if (SET key uuid NX EX 30) {
lockers.put(key, 1);
return true;
}
}
return false;
}
// 解锁
public void unlock(String key) {
Map<String, Integer> lockers = LOCKERS.get();
if (lockers.getOrDefault(key, 0) <= 1) {
lockers.remove(key);
DEL key
} else {
lockers.put(key, lockers.get(key) - 1);
}
}

使用ZooKeeper实现分布式锁

实现方式

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:

  • 创建一个目录mylock;
  • 线程A想获取锁就在mylock目录下创建临时顺序节点;
  • 获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
  • 线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
  • 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

具体示意图如下:

我们以某个资源为目录,然后这个目录下面的节点就是我们需要获取锁的客户端,未获取到锁的客户端注册需要注册Watcher到上一个客户端。如图,/lock是我们用于加锁的目录,/resource_name是我们锁定的资源,其下面的节点按照我们加锁的顺序排列。

Curator使用

Curator封装了Zookeeper底层的Api,使我们更加容易方便的对Zookeeper进行操作,并且它封装了分布式锁的功能,这样我们就不需要再自己实现了。

Curator实现了可重入锁(InterProcessMutex),也实现了不可重入锁(InterProcessSemaphoreMutex)。在可重入锁中还实现了读写锁。

例如:

InterProcessMutex是Curator实现的可重入锁,我们可以通过下面的一段代码实现我们的可重入锁:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) throws Exception {
InterProcessMutex lock = new InterProcessMutex(client, lock_path);
try {
lock.acquire();
//do something 执行相关业务逻辑.....
//.......
} finally {
lock.release();
}
}

我们利用acuire进行加锁,release进行解锁。

加锁的流程具体如下:

  1. 首先进行可重入的判定:这里的可重入锁记录在ConcurrentMap<Thread, LockData> threadData这个Map里面,如果threadData.get(currentThread)是有值的那么就证明是可重入锁,然后记录就会加1。我们之前的Mysql其实也可以通过这种方法去优化,可以不需要count字段的值,将这个维护在本地可以提高性能。
  2. 然后在我们的资源目录下创建一个节点:比如这里创建一个/0000000002这个节点,这个节点需要设置为EPHEMERAL_SEQUENTIAL也就是临时节点并且有序。
  3. 获取当前目录下所有子节点,判断自己的节点是否位于子节点第一个。
  4. 如果是第一个,则获取到锁,那么可以返回。
  5. 如果不是第一个,则证明前面已经有人获取到锁了,那么需要获取自己节点的前一个节点。/0000000002的前一个节点是/0000000001,我们获取到这个节点之后,再上面注册Watcher(这里的watcher其实调用的是object.notifyAll(),用来解除阻塞)。
  6. object.wait(timeout)或object.wait():进行阻塞等待这里和我们第5步的watcher相对应。

解锁的具体流程:

  1. 首先进行可重入锁的判定:如果有可重入锁只需要次数减1即可,减1之后加锁次数为0的话继续下面步骤,不为0直接返回。
  2. 删除当前节点。
  3. 删除threadDataMap里面的可重入锁的数据。

锁超时?

Zookeeper不需要配置锁超时,由于我们设置节点是临时节点,我们的每个机器维护着一个ZK的session,通过这个session,ZK可以判断机器是否宕机。如果我们的机器挂掉的话,那么这个临时节点对应的就会被删除,所以我们不需要关心锁超时。