整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第十四章 关于 ZooKeeper

hadoop 花牛 30℃ 0评论

关于 

ZooKeeper


迄今为止,本书都是在教我们大规模数据处理技术。但本章的内容则有所不同,将介绍如何使用ZooKeeper来构建一般的分布式应用。ZooKeeperHadoop的分布式协调服务。

写分布式应用的主要困难在于会出现“部分失败”(partial failure)。当一条消息在网络中两个节点之间传送时,如果出现网络错误,发送者无法知道接收者是否已经收到这条消息。接收者可能在出现网络错误之前就已经收到这条消息,也有可能没有收到,又或者接收者的进程已经死掉。发送者能够获得真实情况的唯一途径就是重新连接接收者,并向它发出询问。这种情况就是部分失畋,即我们甚至不知道一个操作是否已经失败。

由于部分失败是分布式系统固有的特征,因此使用ZooKeeper并不能避免出现部分失败,当然它也不会隐藏部分失败。ZooKeeper可以提供一组工具,使你在构建分布式应用时能够对部分失败进行正确处理。

ZooKeeper具有以下特点。

ZooKeeper是简单的

ZooKeeper的核心是一个精简的文件系统,它提供一些简单的操作和一些额外的抽象操作,例如,排序和通知。

ZooKeeper是富有表现力的

ZooKeeper的基本操作是一组丰富的“构件”(building block),可用于实现多种协调数据结构和协议。相关的例子包括:分布式队列、分布式锁和一组节点中的“领导者选举”(leader election)

ZooKeeper具有高可用性

ZooKeeper运行于一组机器之上,并且在设计上具有高可用性,因此应用程序完全可以依赖于它。 ZooKeeper可以帮助系统避免出现单点故障,因此可以用于构建一个可靠的应用程序。

ZooKeeper采用松耦合交互方式

ZooKeeper支持的交互过程中,参与者不需要彼此了解。例如,ZooKeeper可以被用于实现“数据汇集”(rendezvous)机制,让进程在不了解其他进程(或网络状况)的情况下能够彼此发现并进行信息交互。参与的各方甚至可以 不必同时存在,因为一个进程可以在ZooKeeper中留下一条消息,在该进程结束后,另外一个进程还可以读取这条消息。

ZooKeeper是一个资源库

ZooKeeper提供了一个通用协调模式实现方法的开源共享库,使程序员免于编写这类通用的协议(这通常 是很难写正确的)。所有人都能够对这个资源库进行添加和改进,久而久之,会使每个人都从中受益。

同时,ZooKeeper也是高性能的。在它的诞生地Yahoo!公司,对于以写操作为主的工作负载来说,ZooKeeper的基准吞吐量已经超过每秒10 000个操作,对于常规的以读操作为主的工作负载来说,吞吐量更是高出好几倍。

1. 安装和运行ZooKeeper

首次尝试使用ZooKeeper时,最简单的方式是在一台ZooKeeper服务器上以独立模式(standalone mode)运行。例如,可以在一台用于开发的机器上尝试运行。运行ZooKeeper需要Java 6,因此首先要确认已经安装了 Java 6ZooKeeper提供了 Windows版本的脚本,因此在Windows环境中运行 ZooKeeper时不需要Cygwin。(Windows只能被作为开发平台,而不能作为生产平台。)

可以从ApacheZooKeeper发布页面下载ZooKeeper的一个稳定版本,然后在合适的位置将下载的压缩包解压:

% tar xzf zookeeper-x.y.z.tar.gz

ZooKeeper提供了几个能够运行服务并与之交互的二进制可执行文件,可以很方便地将包含这些二进制文件的目录加入命令行路径:

% export ZOOKEEPER_INSTALL=/home/tom/zookeeper-x.y.2

% export PATH=$PATH:$ZOOKEEPER_INSTALL/bin

在运行ZooKeeper服务之前,我们需要创建一个配置文件。这个配置文件习惯上被命名为zoo. cfg,并被保存在conf子目录中(也可以把它保存在/etc/zookeeper子目录中,如果设置了环境变量ZOOCFGDIR,也可以保存在该环境变量所指定的目录中)。配置文件的内容示例如下:

tickTime=2000

dataDir=/Users/tom/zookeeper

clientPort=2181

这是一个标准的Java属性文件,本例中所定义的三个属性是以独立模式运行ZooKeeper所需的最低要求。简单地说,tickTime属性指定了ZooKeeper中的基本时间单元(以毫秒为单位),dataDir属性指定了 ZooKeeper存储持久数据的本地文件系统位置;clientPort属性指定了ZooKeeper用于监听客户端连接的端口(通常使用2181端口)。用户应该将dataDir属性的值修改为自己系统所要求的合适位置。

定义好合适的配置文件之后,我们现在可以启动一个本地ZooKeeper服务器:

% zkServer.sh start

使用nc(telnet也可以)发送ruok命令(Are you OK?)到监听端口,检查ZooKeeper是否正在运行:

% echo ruok | nc localhost 2181

imok

imokZooKeeper在说“I'm OK”。还有其他一些用于管理ZooKeeper的命令,都采用类似的四字母组合,如表14-1所示。

14_1. ZooKeeper命令:四字母组合

类别

命令

描述

服务器状态

ruok

如果服务器正在运行并且未处于出错状态,则输出imok

conf

输出服务器的配置信息(根据配置文件zoo.cfg)

envi

输出服务器的环境信息,包括ZooKeeper版本、Java版本 和其他系统属性

srvr

输出服务器的统计信息,包括延迟统计、znode的数量和 服务器运行模式(standaloneleader follower)

stat

输出服务器的统计信息和已连接的客户端

srst

重置服务器的统计信息

isro

显示服务器是否处于只读(ro)模式(由于网络分区),或者读写(rw)模式

客户端连接

dump

列出集合体中的所有会话和短暂znode。必须连接到 leader才能够使用此命令(参考srvr命令)

cons

列出所有服务器客户端的连接统计信息

erst

重置连接统计信息

观察

wchs

列出服务器上所有观察的摘要信息

wchc

按连接列出服务器上所有的观察。注意:如果观察的数量 较多,此命令会影响服务器的性能

wchp

znode路径列出服务器上所有的观察。注意:如果观察 的数量较多,此命令会影响服务器的性能

监控

mntr

Java属性格式列出服务器统计信息。适合于用作 GangliaNagios等监控系统的信息源

 

除了mntr命令外,ZooKeeper还通过JMX来披露统计信息。从http://zookeeper.apache.org/上的ZooKeeper文档中可以获取详细的相关信息。在安装目录的src/contrib子目录中包含有相关的监控工具及方法。

2. 示例

假设有一组服务器用于为客户端提供某种服务。我们希望每个客户端都能找到其中一台服务器,这样它们就可以使用这项服务。在这个例子中,一个挑战是如何维护这组服务器的成员列表。

这组服务器的成员列表显然不能存储在网络中的单个节点上,否则该节点的故障将意味着整个系统的故障(我们希望这个成员列表是高度可用的)。我们先假设已经有了一种可靠的方法来解决成员列表的存储问题。接下来,如果其中一台服务器出现故障,我们需要解决如何从服务器成员列表中将它删除的问题。某个进程需要去负责删除故障服务器,但注意不能由故障服务器自己来完成,因为故障服务器已经不再运行!

我们所描述的不是一个被动的分布式数据结构,而是一个主动的、能够在某个外部事件发生时修改数据项状态的数据结构。ZooKeeper提供了这种服务,接下来让我们看看如何使用它来实现这种众所周知的组成员管理应用。

2.1. ZooKeeper中的组成员关系

理解ZooKeeper的一种方法就是将其看作一个具有高可用性特征的文件系 统。这个文件系统中没有文件和目录,而是统一使用“节点”(node)的概念,称为znodeznode既可以作为保存数据的容器(如同文件),也可以作为保存其他znode的容器(如同目录)。所有的znode构成了一个层次化的命名空间,一种自然的建立组成员列表的方式就是利用这种层次结构,创建一个以组名为节点名的znode作为父节点,然后以组成员名(服务器名)为节点名来创建作为子节点的znode。图14-1给出了一组具有层次结构的 znode

在这个示例中,我们没有在任何znode中存储数据,但在一个真实的应用中,你可以想象将成员相关的数据存储在它们的znode中,例如主机名。

image.png

2.2. 创建组

让我们通过写一段程序的方式来介绍ZooKeeperJava API,这段示例程序用于创建组名为/zooznode,参见范例14-1

范例14-1.该程序在Zookeeper中新建表示组的znode

public class CreateGroup implements Watcher {
    private static final int SESSION_TIMEOUT = 5000;
    private ZooKeeper zk;
    private CountDownLatch connectedSignal = new CountDownLatch(l);
    public void connect(String hosts) throws IOExgeption, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        connectedSignal.await();
    }
    @Override
    public void process(WatchedEvent event) {
        // Watcher interface
        if (event.getState() == KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }
    }
    public void create(String groupName) throws KeeperException, InterruptedException {
        String path = "/" + groupName;
        String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL__UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created " + createdPath);
    }
    public void close() throws InterruptedException {
        zk.close();
    }
    public static void main(String[] args) throws Exception {
        CreateGroup createGroup = new CreateGroup();
        createGroup.connect(args[0]);
        createGroup.create(args[1]);
        createGroup.close();
    }
}

main()方法执行时,创建了一个CreateGroup的实例然后调用了这个实例的connect()方法。connect方法实例化了一个新的ZooKeeper类的对象,这个类是客户端API中的主要类,用于维护客户端和ZooKeeper服务之间的连接。ZooKeeper类的构造函数有三个参数:第一个参数是ZooKeeper服务的主机地址(可指定端口,默认端口是2181); 第二个参数是以毫秒为单位的会话超时参数(这里我们设成5),后文中将给出该参数的详细解释;第三个参数是一个Watcher对象的实例。Watcher对象接收来自于ZooKeeper的回调,以获得各种事件的通知。在这个例子中,CreateGroup是一个Watcher对象,因此我们将它传递给ZooKeeper构造函数。

当一个ZooKeeper的实例被创建时,会启动一个线程连接到ZooKeeper服务。由于对构造函数的调用是立即返回的,因此在使用新建的ZooKeeper对象前一定要等待其与ZooKeeper服务之间成功建立连接。我们使用JavaCountDownLatch(位于java.util.concurrent包中)来阻止使用新建的ZooKeeper对象,直到这个ZooKeeper对象已经准备就绪。Watcher类被用于获取ZooKeeper对象是否准备就绪的信息,在它的接口中只有一个方法:

public void process(WatchedEvent event);

当客户端已经与ZooKeeper服务建立连接后,Watcherprocess()方法会被调用,参数是一个用于表示该连接的事件。在接收到一个连接事件(Watcher.Event.KeeperState 的牧举型值 SyncConnected 来表示)时,我们通过调用CountDownLatchcountDown()方法来递减它的计数器。锁存器(latch)被创建时带有一个值为1的计数器,用于表示在它释放所有等待线程之前需要发生的事件数。在调用一次countDown()方法之后,计数器的值变为0,则await()方法返回。

现在connect()方法已经返回,下一个执行的是CreateGroupcreate()方法。在这个方法中,我们使用ZooKeeper实例中的create()方法来创建一个新的ZooKeeperznode。所需的参数包括:路径(用字符串表示)、znode的内容(字节数组,本例中使用空值)、访问控制列表(简称ACL,本例中使用了完全开放的ACL,允许任何客户端对znode进行读写)和创建znode的类型。

有两种类型的znode:短暂的(ephemeral)和持久的(persistent)。创建znode的客户端断开连接时,无论客户端是明确断开还是因为任何原因而终止,短暂znode都会被ZooKeeper服务删除。与之相反,当客户端断开连接时,持久znode不会被删除。我们希望代表一个组的znode存活的时间应当比创建程序的生命周期要长,因此在本例中我们创建了一个持久的znode

create()方法的返回值是ZooKeeper所创建的节点路径,我们用这个返回值来打印一条表示节点路径被成功创建的消息。当我们査看"顺序znode"(sequential znode)时,会发现create()方法返回的路径与传递给该方法的路径不同。

为了观察程序的执行,我们需要在本地机器上运行ZooKeeper,然后可以输入以下命令:

% export CLASSPATH=chl4/target/classes/:
$ZOOKEEPER_INSTALL/*:$ZOOKEEPER_INSTALL/lib/*:\$ZOOKEEPER-INSTALL/conf
% java CreateGroup localhost zoo
Created /zoo

2.3. 加入组

这个应用的下一部分是一段用于注册组成员的程序。每个组成员将作为一个程序运行,并且加入到组中。当程序退出时,这个组成员应当从组中被删除。为了实现这一点,我们在ZooKeeper的命名空间中使用短暂znode来代表一个组成员。

范例14-2中的程序JoinGroup实现了这个想法。在基类ConnectionWatcher中,对创建和连接ZooKeeper实例的程序逻辑进行了重构,参见范例14-3

范例14-2.该程序将成员加入组

public class JoinGroup extends ConnectionWatcher {
    public void join(String groupName, String memberName) throws KeeperException, InterruptedException {
        String path = "/"+ groupName + "/" + memberName;
        String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.printIn("Created " + createdPath);
    }
    public static void main(String[] args) throws Exception {
        JoinGroup joinGroup = new 3oinGroup();
        joinGroup.connect(args[0]);
        joinGroup.join(angs[1], args[2]);
        // stay alive until process is killed or thread is interrupted
        Thread.sleep(Long.MAX_VALUE);
    }
}

范例14-3.该辅助类等待与ZooKeeper建立连接

public class ConnectionWatcher implements Watcher {
    private static final int SESSION_TIMEOUT = 5000;
    protected ZooKeeper zk;
    private CountDownLatch connectedSignal = new CountDownLatch(1);
    public void connect(String hosts) throws IOException, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        connectedSignal.await();
    }
    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }
    }
    public void close() throws InterruptedException {
        zk.close();
    }
}

JoinGroup的代码与CreateGroup的非常相似。在它的join()方法中,创建短暂znode作为组znode的子节点,然后通过休眠来模拟正在做某种工 作,直到该进程被强行终止。接着,你会看到随着进程终止,这个短暂znode ZooKeeper 删除。

14.2.4  列出组成员

现在,我们需要一段程序来査看组成员(参见范例14-4)

范例14-4.用于列出组成员的程序

public class ListGroup extends ConnectionWatcher {
    public void list(String groupName) throws KeeperException, InterruptedException {
        String path = "/" + groupName;
        try {
            List<String> children = zk.getChildren(path, false);
            if (children.isEmpty()) {
                System.out.printf("No members in group %s\n", groupName);
                System.exit(1);
            }
            for (String child : children) {
                System.out.println(child);
            }
        } catch (KeeperException.NoNodeException e) {
            System.out.printf("Group %s does not exist\n", groupName);
            System.exit(1);
        }
    }
    public static void main(String[] args) throws Exception {
        ListGroup listGroup = new ListGroup();
        listGroup.connect(args[0]);
        listGroup.list(args[1]);
        listGroup.close();
    }
}

list()方法中,我们调用了getChildren()方法来检索并打印输出一个znode的子节点列表,调用参数为该znode的路径和设为false的观察标志。如果在一个znode上设置了观察标志,那么一旦该znode的状态改变,关联的观察(Watcher)会被触发。虽然在这里我们没有使用观察,但在査看一个znode的子节点时,通过设置观察可以让应用程序接收到组成员加入、退出和组被删除的有关通知。

在这段程序中,我们捕捉了KeeperException.NoNodeException异常,代表组的znode不存在时,这个异常就会被抛出。

让我们看看ListGroup程序是如何工作的。起初,由于我们还没有在组中添加任何成员,因此zoo组是空的:

% java ListGroup localhost zoo
No members in group zoo

我们可以使用JoinGroup来向组中添加成员。由干这些作为组成员的 znode不会自己终止(因为sleep语句),所以我们以后台进程的方式来启动 它们:

% java DoinGroup localhost zoo duck &
% java DoinGroup localhost zoo cow &
% java DoinGroup localhost zoo goat &
% goat_pid=$!

最后一行命令保存了将goat添加到组中的Java进程的ID。我们需要保存这个进程ID,以便能够在查看组成员之后杀死该进程:

% java ListGroup localhost zoo
goat
duck
cow

为了从组中删哮一个成员,我们杀死了 goat所对应的进程:

% kill $goat_pid

几秒钟之后,由于该进程的ZooKeeper会话已经结束(超时设置为5),并

且所对应的短暂znode也已经被删除,所以goat会从组成员列表中消失。

% java ListGroup localhost zoo
duck
cow

让我们回顾一下,看看已经实现了哪些功能。对于参与到一个分布式系统中的节点,我们已经有了一个建立节点列表的方法。这些节点相互之间并不了解。例如,一个想使用列表中节点来完成某些工作的客户端,能够在这些节点不知情的情况下发现它们。

最后要注意的是组成员关系管理并不能解决与节点通信过程中出现的网络 问题。在与一个组中的成员节点进行通信的过程中可能会出现故障,这些故障必须以一种合适的方式来解决(重试、使用组中另外一个成员等)

ZooKeeper命令行工具

ZooKeeper提供了一个用于与其命名空间进行交互的命令行工具。我们可以使用这个工具列出/zoo znode之下的znode列表,如下所示:

% zkCli.sh localhost Is /zoo
Processing Is
WatchedEvent: Server state change. New state: SyncConnected [duck, cow] -0

不使用任何参数直接运行这个命令行工具,可以显示该工具的使用帮助。 

14.2.5删除组

为了使这个例子比较完整,让我们来看看如何删除一个组。ZooKeeper类提供了一个deleteO方法,该方法有两个参数:节点路径和版本号。如果所提供的版本号与znode的版本号一致,ZooKeeper会删除这个znode。这是一种乐观的加锁机制,使客户端能够检测出对znode的修改冲突。通过将版本号设置为-1,可以绕过这个版本检测机制,不管znode的版本号是什么而直接将其删除。

ZooKeeper不支持递归的删除操作,因此在删除父节点之前必须先删除子节点。在范例M-5中,DeleteGroup类用于删除一个组及其所有成员。

范例14-5.用于删除一个组及其所有成员的程序

public class DeleteGroup extends ConnectionWatcher {
    public void delete(String groupName) throws KeeperException, 
    InterruptedException {
        String path = "/" + groupName;
        try {
            List<String> children = zk.getChildren(path, false); 
            for (String child : children) { 
            zk.delete(path + "/" + child, -1〉;
            zk.delete(path, -1);
			}
        } catch (KeeperException.NoNodeException e) {
            System.out.printf("Group %s does not exist\n", groupName); 
            System.exit(1);
        }
    }
    public static void main(String[] args) throws Exception { 
        DeleteGroup deleteGroup = new DeleteGroup(); 
        deleteGroup.connect(args[0]); 
        deleteGroup.delete(args[1]); 
        deleteGroup.close();
    }
}

最后,我们可以删除之前所创建的zoo:

% java DeleteGroup localhost zoo 
% java ListGroup localhost zoo
Group zoo does not exist


14.3 ZooKeeper 服务

ZooKeeper是一个具有髙可用性的高性能协调服务。在本节中,我们将从三个方面来了解这个服务:模型、操作和实现。

14.3.1数据模型

ZooKeeper维护着一个树形层次结构,树中的节点被称为znodeznode以用于存储数据,并且有一个与之相关联的ACLZooKeeper被设计用来实现协调服务(这类服务通常使用小数据文件),而不是用于大容量数据存储,因此一个znode能存储的数据被限制在1MB以内。

ZooKeeper的数据访问具有原子性。客户端在读取一个zonde的数据时,要么读到所有的数据,要么读操作失败,不会只读到部分数据。同样,一个写操作将替换znode存储的所有数据。ZooKeeper会保证写操作不成功就失败,不会出现部分写之类的情况,也就是不会出现只保存客户端所写部分数据的情况。ZooKeeper不支持添加操作。这些特征都是与HDFS所不同的。HDFS被设计用于大容量数据存储,支持流式数据访问和添加操作。

znode通过路径被引用。像Unix中的文件系统路径一样,在ZooKeeper中路径被表示成用斜杠分隔的Unicode字符串。与Unix中的文件系统路径不 同的是,ZooKeeper中的路径必须是绝对路径,也就是说每条路径必须从一个斜杠字符开始。此外,所有的路径表示必须是规范的,即每条路径只有唯一的一种表示方式,不支持路径解析。例如,在Unix中,一个具有路径 /a/b的文件也可以通过路径/a/./b来表示,原因在于在Unix的路径中表示当前目录表示当前目录的—级目录)。在ZooKeeper中,

不具有这种特殊含义,这样表未的路径名是不合法的。

ZooKeepei中,路径由Unicode字符串构成,并且有一些限制(见 ZooKeeper的参考文档)。字符串zookeeper是一个保留词,不能将它作为路径表示中的一部分。需要特别指出的是,ZooKeeper使用/zooAeeper子树来保存管理信息,例如关于配额的信息。

注意,ZooKeeper的路径与URI不同,前者在Java API中通过java.lang.String来使用,而后者是通过 Hadoop Path (或 java.net.URI类)来使用。 

znode有一些性质非常适合用于构建分布式应用,我们将在接下来的几个小节中进行讨论。

1.短暂 znode

znode有两种类型:短暂的和持久的。znode的类型在创建时被确定并且之后不能再修改。在创建短暂znode的客户端会话结束时,ZooKeeper会将该短暂znode删除。相比之下,持久znode不依赖于客户端会话,只有当客户端(不一定是创建它的那个客户端)明确要删除该持久znode时才会被删除。

短暂znode不可以有子节点,即使是短暂子节点。 虽然每个短暂znode都会被绑定到一个客户端会话,但它们对所有的客户端还是可见的(当然,还是要符合其ACL的定义)。

对于那些需要知道特定时刻有哪些分布式资源可用的应用来说,使用短暂znode是一种理想的选择。本章前面的例子就使用了短暂znode来实现一个组成员管理服务,让任何进程都知道在特定的时刻有哪些组成员可用。

2.顺序号

顺序(sequentia)znode是指名称中包含ZooKeeper指定顺序号的znode。如果在创建znode时设置了顺序标识,那么该Mode名称之后便会附加一个值,这个值是由一个单调递增的计数器(由父节点维护)所添加的。

例如,如果一个客户端请求创建一个名为/a/b-的顺序znode,则所创建znode的名字可能是/a/b-3。®如果稍后,另外一个名为/a/b的顺序znode被创建,计数器会给出一个更大的值来保证znode名称的唯一性,例如,/a/b-5.在JavaAPI中,顺序znode的实际路径会作为create()调用的返回值被传回客户端。

在一个分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端就可以通过顺序号来推断事件的顺序。14.4.3节介绍了如何使用顺序znode来实现共享锁。

3.观察

znode以某种方式发生变化时,“观察”watch)机制可以让客户端得到通知。可以针对ZooKeeper服务的操作来设置观察,该服务的其他操作可以触发观察。例如,客户端可以对一个znode调用exists操作,同时设定一个观察。如果这个znode不存在,则客户端所调用的exists操作将返回false。如果一段时间之后,另外一个客户端创建了这个znode,则这个观察会被触发,通知前一个客户端这个znode被创建。在下一小节中,将完整介绍哪些操作会触发其他操作。

观察只能被触发一次。为了能够多次收到通知,客户端需要重新注册所需的观察。在前面的例子中,如果客户端希望收到更多znode是否存在的通知(例如在这个znode被删除时也能收到通知),则需要再次调用exists操作来设定一个新的观察。

14.4.1节中,将有一个例子来演示如何使用观察来更新集群的配置。

14.3.2操作

如表丨4-2所示,ZooKeeper中有9种基本操作。

14-2. ZooKeeper服务的操作

操作

描述

delete

删除一个znode(znode不能有任何子节点)

exists

测试一个znode是否存在并且査询它的元数据

getACL, setACL

获取/设置一个znodeACL

getChildren

获取一个znode的子节点列表

getData, setData

获取/设置一个znode所保存的数据

sync

将客户端的znode视图与ZooKeeper同步


ZooKeeper
中的更新操作是有条件的。在使用deletesetData操作时必须提供被更新znode的版本号(可以通过exists操作获得)。如果版本号不匹配,则更新操作会失败。更新操作是非阻塞操作,因此一个更新失败 的客户端(由于其他进程同时在更新同一个znode)可以决定是否重试,或执 行其他操作,并不会因此而阻塞其他进程的执行。

虽然ZooKeeper可以被看作是一个文件系统,但出于简单性的需要,有一些文件系统的基本操作被它摒弃   了。由于ZooKeeper中的文件较小并且总是被整体读写,因此没有必要提供打幵、关闭或査找操作。

sync操作与POSIX文件系统中fsync()操作是不同的。如前所述,ZooKeeper中的写操作具有原子性,一个成功的写操作会保证将数据写到ZooKeeper服务器的持久存储介质中。然而,ZooKeeper允许客户端读到的数据滞后于ZooKeeper服务的最新状态,因此客户端可以使用sync操作来获取数据的最新状态。相关详情请参见14.3.4节。

1. 集合更新(Multiupdate)

ZooKeeper中有一个被称为multi的操作,用于将多个基本操作集合成一个操作单元,并确保这些基本操作同时被成功执行,或者同时失败,不会 发生其中部分基本操作被成功执行而其他基本操作失败的情况。 集合更新可以被用于在ZooKeeper中构建需要保持全局一致性的数据结构,例如构建一个无向图。在ZooKeeper中用一个znode来表示无向图中的一个顶点,为了在两个顶点之间添加或删除一条边,我们需要同时更新 两个顶点所分别对应的两个znode,因为每个znode中都有指向对方的引用。如果我们只用ZooKeeper的基本操作来实现边的更新,可能会让其他客户端发现无向图处于不一致的状态,即一个顶点具有指向另一个顶点的引用而对方却没有对应的引用。将针对两个znode的更新操作集合到一个multi操作中可以保证这组更新操作的原子性,也就保证了一对顶点之间不会出现不完整的连接。

2.关于API

对于ZooKeeper客户端来说,主要有两种语言绑定(binding)可以使用Java C;当然也可以使用PerlPythonRESTcontrib绑定。对于每一种绑定语言来说,在执行操作时都可以选择同步执行或异步执行。我们已 经看过同步执行的Java API。下面是exists操作的签名,它返回一个封装有znode元数据的Stat对象(如果znode不存在,则返回null):

public Stat exists(String path. Watcher watcher) throws KeeperException, InterruptedException

ZooKeeper类中同样可以找到异步执行的签名,如下所示:

public void exists(String path. Watcher watcher, StatCallback cb, Object ctx)

因为所有异步操作的结果都是通过回调来传送的,因此在Java API中异步 方法的返回类型都是void。调用者传递一个回调的实现,当ZooKeeper响应时,该回调方法被调用。在这种情况下,回调采用StatCallback接 口,它有以下方法:

public void processResult(int re. String path, Object ctx, Stat stat);

其中rc参数是返回代码,对应于KeeperException的代码。每个非零代 码都代表一个异常,在这种情况下,stat参数是nullpathctx参数 对应于客户端传递给exists()方法的参数,用于识别这个回调所响应的请 求。ctx参数可以是任意对象,当path参数不能提供足够的信息时,客户 端可以通过ctx参数来区分不同请求。如果path参数提供了足够的信息,可以将ctx参数设成null

实际上,有两个C语言的共享库。单线程库zookeeper_st只支持异步 API,并且主要在没有pthread库或pthread库不稳定的平台上使用。大部分开发人员都使用多线程库zookeeper_mt,它既支持同步API也支持 异步API。要想进一步了解如何构建和使用C语言API,请参考ZooKeeper 安装目录下wc/c子目录中的文件。

我该使用同步API还是异步API?

两种类型的API提供相同的功能,因此选择哪一种只是风格问题。例 如,如果你习惯于事件驱动的编程模型,则异步API更合适一些。

异步API允许你以流水线方式处理请求,这在某些情况下可以提供更好 的吞吐量。想象一下,你打算读取一大批znode并且分别对它们进行处 理。如果使用同步API,每一个读操作都会阻塞进程,直到该读操作返 回;但如果使用异步API,你可以非常快地启动所有的异步读操作并在另 外一个单独的线程中来处理读操作的返回。

3. 观察触发器 

existsgetChildrengetData这些读操作可以设置观察,这些 观察可以被写操作create, deletesetData触发。ACL相关的操作不参与触发任何观察。当一个观察被触发时会产生一个观察事件,这个观察 和触发它的操作共同决定着观察事件的类型。

当所观察的znode被创建、删除或其数据被更新时,设置在 exists操作上的观察将被触发。

•当所观察的znode被删除或其数据被更新时,设置在getData操 作上的观察将被触发。创建znode不会触发getData操作上的观 察,因为getData操作成功执行的前提是znode必须已经存在。

•当所观察的znode的一个子节点被创建或删除时,或所观察的 znode自己被删除时,设置在getChildren操作上的观察将会被触发。可以通过观察事件的类型来判断被删除的是znode还是其子节点:NodeDelete 类型代表 znode 被删除,NodeChildrenChanged类型代表一个子节点被删除。

表14-3列出了观察及其触发操作所对应的事件类型。

14-3.观察及其触发操作所对应的事件类型观察触发器

设置观察的操作setDate

创建znode

创建子节点

删除znode

删除子节点

exists

NodeCreated

NodeDeleted

NodeData

Changed

getData

NodeDeleted

NodeData

Changed

getChildren

NodeChildren

NodeDeleted

NodeChildren

Changed

Changed


一个观察事件中包含涉及该事件的
znode的路径,因此对于NodeCreated NodeDeleted事件来说,可通过路径来判断哪一个节点被创建或删除。为了能够在NodeChildrenChanged事件发生之后判断是哪些子节点被修改,需要 重新调用getChildren来获取新的子节点列表。与之类似,为了能够在NodeDataChanged事件之后获取新的数据,需要调用getData。在这两种情况下,从收到观察事件到执行读操作(getChildrengetData)期间,znode的状态可能会发生改变,在写程序的时候必须牢记这一点。

4. ACL列表

每个znode被创建时都会带有一个ACL列表,用于决定谁可以对它执行何种操作。

ACL依赖于ZooKeeper的客户端身份验证机制。ZooKeeper提供了以下几种身份验证方式:

• Digest通过用户名和密码来识别客户端

• sasl通过Kerberos来识别客户端

• Ip通过客户端的[P地址来识别客户端

在建立一个ZooKeeper会话之后,客户端可以对自己进行身份验证。虽然 znodeACL列表会要求所有的客户端是经过验证的,但ZooKeeper的身 份验证过程却是可选的,客户端必须自己进行身份验证来支持对znode访问。这里有一个使用digest方式(用户名和密码)进行身份验证的例子:

zk.addAuthInfo("digest", "tom:secret".getBytes());

每个ACL都是身份验证方式、符合该方式的一个身份和一组权限的组合。例如,如果我们打算给IP地址为10.0.0.1的客户端对某个znode的读权限,可以使用ip验证方式、10.0.0.1READ权限在该znode上设置一个ACL。在Java语言中,我们可以如下所示来创建这个ACL对象:

new ACL(Perms.READ,

new Id("ip' "10.0.0.1"));

14-4列出了一个完整的权限集合。注意exists操作并不受ACL权限的限制,因此任何客户端都可以调用exists来检索一个znode的状态或查询一个znode是否存在。

14-4. ACL权限

ACL权限 CREATE

允许的操作 create(子节点)

READ

getChildren

getData

WRITE

setData

DELETE

cfelete(子节点)

ADMIN

setACL


在类ZooDefs.Ids中有一些预定义的ACL, OPEN_ACL_UNSAFE是其中之

一,它将所有的权限(不包括ADMIN权限)授予每个人。此外,ZooKeeper还支持插入式身份验证机制,如果需要的话,它可以集成 第三方的身份验证系统。

ZooKeeper服务有两种不同的运行模式。一种是“独立模式”standalone mode),即只有一个ZooKeeper服务器。这种模式较为简单,比较适合于测 试环境(甚至可以在单元测试中采用),但是不能保证高可用性和可恢复性。 在生产环境中的ZooKeeper通常以“复制模式”replicated mode)运行于 个计算机集群上,这个计算机集群被称为一个“集合体”ensemble)。 ZooKeeper通过复制来实现高可用性,只要集合体中半数以上的机器处干可 用状态,它就能够提供服务。例如,在一个有5个节点的集合体中,任意2 台机器出现故障,都可以保证服务继续,因为剩下的3台机器超过了半 数。注意,6个节点的集合体也只能够容忍2台机器出现故障,因为如果3台机器出现故障,剩下的3台机器没有超过集合体的半数。出于这个原 因,一个集合体通常包含奇数台机器。

从概念上来说,ZooKeeper是非常简单的:它所做的就是确保对znode树的 每一个修改都会被复制到集合体中超过半数的机器上。如果少于半数的机器出现故障,则最少有一台机器会保存最新的状态,其余的副本最终也会 更新到这个状态。

然而,这个简单想法的实现却不简单。ZooKeeper使用了Zab协议,该协议包括两个可以无限重复的阶段。 

1. 阶段1:领导者选举

集合体中的所有机器通过一个选择过程来选出一台被称为“领导者”leader)的机器,其他的机器被称为“跟随者”follower)。一旦半数以上(或指定数 量)的跟随者已经将其状态与领导者同步,则表明这个阶段已经完成。

2. 阶段2:原子广播

所有的写请求都会被转发给领导者,再由领导者将更新广播给跟随者。当半数以上的跟随者已经将修改持久化之后,领导者才会提交这个更新,然后客户端才会收到一个更新成功的响应。这个用来达成共识的协议被设计 成具有原子性,因此每个修改要么成功要么失败。这类似于数据库中的两阶段提交协议。


ZooKeeper 是否使用 Paxos?

否。ZooKeeper的Zab协议不同于众所周知的Paxos算法(Leslie Lamport, wPaxos Made Simple,ACM SIGACT News [Distributed Computing Column) 32, 4 [Whole Number 121,December 2001] 51-58.)。虽然有些类似,但是 Zab在操作方面是不同的,例如它依靠TCP来保证其消息的顺序。

有关Zab的描述,可参见Benjamin Reed和Flavio Junqueira的论文“A simple totally ordered broadcast protoc〇r'(LADIS 2008, in: Proceedings of the 2nd Workshop on Large-Scale Distributed Systems and Middleware, Article 2, New York, NY, USA, 2008. ACM)。

Google 的 Chubby 锁服务(Mike Burrows, “The Chubby Lock Service for LooselyCoupled Distributed Systems,,5 November 2006, http://labs.google.com/ papers/ 是基于 Paxos 的,它的功能与 ZooKeeper 的功能类似。

如果领导者出现故障,其余的机器会选出另外-个领导者,并和新的领导者一起继续提供服务。随后,如果之前的领导者恢复正常,会成为一个跟随者。领导者选举的过程是非常快的,根据一个已公布的结果来看,只需要大约200毫秒,因此在领导者选举的过程中不会出现系统性能的明显降低。

在更新内存中的znode树之前,集合体中的所有机器都会先将更新写入磁盘。任何一台机器都可以为读请求提供服务,并且由于读请求只涉及内存检索,因此非常快。

14.3.4 一致性 

理解ZooKeeper的实现有助于理解其服务所提供的一致性保证。在集合体中所使用的术语“领导者”和“跟随者”是恰当的,它们表明一个跟随可能滞后干领导者几个更新。这也表明在一个修改被提交之前,只需要集 合体中半数以上机器已经将该修改持久化即可。对ZooKeeper来说,理想的情况就是将客户端都连接到与领导者状态一致的服务器上。每个客户端都有可能被连接到领导者,但客户端对此无法控制,甚至它自己都无法知道是 否连接到领导者。参见图14-2。

每一个对znode树的更新都被藏予一个全局唯一的ID称为zxid决代表 ZooKeeper Transaction ID)。ZooKeeper要求对所有的更新进行编号并 排序,它决定了分布式系统的执行顺序,如果zxid z1小于Z2,则z1—定发生在Q之前。

image.png

14-2.跟随者负责响应读请求,领导者负责提交写请求在ZooKeeper的设计中,以下几点考虑保证了数据的一致性。

1.顺序一致性

来自任意特定客户端的更新都会按其发送顺序被提交。也就是说,如果一个客户端将znodez的值更新为a,在之后的操作中,它又将z的值更新为则没有客户端能够在看到z的值是b之后再看到值a(如果没有其他对z 的更新)。

每个更新要么成功,要么失败。这意味着如果一个更新失败,则不会有客户端看到这个更新的结果。

3. 单一系统映像 

一个客户端无论连接到哪一台服务器,它看到的都是同样的系统视图。这意味着,如果一个客户端在同一个会话中连接到一台新的服务器,它所看到的系统状态不会比在之前服务器上所看到的更老。当一台服务器出现故 障,导致它的一个客户端需要尝试连接集合体中其他的服务器时,所有状态滞后于故障服务器的服务器都不会接受该连接请求,除非这些服务器将状态更新至故障服务器的水平。

4. 持久性

一个更新一旦成功,其结果就会持久存在并且不会被撤销。这表明更新不会受到服务器故障的影响。

5. 及时性

任何客户端所看到的滞后系统视图都是有限的,不会超过几十秒。这意味 着与其允许一个客户端看到非常陈旧的数据,还不如将服务器关闭,强迫 该客户端连接到一个状态较新的服务器。

出于性能的原因,所有的读操作都是从ZooKeeper服务器的内存获得数据,它们不参与写操作的全局排序。如果客户端之间通过ZooKeeper之外的机制进行通信,则客户端可能会发现它们所看到的ZooKeeper状态是不 一致的。

例如,客户端Aznode z的值从a更新为a’,接着A告诉B去读2 值,而B读到的值是a而不是这与ZooKeeper的一致性保证是完全兼容的(这种情况称为“跨客户端视图的同时一致性”)。为了避免这种情况发生,B应该在读z的值之前对z调用sync操作。sync操作会强制B所连接的ZooKeeper服务器“赶上”领导者,这样当Bz的值时,所读到的将 会是A所更新的(或后来更新的)。

容易让人疑惑的是,sync操作只能以异步的方式被调用。你不需要等待sync调用的返回,ZooKeeper会保证任何后续的操作都在服务 器的sync操作完成后才执行,哪怕这些操作是在sync操作完成之前发出的。

14.3.5会话

每个ZooKeeper客户端的配置中都包括集合体中服务器的列表。在启动 时,客户端会尝试连接到列表中的一台服务器。如果连接失败,它会尝试 连接另一台服务器,以此类推,直到成功与一台服务器建立连接或因为所 ZooKeeper服务器都不可用而失败。

一旦客户端与一台ZooKeeper服务器建立连接,这台服务器就会为该客户端创建一个新的会话。每个会话都会有一个超时的时间设置,这个设置由 创建会话的应用来设定。如果服务器在超时时间段内没有收到任何请求, 则相应的会话会过期。一旦一个会话已经过期,就无法重新被打开,并且任何与该会话相关联的短暂znode都会丢失。会话通常都会长期存在,而 会话过期则是一种比较罕见的事件,但对于应用来说,如何处理会话过期 仍是非常重要的(详情可参见14.4.2)。

只要一个会话空闲超过一定时间,都可以通过客户端发送ping请求(也称为心跳)来保持会话不过期。ping请求是由ZooKeeper的客户端库自动发送, 因此在你的代码中不需要考虑如何维护会话。)这个时间长度的设置应当足 够低,以便能够检测出服务器故障(由读超时体现),并且能够在会话超时的时间段内重新连接到另外-台服务器。

ZooKeeper客户端可以自动地进行故障切换,切换至另一台ZooKeeper服务器,并且关键的是,在另一台服务器接替故障服务器之后,所有的会话(和相关的短暂znode)仍然是有效的。

在故障切换过程中,应用程序将收到断开连接和连接至服务的通知。当客户端断开连接时,观察通知将无法发送,但是当客户端成功恢复连接后,这些延迟的通知还会被发送。当然,在客户端重新连接至另一台服务器的 过程中,如果应用程序试图执行一个操作,这个操作将会失败。这充分说明在真实的ZooKeeper应用中处理连接丢失异常的重要性(详情可参见 14.4.2 )。

ZooKeeper中有几个时间参数。“滴答tick time)参数定义了ZooKeeper中的基本时间周期,并被集合体中的服务器用来定义相互交互的时间表。其他设置都是根据滴答参数来定义的,或至少受它限制。例如, 会话超时(session timeout)参数的值不可以小于2个滴答并且不可以大于20个滴答。如果你试图将会话超时参数设置在这个范围之外,它将会被自动修改到这个范围之内。

通常将滴答参数设置为2秒(2000毫秒),对应于允许的会话超时范围是4 到40秒。在选择会话超时设置时有几点需要考虑。

较短的会话超时设置会较快地检测到机器故障。在组成员管理的例子中,会话超时的时间就是用来将故障机器从组中删除的时间。但要避免将会话超时时间设得太低,因为繁忙的网络会导致数据包传输延迟,从而可能会 无意中导致会话过期。在这种情况下,机器可能会出现“振动”(flap)现象:在很短的时间内反复出现离开后又重新加入组的情况。

对于那些创建较复杂暂时状态的应用程序来说,由于重建的代价较大,因此比较适合设置较长的会话超时。在某些情况下,可以对应用程序进行设计,使它能够在会话超时之前重启,从而避免出现会话过期的情况(这适合 于对应用进行维护或升级)。服务器会为每个会话分配一个唯一的ID和密 码,如果在建立连接的过程中将它们传递给ZooKeeper,可以用于恢复一个会话(只要该会话没有过期)。将会话ID和密码保存在稳定存储器中之后,可以将一个应用程序正常关闭,然后在重启应用之前凭借所保存的会话ID 和密码来恢复会话环境。

你可以将这个特征看成是一种用来帮助避免会话过期的优化技术,但不能因此忽略对会话过期异常的处理,因为机器的意外故障也会导致会话过期,或者,即使应用程序是正常关闭,也有可能因任何原因导致它没有在 会话未过期之前完成重启。

般的规则是,ZooKeeper集合体中的服务器越多,会话超时的设置应越 大。连接超时、读超时和ping周期都被定义为集合体中服务器数量的函 数,因此集合体中服务器数量越多,这些参数的值反而越小。如果频繁遇 到连接丢失的情况,应考虑增大超时的设置。可以使用JMX来监控 ZooKeeper的度量指标,例如请求延迟的统计信息。


14.3.6状态

ZooKeeper对象在其生命周期中会经历几种不同的状态(参见图丨4-3)。你 可以在任何时刻通过getState()方法来査询对象的状态:

public States getState()

States被定义成代表ZooKeeper对象不同状态的枚举类型值(不管是什么 枚举值,一个ZooKeeper的实例在一个时刻只能处于一种状态)。在试图与 ZooKeeper服务建立连接的过程中,一个新建的ZooKeeper实例处于 CONNECTING状态。一旦建立连接,它就会进入CONNECTED状态。

image.png

14-3. ZooKeeper状态转换图

通过注册观察对象,使用了ZooKeeper对象的客户端可以收到状态转换通知。在进入CONNECTED状态时,观察对象会收到一个WatchedEvent 通知,其中 KeeperState的值是SyncConnected

ZooKeeperwatcher对象肩负着双重责任:一方面它可以被用于获得ZooKeeper状态变化的相关通知(如本节所述)另一方面还可以被 用于获得znode变化的相关通知(参见14.3.2节对观察触发器的讨 论)。传递给ZooKeeper对象构造函数的(默认的)观察被用于监视其状态的变化。监视znode的变化可以使用一个专用的观察对象(将其 传递给适当的读操作),也可以通过读操作中的布尔标识来设定是否 共享使用默认的观察。

ZooKeeper实例可以断开然后重新连接到ZooKeeper服务,此时它的状态 就在CONNECTEDCONNECTING之间转换。如果它断开连接,观察会收到 一个Disconnected事件。注意,这些状态转换都是由ZooKeeper实例自

己发起的,如果连接丢失,它会自动尝试重新连接。如果close()方法被调用或出现会话超时(观察事件的KeeperState值为 Expired)时,ZooKeeper实例会转换到第三个状态CLOSED。一旦处于CLOSED状态,ZooKeeper对象不再被认为是活跃的(可以对States使用 isAliveO方法来测试),并且不能再用。为了重新连接到ZooKeeper 务,客户端必须创建一个新的ZooKeeper实例。

14.4使用ZooKeeper来构建应用 

在一定程度上了解ZooKeeper之后,我们接下来用ZooKeeper写一些有用的应用程序。

14.4.1配置服务

配置服务是分布式应用所需要的基本服务之一,它使集群中的机器可以共享配置信息中那些公共的部分。简单地说,ZooKeeper可以作为一个具有高可用性的配置存储器,允许分布式应用的参与者检索和更新配置文件。使 ZooKeeper中的观察机制,可以建立一个活跃的配置服务,使那些感兴 趣的客户端能够获得配置信息修改的通知。

让我们来写一个这样的服务。我们通过两个假设来简化所需实现的服务(稍加修改就可以取消这两个假设)。第一,我们唯一需要存储的配置数据是字符串,关键字是znode的路径,因此我们在每个znode上存储了一个键/值 对。第二,在任何时候只有一个客户端会执行更新操作。除此之外,这个模型看起来就像是有一个主人(类似于HDFS中的namenode)在更新信息,而他的工人则需要遵循这些信息。

我们在名为ActiveKeyValueStore的类中写了如下代码:

public class ActiveKeyValueStore extends ConnectionWatcher{
    private static final Charset CHARSET = Charset.forName("UTF-S");
    public void write(String path, String value) throws InterruptedException^,
    KeeperException {
    Stat stat = zk.exists(pathJ false); 
        if (stat == null) {
            zk.create(path, value.getBytes(CHARSET), 
            Ids.OPEN_ACL一UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(pathJ value.getBytes(CHARSET), -1);
        }
    }
}

write()方法的任务是将一个关键字及其值写人ZooKeeper。它隐藏了创建一个新的znode和用一个新值更新现有znode之间的区别,而是使用exists操作来检测znode是否存在,然后再执行相应的操作。其他值得一提的细节是需要将字符串值转换为字节数组,因为我们只用了UTF-8编码getBytes()方法。

为了说明ActiveKeyValueStore的用法,我们编写了一个用来更新配置属性值的类ConfigUpdater,如范例14-6所示。

范例14-6.程序随机更新ZooKeeper中配置属性值的程序

public class ConfigUpdater {
    public static final String PATH = "/config";
    private ActiveKeyValueStore store; 
    private Random random = new Random();
    public ConfigUpdater(String hosts) throws IOException, InterruptedException { 
        store = new ActiveKeyValueStore(); 
        store.connect(hosts);
    }
    public void run() throws InterruptedException, KeeperException {
        while (true) {
            String value = random.nextlnt(100) + f,; store.write(PATH, value);
            System.out.printf("Set %s to %s\n", PATH, value);
            TimeUnit.SECONDS.sleep(random.nextInt(10));
        }
    }
    public static void main(String[] args) throws Exception {
        ConfigUpdater configUpdater = new ConfigUpdater(args[0]); 
        configUpdater.run();
    }
}

这个程序很简单,ConfigUpdater中定义了一个 ActiveKeyValueStore它在ConfigUpdater的构造函数中连接到ZooKeeperrun()方法永远在循环,在随机时间以随机值更新/config znode

接下来,让我们看看如何读取/config配置属性的值。首先,我们在ActiveKeyValueStore中添加一个读方法:

public String read(String path, Watcher watcher) throws InterruptedException, 
KeeperException {
        byte[] data = zk.getData(path, watcher, null/*stat*/); 
        return new String(data, CHARSET);
}

ZooKeepergetData()方法有三个参数:路径、一个观察对象和一个 Stat对象。Stat对象由getData()方法返回的值填充,用来将信息回传给调用者。通过这个方法,调用者可以获得一个znode的数据和元数据, 但在这个例子中,由于我们对元数据不感兴趣,因此将Stat参数设为null

作为配置服务的用户,ConfigWatcher(参见范例14-7)创建了一个 ActiveKeyValueStore对象store,并且在启动之后调用了 store read()方法(在displayConfig()方法中),将自身作为观察传递给 storedisplayConfig()方法用于显示它所读到的配置信息的初始值。

范例14-7.观察ZooKeeper中配置属性的更新情况并将其打印到控制台的应用

public class ConfigWatcher implements Watcher { 
    private ActiveKeyValueStore store;
    public ConfigWatcher(String hosts) throws IOException, InterruptedException { 
    store = new ActiveKeyValueStore(); 
    store.connect(hosts);
    }
    public void displayConfig() throws InterruptedException, KeeperException { 
    String value = store.read(ConfigUpdater.PATH, this);
    System.out.printf("Read %s as %s\n", ConfigUpdater.PATHj value);
    }
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == EventType.NodeDataChanged) { 
            try {
                ayConfig();
            } catch (InterruptedException e) {
                System.err.printIn("Interrupted. Exiting.");
                Thread.currentThread().interrupt();
            } catch (KeeperException e) {
            System.err.printf("KeeperException: %s. Exiting.\n", e);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        ConfigWatcher configWatcher = new ConfigWatcher(args[0]); 
        configWatcher.displayConfig();
        // stay alive until process is killed or thread is interrupted 
        Thread.sleep(Long.MAX_VALUE);
    }
}

ConfigUpdater更新znode时,ZooKeeper产生一个类型为 EventType.NodeDataChanged的事件,从而触发观察ConfigWatcher 在它的process()方法中对这个事件做出反应,读取并显示配置的最新 版本。 由于观察仅发送单次信号,因此每次我们调用ActiveKeyValueStoreread()方法时,都将一个新的观察告知ZooKeeper,以确保我们可以看到将来的更新。尽管如此,我们还是不能保证接收到每一个更新,因为在收 到观察事件通知与下一次读之间,znode可能已经被更新过,而且可能是很多次更新,由于客户端在这段时间没有注册任何观察,因此不会收到通 知。对于示例中的配置服务,这不是问题,因为客户端只关心属性的最新值,最新值优先干之前的值。但在一般情况下,这个潜在的问题是不容忽视的。

让我们看看如何使用这个程序。在一个终端窗口中运行ConfigUpdater:

% java ConfigUpdater localhost
Set /config to 79 Set /config to 14 Set /config to 78

然后紧接着在另一个终端窗口启动ConfigWatcher:

% java ConfigWatcher localhost
Read /config as 79 Read /config as 14 Read /config as 78


14.4.2 可复原的ZooKeeper应用 


关于分布式计算的第一个误区是“网络是可靠的”。按照他们的观点,程序总是有一个可靠的网络,因此当程序运行在真正的网络中时,往往会出现各种各样的故障。让我们看看各种可能的故障模式,以及能够解决故障的措施,使我们的程序在面对故障时能够及时复原。

在Java API中的每一个ZooKeeper操作都在其throws子句中声明了两种类 型的异常,分别是 InterruptedException 和 KeeperException。

1. InterruptedException 异常

如果操作被中断,则会有一个InterruptedException异常被抛出。在 Java语言中有一个取消阻塞方法的标准机制,即针对存在阻塞方法的线程调用interrupts()。一个成功的取消操作将产生一个InterruptedException 异常。ZooKeeper也遵循这一机制,因此你可以使用这种方法来取消一个 ZooKeeper操作。使用了ZooKeeper的类或库通常会传播 InterruptedException异常,使客户端能够取消它们的操作。

InterruptedException异常并不意味着有故障,而是表明相应的操作已经被取消,所以在配置服务的示例中,可以通过传播异常来中止应用程序的运行。

2. KeeperException 异常

如果ZooKeeper服务器发出一个错误信号或与服务器存在通信问题,抛出 的则是KeeperException异常。针对不同的错误情况,KeeperException 异常存在不同的子类。例如/KeeperException.NoNodeException KeeperException的一个子类,如果你试图针对一个不存在的znode执行操作,抛出的则是该异常。

每一个KeeperException异常的子类都对应一个关于错误类型信息的代 码。例如,KeeperException‘NoNoc/eException 异常的代码是 KeeperException.

有两种方法被用来处理KeerException异常:一种是捕捉KeeperException异常并且通过检测它的代码来决定采取何种补救措施;另一种是捕捉等价KeeperException子类并且在每段捕捉代码中执行相应的操作。

KeeperException异常分为三大类。

•状态异常当一个操作因不能被应用于znode树而导致失败时,就 会出现状态异常。状态异常产生的原因通常是在同-时间有另外一 个进程正在修改znode。例如,如果一个znode先被另外一个进程 更新了,根据版本号执行setData操作的进程就会失败,并收到一个KeeperException.BadVersionException异常,这是因为版本号不匹配。程序员通常都知道这种冲突总是存在的,也都会写代码来进行处理。

               —些状态异常会指出程序中的错误,例如KeeperException. NoChildrenForEphemeralsException 异常,                 试图在短暂znode下创建子节点时就会抛出该异常。

  

.可恢复的异常可恢复的异常是指那些应用程序能够在同一个 ZooKeeper会话中恢复的异常。一个可恢复的异常是通过KeeperException.ConnectionLossException 来表示的,它意 味着已经丢失了与ZooKeeper的连接。ZooKeeper会尝试重新连 接,并且在大多数情况下重新连接会成功,并确保会话是完整的。

但是 ZooKeeper 不能判断与 KeeperException.Connection LossExction异常相关的操作是否成功执行。这种情况就是部分 失败的一个例子(在本章开始时提到的)。这时程序员有责任来解决 这种不确定性,并且根据应用的情况来采取适当的操作。

在这一点上,就需要对“幂等”idempotent)操作和“非幂等” (Nonidempotent)操作进行区分。幂等操作是指那些一次或多次执行都会产生相同结果的操作,例如读请求或无条件执行的setData 作。对于幂等操作,只需要简单地进行重试即可。

对于非幂等操作,就不能盲目地进行重试,因为它们多次执行 的结果与一次执行是完全不同的。程序可以通过在znode的路径和 它的数据中编码信息来检测是否非幂等操作的更新已经完成。在 14.4.3节对可恢复的异常的讨论中,我们将通过实现一个锁服务来讨论如何处理失畋的非幂等操作。

不可恢复的异常在某些情况下,ZooKeeper会话会失效——也许因为超时或因为会话被关闭(两种情况下都会收到KeeperException.SessionExpiredException 异常),或因为身 份验证失败(KeeperException.AuthFailedException 异常)。无论上述哪种情况,所有与会话相关联的短暂znode都将丢失,因此应用程序需要在重新连接到ZooKeeper之前重建它的状态。


3. 可靠的配置服务

让我们回到ActiveKeyValueStorewrite()方法,它由一个exists操作紧跟着一个create操作或setData操作组成:

public void write(String path, String value) throws InterruptedException, 
KeeperException {
    Stat stat = zk.existsCpath, false); 
    if (stat == null) {
        zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFEj CreateMode.PERSISTENT);
        }else {
        zk.setData(path, value.getBytes(CHARSET), -1);
        }
}

作为一个整体,write()方法是一个幂等操作,所以我们可以对它进行无条件重试。这里有一个write()方法修改后的版本,能够循环执行重试。

其中设置了重试的最大次数MAX_RETRIES和两次重试之间的时间间隔 RETRY_PERIOD_SECONDS

public void write(String path, String value) throws InterruptedException, 
KeeperException { 
int retries = 0; 
while (true) { 
    try {
        Stat stat = zk.exists(path, false); 
            if (stat == null) {
            zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
            zk.setData(path, value.getBytes(CHARSET), stat.getVersion()); 
            }
            return;
        } catch (KeeperException.SessionExpiredException e) { 
        throw e;
        } catch (KeeperException e) { 
        if (retries++ == MAX_RETRIES) {
        throw  e;
        }
        // sleep then retry
        TimeUnit .SECONDS. sleep(RETRY__PERIOD_SECONDS);
        }
    }
}

这段代码没有在KeeperException.SessionExpiredException异常处进行重试,因为当一个会话过期时,ZooKeeper对象会进入CLOSED状态,此状态下它不能再进行重新连接(参见图M-3)。我们只是简单地将这个异常 重新抛出并且让调用者创建一个新的ZooKeeper实例,以重试整个write()方法。一个简单的创建新实例如方法是创建一个新的于恢复过期会话:

public static void main(String[] args) throws Exception { 
  while (true) { 
    try {
      ResilientConfigUpdater configUpdater = new ResilientConfigUpdater(args[0]); 
      configUpdater.run();
      } catch (KeeperException.SessionExpiredException e) { 
            // start a new session 
        } catch (KeeperException e) {
            // already retried, so exit 
            e.printStackTrace(); 
            break;
        }
    }
}

处理会话过期的另一种方式是在观察中(在这个例子中应该是 ConnectionWatcher)监测类型为 Expired  KeeperState,然后在监测的时候创建一个新的连接。即使我们收到KeeperException. SessionExpiredException异常,由于连接最终是能够重新建立的,我们 就可以使用这种方式在write()方法内不断进行重试。不管我们采用何种 机制从过期会话中恢复,重要的是需要对这种不同干连接丢失的故障类型 进行不同的处理。

实际上,这里忽略了另一种故障模式。当ZooKeeper对象被创建时,它会尝试连接一个ZooKeeper服务器。如果连接失败或超时, 那么它会尝试连接集合体中的另一台服务器。如果在尝试集合体中所有服务器之后仍然无法建立连接,它会抛出一个IOException异 常。由于所有ZooKeeper服务器都不可用的可能性很小,所以一些应用程序选择循环重试操作,直到ZooKeeper服务可用为止。

这仅仅是一种重试处理策略,还有许多其他策略,例如使用“指数退回” (exponential backoff),每次将重试的间隔乘以一个常数。Hadoop内核中的 org.apache.hadoop.io.retry包是一组工具,能够以可重用的方式将重试逻辑加入用户代码,因此对于构建ZooKeeper应用是非常有用的。

14.4.3锁服务

分布式锁能够在一组进程之间提供互斥机制,使得在任何时刻只有一个进程可以持有锁。分布式锁可以用于在大型分布式系统中实现领导者选举,在任何时间点,持有锁的那个进程就是系统的领导者。

不要将ZooKeeper自己的领导者选举和使用ZooKeeper基本操作实 现的般的领导者选举服务混为一谈(事实上,ZooKeeper中包含有 一个领导者选举服务的实现。ZooKeeper自己的领导者选举机制是不对外公开的,我们这里所描述的一般领导者选举服务则不同,它是为那些需要所有进程与主进程保持一致的分布式系统所设计的。

为了使用ZooKeeper来实现分布式锁服务,我们使用顺序znode来为那些 竞争锁的进程强制排序。思路很简单:首先指定一个作为锁的znode,通常 用它来描述被锁定的实体,称为然后希望获得锁的客户端创建一些短暂顺序znode,作为锁znode的子节点。在任何时间点,顺序号最小的 客户端将持有锁。例如,有两个客户端差不多同时创建znode,分别为/leaer/lock-1和/leader/lock-2的客户端会持有锁,因为它的znode顺序号最小。ZooKeeper服务是顺序的仲裁者,因为它负责分配顺序号。

通过删除znode/leader/lock-1即可简单地将锁释放,另外,如果客户端进程死亡,对应的短暂znode也会被删除。接下来,创建znode/leader/locker-2的客户端将持有锁,因为它的顺序号紧跟前一个。通过创建一个关于znode 除的观察,可以使客户端在获得锁时得到通知。

如下是申请获取锁的伪代码。

(1) 在锁znode下创建一个名为lock-的短暂顺序znode,并且记住它的 实际路径名(create操作的返回值)。

(2) 査询锁znode的子节点并且设置一个观察。

(3) 如果步骤1中所创建的znode在步骤2返回的所有子节点中具有最小的顺序号,则获取到锁,退出。

(4) 等待步骤2中所设观察的通知,转到步骤2。

1.羊群效应 

虽然这个算法是正确的,但还是存在一些问题。第一个问题是这种实现会受到“羊群效应herd effect)的影响。在有成百上千客户端的情况,所有的客户端都在尝试获得锁,所以每个客户端都会在锁znode上设置一个观 察,用于捕捉子节点的变化。每次锁被释放或一个新进程开始申请锁的时候,观察都会被触发并且每个客户端都会收到一个通知。“羊群效应”就 是指这种大量客户端收到同一事件的通知,但实际上只有很少一部分需要 处理这一事件。在这种情况下,只有一个客户端会成功地获取锁,但是维护的过程以及向所有客户端发送观察事件会产生峰值流量,这会对ZooKeeper服务器造成压力。

为了避免出现羊群效应,我们需要优化发送通知的条件。关键在于仅当前 一个顺序号的子节点消失时才需要通知下一个客户端,而不是删除(或创建) 任何子节点时都进行通知。在我们的例子中,如果客户端创建了znode /leader/lock-1/leader/lock-2 /leader/lock-3,那么有当/leader/lock-2失时才需要通知/leader/lock-3对应的客户端;/leader/lock-1/消失或有新的znode//leader/lock-4加入时,不需要通知该客户端。

1. 可恢复的异常

这个申请锁的算法目前还存在另一个问题,就是不能处理因连接丢失而导致的create操作失畋。如前所述,在这种情况下我们不知道操作是成功还是 失败。由于创建一个顺序znode是非幂等操作,所以我们不能简单地进行重试。原因在于如果第一次创建已经成功,重试会使我们多出一个永远删不掉的孤儿znode(至少到客户端会话结束前)。最不幸的结果是还将会出现死锁。 问题在于,在重新连接之后客户端不能够判断它是否已经创建过子节点。 解决方案是在znode的名称中嵌入一个ID,如果客户端出现连接丢失的情况,重新连接之后它便可以对锁节点的所有子节点进行检查,看看是否有子节点的名称中包含其ID。如果有一个子节点的名称包含其ID,它便知道自己的创建操作已经成功,不需要再创建子节点。如果没有子节点的名称中包含其ID,则客户端可以安全地创建一个新的顺序子节点。

客户端会话的ID是一个长整数,并且在ZooKeeper服务中是唯一的,因此非常适合在连接丢失后用于重新识别客户端。可以通过调用Java ZooKeeper 类的getSessionldO方法来获得会话的ID

在创建短暂顺序znode时应当采用这样的命名方式, ZooKeeper在其尾部添加顺序号之后,znode的名称会形如lock-<sessionID><sequenceNumber> 。由于顺序号对于父节点来说是唯一的,但对于子节点名 并不唯一,因此采用这样的命名方式可以让子节点在保持创建顺序的同时 能够确定自己的创建者。

3. 不可恢复的异常

如果一个客户端的ZooKeeper会话过期,那么它所创建的短暂znode将会 被删除,已持有的锁会被释放,或者是放弃了申请锁的位置。使用锁的应用程序应当意识到它已经不再持有锁,应当清理它的状态,然后通过创建 并尝试申请一个新的锁对象来重新启动。注意,这个过程是由应用程序控制的,而不是锁,因为锁是不能预知应用程序需要如何清理自己的状态。

4. 实现

正确地实现一个分布式锁是一件棘手的事,因为很难对所有类型的故障都进行正确的解释处理。ZooKeeper带有一个Java语言编写的生产级別的锁实现,名为WriteLock,客户端可以很方便地使用它。

14.4.4更多分布式数据结构和协议

使用ZooKeeper可以实现很多不同的分布式数据结构和协议,例如“屏 障”barrier)、队列和两阶段提交协议。有趣的是它们都是同步协议,但我 们可以使用异步ZooKeeper基本操作(如通知)来实现它们。

ZooKeeper 网站apac/je.org)提供了一些用于实现分布式数据 结构和协议的伪代码。ZooKeeper本身也带有一些标准方法的实现(包括锁、领导者选举和队列),放在安装位置下的recipes目录中。

Curator项目(http://github.com/Netflix/curator)提供了更多的 ZooKeeper方法的实现。

BookKeeper Hedwig

BookKeeper是一个具有髙可用性和可靠性的日志服务。它可以用来实现预 写式日志(write-ahead logging),这是一项在存储系统中用于保证数据完整性的常用技术。在一个使用预写式日志的系统史,每一个写操作在被应用前 都先要写入事务日志。使用这个技术,我们必在每个写操作之后都将数 据写到永久存储器上,因为即使出现系统故障,也可以通过重新执行事务日志中尚未应用的写操作来恢复系统的最后状态。

BookKeeper客户端所创建的日志被称为ledger,每一个添加到ledger的记录被称为ledger entry,每个ledger entry就是一个简单的字节数组。ledger 由保存有ledger数据副本的bookie服务器组进行管理。注意,ledger数据不存储在ZooKeeper中,只有元数据保存在ZooKeeper中。

传统上,为了让使用预写式日志的系统更加稳定,必须解决保存有事务曰 志的节点的故障问题,这通常是通过某种方式复制事务日志来解决这个问 题。例如Hadoop HDFS中的namenode会将它的编辑日志写到多个磁盘 上,每个磁盘都是一个典型的NFS装入盘。尽管如此,当主节点出现故障时,还是需要手动完成故障恢复。通过提供具有高可用性的日志服务,BookKeeper承诺提供透明的故障恢复,因为它可以容忍Bookie服务器的故障。(就3.2.4节所描述的HDFS高可用性来说,使用基于BookKeeper的编辑日志后将不再需要利用NFS来实现共享存储。)

Hedwig是利用BookKeeper实现的一个基于主题的发布-订阅系统。以ZooKeeper作为基础Hedwig提供了一个具有高可用性的服务,即使在订阅者长时间离线的情况下它也能够保证消息的传递。

BookKeeperZooKeeper的一个子项目,在http://zooteeper.apache.org/bookkeeper/可以找到它和Hedwig的更多相关用法。


14.5生产环境中的ZooKeeper 


在生产环境中,应当以复制模式运行ZooKeeper。在这里,我们将讨论使用 ZooKeeper服务器的集合体时需要考虑的一些问题。但是本节的内容不够详尽,建议参考《ZooKeeper管理员指南》获得详细的最新操作指南,包括支持的平台、推荐的硬件、维护过程和配置属性。


14.5.1可恢复性和性能

在安放ZooKeeper所用的机器时,应当考虑尽量减少机器和网络故障可能带来的影响。在实践过程中,一般是跨机架、电源和交换机来安放服务器,这样,这些设备中的任何一个出现故障都不会使集合体损失半数以上 的服务器。

对于那些需要低延迟服务(毫秒级别)的应用来说,最好将所有的服务器都放 在同一个数据中心的同一个集合体中。也有一些应用不需要低延迟服务,它们可以通过跨数据中心(每个数据中心至少两台服务器)安放服务器来获得 更好的可恢复性,领导者选举和分布式粗粒度锁是这类应用的代表。这两个应用中的状态改变都相对较少,因此相对于整个服务来说,数据中心之间传递状态改变消息所需的几十毫秒开销是可以承受的。

ZooKeeper中有一个“观察节点”(observer node)的概念,是指没有投票权的跟随者。由于观察节点不参与写请求过程中达成共识的投 票,因此使用观察节点可以让ZooKeeper集群在不影响写性能的情况下提高读操作的性能。使用观察节点可以让ZooKeeper集群跨越 多个数据中心,同时不会增加正常投票节点的延迟。可以通过将投票节点安放在一个数齒中心,将观察节点安放在另一个数据中心来 实现这一点。

ZooKeeper是具有高可用性的系统,对它来说,最关键的是能够及时地履行其职能。因此,ZooKeeper应当运行在专用的机器上。如果有其他应用程序竞争资源,可能会导致ZooKeeper的性能明显下降。

通过对ZooKeeper进行配置,可以使它的事务日志和数据快照分别保存在不同的磁盘驱动器上。在默认情况下,两者都保存在dataDir属性所指定的目录中,但是通过为dataLogDir属性设置一个值,便可以将事务日志 写在指定的位置。通过指定一个专用的设备(不只是一个分区),一个ZooKeeper服务器可以以最大速率将日志记录写到磁盘,因为写日志是顺序 写,并且没有寻址操作。由干所有的写操作都是通过领导者来完成的,增 加服务器并不能提高写操作的吞吐量,所以提髙性能的关键是写操作的速度。

如果写操作的进程被交换到磁盘上,则性能会受到不利的影响。这是可以避免的,将Java堆的大小设置为小于机器上空闲的物理内存即可。ZooKeeper脚本可以从它的配置目录中获取一个,名为y_ava.e«v的文件,这个 文件被用来设置JVMFLAGS环境变量,包括设置Java堆的大小(和任何其 他所需的JVM参数)。

14.5.2 配置

ZooKeeper服务器的集合体中,每个服务器都有一个数值型的ID,服务器 ID在集合体中是唯一的,并且取值范围在1到255之间。可以通过一个名 为的纯文本文件设定服务器的ID,这个文件保存在dataDir参数所 指定的目录中。

为每台服务器设置ID只完成了工作的一半。我们还需要将集合体中其他服务器的ID和网络位置告诉所有的服务器。在ZooKeeper的配置文件中必须为每台服务器添加下面这行配置:

server.n=hostname:port:port

n是服务器的ID。这里有2个端口设置:第一个是跟随者用来连接领导者的端口,第二个端口被用于领导者选举。这里有一个包含三台机器的复制模式下ZooKeeper集合体的配置例子:

tickT ime=2000
dataDir=/diski/zookeeper
dataLogDir=/disk2/zookeepen
clientPort=2181
initLimit=5
syncLimit=2
server.l=zookeeperl:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888

连接到这个ZooKeeper集合体的客户端在ZooKeeper对象的构造函数中应当使用 zookeeperl:2181、zookeeper2:2181 zookeeper3:2181作为主机字符串。

在复制模式下,有两个额外的强制参数:initLimitsyncLimit,两者都是以滴答参数的倍数进行度量。

initLimit参数设定了所有跟随者与领导者进行连接并同步的时间范围。 如果在设定的时间段内,半数以上的跟随者未能完成同步,领导者便会宣 布放弃领导地位,然后进行另外一次领导者选举。如果这种情况经常发生 (可以通过日志中的记录发现这种情况),则表明设定的值太小。

syncLimit参数设定了允许一个跟随者与领导者进行同步的时间。如果在设定的时间段内,一个跟随者未能完成同步,会自己重启。所有关联到跟随者的客户端将连接到另一个跟随者。

这些是建立和运行一个ZooKeeper服务器集群所需的最少设置。ZooKeeper管理员指南》列出了更多的配置选项,特别是性能调优方面的。

转载请注明:全栈大数据 » 第十四章 关于 ZooKeeper

喜欢 (1)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址