A First Structured Streaming Example
We will use Structured Streaming to monitor socket data and compute a word count. Spark 3.4.3 is used here; first, add the following dependencies to the Maven pom file:
```xml
<!-- These settings avoid the "lambda expressions are not supported in -source 1.5"
     error when packaging under JDK 1.8 -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.4.3</spark.version>
</properties>

<dependencies>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL on Hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Kafka 0.10+ source for Structured Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Required for producing data to Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- Scala libraries -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.12</version>
    </dependency>
    <dependency>
        <groupId>com.google.collections</groupId>
        <artifactId>google-collections</artifactId>
        <version>1.0</version>
    </dependency>
</dependencies>
```
1. Scala Code
```scala
package com.lanson.structuredStreaming

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Structured Streaming: read Socket data in real time
 */
object SSReadSocketData {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("StructuredSocketWordCount")
      //The default is 200 shuffle partitions; with this little source data, a lower value is enough
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("Error")

    //2. Read each line from the socket; the resulting DataFrame has a single column named "value"
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    //3. Split each line into words; convert to a Dataset via as[String] first
    val words: Dataset[String] = lines.as[String].flatMap(line => line.split(" "))

    //4. Group by word and count; an extra "count" column is added automatically
    val wordCounts: DataFrame = words.groupBy("value").count()

    //5. Start the streaming query and print results to the console
    val query: StreamingQuery = wordCounts.writeStream
      //output mode set to complete
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```
2. Java Code
```java
package com.lanson.structuredStreaming;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class SSReadSocketData01 {
    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        // Create the SparkSession; lower the shuffle parallelism since the input is tiny
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadSocketData01")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();
        spark.sparkContext().setLogLevel("Error");

        // Read each line from the socket into a DataFrame with a single "value" column
        Dataset<Row> lines = spark.readStream().format("socket")
                .option("host", "node3")
                .option("port", 9999)
                .load();

        // Split each line into words
        Dataset<String> words = lines.as(Encoders.STRING())
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                }, Encoders.STRING());

        // Group by word and count
        Dataset<Row> wordCounts = words.groupBy("value").count();

        // Start the streaming query and print results to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        query.awaitTermination();
    }
}
```
After writing the code above, run `nc -lk 9999` on node3 to start a socket server, then start the program and type the following data into the socket:
- First input: a b c
- Second input: d a c
- Third input: a b c
The console prints the following results:
```
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+
```
3. Notes on the Above Code
- Spark SQL defaults to 200 shuffle partitions; since the amount of data here is small, lower the parallelism via the "spark.sql.shuffle.partitions" parameter.
- Data read by Structured Streaming arrives as a DataFrame that, by default, has a single column named "value".
- To operate on the rows as strings, convert the DataFrame to a Dataset via as[String].
- When writing results, OutputMode offers three output modes: Complete Mode (the entire updated result table is written on every trigger), Append Mode (only rows newly appended since the last trigger), and Update Mode (only rows updated since the last trigger); see the sketch after this list.
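The difference between the modes is easiest to see in code. Below is a minimal sketch (reusing the `wordCounts` DataFrame from step 4 of the Scala example above) of the same query in update mode:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Minimal sketch: the same word count written in "update" mode.
// "wordCounts" is the aggregated DataFrame from step 4 above.
// Unlike "complete", each batch prints only the rows whose counts changed.
val updateQuery: StreamingQuery = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()

updateQuery.awaitTermination()
```

With the three inputs above, batch 2 would then print only d=1, c=2, and a=2, since b's count did not change. Append mode, by contrast, is not supported for this aggregation unless a watermark is defined, because aggregated rows keep being updated.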