ZooKeeper是⼀个典型的发布/订阅模式的分布式数据管理与协调框架,我们可以使⽤它来进⾏分布式数据的发布与订阅。另⼀⽅⾯,通过对ZooKeeper中丰富的数据节点类型进⾏交叉使⽤,配合Watcher事件通知机制,可以⾮常⽅便地构建⼀系列分布式应⽤中都会涉及的核⼼功能,如数据发布/订阅、命名服务、集群管理、Master选举、分布式锁和分布式队列等。那接下来就针对这些典型的分布式应⽤场景来做下介绍
Zookeeper的两⼤特性:
-
客户端如果对Zookeeper的数据节点注册Watcher监听,那么当该数据节点的内容或是其⼦节点
列表发⽣变更时,Zookeeper服务器就会向订阅的客户端发送变更通知。
-
对在Zookeeper上创建的临时节点,⼀旦客户端与服务器之间的会话失效,那么临时节点也会被⾃动删除
利⽤其两⼤特性,可以实现集群机器存活监控系统,若监控系统在/clusterServers节点上注册⼀个Watcher监听,那么但凡进⾏动态添加机器的操作,就会在/clusterServers节点下创建⼀个临时节点:/clusterServers/[Hostname],这样,监控系统就能够实时监测机器的变动情况。
Java客户端的服务器动态上下线监听
分布式系统中,主节点会有多台,主节点可能因为任何原因出现宕机或者下线,⽽任意⼀台客户端都要能实时感知到主节点服务器的上下线。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.lagou.zk.test;
import org.I0Itec.zkclient.ZkClient;
public class ServerMain { private ZkClient zkClient = null; private void connectZK(){ zkClient = new ZkClient("Linux121:2181,Linux122:2181,Linux123:2181"); if(!zkClient.exists("/servers")){ zkClient.createPersistent("/servers"); } } private void registerServerInfo(String ip,String port){ final String path = zkClient.createEphemeralSequential("/servers/server", ip +":"+port); System.out.println("---->>> 服务器注册成功,ip="+ip+";port ="+port+";节点路径信息="+path); } public static void main(String[] args) { final ServerMain server = new ServerMain(); server.connectZK(); server.registerServerInfo(args[0],args[1] ); new TimeServer(Integer.parseInt(args[1])).start(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.lagou.zk.test;
import java.io.IOException; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Date;
public class TimeServer extends Thread { private int port=0; public TimeServer(int port) { this.port = port; } @Override public void run() { try { final ServerSocket serverSocket = new ServerSocket(port); while(true){ final Socket socket = serverSocket.accept(); final OutputStream out = socket.getOutputStream(); out.write(new Date().toString().getBytes()); } } catch (IOException e) { e.printStackTrace(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| package com.lagou.zk.test;
import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random;
public class Client { ZkClient zkClient = null; ArrayList<String> infos = new ArrayList<String>(); private void connectZk() { zkClient = new ZkClient("Linux121:2181,Linux122:2181"); final List<String> childs = zkClient.getChildren("/servers"); for (String child : childs) { final Object o = zkClient.readData("/servers/" + child); infos.add(String.valueOf(o)); } zkClient.subscribeChildChanges("/servers", new IZkChildListener() { public void handleChildChange(String s, List<String> children) throws Exception { ArrayList<String> list = new ArrayList<String>(); for (String path : children) { final Object o = zkClient.readData("/servers/" + path); list.add(String.valueOf(o)); } infos = list; System.out.println("--》接收到通知,最新服务器信息为:" + infos); } }); } public void sendRequest() throws IOException { final Random random = new Random(); final int i = random.nextInt(infos.size()); final String ipPort = infos.get(i); final String[] arr = ipPort.split(":"); final Socket socket = new Socket(arr[0], Integer.parseInt(arr[1])); final OutputStream out = socket.getOutputStream(); final InputStream in = socket.getInputStream(); out.write("query time".getBytes()); out.flush(); final byte[] b = new byte[1024]; in.read(b); System.out.println("client端接收到server:+" + ipPort + "+返回结果:" + new String(b)); in.close(); out.close(); socket.close(); } public static void main(String[] args) throws InterruptedException { final Client client = new Client(); client.connectZk(); while (true) { try { client.sendRequest(); } catch (IOException e) { e.printStackTrace(); try { client.sendRequest(); } catch (IOException e1) { e1.printStackTrace(); } } Thread.sleep(2000); } } }
|
分布式锁
什么是锁
分布式锁
分布式的环境中会不会出现脏数据的情况呢?类似单机程序中线程安全的问题。观察下⾯的例⼦
假设Redis ⾥⾯的某个商品库存为 1;此时两个⽤户同时下单,其中⼀个下单请求执⾏到第 3 步,更新数据库的库存为 0,但是第 4 步还没有执⾏。⽽另外⼀个⽤户下单执⾏到了第 2 步,发现库存还是 1,就继续执⾏第 3 步。但是商品库存已经为0,所以如果数据库没有限制就会出现超卖的问题。
⽤锁把 2、3、4 步锁住,让他们执⾏完之后,另⼀个线程才能进来执⾏。
公司业务发展迅速,系统应对并发不断提⾼,解决⽅案是要增加⼀台机器,结果会出现更⼤的问题
假设有两个下单请求同时到来,分别由两个机器执⾏,那么这两个请求是可以同时执⾏了,依然存在超卖的问题。
因为如图所示系统是运⾏在两个不同的 JVM ⾥⾯,不同的机器上,增加的锁只对⾃⼰当前 JVM ⾥⾯的线程有效,对于其他 JVM 的线程是⽆效的。所以现在已经不是线程安全问题。需要保证两台机器加的锁是同⼀个锁,此时分布式锁就能解决该问题。
分布式锁的作⽤:在整个系统提供⼀个全局、唯⼀的锁,在分布式系统中每个系统在进⾏相关操作的时候需要获取到该锁,才能执⾏相应操作。
zk实现分布式锁
-
利⽤Zookeeper可以创建临时带序号节点的特性来实现⼀个分布式锁
-
锁就是zk指定⽬录下序号最⼩的临时序列节点,多个系统的多个线程都要在此⽬录下创建临时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利⽤节点的顺序进⾏锁的判断。
-
每个线程都是先创建临时顺序节点,然后获取当前⽬录下最⼩的节点(序号),判断最⼩节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
-
获取锁失败的线程获取当前节点上⼀个临时顺序节点,并对对此节点进⾏监听,当该节点删除的时候(上⼀个线程执⾏结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了锁。
main方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.lagou.zk.dislock;
public class DisLockTest { public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(new DisLockRunnable()).start(); } } static class DisLockRunnable implements Runnable { public void run() { final DisClient client = new DisClient(); client.getDisLock(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } client.deleteLock(); } } }
|
核⼼实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| package com.lagou.zk.dislock;
import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;
public class DisClient { public DisClient() { synchronized (DisClient.class){ if (!zkClient.exists("/distrilock")) { zkClient.createPersistent("/distrilock"); } } } String beforNodePath; String currentNoePath; private ZkClient zkClient = new ZkClient("linux121:2181,linux122:2181"); public void getDisLock() { final String threadName = Thread.currentThread().getName(); if (tryGetLock()) { System.out.println(threadName + ":获取到了锁"); } else { System.out.println(threadName + ":获取锁失败,进⼊等待状态"); waitForLock(); getDisLock(); } } CountDownLatch countDownLatch = null; public boolean tryGetLock() { if (null == currentNoePath || "".equals(currentNoePath)) { currentNoePath = zkClient.createEphemeralSequential("/distrilock/", "lock"); } final List<String> childs = zkClient.getChildren("/distrilock"); Collections.sort(childs); final String minNode = childs.get(0); if (currentNoePath.equals("/distrilock/" + minNode)) { return true; } else { final int i = Collections.binarySearch(childs, currentNoePath.substring("/distrilock/".length())); String lastNodeChild = childs.get(i - 1); beforNodePath = "/distrilock/" + lastNodeChild; } return false; } public void waitForLock() { final IZkDataListener iZkDataListener = new IZkDataListener() { public void handleDataChange(String s, Object o) throws Exception { } public void handleDataDeleted(String s) throws Exception { countDownLatch.countDown(); } }; zkClient.subscribeDataChanges(beforNodePath, iZkDataListener); if (zkClient.exists(beforNodePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforNodePath, iZkDataListener); } public void deleteLock() { if (zkClient != null) { zkClient.delete(currentNoePath); zkClient.close(); } } }
|
分布式锁的实现可以是 Redis、Zookeeper,相对来说⽣产环境如果使⽤分布式锁可以考虑使⽤Redis实现⽽⾮Zk。