zookeeper【封神录】下篇

news/2024/7/4 7:46:34 标签: zookeeper, 分布式, 云原生, idea, java, 后端, 中间件

目录

🥞1.客户端API 

🌭2.服务器动态上下线  

🧂3.分布式锁 


1.客户端API 

1.1导入依赖

java">    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.14.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
        </dependency>
    </dependencies>

1.2代码实现

java">public class zkClient {
    //一定不要有空格
    private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";
    private int sessionTimeOut = 2000;
    private ZooKeeper zkClient;

    /**
     * 初始话zookeeper
     * 参数1:连接地址
     * 参数2:超时时间
     * 参数3:监听器
     */
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                List<String> children = null;
                try {
                    children = zkClient.getChildren("/", true);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("========================");
                for (String child : children) {
                    System.out.println(child);
                }
                System.out.println("========================");
            }
        });
    }

    /**
     * 创建子节点
     * 参数1:创建节点的路径
     * 参数2:节点的数据(转化为字节)
     * 参数3:节点的权限
     * 参数4:节点的类型(临时/永久)
     **/
    @Test
    public void create() throws InterruptedException, KeeperException {
        String nodeCreate = zkClient.create("/class", "s1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * 监控子节点变化
     * 参数1:要监控的节点目录
     * 参数2:监听器(true:使用初始化是的监听器)
     */
    @Test
    public void getChildren() throws InterruptedException, KeeperException {
        List<String> children = zkClient.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
        //延时
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 判断节点是否存在
     * 参数1:判断的节点路径
     * 参数:是否使用监听器
     */
    @Test
    public void isExist() throws InterruptedException, KeeperException {
        Stat stat = zkClient.exists("/class", false);
        System.out.println(stat == null ? "不存在" : "存在");
    }
}

 1.3写数据原理

1.写入请求直接发送给Leader

  • 1.客户端发请求给Leader
  • 2.leader执行请求并应答,然后把请求分发给下一个follower
  • 3.follower会执行请求并应答。
  • 4.当应答数超过半数,Leader就会回复客户端,完成了写请求
  • 5.leader会继续发送写请求给剩下的follower

2.写入请求发送给Follower

  • 1.客户端发请求给follower,follower没有写权限,立即把写请求发给leader
  • 2.leader执行写请求并应答,然后把写请求分发给follower
  • 3.follower会执行请求并应答。
  • 4.当应答数超过半数,Leader回复follower,由follower回复客户端,完成了写请求
  • 5.leader会继续发送写请求给剩下的follower

2.服务器动态上下线 

2.1客户端

  • 1.获取zookeeper连接
  • 2.监听节点的变化
  • 3.业务逻辑(睡眠)
java">public class DisClient {
    private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";
    private int sessionTimeOut = 2000;
    private ZooKeeper zooKeeper;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DisClient client = new DisClient();
        //1.获取zk连接
        client.getConnect();
        //2.监听/servers下面的节点变化
        client.getServerList();
        //3.业务逻辑
        client.business();

    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 监听服务端(获取节点信息)
     * @throws InterruptedException
     * @throws KeeperException
     */
    private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zooKeeper.getChildren("/servers", true);
        //服务器地址存放到集合中
        ArrayList<String> list = new ArrayList<>();
        for (String child : children) {
            byte[] data = zooKeeper.getData("/servers/" + child, false, null);
            list.add(new String(data));
        }
        System.out.println(list);

    }

    /**
     * 初始话zookeeper
     * @throws IOException
     */
    private void getConnect() throws IOException {
        zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

                try {
                    getServerList();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

2.2服务端 

  • 1.获取zookeeper连接
  • 2.创建节点(服务端注册到zookeeper
  • 3.业务逻辑(睡眠)
java">/**
 * 服务端注册zookeeper
 */
public class DisServer {
    private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";
    private int sessionTimeOut = 2000;
    private ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        DisServer dIsServer = new DisServer();
        //1.获取zk连接
        dIsServer.getConnect();
        //2.注册服务器到zk节点(创建节点)
        dIsServer.register(args[0]);
        //3.启动业务逻辑
        dIsServer.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 注册服务器(创建节点)
     * @param hostname
     * @throws InterruptedException
     * @throws KeeperException
     */
    private void register(String hostname) throws InterruptedException, KeeperException {
        String create = zooKeeper.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + "已经上线");
    }


    /**
     * 初始化zookeeper
     * @throws IOException
     */
    private void getConnect() throws IOException {
        zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

3.分布式锁 

Zookeeper的分布式锁实现基于其znode(zk节点)功能。每个znode都可以有数据和子节点,并且每个znode都有一个版本号。Zookeeper的分布式锁利用了znode的版本号特性,同时使用watcher机制实现分布式锁的互斥

3.1执行流程

  1. 当一个客户端需要获取锁时,它会在Zookeeper上创建一个临时且有序的znode节点。
  2. 客户端通过获取Zookeeper上的znode列表,并判断自己创建的节点是否是所有节点中最小的那个,如果是,则表示客户端获得了锁。
  3. 如果客户端没有获得锁,则监听它前面(比它序号小的)的节点,等待锁的释放。
  4. 当客户端释放锁时,它会删除自己创建的znode节点,此时,Zookeeper会通知正在等待前面的节点上的watcher机制,让等待锁的客户端尝试重新获取锁

3.2代码实现

java">/**
 * 分布式锁
 */
public class ZkLock {
    private final String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";
    private final int sessionTimeOut = 2000;
    private final ZooKeeper zooKeeper;
    private String path;

    private CountDownLatch countDownLatch = new CountDownLatch(1);
    private CountDownLatch countPathLatch = new CountDownLatch(1);
    private String currentNode;

    //构造器初始化
    public ZkLock() throws IOException, InterruptedException, KeeperException {
        //1.获取链接
        zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //countDownLatch 连接上zookeeper,释放
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
                //countPathLatch 释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(path)) {
                    countPathLatch.countDown();

                }
            }
        });
        //等待zookeeper正常连接后,往下执行程序
        countDownLatch.await();

        //2.判断根节点locks是否存在
        Stat stat = zooKeeper.exists("/locks", false);
        if (stat == null) {
            //说明不存在,创建根节点
            zooKeeper.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

    }


    //3.加锁
    public void zkLock() {
        //创建对应的临时带序号的节点
        try {
            currentNode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            //判断创建的节点是否是最小的序号节点,如果是,获取到锁,如果不是,监听他前一个结点
            List<String> children = zooKeeper.getChildren("/locks", false);

            //如果只有一个值,直接获取锁,如果不是,则判断
            if (children.size() == 1) {
                //直接枷锁
                return;
            } else {
                //对节点进行排序
                Collections.sort(children);
                //获取节点名称
                String thisNode = currentNode.substring("/locks/".length());
                //通过界节点名称,获取在集合中的下标
                int index = children.indexOf(thisNode);

                //判断下标
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {//第一个数据
                    //直接枷锁
                    return;
                } else {//说明多个节点,进行监听前一个节点
                    path = "/locks/" + children.get(index - 1);
                    zooKeeper.getData(path, true, new Stat());
                    //等待监听
                    countPathLatch.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //4.解锁
    public void unZkLock() {
        //删除节点
        try {
            zooKeeper.delete(currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

3.3线程测试 

java">public class ZkLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZkLock zkLock1 = new ZkLock();
        ZkLock zkLock2 = new ZkLock();
        ZkLock zkLock3 = new ZkLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    zkLock1.zkLock();
                    System.out.println("线程1,获取到锁");
                    Thread.sleep(3 * 1000);
                    zkLock1.unZkLock();
                    System.out.println("线程1,释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    zkLock2.zkLock();
                    System.out.println("线程2,获取到锁");
                    Thread.sleep(3 * 1000);
                    zkLock2.unZkLock();
                    System.out.println("线程2,释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    zkLock3.zkLock();
                    System.out.println("线程3,获取到锁");
                    Thread.sleep(3 * 1000);
                    zkLock3.unZkLock();
                    System.out.println("线程3,释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

4.Curator框架 

Curator是Apache ZooKeeper的一个高级客户端库,旨在使开发人员更容易编写可靠的分布式系统。它为ZooKeeper提供了许多有用的功能,包括连接管理分布式选举缓存观察。Curator还提供了一组易于使用的API,可以轻松管理ZooKeeper的节点和数据。

4.1添加依赖

java">        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>5.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.5.0</version>
        </dependency>

4.2代码实现

java">    /**
     * 客户端连接
     * @return
     */
    private static CuratorFramework getCuratorFramework() {

        //创建zookeeper的客户端:重试策略,初始化每次重试之间需要等待的时间,基准等待时间为3秒
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.20.129:2181,192.168.20.131:2181,192.168.20.130:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();
        client.start();
        System.out.println("zookeeper启动~");
        return client;
    }

4.3创建线程测试

java">InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程一获取到锁");
                    Thread.sleep(3000);
                    lock1.release();
                    System.out.println("线程一释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程二获取到锁");
                    Thread.sleep(3000);
                    lock2.release();
                    System.out.println("线程二释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }


http://www.niftyadmin.cn/n/5305308.html

相关文章

x-cmd pkg | gitui - git 终端交互式命令行工具

目录 简介首次用户功能特点类似工具与竞品进一步探索 简介 gitui 由 Stephan D 于 2020 年使用 Rust 语言构建的 git 终端交互式命令行工具&#xff0c;旨在终端界面中便捷管理 git 存储库。 首次用户 使用 x gitui 即可自动下载并使用 在终端运行 eval "$(curl https:/…

数据库-创建表

create table 表的名字([表定义选项]表定义选项 (列名1 类型 &#xff0c;列名2 类型&#xff0c;…&#xff0c;n 类型】 create table Class(class_id int ,class_name varchar(128),class_teachar varchar(64)) ;create table 表的名字([表定义选项][表的单选项] [表的分区…

2023APMCM亚太数学建模C题 - 中国新能源汽车的发展趋势(2)

五&#xff0e;问题二模型建立和求解 5.1 问题二模型建立和求解 针对题目二&#xff0c;题目要求收集中国新能源电动汽车行业发展数据&#xff0c;建立数学模型描述&#xff0c;并预测未来十年的发展。由于在第一文中&#xff0c;我们已经收集了一定的新能源行业发展数据&…

我的2023

前言 时间总是在人没有注意到的时候就悄然流逝了&#xff0c;一晃而过&#xff0c;我搭建《广然笔记》这个独立博客也已经过去了五年。只言片语回顾一下博客的五年历程&#xff0c;顺便写下2023年的一年总结&#xff0c;同时展望一下2024年。 博客的五年历程 想法起源 关于…

CentOS 8 基于官方源码制作openssh 9.6 rpm包(含ssh-copy-id、openssl) —— 筑梦之路

CentOS 8 制作openssh9.6 rpm(含ssh-copy-id命令)包 —— 筑梦之路_centos8 rpmbuild -ba openssh.spec 9.6-CSDN博客 CentOS 8 制作openssh9.0/9.2/9.3 rpm包——筑梦之路_centos8 openssl rpm包-CSDN博客 效果截图&#xff1a; 和使用官方默认的spec文件制作的区别&#xff…

nodejs发送消息给钉钉机器人

1.钉钉添加机器人 1.1 新建一个群 --> 群设置 --> 机器人 1.2 机器人管理 --> 添加机器人 1.3 机器人--> 自定义-->添加 1.4 配置信息 备注1&#xff1a;密钥复制出来SEC2c689174c4a8ed49c8a7309a490cd98e0e7f7bc788bb7232d53c738eb5f5d008 备注2&#xff1a;…

SpringBoot+RocketMQ集群(dledger)部署完整学习笔记

文章目录 前言一、单台集群部署二、多台集群部署1.修改配置2.dashboard修改 三、整合springboot1.引入pom和修改yml2.编写消费者3.编写生产者4.测试效果 总结 前言 RocketMQ集群方式有好几种 官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy 2m-2s-asy…

Docker-Compose部署Redis(v7.2)主从模式

文章目录 一、前提准备1. redis配置文件2. 下载redis镜像3. 文件夹结构 二、docker-compose三、主从配置1.主节点配置文件2.从节点配置文件 四、运行五、测试 环境 docker desktop for windows 4.23.0redis 7.2 一、前提准备 1. redis配置文件 因为Redis 7.2 docker镜像里面…