Spark】Spark分组后为什么会在某个分组下出现 其他组的数据?

2021-02-25 19:18发布

1条回答

自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势


 


a 25

b 36

c 24

d 45

e 60

a 33

b 26

c 47

d 43

e 62

a 13

b 16

c 42

d 66

e 31

a 19

b 75

c 61

d 71

e 80

a 85

b 90

c 54

d 48

e 62

 


第一种方式:适合求每一组别中所需要的top个数很大的情况,是对数据分组后对每一个组内进行排序,先获得所有组的key的集合,然后循环每个key排序,最后只需要采用take(num)即可得到前num个数据。


import java.util.Iterator;

import java.util.List;

 

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

 

import scala.Tuple2;

 

/*适合求每一组别中所需要的top个数很大的情况*/

 

public class GroupTopN {

public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("GroupTopN").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaPairRDD> grouppair = sc.textFile("E:/mr/grouptopn.txt").mapToPair(new PairFunction() {

 

private static final long serialVersionUID = 1L;

 

@Override

public Tuple2 call(String line) throws Exception {

return new Tuple2(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]));

}

}).groupByKey();

// System.out.println(linepair.count());

List keys = grouppair.map(new Function>, String>() {

 

private static final long serialVersionUID = 1L;

 

@Override

public String call(Tuple2> tuple) throws Exception {

 

return tuple._1;

}

}).collect();

for (int i = 0; i < keys>

System.out.println(keys.get(i));

final int key = i;

JavaPairRDD result = grouppair.filter(new Function>, Boolean>() {

 

private static final long serialVersionUID = 1L;

 

@Override

public Boolean call(Tuple2> tuple) throws Exception {

return tuple._1.equals(keys.get(key));

}

}).flatMap(new FlatMapFunction>, Integer>() {

 

private static final long serialVersionUID = 1L;

@Override

public Iterator call(Tuple2> tuple) throws Exception {

return tuple._2.iterator();

}

}).mapToPair(new PairFunction() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2 call(Integer in) throws Exception {

return new Tuple2(in, keys.get(key));

}

}).sortByKey(false).mapToPair(new PairFunction, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2 call(Tuple2 tuple) throws Exception {

 

return new Tuple2(tuple._2, tuple._1);

}

});

List>  list = result.take(4);

for (Tuple2 tuple2 : list) {

System.out.println(tuple2._1+" "+tuple2._2);

}

// result.foreach(new VoidFunction>() {

//

// private static final long serialVersionUID = 1L;

//

// @Override

// public void call(Tuple2 tuple) throws Exception {

// System.out.println(tuple._1+" "+tuple._2);

//

// }

// });

}

sc.close();

}

 

}

测试结果如下:


2018-07-17 10:19:41 INFO  DAGScheduler:54 - Job 0 finished: collect at GroupTopN.java:44, took 0.374648 s

d

2018-07-17 10:19:41 INFO  SparkContext:54 - Starting job: take at


……

GroupTopN.java:88, took 0.080054 s

d 71

d 66

d 48

d 45


 


第二种方式:当所求组内数据量不大时采取此方法较为合适,是将每一个组内的value值都存放在list中,使用Collections.sort(List,Comparator)给list排序,重写public int compare(Object o1, Object o2) 方法自定义实现逆序排列,然后再截取list。


import java.util.ArrayList;

import java.util.Collections;

import java.util.Comparator;

import java.util.Iterator;

import java.util.List;

 

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

 

import scala.Tuple2;

 

/*当所求组内数据量不大时采取此方法较为合适*/

 

public class GroupTopN2 {

public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("GroupTopN2").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaPairRDD> grouppair = sc.textFile("E:/mr/grouptopn.txt").mapToPair(new PairFunction() {

 

private static final long serialVersionUID = 1L;

 

@Override

public Tuple2 call(String line) throws Exception {

return new Tuple2(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]));

}

}).groupByKey();

System.out.println(grouppair.count());

JavaPairRDD> result = grouppair.mapToPair(new PairFunction>, String, Iterable>() {

 

private static final long serialVersionUID = 1L;

 

@Override

public Tuple2> call(Tuple2> tuple) throws Exception {

List list = new ArrayList<>();

Iterator it = tuple._2.iterator();

while (it.hasNext()) {

int in = it.next();

list.add(in);

}

Collections.sort(list, new Comparator() {

 

@Override

public int compare(Integer o1, Integer o2) {

return -o1.compareTo(o2);

}

});

List re = list.subList(0, 4);

return new Tuple2>(tuple._1, re);

}

});

result.foreach(new VoidFunction>>() {

 

private static final long serialVersionUID = 1L;

 

@Override

public void call(Tuple2> tuple) throws Exception {

System.out.println(tuple._1+" "+tuple._2);

}

});

sc.close();

}

 

}

测试结果如下:


d [71, 66, 48, 45]

e [80, 62, 62, 60]

a [85, 33, 25, 19]

b [90, 75, 36, 26]

c [61, 54, 47, 42]

 


第三种方式:利用插入排序的思想,适用于那些组内数据量大,但所取top数量较小时,定义一个大小为N的数组存TopN的数据


import java.util.ArrayList;

import java.util.Iterator;

import java.util.List;

 

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

 

import scala.Tuple2;

 

/*利用插入排序的思想,适用于那些组内数据量大,但所取top数量较小时*/

 

public class GroupTopN3 {

public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("GroupTopN3").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaPairRDD> grouppair = sc.textFile("E:/mr/grouptopn.txt").mapToPair(new PairFunction() {

 

private static final long serialVersionUID = 1L;

 

@Override

public Tuple2 call(String line) throws Exception {

return new Tuple2(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]));

}

}).groupByKey();

System.out.println(grouppair.count());

JavaPairRDD> result = grouppair.mapToPair(new PairFunction>, String, Iterable>() {

 

private static final long serialVersionUID = 1L;

 

@Override

public Tuple2> call(Tuple2> tuple) throws Exception {

List list = new ArrayList<>();

int[] arrin = new int[4];

Iterator it = tuple._2.iterator();

while (it.hasNext()) {

int in = it.next();

for (int i = 0; i < arrin>

if (in>arrin[i]) {

for(int j =arrin.length-1;j>i;j--){

arrin[j] = arrin[j-1];

}

arrin[i] = in;

break;

}

}

}

for (int i = 0; i < arrin>

list.add(arrin[i]);

}

return new Tuple2>(tuple._1, list);

}

});

result.foreach(new VoidFunction>>() {

 

private static final long serialVersionUID = 1L;

 

@Override

public void call(Tuple2> tuple) throws Exception {

System.out.println(tuple._1+" "+tuple._2);

}

});

sc.close();

}

}

测试结果如下:


d [71, 66, 48, 45]

e [80, 62, 62, 60]

a [85, 33, 25, 19]

b [90, 75, 36, 26]

c [61, 54, 47, 42]

————————————————


相关问题推荐

  • 回答 9

    1. Spark 概述1.1. 什么是 Spark(官网:http://spark.apache.org)spark 中文官网:http://spark.apachecn.orgSpark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,20...

  • 回答 2

    搭建高可用模式用的协同处理框架。

  • 回答 1

    多数是版本不对、、、、、、引用错误,,,等

  • 回答 1

    目前,还是选择saprkflink还有成长的空间

  • 回答 2

            org.springframework.boot         spring-boot-starter-parent         1.3.2.RELEASE                             2.10.4         1.6.2                       ...

  • 回答 1

    一、图概念术语1.1 基本概念图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种数据结构。这里的图并非指代数中的图。图可以对事物以及事物之间的关系建模,图可以用来表示自然发生的连接数据,如:社交网络、互联网web页面常用的应用有:在地图...

  • 回答 4

    该模式被称为 Local[N] 模式,是用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题其中N代表可以使用 N 个线程,每个线程拥有一个 core 。如果不指定 N,则默认是1个线程(该线程有1个 core )。如果是 local[*],则...

  • 回答 2

    Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...

  • 回答 1

    json File 日期类型 怎样处理?怎样从字符型,转换为Date或DateTime类型?json文件如下,有字符格式的日期类型```{ name : Andy, age : 30, time :2015-03-03T08:25:55.769Z}{ name : Justin, age : 19, time : 2015-04-04T08:25:55.769Z }{ name : pan, ag.....

  • 什么叫闭包?Spark 2020-05-25 22:30
    回答 2

    python中    方法中定义了一个其他的方法,调用方法的时候,也执行了其他的方法def outer():     name = out     print(name)     def inner():         name = inner         print(name)     return inner   s...

  • 回答 2

    一:Spark集群部署二:Job提交解密三:Job生成和接受四:Task的运行五:再论shuffle1,从spark Runtime 的角度讲来讲有5大核心对象:Master , Worker , Executor ,Driver , Co...

  • 回答 3

    1).使用程序中的集合创建rdd2).使用本地文件系统创建rdd3).使用hdfs创建rdd,4).基于数据库db创建rdd5).基于Nosql创建rdd,如hbase6).基于s3创建rdd,7).基于数据流,如socket创建rdd

  • 回答 3

    1)自动的进行内存和磁盘的存储切换;2)基于Lineage的高效容错;3)task如果失败会自动进行特定次数的重试;4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;5)checkpoint和persist,数据计算之后持久化缓存6)数据调度弹性,DAG TASK...

  • 回答 1

    Spark中数据的本地化方式分为5种1、PROCESS_LOCAL : 进程本地化,指task计算的数据在本进程(Executor)中2、NODE_LOCAL:节点本地化,指task计算的数据在本节点(node)的磁盘上,当task在本进程中一直没有执行(如果Driver分发task 3s后没有执行,且重复5次...

  • 回答 1

    一、spark普通shuffle的基本原理      1、假如现在在一个节点上由4个shufflemapTask在执行,但是这个节点的core的数量数2,在远端有4个resultTask等待接收shuffleMapTask的数据进行处理      2、这样可以有两个shufflemaptask可以同时执行,在每一...

没有解决我的问题,去提问