IOTXING

记录技术学习之路

0%

Curator框架-InterProcessMutex

InterProcessMutex 分布式锁

InterProcessMutex是Curator中常用的一个基于ZK的分布式锁,它会在我们指定的路径下面创建临时有序(EPHEMERAL_SEQUENTIAL)节点,序号最靠前的节点会获取到锁。
如果持有锁的客户端挂了(与ZK的会话断开),对应的临时节点就会被删除;如果这时候有线程在等待锁,排在最前面的等待者就会获取到锁。

类定义

1
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>

变量

1
2
3
4
5
private final LockInternals internals;    // internal implementation details, e.g. the path, the lock name and the client
// base path of the lock; used below in error messages and to build sibling node paths
private final String basePath;
// maps each thread to the data of the lock it holds (see LockData)
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
private static final String LOCK_NAME = "lock-"; // prefix of the lock name
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Per-thread bookkeeping for a held lock: which thread owns it, the path of
 * its lock node, and how many times it has been acquired.
 */
private static class LockData
{
    /** Thread that holds the lock. */
    final Thread owningThread;

    /** Path of the lock node. */
    final String lockPath;

    /** Hold count; a freshly created entry counts as one acquisition. */
    final AtomicInteger lockCount = new AtomicInteger(1);

    private LockData(Thread owner, String path)
    {
        owningThread = owner;
        lockPath = path;
    }
}

internalLock (实际获取锁的逻辑)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Acquires the lock for the calling thread.
 *
 * If the calling thread already has an entry in {@code threadData}, the
 * call is a re-entrant acquire and only the hold count is incremented.
 * Otherwise the acquisition is delegated to {@code internals.attemptLock}.
 *
 * @param time how long to wait for the lock
 * @param unit unit of {@code time}
 * @return {@code true} if the lock is held on return, {@code false} if
 *         {@code attemptLock} returned no path
 * @throws Exception propagated from {@code attemptLock}
 */
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    // Concurrency note: a LockData entry is keyed by its own thread and is
    // only ever touched by that thread, so no extra locking is required.
    final Thread caller = Thread.currentThread();

    final LockData held = threadData.get(caller);
    if ( held != null )
    {
        // re-entrant acquire: just bump the hold count
        held.lockCount.incrementAndGet();
        return true;
    }

    final String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath == null )
    {
        // the lock was not obtained
        return false;
    }

    // remember the lock node path so release() can find it later
    threadData.put(caller, new LockData(caller, lockPath));
    return true;
}

attemptLock (尝试加锁的逻辑)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Creates our lock node and waits until it holds the lock, retrying the
 * whole sequence when the lock node cannot be found and the client's retry
 * policy allows it.
 *
 * @param time how long to wait for the lock
 * @param unit unit of time; when null, no wait limit is applied
 * @param lockNodeBytes payload for the lock node
 * @return the path of our lock node if the lock was obtained, otherwise null
 * @throws Exception any error from the client; a NoNodeException is rethrown
 *         once the retry policy refuses another attempt
 */
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis(); // start time, used to measure elapsed time
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; // wait budget in milliseconds; null when no unit was given
// when revocable holds a value, an empty payload is used instead of lockNodeBytes
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0; // number of retries so far

String ourPath = null;
boolean hasTheLock = false; // whether we hold the lock
boolean isDone = false; // whether the attempt loop is finished
while ( !isDone )
{
// assume this pass is the last one; reset below only when a retry is allowed
isDone = true;

try
{ // create the lock node and get its path back
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// once the node exists, loop until the lock is obtained: without a wait limit this blocks
// indefinitely; with a limit, the node is deleted after the timeout
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
// i.e. when the lock node cannot be found, start over by creating it again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}

if ( hasTheLock )
{
return ourPath;
}

return null;
}

createsTheLock(创建节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Creates the lock node as an {@code EPHEMERAL_SEQUENTIAL} node (ZooKeeper
 * removes ephemeral nodes once the owning session ends), creating parent
 * containers if needed.
 *
 * @param client the Curator client used to create the node
 * @param path path passed to the create call
 * @param lockNodeBytes payload for the node; when {@code null} the node is
 *        created without an explicit payload
 * @return the path of the created node
 * @throws Exception propagated from the create call
 */
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    if ( lockNodeBytes == null )
    {
        // no payload supplied: use the overload without data
        return client.create()
            .creatingParentContainersIfNeeded()
            .withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath(path);
    }

    return client.create()
        .creatingParentContainersIfNeeded()
        .withProtection()
        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
        .forPath(path, lockNodeBytes);
}

//上面使用了一个函数creatingParentContainersIfNeeded,这个函数会在父节点不存在的时候,自动创建父节点,如果不创建父节点的话,在添加子节点时会出错。
//会以我们传入的path作为前缀,生成一个带序号的子节点路径,创建该子节点,并把实际创建出来的路径返回

internalLockLoop(创建节点后,循环等待获取锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
 * After our node has been created, loops until the driver reports that we
 * hold the lock, the wait budget is used up, or the client is no longer in
 * the STARTED state. While waiting, a watcher is set on the node the driver
 * tells us to watch and the thread blocks in wait().
 *
 * @param startMillis time the attempt started; updated on each pass to track elapsed time
 * @param millisToWait remaining wait budget in milliseconds, or null to wait without limit
 * @param ourPath path of our own lock node
 * @return true if the lock was obtained, false otherwise
 * @throws Exception any failure; our node is deleted before the exception propagates
 */
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false; // whether we hold the lock
boolean doDelete = false; // whether our node must be deleted on the way out
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath); // set revocableWatcher on our own node
}

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) // keep going only while the client is started
{
List<String> children = getSortedChildren(); // all lock nodes, sorted
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // strip the base path and the separator to get our node's name

PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); // ask the driver whether we hold the lock
if ( predicateResults.getsTheLock() )
{
haveTheLock = true; // we got the lock
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

synchronized(this) // not the holder yet: watch the node the driver pointed at, then block in wait()
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
// charge the time elapsed since the last check against the remaining budget
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out without getting the lock: delete the node we created
break;
}

wait(millisToWait); // presumably woken early by the watcher's notification; confirm against the watcher code
}
else
{
wait(); // no wait limit was given: block until notified
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e); // inspect the exception for interruption (helper defined elsewhere)
doDelete = true; // on any exception, make sure our node is deleted
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}

getsTheLock (已经建立了节点,判断是否获取锁)

1
2
3
4
5
6
7
8
9
10
11
/**
 * Decides whether the node named {@code sequenceNodeName} currently holds
 * the lock, given the sorted list of lock nodes.
 *
 * The node holds the lock when its index in {@code children} is below
 * {@code maxLeases}; e.g. with {@code maxLeases == 1} only the first child
 * does. Otherwise the node {@code maxLeases} positions ahead of ours is
 * reported as the one to watch.
 *
 * @param client the Curator client (not used by this implementation)
 * @param children sorted names of all lock nodes
 * @param sequenceNodeName name of our own node
 * @param maxLeases number of nodes that may hold the lock at once
 * @return the decision, plus the node to watch when the lock is not held
 * @throws Exception from {@code validateOurIndex}, presumably when our node
 *         is missing from {@code children} (index -1)
 */
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
    final int position = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, position);

    if ( position < maxLeases )
    {
        // we hold the lock, so there is nothing to watch
        return new PredicateResults(null, true);
    }

    // not the holder: watch the node maxLeases positions ahead of ours
    return new PredicateResults(children.get(position - maxLeases), false);
}

release (释放锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Releases one hold of the lock owned by the calling thread. The lock node
 * is released only when the hold count drops to zero.
 *
 * @throws IllegalMonitorStateException if the calling thread has no entry
 *         in {@code threadData}, or if the hold count drops below zero
 * @throws Exception propagated from {@code internals.releaseLock}
 */
public void release() throws Exception
{
    // Concurrency note: a LockData entry is only ever touched by the thread
    // it is keyed by, so no extra locking is required.
    final Thread caller = Thread.currentThread();
    final LockData held = threadData.get(caller);
    if ( held == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    final int remaining = held.lockCount.decrementAndGet();
    if ( remaining < 0 )
    {
        // released more often than acquired
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    if ( remaining > 0 )
    {
        // still held re-entrantly
        return;
    }

    // last hold released: release the lock node
    try
    {
        internals.releaseLock(held.lockPath);
    }
    finally
    {
        // drop this thread's entry whether or not releaseLock succeeded
        threadData.remove(caller);
    }
}
1
2
3
4
5
/**
 * Releases the lock by resetting the revocable reference and deleting the lock node.
 *
 * @param lockPath path of the lock node to delete
 * @throws Exception propagated from deleteOurPath
 */
void releaseLock(String lockPath) throws Exception
{
revocable.set(null); // reset the revocable reference to null
deleteOurPath(lockPath); // delete the lock node at lockPath
}

获取锁的时候的调用图

acquire

释放锁时候的流程

release