博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper编程入门系列之zookeeper实现分布式进程监控和分布式共享锁(图文详解)...
阅读量:7125 次
发布时间:2019-06-28

本文共 36721 字,大约阅读时间需要 122 分钟。

本博文的主要内容有

    一、zookeeper编程入门系列之利用zookeeper的临时节点的特性来监控程序是否还在运行

    二、zookeeper编程入门系列之zookeeper实现分布式进程监控

   三、zookeeper编程入门系列之zookeeper实现分布式共享锁

 

 

 

 

 

 

 

  这里,推荐用下面的eclipse版本(当然你若也有myeclipse,请忽视我这句话)

 

 

 

 

 

 

 

 

 

 

Group Id:zhouls.bigdata

Artifact Id:zkDemo

Package:zhouls.bigdata.zkDemo

 

 

 

   将默认的jdk,修改为jdk1.7

 

 

  修改默认的pom.xml

4.0.0
zhouls.bigdata
zkDemo
0.0.1-SNAPSHOT
jar
zkDemo
http://maven.apache.org
UTF-8
junit
junit
3.8.1
test

 

 

 

   修改后的pom.xml为

4.0.0
zhouls.bigdata
zkDemo
0.0.1-SNAPSHOT
jar
zkDemo
http://maven.apache.org
UTF-8
junit
junit
4.12
test
org.apache.curator
curator-framework
2.10.0

 

 

 

 

   junit是单元测试。

 

 

 

 

 

  也许,大家的jdk1.7会报错误,那就改为jdk1.8。

 

 

 

 

 一、zookeeper编程入门系列之利用zookeeper的临时节点的特性来监控程序是否还在运行

 

   写一个TestCurator.java

  怎么通过Curator连接到zookeeper官网,其实是有固定的。

 

 

 

 

 

   这打开需要好几分钟的时间,里面会有示范代码,教我们怎么连接zookeeper。

 

 

 

  我这里的zookeeper集群是master(192.168.80.145)、slave1(192.168.80.146)和slave2(192.168.80.147)。

 

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1]

 

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1]

 

 

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1]

 

 

 

   现在,我想通过Curator来连接到zookeeper集群,并在里面创建临时节点。

   这里,永久节点为monitor、临时节点为test123。(当然,大家可以自行去命名)(同时,大家也可以通过命令行方式来创建,我这里就是以代码api形式来创建了)

 

 

 

 

 

 

 

 

 

    比如,这样,monitor是父节点(作为永久节点),test123是临时节点。

     而现在,是monitor都没有,它不会给我们一次性创建完。

 

   除非,大家在命令行里先创建好monitor节点,之后,然后上述代码可以操作成功。否则,就需如下修改代码。

 

 

 

 

 

package zhouls.bigdata.zkDemo;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.junit.Test;/** *  * @author zhouls * */public class TestCurator {    @Test    public void testName() throws Exception {        // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        String connectString = "master:2181,slave1:2181,slave2:2181";        int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失        int connectionTimeoutMs = 3000;// 获取链接的超时时间        CuratorFramework client = CuratorFrameworkFactory.newClient(                connectString, sessionTimeoutMs, connectionTimeoutMs,                retryPolicy);        client.start();// 开启客户端                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/test123");//指定节点名称                }}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

   可以看到成功monitor生成,其实啊,/monitor/test123节点也是有的。(只是中间又消失了)

  为什么会中间消失了呢?是因为,test123是临时节点。创建完之后,它就会消失了。

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1] ls /[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 2] ls /monitor[]

 

 

 

 

  那么,我想看,怎么用代码来实现呢?

  增加以下代码

 

 

   此时的代码是TestCurator.java

package zhouls.bigdata.zkDemo;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.junit.Test;/** *  * @author zhouls * */public class TestCurator {    @Test    public void testName() throws Exception {        // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        String connectString = "master:2181,slave1:2181,slave2:2181";        int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失        int connectionTimeoutMs = 3000;// 获取链接的超时时间        CuratorFramework client = CuratorFrameworkFactory.newClient(                connectString, sessionTimeoutMs, connectionTimeoutMs,                retryPolicy);        client.start();// 开启客户端                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/test123");//指定节点名称        while (true) {                    ;        }        }}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1] ls /[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 2] ls /monitor[test123][zk: localhost:2181(CONNECTED) 3]

 

 

 

  然后,我这边,把代码,来停掉,则它就会消失了。

 

 

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1] ls /[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 2] ls /monitor[][zk: localhost:2181(CONNECTED) 3] ls /monitor[test123][zk: localhost:2181(CONNECTED) 4] ls /monitor[][zk: localhost:2181(CONNECTED) 5]

 

 

 

 

  好的,那么,现在,又有一个疑问出来了,在往monitor节点里,注册节点如test123,那么,我怎么知道是哪一台的呢?则此时,需要做如下修改

 

 

   此刻的代码如下TestCurator.java

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.junit.Test;/** *  * @author zhouls * */public class TestCurator {    @Test    public void testName() throws Exception {        // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        String connectString = "master:2181,slave1:2181,slave2:2181";        int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失        int connectionTimeoutMs = 3000;// 获取链接的超时时间        CuratorFramework client = CuratorFrameworkFactory.newClient(                connectString, sessionTimeoutMs, connectionTimeoutMs,                retryPolicy);        client.start();// 开启客户端        InetAddress localhost = InetAddress.getLocalHost();        String ip = localhost.getHostAddress();                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/" + ip);//指定节点名称        while (true) {                    ;        }            }}

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1] ls /[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 2] ls /monitor[][zk: localhost:2181(CONNECTED) 3] ls /monitor[test123][zk: localhost:2181(CONNECTED) 4] ls /monitor[][zk: localhost:2181(CONNECTED) 5] ls /monitor[169.254.28.160][zk: localhost:2181(CONNECTED) 6]

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1] ls /[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 2] ls /monitor[test123][zk: localhost:2181(CONNECTED) 3] ls /monitor[169.254.28.160][zk: localhost:2181(CONNECTED) 4]

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls /[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 1] ls /[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 2] ls /monitor[test123][zk: localhost:2181(CONNECTED) 3] ls /monitor[169.254.28.160][zk: localhost:2181(CONNECTED) 4]

 

 

  这个ip怎么不是我集群里的ip呢?是哪里的???

  原来是这里的

 

   因为,我是在test测试,所以,拿到的是windows本地的ip地址。

 

 

 

  如果,放在mian去测试,则就是拿到集群里的ip地址了。

 

 

 

 

 

 

 

 

 

 

 

  至此,我们是用临时节点的这个特性,来监控程序有没有运行的。并不是说临时节点就是来只做这个事!!!

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.junit.Test;/** *  * @author zhouls * */public class TestCurator {    @Test    public void testName() throws Exception {        // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        String connectString = "master:2181,slave1:2181,slave2:2181";        int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失        int connectionTimeoutMs = 3000;// 获取链接的超时时间        CuratorFramework client = CuratorFrameworkFactory.newClient(                connectString, sessionTimeoutMs, connectionTimeoutMs,                retryPolicy);        client.start();// 开启客户端        InetAddress localhost = InetAddress.getLocalHost();        String ip = localhost.getHostAddress();                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/" + ip);//指定节点名称        while (true) {                    ;        }                //或者        //        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建//                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面//                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息//                .forPath("/monitor/ + ip");// 指定节点名称//        while (true) {//            ;//        }    }}

 

 

  可以将这个,写入到入口类或构造函数里。每次开始前都调用执行。以此来监控程序是否还在运行,非常重要!

 

 

 

 

 

 

 

二、zookeeper编程入门系列之zookeeper实现分布式进程监控

 

 

 

   思路: 即在/下,先注册一个监视器,即monitor节点(为永久节点)

          然后,监视monitor节点下面的所有子节点(为临时节点)

 

   概念见

 

  先执行

 

 

 

   然后执行

 

 

 

 

  ZkNodeWacter.java

package zhouls.bigdata.zkDemo;import java.util.ArrayList;import java.util.List;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;/** * 这个监视器需要一直在后台运行,所以相当于是一个死循环的进程 * @author zhouls * */public class ZkNodeWacter implements Watcher {    CuratorFramework client;    List
childrenList = new ArrayList
(); public ZkNodeWacter() { //在启动监视器的时候,链接到zk RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000; int connectionTimeoutMs = 3000; client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 开启客户端 //监视monitor节点下面的所有子节点(为临时节点) try { //在monitor目录上注册一个监视器,这个监视器只能使用一次 childrenList = client.getChildren().usingWatcher(this).forPath("/monitor"); } catch (Exception e) { e.printStackTrace(); } } /** * 当monitor节点下面的子节点发生变化的时候,这个方法会被调用到 */ public void process(WatchedEvent event) { System.out.println("我被调用了:"+event); try { //重复注册监视器 List
newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor"); //先遍历原始的子节点list for (String ip : childrenList) { if(!newChildrenList.contains(ip)){ System.out.println("节点消失:"+ip); } } for (String ip : newChildrenList) { if(!childrenList.contains(ip)){ System.out.println("新增节点:"+ip); } } childrenList = newChildrenList; } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ZkNodeWacter spiderWacter = new ZkNodeWacter(); spiderWacter.start();//表示需要开启一个监视器 } private void start() { while(true){ ; } }}

 

 

 

 

 

 

  TestCurator.java

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.junit.Test;/** *  * @author zhouls * */public class TestCurator {    @Test    public void testName() throws Exception {        // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        String connectString = "master:2181,slave1:2181,slave2:2181";        int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失        int connectionTimeoutMs = 3000;// 获取链接的超时时间        CuratorFramework client = CuratorFrameworkFactory.newClient(                connectString, sessionTimeoutMs, connectionTimeoutMs,                retryPolicy);        client.start();// 开启客户端        InetAddress localhost = InetAddress.getLocalHost();        String ip = localhost.getHostAddress();                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/" + ip);//指定节点名称        while (true) {                    ;        }                        //        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建//                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面//                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息//                .forPath("/monitor/ + ip");// 指定节点名称//        while (true) {//            ;//        }    }}

 

 

 

 

 

 

 

三、zookeeper编程入门系列之zookeeper实现分布式共享锁

   这里,一般,都是创建临时有序子节点,怎么来创建,不难

   说到协调,我首先想到的是北京很多十字路口的交通协管,他们手握着小红旗,指挥车辆和行人是不是可以通行。如果我们把车辆和行人比喻成运行在计算机中的单元(线程),那么这个协管是干什么的?很多人都会想到,这不就是锁么?对,在一个并发的环境里,我们为了避免多个运行单元对共享数据同时进行修改,造成数据损坏的情况出现,我们就必须依赖像锁这样的协调机制,让有的线程可以先操作这些资源,然后其他线程等待。对于进程内的锁来讲,我们使用的各种语言平台都已经给我们准备很多种选择。

 

 

 

 

  

 

 

 

 

 

   TestCurator.java

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.junit.Test;/** *  * @author zhouls * */public class TestCurator {    @Test    public void test1() throws Exception {        // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        String connectString = "master:2181,slave1:2181,slave2:2181";        int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失        int connectionTimeoutMs = 3000;// 获取链接的超时时间        CuratorFramework client = CuratorFrameworkFactory.newClient(                connectString, sessionTimeoutMs, connectionTimeoutMs,                retryPolicy);        client.start();// 开启客户端        InetAddress localhost = InetAddress.getLocalHost();        String ip = localhost.getHostAddress();                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/" + ip);//指定节点名称        while (true) {            ;        }            }                @Test        public void test2() throws Exception {            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);            String connectString = "master:2181,slave1:2181,slave2:2181";            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失            int connectionTimeoutMs = 3000;// 获取链接的超时时间            CuratorFramework client = CuratorFrameworkFactory.newClient(                    connectString, sessionTimeoutMs, connectionTimeoutMs,                    retryPolicy);            client.start();// 开启客户端            InetAddress localhost = InetAddress.getLocalHost();            String ip = localhost.getHostAddress();                        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/");// 指定节点名称            while (true) {                ;            }    }}

 

 

 

 

  DistributedLock.java

package zhouls.bigdata.zkDemo;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;/**      DistributedLock lock = null;    try {        lock = new DistributedLock("127.0.0.1:2181","test");        lock.lock();        //do something...    } catch (Exception e) {        e.printStackTrace();    }     finally {        if(lock != null)            lock.unlock();    }        //lock.closeZk();//在cleanup方法中添加 * */public class DistributedLock implements Lock, Watcher{    private ZooKeeper zk;    private String root = "/locks";//根    private String lockName;//竞争资源的标志    private String waitNode;//等待前一个锁    private String myZnode;//当前锁    private CountDownLatch latch;//计数器    private int sessionTimeout = 30000;//30秒    private int waitTimeout = 30000;//等待节点失效最大时间 30秒    private List
exception = new ArrayList
(); /** * 创建分布式锁,使用前请确认zkConnString配置的zookeeper服务可用 * @param zkConnString 127.0.0.1:2181 * @param lockName 竞争资源标志,lockName中不能包含单词lock */ public DistributedLock(String zkConnString, String lockName){ this.lockName = lockName; // 创建一个与服务器的连接 try { zk = new ZooKeeper(zkConnString, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 创建根节点 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper节点的监视器 */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } /** * 获取锁 */ public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, waitTimeout);//等待获取锁 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } /** * 尝试获取锁 */ public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //创建临时有序子节点 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.err.println(myZnode + " is created "); //取出所有子节点 List
subNodes = zk.getChildren(root, false); //取出所有lockName的锁 List
lockObjNodes = new ArrayList
(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } //对所有节点进行默认排序,从小到大 Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //如果是最小的节点,则表示取得锁 return true; } //如果不是最小的节点,找到比自己小1的节点 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); //获取比当前节点小一级的节点(Collections.binarySearch(lockObjNodes, subMyZnode):获取当前节点的角标) waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 等待获取锁 * @param lower :等待的锁 * @param waitTime 最大等待时间 * @return * @throws InterruptedException * @throws KeeperException */ private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } /** * 取消锁监控 */ public void unlock() { try { System.out.println(Thread.currentThread().getId()+",unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; //zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } /** * 关闭zk链接 */ public void closeZk(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } /** * 自定义异常信息 * @author lenovo * */ public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } }}

 

 

 

 

   如有两个线程, 两个线程要同时到mysql中更新一条数据, 对数据库中的数据进行累加更新。由于在分布式环境下, 这两个线程可能存在于不同的机器上的不同jvm进程中, 所以这两个线程的关系就是垮主机跨进程, 使用java中的synchronized锁是搞不定的。

 

 

 

  概念,见

 

 

 

  这里的节点也可以为lock。

  先执行以下的test3,再执行test4

 

 

 

 

 

 

 

 

 

 

 

 

 

[zk: localhost:2181(CONNECTED) 9] ls /       [monitor, hbase, zookeeper, admin, lock, consumers, config, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 10] ls /lock[169.254.28.160][zk: localhost:2181(CONNECTED) 11]

 

 

 

 

  然后,再执行test4

 

 

 

 

 

 

 

 

 

 

 

 

   然后,再执行下test4,试试,看看有什么变化

 

  可以看到,在增加。

 

 

  总的代码是TestCurator.java

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.junit.Test;/** *  * @author zhouls * */public class TestCurator {    @Test    public void test1() throws Exception {        // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        String connectString = "master:2181,slave1:2181,slave2:2181";        int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失        int connectionTimeoutMs = 3000;// 获取链接的超时时间        CuratorFramework client = CuratorFrameworkFactory.newClient(                connectString, sessionTimeoutMs, connectionTimeoutMs,                retryPolicy);        client.start();// 开启客户端        InetAddress localhost = InetAddress.getLocalHost();        String ip = localhost.getHostAddress();                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/" + ip);//指定节点名称        while (true) {            ;        }            }                @Test        public void test2() throws Exception {            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);            String connectString = "master:2181,slave1:2181,slave2:2181";            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失            int connectionTimeoutMs = 3000;// 获取链接的超时时间            CuratorFramework client = CuratorFrameworkFactory.newClient(                    connectString, sessionTimeoutMs, connectionTimeoutMs,                    retryPolicy);            client.start();// 开启客户端            InetAddress localhost = InetAddress.getLocalHost();            String ip = localhost.getHostAddress();                        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/monitor/");// 指定节点名称            while (true) {                ;            }    }                                @Test        public void test3() throws Exception {            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);            String connectString = "master:2181,slave1:2181,slave2:2181";            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失            int connectionTimeoutMs = 3000;// 获取链接的超时时间            CuratorFramework client = CuratorFrameworkFactory.newClient(                    connectString, sessionTimeoutMs, connectionTimeoutMs,                    retryPolicy);            client.start();// 开启客户端            InetAddress localhost = InetAddress.getLocalHost();            String ip = localhost.getHostAddress();                        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建                .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息                .forPath("/lock/" + ip);//指定节点名称            while (true) {                ;            }                    }                                        @Test        public void test4() throws Exception {            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);            String connectString = "master:2181,slave1:2181,slave2:2181";            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失            int connectionTimeoutMs = 3000;// 获取链接的超时时间            CuratorFramework client = CuratorFrameworkFactory.newClient(                    connectString, sessionTimeoutMs, connectionTimeoutMs,                    retryPolicy);            client.start();// 开启客户端            InetAddress localhost = InetAddress.getLocalHost();            String ip = localhost.getHostAddress();                        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面            .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息            .forPath("/lock/");// 指定节点名称            while (true) {                ;            }    }                        }

 

 

 

 

  DistributedLock.java

package zhouls.bigdata.zkDemo;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;/**      DistributedLock lock = null;    try {        lock = new DistributedLock("127.0.0.1:2181","test");        lock.lock();        //do something...    } catch (Exception e) {        e.printStackTrace();    }     finally {        if(lock != null)            lock.unlock();    }        //lock.closeZk();//在cleanup方法中添加 * */public class DistributedLock implements Lock, Watcher{    private ZooKeeper zk;    private String root = "/locks";//根    private String lockName;//竞争资源的标志    private String waitNode;//等待前一个锁    private String myZnode;//当前锁    private CountDownLatch latch;//计数器    private int sessionTimeout = 30000;//30秒    private int waitTimeout = 30000;//等待节点失效最大时间 30秒    private List
exception = new ArrayList
(); /** * 创建分布式锁,使用前请确认zkConnString配置的zookeeper服务可用 * @param zkConnString 127.0.0.1:2181 * @param lockName 竞争资源标志,lockName中不能包含单词lock */ public DistributedLock(String zkConnString, String lockName){ this.lockName = lockName; // 创建一个与服务器的连接 try { zk = new ZooKeeper(zkConnString, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 创建根节点 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper节点的监视器 */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } /** * 获取锁 */ public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, waitTimeout);//等待获取锁 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } /** * 尝试获取锁 */ public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //创建临时有序子节点 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.err.println(myZnode + " is created "); //取出所有子节点 List
subNodes = zk.getChildren(root, false); //取出所有lockName的锁 List
lockObjNodes = new ArrayList
(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } //对所有节点进行默认排序,从小到大 Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //如果是最小的节点,则表示取得锁 return true; } //如果不是最小的节点,找到比自己小1的节点 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); //获取比当前节点小一级的节点(Collections.binarySearch(lockObjNodes, subMyZnode):获取当前节点的角标) waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 等待获取锁 * @param lower :等待的锁 * @param waitTime 最大等待时间 * @return * @throws InterruptedException * @throws KeeperException */ private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } /** * 取消锁监控 */ public void unlock() { try { System.out.println(Thread.currentThread().getId()+",unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; //zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } /** * 关闭zk链接 */ public void closeZk(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } /** * 自定义异常信息 * @author lenovo * */ public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } }}

  这个代码里,大家可以改为自己的集群,如我的是master:2181,slave1:2181,slave2:2181

本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/7242381.html,如需转载请自行联系原作者

你可能感兴趣的文章
信用算力基于 RocketMQ 实现金融级数据服务的实践
查看>>
基于oauth 2.0 实现第三方开放平台
查看>>
kubernetes1.4 基础篇:Learn Kubernetes 1.4 by 6 steps(1):概要
查看>>
百万下载量的 Android 应用后台收集用户信息
查看>>
SQL Server 多表数据增量获取和发布 1
查看>>
C3P0连接池
查看>>
这 25 个开源机器学习项目,一般人我不告诉 Ta
查看>>
【WePY小程序框架实战四】-使用async&await异步请求数据
查看>>
iOS UIImageView(图片)
查看>>
可折叠显示的发光搜索表单
查看>>
PostgreSQL 10.1 手册_部分 II. SQL 语言_第 12 章 全文搜索_12.2. 表和索引
查看>>
java使用正则表达式判断手机号,固定电话,身份证,邮箱,url,车牌号,日期,ip地址,mac,人名等...
查看>>
新手也能轻松掌握的分布式系统「事务」技巧
查看>>
iOS开发之使用Git的基本使用(一)
查看>>
配置云存储网关在线服务支持多个互联VPC-高速通道版
查看>>
6个步骤从头开始编写机器学习算法:感知器案例研究
查看>>
NCalc 学习笔记 (三)
查看>>
NetBeans 成为 Apache 软件基金会顶级项目
查看>>
SSRF在Redis中反弹shell
查看>>
UML关系图
查看>>