百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

spark分布式框架(spark分布式计算框架)

ccwgpt 2024-11-06 09:51 27 浏览 0 评论

本篇文章从spark是什么,为什么要用spark以及spark怎么用三个维度学习spark分布式框架。

(一)spark是什么

Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。Spark拥有Hadoop MapReduce所具有的优点。但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。如下所示,MapReduce计算得到的结果是直接存放到hdfs中,下次MapReduce使用的时候直接用hdfs从读取数据,而Spark是将结果存放到内存中,下次Spark执行的时候直接调用内存中的数据。

Spark 是 Scala 编写。

MapReduce:MR-->hdfs-->MR
Spark:Spark-->内存-->Spark


(二)为什么要用spark

运行速度快:

与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。计算的中间结果是存在于内存中

易用性好:

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的Shell,可以非常方便地在这些Shell中使用Spark集群来验证解决问题的方法

通用性强:

Spark提供了统一的解决方案。Spark可以用于,交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。减少了开发和维护的人力成本和部署平台的物力成本

高兼容性:

Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力

官网地址:Apache Spark? - Unified Engine for large-scale data analytics

(三)spark怎么用(这里以Scala为例)

学习编程语言的时候,写的第一个案例大部分是“hello world”,这里就用第一个Spark代码WordCount解释一下。

(1)IDEA 创建 Maven 项目

(2)创建maven项目后配置相关的pom文件,否则无法在项目中使用Scala编写代码

pom.xml如下:

<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
  <!-- Spark-core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
  </dependency>

  <!-- Scala 包-->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.7</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.11.7</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.11.7</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.12</version>
  </dependency>
  <dependency>
    <groupId>com.google.collections</groupId>
    <artifactId>google-collections</artifactId>
    <version>1.0</version>
  </dependency>

</dependencies>

<build>
  <plugins>

    <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

    <!-- maven 打jar包需要插件 -->
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.4</version>
      <configuration>
        <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
        <!--<appendAssemblyId>false</appendAssemblyId>-->
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>com.lw.scalaspark.core.examples.ExecuteLinuxShell</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>assembly</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

  </plugins>
</build>

配置完成pom.xml文件后刷新pom

创建Scala项目

案例:编写第一个案例WorldCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCountScala {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark-Word-Count")
      .set("spark.driver.memery","10G")

    val sc: SparkContext = new SparkContext(conf)
    val line: RDD[String] = sc.textFile("./data/words")
    val word: RDD[String] = line.flatMap(line=>{line.split("")})
    val PairWord: RDD[(String, Int)] = word.map(word=>{new Tuple2(word,1)})
    val result: RDD[(String, Int)] = PairWord.reduceByKey((v1, v2)=>{v1+v2})

    result.foreach(println)

    sc.stop()
  }

执行结果如下:

结合上面案例详细说一下整个Spark代码过程:

1. 创建 SparkConf 对象

可以设置 Application name。

可以设置运行模式

可以设置 Spark applicatiion 的资源需求。

/**
* SparkConf 是Spark的配置,可以设置:
* 1).Spark运行模式
* local:本地运行模式,多用于本地使用eclipse | IDEA 测试代码。
* yarn: hadoop生态圈中的资源调度框架,Spark可以基于Yarn进行调度资源
* standalone:Spark自带的资源调度框架,支持分布式搭建,spark可以基于自带的资源调度框架来进行调度。
* mesos:资源调度框架。
* k8s:虚拟化的方式运行。
*
* 2).可以设置在Spark WEBUI中展示的Spark Application的名称
* 3).可以设置运行的资源情况
* 主要的资源包含core 和内存
*/
 val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark-Word-Count")
      .set("spark.driver.memery","10G")

2. 创建 SparkContext 对象

/**
      * SparkContext 是通往Spark集群的唯一通道
    */
val sc: SparkContext = new SparkContext(conf)

3. 基于 Spark 的上下文创建一个 RDD,对 RDD 进行处理。

/**
      * 创建RDD
       * 在Scala中创建RDD的常见方式有以下几种:
       *1、从Scala集合创建:
				*val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
				*
				*2、从外部存储系统创建:
				*val textFile = sc.textFile("path/to/file.txt")
				*
				*3、从其他RDD转换创建:
				*val mappedRDD = numbers.map(_ * 2)
				*
				*4、使用SparkContext的makeRDD方法:
				*val fromSeq = sc.makeRDD(Array(1, 2, 3, 4, 5))
				*
				*5、从Hadoop支持的数据集创建:
				*val sequenceFile = sc.sequenceFile["key.class","value.class"]("path/to/file.seq")
				*
				*6、从数据库等外部数据源创建,需要使用Spark SQL的JDBC接口:
				*val jdbcRDD = sc.jdbc("jdbc:mysql://localhost/database", "table", "column1='value1'")
    */
val line: RDD[String] = sc.textFile("./data/words")

4. 应用程序中要有 Action 类算子来触发 Transformation 类算子执

行。

/**
      * 通过Transformation 类算子对RDD进行处理
    */
val line: RDD[String] = sc.textFile("./data/words")
val word: RDD[String] = line.flatMap(line=>{line.split("")})
val PairWord: RDD[(String, Int)] = word.map(word=>{new Tuple2(word,1)})
val result: RDD[(String, Int)] = PairWord.reduceByKey((v1, v2)=>{v1+v2})
/**
      * 通过Action 类算子触发Transformation 类算子
    */
result.foreach(println)

5. 关闭 Spark 上下文对象 SparkContext。

  /**
      * 关闭SparkContext
   */
sc.stop()


补充:RDD和算子说明:

RDD:

什么是RDD:

RDD(Resilient Distributed Dateset),弹性分布式数据集。

RDD 的五大特性:

1. RDD 是由一系列的 partition 组成的。

HDFS的最小单位是block,而RDD的最小单位是partition

2. 函数(算子)是作用在每一个 partition上的。

3. RDD 之间有一系列的依赖关系。

RDD理解图中rdd3依赖rdd2,rdd2依赖rdd1

4. 分区器是作用在 K,V 格式的 RDD 上。

说明:K,V 格式的 RDD指存储的数据都是二元组对象,比如


5. RDD 提供一系列最佳的计算位置。

RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。


算子:

算子:可以理解为函数,如上面案例中的textFile、flatMap、map、reduceByKey等

在Spark中,算子分为三类:分别是Transformations 转换算子、Action 行动算子和持久化算子

(1)Transformations 转换算子:也叫懒加载执行算子,如filter、flatMap、map、reduceByKey等

特点:没有Action 行动算子触发,不执行

(2)Action 行动算子:如foreach,collect,count 等

一个 application 应用程序中有几个Action 类算子执行,就有几个 job 运行。

用来触发Transformations 转换算子

(3)持久化算子,也叫控制算子

持久化算子主要有三种cache,persist,checkpoint,持久化的单位是 partition。cache 和 persist 都是懒执行的。必须有一个 action 类算子触发执行。checkpoint 算子不仅能将 RDD持久化到磁盘,还能切断 RDD 之间的依赖关系。

cache:持久化到内存中

用法:

persist:可以指定持久化的级别。最常用的是 MEMORY_ONLY 和

MEMORY_AND_DISK。”_2”表示有副本数。

用法:

持久化级别如下:

注意:chche () =persist()=persist(StorageLevel.Memory_Only)

checkpoint:checkpoint 将 RDD 持久化到磁盘,还可以切断 RDD 之间的依赖关

系。checkpoint 目录数据当 application 执行完之后不会被清除。

用法:

相关推荐

Python+ Appium:Android手机连接与操作详解(附源码)

在移动端自动化测试领域,Appium一直是最热门的开源工具之一。今天这篇文章,我们聚焦Android端自动化测试的完整流程,从环境配置到代码实战,一步一步带你掌握用Python控制Android...

全平台开源即时通讯IM框架MobileIMSDK开发指南,支持鸿蒙NEXT

写在前面在着手基于MobileIMSDK开发自已的即时通讯应用前,建议以Demo工程为脚手架,快速上手MobileIMSDK!Demo工程主要用于演示SDK的API调用等,它位于SDK完整下载包的如下...

移动开发(一):使用.NET MAUI开发第一个安卓APP

对于工作多年的C#程序员来说,近来想尝试开发一款安卓APP,考虑了很久最终选择使用.NETMAUI这个微软官方的框架来尝试体验开发安卓APP,毕竟是使用VisualStudio开发工具,使用起来也...

在安卓系统上开发一款软件详细的流程

安卓app软件开发流程是一个系统而复杂的过程,涉及多个阶段和环节。以下是一个典型的安卓软件开发流程概述:1.需求分析目的:了解用户需求,确定APP的目标、功能、特性和预期效果。活动:开发团队与客户进...

ArkUI-X在Android上使用Fragment开发指南

本文介绍将ArkUI框架的UIAbility跨平台部署至Android平台Fragment的使用说明,实现Android原生Fragment和ArkUI跨平台Fragment的混合开发,方便开发者灵活...

Web3开发者必须要知道的6个框架与开发工具

在Web3领域,随着去中心化应用和区块链的兴起,开发者们需要掌握适用于这一新兴技术的框架与开发工具。这些工具和框架能够提供简化开发流程、增强安全性以及提供更好的用户体验。1.Truffle:Truff...

Python开发web指南之创建你的RESTful APP

上回我们说到了:PythonFlask开发web指南:创建RESTAPI。我们知道了Flask是一个web轻量级框架,可以在上面做一些扩展,我们还用Flask创建了API,也说到了...

python的web开发框架有哪些(python主流web框架)

  python在web开发方面有着广泛的应用。鉴于各种各样的框架,对于开发者来说如何选择将成为一个问题。为此,我特此对比较常见的几种框架从性能、使用感受以及应用情况进行一个粗略的分析。  1Dja...

Qwik:革新Web开发的新框架(webview开源框架)

听说关注我的人,都实现了财富自由!你还在等什么?赶紧加入我们,一起走向人生巅峰!Qwik:革新Web开发的新框架Qwik橫空出世:一场颠覆前端格局的革命?是炒作还是未来?前端框架的更新迭代速度,如同...

Python中Web开发框架有哪些?(python主流web框架)

Python为Web开发提供了许多优秀的框架。以下是一些流行的PythonWeb框架:1.Django:一个高级的Web框架,旨在快速开发干净、实用的Web应用。Django遵...

WPF 工业自动化数据管控框架,支持热拔插 DLL与多语言实现

前言工业自动化开发中,设备数据的采集、处理与管理成为提升生产效率和实现智能制造的关键环节。为了简化开发流程、提高系统的灵活性与可维护性,StarRyEdgeFramework应运而生。该框架专注...

[汇川PLC] 汇川IFA程序框架06-建立气缸控制FB块

前言:汇川的iFA要跟西门子对标啦,这可是新的选择!就在2月14日,汇川刚发布的iFA平台,一眼就能看出来是对标西门子的全集成自动化平台博途(TIAPortal)。这个平台能在同一个...

微软发布.NET 10首个预览版:JIT编译器再进化、跨平台开发更流畅

IT之家2月26日消息,微软.NET团队昨日(2月25日)发布博文,宣布推出.NET10首个预览版更新,重点改进.NETRuntime、SDK、libraries、C#、AS...

大模型部署革命:GGUF量化+vLLM推理的极致性能调优方案

本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在官网-聚客AI学院大模型应用开发微调项目实践课程学习平台一、模型微调核心概念与技术演进1.1微调的本质与优势数学表达:1....

拓扑学到底在研究什么?(拓扑学到底在研究什么问题)

拓扑是“不量尺寸的几何学”,那么它的核心内容,主要方法是什么?如果你问罗巴切夫斯基,他会说“附贴性是物体的一个特殊的属性。如果我们把这个性质掌握,而把物体其他的一切属性,不问是本质的或偶然出现的,均不...

取消回复欢迎 发表评论: