[SpringBoot] How to integrate Spark into a Spring Boot project

Posted 2020-06-24 09:10

2 replies
小猴哥哥
Reply #2 · 2020-06-24 15:39

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.2.RELEASE</version>
    </parent>

    <properties>
        <scala.version>2.10.4</scala.version>
        <spark.version>1.6.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-launcher_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.ansj</groupId>
            <artifactId>ansj_seg</artifactId>
            <version>5.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>


import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import scala.Tuple2;

@Component
public class WordCountService implements Serializable {

    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    private transient JavaSparkContext sc;

    public Map<String, Integer> run() {
        Map<String, Integer> result = new HashMap<>();
        JavaRDD<String> lines = sc.textFile("C:\\Users\\bd2\\Downloads\\blsmy.txt").cache();

        // map() is lazy and its result is discarded here, so this transformation
        // never actually executes; it only illustrates the Function API.
        lines.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                System.out.println(s);
                return s;
            }
        });

        System.out.println(lines.count());

        // Split each line on spaces (FlatMapFunction returns Iterable in Spark 1.x).
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(SPACE.split(s));
            }
        });

        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<String, Integer> tuple : output) {
            result.put(tuple._1(), tuple._2());
        }

        return result;
    }
}
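The service above autowires a JavaSparkContext, but this answer never shows where that bean comes from. A minimal sketch of a configuration class that would satisfy the injection under the Spark 1.6 setup from the pom (the class name, app name, and master URL are assumptions, not part of the original answer):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfig {

    @Bean
    public SparkConf sparkConf() {
        // Local mode for development; app name and master URL are illustrative.
        return new SparkConf().setAppName("wordCount").setMaster("local[*]");
    }

    @Bean(destroyMethod = "stop")
    public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
        return new JavaSparkContext(sparkConf);
    }
}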


征戰撩四汸
Reply #3 · 2021-11-15 16:14

pom.xml dependencies:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.3.RELEASE</version>
    <relativePath />
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
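One detail worth calling out in this pom: spark-core and spark-streaming are declared with <scope>provided</scope>, signalling that a Spark runtime is expected to supply those classes at deployment time. If you run the application standalone and hit ClassNotFoundException for Spark classes, switching those two dependencies to the default compile scope is a common first adjustment.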

Configuration class:

@Configuration
@ConfigurationProperties(prefix = "spark")
public class SparkContextBean {

    private String sparkHome = ".";
    private String appName = "sparkTest";
    private String master = "local";

    @Bean
    @ConditionalOnMissingBean(SparkConf.class)
    public SparkConf sparkConf() throws Exception {
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        return conf;
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext() throws Exception {
        return new JavaSparkContext(sparkConf());
    }

    public String getSparkHome() {
        return sparkHome;
    }

    public void setSparkHome(String sparkHome) {
        this.sparkHome = sparkHome;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getMaster() {
        return master;
    }

    public void setMaster(String master) {
        this.master = master;
    }
}
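Because the class is bound with @ConfigurationProperties(prefix = "spark"), the three fields can be overridden from application.properties. A minimal sketch, assuming Spring Boot 1.5 relaxed binding (the values shown are illustrative, not from the original answer):

# application.properties (illustrative values)
spark.spark-home=.
spark.app-name=sparkTest
spark.master=local[*]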

Implementation class:

@Service
public class SparkTestService {

    private static final Logger logger = LoggerFactory.getLogger(SparkTestService.class);

    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    private JavaSparkContext sc;

    public Map<String, Integer> calculateTopTen() {
        Map<String, Integer> result = new HashMap<>();
        JavaRDD<String> lines = sc.textFile("src/test/java/test.txt").cache();

        System.out.println();
        System.out.println("-------------------------------------------------------");
        System.out.println(lines.count());

        JavaRDD<String> words = lines.flatMap(str -> Arrays.asList(SPACE.split(str)).iterator());
        JavaPairRDD<String, Integer> ones = words.mapToPair(str -> new Tuple2<>(str, 1));
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((Integer i1, Integer i2) -> (i1 + i2));

        // Swap key and value so the count becomes the sort key, sort descending, then swap back.
        JavaPairRDD<Integer, String> temp = counts.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<String, Integer> sorted = temp.sortByKey(false).mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));

        System.out.println();
        System.out.println("-------------------------------------------------------");
        System.out.println(sorted.count());

        // The RDD is already sorted descending, so take(10) returns the top ten.
        // (The original used sorted.top(10), but top() relies on the natural ordering
        // of the element type, and Tuple2 is not Comparable, so it fails at runtime.)
        //List<Tuple2<String, Integer>> output = sorted.collect();
        List<Tuple2<String, Integer>> output = sorted.take(10);
        for (Tuple2<String, Integer> tuple : output) {
            result.put(tuple._1(), tuple._2());
        }
        return result;
    }

    /**
     * Practice demo to get familiar with the RDD API.
     */
    public void sparkExerciseDemo() {
        List<Integer> data = Lists.newArrayList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> rdd01 = sc.parallelize(data);
        rdd01 = rdd01.map(num -> num * num);
        // data map :1,4,9,16,25,36
        logger.info("data map :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        rdd01 = rdd01.filter(x -> x < 6);
        // data filter :1,4
        logger.info("data filter :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        rdd01 = rdd01.flatMap(x -> {
            Integer[] test = {x, x + 1, x + 2};
            return Arrays.asList(test).iterator();
        });
        // flatMap :1,2,3,4,5,6
        logger.info("flatMap :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        JavaRDD<Integer> unionRdd = sc.parallelize(data);
        rdd01 = rdd01.union(unionRdd);
        // union :1,2,3,4,5,6,1,2,3,4,5,6
        logger.info("union :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        List<Integer> result = Lists.newArrayList();
        result.add(rdd01.reduce((Integer v1, Integer v2) -> v1 + v2));
        // reduce :42
        logger.info("reduce :{}", Joiner.on(",").skipNulls().join(result));
        result.forEach(System.out::print);

        JavaPairRDD<Integer, Iterable<Integer>> groupRdd = rdd01.groupBy(x -> {
            logger.info("======groupBy========:{}", x);
            if (x > 10) return 0;
            else return 1;
        });
        List<Tuple2<Integer, Iterable<Integer>>> resul = groupRdd.collect();
        // group by  key:1 value:1,2,3,4,5,6,1,2,3,4,5,6
        resul.forEach(x -> {
            logger.info("group by  key:{} value:{}", x._1, Joiner.on(",").skipNulls().join(x._2));
        });
    }

    /**
     * Spark Streaming exercise.
     */
    public void sparkStreaming() throws InterruptedException {
        JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(10)); // batch interval
        JavaReceiverInputDStream<String> lines = jsc.receiverStream(new CustomReceiver(StorageLevel.MEMORY_AND_DISK_2()));
        JavaDStream<Long> count = lines.count();
        count = count.map(x -> {
            logger.info("Number of records in this batch: {}", x);
            return x;
        });
        count.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.stop();
    }
}

/**
 * Custom streaming receiver.
 */
public class CustomReceiver extends Receiver<String> {

    private static Logger logger = LoggerFactory.getLogger(CustomReceiver.class);

    /**
     * @author hz15041240
     * @date 2018-01-18 16:37:22
     */
    private static final long serialVersionUID = 5817531198342629801L;

    public CustomReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }

    @Override
    public void onStart() {
        new Thread(this::doStart).start();
        logger.info("Starting Receiver...");
        //doStart();
    }

    public void doStart() {
        while (!isStopped()) {
            int value = RandomUtils.nextInt(100);
            if (value < 20) {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    logger.error("sleep exception", e);
                    restart("sleep exception", e);
                }
            }
            store(String.valueOf(value));
        }
    }

    @Override
    public void onStop() {
        logger.info("Stopping Receiver...");
    }
}

@RestController
public class DemoController {

    @Autowired
    private SparkTestService sparkTestService;

    @RequestMapping("/demo/top10")
    public Map<String, Integer> calculateTopTen() {
        return sparkTestService.calculateTopTen();
    }

    @RequestMapping("/demo/exercise")
    public void exercise() {
        sparkTestService.sparkExerciseDemo();
    }

    @RequestMapping("/demo/stream")
    public void streamingDemo() throws InterruptedException {
        sparkTestService.sparkStreaming();
    }
}
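None of these snippets shows the application entry point, so a standard Spring Boot main class is still needed to wire everything up. A minimal sketch (the class name is hypothetical, not from the original answer):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical entry point; component scanning picks up the configuration,
// service, and controller classes shown above.
@SpringBootApplication
public class SparkDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SparkDemoApplication.class, args);
    }
}

With the application running on the default port, the word-count endpoint can then be exercised with, for example, curl http://localhost:8080/demo/top10.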
