spark系列3:spark入门编程与介绍

2021-03-30 20:25发布

3. Spark 入门
目标

通过理解 Spark 小案例, 来理解 Spark 应用

理解编写 Spark 程序的两种常见方式

spark-shell

spark-submit

Spark 官方提供了两种方式编写代码, 都比较重要, 分别如下

spark-shell
Spark shell 是 Spark 提供的一个基于 Scala 语言的交互式解释器, 类似于 Scala 提供的交互式解释器, Spark shell 也可以直接在 Shell 中编写代码执行
这种方式也比较重要, 因为一般的数据分析任务可能需要探索着进行, 不是一蹴而就的, 使用 Spark shell 先进行探索, 当代码稳定以后, 使用独立应用的方式来提交任务, 这样是一个比较常见的流程

spark-submit
Spark submit 是一个命令, 用于提交 Scala 编写的基于 Spark 框架, 这种提交方式常用作于在集群中运行任务

3.1. Spark shell 的方式编写 WordCount
概要

在初始阶段工作可以全部使用 Spark shell 完成, 它可以加快原型开发, 使得迭代更快, 很快就能看到想法的结果. 但是随着项目规模越来越大, 这种方式不利于代码维护, 所以可以编写独立应用. 一般情况下, 在探索阶段使用 Spark shell, 在最终使用独立应用的方式编写代码并使用 Maven 打包上线运行

接下来使用 Spark shell 的方式编写一个 WordCount

     
Spark shell 简介

启动 Spark shell
进入 Spark 安装目录后执行 spark-shell --master master 就可以提交Spark 任务

Spark shell 的原理是把每一行 Scala 代码编译成类, 最终交由 Spark 执行

     
Master地址的设置

Master 的地址可以有如下几种设置方式

Table 3. master
地址    解释
local[N]

使用 N 条 Worker 线程在本地运行

spark://host:port

在 Spark standalone 中运行, 指定 Spark 集群的 Master 地址, 端口默认为 7077

mesos://host:port

在 Apache Mesos 中运行, 指定 Mesos 的地址

yarn

在 Yarn 中运行, Yarn 的地址由环境变量 HADOOP_CONF_DIR 来指定

Step 1 准备文件

在 Node01 中创建文件 /export/data/wordcount.txt

hadoop spark flume
spark hadoop
flume hadoop
Step 2 启动 Spark shell

cd /export/servers/spark
bin/spark-shell --master local[2]
Step 3 执行如下代码

scala> val sourceRdd = sc.textFile("file:///export/data/wordcount.txt")
sourceRdd: org.apache.spark.rdd.RDD[String] = file:///export/data/wordcount.txt MapPartitionsRDD[1] at textFile at <console>:24
 
scala> val flattenCountRdd = sourceRdd.flatMap(_.split(" ")).map((_, 1))
flattenCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:26
 
scala> val aggCountRdd = flattenCountRdd.reduceByKey(_ + _)
aggCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:28
 
scala> val result = aggCountRdd.collect
result: Array[(String, Int)] = Array((spark,2), (hadoop,3), (flume,2))
     
sc

上述代码中 sc 变量指的是 SparkContext, 是 Spark 程序的上下文和入口

正常情况下我们需要自己创建, 但是如果使用 Spark shell 的话, Spark shell 会帮助我们创建, 并且以变量 sc 的形式提供给我们调用

运行流程

flatMap(_.split(" ")) 将数据转为数组的形式, 并展平为多个数据

map_, 1 将数据转换为元组的形式

reduceByKey(_ + _) 计算每个 Key 出现的次数

总结

使用 Spark shell 可以快速验证想法

Spark 框架下的代码非常类似 Scala 的函数式调用

3.2. 读取 HDFS 上的文件
目标

理解 Spark 访问 HDFS 的两种方式

Step 1 上传文件到 HDFS 中

cd /export/data
hdfs dfs -mkdir /dataset
hdfs dfs -put wordcount.txt /dataset/
Step 2 在 Spark shell 中访问 HDFS

val sourceRdd = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")
val flattenCountRdd = sourceRdd.flatMap(_.split(" ")).map((_, 1))
val aggCountRdd = flattenCountRdd.reduceByKey(_ + _)
val result = aggCountRdd.collect
     
如何使得 Spark 可以访问 HDFS?

可以通过指定 HDFS 的 NameNode 地址直接访问, 类似于上面代码中的 sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")

也可以通过向 Spark 配置 Hadoop 的路径, 来通过路径直接访问

1.在 spark-env.sh 中添加 Hadoop 的配置路径

export HADOOP_CONF_DIR="/etc/hadoop/conf"

2.在配置过后, 可以直接使用 hdfs:///路径 的形式直接访问

3.在配置过后, 也可以直接使用路径访问

3.4. 编写独立应用提交 Spark 任务
目标

理解如何编写 Spark 独立应用

理解 WordCount 的代码流程

Step 1 创建工程

创建 IDEA 工程

 →  → 

 →  → 

增加 Scala 支持

右键点击工程目录 

选择增加框架支持 

选择 Scala 添加框架支持

Step 2 编写 Maven 配置文件 pom.xml

工程根目录下增加文件 pom.xml

添加以下内容

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>cn.itcast</groupId>
    <artifactId>spark</artifactId>
    <version>0.1.0</version>
 
    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <slf4j.version>1.7.16</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
 
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
 
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
 
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
 
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
 
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
因为在 pom.xml 中指定了 Scala 的代码目录, 所以创建目录 src/main/scala 和目录 src/test/scala

创建 Scala object WordCount

Step 3 编写代码

object WordCounts {
 
  def main(args: Array[String]): Unit = {
    // 1. 创建 Spark Context
    val conf = new SparkConf().setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
 
    // 2. 读取文件并计算词频
    val source: RDD[String] = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt", 2)
    val words: RDD[String] = source.flatMap { line => line.split(" ") }
    val wordsTuple: RDD[(String, Int)] = words.map { word => (word, 1) }
    val wordsCount: RDD[(String, Int)] = wordsTuple.reduceByKey { (x, y) => x + y }
 
    // 3. 查看执行结果
    println(wordsCount.collect)
  }
}
     和 Spark shell 中不同, 独立应用需要手动创建 SparkContext
Step 4 运行

运行 Spark 独立应用大致有两种方式, 一种是直接在 IDEA 中调试, 另一种是可以在提交至 Spark 集群中运行, 而 Spark 又支持多种集群, 不同的集群有不同的运行方式

直接在 IDEA 中运行 Spark 程序

在工程根目录创建文件夹和文件

修改读取文件的路径为`dataset/wordcount.txt`

右键运行 Main 方法

     
spark-submit 命令

spark-submit [options] <app jar> <app options>
app jar 程序 Jar 包

app options 程序 Main 方法传入的参数

options 提交应用的参数, 可以有如下选项

Table 4. options 可选参数
参数

解释

--master <url>

同 Spark shell 的 Master, 可以是spark, yarn, mesos, kubernetes等 URL

--deploy-mode <client or cluster>

Driver 运行位置, 可选 Client 和 Cluster, 分别对应运行在本地和集群(Worker)中

--class <class full name>

Jar 中的 Class, 程序入口

--jars <dependencies path>

依赖 Jar 包的位置

--driver-memory <memory size>

Driver 程序运行所需要的内存, 默认 512M

--executor-memory <memory size>

Executor 的内存大小, 默认 1G

提交到 Spark Standalone 集群中运行

在 IDEA 中使用 Maven 打包

拷贝打包的 Jar 包上传到 node01 中

在 node01 中 Jar 包所在的目录执行如下命令

spark-submit --master spark://node01:7077 \
--class cn.itcast.spark.WordCounts \
original-spark-0.1.0.jar
     
如何在任意目录执行 spark-submit 命令?

在 /etc/profile 中写入如下

export SPARK_BIN=/export/servers/spark/bin
export PATH=$PATH:$SPARK_BIN
执行 /etc/profile 使得配置生效

source /etc/profile
总结: 三种不同的运行方式

Spark shell

作用

一般用作于探索阶段, 通过 Spark shell 快速的探索数据规律

当探索阶段结束后, 代码确定以后, 通过独立应用的形式上线运行

功能

Spark shell 可以选择在集群模式下运行, 还是在线程模式下运行

Spark shell 是一个交互式的运行环境, 已经内置好了 SparkContext 和 SparkSession 对象, 可以直接使用

Spark shell 一般运行在集群中安装有 Spark client 的服务器中, 所以可以自有的访问 HDFS

本地运行

作用

在编写独立应用的时候, 每次都要提交到集群中还是不方便, 另外很多时候需要调试程序, 所以在 IDEA 中直接运行会比较方便, 无需打包上传了

功能

因为本地运行一般是在开发者的机器中运行, 而不是集群中, 所以很难直接使用 HDFS 等集群服务, 需要做一些本地配置, 用的比较少

需要手动创建 SparkContext

集群运行

作用

正式环境下比较多见, 独立应用编写好以后, 打包上传到集群中, 使用`spark-submit`来运行, 可以完整的使用集群资源

功能

同时在集群中通过`spark-submit`来运行程序也可以选择是用线程模式还是集群模式

集群中运行是全功能的, HDFS 的访问, Hive 的访问都比较方便

需要手动创建 SparkContext
————————————————

作者:涤生手记

链接:https://blog.csdn.net/qq_26442553/article/details/114824942?spm=1001.2014.3001.5502

来源:CSDN

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。