logo头像
Snippet 博客主题

Spark学习之路 (四)Spark的广播变量和累加器

** Spark学习之路 (四)Spark的广播变量和累加器:** <Excerpt in index | 首页摘要>

​ 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变(broadcast variable)和累加器(accumulator)

<The rest of contents | 余下全文>

一、广播变量broadcast variable

​ 广播变量允许程序员在每台机器上保留一个只读变量,而不是随副本一起发送它的副本。例如,它们可用于以有效的方式为每个节点提供大输入数据集的副本。Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。

​ Spark动作通过一组阶段执行,由分布式“shuffle”操作分隔。Spark自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以序列化形式缓存并在运行每个任务之前反序列化。这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。

​ 广播变量是v通过调用从变量创建的SparkContext.broadcast(v)。广播变量是一个包装器v,可以通过调用该value 方法来访问它的值。下面的代码显示了这个:

1.1 为什么要将变量定义成广播变量?

如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么知识每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

1.2 广播变量图解

错误的,不使用广播变量

img

正确的,使用广播变量的情况

img

2.3 如何定义一个广播变量?

1
2
val a = 3
val broadcast = sc.broadcast(a)

2.4 如何还原一个广播变量?

1
val c = broadcast.value

2.5 定义广播变量需要的注意点?

变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改

2.6 注意事项

1、能不能将一个RDD使用广播变量广播出去?

​ 不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

2、 广播变量只能在Driver端定义,不能在Executor端定义。

3、 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。

5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

二、累加器

​ 累加器是仅通过关联和交换操作“添加”的变量,因此可以并行有效地支持。它们可用于实现计数器(如MapReduce)或总和。Spark本身支持数值类型的累加器,程序员可以添加对新类型的支持。

作为用户,您可以创建命名或未命名的累加器。如下图所示,命名累加器(在此实例中counter)将显示在Web UI中,用于修改该累加器的阶段。Spark显示“任务”表中任务修改的每个累加器的值。

Spark UI中的累加器

跟踪UI中的累加器对于理解运行阶段的进度非常有用(注意:Python中尚不支持)。

2.1 为什么要将一个变量定义为一个累加器?

​ 在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

2.2 图解累加器

错误的图解

img

正确的图解

img

2.3 如何定义一个累加器?

1
val a = sc.accumulator(0)

2.4 如何还原一个累加器?

1
val b = a.value

2.5 注意事项

1、 累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

2、累加器不是一个调优的操作,因为如果不这样做,结果是错的