大数据学习笔记4-Spark

Sabthever

Hadoop4

十.Spark

这边用2.3.4

(一)前期

  • MapReduce速度慢,因此用Spark速度比MapRduce快100倍,实际上10倍差不多
  • 计算引擎,不启动Hadoop也能干活
  • 分布式计算引擎
  • 离线数据分析

1.使用原因

  • MapReduce编程模型的局限性
    • 繁杂
      • 只有Map和Reduce两个操作,复杂的逻辑需要大量的样板代码
    • 处理效率低:、
      • Map中间结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据
      • 任务调度与启动开销大
    • 不适合迭代处理、交互式处理和流式处理
  • Spark是类Hadoop MapReduce的通用并行框架
    • Job中间输出结果可以保存在内存,不再需要读写HDFS,优先占内存
    • 比MapReduce平均快10倍以上

2.Spark简介

  • 加州大学伯克利分校AMP实验室,基于内存的分布式计算框架
  • 发展
    • 2014.5正式Spark 1.0
    • 2016 1.6、2.x版本

3.Spark优势

image-20240726135746868

  • spark streaming伪实时流,不怎么用
  • 三种提交方式
    • 本地提交
    • YARN提交:中间还有两种

4.Spark技术栈

image-20240726140808334

(二)内容介绍

1.Spark运行架构

image-20240726145529956

  • 在Spark的Driver节点中,主要运行我们提交的程序,程序的入口就是SparkContext。Driver节点会加载Spark的执行环境SparkEnv, 统计Spark执行过程信息,向Cluster Manager 申请Task需要的资源节点等。Driver在执行提交的程序时,会根据Action算子提交Job。 一个Action算子提交一个Job,并将其交给DAGScheduler, DAGScheduler在submitJob时会从后向前根据血缘关系遍历,如果一个RDD是ShuffleRDD, 会将其前后分为两个Stage。之后,会将TaskSets提交给TaskScheduler, 并封装为TaskManager。最后在Worker节点上启动Executor进程,并将Task分发给Worker节点执行。

    程序的提交执行,为一个Application, 其会通过Driver节点向Cluster Manager申请资源,然后在Worker节点启动一批Executor。每一个Executor是一个进程,其只服务于当前申请的Application。一个Worker节点上会存在多个Application申请的Executor进程,它们之间资源是相互隔离的。当分区Task(执行逻辑)分发到当前Worker的Executor上,则会在其上启动一个线程进行执行Task任务。Executor中包含一个blockManager,由于迭代计算会产生很多中间结果,可以将其存储在这个模块中,减少io操作,提高性能。

  • client端不一定和Driver在一起

  • Driver是将来运行提交代码的那台服务器

  • SparkContext作为核心,一个集群里只有一个,是程序的入口

  • 每台机器每个节点相当于Worker Node。一个节点默认一个Executor

  • Executor是自动开辟的空间,有核有内存

  • Task的数量由分配给Executor的内核数量决定。是众算子分出来的一个个小任务

  • 算子交付给每一个Executor,方法分配到Task上执行,算子给有数据的机器

  • MapReduce在Shuffle阶段是Reduce把Mapper拉取过来,影响效率。因此就把算子复制传到节点上并赋予编号,减少数据的传输。通过Yarn来交付,是把程序发过去。过程尽量放内存不落盘。

  • 每一个算子都是一个Task,甚至多个Task。一个Executor可以处理一段算子

  • Cache是节点间用来传输少量的数据

  • 通过repartition改分区数量。一个Task对应一个RDD分区。假设三台服务器,每台服务器2核。那么repartition(6)就会占用各两个核,超过的话会等某个结束计算后,再让它计算。

2.Spark架构核心组件

image-20240726151437724

  • Spark一般部署再Hadoop服务器上,方便找Yarn,也方便找数据。Master最好不要放在集群上,防止崩溃

  • 如一个map算子可能会在多节点查数据,所以可能会出现多个Task

  • 一个Job代表一串算子

  • 一个Job分成多组Task,称为Stage。即从一串转换算子到一个行动算子就是一个Stage

  • 一个分区要占用一个核

  • 网上解释

    1. Job
      • 一个Job是由Spark应用程序中的一个行动(Action)触发的。行动是那些返回数据并触发计算的操作,如count()collect()save()等。
      • 一个Job可以包含一个或多个Stage。
    2. Stage
      • Stage是Job的一个阶段,代表了一组可以并行执行的任务集合。
      • Stage之间存在依赖关系,通常分为两类:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。
      • 窄依赖意味着子RDD的每个分区只依赖于父RDD的一个或少数几个分区,不需要进行数据的shuffle操作。
      • 宽依赖则意味着子RDD的每个分区可能依赖于父RDD的所有分区,需要进行数据的shuffle操作。
      • 一个Job从逻辑上被划分为多个Stage,每个Stage的结束标志着一个依赖的完成。
      • 需要传数据了就是一个阶段
    3. Task
      • Task是Stage中的基本执行单元,每个Task负责处理数据的一个分区。
      • Stage中的所有Task都是并行执行的,每个Task对应RDD的一个分区。
      • Task的执行是实际的计算工作,它们读取数据、执行转换操作,并产生结果。

    这三个概念之间的关系可以用以下方式描述:

    • 当Spark应用程序执行一个行动操作时,它首先触发一个Job。
    • Spark的DAG(Directed Acyclic Graph,有向无环图)调度器会根据RDD之间的依赖关系将Job分解为一个或多个Stage。
    • 每个Stage进一步被分解为多个Task,这些Task在集群中的不同节点上并行执行。
    • 窄依赖:在窄依赖的情况下,子RDD的每个分区只依赖于父RDD的一个或少数几个分区。这种情况下,一个Task确实对应处理一个分区的数据。
    • 宽依赖:在宽依赖的情况下,子RDD的每个分区可能依赖于父RDD的所有分区,需要进行数据的shuffle操作。在这种情况下,一个Task可能需要处理来自多个分区的数据。
    • 父RDD中的一个Task就是一个分区

3.Spark API

  • RDD
    • Spark核心,主要数据抽象
    • 类似数组
    • 弹性分布式数据集
    • 存储的是算子,相当于高级方法
  • Dataset
    • 从Spark1.6开始引入的新的抽象,特定领域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作
    • 类似对象数组
  • DataFrame
    • DataFrame是特殊的Dataset
    • 结构化的RDD
    • 长的像表

4.RDD

A.解释

  • 简单的解释
    • RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存和磁盘中,并执行正确的操作
  • 复杂的解释
    • RDD是用于数据转换的接口
    • RDD指向了存储在HDFS、Cassandra、HBase等、或缓存(内存、内存+磁盘、仅磁盘等),或在故障或缓存收回时重新计算其他RDD分区中的数据
  • RDD中是没有数据的,里面只有算子和指引对象(数据地址),是分区集合
  • 是spark核心

B.完整解释(重要)

  • RDD是弹性分布式数据集(Resilient Distributed Datasets)
    • 分布式数据集
      • RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上
      • RDD并不存储真正的数据,只是对数据和操作的描述
    • 弹性
      • RDD默认存放在内存中,当内存不足,Spark自动将RDD写入磁盘
    • 容错性
      • 根据数据血统(设置检查点ckeckpoint),可以自动从节点失败中恢复分区

C.DAG

  • 有向无环图,算子是不能成环的

D.特性

  • 一系列的**分区(分片)**信息,每个任务处理一个分区
  • 每个分区上都有compute函数,计算该分区中的数据
  • RDD之间有一系列的依赖
  • 分区器决定数据(key-value)分配至哪个分区
  • 优先位置列表,将计算任务分派到其所在处理数据块的存储位置

E.RDD的使用

a.使用集合创建

13a5ff67-ed29-4a8f-ac9f-27a4fafe04a5

parallelize是用于变成RDD的

makeRDD

1
2
3
4
5
6
7
8
9
10
11
object MyPartition {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("part").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))
val size = rdd.partitions.size
println(size)
sc.stop()
}
}
// 我的电脑16,因为16核,*改多少最后就会输出多少,也可以在parallelized最后多加一个数字
  • 查看实际的分区

    mapPartitionsWithIndex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
object MyPartition {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("part").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7),4)
.mapPartitionsWithIndex((x,iter)=>{
val str=iter.toIterator.toList.mkString(",")
List(s"分区号:${x},分区数据:${str}").toIterator
}).foreach(println);
sc.stop()
}
}

分区号:1,分区数据:2,3
分区号:3,分区数据:6,7
分区号:2,分区数据:4,5
分区号:0,分区数据:1
  • 自己来进行分区,分区一定要用键值对,要重写分区的方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    object MyPartition {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("part").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7),4)
    .map(x=>(x,x))
    .partitionBy(new MyPart(4))
    .mapPartitionsWithIndex((x,iter)=>{
    val str=iter.toIterator.toList.mkString(",")
    List(s"分区号:${x},分区数据:${str}").toIterator
    }).foreach(println);
    sc.stop()
    }
    }
    class MyPart(n:Int) extends Partitioner{
    override def numPartitions: Int = n

    override def getPartition(key: Any): Int = {
    val keyNum = key.toString.toInt
    keyNum%numPartitions
    }
    }
b.通过加载文件产生RDD

image-20240726173911665

c.RDD操作(重要)
  • b7debef9-c276-4998-a8f6-b4f0338d9996

  • 转换算子一开始都不干活,直到遇到一个行动算子

  • 简而言之,如果你在Spark程序中看到一个操作返回一个新的RDD,那么它很可能是一个转换算子。如果一个操作返回一个值(如整数、列表等),那么它很可能是一个行动算子。

  • 转换算子

    • map

    • filter

    • mapValues

      原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD

    • image-20240727100517510
    • makeRDD

    • cache

    • persist

  • 动作算子

    • count

    • collect

      但是这个collect不是scala那个。是以Array形式返回RDD所有数据,十分危险,要消耗大量内存。collect可以从RDD变为Array

    • take

      take后的是Array不是RDD,可再用parallelize转为RDD就可以继续了,也可以用makeRDD

    • reduce

    • foreach

    • lookup

      用于PairRDD,返回K对应的所有V值

    • 最值

    • saveAsTextFile

      单反是save都是行动算子,RDD有。落盘文件数量按分区来。如果放在一个文件中,那么就用makeRDD(arr,1)或者使用repartition来设置一个分区

      1
      2
      3
      4
      5
      6
      7
      8
      9
      val rdd = sc.textFile("E:\\ProgramFile\\BigDataStudy\\data\\customers.csv")
      val arr = rdd.map(line => {
      val infos = line.split(",")
      // (infos(2),infos(1))
      (infos(1), infos(2))
      }).groupByKey().mapValues(_.toList.size)
      .repartition(1).sortBy(-_._2).take(10)

      sc.makeRDD(arr,1).saveAsTextFile("e:/Temp/ttt")

F.RDD持久化

缓存
  • cache和persist是减少重复计算

    d406eadf-0eb6-4bcd-bbd7-294dd14ea5cc

  • 姓名全部排序输出,用cache()把转换算子的结果放入cache()来提高效率,重复用的变量尽量都要cache().也得碰到行动算子才会运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
object Exp01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("exp01")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.textFile("E:\\ProgramFile\\BigDataStudy\\data\\customers.csv")
val arr = rdd.map(line => {
val infos = line.split(",")
(infos(1), infos(2))
}).cache()
val arr1 = arr.map(x=>(x._2,x._1))
countRes(arr)
countRes(arr1)
sc.stop()
}
def countRes(rdd:RDD[(String,String)])={
rdd.groupByKey().mapValues(_.toList.size)
.repartition(1).sortBy(-_._2).take(10).foreach(println)
}
}
  • 但是内存很有可能没这么打,用persist,自动cache内存不够自动落盘

    • persist中写缓存策略

      • MEMORY_ONLY只放内存
      • DISK_ONLY只存硬盘
      • MEMORY_AND_DISK先内存再硬盘
      • MEMORY_AND_DISK_SER序列化存储,提高效率
      • MEMORY_AND_DISK_2 内存和落盘都有两个副本(吃内存,不推荐)
    • 改为

    1
    2
    3
    4
    val arr = rdd.map(line => {
    val infos = line.split(",")
    (infos(1), infos(2))
    }).persist(StorageLevel.MEMORY_ONLY)
检查点
  • 检查点 使用来保存相关配置,用于崩溃重启读取处理。一般放在大型计算之前做检查点

    image-20240727115041305

  • 改为 不管血统,只保留结果数据。碰到行动算子才会干活

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 设置检查点目录
    sc.setCheckpointDir("e:/Temp/ckpt") // 检查点文件
    val rdd = sc.textFile("E:\\ProgramFile\\BigDataStudy\\data\\customers.csv")

    val arr = rdd.map(line => {
    val infos = line.split(",")
    (infos(1), infos(2))
    })
    arr.checkpoint() // 做个检查点
    arr

G.RDD共享变量

广播变量
  • 广播变量:允许开发者将一个只读变量(Driver端)缓存到每个节点(Executor)上,而不是每个任务传递一个副本

  • image-20240727151925528

    只能针对于能够序列化的对象

  • 广播变量,只读不改

  • 设备温度监控

    1. 普通

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      object TemperatureWatch {
      def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("tw")
      val sc = SparkContext.getOrCreate(conf)
      // 读阈值文件 某台节点上
      val threshold = sc.textFile("e:/Temp/thread.txt").cache()
      // 读设备温度监控数据 某几个节点上
      val macdata = sc.textFile("e:/Temp/mactemp.csv").cache()

      // 将阈值文件转为Map(机器部位编号,(最低温度,最高温度))
      val thresholdMap:Map[String,(Int,Int)] = threshold.map(line => {
      val infos = line.split(",")
      (infos(0), (infos(1).toInt, infos(2).toInt))
      }).collect().toMap

      // 根据设备监控数据和阈值文件将超过阈值的数据过滤出来
      macdata.map(line=>{
      val infos = line.split(",")
      // 根据阈值Map找到机器对应部位的最高最低温度
      val (minTemp,maxTemp):(Int,Int)=thresholdMap.get(infos(0)).get
      if(infos(1).toInt<minTemp || infos(1).toInt>maxTemp) {
      (infos(0),infos(1).toInt,infos(2),"温度异常")
      }else{
      (infos(0),infos(1).toInt,infos(2),"温度正常")
      }
      }).foreach(println)

      sc.stop()
      }
      }

      我们会发现阈值文件特别小,就要放进内存,但是本来就是放到内存中的。但是,Spark是把算子交给有数据的节点,读进内存只是当前的节点。而执行val (minTemp,maxTemp):(Int,Int)=thresholdMap.get(infos(0)).get这条语句的时候要拉取该节点内存数据,大文件节点拉小文件没问题,但要是是小文件拉大文件,资源消耗太大,要让小文件存到大文件所在的节点中。因此用2的方法

      image-20240727151524247

    2. 广播变量方式

      共享变量,把小的数据发送到每一个要用的节点中。这句加在取值后

      读取的时候一定要先用.value读出来

      1
      2
           // 将阈值Map存放到共享变量中
      val tm = sc.broadcast(thresholdMap)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      val thresholdMap:Map[String,(Int,Int)] = threshold.map(line => {
      val infos = line.split(",")
      (infos(0), (infos(1).toInt, infos(2).toInt))
      }).collect().toMap
      // 将阈值Map存放到共享变量中
      val tm = sc.broadcast(thresholdMap) // ------------

      // 根据设备监控数据和阈值文件将超过阈值的数据过滤出来
      macdata.map(line=>{
      val infos = line.split(",")
      // 通过共享变量获取map
      val tmap = tm.value // ------------------
      // 根据阈值Map找到机器对应部位的最高最低温度
      val (minTemp,maxTemp):(Int,Int)=tmap.get(infos(0)).get // ---------
      if(infos(1).toInt<minTemp || infos(1).toInt>maxTemp) {
      (infos(0),infos(1).toInt,infos(2),"温度异常")
      }else{
      (infos(0),infos(1).toInt,infos(2),"温度正常")
      }
      }).foreach(println)
累加器
  • 只允许added操作,常用于实现计数

    image-20240727155526138
  • 类似于一种原子的操作

  • 例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    val conf = new SparkConf().setAppName("part").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    // var cnt=0
    var cnt=sc.accumulator(0,"cnt")
    sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10)).foreach(n=>cnt+=n)
    println(cnt.value)

    sc.stop()
    // 输出55,用ctn=0的方式就不行,出0。因为RDD的操作是在服务器上的

    //现在都这么用
    val res = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).reduce(_ + _)
    println(res)

H.RDD分区设计

  • 分区大小限制为2GB
  • 分区太少
    • 不利于并发
    • 更容易受数据倾斜影响
    • groupBy,reduceByKey,sortByKey等内存压力增大
  • 分区过多
    • Shuffle开销越大
    • 创建任务开销越大
  • 经验
    • 每个分区大约128MB
    • 如果分区小于但接近2000,则设置为大于2000

5.Spark WordCount运行原理

  • image-20240727104815941
  • 一旦出现shuffle就分stage(阶段),往往是有By操作时

A.划分stage原因

  • 数据本地化
    • 移动计算,而不是移动数据
    • 保证一个Stage内不会发生数据移动

B.Spark Shuffle过程

  • 宽依赖就要shuffle
  • 数据在shuffle中很有可能落盘
  • 在分区之间重新分配数据
    • 父RDD中同一分区中的数据按照算子要求重新进入子RDD的不同分区中
    • 中间结果写入磁盘
    • 子RDD拉取数据,而不是由父RDD推送
    • 默认情况下,Shuffle不会改变分区数量

C.依赖关系

  • Lineage:血统、遗传
  • 记录了RDD之前的转换操作和依赖关系
  • image-20240727105552098
  • 需要shuffle的都是宽依赖带By的都是宽,join有可能宽有可能窄,distinct宽但有可能不是,要让内部的By不运行
  • 要自己去改算子的先后以及其他方式提高效率。窄依赖的效率高
  • 血统信息包括转换操作,依赖关系,数据流,执行计划,内存和磁盘的使用

6.数据倾斜

image-20240727161815648

加盐加在前面,否则按照字典排序还是分不开

(三)安装

服务器安装

1.服务器安装(单机版)

  • 该例子中所用的spark版本为spark-2.3.4-bin-hadoop2.6.tgz
  1. 把相应包放到/opt

  2. 命令执行

    1
    2
    3
    4
    5
    6
    cd /opt
    tar -zxf spark-2.3.4-bin-hadoop2.6.tgz
    mv spark-2.3.4-bin-hadoop2.6 soft/spark234
    cd soft/spark234/conf
    cp slaves.template slaves
    cp spark-env.sh.template spark-env.sh

    vim spark-env.sh

    1
    2
    3
    4
    5
    export SPARK_MASTER_HOST=192.168.179.139
    export SPARK_MASTER_PORT=7077
    export SPARK_WORKER_CORES=2
    export SPARK_WORKER_MEMORY=3g
    export SPARK_MASTER_WEBUI_PORT=8888

    vim ../sbin/spark-config.sh最后加

    1
    export JAVA_HOME=/opt/soft/jdk180
  3. /opt/soft/spark234/sbin中有启动的脚本,和Hadoop的脚本名重复

  4. 启动与关闭

    1
    2
    3
    4
    cd /opt/soft/spark234/sbin
    ./start-all.sh
    # 多出了Master和Worker
    ./stop-all.sh

(四)使用

image-20240726141101384

1.第一次使用(idea/无环境)

A.导入包

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.4</version>
</dependency>

B.导入scala的库

  • 可以用之前的方法导入

  • 也可以用这种方法

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
    </dependency>
  • 这边用原本的方法

C.使用Scala统计字数

  • 正式写代码,最好使用.getOrCreate来建立,val sc = SparkContext.getOrCreate(conf)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object WordCount {
def main(args: Array[String]): Unit = {
// 1.开启Spark配置
// Master设置模式(Yarn/K8S)(管内存的是谁)
// *代表利用计算机上所有的资源 数字就是调用多少核
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 2.获取核心对象
// 由谁来指挥
val sc = new SparkContext(conf)
// 3.读文件进行单词统计
sc.textFile("e:/Temp/word.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_).foreach(println)
// 上面那个函数按键分组,按值相加
// 4.关闭核心
sc.stop()
}
}

D.使用Java统计字数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;
import java.util.Iterator;

public class MyWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("wc")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> words = sc.textFile("e:/Temp/word.txt");
// 只接受迭代器
words.flatMap(line-> Arrays.asList(line.split(" ")).iterator())
.groupBy(x->x)
.mapValues(iter->{ // 这里面是个迭代器没有size
Iterator<String> itor = iter.iterator();
int count=0;
for(;itor.hasNext();itor.next()){
count++;
}
return count;
})
.foreach(x-> System.out.println(x));
sc.stop();
}
}

核心的那条语句可以改成

1
2
3
4
words.flatMap(line-> Arrays.asList(line.split(" ")).iterator())
.mapToPair(word->new Tuple2<String,Integer>(word,1))
.reduceByKey((x,y)->x+y)
.foreach(x-> System.out.println(x));

其中mapToPair可以告诉JVM我出了元组

2.程序提交服务器(发布)

A.Standalone

  • Standalone : 无yarn操作

    1. Client通知Master提交任务
    2. Master将work资源推荐给Client
    3. Client找到WorkNode节点
    4. WorkNode开启一个Executor
    5. 资源不足则WorkNode会通知Client再找Master要新的资源

    image-20240730134342873

  • 操作

    1. 建立相关Scala相关工程,写好相关代码

      (不全的)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      <properties>    
      <scala.version>2.11</scala.version>
      <spark.version>2.3.4</spark.version>
      </properties>

      <dependencies>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version}</artifactId>
      <version>${spark.version}</version>
      </dependency>
      </dependencies>

      再pom.xml中build里面的全删了,在里面加入,这是用来打包瘦包的

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      <build>
      <plugins>
      <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
      <execution>
      <goals>
      <goal>compile</goal>
      <goal>testCompile</goal>
      </goals>
      </execution>
      </executions>
      </plugin>
      </plugins>
      </build>

      或者 用来打胖包的

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
        <plugins>
      <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.4.6</version>
      <executions>
      <execution>
      <goals>
      <goal>compile</goal>
      </goals>
      </execution>
      </executions>
      </plugin>
      <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.0.0</version>
      <configuration>
      <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
      </descriptorRefs>
      </configuration>
      <executions>
      <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
      <goal>single</goal>
      </goals>
      </execution>
      </executions>
      </plugin>
      </plugins>
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      // 单词统计
      case class MyWords(line:String)
      object App {
      def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[*]").appName("mytest").getOrCreate()

      import spark.implicits._
      val df = spark.createDataFrame(List(
      MyWords("Hello World"),
      MyWords("Hello Spark")
      )).select(explode(split($"line"," ")).as("word"))
      .groupBy("word").agg(count($"word").as("word_num"))
      df.show()
      spark.stop()
      }
      }
    2. Maven->clean->package

      把打包的mypacket-1.0-SNAPSHOT.jar拿出放入服务器中,在服务器上打开Hadoop、yarn和Spark。

    3. Master服务器提交

      两种提交方式,一种为cluster一种是client,cluster会把把算子到各个节点,client会汇总到客户端

      spark提交任务的几种方式_spark 提交模式-CSDN博客

      1
      2
      3
      4
      5
      6
      7
      8
         cd /opt/soft/spark234/bin/
      ./spark-submit \
      --class com.njupt.mypackage.App \
      --master spark://192.168.179.139:7077 \
      --executor-memory 3G \
      --total-executor-cores 4 \
      --deploy-mode cluster \
      /opt/mypacket-1.0-SNAPSHOT.jar
      1
      2
      3
      4
      5
      6
      7
         ./spark-submit \
      --class com.njupt.mypackage.App \ # 程序包路径
      --master spark://192.168.179.139:7077 \ # Master地址
      --executor-memory 3G \ # 单个executor内存
      --total-executor-cores 4 \ # 总核数3个
      --deploy-mode cluster \ # 使用--deploy-mode cluster参数提交作业时,意味着你希望Spark作业的驱动程序在集群中运行,而不是在提交作业的客户端机器上运行。这边结果给client,信息最后一定是在driver上看到的
      /opt/mypacket-1.0-SNAPSHOT.jar # 包的主机中路径,后面可以跟参数

B.yarn服务器提交

  1. 先配yarn环境

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    在/etc/profile中(要现有)YARN_HOME
    上面已经有
    export HADOOP_HOME=/opt/soft/hadoop260
    export YARN_HOME=$HADOOP_HOME

    下面要写
    #Yarn Env
    export YARN_CONF_DIR=$YARN_HOME/etc/hadoop

    source /etc/profile
    查看一下
    # echo $YARN_CONF_DIR
    /opt/soft/hadoop260/etc/hadoop
  2. 代码注释掉.master("local[*]")重打包,覆盖原本包

  3. 命令

    1
    2
    3
    4
    5
    6
    7
    8
    cd /opt/soft/spark234/bin/
    ./spark-submit \
    --class com.njupt.mypackage.App \
    --master yarn \ # 改了这个
    --executor-memory 3G \
    --total-executor-cores 4 \
    --deploy-mode client \ #改了这个 发布的时候还是要用cluster
    /opt/mypacket-1.0-SNAPSHOT.jar

(五)基础学习

  • repartition设置分区个数

  • 做全体排序的时候,要设置分区为1

    1
    2
    3
    4
    5
    val infos = line.split(",")
    (infos(2),infos(1))
    }).groupByKey().mapValues(_.toList.size)
    .repartition(1).sortBy(-_._2).take(5)
    .foreach(println)

1.SparkSession

  • SparkSession的使用

  • 算子要有顺序,比如要先分组才能找到相应的聚合项

  • 装载CSV数据源可用sql的语句,可以展示表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    def main(args: Array[String]): Unit = {
    // val conf = new SparkConf().setAppName("exp03").setMaster("local[*]")
    // val sc = SparkContext.getOrCreate(conf)
    //
    // val rdd = sc.textFile("E:\\ProgramFile\\BigDataStudy\\data\\customer_details.csv")
    // // 获取首行信息
    // val first = rdd.first()
    // // 通过过滤器去除首行
    // rdd.filter(line => !line.equals(first)).take(5).foreach(println)
    // sc.stop()
    val spark = SparkSession.builder().master("local[*]")
    .appName("exp03").getOrCreate();
    // 读有表头的文件
    val df = spark.read.option("header", true)
    .csv("E:\\ProgramFile\\BigDataStudy\\data\\customer_details.csv");
    // df.show();
    // 给df起个名字
    df.createTempView("customer")
    // 尽心查询
    spark.sql(
    """
    |select count(*) s_name from customer where first_name like 'S%'
    |""".stripMargin).show();
    spark.stop();
    }
  • sql经典50题

  • sql中的函数通过import org.apache.spark.sql.functions._导入,要用spark.implicits._要先导入前面的包,这个spark不是绝对的,而是前面开启配置时的对象

  • SparkSession后的算子

    • show(5,false)显示5行不剪切

    • withColumn多加一列

      这儿是把这两列用空格的方式相连生成新一列

      1
      2
      df.withColumn("name",concat_ws(" ",
      col("first_name"),col("last_name"))).show(5)
    • withColumnRenamed改列名

      1
      df.withColumnRenamed("first_name","fname").show();
    • toDF变为DataFrame,起列名

      1
      2
      3
      val df1 = spark.read.csv("E:\\ProgramFile\\BigDataStudy\\data\\customers.csv")
      .toDF("id","fname","lname","xxx","xxx1","address","area","ar","num")
      .show(5)
    • RDD转DataFrame

      1
      2
      3
      // 必须紧跟在toDF上
      import spark.implicits._
      spark.sparkContext.parallelize(Seq((1,"zs"),(2,"ls"))).toDF("id","name").show()
    • orderBy排序

      倒叙前5

      1
      df.orderBy(col("customer_id").desc).show(5);
    • wherelike

      三种方法

      1
      2
      3
      4
      5
      6
      df.where("first_name like 'Z%'").show()
      df.where(col("first_name").like("Z%")).show(5) // 效果相同

      // 效果也相同
      import spark.implicits._
      df.where($"first_name".like("Z%")).show(5)
    • lit相当于做了一个小表一行一列

    • ===是等于

      1
      2
      import spark.implicits._
      df.where($"first_name"===lit("Zachariah")).show(5)
    • and条件相连

      1
      df.where($"first_name".like("Z%") and $"gender"===lit("Female")).show(5)
    • select选择投影

      1
      df.select($"first_name".as("fanme"),$"last_name".as("lname"),$"email").show(5)
    • as改别名

    1
    2
      import spark.implicits._
    df.select($"first_name".as("fanme"),$"last_name".as("lname"),$"email").show(5)
    • join inner join,要左右外联,只要把innerleft或right

      1
      2
      3
      custdf.join(transdf,custdf("customer_id")===transdf("customer_id"),"inner")
      // 列名一样可以用一下的方式相关联
      custdf.join(transdf,Seq("customer_id"),"inner").show(5)
    • crossJoin笛卡尔集

    • groupBy分组,聚合函数

      1
      transdf.groupBy($"customer_id").agg(sum($"price").as("totalprice"),count($"product").as("paynum")).show(5)
    • Windows.partitionedby

      Windows窗口函数

      rank().over(Window.orderBy(desc("event_num")))

    • 但凡有null的地方补为0

      1
      .na.fill(0)

2.装载JSON

3.Spark SQL架构

  • 介绍
    • 提高了更高层次的接口方便地处理数据
    • 是Spark的核心组件,能够直接访问现存的hive
    • 支持SQL、API编程
    • image-20240729091721730
  • image-20240729092734922 - 主要是SparkSession

A.DataSet

image-20240729092912362

B.DataFrame

image-20240729093126751

C.Spark SQL 函数

  • 内置函数

  • 自定义函数

    Java中有LocalDate来转为星期

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    object Exp04 {
    // 自定义UDF
    val myDayOfWeek=udf((time:String)=>{
    LocalDate.parse(time.split(" ")(0)).getDayOfWeek.toString
    })
    def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("exp04").getOrCreate();
    val ordDF = spark.read.csv("E:/Temp/retail_db/orders.csv")
    .toDF("ordid", "orddate", "userid", "statu")
    val itemDF = spark.read.csv("E:/Temp/retail_db/order_items.csv")
    .toDF("itemid", "ordid", "productid", "buynum", "cntprice", "price")
    import spark.implicits._
    ordDF.join(itemDF,Seq("ordid")).select($"ordid",
    myDayOfWeek($"orddate").as("weekday"),$"buynum")
    .groupBy("weekday").agg(sum("buynum").as("cnt_buynum"))
    .show()
    spark.stop();
    }
    }

E.操作外部数据源

image-20240729111835878
  • PostgreSQL建议自学一下
  • image-20240729115558749
连接MySQL
  1. 导入包

    1
    2
    3
    4
    5
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
    </dependency>
  2. 连接取数据

    • 读:spark.read.jdbc(url, 表名, param)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    object Exp05 {
    def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("exp05").getOrCreate();
    // 读取数据库
    val param = new Properties()
    param.setProperty("user","root")
    param.setProperty("password","ok")
    param.setProperty("driver","com.mysql.jdbc.Driver") // mysql5 8要在mysql后加cj
    val url = "jdbc:mysql://192.168.179.139:3306/exp"
    val custDF = spark.read.jdbc(url, "customers", param)
    custDF.show()
    spark.stop()
    }
    }
    • custDF.write.mode(SaveMode.Append).saveAsTable("products")

      SaveMode.OverWrite属性,要填入

      写样例类,并且用DF传入

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      case class Products(productid:Int,typeid:Int,title:String,
      orginprice:Double,imgs:String);

      object Exp05 {
      def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[*]").appName("exp05").getOrCreate();
      // 读取数据库
      val param = new Properties()
      param.setProperty("user","root")
      param.setProperty("password","ok")
      param.setProperty("driver","com.mysql.jdbc.Driver") // mysql5 8要在mysql后加cj 下面的url也要加相关配置
      val url = "jdbc:mysql://192.168.179.139:3306/exp"
      // val custDF = spark.read.jdbc(url, "products", param)
      val df = spark.createDataFrame(List(Products(1500, 1, "中国商品", 1000.12, "http://imgs/123.jpg")))
      df.write.mode(SaveMode.Append).jdbc(url, "products", param)
      // custDF.show()

      spark.stop()
      }
      }
连接Hive
  1. 导入包

    要对应好hive和spark版本

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.3.4</version>
    </dependency>
  2. 代码

    9083端口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    object Exp06 {
    def main(args: Array[String]): Unit = {
    var spark=SparkSession.builder().appName("exp06")
    .master("local[*]")
    .config("hive.metastore.uris","thrift://192.168.179.139:9083")
    .enableHiveSupport()
    .getOrCreate()
    val df = spark.sql("select * from exp.cust")
    df.show()
    spark.stop()
    }
    }
    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      case class Custs(id:String,name:String);
      object Exp06 {
      def main(args: Array[String]): Unit = {
      System.setProperty("HADOOP_USER_NAME","root") // 写的时候要设置用户
      var spark=SparkSession.builder().appName("exp06")
      .master("local[*]")
      .config("hive.metastore.uris","thrift://192.168.179.139:9083")
      .enableHiveSupport()
      .getOrCreate()
      // val df = spark.sql("select * from exp.custs")
      val df = spark.createDataFrame(List(Custs("4", "zl")))
      df.write.format("Hive").mode(SaveMode.Append).saveAsTable("exp.custs") // 文本格式要默认为Hive的
      // df.show()
      spark.stop()
      }
      }
连接Hbase
  1. 导入包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0</version>
    </dependency>
  2. 导包要注意

    spark.sparkContext那段不知道是什么

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.{Row, SparkSession}

    case class Users(userid:String,uname:String,gender:String,say:String)
    object Exp07 {
    def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("exp07")
    .master("local[*]")
    .getOrCreate()
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum","192.168.179.139:2181")
    config.set(TableInputFormat.INPUT_TABLE,"mydemo:users")

    val rdd=spark.sparkContext.newAPIHadoopRDD( // <-------------
    config,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result]
    )
    import spark.implicits._
    val arrRDD = rdd.map {
    case (_, result) => {
    val key = Bytes.toString(result.getRow)
    val uname = Bytes.toString(result.getValue("base".getBytes(), "uname".getBytes()))
    val gender = Bytes.toString(result.getValue("base".getBytes(), "gender".getBytes()))
    val say = Bytes.toString(result.getValue("base".getBytes(), "say".getBytes()))
    Users(key, uname, gender, say)
    }
    }.toDF()
    // // 做一个表的元数据结构
    // val tabSchema = StructType(Seq(
    // StructField("userid", StringType),
    // StructField("uname", StringType),
    // StructField("gender", StringType),
    // StructField("say", StringType)
    // ))
    arrRDD.show(false)
    spark.stop()
    }
    }
  3. 这边要注意写入要foreachPartition,因为Hbase驱动在Driver Program端,而塞数据在Executor中,table和conn都是接口,无法序列化传输

    后期再用数组作为缓存,一次性放入

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    package com.njupt.spark001

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.spark.sql._

    import scala.collection.mutable.ListBuffer

    case class Users(rk:Int,uname:String)
    object Exp07 {
    def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("exp07")
    .master("local[*]")
    .getOrCreate()

    // 写数据
    val data= ListBuffer[Users]()
    for(k:Int <- 1000 to 2000){
    Users(k,s"test${k}")+=:data
    }
    val df: DataFrame = spark.createDataFrame[Users](data).repartition(5)
    // 创建hbase连接

    df.foreachPartition(partition=> {
    //一个分区开一次连接
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum","192.168.179.139:2181")
    val conn = ConnectionFactory.createConnection(config)
    val table = conn.getTable(TableName.valueOf("mydemo:uuu"))
    partition.foreach(row => {
    val put = new Put(row.getAs("rk").toString.getBytes())
    put.addColumn("base".getBytes(),"uname".getBytes()
    , row.getAs("uname").toString.getBytes())
    table.put(put)
    })
    table.close()
    conn.close()
    })
    spark.stop()
    }
    }
写入文件
  • 1个分区,有头

    1
    resDF.coalesce(1).write.option("header","true").mode(SaveMode.Overwrite).csv(param.get("write_file_path").get)

4.Spark Machine Learning架构

(1)

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

(2)

(3)

(4)离线项目

  • 工程名spkmodel

Spark 2.1.0 入门:KMeans聚类算法

Spark ML 之 KMeans算法的应用实操——用户分群召回推荐算法

(六)进阶学习

1.性能优化

A.序列化

image-20240731084117663

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
case class MyWords(line:String)
object App {
def main(args: Array[String]): Unit = {
// 但凡传输和落盘都序列化
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setAppName("aaa")
// 凡是MyWords类型的数据 在序列化时候都采用Kryo方式进行
conf.registerKryoClasses(Array(classOf[MyWords]))
val spark = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._
val df = spark.createDataFrame(List(
MyWords("Hello World"),
MyWords("Hello Spark")
)).select(explode(split($"line"," ")).as("word"))
.groupBy("word").agg(count($"word").as("word_num"))
df.show()
spark.stop()
}
}

B.性能优化2

  • 使用对象数组、原始类型代替Java、Scala集合类(如HashMap)
  • 避免嵌套结构
  • 尽量使用数字作为Key,而非字符串
  • 以较大的RDD使用MEMORY ONLY SER
  • 加载CSV、JSON时,仅加载所需字段
  • 仅在需要时持久化中间结果(RDD/DS/DF)
  • 避免不必要的中间结果(RDD/DS/DF)的生成
  • DF的执行速度比DS快约3倍

C.性能优化3

  • 自定义RDD分区与spark.default.parallelism
    • 该参数用于设置每个stage的默认task数量
  • 将大变量广播出去,而不是直接使用
  • 尝试处理本地数据并最小化跨工作节点的数据传输

D.性能优化4

image-20240730173259229

  • 起别名效率高,因为会有映射

最后

  • 标题: 大数据学习笔记4-Spark
  • 作者: Sabthever
  • 创建于 : 2025-07-08 20:02:34
  • 更新于 : 2025-10-09 16:14:31
  • 链接: https://sabthever.cn/2025/07/08/technology/bigdata/Hadoop4-Spark/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。