侧边栏壁纸
博主头像
高大北博主等级

所有的再见中,我最喜欢明天见

  • 累计撰写 208 篇文章
  • 累计创建 151 个标签
  • 累计收到 20 条评论
标签搜索

目 录CONTENT

文章目录

Zookeeper分布式协调工具【入门到精通】

高大北
2022-06-21 / 0 评论 / 1 点赞 / 341 阅读 / 3,439 字 / 正在检测是否收录...

什么是Zookeeper

官方文档上这么解释zookeeper,它是一个分布式服务框架,是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
上面的解释有点抽象,简单来说zookeeper=文件系统+监听通知机制。

Zookeeper应用场景

  1. 注册中心(Dubbo+Zookeeper)
  2. 分布式配置中心(统一存放配置文件)
  3. 分布式锁
  4. 分布式队列
  5. 分布式文件系统

zookeeper相关特性

1、高效
适用于大型的分布式系统. 如果写多的话性能不高,因为它要做所有节点之间的数据同步。
2、可靠
支持集群,大部分可用即服务可用
3、顺序
所有写请求由leader生成递增zxid,写操作时,采用mvcc乐观锁机制进行写,保证所有写操作顺序。
4、简洁
对外提供的api非常实用、简洁。仅仅7个api
create - 在树形结构的位置中创建节点
delete - 删除一个节点
exists - 测试节点在指定位置上是否存在
get data - 从节点上读取数据
set data - 往节点写入输入
get chilren - 检索一个节点的子节点列表
sync - 等待传输数据

Zookeeper数据模型

  1. 临时节点
  2. 临时顺序节点
  3. 持久节点
  4. 持久顺序节点

ACL权限控制

ACL权限模型,实际上就是对树每个节点实现控制
身份的认证有4种方式:

  • world:默认方式,相当于全世界都能访问
  • auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
  • digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
  • ip:使用Ip地址认证

Zookeeper单机版本

Linux环境安装Zookeeper(单机)

1.解压zk压缩包
https://logaaaaa.oss-cn-beijing.aliyuncs.com/zookeeper-3.4.14.tar.gz
tar -zxvf zookeeper-3.4.14.tar.gz

  1. 进入到zk目录
    cd zookeeper-3.4.14

3.在zk目录中创建data和logs文件夹
mkdir data
mkdir logs

4.进入到conf目录,修改文件名称
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg

dataDir =/usr/local/zookeeper-3.4.14/data
dataLogDir=/usr/local/zookeeper-3.4.14/logs
5.启动zk
./zkServer.sh start
./zkServer.sh status

Java语言操作ZK

依赖

      <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.7</version>
        </dependency>
    /**
     * 连接地址
     */
    private static final String Address="192.168.31.185:2181";
    /**
     * 超时时间
     */
    private static final int TIMEOUT_PARAM=5000;
    /**
     * 事件通知
     */
    private static final String WATCHER="";
    /**
     * 计数器
     */
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper(Address, TIMEOUT_PARAM, new Watcher() {
            //获取到该链接是否成功
            @Override
            public void process(WatchedEvent watchedEvent) {
                Event.KeeperState state = watchedEvent.getState();

                if (state == Event.KeeperState.SyncConnected) {
                    System.out.println("连接成功");
                    countDownLatch.countDown();
                }
            }
        });
        System.out.println("zk正在连接等待");
        countDownLatch.await();
        System.out.println("开始创建节点");
        /**
         * 创建节点
         * 1。路径名称
         * 2。节点value
         * 3。节点权限
         * 4。节点类型 4种类型  临时节点,持久化节点。临时有序号节点。持久有序号节点。
         */
        if(zooKeeper.exists("/test1", false) == null) {
         String s = zooKeeper.create("/test1", "gt1f".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         System.out.println(s);
        }
        zooKeeper.close();
    }

创建账号以及设置连接账户

// 创建账号权限 admin可以实现读写操作
Id id1 = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123"));
ACL acl1 = new ACL(ZooDefs.Perms.ALL, id1);
// 设置zk连接账号
zooKeeper.addAuthInfo("digest", "guest:guest123".getBytes());

实现服务注册与发现

服务注册端

@Component
public class ApplicationRunnerImpl implements ApplicationRunner {

    @Value("${server.port}")
    private String serverPort;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        start();
    }

    public void start() throws IOException, KeeperException, InterruptedException {
        ZooKeeper zooKeeper = new ZooKeeper("192.168.31.185", 50000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getState().name());
            }
        });
        String parentPath = "/gtf";
        Stat exists = zooKeeper.exists(parentPath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
        // 如果当前父节点不存在的情况 就创建
        if (exists == null) {
            zooKeeper.create(parentPath, "gtf".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // 将当前服务信息注册到zk上
        String data = "http://127.0.0.1:" + serverPort;
        zooKeeper.create(parentPath + "/" + serverPort, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }
}
客户端
private static final CountDownLatch countDownLatch = new CountDownLatch(1);

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper("192.168.31。185", 50000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
        Event.KeeperState state = watchedEvent.getState();
        // 如果当前连接成功,则开始放心
        if (state == Event.KeeperState.SyncConnected) {
        System.out.println("zk连接成功~~");
        countDownLatch.countDown();
        }
        }
        });
        countDownLatch.await();
        String path = "/gtf";
        // 获取该节点下子集
        List<String> children = zooKeeper.getChildren(path, null, new Stat());
        for (int i = 0; i < children.size(); i++) {
        String pathChildren = path + "/" + children.get(i);
        byte[] data = zooKeeper.getData(pathChildren, null, new Stat());
        System.out.println("服务接口地址:" + new String(data));
        }
        }

Linux环境安装Zookeeper(集群)

原理:投票过半机制 zk选举策略领导就采用过半机制 两阶段提交协议
注意:首先需要明确zookeeper选举的规则:leader选举,要求 可用节点数量 > 总节点数量/2 。注意 是 > , 不是 ≥。

进入conf目录,修改 zoo.cfg

server.1=192.168.31.185:2888:3888
server.2=192.168.31.186:2888:3888
server.3=192.168.31.233:2888:3888

删除data/下的version-2 zookeeper_server.pid

在data目录,新建 myid 内容为 1 2 3 这数值不允许重复
image-1655967982223

Zookeeper一致性原理

强制一致性:数据一修改数据立马发生变更
弱一致性:数据一修改,在网络同步。允许数据读取之前的数据
最终一致性:数据一修改,允许数据有短暂延迟。

Zookeeper如何解决分布式一致性问题 ZAB协议底层两阶段提交协议

Zk集群是由多个Server节点组成了一个集群,只有一个Leader节点;其他节点类型都是为Follower类型。

Zk中分为三种节点:

  1. Leader类型 领导类型 负责写的请求,和各个节点同步;
  2. Follower类型 跟随者 负责读的请求和投票决议
  3. ObServer类型 观察者 和Follower大部分特征都是一样的,唯一区别就是不能参与选举和投票
    为什么要使用ObServer类型,主要不影响原来本身选举的时间的效率、目的是提高客户端查询效率;

zk数据同步

Zk集群是由多个Server节点组成了一个集群,只有一个Leader节点;其他节点类型都是为Follower类型。

  1. 每个Follower节点保存了Leader节点副本数据;
  2. 全局保证数据一致性问题
  3. 分布式读写分开 写的请求统一交给Leader实现,Follower或者是ObServer节点主要实现读的操作;
    注意:如果我们连接的节点类型为ObServer或者Follower情况下做写的操作的时候直接转发到Leader实现写

ZAB原子广播协议 核心是保证各个节点数据同步问题,ZAB协议中两种模式

(恢复模式、广播模式)
恢复模式:选举新的Leader
广播模式:解决每个节点数据同步问题

zk与eureka区别

网络分区脑裂

在集群的情况下,一般只会有选举一个master节点、其他节点都是为从接地安慰你,那么如果网络发生抖动或者部分节点无法实现通讯 那么就会导致部分节点从新实现选举,这样就会存在多个master节点。

相同点:Eureka和Zookeeper都是可以实现服务注册中心。

不同点:

Zookeeper保证CP数据一致性问题,(原理ZAB原子广播协议),当zk在某种情况下出现了宕机,会重新实现对zk选举新的领导(恢复机制),如果zk选举的新的领导时间过长的话,
或者投票没有过半数 ,那么会导致整个zk集群环境不可用,这也以为者服务注册中心不可用,所以zk必须保证数据一致性;

Eureka保证ap,设计思想有限考虑可用性、完全去中心化服务注册中心,每个节点都是均等的;Eureka集群没有主从之分,几个节点挂掉也不会影响到整个Eureka的使用,Eureka客户端发现连接时的可用的话,自动切换到下一个eureka连接,只要保证eureka有一个节点存在的话,就可以保证整个服务注册中心使用。

为什么SpringCloud选择Eureka作为注册中心而不是Zookeeper呢?

首先在这时候我们明白一点:

服务注册中心,可以短暂读取以前服务注册列表信息,但是不可用接受节点宕机不可用。

CAP概念

① C:Consistency,一致性,数据一致更新,所有数据变动都是同步的。
② A:Availability,可用性,系统具有好的响应性能。
③ P:Partition tolerance,分区容错性。以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择,也就是说无论任何消息丢失,系统都可用。

目前我们当前技术环境下,不能同时满足CA,但是可以满足CP或者AP

Zookeeper实现分布式锁

Zookeeper事件通知

pom

        <!-- java语言连接zk -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.8</version>
        </dependency>
    /**
     * 连接地址
     */
    private static final String Address="192.168.31.185:2181";
    /**
     * 超时时间
     */
    private static final int TIMEOUT_PARAM=5000;
    /**
     * 事件通知
     */
    private static final String WATCHER="";
    /**
     * 计数器
     */
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) {
        //创建zookeeper连接
        ZkClient zkClient = new ZkClient(Address, TIMEOUT_PARAM);
        String parentPath="/gtf-service";
        //开始监听子节点发生的变化
        zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> list) throws Exception {
                System.out.println("s:"+s +",节点发生了变化");
                list.forEach((t)->{
                    System.out.println(t);
                });
            }
        });
        //监听节点的value数值是否发生变化
        zkClient.subscribeDataChanges(parentPath, new IZkDataListener() {
            //节点内容是否发生变化
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("s:"+s+"0:"+o);
            }
            //监听该节点是否可以删除
            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.err.println("s删除:"+s);
            }
        });
//        zkClient.writeData(parentPath,"gtf-servicexiugaui");
        zkClient.delete(parentPath+"/zjiedian");
        while (true){

        }
//        zkClient.close();
    }

Zookeeper分布式锁实现

Zookeeper实现分布式锁的思路:
节点保证唯一、事件通知、临时节点(生命周期和Session会关联)
创建分布式锁原理:

  1. 多个jvm同时在Zookeeper上创建相同的临时节点(lockPath)
  2. 因为临时节点路径保证唯一的性,只要谁能够创建成功谁就能够获取锁,就可以开始执行业务逻辑;
  3. 如果节点已经给其他请求创建的话或者是创建节点失败,当前的请求实现等待;
    释放锁的原理:
    .因为我们采用临时节点,当前节点创建成功,表示获取锁成功;正常执行完业务逻辑调用Session关闭连接方法,当前的节点会删除;----释放锁
    其他正在等待请求,采用事件监听如果当前节点被删除的话,有重新进入到获取锁流程;
    临时节点+事件通知

使用模版方式进行分布式锁

创建一个接口

/**
* 
*/
public interface Lock {
  /**
   * 获取锁
   */
  void getLock();

  /**
   *  释放锁
   *
   */
  void unlock();
}

创建一个模版

public abstract  class AbstractTemplateLock implements Lock{

    @Override
    public void getLock() {
        //定义一个共同抽象的骨架。
       if (tryLock()){
           System.out.println("》》》"+Thread.currentThread().getName() + "获取锁成功");
       }else {
           //获取锁失败  ,开始等待。
           waitLock();//时间监听
           getLock();
       }

    }

    protected  abstract  void waitLock() ;
    protected  abstract boolean tryLock() ;
    protected  abstract boolean unLockImpl() ;
    @Override
    public void unlock() {
        //关闭zk连接
        unLockImpl();
    }
}

创建一个实现

public class ZkTemplzateLock  extends AbstractTemplateLock {
    /**
     * 连接地址
     */
    private static final String Address="192.168.31.185:2181";
    /**
     * 超时时间
     */
    private static final int TIMEOUT_PARAM=5000;
    /*
     创建zk连接
     */
    private ZkClient zkClient = new ZkClient(Address, TIMEOUT_PARAM);

    /**
     * 共同的创建锁 名称
     */
    private String localPath="/lockgtf";

    private CountDownLatch countDownLatch=null;
    @Override
    protected void waitLock() {
    //事件监听,监听节点是否被删除。
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        };
        zkClient.subscribeDataChanges(localPath, iZkDataListener);

        if (countDownLatch == null) {
            countDownLatch=new CountDownLatch(1);
        }
        try {
            countDownLatch.await();//如果当前计数器不为0,一直等待。
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //1。如果当前节点被删除的节点下,开始重新获取锁。
        zkClient.unsubscribeDataChanges(localPath,iZkDataListener);

    }

    @Override
    protected boolean tryLock() {
        //获取锁的思想,多个节点同时创建,只要有一个创建成功 就是创建成功
        try {
            zkClient.createEphemeral(localPath);
            return true;
        }catch (Exception e) {
            //如果创建节点已经存在 就会异常。
            return false;
        }
    }

    @Override
    protected boolean unLockImpl() {
        if (zkClient!=null) {
            zkClient.close();
            System.out.println(Thread.currentThread().getName() + "释放锁成功");
            return true;
        }
        return false;
    }
}
1

评论区