文章目录
2、Fixed interval micro-batches(固定间隔批次)
3、 One-time micro-batch (仅一次触发)
4、Continuous with fixed checkpoint interval(连续处理)
Strctured Streaming简单应用
一、Output Modes输出模式
Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:
- Append Mode(默认模式):追加模式,只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。
- Complete Mode(完整模式):将整个更新的结果输出。仅可用于代码中有聚合查询情况,代码中没有聚合查询不能使用。
- Update Mode(更新模式):自Spark2.1.1版本后可用,只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。
二、Streaming Table API
在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。
案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。
代码示例如下:
package com.lanson.structuredStreaming import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, SparkSession} object StreamTableAPI { def main(args: Array[String]): Unit = { //1.创建对象 val spark: SparkSession = SparkSession.builder().master("local") .appName("StreamTableAPI") .config("spark.sql.shuffle.partitions", 1) .config("spark.sql.warehouse.dir", "./my-spark-warehouse") .getOrCreate() spark.sparkContext.setLogLevel("Error"); import spark.implicits._ //2.读取socket数据,注册流表 val df: DataFrame = spark.readStream .format("socket") .option("host", "node3") .option("port", 9999) .load() //3.对df进行转换 val personinfo: DataFrame = df.as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0).toInt, arr(1), arr(2).toInt) }).toDF("id", "name", "age") //4.将以上personinfo 写入到流表中 personinfo.writeStream .option("checkpointLocation","./checkpoint/dir1") .toTable("mytbl") import org.apache.spark.sql.functions._ //5.读取mytbl 流表中的数据 val query: StreamingQuery = spark.readStream .table("mytbl") .withColumn("new_age", col("age").plus(6)) .select("id", "name", "age", "new_age") .writeStream .format("console") .start() query.awaitTermination() } }
以上代码编写完成后启动,向控制台输入以下数据:
1,zs,18 2,ls,19 3,ww,20 4,ml,21 5,tq,22
结果输入如下:
注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。
三、Triggers
Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:
实时处理,以下是支持的Triggers:
Trigger Type | 描述 |
Unspecified(默认) |
|
Fixed interval micro-batches(固定间隔批次) |
|
One-time micro-batch(仅一次性触发) |
|
Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段)) |
|
下面以读取Socket数据为例,Scala代码演示各个模式
1、unspecified(默认模式)
代码如下:
//3.默认微批模式执行查询,尽快将结果写出到控制台 val query: StreamingQuery = frame.writeStream .format("console") .start() query.awaitTermination()
2、Fixed interval micro-batches(固定间隔批次)
代码如下:
//3.用户指定固定间隔批次触发查询 val query: StreamingQuery = frame.writeStream .format("console") .trigger(Trigger.ProcessingTime("5 seconds")) // .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS) .start() query.awaitTermination()
注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。
3、 One-time micro-batch (仅一次触发)
代码如下:
//4.仅一次触发执行 val query: StreamingQuery = frame.writeStream .format("console") .trigger(Trigger.Once()) .start() query.awaitTermination()
4、Continuous with fixed checkpoint interval(连续处理)
Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。
代码如下:
//3.Continuous 连续触发执行 val query: StreamingQuery = frame.writeStream .format("console") //每10ms 记录一次状态,而不是执行一次 .trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS)) .option("checkpointLocation","./checkpint/dir4") .start() query.awaitTermination()
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨