实时计算框架 Flink 在教育行业的应用实践
ccwgpt 2024-10-26 08:44 26 浏览 0 评论
如今,越来越多的业务场景要求 OLTP 系统能及时得到业务数据计算、分析后的结果,这就需要实时的流式计算如Flink等来保障。例如,在 TB 级别数据量的数据库中,通过 SQL 语句或相关 API直接对原始数据进行大规模关联、聚合操作,是无法做到在极短的时间内通过接口反馈到前端进行展示的。若想实现大规模数据的“即席查询”,就须用实时计算框架构建实时数仓来实现。
本文通过一个教育行业的应用案例,剖析业务系统对实时计算的需求场景,并分析了 Flink和Spark 两种实现方式的异同,最后通过运用UCloud UFlink产品中封装的SQL模块,来加速开发效率,更快地完成需求。
1.1 业务场景简述
在这个 K12 教育的业务系统中,学生不仅局限于纸质的练习册进行练习,还可以通过各类移动终端进行练习。基于移动终端,可以更方便地收集学生的学习数据,然后通过大数据分析,量化学习状态,快速定位薄弱知识点,进行查缺补漏。
在这套业务系统中,学生在手机 App 中对老师布置的作业进行答题训练,每次答题训练提交的数据格式如下表所示:
字段含义举例student_id学生唯一ID学生ID_16textbook_id教材唯一ID教材ID_1grade_id年级唯一ID年级ID_1subject_id科目唯一ID科目ID_2_语文chapter_id章节唯一ID章节ID_chapter_2question_id题目唯一ID题目ID_100score当前题目扣分(0 ~ 10)2answer_time当前题目作答完毕的日期与时间2019-09-11 12:44:01ts当前题目作答完毕的时间戳(java.sql.Timestamp)Sep 11, 2019 12:44:01 PM
例如,传入到后台的单条答题记录数据格式如下:
{ "student_id": "学生ID_16", "textbook_id": "教材ID_1", "grade_id": "年级ID_1", "subject_id": "科目ID_2_语文", "chapter_id": "章节ID_chapter_2", "question_id": "题目ID_100", "score": 2, "answer_time": "2019-09-11 12:44:01", "ts": "Sep 11, 2019 12:44:01 PM" }
然后,基于上述实时流入的数据,需要实现如下的分析任务:
?实时统计每个题目被作答频次
?按照年级实时统计题目被作答频次
?按照科目实时统计每个科目下题目的作答频次
1.2 技术方案选型
针对上述几个需求点,设计了如下的方案。首先会将数据实时发送到 Kafka 中,然后再通过实时计算框架从 Kafka 中读取数据,并进行分析计算,最后将计算结果重新输出到 Kafka 另外的主题中,以方便下游框架使用聚合好的结果。
下游框架从 Kafka 中拿到聚合好的数据,并实时录入到 OLTP 的业务库中(例如:MySQL、UDW、HBase、ES等),以便于接口将想要的结果实时反馈给前端。
中间的实时计算框架,则在Flink和Spark中选择。2018 年 08 月 08 日,Flink 1.6.0 推出,支持状态过期管理(FLINK-9510, FLINK-9938)、支持RocksDB、在 SQL 客户端中支持 UDXF 函数,大大加强了 SQL 处理功能,同时还支持 DML 语句、支持基于多种时间类型的事件处理、Kafka Table Sink等功能。随后推出的 Flink 1.6.x 系列版本中,进行了大量优化。这些使得 Flink 成为一个很好的选择。
早先 Spark 要解决此类需求,是通过 Spark Streaming 组件实现。为此需要先生成 RDD,然后通过 RDD 算子进行分析,或者将 RDD 转换为 DataSet\DataFrame、创建临时视图,并通过 SQL 语法或者 DSL 语法进行分析。相比之下显得不够便捷和高效。后来 Spark 2.0.0 新增了 Structured Streaming 组件,具有了更快的流式处理能力,可达到和 Flink 接近的效果。
架构如下图所示:
本篇将省略下游框架的操作,重点介绍Flink框架进行任务计算的过程(虚线框中的内容),并简述Spark的实现方法,便于读者理解其异同。
1.3 实时计算在学情分析系统中的具体实现
1.3.1 Flink 实践方案
1. 发送数据到 Kafka
后台服务通过 Flume 或后台接口触发的方式调用 Kafka 生产者 API,实时将数据发送到 Kafka 指定主题中。
例如发送数据如下所示:
{"student_id":"学生ID_16","textbook_id":"教材ID_1","grade_id":"年级ID_1","subject_id":"科目ID_2_语文","chapter_id":"章节ID_chapter_2","question_id":"题目ID_100","score":2,"answer_time":"2019-09-11 12:44:01","ts":"Sep 11, 2019 12:44:01 PM"} ………
提示:此处暂且忽略在 Kafka 集群中创建 Topic 的操作。
2. 编写 Flink 任务分析代码
使用 Flink 处理上述需求,需要将实时数据转换为 DataStream 实例,并通过 DataStream 算子进行任务分析,另外,如果想使用 SQL 语法或者 DSL 语法进行任务分析,则需要将 DataStream 转换为 Table 实例,并注册临时视图。
(1)构建 Flink env
env(StreamExecutionEnvironment) 是 Flink 当前上下文对象,用于后续生成DataStream。代码如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(3)
(2)从 Kafka 读取答题数据
在 Flink 中读取 Kafka 数据需要指定 KafkaSource,代码如下所示:
val props = new Properties() props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") props.setProperty("group.id", "group_consumer_learning_test01") val flinkKafkaSource = new FlinkKafkaConsumer011[String]("test_topic_learning_1", new SimpleStringSchema(), props) val eventStream = env.addSource[String](flinkKafkaSource)
(3)进行 JSON 解析
这里通过 map 算子实现 JSON 解析,代码示例如下:
val answerDS = eventStream.map(s => { val gson = new Gson() val answer = gson.fromJson(s, classOf[Answer]) answer })
(4)注册临时视图
创建临时视图的目的,是为了在稍后可以基于 SQL 语法来进行数据分析,降低开发工作量。需要先获取TableEnv 实例,再将 DataStream 实例转换为 Table 实例,最后将其注册为临时视图。代码如下所示:
val tableEnv = StreamTableEnvironment.create(env) val table = tableEnv.fromDataStream(answerDS) tableEnv.registerTable("t_answer", table)
(5)进行任务分析
接下来,便可以通过 SQL 语句来进行数据分析任务了,3 个需求对应的分析代码如下所示:
//实时:统计题目被作答频次 val result1 = tableEnv.sqlQuery( """SELECT | question_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | question_id """.stripMargin) //实时:按照年级统计每个题目被作答的频次 val result2 = tableEnv.sqlQuery( """SELECT | grade_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | grade_id """.stripMargin) //实时:统计不同科目下,每个题目被作答的频次 val result3 = tableEnv.sqlQuery( """SELECT | subject_id, question_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | subject_id, question_id """.stripMargin)
此时得到的 result1、result2、result3 均为 Table 实例。
(6)实时输出分析结果
接下来,将不同需求的统计结果分别输出到不同的 Kafka 主题中即可。
在 Flink 中,输出数据之前,需要先将 Table 实例转换为 DataStream 实例,然后通过 addSink 算子添加 KafkaSink即可。
因为涉及到聚合操作,Table 实例需要通过 RetractStream 来转换为 DataStream 实例。
该部分代码如下所示:
tableEnv.toRetractStream[Result1](result1) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_2", new SimpleStringSchema())) tableEnv.toRetractStream[Result2](result2) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_3", new SimpleStringSchema())) tableEnv.toRetractStream[Result3](result3) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_4", new SimpleStringSchema()))
(7)执行分析计划
Flink 支持多流任务同时运行,执行分析计划代码如下所示:
env.execute("Flink StreamingAnalysis")
至此,编译并运行项目后,即可看到实时的统计结果,如下图所示,从左至右的 3 个窗体中,分别代表对应需求的输出结果。
1.3.2 Spark 基于Structured Streaming的实现
Spark发送数据到Kafka,及最后的执行分析计划,与Flink无区别,不再展开。下面简述差异点。
1. 编写 Spark 任务分析代码
(1)构建 SparkSession
如果需要使用 Spark 的Structured Streaming组件,首先需要创建 SparkSession 实例,代码如下所示:
val sparkConf = new SparkConf() .setAppName("StreamingAnalysis") .set("spark.local.dir", "F:\\temp") .set("spark.default.parallelism", "3") .set("spark.sql.shuffle.partitions", "3") .set("spark.executor.instances", "3") val spark = SparkSession .builder .config(sparkConf) .getOrCreate()
(2)从 Kafka 读取答题数据
接下来,从 Kafka 中实时读取答题数,并生成 streaming-DataSet 实例,代码如下所示:
val inputDataFrame1 = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") .option("subscribe", "test_topic_learning_1") .load()
(3)进行 JSON 解析
从 Kafka 读取到数据后,进行 JSON 解析,并封装到 Answer 实例中,代码如下所示:
val keyValueDataset1 = inputDataFrame1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] val answerDS = keyValueDataset1.map(t => { val gson = new Gson() val answer = gson.fromJson(t._2, classOf[Answer]) answer })
其中 Answer 为 Scala 样例类,代码结构如下所示:
case class Answer(student_id: String, textbook_id: String, grade_id: String, subject_id: String, chapter_id: String, question_id: String, score: Int, answer_time: String, ts: Timestamp) extends Serializable
(4)创建临时视图
创建临时视图代码如下所示:
answerDS.createTempView("t_answer")
(5)进行任务分析
仅以需求1(统计题目被作答频次)为例,编写代码如下所示:
?实时:统计题目被作答频次
//实时:统计题目被作答频次 val result1 = spark.sql( """SELECT | question_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | question_id """.stripMargin).toJSON
(6)实时输出分析结果
仅以需求1为例,输出到Kafka 的代码如下所示:
result1.writeStream .outputMode("update") .trigger(Trigger.ProcessingTime(0)) .format("kafka") .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") .option("topic", "test_topic_learning_2") .option("checkpointLocation", "./checkpoint_chapter11_1") .start()
1.3.3 使用 UFlink SQL 加速开发
通过上文可以发现,无论基于Flink还是Spark通过编写代码实现数据分析任务时,都需要编写大量的代码,并且在生产集群上运行时,需要打包程序,然后提交打包后生成的 Jar 文件到集群上运行。
为了简化开发者的工作量,不少开发者开始致力于 SQL 模块的封装,希望能够实现只写 SQL 语句,就完成类似上述的需求。UFlink SQL 即是 UCloud 为简化计算模型、降低用户使用实时计算UFlink产品门槛而推出的一套符合 SQL 语义的开发套件。通过 UFlink SQL 模块可以快速完成这一工作,实践如下。
1. 创建 UKafka 集群
在UCloud控制台UKafka创建页,选择配置并设置相关阈值,创建UKafka集群。
提示:此处暂且忽略在 Kafka 集群中创建 Topic 的操作。
2. 创建 UFlink 集群
?在UCloud控制台UFlink创建页,选择配置和运行模式,创建一个 Flink 集群。
?完成创建
3. 编写 SQL 语句
完成之后,只需要在工作空间中创建如下形式的 SQL 语句,即可完成上述3个需求分析任务。
(1)创建数据源表
创建数据源表,本质上就是为 Flink 当前上下文环境执行 addSource 操作,SQL 语句如下:
CREATE TABLE t_answer( student_id VARCHAR, textbook_id VARCHAR, grade_id VARCHAR, subject_id VARCHAR, chapter_id VARCHAR, question_id VARCHAR, score INT, answer_time VARCHAR, ts TIMESTAMP )WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_1', groupId = 'group_consumer_learning_test01', parallelism ='3' );
(2)创建结果表
创建结果表,本质上就是为 Flink 当前上下文环境执行 addSink 操作,SQL 语句如下:
CREATE TABLE t_result1( question_id VARCHAR, frequency INT )WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_2', parallelism ='3' ); CREATE TABLE t_result2( grade_id VARCHAR, frequency INT )WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_3', parallelism ='3' ); CREATE TABLE t_result3( subject_id VARCHAR, question_id VARCHAR, frequency INT )WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_4', parallelism ='3' );
(3)执行查询计划
最后,执行查询计划,并向结果表中插入查询结果,SQL 语句形式如下:
INSERT INTO t_result1 SELECT question_id, COUNT(1) AS frequency FROM t_answer GROUP BY question_id; INSERT INTO t_result2 SELECT grade_id, COUNT(1) AS frequency FROM t_answer GROUP BY grade_id; INSERT INTO t_result3 SELECT subject_id, question_id, COUNT(1) AS frequency FROM t_answer GROUP BY subject_id, question_id;
SQL 语句编写完毕后,将其直接粘贴到 UFlink 前端页面对话框中,并提交任务,即可快速完成上述 3 个需求。如下图所示:
1.3.4. UFlink SQL 支持多流 JOIN
Flink、Spark 目前都支持多流 JOIN,即stream-stream join,并且也都支持Watermark处理延迟数据,以上特性均可以在 SQL 中体现,得益于此,UFlink SQL 也同样支持纯 SQL 环境下进行 JOIN 操作、维表JOIN操作、自定义函数操作、JSON数组解析、嵌套JSON解析等。
免费分享Java技术资料,需要的朋友可以在关注我后私信我,自助领取
原文:https://mp.weixin.qq.com/s/JFcANUK_Vfa7ZMXnn7sruQ
来源:微信公众号
作者:刘景泽
相关推荐
- 十分钟让你学会LNMP架构负载均衡(impala负载均衡)
-
业务架构、应用架构、数据架构和技术架构一、几个基本概念1、pv值pv值(pageviews):页面的浏览量概念:一个网站的所有页面,在一天内,被浏览的总次数。(大型网站通常是上千万的级别)2、u...
- AGV仓储机器人调度系统架构(agv物流机器人)
-
系统架构层次划分采用分层模块化设计,分为以下五层:1.1用户接口层功能:提供人机交互界面(Web/桌面端),支持任务下发、实时监控、数据可视化和报警管理。模块:任务管理面板:接收订单(如拣货、...
- 远程热部署在美团的落地实践(远程热点是什么意思)
-
Sonic是美团内部研发设计的一款用于热部署的IDEA插件,本文其实现原理及落地的一些技术细节。在阅读本文之前,建议大家先熟悉一下Spring源码、SpringMVC源码、SpringBoot...
- springboot搭建xxl-job(分布式任务调度系统)
-
一、部署xxl-job服务端下载xxl-job源码:https://gitee.com/xuxueli0323/xxl-job二、导入项目、创建xxl_job数据库、修改配置文件为自己的数据库三、启动...
- 大模型:使用vLLM和Ray分布式部署推理应用
-
一、vLLM:面向大模型的高效推理框架1.核心特点专为推理优化:专注于大模型(如GPT-3、LLaMA)的高吞吐量、低延迟推理。关键技术:PagedAttention:类似操作系统内存分页管理,将K...
- 国产开源之光【分布式工作流调度系统】:DolphinScheduler
-
DolphinScheduler是一个开源的分布式工作流调度系统,旨在帮助用户以可靠、高效和可扩展的方式管理和调度大规模的数据处理工作流。它支持以图形化方式定义和管理工作流,提供了丰富的调度功能和监控...
- 简单可靠高效的分布式任务队列系统
-
#记录我的2024#大家好,又见面了,我是GitHub精选君!背景介绍在系统访问量逐渐增大,高并发、分布式系统成为了企业技术架构升级的必由之路。在这样的背景下,异步任务队列扮演着至关重要的角色,...
- 虚拟服务器之间如何分布式运行?(虚拟服务器部署)
-
在云计算和虚拟化技术快速发展的今天,传统“单机单任务”的服务器架构早已难以满足现代业务对高并发、高可用、弹性伸缩和容错容灾的严苛要求。分布式系统应运而生,并成为支撑各类互联网平台、企业信息系统和A...
- 一文掌握 XXL-Job 的 6 大核心组件
-
XXL-Job是一个分布式任务调度平台,其核心组件主要包括以下部分,各组件相互协作实现高效的任务调度与管理:1.调度注册中心(RegistryCenter)作用:负责管理调度器(Schedule...
- 京东大佬问我,SpringBoot中如何做延迟队列?单机与分布式如何做?
-
京东大佬问我,SpringBoot中如何做延迟队列?单机如何做?分布式如何做呢?并给出案例与代码分析。嗯,用户问的是在SpringBoot中如何实现延迟队列,单机和分布式环境下分别怎么做。这个问题其实...
- 企业级项目组件选型(一)分布式任务调度平台
-
官网地址:https://www.xuxueli.com/xxl-job/能力介绍架构图安全性为提升系统安全性,调度中心和执行器进行安全性校验,双方AccessToken匹配才允许通讯;调度中心和执...
- python多进程的分布式任务调度应用场景及示例
-
多进程的分布式任务调度可以应用于以下场景:分布式爬虫:importmultiprocessingimportrequestsdefcrawl(url):response=re...
- SpringBoot整合ElasticJob实现分布式任务调度
-
介绍ElasticJob是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目ElasticJob-Lite和ElasticJob-Cloud组成。它通过弹性调度、资源管控、...
- 分布式可视化 DAG 任务调度系统 Taier 的整体流程分析
-
Taier作为袋鼠云的开源项目之一,是一个分布式可视化的DAG任务调度系统。旨在降低ETL开发成本,提高大数据平台稳定性,让大数据开发人员可以在Taier直接进行业务逻辑的开发,而不用关...
- SpringBoot任务调度:@Scheduled与TaskExecutor全面解析
-
一、任务调度基础概念1.1什么是任务调度任务调度是指按照预定的时间计划或特定条件自动执行任务的过程。在现代应用开发中,任务调度扮演着至关重要的角色,它使得开发者能够自动化处理周期性任务、定时任务和异...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- MVC框架 (46)
- spring框架 (46)
- 框架图 (58)
- flask框架 (53)
- quartz框架 (51)
- abp框架 (47)
- jpa框架 (47)
- laravel框架 (46)
- springmvc框架 (49)
- 分布式事务框架 (65)
- scrapy框架 (56)
- shiro框架 (61)
- 定时任务框架 (56)
- java日志框架 (61)
- JAVA集合框架 (47)
- grpc框架 (55)
- ppt框架 (48)
- 内联框架 (52)
- winform框架 (46)
- gui框架 (44)
- cad怎么画框架 (58)
- ps怎么画框架 (47)
- ssm框架实现登录注册 (49)
- oracle字符串长度 (48)
- oracle提交事务 (47)