百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

spark分布式框架(spark分布式计算框架)

ccwgpt 2024-11-06 09:51 31 浏览 0 评论

本篇文章从spark是什么,为什么要用spark以及spark怎么用三个维度学习spark分布式框架。

(一)spark是什么

Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。Spark拥有Hadoop MapReduce所具有的优点。但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。如下所示,MapReduce计算得到的结果是直接存放到hdfs中,下次MapReduce使用的时候直接用hdfs从读取数据,而Spark是将结果存放到内存中,下次Spark执行的时候直接调用内存中的数据。

Spark 是 Scala 编写。

MapReduce:MR-->hdfs-->MR
Spark:Spark-->内存-->Spark


(二)为什么要用spark

运行速度快:

与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。计算的中间结果是存在于内存中

易用性好:

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的Shell,可以非常方便地在这些Shell中使用Spark集群来验证解决问题的方法

通用性强:

Spark提供了统一的解决方案。Spark可以用于,交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。减少了开发和维护的人力成本和部署平台的物力成本

高兼容性:

Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力

官网地址:Apache Spark? - Unified Engine for large-scale data analytics

(三)spark怎么用(这里以Scala为例)

学习编程语言的时候,写的第一个案例大部分是“hello world”,这里就用第一个Spark代码WordCount解释一下。

(1)IDEA 创建 Maven 项目

(2)创建maven项目后配置相关的pom文件,否则无法在项目中使用Scala编写代码

pom.xml如下:

<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
  <!-- Spark-core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
  </dependency>

  <!-- Scala 包-->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.7</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.11.7</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.11.7</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.12</version>
  </dependency>
  <dependency>
    <groupId>com.google.collections</groupId>
    <artifactId>google-collections</artifactId>
    <version>1.0</version>
  </dependency>

</dependencies>

<build>
  <plugins>

    <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

    <!-- maven 打jar包需要插件 -->
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.4</version>
      <configuration>
        <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
        <!--<appendAssemblyId>false</appendAssemblyId>-->
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>com.lw.scalaspark.core.examples.ExecuteLinuxShell</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>assembly</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

  </plugins>
</build>

配置完成pom.xml文件后刷新pom

创建Scala项目

案例:编写第一个案例WorldCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCountScala {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark-Word-Count")
      .set("spark.driver.memery","10G")

    val sc: SparkContext = new SparkContext(conf)
    val line: RDD[String] = sc.textFile("./data/words")
    val word: RDD[String] = line.flatMap(line=>{line.split("")})
    val PairWord: RDD[(String, Int)] = word.map(word=>{new Tuple2(word,1)})
    val result: RDD[(String, Int)] = PairWord.reduceByKey((v1, v2)=>{v1+v2})

    result.foreach(println)

    sc.stop()
  }

执行结果如下:

结合上面案例详细说一下整个Spark代码过程:

1. 创建 SparkConf 对象

可以设置 Application name。

可以设置运行模式

可以设置 Spark applicatiion 的资源需求。

/**
* SparkConf 是Spark的配置,可以设置:
* 1).Spark运行模式
* local:本地运行模式,多用于本地使用eclipse | IDEA 测试代码。
* yarn: hadoop生态圈中的资源调度框架,Spark可以基于Yarn进行调度资源
* standalone:Spark自带的资源调度框架,支持分布式搭建,spark可以基于自带的资源调度框架来进行调度。
* mesos:资源调度框架。
* k8s:虚拟化的方式运行。
*
* 2).可以设置在Spark WEBUI中展示的Spark Application的名称
* 3).可以设置运行的资源情况
* 主要的资源包含core 和内存
*/
 val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark-Word-Count")
      .set("spark.driver.memery","10G")

2. 创建 SparkContext 对象

/**
      * SparkContext 是通往Spark集群的唯一通道
    */
val sc: SparkContext = new SparkContext(conf)

3. 基于 Spark 的上下文创建一个 RDD,对 RDD 进行处理。

/**
      * 创建RDD
       * 在Scala中创建RDD的常见方式有以下几种:
       *1、从Scala集合创建:
				*val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
				*
				*2、从外部存储系统创建:
				*val textFile = sc.textFile("path/to/file.txt")
				*
				*3、从其他RDD转换创建:
				*val mappedRDD = numbers.map(_ * 2)
				*
				*4、使用SparkContext的makeRDD方法:
				*val fromSeq = sc.makeRDD(Array(1, 2, 3, 4, 5))
				*
				*5、从Hadoop支持的数据集创建:
				*val sequenceFile = sc.sequenceFile["key.class","value.class"]("path/to/file.seq")
				*
				*6、从数据库等外部数据源创建,需要使用Spark SQL的JDBC接口:
				*val jdbcRDD = sc.jdbc("jdbc:mysql://localhost/database", "table", "column1='value1'")
    */
val line: RDD[String] = sc.textFile("./data/words")

4. 应用程序中要有 Action 类算子来触发 Transformation 类算子执

行。

/**
      * 通过Transformation 类算子对RDD进行处理
    */
val line: RDD[String] = sc.textFile("./data/words")
val word: RDD[String] = line.flatMap(line=>{line.split("")})
val PairWord: RDD[(String, Int)] = word.map(word=>{new Tuple2(word,1)})
val result: RDD[(String, Int)] = PairWord.reduceByKey((v1, v2)=>{v1+v2})
/**
      * 通过Action 类算子触发Transformation 类算子
    */
result.foreach(println)

5. 关闭 Spark 上下文对象 SparkContext。

  /**
      * 关闭SparkContext
   */
sc.stop()


补充:RDD和算子说明:

RDD:

什么是RDD:

RDD(Resilient Distributed Dateset),弹性分布式数据集。

RDD 的五大特性:

1. RDD 是由一系列的 partition 组成的。

HDFS的最小单位是block,而RDD的最小单位是partition

2. 函数(算子)是作用在每一个 partition上的。

3. RDD 之间有一系列的依赖关系。

RDD理解图中rdd3依赖rdd2,rdd2依赖rdd1

4. 分区器是作用在 K,V 格式的 RDD 上。

说明:K,V 格式的 RDD指存储的数据都是二元组对象,比如


5. RDD 提供一系列最佳的计算位置。

RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。


算子:

算子:可以理解为函数,如上面案例中的textFile、flatMap、map、reduceByKey等

在Spark中,算子分为三类:分别是Transformations 转换算子、Action 行动算子和持久化算子

(1)Transformations 转换算子:也叫懒加载执行算子,如filter、flatMap、map、reduceByKey等

特点:没有Action 行动算子触发,不执行

(2)Action 行动算子:如foreach,collect,count 等

一个 application 应用程序中有几个Action 类算子执行,就有几个 job 运行。

用来触发Transformations 转换算子

(3)持久化算子,也叫控制算子

持久化算子主要有三种cache,persist,checkpoint,持久化的单位是 partition。cache 和 persist 都是懒执行的。必须有一个 action 类算子触发执行。checkpoint 算子不仅能将 RDD持久化到磁盘,还能切断 RDD 之间的依赖关系。

cache:持久化到内存中

用法:

persist:可以指定持久化的级别。最常用的是 MEMORY_ONLY 和

MEMORY_AND_DISK。”_2”表示有副本数。

用法:

持久化级别如下:

注意:chche () =persist()=persist(StorageLevel.Memory_Only)

checkpoint:checkpoint 将 RDD 持久化到磁盘,还可以切断 RDD 之间的依赖关

系。checkpoint 目录数据当 application 执行完之后不会被清除。

用法:

相关推荐

一个基于.Net Core遵循Clean Architecture原则开源架构

今天给大家推荐一个遵循CleanArchitecture原则开源架构。项目简介这是基于Asp.netCore6开发的,遵循CleanArchitecture原则,可以高效、快速地构建基于Ra...

AI写代码翻车无数次,我发现只要提前做好这3步,bug立减80%

写十万行全是bug之后终于找到方法了开发"提示词管理助手"新版本那会儿,我差点被bug整崩溃。刚开始两周,全靠AI改代码架构,结果十万行程序漏洞百出。本来以为AI说没问题就稳了,结果...

OneCode低代码平台的事件驱动设计:架构解析与实践

引言:低代码平台的事件驱动范式在现代软件开发中,事件驱动架构(EDA)已成为构建灵活、松耦合系统的核心范式。OneCode低代码平台通过创新性的注解驱动设计,将事件驱动理念深度融入平台架构,实现了业务...

国内大厂AI插件评测:根据UI图生成Vue前端代码

在IDEA中安装大厂的AI插件,打开ruoyi增强项目:yudao-ui-admin-vue31.CodeBuddy插件登录腾讯的CodeBuddy后,大模型选择deepseek-v3,输入提示语:...

AI+低代码技术揭秘(二):核心架构

本文档介绍了为VTJ低代码平台提供支持的基本架构组件,包括Engine编排层、Provider服务系统、数据模型和代码生成管道。有关UI组件库和widget系统的信息,请参阅UI...

GitDiagram用AI把代码库变成可视化架构图

这是一个名为gitdiagram的开源工具,可将GitHub仓库实时转换为交互式架构图,帮助开发者快速理解代码结构。核心功能一键可视化:替换GitHubURL中的"hub...

30天自制操作系统:第六天:代码架构整理与中断处理

1.拆开bootpack.c文件。根据设计模式将对应的功能封装成独立的文件。2.初始化pic:pic(可编程中断控制器):在设计上,cpu单独只能处理一个中断。而pic是将8个中断信号集合成一个中断...

AI写代码越帮越忙?2025年研究揭露惊人真相

近年来,AI工具如雨后春笋般涌现,许多人开始幻想程序员的未来就是“对着AI说几句话”,就能轻松写出完美的代码。然而,2025年的一项最新研究却颠覆了这一期待,揭示了一个令人意外的结果。研究邀请了16位...

一键理解开源项目:两个自动生成GitHub代码架构图与说明书工具

一、GitDiagram可以一键生成github代码仓库的架构图如果想要可视化github开源项目:https://github.com/luler/reflex_ai_fast,也可以直接把域名替换...

5分钟掌握 c# 网络通讯架构及代码示例

以下是C#网络通讯架构的核心要点及代码示例,按协议类型分类整理:一、TCP协议(可靠连接)1.同步通信//服务器端usingSystem.Net.Sockets;usingTcpListene...

从复杂到优雅:用建造者和责任链重塑代码架构

引用设计模式是软件开发中的重要工具,它为解决常见问题提供了标准化的解决方案,提高了代码的可维护性和可扩展性,提升了开发效率,促进了团队协作,提高了软件质量,并帮助开发者更好地适应需求变化。通过学习和应...

低代码开发当道,我还需要学习LangChain这些框架吗?| IT杂谈

专注LLM深度应用,关注我不迷路前两天有位兄弟问了个问题:当然我很能理解这位朋友的担忧:期望效率最大化,时间用在刀刃上,“不要重新发明轮子”嘛。铺天盖地的AI信息轰炸与概念炒作,很容易让人浮躁与迷茫。...

框架设计并不是简单粗暴地写代码,而是要先弄清逻辑

3.框架设计3.框架设计本节我们要开发一个UI框架,底层以白鹭引擎为例。框架设计的第一步并不是直接撸代码,而是先想清楚设计思想,抽象。一个一个的UI窗口是独立的吗?不是的,...

大佬用 Avalonia 框架开发的 C# 代码 IDE

AvalonStudioAvalonStudio是一个开源的跨平台的开发编辑器(IDE),AvalonStudio的目标是成为一个功能齐全,并且可以让开发者快速使用的IDE,提高开发的生产力。A...

轻量级框架Lagent 仅需20行代码即可构建自己的智能代理

站长之家(ChinaZ.com)8月30日消息:Lagent是一个专注于基于LLM模型的代理开发的轻量级框架。它的设计旨在简化和提高这种模型下代理的开发效率。LLM模型是一种强大的工具,可以...

取消回复欢迎 发表评论: