override def write(records: Iterator[Product2[K, V]]): Unit = {
//是否在map端进行排序
//mapSideCombine如果这个值为真就在咋map端进行合并操作,比如现在map端的输出是
//hello 1 hello 1 经过聚合之后的数据就是hello 2
sorter = if (dep.mapSideCombine) {
//aggregator这是我们定义的算子排序的函数比如reduceByKey等等
//要求这个值为真才进行按照我们的逻辑进行聚合的操作
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
//在map端的分区类按照key进行聚合
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
//如果为假就忽略不用按照key进行聚合排序
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
//得到上面是否在map端进行聚合的数据
sorter. (records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
//从shuffle的map输出的文件中去拉取数据
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
//得到block的id也就是
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
//按照id把文件写入本地不同的分类区中
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
//提交开始写文件
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
//把完成后的信息写入mapStatus中
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
2、采用类似拉链的操作去存储数据,只需记录下数据的开始和结束的位置
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Int,
lengths: Array[Long],
dataTmp: File): Unit = {
//得到目前要写文件的下标
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
//得到开始写数据的位置
offset += length
out.writeLong(offset)
}
} {
out.close()
}
val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
// the following check and rename are atomic.
synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Another attempt for the same task has already written our map outputs successfully,
// so just use the existing partition lengths and delete our temporary map outputs.
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
indexTmp.delete()
} else {
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
if (indexFile.exists()) {
indexFile.delete()
}
if (dataFile.exists()) {
dataFile.delete()
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
}
}
}
} finally {
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
}
}
}
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
// Track location of each range in the output file
//每一个分区还剩内存的数目的多少
val lengths = new Array[Long](numPartitions)
//得到一个写文件的句柄
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics)
if (spills.isEmpty) {
// Case where we only have in-memory data
//如果我们定义了数据聚合的算子就为真,也就是需要用map对数据进行处理
//如果没有定义那么就直接用buffer存储数据
val collection = if (aggregator.isDefined) map else buffer
//得到一个写数据的迭代器便于把数据都写入文件中去
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
//得到一个端的地址
val segment = writer.commitAndGet()
lengths(partitionId) = segment.length
}
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
val segment = writer.commitAndGet()
lengths(id) = segment.length
}
}
}
writer.close()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}
json File 日期类型 怎样处理?怎样从字符型,转换为Date或DateTime类型?json文件如下,有字符格式的日期类型```{ name : Andy, age : 30, time :2015-03-03T08:25:55.769Z}{ name : Justin, age : 19, time : 2015-04-04T08:25:55.769Z }{ name : pan, ag.....
一、spark普通shuffle的基本原理
1、假如现在在一个节点上由4个shufflemapTask在执行,但是这个节点的core的数量数2,在远端有4个resultTask等待接收shuffleMapTask的数据进行处理
2、这样可以有两个shufflemaptask可以同时执行,在每一个shufflemaptask下面都会产生4个bucket,这是为什么呢,因为每一个shufflemaptask都会为每一个resulttask建立一个数据分区,但是这个bucket是在内存中的当数量达到一定的阈值的时候就会把数据写入本地的磁盘当中也就是shuffleblockfile。
3、shufflemaptask的输出会作为mapstatus发送到DAGscheduler上面mapoutputTracker上面的Master上面去。
4、在resultTask需要拉取数据的时候会去找mapstatus然后使用BlockManager把数据拉取到本地。(到这儿有没有觉得这和MapReduce的执行过程简直就是一样的,其实不然他们还是有那么一点区别,MapReduce在shuffle阶段需要把数据完全存储完之后才把reduce采取拉取数据,但是spark的shuffle阶段不需要这样shufflemaptask可以一边把数据写入本地的缓存,resultTask可以一边读取数据,这样的操作的速度是不是比mapreduce会,这是为什么呢,因为在hadoop的MapReduce阶段存在在分区内按照key排序,这就是为啥不能像spark的shuffle的原因)
5、说了这么多还不如看看图
6、存在的问题:
假如有1000个shufflemaptask,1000个resultTask那么就会产生100万个磁盘文件,这样在会进行多次的磁盘io,由于磁盘io速度很慢,这样磁盘io就会严重的降低了整个系统的性能。
二、优化后的shuffle原理
1、可以自定义是否在map端进行聚合排序等操作
2、采用类似拉链的操作去存储数据,只需记录下数据的开始和结束的位置
3、示意图
相关问题推荐
自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势 a 25b 36c 24d 45e 60a 33b 26c 47d 43e 62a...
1. Spark 概述1.1. 什么是 Spark(官网:http://spark.apache.org)spark 中文官网:http://spark.apachecn.orgSpark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,20...
搭建高可用模式用的协同处理框架。
多数是版本不对、、、、、、引用错误,,,等
目前,还是选择saprkflink还有成长的空间
org.springframework.boot spring-boot-starter-parent 1.3.2.RELEASE 2.10.4 1.6.2 ...
一、图概念术语1.1 基本概念图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种数据结构。这里的图并非指代数中的图。图可以对事物以及事物之间的关系建模,图可以用来表示自然发生的连接数据,如:社交网络、互联网web页面常用的应用有:在地图...
该模式被称为 Local[N] 模式,是用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题其中N代表可以使用 N 个线程,每个线程拥有一个 core 。如果不指定 N,则默认是1个线程(该线程有1个 core )。如果是 local[*],则...
Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...
json File 日期类型 怎样处理?怎样从字符型,转换为Date或DateTime类型?json文件如下,有字符格式的日期类型```{ name : Andy, age : 30, time :2015-03-03T08:25:55.769Z}{ name : Justin, age : 19, time : 2015-04-04T08:25:55.769Z }{ name : pan, ag.....
python中 方法中定义了一个其他的方法,调用方法的时候,也执行了其他的方法def outer(): name = out print(name) def inner(): name = inner print(name) return inner s...
一:Spark集群部署二:Job提交解密三:Job生成和接受四:Task的运行五:再论shuffle1,从spark Runtime 的角度讲来讲有5大核心对象:Master , Worker , Executor ,Driver , Co...
1).使用程序中的集合创建rdd2).使用本地文件系统创建rdd3).使用hdfs创建rdd,4).基于数据库db创建rdd5).基于Nosql创建rdd,如hbase6).基于s3创建rdd,7).基于数据流,如socket创建rdd
1)自动的进行内存和磁盘的存储切换;2)基于Lineage的高效容错;3)task如果失败会自动进行特定次数的重试;4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;5)checkpoint和persist,数据计算之后持久化缓存6)数据调度弹性,DAG TASK...
Spark中数据的本地化方式分为5种1、PROCESS_LOCAL : 进程本地化,指task计算的数据在本进程(Executor)中2、NODE_LOCAL:节点本地化,指task计算的数据在本节点(node)的磁盘上,当task在本进程中一直没有执行(如果Driver分发task 3s后没有执行,且重复5次...