Spark面试题
1. Spark Core & SQL
1.1Spark 有几种部署方式?
(1)Local:运行在一台机器上,通常是练练手或者测试环境。
(2)Standalone:构建基于Master+Slaves 的资源调度集群,spark 任务提交给 Master运行,是 Spark 自身的一个调度系统。
(3)Yarn: Spark 客户端直接链接 Yarn,不需要额外构建 Spark 集群。有yarn-client 和 yarn-cluster 两种模式, 主要区别在于 Driver 程序的运行节点。
(4)Mesos:国内大环境比较少用。
1.2 如何理解Spark 中的血统概念( RDD )
RDD 在 LIneage 依赖方面分为两种 Narrow Dependenciss 与 Wide Dependenciss 用来解决数据容错的高小性以及划分任务时候起到重要作用。
1.3 简述 Spark 的宽窄依赖,以及 Spark 如何划分 stage 又根据什么决定 task 个数?
Stage:根据 RDD 之间的依赖关系的不同将 Job 划分成不同的 Stage ,遇到一个依赖则划分一个 Stsge。
Task:Stage 是一个 TaskSet,将 Stage 根据分区划分成一个个的 Stage。
1.3 如何使用Spark实现TopN的获取
方法1:
(1)按照key对数据进行聚合(groupByKey)
(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。
方法2:
(1)取出所有的key
(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
方法3:
(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序
1.4 列举Spark的transformation算子(不少于8个),并简述功能
(1)map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成(每条数据执行一次)。
(2)map Partition( func ):类似与 map ,但独立地在 RDD 的没一个分片上运行,因此在类型为T的RD上运行时,func的函数类型必须是Iterator[T] => Iterator[U](每个分区执行一次)。
(3)mapPartitionsWithIndex:类似于mapPartitions,比mapPartitions多一个参数来表示分区号
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4), 2
)
val mapRDD = rdd.mapPartitionsWithIndex((index, datas) =>{
index match {
case 1 => datas.map(_ * 2)
case _ => datas
}
})
mapRDD.collect().foreach(println)
sc.stop()
(4)reduceByKey ;在一个(K,V)的 RDD 上调用,返回一个(K,V)的RDD,使用定的 reduce 函数,将相同 key 的值聚合到一起,reduce 任务的个数可以通过第二个可选参数来设置。
(5)flatMap:将处理的数据进行扁平化会再进行映射处理,使用算子也称为映射。返回一个可迭代的集合。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(List(1, 2), List(3, 4))
)
val fmRDD = rdd.flatMap(
list => {
list
}
)
fmRDD.collect().foreach(println)
sc.stop()
当集合中的数据类型不同时,可以使用match case进行模式匹配,转换成集合类型。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(List(1, 2),3, List(3, 4))
)
val fmRDD = rdd.flatMap {
case list: List[_] => list
case d => List(d)
}
fmRDD.collect().foreach(println)
sc.stop()
(6)groupBy:根据指定的规则进行分组,分区默认不变,数据会被打乱(shuffle)。极限情况下,数据可能被分到同一个分区中。
一个分区可以有多个组,一个组只能在一个分区中。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4),2
)
// groupBy 会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组,相同的key值的数据会被放置在一个组中
def groupFunction(num:Int):Int = {
num % 2
}
val groupRDD = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
sc.stop()
(7)filter:根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。当数据进行筛选过滤后,分区不变,但是分区内数据可能不均衡,导致数据倾斜。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4),2
)
val filterRDD = rdd.filter(_ % 2 == 1)
filterRDD.collect().foreach(println)
sc.stop()
(8) sortBy:根据指定规则进行排序,默认升序,设置第二个参数改变排序方式。默认情况下,不会改变分区个数,但是中间存在shuffle处理。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(4, 5, 1, 3, 2, 6),2
)
val sortRDD = rdd.sortBy(num => num)
sortRDD.saveAsTextFile("output")
sc.stop()
1.5 Spark常用算子reduceByKey与groupByKey的区别,哪一种更具优势?
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]
groupByKey:按照key进行分组,直接进行shuffle。
在性能上,reduceByKey
通常比groupByKey
更有优势,因为它在传输数据之前进行了预聚合。这意味着它只需要传递每个键的聚合结果,而不是所有的键值对。因此,当处理大量数据时,reduceByKey
可以显著减少网络中的数据传输量,从而提高性能。
1.6 Repartition和Coalesce关系与区别
Repartition和Coalesce都是用来改变RDD(弹性分布式数据集)的分区数量的操作,但它们在处理数据时的方式有所不同。
Repartition一定会发生shuffle,它底层调用的就是coalesce方法:coalesce(numPartitions, shuffle=true)。通过repartition操作,可以将RDD重新分区到指定的分区数量,而这个过程中一定会涉及数据的重新排序和分配,即shuffle。
Coalesce并不会一定会发生shuffle。实际上,Coalesce会根据传入的参数来判断是否发生shuffle。当传入的参数大于当前RDD的分区数量时,不会发生shuffle;而当参数小于当前RDD的分区数量时,会触发shuffle。在大部分情况下,如果需要增大RDD的分区数量,我们通常使用Repartition操作,而当需要减少RDD的分区数量时,则使用Coalesce操作。
1.7 分别简述Spark中的缓存机制(cache和persist)与checkpoint机制,并指出两者的区别与联系
- cache:cache是Spark中最常用的缓存机制,它会将RDD(弹性分布式数据集)或DataFrame缓存到内存中,以便在后续的Spark操作中重复使用。cache操作会触发一次RDD或DataFrame的计算,并将结果缓存在内存中,以便后续的Spark操作可以直接从内存中获取数据,避免了重复计算。
- persist:persist与cache类似,都是将RDD或DataFrame缓存到内存中,但persist提供了更多的灵活性。persist方法可以接受一个StorageLevel参数,用于指定缓存数据的存储级别。StorageLevel参数可以指定数据在不同的存储介质中进行缓存,例如内存、磁盘等。此外,persist还可以指定是否进行序列化和压缩等优化策略
- checkpoint机制是Spark中用于将RDD或DataFrame的数据持久化到磁盘中的机制。checkpoint机制会将数据写入到HDFS(分布式文件系统)或其他兼容的文件系统中,以便在Spark应用程序失败或重启后能够恢复数据。checkpoint机制通常用于数据量较大或计算时间较长的场景,以避免数据重新计算带来的开销。
区别与联系:
- 缓存机制(cache和persist)主要用于提高数据处理速度,避免重复计算,而checkpoint机制主要用于将数据持久化到磁盘中,以便在Spark应用程序失败或重启后能够恢复数据。
- 缓存机制将数据缓存在内存中,而checkpoint机制将数据持久化到磁盘中。因此,缓存机制适用于数据量较小或内存足够的场景,而checkpoint机制适用于数据量较大或需要持久化数据的场景。
- 缓存机制可以通过设置StorageLevel参数来指定数据的存储级别和优化策略,而checkpoint机制没有这样的灵活性。
1.8 简述Spark中共享变量(广播变量和累加器)的基本原理与用途。(重点)
广播变量在Spark中用于在分布式环境下共享不可变、只读、相同的变量。这种变量在每个机器上缓存一份,每个任务都能访问这个变量,这样可以节省资源和优化性能。
累加器是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce。累加器在driver端创建并赋初值,随着任务的分发在taskExecutor执行更新。累加器只能在driver端读取,不能在executor端读取,在executor端只能通过add方法累加。不同executor的累加互不影响。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
共享变量出现的原因:
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。
Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
1.9京东:调优之前与调优之后性能的详细对比(例如调整map个数,map个数之前多少、之后多少,有什么提升)
这里举个例子。比如我们有几百个文件,会有几百个map出现,读取之后进行join操作,会非常的慢。这个时候我们可以进行coalesce操作,比如240个map,我们合成60个map,也就是窄依赖。这样再shuffle,过程产生的文件数会大大减少。提高join的时间性能。
1.10 kryo序列化
kryo 序列化比 Java 序列化更快更紧凑,但spark默认的序列化是 Java 序列化,因为spark 并不是支持所有类型,而且没次使用都必须进行注册。注册只针对于RDD。在DataFrames 和 DataSet 当中自动实现了 kryo 序列化。
1.11 创建临时表和全局临时表
DataFrame.createTempView() 创建普通临时表
DataFrame.createGlobalTempView() DataFrame.createOrReplaceTempView() 创建全局临时表
1.12 控制Spark reduce缓存 调优shuffle
spark.reducer.maxSizeInFilght 此参数为reduce task能够拉取多少数据量的一个参数默认48MB,当集群资源足够时,增大此参数可减少reduce拉取数据量的次数,从而达到优化shuffle的效果,一般调大为96MB,资源够大可继续往上跳。
1.13 SparkSQL中join操作与left join操作的区别?
join和sql中的inner join操作很相似,返回结果是前面一个集合和后面一个集合中匹配成功的,过滤掉关联不上的。
leftJoin类似于SQL中的左外关联left outer join,返回结果以第一个RDD为主,关联不上的记录为空。
部分场景下使用 left semi join 代替 left join:
因为 left semi join 是 in(KeySet)的关系,遇到右表重复记录,左表会跳过,性能更高,而 left join 则会一直遍历。但是 left semi join 中最后 select 的结果只允许出现左表中的列名,因为右表只有 join key 参与了计算。
2. Spark Streaming
2.1 简述SparkStreaming窗口函数的原理
窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。
图中 time1 就是 SparkStreaming 计算批次大小,虚线框以及实线大框就是窗口大小,必须为批次的整数倍。虚线框到大实线框的距离(相隔多少批次),就是滑动步长。
2.2 Spark Streaming第一次运行不丢失数据
kafka参数 auto.offset.reset 参数设置成earliest 从最初始偏移量开始消费数据
2.3 Spark Streaming精准一次消费
- 手动维护偏移量
- 处理完业务数据后,再进行提交偏移量操作
极端情况下,如在提交偏移量时断网或停电会造成spark程序第二次启动时重复消费问题,所以在涉及到金额或精确性非常高的场景会使用事物保证精准一次消费
2.4 Spark Streaming控制每秒消费数据的速度
通过spark.streaming.kafka.maxRatePerPartition参数来设置Spark Streaming从kafka分区每秒拉取的条数
2.5 Spark Streaming背压机制
Spark Streaming的背压机制是一种解决流处理系统中流量毛刺和负载压力问题的机制。在流处理系统中,如果数据流入速度远高于数据处理速度,可能会导致Executor端出现OOM的情况或者任务崩溃。背压机制通过控制数据流入的速度,使数据能够均匀、稳定地进入处理系统,从而避免了对流处理系统构成巨大的负载压力的问题。
把spark.streaming.backpressure.enabled 参数设置为ture,开启背压机制后Spark Streaming会根据延迟动态去kafka消费数据,上限由spark.streaming.kafka.maxRatePerPartition参数控制,所以两个参数一般会一起使用
背压机制的优点是可以自动调整数据流入速度,缺点是需要进行压测以合理设置最大值,如果集群处理能力高于配置的速率,则会造成资源的浪费。此外,参数设置需要手动操作,并且设置后需要重启streaming服务。总体而言,背压机制是一种有效的解决流处理系统中流量毛刺和负载压力问题的手段,但实施时需要考虑集群处理能力和资源配置等因素。