IBM®、Google、VMWare 和 Amazon 等公司已经开始提供云计算产品和战略。本文讲解如何使用 Apache Hadoop 构建一个 MapReduce 框架以建立 Hadoop 集群,以及如何创建在 Hadoop 上运行的示例 MapReduce 应用程序。还将讨论如何在云上设置耗费时间/磁盘的任务。
云计算简介
近来云计算越来越热门了,云计算已经被看作 IT 业的新趋势。云计算可以粗略地定义为使用自己环境之外的某一服务提供的可伸缩计算资源,并按使用量付费。可以通过 Internet 访问 “云” 中的任何资源,而不需要担心计算能力、带宽、存储、安全性和可靠性等问题。
本文简要介绍 Amazon EC2 这样的云计算平台,可以租借这种平台上的虚拟 Linux® 服务器;然后介绍开放源码 MapReduce 框架 Apache Hadoop,这个框架将构建在虚拟 Linux 服务器中以建立云计算框架。但是,Hadoop 不仅可以部署在任何厂商提供的 VM 上,还可以部署在物理机器上的一般 Linux OS 中。
在讨论 Apache Hadoop 之前,我们先简要介绍一下云计算系统的结构。图 1 显示云计算的各个层以及现有的一些服务。
基础设施即服务 (Infrastructure-as-a-Service,IaaS)是指以服务的形式租借基础设施(计算资源和存储)。IaaS 让用户可以租借计算机(即虚拟主机)或数据中心,可以指定特定的服务质量约束,比如能够运行某些操作系统和软件。Amazon EC2 在这些层中作为 IaaS,向用户提供虚拟的主机。平台即服务 (Platform-as-a-Service,PaaS)主要关注软件框架或服务,提供在基础设施中进行 “云” 计算所用的 API。Apache Hadoop 作为 PaaS,它构建在虚拟主机上,作为云计算平台。
图 1. 云计算的层和现有服务
Amazon EC2
Amazon EC2 是一个 Web 服务,它允许用户请求具有各种资源(CPU、磁盘、内存等)的虚拟机器。用户只需按使用的计算时间付费,其他事情全交给 Amazon 处理。
这些实例 (Amazon Machine Image,AMI) 基于 Linux,可以运行您需要的任何应用程序或软件。在从 Amazon 租借服务器之后,可以像对待物理服务器一样使用一般的 SSH 工具设置连接和维护服务器。
对 EC2 的详细介绍超出了本文的范围。
部署 Hadoop 云计算框架的最好方法是把它部署在 AMI 上,这样可以利用云资源,不需要考虑计算能力、带宽、存储等问题。但是,在本文的下一部分中,我们将在本地的 Linux 服务器 VMWare 映像中构建 Hadoop,因为 Hadoop 不仅适用于云解决方案。在此之前,我们先介绍一下 Apache Hadoop。
Apache Hadoop
Apache Hadoop 是一个软件框架(平台),它可以分布式地操纵大量数据。它于 2006 年出现,由 Google、Yahoo! 和 IBM 等公司支持。可以认为它是一种 PaaS 模型。
它的设计核心是 MapReduce 实现和 HDFS (Hadoop Distributed File System),它们源自 MapReduce(由一份 Google 文件引入)和 Google File System。
MapReduce
MapReduce 是 Google 引入的一个软件框架,它支持在计算机(即节点)集群上对大型数据集进行分布式计算。它由两个过程组成,映射(Map)和缩减(Reduce)。
在映射过程中,主节点接收输入,把输入分割为更小的子任务,然后把这些子任务分布到工作者节点。
工作者节点处理这些小任务,把结果返回给主节点。
然后,在缩减过程中,主节点把所有子任务的结果组合成输出,这就是原任务的结果。
图 2 说明 MapReduce 流程的概念。
MapReduce 的优点是它允许对映射和缩减操作进行分布式处理。因为每个映射操作都是独立的,所有映射都可以并行执行,这会减少总计算时间。
HDFS
对 HDFS 及其使用方法的完整介绍超出了本文的范围。
从最终用户的角度来看,HDFS 就像传统的文件系统一样。可以使用目录路径对文件执行 CRUD 操作。但是,由于分布式存储的性质,有 “NameNode” 和 “DataNode” 的概念,它们承担各自的责任。
NameNode 是 DataNode 的主节点。它在 HDFS 中提供元数据服务。元数据说明 DataNode 的文件映射。它还接收操作命令并决定哪些 DataNode 应该执行操作和复制。
DataNode 作为 HDFS 的存储块。它们还响应从 NameNode 接收的块创建、删除和复制命令。
JobTracker 和 TaskTracker
在提交应用程序时,应该提供包含在 HDFS 中的输入和输出目录。JobTracker 作为启动 MapReduce 应用程序的单一控制点,它决定应该创建多少个 TaskTracker 和子任务,然后把每个子任务分配给 TaskTracker。每个 TaskTracker 向 JobTracker 报告状态和完成后的任务。
通常,一个主节点作为 NameNode 和 JobTracker,从节点作为 DataNode 和 TaskTracker。Hadoop 集群的概念视图和 MapReduce 的流程见图 2。
图 2. Hadoop 集群的概念视图和 MapReduce 的流程
设置 Apache Hadoop
现在在 Linux VM 上设置 Hadoop 集群,然后就可以在 Hadoop 集群上运行 MapReduce 应用程序。
Apache Hadoop 支持三种部署模式:- 单独模式:在默认情况下,Hadoop 以非分布的单独模式运行。这个模式适合应用程序调试。
- 伪分布模式:Hadoop 还可以以单节点的伪分布模式运行。在这种情况下,每个 Hadoop 守护进程作为单独的 Java™ 进程运行。
- 全分布模式:Hadoop 配置在不同的主机上,作为集群运行。
要想以单独或伪分布模式设置 Hadoop,请参考 Hadoop 的网站。在本文中,我们只讨论以全分布模式设置 Hadoop。
准备环境
在本文中,我们需要三台 GNU/Linux 服务器;一个作为主节点,另外两个作为从节点。表 1. 服务器信息
服务器 IP | 服务器主机名 | 角色 | 9.30.210.159 | Vm-9-30-210-159 | 主节点(NameNode 和 JobTracker) | 9.30.210.160 | Vm-9-30-210-160 | 从节点 1 (DataNode 和 TaskTracker) | 9.30.210.161 | Vm-9-30-210-161 | 从节点 2 (DataNode 和 TaskTracker) |
每台机器都需要安装 Java SE 6 和 Hadoop 二进制代码。本文使用 Hadoop version 0.19.1。
还需要在每台机器上安装 SSH 并运行 sshd。SUSE 和 RedHat 等流行的 Linux 发行版在默认情况下已经安装了它们。
设置通信
更新 /etc/hosts 文件,确保这三台机器可以使用 IP 和主机名相互通信。
因为 Hadoop 主节点使用 SSH 与从节点通信,所以应该在主节点和从节点之间建立经过身份验证的无密码的 SSH 连接。在每台机器上执行以下命令,从而生成 RSA 公共和私有密钥。
这会在 /root/.ssh 目录中生成 id_rsa.pub。重命名主节点的 id_rsa.pub(这里改名为 59_rsa.pub)并把它复制到从节点。然后执行以下命令,把主节点的公共密钥添加到从节点的已授权密钥中。cat /root/.ssh/59_rsa.pub >> /root/.ssh/authorized_keys |
现在尝试使用 SSH 连接从节点。应该可以成功连接,不需要提供密码。
设置主节点
把 Hadoop 设置为全分布模式需要配置 <Hadoop_home>/conf/ 目录中的配置文件。
在 hadoop-site.xml 中配置 Hadoop 部署。这里的配置覆盖 hadoop-default.xml 中的配置。
表 2. 配置属性
属性 | 解释 | fs.default.name | NameNode URI | mapred.job.tracker | JobTracker URI | dfs.replication | 复制的数量 | hadoop.tmp.dir | 临时目录 |
hadoop-site.xml<?xml version="1.0"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?><!-- Put site-specific property overrides in this file. --><configuration> <property> <name>fs.default.name</name> <value>hdfs://9.30.210.159:9000</value> </property> <property> <name>mapred.job.tracker</name> <value>9.30.210.159:9001</value> </property> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/root/hadoop/tmp/</value> </property></configuration> |
通过配置 hadoop-env.sh 文件指定 JAVA_HOME。注释掉这一行并指定自己的 JAVA_HOME 目录。export JAVA_HOME=<JAVA_HOME_DIR> |
在 master 文件中添加主节点的 IP 地址。
在 slave 文件中添加从节点的 IP 地址。
设置从节点
把 hadoop-site.xml、hadoop-env.sh、masters 和 slaves 复制到每个从节点;可以使用 SCP 或其他复制工具。
对 HDFS 进行格式化
运行以下命令对 HDFS 分布式文件系统进行格式化。<Hadoop_home>/bin/hadoop namenode -format |
检查 Hadoop 集群
现在,可以使用 bin/start-all.sh 启动 Hadoop 集群。命令输出指出主节点和从节点上的一些日志。检查这些日志,确认一切正常。如果弄乱了什么东西,可以格式化 HDFS 并清空 hadoop-site.xml 中指定的临时目录,然后重新启动。
访问以下 URL,确认主节点和从节点是正常的。
现在,已经在云中设置了 Hadoop 集群,该运行 MapReduce 应用程序了。
创建 MapReduce 应用程序
MapReduce 应用程序必须具备 “映射” 和 “缩减” 的性质,也就是说任务或作业可以分割为小片段以进行并行处理。然后,可以缩减每个子任务的结果,得到原任务的结果。这种任务之一是网站关键字搜索。搜索和抓取任务可以分割为子任务并分配给从节点,然后在主节点上聚合所有结果并得到最终结果。
试用示例应用程序
Hadoop 附带一些用于测试的示例应用程序。其中之一是单词计数器,它统计某一单词在几个文件中出现的次数。通过运行这个应用程序检查 Hadoop 集群。
首先,把输入文件放在分布式文件系统中(conf/ 目录下面)。我们将统计单词在这些文件中出现的次数。$ bin/hadoop fs –put conf input |
然后,运行这个示例应用程序,以下命令统计以 “dfs” 开头的单词出现的次数。$ bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+' |
命令的输出说明映射和缩减过程。
前两个命令会在 HDFS 中生成两个目录,“input” 和 “output”。可以使用以下命令列出它们。
查看分布式文件系统中已经输出的文件。它以键-值对的形式列出以 “dfs*” 开头的单词出现的次数。$ bin/hadoop fs -cat ouput/* |
现在,访问 JobTracker 站点查看完成的作业日志。
创建 Log Analyzer MapReduce 应用程序
现在创建一个 Portal (IBM WebSphere® Portal v6.0) Log Analyzer 应用程序,它与 Hadoop 中的 WordCount 应用程序有许多共同点。这个分析程序搜索所有 Portal 的 SystemOut*.log 文件,显示在特定的时间段内应用程序在 Portal 上启动了多少次。
在 Portal 环境中,所有日志分割为 5MB 的片段,很适合由几个节点并行地分析。
hadoop.sample.PortalLogAnalyzer.java
public class PortalLogAnalyzer { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private static String APP_START_TOKEN = "Application started:"; private Text application = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); if(line.indexOf(APP_START_TOKEN) > -1) { int startIndex = line.indexOf(APP_START_TOKEN); startIndex += APP_START_TOKEN.length(); String appName = line.substring(startIndex).trim(); application.set(appName); output.collect(application, new IntWritable(1)); } }} public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while(values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); }} public static void main(String[] args) throws IOException { JobConf jobConf = new JobConf(PortalLogAnalyzer.class); jobConf.setJobName("Portal Log Analizer"); jobConf.setOutputKeyClass(Text.class); jobConf.setOutputValueClass(IntWritable.class); jobConf.setMapperClass(Map.class); jobConf.setCombinerClass(Reduce.class); jobConf.setReducerClass(Reduce.class); jobConf.setInputFormat(TextInputFormat.class); jobConf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(jobConf, new Path(args[0])); FileOutputFormat.setOutputPath(jobConf, new Path(args[1])); JobClient.runJob(jobConf);}} |
对 Hadoop API 的完整解释请参见 Hadoop 网站上的 API 文档。这里只做简要说明。
Map 类实现映射功能,它搜索日志文件的每一行,寻找应用程序的名称。然后把应用程序名称以键-值对的形式放在输出集合中。
Reduce 类计算具有相同键(相同应用程序名称)的所有值的总和。因此,这个应用程序最终输出的键-值对表示每个应用程序在 Portal 上启动的次数。
Main 函数配置并运行 MapReduce 作业。
运行 PortalLogAnalyzer
首先,把这些 Java 代码复制到主节点并编译。把 Java 代码复制到<hadoop_home>/workspace 目录中。对它执行编译并存档在一个 Jar 文件中,后面 hadoop 命令将运行这个文件。$ mkdir classes$ javac –cp ../hadoop-0.19.1-core.jar –d classes hadoop/sample/PortalLogAnalyzer.java$ jar –cvf PortalLogAnalyzer.jar –C classes/ . |
把 Portal 日志复制到 workspace/input 中。假设有多个日志文件,其中包含 2009 年 5 月的所有日志。把这些日志放到 HDFS 中。$ bin/hadoop fs –put workspace/input input2 |
在运行 PortalLogAnalyzer 时,输出说明映射和缩减过程。$ bin/hadoop jar workspace/PortalLogAnalizer.jar hadoop.sample.PortalLogAnalizer input2 output2 |
图 3. 任务的输出
应用程序执行完之后,输出应该与图 4 相似。$ bin/hadoop fs –cat output2/* |
图 4. 部分输出
在访问 JobTracker 站点时,会看到另一个完成的作业。注意图 5 中的最后一行。
图 5. 完成的作业
|