ZooKeeper是⼀个典型的发布/订阅模式的分布式数据管理与协调框架,我们可以使⽤它来进⾏分布式数据的发布与订阅。另⼀⽅⾯,通过对ZooKeeper中丰富的数据节点类型进⾏交叉使⽤,配合Watcher事件通知机制,可以⾮常⽅便地构建⼀系列分布式应⽤中都会涉及的核⼼功能,如数据发布/订阅、命名服务、集群管理、Master选举、分布式锁和分布式队列等。那接下来就针对这些典型的分布式应⽤场景来做下介绍

Zookeeper的两⼤特性

  1. 客户端如果对Zookeeper的数据节点注册Watcher监听,那么当该数据节点的内容或是其⼦节点
    列表发⽣变更时,Zookeeper服务器就会向订阅的客户端发送变更通知。

  2. 对在Zookeeper上创建的临时节点,⼀旦客户端与服务器之间的会话失效,那么临时节点也会被⾃动删除

利⽤其两⼤特性,可以实现集群机器存活监控系统,若监控系统在/clusterServers节点上注册⼀个Watcher监听,那么但凡进⾏动态添加机器的操作,就会在/clusterServers节点下创建⼀个临时节点:/clusterServers/[Hostname],这样,监控系统就能够实时监测机器的变动情况。

Java客户端的服务器动态上下线监听

分布式系统中,主节点会有多台,主节点可能因为任何原因出现宕机或者下线,⽽任意⼀台客户端都要能实时感知到主节点服务器的上下线。


  • 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.lagou.zk.test;

import org.I0Itec.zkclient.ZkClient;

public class ServerMain {
private ZkClient zkClient = null;
//获取到zk对象
private void connectZK(){
zkClient = new ZkClient("Linux121:2181,Linux122:2181,Linux123:2181");
if(!zkClient.exists("/servers")){
zkClient.createPersistent("/servers");
}
}
//注册服务端信息到zk节点
private void registerServerInfo(String ip,String port){
//创建临时顺序节点
final String path = zkClient.createEphemeralSequential("/servers/server", ip +":"+port);
System.out.println("---->>> 服务器注册成功,ip="+ip+";port ="+port+";节点路径信息="+path);
}
public static void main(String[] args) {
final ServerMain server = new ServerMain();
server.connectZK();
server.registerServerInfo(args[0],args[1] );
//启动⼀个服务线程提供时间查询
new TimeServer(Integer.parseInt(args[1])).start();
}
}
  • 服务端提供时间查询的线程类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.lagou.zk.test;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;

public class TimeServer extends Thread {
private int port=0;
public TimeServer(int port) {
this.port = port;
}
@Override
public void run() {
//启动serversocket监听⼀个端⼝
try {
final ServerSocket serverSocket = new ServerSocket(port);
while(true){
final Socket socket = serverSocket.accept();
final OutputStream out = socket.getOutputStream();
out.write(new Date().toString().getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
  • client端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.lagou.zk.test;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// 注册监听zk指定⽬录,
//维护⾃⼰本地⼀个servers信息,收到通知要进⾏更新
//发送时间查询请求并接受服务端返回的数据
public class Client {
//获取zkclient
ZkClient zkClient = null;
//维护⼀个serversi 信息集合
ArrayList<String> infos = new ArrayList<String>();
private void connectZk() {
// 创建zkclient
zkClient = new ZkClient("Linux121:2181,Linux122:2181");
//第⼀次获取服务器信息,所有的⼦节点
final List<String> childs = zkClient.getChildren("/servers");
for (String child : childs) {
//存储着ip+port
final Object o = zkClient.readData("/servers/" + child);
infos.add(String.valueOf(o));
}
//对servers⽬录进⾏监听
zkClient.subscribeChildChanges("/servers", new IZkChildListener() {
public void handleChildChange(String s, List<String> children)
throws Exception {
//接收到通知,说明节点发⽣了变化,client需要更新infos集合中的数据
ArrayList<String> list = new ArrayList<String>();
//遍历更新过后的所有节点信息
for (String path : children) {
final Object o = zkClient.readData("/servers/" + path);
list.add(String.valueOf(o));
}
//最新数据覆盖⽼数据
infos = list;
System.out.println("--》接收到通知,最新服务器信息为:" + infos);
}
});
}
//发送时间查询的请求
public void sendRequest() throws IOException {
//⽬标服务器地址
final Random random = new Random();
final int i = random.nextInt(infos.size());
final String ipPort = infos.get(i);
final String[] arr = ipPort.split(":");
//建⽴socket连接
final Socket socket = new Socket(arr[0], Integer.parseInt(arr[1]));
final OutputStream out = socket.getOutputStream();
final InputStream in = socket.getInputStream();
//发送数据
out.write("query time".getBytes());
out.flush();
//接收返回结果
final byte[] b = new byte[1024];
in.read(b);//读取服务端返回数据
System.out.println("client端接收到server:+" + ipPort + "+返回结果:" +
new String(b));
//释放资源
in.close();
out.close();
socket.close();
}
public static void main(String[] args) throws InterruptedException {
final Client client = new Client();
client.connectZk(); //监听器逻辑
while (true) {
try {
client.sendRequest(); //发送请求
} catch (IOException e) {
e.printStackTrace();
try {
client.sendRequest();
} catch (IOException e1) {
e1.printStackTrace();
}
}
//每隔⼏秒中发送⼀次请求到服务端
Thread.sleep(2000);
}
}
}

分布式锁

什么是锁

  • 在单机程序中,当存在多个线程可以同时改变某个变量(可变共享变量)时,为了保证线程安全(数据不能出现脏数据)就需要对变量或代码块做同步,使其在修改这种变量时能够串⾏执⾏消除并发修改变量。

  • 对变量或者堆代码码块做同步本质上就是加锁。⽬的就是实现多个线程在⼀个时刻同⼀个代码块只能有⼀个线程可执⾏。

分布式锁

分布式的环境中会不会出现脏数据的情况呢?类似单机程序中线程安全的问题。观察下⾯的例⼦


  • 问题

假设Redis ⾥⾯的某个商品库存为 1;此时两个⽤户同时下单,其中⼀个下单请求执⾏到第 3 步,更新数据库的库存为 0,但是第 4 步还没有执⾏。⽽另外⼀个⽤户下单执⾏到了第 2 步,发现库存还是 1,就继续执⾏第 3 步。但是商品库存已经为0,所以如果数据库没有限制就会出现超卖的问题。

  • 解决⽅法

⽤锁把 2、3、4 步锁住,让他们执⾏完之后,另⼀个线程才能进来执⾏。


公司业务发展迅速,系统应对并发不断提⾼,解决⽅案是要增加⼀台机器,结果会出现更⼤的问题

假设有两个下单请求同时到来,分别由两个机器执⾏,那么这两个请求是可以同时执⾏了,依然存在超卖的问题。


因为如图所示系统是运⾏在两个不同的 JVM ⾥⾯,不同的机器上,增加的锁只对⾃⼰当前 JVM ⾥⾯的线程有效,对于其他 JVM 的线程是⽆效的。所以现在已经不是线程安全问题。需要保证两台机器加的锁是同⼀个锁,此时分布式锁就能解决该问题。

分布式锁的作⽤:在整个系统提供⼀个全局、唯⼀的锁,在分布式系统中每个系统在进⾏相关操作的时候需要获取到该锁,才能执⾏相应操作。

zk实现分布式锁

  • 利⽤Zookeeper可以创建临时带序号节点的特性来实现⼀个分布式锁

  • 锁就是zk指定⽬录下序号最⼩的临时序列节点,多个系统的多个线程都要在此⽬录下创建临时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利⽤节点的顺序进⾏锁的判断。

  • 每个线程都是先创建临时顺序节点,然后获取当前⽬录下最⼩的节点(序号),判断最⼩节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。

  • 获取锁失败的线程获取当前节点上⼀个临时顺序节点,并对对此节点进⾏监听,当该节点删除的时候(上⼀个线程执⾏结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了锁。


main方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.lagou.zk.dislock;

public class DisLockTest {
public static void main(String[] args) {
//使⽤10个线程模拟分布式环境
for (int i = 0; i < 10; i++) {
new Thread(new DisLockRunnable()).start();//启动线程
}
}
static class DisLockRunnable implements Runnable {
public void run() {
//每个线程具体的任务,每个线程就是抢锁,
final DisClient client = new DisClient();
client.getDisLock();
//模拟获取锁之后的其它动作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//释放锁
client.deleteLock();
}
}
}

核⼼实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package com.lagou.zk.dislock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

//抢锁
//1. 去zk创建临时序列节点,并获取到序号
//2. 判断⾃⼰创建节点序号是否是当前节点最⼩序号,如果是则获取锁
//执⾏相关操作,最后要释放锁
//3. 不是最⼩节点,当前线程需要等待,等待你的前⼀个序号的节点
//被删除,然后再次判断⾃⼰是否是最⼩节点。。。
public class DisClient {
public DisClient() {
//初始化zk的/distrilocl节点,会出现线程安全问题
synchronized (DisClient.class){
if (!zkClient.exists("/distrilock")) {
zkClient.createPersistent("/distrilock");
}
}
}
//前⼀个节点
String beforNodePath;
String currentNoePath;
//获取到zkClient
private ZkClient zkClient = new ZkClient("linux121:2181,linux122:2181");
//把抢锁过程为量部分,⼀部分是创建节点,⽐较序号,另⼀部分是等待锁
//完整获取锁⽅法
public void getDisLock() {
//获取到当前线程名称
final String threadName = Thread.currentThread().getName();
//⾸先调⽤tryGetLock
if (tryGetLock()) {
//说明获取到锁
System.out.println(threadName + ":获取到了锁");
} else {
// 没有获取到锁,
System.out.println(threadName + ":获取锁失败,进⼊等待状态");
waitForLock();
//递归获取锁
getDisLock();
}
}
CountDownLatch countDownLatch = null;
//尝试获取锁
public boolean tryGetLock() {
//创建临时顺序节点,/distrilock/序号
if (null == currentNoePath || "".equals(currentNoePath)) {
currentNoePath = zkClient.createEphemeralSequential("/distrilock/", "lock");
}
//获取到/distrilock下所有的⼦节点
final List<String> childs = zkClient.getChildren("/distrilock");
//对节点信息进⾏排序
Collections.sort(childs); //默认是升序
final String minNode = childs.get(0);
//判断⾃⼰创建节点是否与最⼩序号⼀致
if (currentNoePath.equals("/distrilock/" + minNode)) {
//说明当前线程创建的就是序号最⼩节点
return true;
} else {
//说明最⼩节点不是⾃⼰创建,要监控⾃⼰当前节点序号前⼀个的节点
final int i = Collections.binarySearch(childs, currentNoePath.substring("/distrilock/".length()));
//前⼀个(lastNodeChild是不包括⽗节点)
String lastNodeChild = childs.get(i - 1);
beforNodePath = "/distrilock/" + lastNodeChild;
}
return false;
}
//等待之前节点释放锁,如何判断锁被释放,需要唤醒线程继续尝试tryGetLock
public void waitForLock() {
//准备⼀个监听器
final IZkDataListener iZkDataListener = new IZkDataListener() {
public void handleDataChange(String s, Object o) throws Exception
{
}
//删除
public void handleDataDeleted(String s) throws Exception {
//提醒当前线程再次获取锁
countDownLatch.countDown();//把值减1变为0,唤醒之前await线程
}
};
//监控前⼀个节点
zkClient.subscribeDataChanges(beforNodePath, iZkDataListener);
//在监听的通知没来之前,该线程应该是等待状态,先判断⼀次上⼀个节点是否还存在
if (zkClient.exists(beforNodePath)) {
//开始等待,CountDownLatch:线程同步计数器
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();//阻塞,countDownLatch值变为0
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//解除监听
zkClient.unsubscribeDataChanges(beforNodePath, iZkDataListener);
}
//释放锁
public void deleteLock() {
if (zkClient != null) {
zkClient.delete(currentNoePath);
zkClient.close();
}
}
}

分布式锁的实现可以是 Redis、Zookeeper,相对来说⽣产环境如果使⽤分布式锁可以考虑使⽤Redis实现⽽⾮Zk。