spark的主要知识内容

spark 中文文档

中文文档

spark 设计与实现

spark设计与实现

spark常识

0、常用知识点

spark相关的面试题跟答案,带着问题学习效果更佳哟。 ) http://www.aboutyun.com/thread-24246-1-1.html?pilola=38wdy1

https://blog.csdn.net/lxhandlbb/article/details/54599512

1、spark中的RDD是什么,有哪些特性

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象, 它代表一个不可变、可分区、里面的元素可并行计算的集合。

Dataset:就是一个集合,用于存放数据的 Distributed:分布式,可以并行在集群计算 Resilient:表示弹性的弹性表示

1、RDD中的数据可以存储在内存或者是磁盘 2、RDD中的分区是可以改变的 五大特性: A list of partitions 一个分区列表,RDD中的数据都存在一个分区列表里面 A function for computing each split 作用在每一个分区中的函数 A list of dependencies on other RDDs 一个RDD依赖于其他多个RDD,这个点很重要,RDD的容错机制就是依据这个特性而来的 Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 可选的,针对于kv类型的RDD才具有这个特性,作用是决定了数据的来源以及数据处理后的去向 Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 可选项,数据本地性,数据位置最优

1.1 RDD、 DataFrame 和 DataSet有什么联系和区别

http://www.infoq.com/cn/articles/three-apache-spark-apis-rdds-dataframes-and-datasets

通过这篇文章可以知道,在什么情况下你应该选用哪一种以及为什么? 并概述他们的性能和优缺点,列举哪些应该使用DataFrame和DataSet而不是RDD的场景; 这里面会更多得关注DataFrame和DataSet, 因为在2.0中这两种API被整合起来了。

    在什么情况下使用RDD:
  • 你希望可以对你的数据集进行最基本的转换、处理和控制
  • 你的数据是非结构化、比如流媒体或者字符流
  • 你想通过函数式编程而不是特定领域内的表达来处理你的数据
  • 你比希望像进行列式处理一定定义一个模式,通过名字或字段来处理或者访问数据属性
  • 你并不在意通过DataFrame和DataSet进行结构化和半结构化数据处理所能获得的一些优化和性能的好处。

RDD不会降级为二等公民,它可以在三者之间无缝切换。

DataFrame也是数据的一个不可变分布式集合,但与RDD不同的是,数据都被组织到有名字的列中,就像关系型数据库中的表一样; 设计DataFrame的目的就是要让对大兴数据集的处理变得更简单,它让开发者可以为分布式数据集指定一个模式,进行更高层次的抽象, 而不仅限于专业的数据工程师。

你可以把DataFrame当作一些通用对象Dataset[Row]的集合的一个别名,而一行就是一个通用的无类型的JVM对象。 与之形成对比,Dataset就是一些有明确类型定义的JVM对象的集合。

总结: 在什么时候该选用RDD、DataFrame或Dataset看起来好像挺明显。 前者可以提供底层的功能和控制,后者支持定制的视图和结构,可以提供高级和特定领域的操作,节约空间并快速运行。

当我们回顾从早期版本的Spark中获得的经验教训时,我们问自己该如何为开发者简化Spark呢?

该如何优化它,让它性能更高呢?我们决定把底层的RDD API进行高级抽象,成为DataFrame和Dataset,用它们在Catalyst优化器和Tungsten之上构建跨库的一致数据抽象。

DataFrame和Dataset,或RDD API,按你的实际需要和场景选一个来用吧,当你像大多数开发者一样对数据进行结构化或半结构化的处理时,我不会有丝毫惊讶。

2、概述一下spark中的常用算子区别(map、mapPartitions、foreach、foreachPartition)

  • map:用于遍历RDD,将函数f应用于每一个元素,返回新的RDD(transformation算子)。
  • foreach:用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)。
  • mapPartitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)。
  • foreachPartition: 用于遍历操作RDD中的每一个分区。无返回值(action算子)。

总结:一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推荐使用。

3、谈谈spark中的宽窄依赖

RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

宽依赖

宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition

窄依赖

窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个Partition使用。

宽依赖和窄依赖的比较

  • 宽依赖往往对应着shuffle操作,需要在运行的过程中将同一个RDD分区传入到不同的RDD分区中,中间可能涉及到多个节点之间数据的传输,而窄依赖的每个父RDD分区通常只会传入到另一个子RDD分区,通常在一个节点内完成。
  • 当RDD分区丢失时,对于窄依赖来说,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重新计算与子RDD分区对应的父RDD分区就行。这个计算对数据的利用是100%的
  • 当RDD分区丢失时,对于宽依赖来说,重算的父RDD分区只有一部分数据是对应丢失的子RDD分区的,另一部分就造成了多余的计算。宽依赖中的子RDD分区通常来自多个父RDD分区,极端情况下,所有父RDD都有可能重新计算。如下图,par4丢失,则需要重新计算par1,par2,par3,产生了冗余数据par5

丢失

宽依赖窄依赖函数

  • 窄依赖的函数有:
  • map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues 例如map,flatMap,还有filter,不涉及Shuffle操作
  • 宽依赖的函数有:
  • groupByKey, join(父RDD不是hash-partitioned ), partitionBy 宽依赖也对应Shuffle操作,例如reduceBykey和groupByKey等

4、spark中如何划分stage

  • 1.Spark Application中可以因为不同的Action触发众多的job,一个Application中可以有很多的job,每个job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。
  • 2.Stage划分的依据就是宽依赖,何时产生宽依赖,例如reduceByKey,groupByKey的算子,会导致宽依赖的产生。
  • 3.由Action(例如collect)导致了SparkContext.runJob的执行,最终导致了DAGScheduler中的submitJob的执行,其核心是通过发送一个case class JobSubmitted对象给eventProcessLoop。
  • eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGSchedulerEventProcessLoop是eventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive
  • 4.在doOnReceive中通过模式匹配的方法把执行路由到
  • 5.在handleJobSubmitted中首先创建finalStage,创建finalStage时候会建立父Stage的依赖链条

总结:以来是从代码的逻辑层面上来展开说的,可以简单点说:写介绍什么是RDD中的宽窄依赖,然后在根据DAG有向无环图进行划分,从当前job的最后一个算子往前推,遇到宽依赖,那么当前在这个批次中的所有算子操作都划分成一个stage,然后继续按照这种方式在继续往前推,如在遇到宽依赖,又划分成一个stage,一直到最前面的一个算子。最后整个job会被划分成多个stage,而stage之间又存在依赖关系,后面的stage依赖于前面的stage。

实例


wordcount

wordcount 对应的stage图

val file = sc.textFile("hdfs://hacluster/aa/hello.txt")
val rdd = file.flatMap(line => line.split(" ")).map(word => (word, 1)).
  reduceByKey(_+_)
rdd.collect()
rdd.foreach(println)


http://www.idataskys.com/2017/08/30/spark%E4%B8%ADjob%E8%BF%90%E8%A1%8C%E8%BF%87%E7%A8%8B%E8%AF%A6%E8%A7%A3%E5%92%8Cjob-stage-task%E8%A7%A3%E6%9E%90/

5、spark-submit的时候如何引入外部jar包

在通过spark-submit提交任务时,可以通过添加配置参数来指定 driver-class-path 外部jar包;jars 外部jar包

6、spark 如何防止内存溢出

  • driver端的内存溢出
  • 可以增大driver的内存参数:spark.driver.memory (default 1g) 这个参数用来设置Driver的内存。 在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。 对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
  • map过程产生大量对象导致内存溢出
  • 这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString), 这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。 针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。 具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。 面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区,不会有shuffle的过程。
  • 数据不平衡导致内存溢出
  • 数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。
  • shuffle后内存溢出
  • shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。 在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。 standalone模式下资源分配不均匀导致内存溢出

在standalone的模式下如果配置了total-executor-cores 和 executor-memory 这两个参数,但是没有配置executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。 使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()

rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算, 而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。

7、spark中cache和persist的区别

cache:缓存数据,默认是缓存在内存中,其本质还是调用persist persist:缓存数据,有丰富的数据缓存策略。 数据可以保存在内存也可以保存在磁盘中,使用的时候指定对应的缓存级别就可以了。

8、简要描述Spark分布式集群搭建的步骤

地球人都知道 这里可以概述下如何搭建高可用的spark集群(HA) 主要是引入了zookeeper

9、spark中的数据倾斜的现象、原因、后果

  • (1)、数据倾斜的现象
  • 多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。
  • (2)、数据倾斜的原因
  • 数据问题 1、key本身分布不均衡(包括大量的key为空) 2、key的设置不合理 spark使用问题 1、shuffle时的并发度不够 2、计算方式有误
  • (3)、数据倾斜的后果
  • 1、spark中的stage的执行时间受限于最后那个执行完成的task,因此运行缓慢的任务会拖垮整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。 2、过多的数据在同一个task中运行,将会把executor撑爆。

10、如何解决spark中的数据倾斜问题

发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。

1、数据问题造成的数据倾斜

找出异常的key 如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。 选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个。 比如:

df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10)
    如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。 经过分析,倾斜的数据主要有以下三种情况:
  • 1、null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
  • 2、无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
  • 3、有效数据,业务导致的正常数据分布。 解决办法
  • 第1,2种情况,直接对数据进行过滤即可(因为该数据对当前业务不会产生影响)。
    第3种情况则需要进行一些特殊操作,常见的有以下几种做法
  • (1) 隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。
  • (2) 对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
  • (3) 使用reduceByKey 代替 groupByKey(reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义.)
  • (4) 使用map join。 案例

如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作流程如下:

2、spark使用不当造成的数据倾斜

  • (1) 将原始的 key 转化为 key + 随机值(例如Random.nextInt)
  • (2) 对数据进行 reduceByKey(func)
  • (3) 将 key + 随机值 转成 key
  • (4) 再对数据进行 reduceByKey(func) 案例操作流程分析:
  • 假设说有倾斜的Key,我们给所有的Key加上一个随机数,然后进行reduceByKey操作;此时同一个Key会有不同的随机数前缀,在进行reduceByKey操作的时候原来的一个非常大的倾斜的Key就分而治之变成若干个更小的Key,不过此时结果和原来不一样,怎么破?进行map操作,目的是把随机数前缀去掉,然后再次进行reduceByKey操作。(当然,如果你很无聊,可以再次做随机数前缀),这样我们就可以把原本倾斜的Key通过分而治之方案分散开来,最后又进行了全局聚合 注意1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。 注意2: 单独处理异常数据时,可以配合使用Map Join解决。

提高shuffle并行度

dataFrame和sparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。 rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。 局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。

使用map join 代替reduce join

11、flume整合sparkStreaming问题

在小表不是特别大(取决于你的executor大小)的情况下使用,可以使程序避免shuffle的过程,自然也就没有数据倾斜的困扰了. 局限性: 因为是先将小数据发送到每个executor上,所以数据量不能太大。 (1)、如何实现sparkStreaming读取flume中的数据 可以这样说: 前期经过技术调研,查看官网相关资料,发现sparkStreaming整合flume有2种模式,一种是拉模式,一种是推模式,然后在简单的聊聊这2种模式的特点,以及如何部署实现,需要做哪些事情,最后对比两种模式的特点,选择那种模式更好。 推模式:Flume将数据Push推给Spark Streaming 拉模式:Spark Streaming从flume 中Poll拉取数据 (2)、在实际开发的时候是如何保证数据不丢失的

可以这样说: flume那边采用的channel是将数据落地到磁盘中,保证数据源端安全性(可以在补充一下,flume在这里的channel可以设置为memory内存中,提高数据接收处理的效率,但是由于数据在内存中,安全机制保证不了,故选择channel为磁盘存储。整个流程运行有一点的延迟性) sparkStreaming通过拉模式整合的时候,使用了FlumeUtils这样一个类,该类是需要依赖一个额外的jar包(spark-streaming-flume_2.10) 要想保证数据不丢失,数据的准确性,可以在构建StreamingConext的时候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)来创建一个StreamingContext,使用StreamingContext.getOrCreate来创建StreamingContext对象,传入的第一个参数是checkpoint的存放目录,第二参数是生成StreamingContext对象的用户自定义函数。如果checkpoint的存放目录存在,则从这个目录中生成StreamingContext对象;如果不存在,才会调用第二个函数来生成新的StreamingContext对象。在creatingFunc函数中,除了生成一个新的StreamingContext操作,还需要完成各种操作,然后调用ssc.checkpoint(checkpointDirectory)来初始化checkpoint功能,最后再返回StreamingContext对象。 这样,在StreamingContext.getOrCreate之后,就可以直接调用start()函数来启动(或者是从中断点继续运行)流式应用了。如果有其他在启动或继续运行都要做的工作,可以在start()调用前执行。 流失计算中使用checkpoint的作用: 保存元数据,包括流式应用的配置、流式没崩溃之前定义的各种操作、未完成所有操作的batch。元数据被存储到容忍失败的存储系统上,如HDFS。这种ckeckpoint主要针对driver失败后的修复。 保存流式数据,也是存储到容忍失败的存储系统上,如HDFS。这种ckeckpoint主要针对window operation、有状态的操作。无论是driver失败了,还是worker失败了,这种checkpoint都够快速恢复,而不需要将很长的历史数据都重新计算一遍(以便得到当前的状态)。 设置流式数据checkpoint的周期 对于一个需要做checkpoint的DStream结构,可以通过调用DStream.checkpoint(checkpointInterval)来设置ckeckpoint的周期,经验上一般将这个checkpoint周期设置成batch周期的5至10倍。 使用write ahead logs功能 这是一个可选功能,建议加上。这个功能将使得输入数据写入之前配置的checkpoint目录。这样有状态的数据可以从上一个checkpoint开始计算。开启的方法是把spark.streaming.receiver.writeAheadLogs.enable这个property设置为true。另外,由于输入RDD的默认StorageLevel是MEMORY_AND_DISK_2,即数据会在两台worker上做replication。实际上,Spark Streaming模式下,任何从网络输入数据的Receiver(如kafka、flume、socket)都会在两台机器上做数据备份。如果开启了write ahead logs的功能,建议把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在创建RDD时由参数传入。 使用以上的checkpoint机制,确实可以保证数据0丢失。但是一个前提条件是,数据发送端必须要有缓存功能,这样才能保证在spark应用重启期间,数据发送端不会因为spark streaming服务不可用而把数据丢弃。而flume具备这种特性,同样kafka也具备。 (3)Spark Streaming的数据可靠性

12、kafka整合sparkStreaming问题

(1)、如何实现sparkStreaming读取kafka中的数据

有了checkpoint机制、write ahead log机制、Receiver缓存机器、可靠的Receiver(即数据接收并备份成功后会发送ack),可以保证无论是worker失效还是driver失效,都是数据0丢失。原因是:如果没有Receiver服务的worker失效了,RDD数据可以依赖血统来重新计算;如果Receiver所在worker失败了,由于Reciever是可靠的,并有write ahead log机制,则收到的数据可以保证不丢;如果driver失败了,可以从checkpoint中恢复数据重新构建。

    可以这样说:在kafka0.10版本之前有二种方式与sparkStreaming整合,一种是基于receiver,一种是direct,然后分别阐述这2种方式分别是什么
  • receiver:
  • 是采用了kafka高级api,利用receiver接收器来接受kafka topic中的数据,从kafka接收来的数据会存储在spark的executor中, 之后spark streaming提交的job会处理这些数据,kafka中topic的偏移量是保存在zk中的。 基本使用:
val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) 

还有几个需要注意的点: 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的, 所以如果我们加大每个topic的partition数量, 仅仅是增加线程来处理由单一Receiver消费的主题。

但是这并没有增加Spark在处理数据上的并行度. 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据, 之后可以利用union来统一成一个Dstream。 在默认配置下,这种方式可能会因为底层的失败而丢失数据. 因为receiver一直在接收数据,在其已经通知zookeeper数据接收完成但是还没有处理的时候,executor突然挂掉(或是driver挂掉通知executor关闭), 缓存在其中的数据就会丢失. 如果希望做到高可靠, 让数据零丢失, 如果我们启用了Write Ahead Logs(spark.streaming.receiver.writeAheadLog.enable=true) 该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中. 所以, 即使底层节点出现了失败, 也可以使用预写日志中的数据进行恢复. 复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER, 也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)

  • direct:
  • 在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层, 其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。 (设置spark.streaming.kafka.maxRatePerPartition=10000。限制每秒钟从topic的每个partition最多消费的消息条数)。

(2) 对比这2中方式的优缺点:

采用receiver方式:这种方式可以保证数据不丢失,但是无法保证数据只被处理一次, WAL实现的是At-least-once语义(至少被处理一次),如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉, 这些数据将会被反复消费. 同时,降低了程序的吞吐量。 采用direct方式:相比Receiver模式而言能够确保机制更加健壮. 区别于使用Receiver来被动接收数据, Direct模式会周期性地主动查询Kafka, 来获得每个topic+partition的最新的offset, 从而定义每个batch的offset的范围. 当处理数据的job启动时, 就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

    优点:
  • 1、简化并行读取
  • 如果要读取多个partition, 不需要创建多个输入DStream然后对它们进行union操作. Spark会创建跟Kafka partition一样多的RDD partition, 并且会并行从Kafka中读取数据. 所以在Kafka partition和RDD partition之间, 有一个一对一的映射关系.
  • 2、高性能
  • 如果要保证零数据丢失, 在基于receiver的方式中, 需要开启WAL机制. 这种方式其实效率低下, 因为数据实际上被复制了两份, Kafka自己本身就有高可靠的机制, 会对数据复制一份, 而这里又会复制一份到WAL中. 而基于direct的方式, 不依赖Receiver, 不需要开启WAL机制, 只要Kafka中作了数据的复制, 那么就可以通过Kafka的副本进行恢复.
  • 3、一次且仅一次的事务机制
  • 基于receiver的方式, 是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的. 这是消费Kafka数据的传统方式. 这种方式配合着WAL机制可以保证数据零丢失的高可靠性, 但是却无法保证数据被处理一次且仅一次, 可能会处理两次. 因为Spark和ZooKeeper之间可能是不同步的. 基于direct的方式, 使用kafka的简单api, Spark Streaming自己就负责追踪消费的offset, 并保存在checkpoint中. Spark自己一定是同步的, 因此可以保证数据是消费一次且仅消费一次。不过需要自己完成将offset写入zk的过程,在官方文档中都有相应介绍.

简单代码实例:

 messages.foreachRDD(rdd=>{
val message = rdd.map(_._2)//对数据进行一些操作
message.map(method)//更新zk上的offset (自己实现)
updateZKOffsets(rdd)
})

sparkStreaming程序自己消费完成后,自己主动去更新zk上面的偏移量。 也可以将zk中的偏移量保存在mysql或者redis数据库中,下次重启的时候,直接读取mysql或者redis中的偏移量,获取到上次消费的偏移量,接着读取数据。

(3) spark整合不同版本spark的方式

http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html

In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach is further discussed in the Kafka Integration Guide.

资料

  • 英文的整合资料

http://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html

  • 对应中文的资料

http://www.aboutyun.com/thread-24226-1-1.html

https://blog.csdn.net/V_Gbird/article/details/80457064

13、利用scala语言实现排序

(1)冒泡排序:

package cn.itcast.sort 
//冒泡排序 
class BubbleSort { 
  def main(args: Array[String]): Unit = { 
val list = List(3, 12, 43, 23, 7, 1, 2, 0) 
println(sort(list)) 
} 
//定义一个方法,传入的参数是要进行排序的List集合,输出的是排序后的List集合 
def sort(list: List[Int]): List[Int] = list match {
 case List() => List() case head :: tail => compute(head, sort(tail)) 
}

def compute(data: Int, dataSet: List[Int]): List[Int] = dataSet match { 
case List() => List(data) case head :: tail => if (data <= head) data :: dataSet else * head :: compute(data, tail) 
  } 
}

(2) 快读排序

package cn.itcast.sort //快速排序 object QuickSort { def main(args: Array[String]): Unit = { val list = List(3, 12, 43, 23, 7, 1, 2, 0) println(quickSort(list))
} //定义一个方法,传入的参数是要进行排序的List集合,输出的是排序后的List集合 def quickSort(list: List[Int]): List[Int] = { //对输入参数list进行模式匹配 list match { //如果是空,返回nil case Nil => Nil case List() => List() //不为空从list中提取出首元素和剩余元素组成的列表分别到head和tail中 case head :: tail => //对剩余元素列表调用partition方法,这个方法会将列表分为两部分。 // 划分依据接受的参数,这个参数是一个函数(这里是(_ < x))。 // partition方法会对每个元素调用这个函数,根据返回的true,false分成两部分。 // 这里&rsquo;_ < x&rsquo;是一个匿名函数(又称lambda),&rsquo;_&rsquo;关键字是函数输入参数的占位符, // 输入参数这里是列表中的每个元素。 val (left, right) = tail.partition(_ < head) //最后对划分好的两部分递归调用quickSort //其中head::quickSort(right) 这里::是List定义的一个方法,用于将两部分合成一个列表 quickSort(left) ++ (head :: quickSort(right)) } } }

14、spark高可用配置

https://andone1cc.github.io/2017/03/02/Spark/spark%E9%AB%98%E5%8F%AF%E7%94%A8/

15. Spark为什么比mapreduce快?

1)基于内存计算,减少低效的磁盘交互;2)高效的调度算法,基于DAG;3)容错机制Linage,精华部分就是DAG和Lingae

Spark多个任务之间数据通信是基于内存,而Hadoop是基于磁盘

Spark并不是基于内存的技术,而是使用了缓存机制的技术。

Spark只有在shuffle的时候将数据写入磁盘, 而Hadoop中多个MR作业之间的数据交互都要依赖于磁盘交互。

典型的MR工作流是由很多MR作业组成的,他们之间的数据交互需要把数据持久化到磁盘才可以; 而Spark支持DAG以及pipelining,在没有遇到shuffle完全可以不把数据缓存到磁盘。

16、Spark DAG详解

val data1 = Array[(Int, Char)](
    (1, 'a'), (2, 'b'),
    (3, 'c'), (4, 'd'),
    (5, 'e'), (3, 'f'),
    (2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)

val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))

val data2 = Array[(Int, String)]((1, "A"), (2, "B"),(3, "C"), (4, "D"))

val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))

val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)

val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)

DAG

17、Spark Lineage(血统)

http://houzhicheng.com/blog/spark/2015/05/23/spark-DAG.html

利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现, Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。 为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。 相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制, RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。 当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。 这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。 https://blog.csdn.net/u013063153/article/details/73865123

18、Spark有哪几种算子?

1)transformation,rdd由一种转为另一种rdd 2)action, 3)cronroller,crontroller是控制算子,cache,persist,对性能和效率的有很好的支持

https://blog.csdn.net/silentwolfyh/article/details/72625676

19、Spark使用parquet文件存储格式能带来哪些好处?

1) 如果说HDFS 是大数据时代分布式文件系统首选标准,那么parquet则是整个大数据时代文件存储格式实时首选标准 2) 速度更快:从使用spark sql操作普通文件CSV和parquet文件速度对比上看,绝大多数情况 会比使用csv等普通文件速度提升10倍左右,在一些普通文件系统无法在spark上成功运行的情况 下,使用parquet很多时候可以成功运行 3) parquet的压缩技术非常稳定出色,在spark sql中对压缩技术的处理可能无法正常的完成工作 (例如会导致lost task,lost executor)但是此时如果使用parquet就可以正常的完成 4) 极大的减少磁盘I/o,通常情况下能够减少75%的存储空间,由此可以极大的减少spark sql处理 数据的时候的数据输入内容,尤其是在spark1.6x中有个下推过滤器在一些情况下可以极大的 减少磁盘的IO和内存的占用,(下推过滤器) 5) spark 1.6x parquet方式极大的提升了扫描的吞吐量,极大提高了数据的查找速度spark1.6和spark1.5x相比而言,提升了大约1倍的速度,在spark1.6X中,操作parquet时候cpu也进行了极大的优化,有效的降低了cpu 6) 采用parquet可以极大的优化spark的调度和执行。我们测试spark如果用parquet可以有效的减少stage的执行消耗,同时可以优化执行路径 2.

20、map与flatMap的区别

map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象 flatMap:对RDD每个元素转换,然后再扁平化 将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组 对象,会抛弃值为null的值

21、头条的TOP-N算法

https://blog.csdn.net/bntX2jSQfEHy7/article/details/80276225

数据量小-redis的有序集合

数据量大-分布式

TODO 22、用Spark解决一些经典MapReduce问题

https://segmentfault.com/a/1190000007649903 http://www.cnblogs.com/yurunmiao/p/4898672.html

环境搭建-maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sudy</groupId>
    <artifactId>SparkStudy</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>


    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>


</project>

为什么选择spark 2.3.0

gbdt模型中解决了下面这个问题

GradientBoostedTreesModel doesn't have featureSubsetStrategy parameter

Building a Unified Data Pipeline with Apache Spark and XGBoost

Building a Unified Data Pipeline with Apache Spark and XGBoost

XGBoost4J-Spark Tutorial

XGBoost4J-Spark Tutorial

Note: XGBoost4J-Spark requires Spark 2.3+

XGBoost4J-Spark now requires Spark 2.3+. Latest versions of XGBoost4J-Spark uses facilities of org.apache.spark.ml.param.shared extensively to provide for a tight integration with Spark MLLIB framework, and these facilities are not fully available on earlier versions of Spark.

问题

程序在本地跑的时候,出现卡在中间的情况: https://github.com/dmlc/xgboost/issues/3615

https://github.com/dmlc/xgboost/issues/3470

出现这种问题的主要原因是,xgboost写的num_excutor的数量要和spark配置的excotor的数量一致

示例代码

https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkTraining.scala

当Spark遇上TensorFlow分布式深度学习框架原理和实践

https://www.cnblogs.com/lsyz/p/8859101.html

Spark ML函数VectorAssembler

https://blog.csdn.net/lichao_ustc/article/details/52688127

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())

详细探究Spark的shuffle实现

http://jerryshao.me/2014/01/04/spark-shuffle-detail-investigation/

参考资料

https://www.cnblogs.com/purstar/p/8043631.html