spark初探

Posted by yan on February 19, 2019

概述

  spark是一个实现快速通用的集群计算平台。它是由加州大学伯克利分校AMP实验室 开发的通用内存并行计算框架,用来构建大型的、低延迟的数据分析应用程序。它扩展了广泛使用的MapReduce计算模型。高效的支撑更多计算模式,包括交互式查询和流处理。spark的一个主要特点是能够在内存中进行计算,及时依赖磁盘进行复杂的运算,Spark依然比MapReduce更加高效。

运行模式

  1. local 本地模式,开发调试
  2. standalone Spark自己可以给自己分配资源(master/slave架构)
  3. apache Mesos 集成Mesos资源管理框架,spark只负责计算
  4. hadoop YARN 集成yarn资源管理器,spark只负责任务调度和计算

RDD介绍

  每个 Spark 应用程序都由一个驱动程序(driver programe)构成,驱动程序在集群上运行用户的 main 函数来执行各种各样的并行操作(parallel operations)。Spark 的主要抽象是提供一个弹性分布式数据集(RDD resilient distributed dataset),RDD 是指能横跨集群所有节点进行并行计算的分区元素集合。
  RDD 可以从 Hadoop 文件系统中的一个文件中创建而来(或其他 Hadoop 支持的文件系统),或者从一个已有的 Scala 集合转换得到。用户可以要求 Spark 将 RDD 持久化(persist)到内存中,来让它在并行计算中高效地重用。最后,RDD 能从节点失败中自动地恢复过来。

开始spark

  Spark 编程的第一步是需要创建一个 SparkContext 对象,用来告诉 Spark 如何访问集群。在创建 SparkContext 之前,你需要构建一个 SparkConf 对象, SparkConf 对象包含了一些你应用程序的信息。

1
2
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

  appName 参数是你程序的名字,它会显示在 cluster UI 上。master 是 Spark, Mesos 或 YARN 集群的 URL,或运行在本地模式时,使用专用字符串 “local”。在实践中,当应用程序运行在一个集群上时,你并不想要把 master 硬编码到你的程序中,你可以用 spark-submit 启动你的应用程序的时候传递它。然而,你可以在本地测试和单元测试中使用 “local” 运行 Spark 进程。

RDD创建

  Spark 核心的概念是 Resilient Distributed Dataset (RDD):一个可并行操作的有容错机制的数据集合。有 2 种方式创建 RDDs:第一种是在你的驱动程序中并行化一个已经存在的集合;另外一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其他 Hadoop 数据格式的数据源。

  1. 并行集合
      并行集合 (Parallelized collections) 的创建是通过在一个已有的集合(Scala Seq)上调用 SparkContext 的 parallelize 方法实现的。集合中的元素被复制到一个可并行操作的分布式数据集中。例如,这里演示了如何在一个包含 1 到 5 的数组中创建并行集合:
    1
    2
    
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    

      一旦创建完成,这个分布式数据集(distData)就可以被并行操作。例如,我们可以调用 distData.reduce((a, b) => a + b) 将这个数组中的元素相加。我们以后再描述在分布式上的一些操作。
      并行集合一个很重要的参数是切片数(slices),表示一个数据集切分的份数。Spark 会在集群上为每一个切片运行一个任务。你可以在集群上为每个 CPU 设置 2-4 个切片(slices)。正常情况下,Spark 会试着基于你的集群状况自动地设置切片的数目。然而,你也可以通过 parallelize 的第二个参数手动地设置(例如:sc.parallelize(data, 10))。

  2. 外部数据集
      Spark 可以从任何一个 Hadoop 支持的存储源创建分布式数据集,包括你的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。 Spark 支持文本文件(text files),SequenceFiles 和其他 Hadoop InputFormat。
      文本文件 RDDs 可以使用 SparkContext 的 textFile 方法创建。 在这个方法里传入文件的 URI (机器上的本地路径或 hdfs://,s3n:// 等),然后它会将文件读取成一个行集合。这里是一个调用例子:
    1
    2
    
    scala> val distFile = sc.textFile("data.txt")
    distFile: RDD[String] = MappedRDD@1d4cee08
    

      一旦创建完成,distFiile 就能做数据集操作。例如,我们可以用下面的方式使用 map 和 reduce 操作将所有行的长度相加:distFile.map(s => s.length).reduce((a, b) => a + b)。

RDD操作

  RDDs 支持 2 种类型的操作:转换(transformations) 从已经存在的数据集中创建一个新的数据集;动作(actions) 在数据集上进行计算之后返回一个值到驱动程序。例如,map 是一个转换操作,它将每一个数据集元素传递给一个函数并且返回一个新的 RDD。另一方面,reduce 是一个动作,它使用相同的函数来聚合 RDD 的所有元素,并且将最终的结果返回到驱动程序(不过也有一个并行 reduceByKey 能返回一个分布式数据集)。
  在 Spark 中,所有的转换(transformations)都是惰性(lazy)的,它们不会马上计算它们的结果。相反的,它们仅仅记录转换操作是应用到哪些基础数据集(例如一个文件)上的。转换仅仅在这个时候计算:当动作(action) 需要一个结果返回给驱动程序的时候。这个设计能够让 Spark 运行得更加高效。例如,我们可以实现:通过 map 创建一个新数据集在 reduce 中使用,并且仅仅返回 reduce 的结果给 driver,而不是整个大的映射过的数据集。
  默认情况下,每一个转换过的 RDD 会在每次执行动作(action)的时候重新计算一次。然而,你也可以使用 persist (或 cache)方法持久化(persist)一个 RDD 到内存中。在这个情况下,Spark 会在集群上保存相关的元素,在你下次查询的时候会变得更快。在这里也同样支持持久化 RDD 到磁盘,或在多个节点间复制。
  为了说明 RDD 基本知识,考虑下面的简单程序:

1
2
3
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

  第一行是定义来自于外部文件的 RDD。这个数据集并没有加载到内存或做其他的操作:lines 仅仅是一个指向文件的指针。第二行是定义 lineLengths,它是 map 转换(transformation)的结果。同样,lineLengths 由于懒惰模式也没有立即计算。最后,我们执行 reduce,它是一个动作(action)。在这个地方,Spark 把计算分成多个任务(task),并且让它们运行在多个机器上。每台机器都运行自己的 map 部分和本地 reduce 部分。然后仅仅将结果返回给驱动程序。
  如果我们想要再次使用 lineLengths,我们可以添加: lineLengths.persist()
在 reduce 之前,它会导致 lineLengths 在第一次计算完成之后保存到内存中。

常见的转换操作

  • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
  • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
  • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
  • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
  • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

常见的动作操作

  • count() 返回数据集中的元素个数
  • collect() 以数组的形式返回数据集中的所有元素
  • first() 返回数据集中的第一个元素
  • take(n) 以数组的形式返回数据集中的前n个元素
  • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
  • foreach(func) 将数据集中的每个元素传递到函数func中运行*

RDD持久化

  Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些 数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用。这个能力使后续的动作速度更快(通常快10倍以上)。对应迭代算法和快速的交互使用来说,缓存是一个关键的工具。
  你能通过persist()或者cache()方法持久化一个rdd。首先,在action中计算得到rdd;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术-如果RDD的任何一个分区丢失,它 可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。
  Spark自动的监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用RDD.unpersist()方法