Published 2020-06-24 09:10
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.3.2.RELEASE</version>
</parent>
<properties>
    <scala.version>2.10.4</scala.version>
    <spark.version>1.6.2</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.4.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-launcher_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.4</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.specs</groupId>
        <artifactId>specs</artifactId>
        <version>1.2.5</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.ansj</groupId>
        <artifactId>ansj_seg</artifactId>
        <version>5.1.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
@Component
public class WordCountService implements Serializable {

    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    private transient JavaSparkContext sc;

    public Map<String, Integer> run() {
        Map<String, Integer> result = new HashMap<>();
        JavaRDD<String> lines = sc.textFile("C:\\Users\\bd2\\Downloads\\blsmy.txt").cache();
        lines.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                System.out.println(s);
                return s;
            }
        });
        System.out.println(lines.count());
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(SPACE.split(s));
            }
        });
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        });
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<String, Integer> tuple : output) {
            result.put(tuple._1(), tuple._2());
        }
        return result;
    }
}
pom.xml dependencies:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.3.RELEASE</version>
    <relativePath />
</parent>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
Configuration class:
@Configuration
@ConfigurationProperties(prefix = "spark")
public class SparkContextBean {

    private String sparkHome = ".";
    private String appName = "sparkTest";
    private String master = "local";

    @Bean
    @ConditionalOnMissingBean(SparkConf.class)
    public SparkConf sparkConf() throws Exception {
        return new SparkConf().setAppName(appName).setMaster(master);
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext() throws Exception {
        return new JavaSparkContext(sparkConf());
    }

    public String getSparkHome() {
        return sparkHome;
    }

    public void setSparkHome(String sparkHome) {
        this.sparkHome = sparkHome;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getMaster() {
        return master;
    }

    public void setMaster(String master) {
        this.master = master;
    }
}
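Because the bean is annotated with @ConfigurationProperties(prefix = "spark"), the three fields can be overridden from application.properties without touching the code. A minimal sketch (property names follow Spring's relaxed binding of the field names; the values shown are hypothetical overrides, not part of the original answer):

    # application.properties (hypothetical overrides)
    spark.spark-home=.
    spark.app-name=sparkTest
    spark.master=local[4]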
Implementation class:
@Service
public class SparkTestService {

    private static final Logger logger = LoggerFactory.getLogger(SparkTestService.class);
    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    private JavaSparkContext sc;

    public Map<String, Integer> calculateTopTen() {
        Map<String, Integer> result = new HashMap<>();
        JavaRDD<String> lines = sc.textFile("src/test/java/test.txt").cache();
        System.out.println();
        System.out.println("-------------------------------------------------------");
        System.out.println(lines.count());

        JavaRDD<String> words = lines.flatMap(str -> Arrays.asList(SPACE.split(str)).iterator());
        JavaPairRDD<String, Integer> ones = words.mapToPair(str -> new Tuple2<>(str, 1));
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((Integer i1, Integer i2) -> i1 + i2);

        // Swap key and value so we can sort by count, then swap back.
        JavaPairRDD<Integer, String> temp = counts.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<String, Integer> sorted = temp.sortByKey(false).mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        System.out.println();
        System.out.println("-------------------------------------------------------");
        System.out.println(sorted.count());

        //List<Tuple2<String, Integer>> output = sorted.collect();
        // take(10), not top(10): sorted is already in descending count order, so
        // take(10) returns the top ten, whereas top(10) would throw at runtime
        // because Tuple2 does not implement Comparable.
        List<Tuple2<String, Integer>> output = sorted.take(10);
        for (Tuple2<String, Integer> tuple : output) {
            result.put(tuple._1(), tuple._2());
        }
        return result;
    }
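As a quick sanity check, suppose src/test/java/test.txt contains the following (hypothetical) three lines:

    hello world
    hello spark
    spark streaming

Then calculateTopTen() returns {hello=2, spark=2, world=1, streaming=1}; words with equal counts may come back in either order, since sortByKey only orders by the count.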
    /**
     * Practice demo for getting familiar with the RDD API.
     */
    public void sparkExerciseDemo() {
        List<Integer> data = Lists.newArrayList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> rdd01 = sc.parallelize(data);

        rdd01 = rdd01.map(num -> num * num);
        // data map :1,4,9,16,25,36
        logger.info("data map :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        rdd01 = rdd01.filter(x -> x < 6);
        // data filter :1,4
        logger.info("data filter :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        rdd01 = rdd01.flatMap(x -> {
            Integer[] test = {x, x + 1, x + 2};
            return Arrays.asList(test).iterator();
        });
        // flatMap :1,2,3,4,5,6
        logger.info("flatMap :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        JavaRDD<Integer> unionRdd = sc.parallelize(data);
        rdd01 = rdd01.union(unionRdd);
        // union :1,2,3,4,5,6,1,2,3,4,5,6
        logger.info("union :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        List<Integer> result = Lists.newArrayList();
        result.add(rdd01.reduce((Integer v1, Integer v2) -> v1 + v2));
        // reduce :42
        logger.info("reduce :{}", Joiner.on(",").skipNulls().join(result));
        result.forEach(System.out::print);

        JavaPairRDD<Integer, Iterable<Integer>> groupRdd = rdd01.groupBy(x -> {
            logger.info("======groupBy========:{}", x);
            return x > 10 ? 0 : 1;
        });
        List<Tuple2<Integer, Iterable<Integer>>> grouped = groupRdd.collect();
        // group by key:1 value:1,2,3,4,5,6,1,2,3,4,5,6
        grouped.forEach(x ->
            logger.info("group by key:{} value:{}", x._1, Joiner.on(",").skipNulls().join(x._2)));
    }
    /**
     * Spark Streaming practice.
     */
    public void sparkStreaming() throws InterruptedException {
        // 10-second batch interval
        JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(10));
        JavaReceiverInputDStream<String> lines =
                jsc.receiverStream(new CustomReceiver(StorageLevel.MEMORY_AND_DISK_2()));
        JavaDStream<Long> count = lines.count();
        count = count.map(x -> {
            logger.info("records in this batch: {}", x);
            return x;
        });
        count.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.stop();
    }
}

/**
 * Custom receiver for Spark Streaming.
 *
 * @author hz15041240
 * @date 2018-01-18 16:37
 */
public class CustomReceiver extends Receiver<String> {

    private static final Logger logger = LoggerFactory.getLogger(CustomReceiver.class);
    private static final long serialVersionUID = 5817531198342629801L;
    public CustomReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }

    @Override
    public void onStart() {
        // Generate data on a separate thread so onStart() returns immediately.
        new Thread(this::doStart).start();
        logger.info("Starting Receiver...");
        //doStart();
    }
    public void doStart() {
        while (!isStopped()) {
            int value = RandomUtils.nextInt(100);
            if (value < 20) {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    logger.error("sleep exception", e);
                    restart("sleep exception", e);
                }
            }
            store(String.valueOf(value));
        }
    }
    @Override
    public void onStop() {
        logger.info("Stopping Receiver...");
    }
}

@RestController
public class DemoController {

    @Autowired
    private SparkTestService sparkTestService;

    @RequestMapping("/demo/top10")
    public Map<String, Integer> calculateTopTen() {
        return sparkTestService.calculateTopTen();
    }

    @RequestMapping("/demo/exercise")
    public void exercise() {
        sparkTestService.sparkExerciseDemo();
    }

    @RequestMapping("/demo/stream")
    public void streamingDemo() throws InterruptedException {
        sparkTestService.sparkStreaming();
    }
}
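With the application running on the default embedded-Tomcat port (8080, unless overridden), the endpoints can be exercised from the command line, for example:

    curl http://localhost:8080/demo/top10
    curl http://localhost:8080/demo/exercise
    curl http://localhost:8080/demo/stream

Note that /demo/stream blocks the request thread in awaitTermination() until the streaming context is stopped.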