2021-02-25 19:18发布
自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势
a 25
b 36
c 24
d 45
e 60
a 33
b 26
c 47
d 43
e 62
a 13
b 16
c 42
d 66
e 31
a 19
b 75
c 61
d 71
e 80
a 85
b 90
c 54
d 48
第一种方式:适合求每一组别中所需要的top个数很大的情况,是对数据分组后对每一个组内进行排序,先获得所有组的key的集合,然后循环每个key排序,最后只需要采用take(num)即可得到前num个数据。
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/*适合求每一组别中所需要的top个数很大的情况*/
public class GroupTopN {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("GroupTopN").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD> grouppair = sc.textFile("E:/mr/grouptopn.txt").mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(String line) throws Exception {
return new Tuple2(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]));
}
}).groupByKey();
// System.out.println(linepair.count());
List keys = grouppair.map(new Function>, String>() {
public String call(Tuple2> tuple) throws Exception {
return tuple._1;
}).collect();
for (int i = 0; i < keys>
System.out.println(keys.get(i));
final int key = i;
JavaPairRDD result = grouppair.filter(new Function>, Boolean>() {
public Boolean call(Tuple2> tuple) throws Exception {
return tuple._1.equals(keys.get(key));
}).flatMap(new FlatMapFunction>, Integer>() {
public Iterator call(Tuple2> tuple) throws Exception {
return tuple._2.iterator();
}).mapToPair(new PairFunction() {
public Tuple2 call(Integer in) throws Exception {
return new Tuple2(in, keys.get(key));
}).sortByKey(false).mapToPair(new PairFunction, String, Integer>() {
public Tuple2 call(Tuple2 tuple) throws Exception {
return new Tuple2(tuple._2, tuple._1);
});
List> list = result.take(4);
for (Tuple2 tuple2 : list) {
System.out.println(tuple2._1+" "+tuple2._2);
// result.foreach(new VoidFunction>() {
//
// private static final long serialVersionUID = 1L;
// @Override
// public void call(Tuple2 tuple) throws Exception {
// System.out.println(tuple._1+" "+tuple._2);
// }
// });
sc.close();
测试结果如下:
2018-07-17 10:19:41 INFO DAGScheduler:54 - Job 0 finished: collect at GroupTopN.java:44, took 0.374648 s
d
2018-07-17 10:19:41 INFO SparkContext:54 - Starting job: take at
……
GroupTopN.java:88, took 0.080054 s
第二种方式:当所求组内数据量不大时采取此方法较为合适,是将每一个组内的value值都存放在list中,使用Collections.sort(List,Comparator)给list排序,重写public int compare(Object o1, Object o2) 方法自定义实现逆序排列,然后再截取list。
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
/*当所求组内数据量不大时采取此方法较为合适*/
public class GroupTopN2 {
SparkConf conf = new SparkConf().setAppName("GroupTopN2").setMaster("local");
System.out.println(grouppair.count());
JavaPairRDD> result = grouppair.mapToPair(new PairFunction>, String, Iterable>() {
public Tuple2> call(Tuple2> tuple) throws Exception {
List list = new ArrayList<>();
Iterator it = tuple._2.iterator();
while (it.hasNext()) {
int in = it.next();
list.add(in);
Collections.sort(list, new Comparator() {
public int compare(Integer o1, Integer o2) {
return -o1.compareTo(o2);
List re = list.subList(0, 4);
return new Tuple2>(tuple._1, re);
result.foreach(new VoidFunction>>() {
public void call(Tuple2> tuple) throws Exception {
System.out.println(tuple._1+" "+tuple._2);
d [71, 66, 48, 45]
e [80, 62, 62, 60]
a [85, 33, 25, 19]
b [90, 75, 36, 26]
c [61, 54, 47, 42]
第三种方式:利用插入排序的思想,适用于那些组内数据量大,但所取top数量较小时,定义一个大小为N的数组存TopN的数据
/*利用插入排序的思想,适用于那些组内数据量大,但所取top数量较小时*/
public class GroupTopN3 {
SparkConf conf = new SparkConf().setAppName("GroupTopN3").setMaster("local");
int[] arrin = new int[4];
for (int i = 0; i < arrin>
if (in>arrin[i]) {
for(int j =arrin.length-1;j>i;j--){
arrin[j] = arrin[j-1];
arrin[i] = in;
break;
list.add(arrin[i]);
return new Tuple2>(tuple._1, list);
————————————————
1. Spark 概述1.1. 什么是 Spark(官网:http://spark.apache.org)spark 中文官网:http://spark.apachecn.orgSpark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,20...
搭建高可用模式用的协同处理框架。
多数是版本不对、、、、、、引用错误,,,等
目前,还是选择saprkflink还有成长的空间
org.springframework.boot spring-boot-starter-parent 1.3.2.RELEASE 2.10.4 1.6.2 ...
一、图概念术语1.1 基本概念图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种数据结构。这里的图并非指代数中的图。图可以对事物以及事物之间的关系建模,图可以用来表示自然发生的连接数据,如:社交网络、互联网web页面常用的应用有:在地图...
该模式被称为 Local[N] 模式,是用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题其中N代表可以使用 N 个线程,每个线程拥有一个 core 。如果不指定 N,则默认是1个线程(该线程有1个 core )。如果是 local[*],则...
Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...
json File 日期类型 怎样处理?怎样从字符型,转换为Date或DateTime类型?json文件如下,有字符格式的日期类型```{ name : Andy, age : 30, time :2015-03-03T08:25:55.769Z}{ name : Justin, age : 19, time : 2015-04-04T08:25:55.769Z }{ name : pan, ag.....
python中 方法中定义了一个其他的方法,调用方法的时候,也执行了其他的方法def outer(): name = out print(name) def inner(): name = inner print(name) return inner s...
一:Spark集群部署二:Job提交解密三:Job生成和接受四:Task的运行五:再论shuffle1,从spark Runtime 的角度讲来讲有5大核心对象:Master , Worker , Executor ,Driver , Co...
1).使用程序中的集合创建rdd2).使用本地文件系统创建rdd3).使用hdfs创建rdd,4).基于数据库db创建rdd5).基于Nosql创建rdd,如hbase6).基于s3创建rdd,7).基于数据流,如socket创建rdd
1)自动的进行内存和磁盘的存储切换;2)基于Lineage的高效容错;3)task如果失败会自动进行特定次数的重试;4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;5)checkpoint和persist,数据计算之后持久化缓存6)数据调度弹性,DAG TASK...
Spark中数据的本地化方式分为5种1、PROCESS_LOCAL : 进程本地化,指task计算的数据在本进程(Executor)中2、NODE_LOCAL:节点本地化,指task计算的数据在本节点(node)的磁盘上,当task在本进程中一直没有执行(如果Driver分发task 3s后没有执行,且重复5次...
一、spark普通shuffle的基本原理 1、假如现在在一个节点上由4个shufflemapTask在执行,但是这个节点的core的数量数2,在远端有4个resultTask等待接收shuffleMapTask的数据进行处理 2、这样可以有两个shufflemaptask可以同时执行,在每一...
最多设置5个标签!
自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势
a 25
b 36
c 24
d 45
e 60
a 33
b 26
c 47
d 43
e 62
a 13
b 16
c 42
d 66
e 31
a 19
b 75
c 61
d 71
e 80
a 85
b 90
c 54
d 48
e 62
第一种方式:适合求每一组别中所需要的top个数很大的情况,是对数据分组后对每一个组内进行排序,先获得所有组的key的集合,然后循环每个key排序,最后只需要采用take(num)即可得到前num个数据。
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/*适合求每一组别中所需要的top个数很大的情况*/
public class GroupTopN {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("GroupTopN").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD> grouppair = sc.textFile("E:/mr/grouptopn.txt").mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(String line) throws Exception {
return new Tuple2(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]));
}
}).groupByKey();
// System.out.println(linepair.count());
List keys = grouppair.map(new Function>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call(Tuple2> tuple) throws Exception {
return tuple._1;
}
}).collect();
for (int i = 0; i < keys>
System.out.println(keys.get(i));
final int key = i;
JavaPairRDD result = grouppair.filter(new Function>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2> tuple) throws Exception {
return tuple._1.equals(keys.get(key));
}
}).flatMap(new FlatMapFunction>, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Iterator call(Tuple2> tuple) throws Exception {
return tuple._2.iterator();
}
}).mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Integer in) throws Exception {
return new Tuple2(in, keys.get(key));
}
}).sortByKey(false).mapToPair(new PairFunction, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Tuple2 tuple) throws Exception {
return new Tuple2(tuple._2, tuple._1);
}
});
List> list = result.take(4);
for (Tuple2 tuple2 : list) {
System.out.println(tuple2._1+" "+tuple2._2);
}
// result.foreach(new VoidFunction>() {
//
// private static final long serialVersionUID = 1L;
//
// @Override
// public void call(Tuple2 tuple) throws Exception {
// System.out.println(tuple._1+" "+tuple._2);
//
// }
// });
}
sc.close();
}
}
测试结果如下:
2018-07-17 10:19:41 INFO DAGScheduler:54 - Job 0 finished: collect at GroupTopN.java:44, took 0.374648 s
d
2018-07-17 10:19:41 INFO SparkContext:54 - Starting job: take at
……
GroupTopN.java:88, took 0.080054 s
d 71
d 66
d 48
d 45
第二种方式:当所求组内数据量不大时采取此方法较为合适,是将每一个组内的value值都存放在list中,使用Collections.sort(List,Comparator)给list排序,重写public int compare(Object o1, Object o2) 方法自定义实现逆序排列,然后再截取list。
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/*当所求组内数据量不大时采取此方法较为合适*/
public class GroupTopN2 {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("GroupTopN2").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD> grouppair = sc.textFile("E:/mr/grouptopn.txt").mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(String line) throws Exception {
return new Tuple2(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]));
}
}).groupByKey();
System.out.println(grouppair.count());
JavaPairRDD> result = grouppair.mapToPair(new PairFunction>, String, Iterable>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2> call(Tuple2> tuple) throws Exception {
List list = new ArrayList<>();
Iterator it = tuple._2.iterator();
while (it.hasNext()) {
int in = it.next();
list.add(in);
}
Collections.sort(list, new Comparator() {
@Override
public int compare(Integer o1, Integer o2) {
return -o1.compareTo(o2);
}
});
List re = list.subList(0, 4);
return new Tuple2>(tuple._1, re);
}
});
result.foreach(new VoidFunction>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2> tuple) throws Exception {
System.out.println(tuple._1+" "+tuple._2);
}
});
sc.close();
}
}
测试结果如下:
d [71, 66, 48, 45]
e [80, 62, 62, 60]
a [85, 33, 25, 19]
b [90, 75, 36, 26]
c [61, 54, 47, 42]
第三种方式:利用插入排序的思想,适用于那些组内数据量大,但所取top数量较小时,定义一个大小为N的数组存TopN的数据
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/*利用插入排序的思想,适用于那些组内数据量大,但所取top数量较小时*/
public class GroupTopN3 {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("GroupTopN3").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD> grouppair = sc.textFile("E:/mr/grouptopn.txt").mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(String line) throws Exception {
return new Tuple2(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]));
}
}).groupByKey();
System.out.println(grouppair.count());
JavaPairRDD> result = grouppair.mapToPair(new PairFunction>, String, Iterable>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2> call(Tuple2> tuple) throws Exception {
List list = new ArrayList<>();
int[] arrin = new int[4];
Iterator it = tuple._2.iterator();
while (it.hasNext()) {
int in = it.next();
for (int i = 0; i < arrin>
if (in>arrin[i]) {
for(int j =arrin.length-1;j>i;j--){
arrin[j] = arrin[j-1];
}
arrin[i] = in;
break;
}
}
}
for (int i = 0; i < arrin>
list.add(arrin[i]);
}
return new Tuple2>(tuple._1, list);
}
});
result.foreach(new VoidFunction>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2> tuple) throws Exception {
System.out.println(tuple._1+" "+tuple._2);
}
});
sc.close();
}
}
测试结果如下:
d [71, 66, 48, 45]
e [80, 62, 62, 60]
a [85, 33, 25, 19]
b [90, 75, 36, 26]
c [61, 54, 47, 42]
————————————————
相关问题推荐
1. Spark 概述1.1. 什么是 Spark(官网:http://spark.apache.org)spark 中文官网:http://spark.apachecn.orgSpark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,20...
搭建高可用模式用的协同处理框架。
多数是版本不对、、、、、、引用错误,,,等
目前,还是选择saprkflink还有成长的空间
org.springframework.boot spring-boot-starter-parent 1.3.2.RELEASE 2.10.4 1.6.2 ...
一、图概念术语1.1 基本概念图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种数据结构。这里的图并非指代数中的图。图可以对事物以及事物之间的关系建模,图可以用来表示自然发生的连接数据,如:社交网络、互联网web页面常用的应用有:在地图...
该模式被称为 Local[N] 模式,是用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题其中N代表可以使用 N 个线程,每个线程拥有一个 core 。如果不指定 N,则默认是1个线程(该线程有1个 core )。如果是 local[*],则...
Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...
json File 日期类型 怎样处理?怎样从字符型,转换为Date或DateTime类型?json文件如下,有字符格式的日期类型```{ name : Andy, age : 30, time :2015-03-03T08:25:55.769Z}{ name : Justin, age : 19, time : 2015-04-04T08:25:55.769Z }{ name : pan, ag.....
python中 方法中定义了一个其他的方法,调用方法的时候,也执行了其他的方法def outer(): name = out print(name) def inner(): name = inner print(name) return inner s...
一:Spark集群部署二:Job提交解密三:Job生成和接受四:Task的运行五:再论shuffle1,从spark Runtime 的角度讲来讲有5大核心对象:Master , Worker , Executor ,Driver , Co...
1).使用程序中的集合创建rdd2).使用本地文件系统创建rdd3).使用hdfs创建rdd,4).基于数据库db创建rdd5).基于Nosql创建rdd,如hbase6).基于s3创建rdd,7).基于数据流,如socket创建rdd
1)自动的进行内存和磁盘的存储切换;2)基于Lineage的高效容错;3)task如果失败会自动进行特定次数的重试;4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;5)checkpoint和persist,数据计算之后持久化缓存6)数据调度弹性,DAG TASK...
Spark中数据的本地化方式分为5种1、PROCESS_LOCAL : 进程本地化,指task计算的数据在本进程(Executor)中2、NODE_LOCAL:节点本地化,指task计算的数据在本节点(node)的磁盘上,当task在本进程中一直没有执行(如果Driver分发task 3s后没有执行,且重复5次...
一、spark普通shuffle的基本原理 1、假如现在在一个节点上由4个shufflemapTask在执行,但是这个节点的core的数量数2,在远端有4个resultTask等待接收shuffleMapTask的数据进行处理 2、这样可以有两个shufflemaptask可以同时执行,在每一...