为什么spark比mapreduce与spark快

中国领先的IT技术网站
51CTO旗下网站
Spark 是否真的比 MapReduce 技高一筹
作为一个开源的数据处理框架,Spark 是如何做到如此迅速地处理数据的呢?秘密就在于它是运行在集群的内存上的,而且不受限于 MapReduce 的二阶段范式。这大大加快了重复访问同一数据的速度。听上去好像 Spark 已经注定要取代 Hadoop MapReduce 了。但真的是这样吗?
作者:译:古月水语来源:| 10:05
Apache 基金会下的 Spak 再次引爆了大数据的话题。带着比 Hadoop MapReduce 速度要快 100 倍的承诺以及更加灵活方便的
API,一些人认为这或许预示着 Hadoop MapReduce 的终结。
作为一个开源的数据处理框架,Spark 是如何做到如此迅速地处理数据的呢?秘密就在于它是运行在集群的内存上的,而且不受限于 MapReduce
的二阶段范式。这大大加快了重复访问同一数据的速度。
Spark 既可以单独运行,也可以运行在 Hadoop YARN
上(注:Hadoop第二代框架中的改进框架,用于将资源管理和处理组件分开,基于YARN的结构不受 MapReduce 约束),此时 Spark 可以直接从
HDFS (Hadoop Distributed File System 分布式文件系统)中读取数据。 诸如
Yahoo(雅虎)、Intel(因特尔)、Baidu(百度)、Trend Micro(趋势科技)和 Groupon(高朋)等公司已经在使用 Spark
听上去好像 Spark 已经注定要取代 Hadoop MapReduce 了。但真的是这样吗?本文我们将对比这两个平台来看看是否 Spark
真的技高一筹。
Spark 在内存中处理数据,而 Hadoop MapReduce 是通过 map 和 reduce 操作在磁盘中处理数据。因此从这个角度上讲 Spark
的性能应该是超过 Hadoop MapReduce 的。
然而,既然在内存中处理,Spark 就需要很大的内存容量。就像一个标准的数据库系统操作一样, Spark
每次将处理过程加载到内存之中,然后该操作作为缓存一直保持在内存中直到下一步操作。如果 Spark 与其它资源需求型服务一同运行在 Hadoop YARN
上,又或者数据块太大以至于不能完全读入内存,此时 Spark 的性能就会有很大的降低。
与此相反, MapReduce 会在一个工作完成的时候立即结束该进程,因此它可以很容易的和其它服务共同运行而不会产生明显的性能降低。
当涉及需要重复读取同样的数据进行迭代式计算的时候,Spark 有着自身优势。 但是当涉及单次读取、类似 ETL
(抽取、转换、加载)操作的任务,比如数据转化、数据整合等时,MapReduce 绝对是不二之选,因为它就是为此而生的。
小结:当数据大小适于读入内存,尤其是在专用集群上时,Spark 表现更好;Hadoop MapReduce
适用于那些数据不能全部读入内存的情况,同时它还可以与其它服务同时运行。
Spark 有着灵活方便的Java,Scala和 Python 的API,同时对已经熟悉 SQL 的技术员工来说, Spark 还适用 Spark
SQL(也就是之前被人熟知的 Shark)。多亏了 Spark
提供的简单易用的构造模块,我们可以很容易的编写自定义函数。它甚至还囊括了可以即时反馈的交互式命令模式。
Hadoop MapReduce 是用 Java 编写的,但由于其难于编程而备受诟病。尽管需要一定时间去学习语法,Pig 还是在一定程度上简化了这个过程,
Hive也为平台提供了 SQL 的兼容。一些 Hadoop 工具也可以无需编程直接运行 MapReduce 任务。Xplenty 就是一个基于 Hadoop
的数据整合服务,而且也不需要进行任何编程和部署。
尽管 Hive 提供了命令行接口,但 MapReduce 并没有交互式模式。诸如 Impala,Presto 和 Tez 等项目都在尝试希望为
Hadoop 提供全交互式查询模式。
安装与维护方面, Spark 并不绑定在 Hadoop 上,虽然 在 Hortonworks(HDP 2.2 版) 和 Cloudera(CDH 5 版)
的产品中 Spark 和 Hadoop MapReduce 都包含在其分布式系统中。(注: Cloudera, Hortonworks 及 MapR 是
Hadoop 领域三大知名的初创公司,致力于打造更好的 Hadoop 企业版应用)。
小结:Spark 更易于编程,同时也包含交互式模式;Hadoop MapReduce 不易编程但是现有的很多工具使其更易于使用。
Spark 和 Hadoop MapReduce 都是开源的,但是机器和人工的花费仍是不可避免的。
这两个框架既可以在商用服务器上也可以运行在云端,下表可以看到它们有着相似的硬件需求:
框架 Apache Spark Apache Hadoop balanced workload slaves
内核 8&16 4
内存 8 GB 到数百GB 24 GB
硬盘 4&8 4&6 1TB
网络 10 GB 或更多 1 GB 以太网
Spark 集群的内存至少要和需要处理的数据块一样大,因为只有数据块和内存大小合适才能发挥出其最优的性能。所以如果真的需要处理非常大的数据,Hadoop
绝对是合适之选,毕竟硬盘的费用要远远低于内存的费用。
考虑到 Spark
的性能标准,在执行相同的任务的时候,需要的硬件更少而运行速度却更快,因此应该是更合算的,尤其是在云端的时候,此时只需要即用即付。
在技术人员方面,即使 Hadoop 从 2005 年就开始普及,但是 MapReduce 方面的专家仍然存在着短缺。而对于从 2010 年才开始普及的
Spark ,这又意味着什么呢? 或许投身 Spark 学习的人正在快速增加,但是相比于 Hadoop MapReduce
仍然存在着更大的技术人才的缺口。
进一步讲,现存了大量的 Hadoop 即服务的资料和基于 Hadoop 的服务(比如我们 Xplenty
的数据整合服务),这些都降低对技术人员能力和底层硬件知识的要求。相比之下,几乎没有现有可选的 Spark 服务,仅有的那些也是新产品。
小结:根据基准要求, Spark 更加合算, 尽管人工成本会很高。依靠着更多熟练的技术人员和 Hadoop 即服务的供给, Hadoop
MapReduce 可能更便宜。
Spark 既可以单独运行,也可以在 Hadoop YARN 上,或者在预置 Mesos 上以及云端。它支持实现 Hadoop
输入范式的数据源,所以可以整合所有 Hadoop 支持的数据源和文件格式。 根据 Spark 官方教程, 它还可以通过 JDBC 和 ODBC 同
BI(商业智能) 工具一起运行。 Hive 和 Pig 也在逐步实现这样的功能。
小结: Spark 和 Hadoop MapReduce 具有相同的数据类型和数据源的兼容性。
除了平常的数据处理,Spark 可以做的远不止这点:它还可以处理图和利用现有的机器学习库。高性能也使得 Spark
在实时处理上的表现和批处理上的表现一样好。这也催生了一个更好的机遇,那就是用一个平台解决所有问题而不是只能根据任务选取不同的平台,毕竟所有的平台都需要学习和维护。
Hadoop MapReduce 在批处理上表现卓越。如果需要进行实时处理,可以利用另外的平台比如 Storm 或者 Impala,而图处理则可以用
Giraph。MapReduce 过去是用 Mahout 做机器学习的,但其负责人已经将其抛弃转而支持 Spark 和 h2o(机器学习引擎)。
小结:Spark 是数据处理的瑞士军刀;Hadoop MapReduce 是批处理的突击刀。
和 MapReduce 一样, Spark 会重试每个任务并进行预测执行。然而,MapReduce
是依赖于硬盘驱动器的,所以如果一项处理中途失败,它可以从失败处继续执行,而 Spark 则必须从头开始执行,所以 MapReduce 这样节省了时间。
小结:Spark 和 Hadoop MapReduce 都有着较好的容错能力,但是 Hadoop MapReduce 要稍微更好一点。
在安全性上, 此时的 Spark 还略显不足。 授权验证由共享秘钥机制支持,网络用户接口则通过 servlet 过滤器和事件日志保护。Spark
可以运行在 YARN 上并配合使用 HDFS, 这也就意味着它同时还拥有 Kerberos 认证授权验证,HDFS 文件许可机制和节点间的加密机制。
Hadoop MapReduce 拥有所有 Hadoop 支持的安全机制,同时也整合了其它基于 Hadoop 的安全项目, 比如 Knox 网关和
Sentry。志在解决 Hadoop 安全的 Rhino 项目也只是在添加 Sentry 支持时添加了 Spark 支持。否则 Spark
开发者们只能自己去提升其安全性了。
小结: Spark 的安全机制仍处在发展期。 Hadoop MapReduce 拥有更多安全控制机制和项目。
Spark 是大数据领域冉冉升起的新星,但是 Hadoop MapReduce 仍有着较广的应用领域。
在内存中进行数据处理使得 Spark 具有较好的性能表现,也比较高效合算。它兼容所有 Hadoop 的数据源和文件格式, 支持多种语言的简单易用的 API
也使人们更快速的可以上手。 Spark 甚至实现了图处理和机器学习工具。
Hadoop MapReduce
是一个更加成熟的平台,为进行批处理而生。当遇到确实非常大的数据以至于无法完全读入内存,又或是依靠着大量对该平台有经验的技术人员,它可能会比 Spark
更加合算。 而且围绕 Hadoop MapReduce 的衍生系统正在依靠着更多的支撑项目、工具和云服务而更加壮大。
但是即使看上去 Spark 像是最终的赢家,问题在于我们永远不会单独使用它&我们需要 HDFS 存储数据,或许还会需要用到
HBase,Hive,Pig,Impala 或其他 Hadoop 项目。这意味着在处理非常大的数据的时候,Spark 仍然需要同 Hadoop 和
MapReduce 共同运行。【编辑推荐】【责任编辑: TEL:(010)】
大家都在看猜你喜欢
热点头条热点头条头条
24H热文一周话题本月最赞
讲师:5人学习过
讲师:1人学习过
讲师:4人学习过
精选博文论坛热帖下载排行
本书作为思科认证体系中的入门级教材,主要讲述了网络的基本知识和思科设备的基本命令,以及路由、交换等深层次网络知识的入门知识,其体系...
订阅51CTO邮刊Spark vs. MapReduce 时间节约66%,计算节约40%
发表于 17:10|
作者腾讯大数据
摘要:本文将介绍基于物品的协同过滤推荐算法案例在TDW Spark与MapReudce上的实现对比,相比于MapReduce,TDW Spark执行时间减少了66%,计算成本降低了40%。
MapReduce为大数据挖掘提供了有力的支持,但是复杂的挖掘算法往往需要多个MapReduce作业才能完成,多个作业之间存在着冗余的磁盘读写开销和多次资源申请过程,使得基于MapReduce的算法实现存在严重的性能问题。后起之秀Spark得益于其在迭代计算和内存计算上的优势,可以自动调度复杂的计算任务,避免中间结果的磁盘读写和资源申请过程,非常适合数据挖掘算法。腾讯TDW Spark平台基于社区最新Spark版本进行深度改造,在性能、稳定和规模方面都得到了极大的提高,为大数据挖掘任务提供了有力的支持。本文将介绍基于物品的协同过滤推荐算法案例在TDW Spark与MapReudce上的实现对比,相比于MapReduce,TDW Spark执行时间减少了66%,计算成本降低了40%。算法介绍互联网的发展导致了信息爆炸。面对海量的信息,如何对信息进行刷选和过滤,将用户最关注最感兴趣的信息展现在用户面前,已经成为了一个亟待解决的问题。推荐系统可以通过用户与信息之间的联系,一方面帮助用户获取有用的信息,另一方面又能让信息展现在对其感兴趣的用户面前,实现了信息提供商与用户的双赢。协同过滤推荐(Collaborative Filtering Recommendation)算法是最经典最常用的推荐算法,算法通过分析用户兴趣,在用户群中找到指定用户的相似用户,综合这些相似用户对某一信息的评价,形成系统对该指定用户对此信息的喜好程度预测。协同过滤可细分为以下三种:User-based CF: 基于User的协同过滤,通过不同用户对Item的评分来评测用户之间的相似性,根据用户之间的相似性做出推荐;Item-based CF: 基于Item的协同过滤,通过用户对不同Item的评分来评测Item之间的相似性,根据Item之间的相似性做出推荐;Model-based CF: 以模型为基础的协同过滤(Model-based
Collaborative Filtering)是先用历史资料得到一个模型,再用此模型进行预测推荐。问题描述输入数据格式:Uid,ItemId,Rating&(用户Uid对ItemId的评分)。输出数据:每个ItemId相似性最高的前N个ItemId。由于篇幅限制,这里我们只选择基于Item的协同过滤算法解决这个例子。算法逻辑基于Item的协同过滤算法的基本假设为两个相似的Item获得同一个用户的好评的可能性较高。因此,该算法首先计算用户对物品的喜好程度,然后根据用户的喜好计算Item之间的相似度,最后找出与每个Item最相似的前N个Item。该算法的详细描述如下:计算用户喜好:不同用户对Item的评分数值可能相差较大,因此需要先对每个用户的评分做二元化处理,例如对于某一用户对某一Item的评分大于其给出的平均评分则标记为好评1,否则为差评0。计算Item相似性:采用Jaccard系数作为计算两个Item的相似性方法。狭义Jaccard相似度适合计算两个集合之间的相似程度,计算方法为两个集合的交集除以其并集,具体的分为以下三步。1)&
Item好评数统计,统计每个Item的好评用户数。2)&
Item好评键值对统计,统计任意两个有关联Item的相同好评用户数。3)&
Item相似性计算,计算任意两个有关联Item的相似度。
找出最相似的前N个Item。这一步中,Item的相似度还需要归一化后整合,然后求出每个Item最相似的前N个Item,具体的分为以下三步。
Item相似性归一化。2)&
Item相似性评分整合。3)&
获取每个Item相似性最高的前N个Item。
基于MapReduce的实现方案使用MapReduce编程模型需要为每一步实现一个MapReduce作业,一共存在包含七个MapRduce作业。每个MapReduce作业都包含Map和Reduce,其中Map从HDFS读取数,输出数据通过Shuffle把键值对发送到Reduce,Reduce阶段以&key,Iterator&value&&作为输入,输出经过处理的键值对到HDFS。其运行原理如图1 所示。图1七个MapReduce作业意味着需要七次读取和写入HDFS,而它们的输入输出数据存在关联,七个作业输入输出数据关系如图2所示。图2基于MapReduce实现此算法存在以下问题:
为了实现一个业务逻辑需要使用七个MapReduce作业,七个作业间的数据交换通过HDFS完成,增加了网络和磁盘的开销。七个作业都需要分别调度到集群中运行,增加了Gaia集群的资源调度开销。MR2和MR3重复读取相同的数据,造成冗余的HDFS读写开销。
这些问题导致作业运行时间大大增长,作业成本增加。基于Spark的实现方案相比与MapReduce编程模型,Spark提供了更加灵活的DAG(Directed Acyclic Graph) 编程模型, 不仅包含传统的map、reduce接口, 还增加了filter、flatMap、union等操作接口,使得编写Spark程序更加灵活方便。使用Spark编程接口实现上述的业务逻辑如图3所示。图3相对于MapReduce,Spark在以下方面优化了作业的执行时间和资源使用。
DAG编程模型。 通过Spark的DAG编程模型可以把七个MapReduce简化为一个Spark作业。Spark会把该作业自动切分为八个Stage,每个Stage包含多个可并行执行的Tasks。Stage之间的数据通过Shuffle传递。最终只需要读取和写入HDFS一次。减少了六次HDFS的读写,读写HDFS减少了70%。Spark作业启动后会申请所需的Executor资源,所有Stage的Tasks以线程的方式运行,共用Executors,相对于MapReduce方式,Spark申请资源的次数减少了近90%。Spark引入了RDD(Resilient
Distributed Dataset)模型,中间数据都以RDD的形式存储,而RDD分布存储于slave节点的内存中,这就减少了计算过程中读写磁盘的次数。RDD还提供了Cache机制,例如对上图的rdd3进行Cache后,rdd4和rdd7都可以访问rdd3的数据。相对于MapReduce减少MR2和MR3重复读取相同数据的问题。效果对比
测试使用相同规模的资源,其中MapReduce方式包含200个Map和100个Reduce,每个Map和Reduce配置4G的内存; 由于Spark不再需要Reduce资源, 而MapReduce主要逻辑和资源消耗在Map端,因此使用200和400个Executor做测试,每个Executor包含4G内存。测试结果如下表所示,其中输入记录约38亿条。
运行时间(min)
成本(Slot*秒)
200 Map+100 Reduce(4G)
200 Executor(4G)
400 Executor(4G)
对比结果表的第一行和第二行,Spark运行效率和成本相对于MapReduce方式减少非常明显,其中,DAG模型减少了70%的HDFS读写、cache减少重复数据的读取,这两个优化即能减少作业运行时间又能降低成本;而资源调度次数的减少能提高作业的运行效率。对比结果表的第二行和第三行,增加一倍的Executor数目,作业运行时间减少约50%,成本增加约25%,从这个结果看到,增加Executor资源能有效的减少作业的运行时间,但并没有做到完全线性增加。这是因为每个Task的运行时间并不是完全相等的, 例如某些task处理的数据量比其他task多;这可能导致Stage的最后时刻某些Task未结束而无法启动下一个Stage,另一方面作业是一直占有Executor的,这时候会出现一些Executor空闲的状况,于是导致了成本的增加。
数据挖掘类业务大多具有复杂的处理逻辑,传统的MapReduce/Pig类框架在应对此类数据处理任务时存在着严重的性能问题。针对这些任务,如果利用Spark的迭代计算和内存计算优势,将会大幅降低运行时间和计算成本。TDW目前已经维护了千台规模的Spark集群,并且会在资源利用率、稳定性和易用性等方面做进一步的提升和改进,为业务提供更有利的支持。
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章48被浏览4536分享邀请回答1添加评论分享收藏感谢收起主题信息(必填)
主题描述(最多限制在50个字符)
申请人信息(必填)
申请信息已提交审核,请注意查收邮件,我们会尽快给您反馈。
如有疑问,请联系
CSDN &《程序员》研发主编,投稿&纠错等事宜请致邮
你只管努力,剩下的交给时光!
如今的编程是一场程序员和上帝的竞赛,程序员要开发出更大更好、傻瓜都会用到软件。而上帝在努力创造出更大更傻的傻瓜。目前为止,上帝是赢的。个人网站:。个人QQ群:、
个人大数据技术博客:
对比 Spark 和 MapReduce如何将 MapReduce 转化为 Spark
MapReduce VS
Spark目前的大数据处理可以分为以下三个类型:复杂的批量数据处理(batch data processing),通常的时间跨度在数十分钟到数小时之间;基于历史数据的交互式查询(interactive query),通常的时间跨度在数十秒到数分钟之间;基于实时数据流的数据处理(streaming data processing),通常的时间跨度在数百毫秒到数秒之间。大数据处理势必需要依赖集群环境,而集群环境有三大挑战,分别是并行化、单点失败处理、资源共享,分别可以采用以并行化的方式重写应用程序、对单点失败的处理方式、动态地进行计算资源的分配等解决方案来面对挑战。针对集群环境出现了大量的大数据编程框架,首先是 Google 的 MapReduce,它给我们展示了一个简单通用和自动容错的批处理计算模型。但是对于其他类型的计算,比如交互式和流式计算,MapReduce 并不适合。这也导致了大量的不同于 MapReduce 的专有的数据处理模型的出现,比如 Storm、Impala 等等。但是这些专有系统也存在一些不足:重复工作:许多专有系统在解决同样的问题,比如分布式作业以及容错,举例来说,一个分布式的 SQL 引擎或者一个机器学习系统都需要实现并行聚合,这些问题在每个专有系统中会重复地被解决。组合问题:在不同的系统之间进行组合计算是一件麻烦的事情。对于特定的大数据应用程序而言,中间数据集是非常大的,而且移动的成本很高。在目前的环境下,我们需要将数据复制到稳定的存储系统,比如 HDFS,以便在不同的计算引擎中进行分享。然而,这样的复制可能比真正的计算所花费的代价要大,所以以流水线的形式将多个系统组合起来效率并不高。适用范围的局限性:如果一个应用不适合一个专有的计算系统,那么使用者只能换一个系统,或者重写一个新的计算系统。资源分配:在不同的计算引擎之间进行资源的动态共享比较困难,因为大多数的计算引擎都会假设它们在程序运行结束之前拥有相同的机器节点的资源。管理问题:对于多个专有系统,需要花费更多的精力和时间来管理和部署,尤其是对于终端使用者而言,需要学习多种 API 和系统模型。Spark 是伯克利大学推出的大数据处理框架,它提出了 RDD 概念 (Resilient Distributed
Datasets),即抽象的弹性数据集概念。Spark 是对 MapReduce 模型的一种扩展。要在 MapReduce 上实现其不擅长的计算工作 (比如迭代式、交互式和流式),是比较困难的,因为 MapReduce 缺少在并行计算的各个阶段进行有效的数据共享的能力,而这种能力是 RDD 的本质所在。利用这种有效地数据共享和类似 MapReduce 的操作接口,上述的各种专有类型计算都能够有效地表达,而且能够获得与专有系统同等的性能。MapReduce 和 Spark 介绍MapReduceMapReduce 是为 Apache
Hadoop 量身订做的,它非常适用于 Hadoop 的使用场景,即大规模日志处理系统、批量数据提取加载工具 (ETL 工具) 等类似操作。但是伴随着 Hadoop 地盘的不断扩张,Hadoop 的开发者们发现 MapReduce 在很多场景下并不是最佳选择,于是 Hadoop 开始把资源管理放入到自己独立的组件 YARN 里面。此外,类似于 Impala 这样的项目也开始逐渐进入到我们的架构中,Impala 提供 SQL 语义,能查询存储在 Hadoop 的 HDFS 和 HBase 中的 PB 级大数据。之前也有类似的项目,例如 Hive。Hive 系统虽然也提供了 SQL 语义,但由于 Hive 底层执行使用的是 MapReduce 引擎,仍然是一个批处理过程,难以满足查询的交互性。相比之下,Impala 的最大特点也是最大卖点就是它的效率。第一代 Hadoop
MapReduce 是一个在计算机集群上分布式处理海量数据集的软件框架,包括一个 JobTracker 和一定数量的 TaskTracker。运行流程图如图 1 所示。图 1. MapReduce 运行流程图在最上层有 4 个独立的实体,即客户端、jobtracker、tasktracker 和分布式文件系统。客户端提交 MapReduce 作业;jobtracker 协调作业的运行;jobtracker 是一个 Java 应用程序,它的主类是 JobTracker;tasktracker 运行作业划分后的任务,tasktracker 也是一个 Java 应用程序,它的主类是 TaskTracker。Hadoop 运行 MapReduce 作业的步骤主要包括提交作业、初始化作业、分配任务、执行任务、更新进度和状态、完成作业等 6 个步骤。Spark 简介Spark 生态系统的目标就是将批处理、交互式处理、流式处理融合到一个软件框架内。Spark 是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速。Spark 非常小巧玲珑,由加州伯克利大学 AMP 实验室的 Matei 为主的小团队所开发。使用的语言是 Scala,项目的 core 部分的代码只有 63 个 Scala 文件,非常短小精悍。Spark
启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 提供了基于内存的计算集群,在分析数据时将数据导入内存以实现快速查询,速度比基于磁盘的系统,如 Hadoop 快很多。Spark 最初是为了处理迭代算法,如机器学习、图挖掘算法等,以及交互式数据挖掘算法而开发的。在这两种场景下,Spark 的运行速度可以达到 Hadoop 的几百倍。Spark 允许应用在内存中保存工作集以便高效地重复利用,它支持多种数据处理应用,同时也保持了 MapReduce 的重要特性,如高容错性、数据本地化、大规模数据处理等。此外,提出了弹性分布式数据集 (Resilient
Distributed Datasets) 的概念:RDD 表现为一个 Scala 对象,可由一个文件创建而来;分布在一个集群内的,不可变的对象切分集;通过并行处理(map、filter、groupby、join)固定数据(BaseRDD)创建模型,生成 Transformed
RDD;故障时可使用 RDD 血统信息重建;可高速缓存,以便再利用。图 2 所示是一个日志挖掘的示例代码,首先将日志数据中的 error 信息导入内存,然后进行交互搜索。图 2. RDD 代码示例在导入数据时,模型以 block 形式存在于 worker 上,由 driver 向 worker 分发任务,处理完后 work 向 driver 反馈结果。也可在 work 上对数据模型建立高速缓存 cache,对 cache 的处理过程与 block 类似,也是一个分发、反馈的过程。Spark 的 RDD 概念能够取得和专有系统同样的性能,还能提供包括容错处理、滞后节点处理等这些专有系统缺乏的特性。迭代算法:这是目前专有系统实现的非常普遍的一种应用场景,比如迭代计算可以用于图处理和机器学习。RDD 能够很好地实现这些模型,包括 Pregel、HaLoop 和 GraphLab 等模型。关系型查询:对于 MapReduce 来说非常重要的需求就是运行 SQL 查询,包括长期运行、数小时的批处理作业和交互式的查询。然而对于 MapReduce 而言,对比并行数据库进行交互式查询,有其内在的缺点,比如由于其容错的模型而导致速度很慢。利用 RDD 模型,可以通过实现许多通用的数据库引擎特性,从而获得很好的性能。MapReduce 批处理:RDD 提供的接口是 MapReduce 的超集,所以 RDD 能够有效地运行利用 MapReduce 实现的应用程序,另外 RDD 还适合更加抽象的基于 DAG 的应用程序。流式处理:目前的流式系统也只提供了有限的容错处理,需要消耗系统非常大的拷贝代码或者非常长的容错时间。特别是在目前的系统中,基本都是基于连续计算的模型,常住的有状态的操作会处理到达的每一条记录。为了恢复失败的节点,它们需要为每一个操作复制两份操作,或者将上游的数据进行代价较大的操作重放,利用 RDD 实现离散数据流,可以克服上述问题。离散数据流将流式计算当作一系列的短小而确定的批处理操作,而不是常驻的有状态的操作,将两个离散流之间的状态保存在 RDD 中。离散流模型能够允许通过 RDD 的继承关系图进行并行性的恢复而不需要进行数据拷贝。Spark 内部术语解释Application:基于 Spark 的用户程序,包含了 driver 程序和集群上的 executor;Driver Program:运行 main 函数并且新建 SparkContext 的程序;Cluster Manager:在集群上获取资源的外部服务 (例如:standalone,Mesos,Yarn);Worker Node:集群中任何可以运行应用代码的节点;Executor:是在一个 worker
node 上为某应用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的 executors;Task:被送到某个 executor 上的工作单元;Job:包含很多任务的并行计算,可以与 Spark 的 action 对应;Stage:一个 Job 会被拆分很多组任务,每组任务被称为 Stage(就像 Mapreduce 分 map 任务和 reduce 任务一样)。SparkDemo 程序运行Spark 源代码可以在 http://spark-project.org/download 处下载,也可以到 github 直接复制 Spark 项目。Spark 提供基本源码压缩包,同时也提供已经编译好的压缩包。Spark 是通过 Scala
Shell 来与外界进行交互的。开始安装,推荐的方法是首先在第一个节点上部署和启动 master,获取 master spark
url,然后在部署到其他节点之前修改 conf/spark-env.sh 中的内容。开始单机 master 服务:./bin/start-master.sh下载了 spark-0.9.1-bin-cdh4 后上传到/home/zhoumingyao 目录 (可以自定义目录,本例使用的是 CentosV6.5 操作系统) 下,具体子目录如清单 1 所示。清单 1. 目录列表-rw-r--r-- 1 root root 3899 3 月 27 13:36 README.md
-rw-r--r-- 1 root root 25379 3 月 27 13:36 pom.xml
-rw-r--r-- 1 root root 162 3 月 27 13:36 NOTICE
-rw-r--r-- 1 root root 4719 3 月 27 13:36 make-distribution.sh
-rw-r--r-- 1 root root 21118 3 月 27 13:36 LICENSE
-rw-r--r-- 1 root root
月 27 13:36 CHANGES.txt
drwxr-xr-x 4 root root 4096 5 月 6 13:35 assembly
drwxr-xr-x 4 root root 4096 5 月 6 13:36 bagel
drwxr-xr-x 2 root root 4096 5 月 6 13:36 bin
drwxr-xr-x 2 root root 4096 5 月 6 13:36 conf
drwxr-xr-x 4 root root 4096 5 月 6 13:37 core
drwxr-xr-x 2 root root 4096 5 月 6 13:37 data
drwxr-xr-x 4 root root 4096 5 月 6 13:37 dev
drwxr-xr-x 3 root root 4096 5 月 6 13:37 docker
drwxr-xr-x 7 root root 4096 5 月 6 13:37 docs
drwxr-xr-x 4 root root 4096 5 月 6 13:37 ec2
drwxr-xr-x 4 root root 4096 5 月 6 13:37 examples
drwxr-xr-x 7 root root 4096 5 月 6 13:38 external
drwxr-xr-x 3 root root 4096 5 月 6 13:38 extras
drwxr-xr-x 5 root root 4096 5 月 6 13:38 graphx
drwxr-xr-x 5 root root 4096 5 月 6 13:38 mllib
drwxr-xr-x 3 root root 4096 5 月 6 13:38 project
drwxr-xr-x 6 root root 4096 5 月 6 13:38 python
drwxr-xr-x 4 root root 4096 5 月 6 13:38 repl
drwxr-xr-x 2 root root 4096 5 月 6 13:38 sbin
drwxr-xr-x 2 root root 4096 5 月 6 13:38 sbt
drwxr-xr-x 4 root root 4096 5 月 6 13:39 streaming
drwxr-xr-x 3 root root 4096 5 月 6 13:39 target
drwxr-xr-x 4 root root 4096 5 月 6 13:39 tools
drwxr-xr-x 5 root root 4096 5 月 6 13:39 yarn进入 bin 目录,执行 spark-shell.sh,进入 scala shell 状态,如清单 2 所示。清单 2. 运行命令scala& val data = Array(1, 2, 3, 4, 5) //产生 data
data: Array[Int] = Array(1, 2, 3, 4, 5)下面开始将 data 处理成 RDD,如清单 3 所示。清单 3. 处理成 RDDscala& val distData = sc.parallelize(data) //将 data 处理成 RDD
distData: spark.RDD[Int] = spark.ParallelCollection@7a0ec850(显示出的类型为 RDD)清单 4. 在 RDD 上运算scala& distData.reduce(_+_) //在 RDD 上进行运算,对 data 里面元素进行加和清单 5. 启动 Spark[root@facenode1 sbin]# ./start-all.sh
starting org.apache.spark.deploy.master.Master,
logging to /home/zhoumingyao/spark-0.9.1-bin-cdh4/sbin/../logs/
spark-root-org.apache.spark.deploy.master.Master-1-facenode1.out
localhost: Warning: Permanently added 'localhost' (RSA)
to the list of known hosts.localhost:
starting org.apache.spark.deploy.worker.Worker, logging to
/home/zhoumingyao/spark-0.9.1-bin-cdh4/sbin/../
logs/spark-root-org.apache.spark.deploy.worker.Worker-1-facenode1.out清单 6. 查看服务[root@facenode1 sbin]# ps -ef | grep spark
16:45 pts/1 00:00:03 /usr/java/jdk1.6.0_31/bin/java
-cp :/home/zhoumingyao/spark-0.9.1-bin-cdh4/conf:/home/
zhoumingyao/spark-0.9.1-bin-cdh4/assembly/target/scala-2.10/
spark-assembly_2.10-0.9.1-hadoop2.0.0-mr1-cdh4.2.0.jar:/etc/alternatives/
hadoopconf -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.master.Master --ip facenode1 --port 7077 --webui-port 8080
16:45 ? 00:00:03 java -cp :/home/zhoumingyao/
spark-0.9.1-bin-cdh4/conf:/home/zhoumingyao/spark-0.9.1-bin-cdh4/
assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.0.0-mr1-cdh4.2.0.jar
-Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.worker.Worker spark://facenode1:7077可以启动多个工作站,通过以下命令连接到 master 服务器,如清单 7 所示。清单 7. 连接 Master 服务器./spark-class org.apache.spark.deploy.worker.Worker spark://facenode1:7077
输出如下:
14/05/06 16:49:06 INFO ui.WorkerWebUI: Started Worker web UI at http://facenode1:8082
14/05/06 16:49:06 INFO worker.Worker: Connecting to master spark://facenode1:7077...
14/05/06 16:49:06 INFO worker.Worker: Successfully registered with master spark://facenode1:7077进入 master server 的 Web UI 可以看到主节点、从节点的工作情况,如清单 8 所示。清单 8. 访问 Web 客户端注意,如果是集群方式,请在 conf 文件夹下面的 slaves 文件里面逐行加入需要加入集群的 master、works 服务器的 ip 地址或者 hostname。MapReduce 转换到 SparkSpark 是类似于 MapReduce 的计算引擎,它提出的内存方式解决了 MapReduce 存在的读取磁盘速度较慢的困难,此外,它基于 Scala 的函数式编程风格和 API,进行并行计算时效率很高。由于 Spark 采用的是 RDD(弹性分布式结果集) 方式对数据进行计算,这种方式与 MapReduce 的 Map()、Reduce() 方式差距较大,所以很难直接使用 Mapper、Reducer 的 API,这也是阻碍 MapReduce 转为 Spark 的绊脚石。Scala 或者 Spark 里面的 map() 和 reduce() 方法与 Hadoop MapReduce 里面的 map()、reduce() 方法相比,Hadoop
MapReduce 的 API 更加灵活和复杂,下面列出了 Hadoop MapReduce 的一些特性:Mappers 和 Reducers 通常使用 key-value 键值对作为输入和输出;一个 key 对应一个 Reducer 的 reduce;每一个 Mapper 或者 Reducer 可能发出类似于 0,1 这样的键值对作为每一次输出;Mappers 和 Reducers 可能发出任意的 key 或者 value,而不是标准数据集方式;Mapper 和 Reducer 对象对每一次 map() 和 reduce() 的调用都存在生命周期。它们支持一个 setup() 方法和 cleanup() 方法,这些方法可以被用来在处理批量数据之前的操作。试想这么一个场景,我们需要计算一个文本文件里每一行的字符数量。在 Hadoop
MapReduce 里,我们需要为 Mapper 方法准备一个键值对,key 用作行的行数,value 的值是这一行的字符数量。清单 9.
MapReduce 方式 Map 函数public class LineLengthCountMapper
extends Mapper&LongWritable,Text,IntWritable,IntWritable& {
protected void map(LongWritable lineNumber, Text line, Context context)
throws IOException, InterruptedException {
context.write(new IntWritable(line.getLength()), new IntWritable(1));
}清单 9 所示代码,由于 Mappers 和 Reducers 只处理键值对,所以对于类 LineLengthCountMapper 而言,输入是 TextInputFormat 对象,它的 key 由行数提供,value 就是该行所有字符。换成 Spark 之后的代码如清单 10 所示。清单 10. Spark 方式 Map 函数lines.map(line =& (line.length, 1))在 Spark 里,输入是弹性分布式数据集 (Resilient Distributed
Dataset),Spark 不需要 key-value 键值对,代之的是 Scala 元祖 (tuple),它是通过 (line.length,
1) 这样的 (a,b) 语法创建的。以上代码中 map() 操作是一个 RDD,(line.length,
1) 元祖。当一个 RDD 包含元祖时,它依赖于其他方法,例如 reduceByKey(),该方法对于重新生成 MapReduce 特性具有重要意义。清单 11 所示代码是 Hadoop MapReduce 统计每一行的字符数,然后以 Reduce 方式输出。清单 11.
MapReduce 方式 Reduce 函数public class LineLengthReducer
extends Reducer&IntWritable,IntWritable,IntWritable,IntWritable& {
protected void reduce(IntWritable length, Iterable&IntWritable& counts, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum += count.get();
context.write(length, new IntWritable(sum));
}Spark 里面的对应代码如清单 12 所示。清单 12.
Spark 方式 Reduce 函数val lengthCounts = lines.map(line =& (line.length, 1)).reduceByKey(_ + _)Spark 的 RDD API 有一个 reduce() 方法,它会 reduce 所有的 key-value 键值对到一个独立的 value。我们现在需要统计大写字母开头的单词数量,对于文本的每一行而言,一个 Mapper 可能需要统计很多个键值对,代码如清单 13 所示。清单 13.
MapReduce 方式计算字符数量public class CountUppercaseMapper
extends Mapper&LongWritable,Text,Text,IntWritable& {
protected void map(LongWritable lineNumber, Text line, Context context)
throws IOException, InterruptedException {
for (String word : line.toString().split(" ")) {
if (Character.isUpperCase(word.charAt(0))) {
context.write(new Text(word), new IntWritable(1));
}在 Spark 里面,对应的代码如清单 14 所示。清单 14. Spark 方式计算字符数量lines.flatMap(
_.split(" ").filter(word =& Character.isUpperCase(word(0))).map(word =& (word,1))
)MapReduce 依赖的 Map 方法这里并不适用,因为每一个输入必须对应一个输出,这样的话,每一行可能占用到很多的输出。相反的,Spark 里面的 Map 方法比较简单。Spark 里面的方法是,首先对每一行数据进行汇总后存入一个输出结果物数组,这个数组可能是空的,也可能包含了很多的值,最终这个数组会作为一个 RDD 作为输出物。这就是 flatMap() 方法的功能,它对每一行文本里的单词转换成函数内部的元组后进行了过滤。在 Spark 里面,reduceByKey() 方法可以被用来统计每篇文章里面出现的字母数量。如果我们想统计每一篇文章里面出现的大写字母数量,在 MapReduce 里程序可以如清单 15 所示。清单 15. MapReduce 方式public class CountUppercaseReducer
extends Reducer&Text,IntWritable,Text,IntWritable& {
protected void reduce(Text word, Iterable&IntWritable& counts, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum += count.get();
context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
}在 Spark 里,代码如清单 16 所示。清单 16. Spark 方式groupByKey().map { case (word,ones) =& (word.toUpperCase, ones.sum) }groupByKey() 方法负责收集一个 key 的所有值,不应用于一个 reduce 方法。本例中,key 被转换成大写字母,值被直接相加算出总和。但这里需要注意,如果一个 key 与很多个 value 相关联,可能会出现 Out
Of Memory 错误。Spark 提供了一个简单的方法可以转换 key 对应的值,这个方法把 reduce 方法过程移交给了 Spark,可以避免出现 OOM 异常。reduceByKey(_ + _).map { case (word,total) =& (word.toUpperCase,total)
}setup() 方法在 MapReduce 里面主要的作用是在 map 方法开始前对输入进行处理,常用的场景是连接数据库,可以在 cleanup() 方法中释放在 setup() 方法里面占用的资源。清单 17. MapReduce 方式public class SetupCleanupMapper extends Mapper&LongWritable,Text,Text,IntWritable& {
private Connection dbC
protected void setup(Context context) {
dbConnection = ...;
protected void cleanup(Context context) {
dbConnection.close();
}在 Spark 里面没有这样的方法。结束语通过本文的学习,读者了解了 MapReduce 和 Spark 之间的差异及切换成本。本文针对的是对 Spark 完全没有了解的用户,后续文章会从实际应用出发,从安装、应用程序的角度给出更加实际的教程。
相关主题参考 首页,了解 IBM 开发者论坛已经收录的 Spark 文章。参考 首页,了解 Spark 原理。&参考书籍《Spark 大数据处理技术》,作者作为 Spark 社区的主要推动者,对于 Spark 技术有深入的介绍。:查找丰富的操作信息、工具和项目更新,帮助您掌握开源技术并将其用于 IBM 产品。
添加或订阅评论,请先或。
有新评论时提醒我
static.content.url=/developerworks/js/artrating/SITE_ID=10Zone=Open sourceArticleID=1006838ArticleTitle=如何将 MapReduce 转化为 Sparkpublish-date=

我要回帖

更多关于 mapreduce和spark区别 的文章

 

随机推荐