整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

14.4使用ZooKeeper来构建应用 14.4.1配置服务

hadoop 花牛 12℃ 0评论

在一定程度上了解ZooKeeper之后,我们接下来用ZooKeeper写一些有用的应用程序。

配置服务是分布式应用所需要的基本服务之一,它使集群中的机器可以共享配置信息中那些公共的部分。简单地说,ZooKeeper可以作为一个具有高可用性的配置存储器,允许分布式应用的参与者检索和更新配置文件。使 用ZooKeeper中的观察机制,可以建立一个活跃的配置服务,使那些感兴 趣的客户端能够获得配置信息修改的通知。

让我们来写一个这样的服务。我们通过两个假设来简化所需实现的服务(稍加修改就可以取消这两个假设)。第一,我们唯一需要存储的配置数据是字符串,关键字是znode的路径,因此我们在每个znode上存储了一个键/值 对。第二,在任何时候只有一个客户端会执行更新操作。除此之外,这个模型看起来就像是有一个主人(类似于HDFS中的namenode)在更新信息,而他的工人则需要遵循这些信息。

我们在名为ActiveKeyValueStore的类中写了如下代码:

public class ActiveKeyValueStore extends ConnectionWatcher{
    private static final Charset CHARSET = Charset.forName("UTF-S");
    public void write(String path, String value) throws InterruptedException^,
    KeeperException {
    Stat stat = zk.exists(pathJ false); 
        if (stat == null) {
            zk.create(path, value.getBytes(CHARSET), 
            Ids.OPEN_ACL一UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(pathJ value.getBytes(CHARSET), -1);
        }
    }
}

write()方法的任务是将一个关键字及其值写人ZooKeeper。它隐藏了创建一个新的znode和用一个新值更新现有znode之间的区别,而是使用exists操作来检测znode是否存在,然后再执行相应的操作。其他值得一提的细节是需要将字符串值转换为字节数组,因为我们只用了UTF-8编码getBytes()方法。

为了说明ActiveKeyValueStore的用法,我们编写了一个用来更新配置属性值的类ConfigUpdater,如范例14-6所示。

范例14-6.该程序随机更新ZooKeeper中配置属性值的程序

public class ConfigUpdater {
    public static final String PATH = "/config";
    private ActiveKeyValueStore store; 
    private Random random = new Random();
    public ConfigUpdater(String hosts) throws IOException, InterruptedException { 
        store = new ActiveKeyValueStore(); 
        store.connect(hosts);
    }
    public void run() throws InterruptedException, KeeperException {
        while (true) {
            String value = random.nextlnt(100) + f,; store.write(PATH, value);
            System.out.printf("Set %s to %s\n", PATH, value);
            TimeUnit.SECONDS.sleep(random.nextInt(10));
        }
    }
    public static void main(String[] args) throws Exception {
        ConfigUpdater configUpdater = new ConfigUpdater(args[0]); 
        configUpdater.run();
    }
}

这个程序很简单,ConfigUpdater中定义了一个 ActiveKeyValueStore它在ConfigUpdater的构造函数中连接到ZooKeeperrun()方法永远在循环,在随机时间以随机值更新/config znode

接下来,让我们看看如何读取/config配置属性的值。首先,我们在ActiveKeyValueStore中添加一个读方法:

public String read(String path, Watcher watcher) throws InterruptedException, 
KeeperException {
        byte[] data = zk.getData(path, watcher, null/*stat*/); 
        return new String(data, CHARSET);
}

ZooKeepergetData()方法有三个参数:路径、一个观察对象和一个 Stat对象。Stat对象由getData()方法返回的值填充,用来将信息回传给调用者。通过这个方法,调用者可以获得一个znode的数据和元数据, 但在这个例子中,由于我们对元数据不感兴趣,因此将Stat参数设为null

作为配置服务的用户,ConfigWatcher(参见范例14-7)创建了一个 ActiveKeyValueStore对象store,并且在启动之后调用了 store read()方法(在displayConfig()方法中),将自身作为观察传递给 storedisplayConfig()方法用于显示它所读到的配置信息的初始值。

范例14-7.观察ZooKeeper中配置属性的更新情况并将其打印到控制台的应用

public class ConfigWatcher implements Watcher { 
    private ActiveKeyValueStore store;
    public ConfigWatcher(String hosts) throws IOException, InterruptedException { 
    store = new ActiveKeyValueStore(); 
    store.connect(hosts);
    }
    public void displayConfig() throws InterruptedException, KeeperException { 
    String value = store.read(ConfigUpdater.PATH, this);
    System.out.printf("Read %s as %s\n", ConfigUpdater.PATHj value);
    }
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == EventType.NodeDataChanged) { 
            try {
                ayConfig();
            } catch (InterruptedException e) {
                System.err.printIn("Interrupted. Exiting.");
                Thread.currentThread().interrupt();
            } catch (KeeperException e) {
            System.err.printf("KeeperException: %s. Exiting.\n", e);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        ConfigWatcher configWatcher = new ConfigWatcher(args[0]); 
        configWatcher.displayConfig();
        // stay alive until process is killed or thread is interrupted 
        Thread.sleep(Long.MAX_VALUE);
    }
}

ConfigUpdater更新znode时,ZooKeeper产生一个类型为 EventType.NodeDataChanged的事件,从而触发观察。ConfigWatcher 在它的process()方法中对这个事件做出反应,读取并显示配置的最新 版本。 由于观察仅发送单次信号,因此每次我们调用ActiveKeyValueStoreread()方法时,都将一个新的观察告知ZooKeeper,以确保我们可以看到将来的更新。尽管如此,我们还是不能保证接收到每一个更新,因为在收 到观察事件通知与下一次读之间,znode可能已经被更新过,而且可能是很多次更新,由于客户端在这段时间没有注册任何观察,因此不会收到通 知。对于示例中的配置服务,这不是问题,因为客户端只关心属性的最新值,最新值优先干之前的值。但在一般情况下,这个潜在的问题是不容忽视的。

让我们看看如何使用这个程序。在一个终端窗口中运行ConfigUpdater:

% java ConfigUpdater localhost
Set /config to 79 Set /config to 14 Set /config to 78

然后紧接着在另一个终端窗口启动ConfigWatcher:

% java ConfigWatcher localhost
Read /config as 79 Read /config as 14 Read /config as 78

转载请注明:全栈大数据 » 14.4使用ZooKeeper来构建应用 14.4.1配置服务

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址