概览
Structured Streaming 是一个可拓展,容错的,基于Spark SQL执行引擎的流处理引擎。使用小量的静态数据模拟流处理。伴随流数据的到来,Spark SQL引擎会逐渐连续处理数据并且更新结果到最终的Table中。你可以在Spark SQL上引擎上使用DataSet/DataFrame API处理流数据的聚集,事件窗口,和流与批次的连接操作等。最后Structured Streaming 系统快速,稳定,端到端的恰好一次保证,支持容错的处理。
使用范例
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
编程模型
结构化流的关键思想是将实时数据流视为一个连续附加的表
基本概念
将输入的数据当成一个输入的表格,每一个数据当成输入表的一个新行,如下图所示:
Output是写入到外部存储的写方式,写入方式有不同的模式:
- Complete模式: 将整个更新表写入到外部存储,写入整个表的方式由存储连接器决定。
- Append模式:只有自上次触发后在结果表中附加的新行将被写入外部存储器。这仅适用于结果表中的现有行不会更改的查询。
- Update模式:只有自上次触发后在结果表中更新的行将被写入外部存储器(在Spark 2.0中尚不可用)。注意,这与完全模式不同,因为此模式不输出未更改的行。
处理事件时间和延迟数据
事件时间是嵌入在数据本身中的时间。对于许多应用程序,您可能希望在此事件时间操作。例如,如果要获取IoT设备每分钟生成的事件数,则可能需要使用生成数据的时间(即数据中的事件时间),而不是Spark接收的时间他们。此事件时间在此模型中非常自然地表示 - 来自设备的每个事件都是表中的一行,事件时间是该行中的一个列值。这允许基于窗口的聚合(例如每分钟的事件数)仅仅是偶数时间列上的特殊类型的分组和聚合 - 每个时间窗口是一个组,并且每一行可以属于多个窗口/组。因此,可以在静态数据集(例如,来自收集的设备事件日志)以及数据流上一致地定义这种基于事件时间窗的聚合查询,使得用户的生活更容易。
此外,该模型自然地处理基于其事件时间比预期到达的数据。由于Spark正在更新结果表,因此当存在延迟数据时,它可以完全控制更新旧聚合,以及清除旧聚合以限制中间状态数据的大小。由于Spark 2.1,我们支持水印,允许用户指定后期数据的阈值,并允许引擎相应地清除旧的状态。稍后将在“窗口操作”部分中对此进行详细说明。
容错语义
提供端到端的一次性语义是结构化流的设计背后的关键目标之一。为了实现这一点,我们设计了结构化流源,接收器和执行引擎,以可靠地跟踪处理的确切进展,以便它可以通过重新启动和/或重新处理来处理任何类型的故障。假定每个流源具有偏移量(类似于Kafka偏移量或Kinesis序列号)以跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移范围。流接收器被设计为用于处理再处理的幂等。结合使用可重放源和幂等宿,结构化流可以确保在任何故障下的端到端的一次性语义。
使用DataFrame和DataSet API
从Spark 2.0开始,DataFrames和Datasets可以表示静态,有界数据,以及流式,无界数据。与静态DataSets/ DataFrames类似,您可以使用公共入口点SparkSession(Scala / Java / Python文档)从流源创建流DataFrames /DataSets,并对它们应用与静态DataFrames / Datasets相同的操作。如果您不熟悉Datasets / DataFrames,强烈建议您使用DataFrame / Dataset编程指南熟悉它们。
创建数据框流和数据集流
Streaming DataFrames可以通过SparkSession.readStream()返回的DataStreamReader接口(Scala / Java / Python docs)创建。类似于用于创建静态DataFrame的读取接口,您可以指定源 - 数据格式,模式,选项等的详细信息。
数据源
在Spark 2.0,有几个内置的数据源:
- 文件源:将写入目录中的文件读取为数据流。支持的文件格式有text,csv,json,parquet。请参阅DataStreamReader界面的文档以获取更新的列表,以及每种文件格式支持的选项。注意,文件必须原子地放置在给定目录中,在大多数文件系统中,可以通过文件移动操作来实现。
- Kafka源:从kafka拉取数据,支持kafka broker versions 0.10.0 or higher.从kafka集成指南获取更多信息。
- Socket源(测试用):从套接字连接读取UTF8文本数据。监听服务器套接字在驱动程序。注意,这应该仅用于测试,因为这不提供端到端容错保证
这些示例生成无类型的流式DataFrames,这意味着在编译时不检查DataFrame的模式,仅在提交查询时在运行时检查。一些操作,如map,flatMap等,需要在编译时知道类型。要做到这些,你可以使用与静态DataFrame相同的方法将这些无类型的流DataFrames转换为类型化流数据集。有关更多详细信息,请参阅SQL编程指南。此外,有关支持的流媒体源的更多详细信息将在文档中稍后讨论。
数据框/数据集流的模式推理和分区
默认情况下,基于文件的源的结构化流要求您指定模式,而不是依靠Spark自动推断。此限制确保即使在发生故障的情况下,一致的模式也将用于流式查询。对于临时用例,可以通过将spark.sql.streaming.schemaInference设置为true来重新启用模式推断。
当名为/ key = value /的子目录存在时,发生分区发现,并且列表将自动递归到这些目录中。如果这些列出现在用户提供的模式中,它们将由Spark根据正在读取的文件的路径填充。当查询开始时,组成分区方案的目录必须存在,并且必须保持静态。例如,可以添加/ data / year = 2016 / when / data / year = 2015 /存在,但是更改分区列是无效的(即通过创建目录/ data / date = 2016-04-17 /)。
流式DataFrames/Datasets上的操作
您可以对流式DataFrames /数据集应用各种操作 - 从无类型,类似SQL的操作(例如select,where,groupBy)到类型化的RDD类操作(例如map,filter,flatMap)。有关更多详细信息,请参阅SQL编程指南。让我们来看看一些你可以使用的示例操作。
基本操作 - 选择,投影,聚合
case class DeviceData(device: String, type: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("type").count() // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(_.type).agg(typed.avg(_.signal)) // using typed API
事件时间上的窗口操作
滑动事件时间窗口上的聚合通过结构化流直接进行。理解基于窗口的聚合的关键思想与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如计数)。在基于窗口的聚合的情况下,对于行的事件时间落入的每个窗口维持聚合值。让我们用插图来理解这一点。
想象一下,我们的快速示例被修改,流现在包含行以及生成行的时间。我们不想运行字数,而是计算10分钟内的字数,每5分钟更新一次。也就是说,在10分钟窗口12:00-12:10,12:05-12:15,12:10-12:20等之间接收的词中的字数。注意,12:00 -12:10意味着数据在12:00之后但在12:10之前到达。现在,考虑在12:07收到的一个字。这个单词应该增加对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。因此,计数将通过分组键(即字)和窗口(可以从事件时间计算)来索引。结果表将如下所示:
由于此窗口类似于分组,因此在代码中,可以使用groupBy()和window()操作来表示窗口化聚合。您可以在Scala / Java / Python中查看以下示例的完整代码。
处理延迟数据和水位线
现在考虑如果事件中的一个到达应用程序的迟到会发生什么。例如,例如,在12:04(即事件时间)生成的词可以由应用在12:11接收到。应用程序应使用时间12:04而不是12:11来更新窗口12:00 - 12:10的旧计数。这在我们的基于窗口的分组中自然地发生 - 结构化流可以长时间地保持部分聚合的中间状态,使得晚期数据可以正确地更新旧窗口的聚集,如下所示:
但是,要运行此查询的天数,系统必须绑定其累积的中间内存中状态的数量。这意味着系统需要知道何时可以从内存中状态删除旧聚合,因为应用程序将不再接收该聚合的延迟数据。为了实现这一点,在Spark 2.1中,我们引入了水印,让我们的引擎自动跟踪数据中的当前事件时间,并尝试相应地清理旧的状态。您可以通过指定事件时间列和根据事件时间预计数据延迟的阈值来定义查询的水印。对于在时间T开始的特定窗口,引擎将保持状态并允许后期数据更新状态,直到(由引擎看到的最大事件时间 - 后期阈值> T)。换句话说,阈值内的晚数据将被聚合,但晚于阈值的数据将被丢弃。让我们用一个例子来理解这个。我们可以使用withWatermark()在上面的例子中轻松定义水印,如下所示。
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
在这个例子中,我们定义查询的水印对列“timestamp”的值,并且还定义“10分钟”作为允许数据超时的阈值。如果此查询在Append输出模式(稍后在“输出模式”部分中讨论)中运行,则引擎将从列“timestamp”跟踪当前事件时间,并在最终确定窗口计数和添加之前等待事件时间的额外“10分钟”他们到结果表。这是一个例证。
如图所示,由引擎跟踪的最大事件时间是蓝色虚线,并且在每个触发的开始处设置为(最大事件时间 - '10分钟')的水印是红色线。例如,当引擎观察数据(12:14,狗),它将下一个触发器的水印设置为12:04。对于窗口12:00 - 12:10,部分计数保持为内部状态,而系统正在等待延迟数据。在系统发现数据(即(12:21,owl))使得水印超过12:10之后,部分计数被最终确定并附加到表。此计数将不会进一步更改,因为所有超过12:10的“太晚”数据将被忽略。
请注意,在追加输出模式下,系统必须等待“延迟阈值”时间才能输出窗口的聚合。如果数据可能很晚,(例如1天),并且您希望部分计数而不等待一天,这可能不是理想的。将来,我们将添加更新输出模式,这将允许每次更新聚合写入到每个触发器。
用于清除聚合状态的水印的条件重要的是要注意,水印应当满足以下条件以清除聚合查询中的状态(从Spark 2.1开始,将来会改变)。
- 输出模式必须为追加。完成模式要求保留所有聚合数据,因此不能使用水印来删除中间状态。有关每种输出模式的语义的详细说明,请参见“输出模式”部分。
- 聚合必须具有事件时列,或事件时列上的窗口。
- withWatermark必须在与聚合中使用的时间戳列相同的列上调用。例如:df.withWatermark("time", "1 min").groupBy("time2").count()在Append输出模式下无效,因为水印是在与聚合列不同的列上定义的。
- 其中在要使用水印细节的聚合之前必须调用withWatermark。例如:df.groupBy("time").count().withWatermark("time", "1 min")在Append输出模式中无效。
Join操作
流DataFrames可以与静态DataFrames连接以创建新的流DataFrames。这里有几个例子。
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF
不支持的操作
但是,请注意,所有适用于静态DataFrames /数据集的操作在流式DataFrames /数据集中不受支持。虽然这些不支持的操作中的一些将在未来的Spark版本中得到支持,但还有一些基本上难以有效地在流数据上实现。例如,输入流数据集不支持排序,因为它需要跟踪流中接收的所有数据。因此,这在根本上难以有效地执行。从Spark 2.0开始,一些不受支持的操作如下:
- 在流数据集上还不支持多个流聚集(即,流DF上的聚合链)。
- 在流数据集上不支持限制和获取前N行。
- 不支持对流数据集进行不同操作。
- 排序操作仅在聚合后在完整输出模式下支持流数据集。
- 条件支持流式传输和静态数据集之间的外连接。
- 不支持带有流数据集的完全外连接
- 不支持左外部连接与右侧的流数据集
- 不支持左侧的流数据集的右外部联接
- 尚不支持两个流数据集之间的任何类型的连接。
此外,还有一些Dataset方法不能用于流数据集。它们是将立即运行查询并返回结果的操作,这对流数据集没有意义。相反,这些功能可以通过显式地启动流查询来完成(参见下一部分)。
相反,使用ds.groupBy.count()返回包含运行计数的流数据集。
- foreach() - 而是使用ds.writeStream.foreach(...)(参见下一节)。
- show() - 而是使用控制台接收器(请参阅下一节)。
如果您尝试任何这些操作,您将看到一个AnalysisException如“操作XYZ不支持与流DataFrames /数据集”。
启动流式查询
一旦定义了最终结果DataFrame / Dataset,剩下的就是启动流计算。为此,您必须使用通过Dataset.writeStream()返回的DataStreamWriter。您必须在此界面中指定以下一个或多个。
- 输出接收器的详细信息:数据格式,位置等
- 输出模式:指定写入输出接收器的内容。
- 查询名称:(可选)指定查询的唯一名称以进行标识。
- 触发间隔:可选择指定触发间隔。如果未指定,系统将在上一个处理完成后立即检查新数据的可用性。如果由于先前处理尚未完成而错过触发时间,则系统将尝试在下一触发点处触发,而不是在处理完成之后立即触发。
- 检查点位置:对于可以保证端到端容错的某些输出接收器,请指定系统将写入所有检查点信息的位置。这应该是HDFS兼容的容错文件系统中的目录。检查点的语义将在下一节中更详细地讨论。
输出模式
有几种类型的输出模式:
- 附加模式(默认) - 这是默认模式,其中只有自上次触发后添加到结果表中的新行将输出到接收器。这仅支持那些添加到结果表中的行从不会更改的查询。因此,该模式保证每行只输出一次(假设容错宿)。例如,只有select,where,map,flatMap,filter,join等的查询将支持Append模式。
- 完成模式 - 每次触发后,整个结果表将输出到接收器。聚合查询支持此选项。
- 更新模式 - (在Spark 2.1中不可用)只有结果表中自上次触发后更新的行才会输出到接收器。更多信息将在未来版本中添加。
不同类型的流查询支持不同的输出模式。这里是兼容性矩阵:
查询类型 |
|
支持的输出模式 |
注 |
无聚合的查询 |
Append |
支持完整模式,因为不可能将所有数据保存在结果表中。 |
带有聚合的聚合 |
在带有水印的event-time上聚合 |
Append, Complete |
追加模式使用水印来删除旧的聚合状态。
Append mode uses watermark to drop old aggregation state. But the output of a
windowed aggregation is delayed the late threshold specified in `withWatermark()` as by
the modes semantics, rows can be added to the Result Table only once after they are
finalized (i.e. after watermark is crossed). See
Late Data section for more details.Complete mode does drop not old aggregation state since by definition this mode
preserves all data in the Result Table. |
其他聚合 |
Complete |
Append mode is not supported as aggregates can update thus violating the semantics of
this mode.Complete mode does drop not old aggregation state since by definition this mode
preserves all data in the Result Table. |
输出接收器
有几种类型的内置输出接收器:
- 文件接收器 - 将输出存储到目录。
- Foreach sink - 对输出中的记录运行任意计算。有关详细信息,请参阅后面的部分。
- 控制台接收器(用于调试) - 每次有触发器时将输出打印到控制台/ stdout。这应该用于低数据量上的调试目的,因为每次触发后,整个输出被收集并存储在驱动程序的内存中。
- 内存接收器(用于调试) - 输出作为内存表存储在内存中。支持附加和完成输出模式。这应该用于低数据量上的调试目的,因为每次触发后,整个输出被收集并存储在驱动程序的内存中。
下面是所有接收器的表格和相应的设置:
接收器 |
支持的输出模式 |
用法 |
容错 |
备注 |
文件接收器 |
Append |
writeStream.format("parquet").start() |
Yes |
支持对分区表的写入。按时间分区可能有用。 |
Foreach 接收器 |
Complete |
writeStream.foreach(...).start() |
取决于ForeachWriter实现 |
更多细节在下一节 |
控制台接收器 |
Append, Complete |
writeStream.format("console").start() |
No |
|
内存接收器 |
Append, Complete |
writeStream.format("memory").queryName("table").start() |
No |
将输出数据保存为表,用于交互式查询。表名是查询名称。 |
最后,你必须调用start()才能真正开始执行查询。这返回一个StreamingQuery对象,它是连续运行的执行的句柄。您可以使用此对象来管理查询,我们将在下一小节中讨论。现在,让我们通过几个例子来理解这一切。
// ========== DF with no aggregations ==========
Dataset<row> noAggDF = deviceDataDf.select("device").where("signal > 10");
// Print new data to console
noAggDF
.writeStream()
.format("console")
.start();
// Write new data to Parquet files
noAggDF
.writeStream()
.parquet("path/to/destination/directory")
.start();
// ========== DF with aggregation ==========
Dataset</row><row> aggDF = df.groupBy("device").count();
// Print updated aggregations to console
aggDF
.writeStream()
.outputMode("complete")
.format("console")
.start();
// Have all the aggregates in an in-memory table
aggDF
.writeStream()
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start();
spark.sql("select * from aggregates").show(); // interactively query in-memory table
使用foreach
foreach操作允许对输出数据计算任意操作。从Spark 2.1开始,这只适用于Scala和Java。要使用这个,你必须实现接口ForeachWriter(Scala / Java docs),它有一个方法,当触发后产生一系列行作为输出时被调用。请注意以下要点。
编写器必须是可序列化的,因- 为它将被序列化并发送到执行器以供执行。
- 所有三个方法,打开,处理和关闭将被调用的执行者。
- 只有当调用open方法时,写程序必须执行所有的初始化(例如打开连接,启动事务等)。请注意,如果在创建对象时在类中有任何初始化,那么该初始化将在驱动程序中进行(因为这是创建实例的地方),这可能不是您想要的。
- 版本和分区是open中的两个参数,它们唯一地表示需要被推出的一组行。版本是一个单调增加的id,随着每个触发器增加。partition是表示输出的分区的id,因为输出是分布式的,并且将在多个执行器上处理。
- open可以使用版本和分区来选择是否需要写行序列。因此,它可以返回true(继续写入)或false(不需要写入)。如果返回false,那么将不会在任何行上调用进程。例如,在部分故障之后,失败触发器的一些输出分区可能已经被提交到数据库。基于存储在数据库中的元数据,写者可以识别已经提交的分区,因此返回false以跳过再次提交它们。
- 每当调用open时,也将调用close(除非JVM由于某些错误而退出)。即使open返回false,也是如此。如果在处理和写入数据时出现任何错误,将使用错误调用close。您有责任清除在开放中创建的状态(例如连接,事务等),以便没有资源泄漏。
管理流式查询
启动查询时创建的StreamingQuery对象可用于监视和管理查询。
StreamingQuery query = df.writeStream().format("console").start(); // get the query object
query.id(); // get the unique identifier of the running query
query.name(); // get the name of the auto-generated or user-specified name
query.explain(); // print detailed explanations of the query
query.stop(); // stop the query
query.awaitTermination(); // block until query is terminated, with stop() or with error
query.exception(); // the exception if the query has been terminated with error
query.sourceStatus(); // progress information about data has been read from the input sources
query.sinkStatus(); // progress information about data written to the output sink
您可以在单个SparkSession中启动任意数量的查询。他们将同时运行共享集群资源。您可以使用sparkSession.streams()获取可用于管理当前活动查询的StreamingQueryManager(Scala / Java / Python文档)。
SparkSession spark = ...
spark.streams().active(); // get the list of currently active streaming queries
spark.streams().get(id); // get a query object by its unique id
spark.streams().awaitAnyTermination(); // block until any one of them terminates
监视流查询
有两个API用于以交互式和异步方式监视和调试活动的查询。
交互式API
您可以使用streamingQuery.lastProgress()和streamingQuery.status()直接获取活动查询的当前状态和指标。 lastProgress()在Scala和Java中返回一个StreamingQueryProgress对象,在Python中返回一个具有相同字段的字典。它具有关于在流的最后触发中所进行的进展的所有信息 - 什么数据被处理,什么是处理速率,等待时间等。还有streamingQuery.recentProgress,它返回最后几个进度的数组。
此外,streamingQuery.status()在Scala和Java中返回StreamingQueryStatus对象,在Python中返回具有相同字段的字典。它提供有关查询立即执行的操作的信息 - 是触发器活动,正在处理数据等。这里有几个例子。
StreamingQuery query = ...
System.out.println(query.lastProgress());
/* Will print something like the following.
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
System.out.println(query.status());
/* Will print something like the following.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
异步API
您还可以通过附加StreamingQueryListener(Scala / Java docs)异步监视与SparkSession相关联的所有查询。使用sparkSession.streams.attachListener()附加自定义StreamingQueryListener对象后,当查询启动和停止以及活动查询中有进度时,您将获得回调。这里是一个例子
SparkSession spark = ...
spark.streams.addListener(new StreamingQueryListener() {
@Overrides void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
@Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
@Overrides void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());
}
});
使用检查点从故障中恢复
在故障或故意关闭的情况下,您可以恢复先前查询的先前进度和状态,并继续在其停止的地方。这是通过使用检查点和预写日志来完成的。您可以配置具有检查点位置的查询,并且查询将保存所有进度信息(即每个触发器中处理的偏移范围)和正在运行的聚合(例如快速示例中的字计数)到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时在DataStreamWriter中设置为选项。
aggDF
.writeStream()
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start();</row>