Spark译文(三)

2019年11月14日 阅读数:96
这篇文章主要向大家介绍Spark译文(三),主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

Structured Streaming Programming Guide(结构化流编程指南)

Overview(概貌)

·Structured Streaming是一种基于Spark SQL引擎的可扩展且容错的流处理引擎。
·您能够像表达静态数据的批处理计算同样表达流式计算。
·Spark SQL引擎将负责逐步和连续地运行它,并在流数据继续到达时更新最终结果。
·您可使用Scala,Java,Python或R中的数据集/数据框架API来表示流聚合,事件时间窗口,流到批处理链接等。计算在同一优化的Spark SQL引擎上执行。
·最后,系统经过检查点和预写日志确保端到端的一次性容错保证。
·简而言之,Structured Streaming提供快速,可扩展,容错,端到端的精确一次流处理,而无需用户推理流式传输。
·在内部,默认状况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流做为一系列小批量做业处理,从而实现低至100毫秒的端到端延迟和彻底一次的容错保证。
·可是,自Spark 2.3以来,咱们引入了一种称为连续处理的新型低延迟处理模式,它能够实现低至1毫秒的端到端延迟,而且具备至少一次保证。
·无需更改查询中的数据集/数据框操做,您就能够根据应用程序要求选择模式。
·在本指南中,咱们将引导您完成编程模型和API。
·咱们将主要使用默认的微批处理模型来解释这些概念,而后讨论连续处理模型。
·首先,让咱们从一个结构化流式查询的简单示例开始 - 一个流式字数。

Quick Example(快速示例)

·假设您但愿维护从侦听TCP套接字的数据服务器接收的文本数据的运行字数。
·让咱们看看如何使用Structured Streaming表达这一点。
·您能够在Scala / Java / Python / R中看到完整的代码。
·若是你下载Spark,你能够直接运行这个例子。
·在任何状况下,让咱们一步一步地了解示例,并了解它是如何工做的。
·首先,咱们必须导入必要的类并建立一个本地SparkSession,这是与Spark相关的全部功能的起点。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode from pyspark.sql.functions import split spark = SparkSession \ .builder \ .appName("StructuredNetworkWordCount") \ .getOrCreate()

接下来,让咱们建立一个流式DataFrame,它表示从侦听localhost:9999的服务器接收的文本数据,并转换DataFrame以计算字数。html

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load() # Split the lines into words words = lines.select( explode( split(lines.value, " ") ).alias("word") ) # Generate running word count wordCounts = words.groupBy("word").count()
·这行DataFrame表示包含流文本数据的无界表。
·此表包含一列名为“value”的字符串,而且流式文本数据中的每一行都成为表中的一行。
·请注意,因为咱们只是设置转换,而且还没有启动它,所以目前没有接收任何数据。
·接下来,咱们使用了两个内置的SQL函数 - split和explode,将每行分红多行,每行包含一个单词。
·此外,咱们使用函数别名将新列命名为“word”。
·最后,咱们经过对数据集中的惟一值进行分组并对它们进行计数来定义wordCounts DataFrame。
·请注意,这是一个流式DataFrame,它表示流的运行字数。
·咱们如今已经设置了关于流数据的查询。
·剩下的就是实际开始接收数据并计算计数。
·为此,咱们将其设置为每次更新时将完整的计数集(由outputMode(“complete”)指定)打印到控制台。
·而后使用start()开始流式计算。
 # Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \ .outputMode("complete") \ .format("console") \ .start() query.awaitTermination()
·执行此代码后,流式计算将在后台启动。
·查询对象是该活动流式查询的句柄,咱们决定使用awaitTermination()等待查询终止,以防止进程在查询处于活动状态时退出。
·要实际执行此示例代码,您能够在本身的Spark应用程序中编译代码,或者只需在下载Spark后运行该示例。
·咱们正在展现后者。
·您首先须要使用Netcat(在大多数类Unix系统中找到的小实用程序)做为数据服务器运行
$ nc -lk 9999

而后,在不一样的终端中,您可使用启动示例java

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
·而后,在运行netcat服务器的终端中键入的任何行将被计数并每秒在屏幕上打印。
·它看起来像下面这样。
# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop



















...
 
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count| +------+-----+ |apache| 1| | spark| 1| +------+-----+ ------------------------------------------- Batch: 1 ------------------------------------------- +------+-----+ | value|count| +------+-----+ |apache| 2| | spark| 1| |hadoop| 1| +------+-----+ ...

Programming Model(编程模型)

·Structured Streaming中的关键思想是将实时数据流视为连续追加的表。
·这致使新的流处理模型很是相似于批处理模型。
·您将流式计算表示为静态表上的标准批处理查询,Spark将其做为无界输入表上的增量查询运行。
·让咱们更详细地了解这个模型。

Basic Concepts(基本概念)

·将输入数据流视为“输入表”。
·到达流的每一个数据项都像一个新行被附加到输入表。

Stream as a Table

·对输入的查询将生成“结果表”。
·每一个触发间隔(例如,每1秒),新行将附加到输入表,最终更新结果表。
·每当结果表更新时,咱们都但愿将更改的结果行写入外部接收器。

Model

·“输出”定义为写入外部存储器的内容。
·输出能够以不一样的模式定义:
·完整模式 - 整个更新的结果表将写入外部存储器。
·由存储链接器决定如何处理整个表的写入。
·追加模式 - 自上次触发后,只有结果表中附加的新行才会写入外部存储器。
·这仅适用于预计结果表中的现有行不会更改的查询。
·更新模式 - 只有自上次触发后在结果表中更新的行才会写入外部存储(自Spark 2.1.1起可用)。
·请注意,这与完整模式的不一样之处在于此模式仅输出自上次触发后已更改的行。
·若是查询不包含聚合,则它将等同于追加模式。
·请注意,每种模式适用于某些类型的查询。
·稍后将对此进行详细讨论。
·为了说明此模型的使用,让咱们在上面的快速示例的上下文中理解模型。
·第一行DataFrame是输入表,最后一个wordCounts DataFrame是结果表。
·请注意,生成wordCounts的流线DataFrame上的查询与静态DataFrame彻底相同。
·可是,当启动此查询时,Spark将不断检查套接字链接中的新数据。
·若是有新数据,Spark将运行“增量”查询,该查询将先前运行的计数与新数据相结合,以计算更新的计数,以下所示。

Model

请注意,Structured Streaming不会实现整个表。·它从流数据源读取最新的可用数据,逐步处理以更新结果,而后丢弃源数据。·它只保留更新结果所需的最小中间状态数据(例如前面例子中的中间计数)python

·该模型与许多其余流处理引擎明显不一样。
·许多流系统要求用户本身维护运行聚合,所以必须推断容错和数据一致性(至少一次,或至多一次,或彻底一次)。
·在此模型中,Spark负责在有新数据时更新结果表,从而减轻用户对其的推理。
·做为一个例子,让咱们看看这个模型如何处理基于事件时间的处理和迟到的数据。

Handling Event-time and Late Data(处理事件时间和后期数据)

·事件时间是嵌入数据自己的时间。
·对于许多应用程序,您可能但愿在此事件时间运行。
·例如,若是您想每分钟获取IoT设备生成的事件数,那么您可能但愿使用生成数据的时间(即数据中的事件时间),而不是Spark接收的时间
·此事件时间在此模型中很是天然地表达 - 来自设备的每一个事件都是表中的一行,事件时间是行中的列值。
·这容许基于窗口的聚合(例如,每分钟的事件数)在事件时间列上只是一种特殊类型的分组和聚合 - 每一个时间窗口是一个组,每行能够属于多个窗口/组。
·所以,能够在静态数据集(例如,来自收集的设备事件日志)以及数据流上一致地定义这种基于事件时间窗口的聚合查询,使得用户的生活更加容易。
·此外,该模型天然地处理基于其事件时间到达的时间晚于预期的数据。
·因为Spark正在更新结果表,所以它能够在存在延迟数据时彻底控制更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。
·从Spark 2.1开始,咱们支持水印,容许用户指定后期数据的阈值,并容许引擎相应地清理旧状态。
·稍后将在“窗口操做”部分中详细介绍这些内容。

Fault Tolerance Semantics(容错语义)

·提供端到端的一次性语义是结构化流的设计背后的关键目标之一。
·为实现这一目标,咱们设计告终构化流媒体源,接收器和执行引擎,以可靠地跟踪处理的确切进度,以便经过从新启动和/或从新处理来处理任何类型的故障。
·假设每一个流源都具备偏移(相似于Kafka偏移或Kinesis序列号)以跟踪流中的读取位置。
·引擎使用检查点和预写日志来记录每一个触发器中正在处理的数据的偏移范围。
·流式接收器设计为处理从新处理的幂等功能。
·结合使用可重放的源和幂等接收器,结构化流能够确保在任何失败的状况下端到端彻底一次的语义。

API using Datasets and DataFrames(使用数据集和数据框架的API)

·从Spark 2.0开始,DataFrames和Datasets能够表示静态的,有界的数据,以及流式无界数据。
·与静态数据集/数据框相似,您可使用公共入口点SparkSession(Scala / Java / Python / R docs)从流源建立流式DataFrames / Datasets,并对它们应用与静态DataFrames / Datasets相同的操做。
·若是您不熟悉数据集/数据框架,强烈建议您使用“数据框架/数据集编程指南”熟悉它们。

Creating streaming DataFrames and streaming Datasets(建立流式DataFrame和流式数据集)

·能够经过SparkSession.readStream()返回的DataStreamReader接口(Scala / Java / Python文档)建立Streaming DataFrame。
·在R中,使用read.stream()方法。
·与用于建立静态DataFrame的读取接口相似,您能够指定源的详细信息 - 数据格式,架构,选项等。

Input Sources(输入源)

·有一些内置源。
·文件来源 - 将目录中写入的文件做为数据流读取。
·支持的文件格式为text,csv,json,orc,parquet。
·有关更新的列表,请参阅DataStreamReader接口的文档,以及每种文件格式支持的选项。
·请注意,文件必须原子地放置在给定目录中,在大多数文件系统中,能够经过文件移动操做来实现。
·Kafka来源 - 从Kafka读取数据。
·它与Kafka经纪人版本0.10.0或更高版本兼容。
·有关更多详细信息,请参阅Kafka集成指南。
·套接字源(用于测试) - 从套接字链接读取UTF8文本数据。
·侦听服务器套接字位于驱动程序中。
·请注意,这应仅用于测试,由于这不提供端到端的容错保证。
·速率源(用于测试) - 以每秒指定的行数生成数据,每一个输出行包含时间戳和值。
·其中timestamp是包含消息调度时间的Timestamp类型,value是包含消息计数的Long类型,从0开始做为第一行。
·此源用于测试和基准测试。
·某些源不具备容错能力,由于它们没法保证在发生故障后可使用检查点偏移重放数据。
·请参阅前面的容错语义部分。
·如下是Spark中全部源代码的详细信息。
Source Options Fault-tolerant Notes
File source path: path to the input directory, and common to all file formats. 
maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max) 
latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false) 
fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same: 
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"


For file-format-specific options, see the related methods in DataStreamReader(Scala/Java/Python/R). E.g. for "parquet" format options see DataStreamReader.parquet()

In addition, there are session configurations that affect certain file-formats. See the SQL Programming Guide for more details. E.g., for "parquet", see Parquet configuration section.
Yes Supports glob paths, but does not support multiple comma-separated paths/globs.
Socket Source host: host to connect to, must be specified
port: port to connect to, must be specified
No  
Rate Source rowsPerSecond (e.g. 100, default: 1): How many rows should be generated per second.

rampUpTime (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds. 

numPartitions (e.g. 10, default: Spark's default parallelism): The partition number for the generated rows. 

The source will try its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.
Yes  
Kafka Source See the Kafka Integration Guide. Yes  
       

Here are some examples.sql

spark = SparkSession. ... # Read text from socket socketDF = spark \ .readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load() socketDF.isStreaming() # Returns True for DataFrames that have streaming sources socketDF.printSchema() # Read all the csv files written atomically in a directory userSchema = StructType().add("name", "string").add("age", "integer") csvDF = spark \ .readStream \ .option("sep", ";") \ .schema(userSchema) \ .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
·这些示例生成无类型的流式DataFrame,这意味着在编译时不检查DataFrame的架构,仅在提交查询时在运行时检查。
·map,flatMap等一些操做须要在编译时知道类型。
·要执行这些操做,您可使用与静态DataFrame相同的方法将这些无类型流式DataFrame转换为类型化流式数据集。
·有关更多详细信息,请参见SQL编程指南。
·此外,有关受支持的流媒体源的更多详细信息将在本文档后面讨论。

Schema inference and partition of streaming DataFrames/Datasets(流式DataFrames / Datasets的模式推理和分区)

·默认状况下,基于文件的源的结构化流须要您指定架构,而不是依靠Spark自动推断它。
·此限制可确保即便在出现故障的状况下,也将使用一致的架构进行流式查询。
·对于临时用例,能够经过将spark.sql.streaming.schemaInference设置为true来从新启用模式推断。
·当名为/ key = value /的子目录存在且列表将自动递归到这些目录中时,会发生分区发现。
·若是这些列出如今用户提供的模式中,则Spark将根据正在读取的文件的路径填充它们。
·构成分区方案的目录必须在查询开始时存在,而且必须保持静态。
·例如,能够添加/ data / year = 2016 / when / data / year = 2015 /,但更改分区列无效(即经过建立目录/ data / date = 2016-04-
·17 /)。

Operations on streaming DataFrames/Datasets(流式传输DataFrames / Datasets的操做)

·您能够对流式数据框架/数据集应用各类操做 - 从无类型,相似SQL的操做(例如select,where,groupBy)到类型化RDD类操做(例如map,filter,flatMap)。
·有关更多详细信息,请参阅SQL编程指南。
·咱们来看看您可使用的一些示例操做。

Basic Operations - Selection, Projection, Aggregation(基本操做 - 选择,投影,聚合)

·DataFrame / Dataset上的大多数常见操做都支持流式传输。
·本节稍后将讨论几个不受支持的操做
df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10 df.select("device").where("signal > 10") # Running count of the number of updates for each device type df.groupBy("deviceType").count()

您还能够将流式DataFrame / Dataset注册为临时视图,而后在其上应用SQL命令。apache

df.createOrReplaceTempView("updates") spark.sql("select count(*) from updates") # returns another streaming DF

注意,您可使用df.isStreaming来识别DataFrame / Dataset是否具备流数据。编程

df.isStreaming()

Window Operations on Event Time(事件时间的窗口操做)

·使用结构化流式传输时,滑动事件时间窗口上的聚合很是简单,而且与分组聚合很是类似。
·在分组聚合中,为用户指定的分组列中的每一个惟一值维护聚合值(例如计数)。
·在基于窗口的聚合的状况下,为每一个窗口维护一行的事件时间的聚合值。
·让咱们经过一个例子来理解这一点。
·想一下,咱们的快速示例已被修改,流如今包含行以及生成行的时间。
·咱们不想运行字数,而是计算10分钟内的单词,每5分钟更新一次。
·也就是说,在10分钟窗口12:00-12:10,12:05-12:15,12:10-12:20等之间收到的单词数量。请注意,12:00 - 12:10表示数据
·在12:00以后但在12:10以前到达。
·如今,考虑一下在12:07收到的一个字。
·这个词应该增长对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。
·所以,计数将由分组键(即单词)和窗口(能够从事件时间计算)二者索引。

结果表看起来以下所示。json

Window Operations

·因为此窗口相似于分组,所以在代码中,您可使用groupBy()和window()操做来表示窗口化聚合。
·您能够在Scala / Java / Python中看到如下示例的完整代码。
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group windowedCounts = words.groupBy( window(words.timestamp, "10 minutes", "5 minutes"), words.word ).count()

Handling Late Data and Watermarking(处理延迟数据和水印)

·如今考虑若是其中一个事件到达应用程序的后期会发生什么。
·例如,应用程序在12:11能够接收在12:04(即事件时间)生成的单词。
·应用程序应使用时间12:04而不是12:11来更新窗口12:00 - 12:10的旧计数。
·这在咱们基于窗口的分组中天然发生 - 结构化流能够长时间维持部分聚合的中间状态,以便后期数据能够正确更新旧窗口的聚合,以下所示。

Handling Late Data

·可是,要运行此查询数天,系统必须限制它累积的中间内存中状态的数量。
·这意味着系统须要知道什么时候能够从内存状态中删除旧聚合,由于应用程序再也不接收该聚合的后期数据。
·为了实现这一点,咱们在Spark 2.1中引入了水印,使引擎可以自动跟踪数据中的当前事件时间并尝试相应地清理旧状态。
·您能够经过指定事件时间列以及根据事件时间预计数据的延迟时间来定义查询的水印。
·对于在时间T结束的特定窗口,引擎将保持状态并容许延迟数据更新状态直到(引擎看到的最大事件时间 - 晚阈值> T)。
·换句话说,阈值内的后期数据将被聚合,可是晚于阈值的数据将开始被丢弃(参见本节后面的确切保证)。
·让咱们经过一个例子来理解这一点。
·咱们可使用withWatermark()在上一个示例中轻松定义水印,以下所示。
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group windowedCounts = words \ .withWatermark("timestamp", "10 minutes") \ .groupBy( window(words.timestamp, "10 minutes", "5 minutes"), words.word) \ .count()
·在这个例子中,咱们在“timestamp”列的值上定义查询的水印,而且还将“10分钟”定义为容许数据延迟的阈值。
·若是此查询在更新输出模式下运行(稍后将在“输出模式”部分中讨论),则引擎将继续更新结果表中窗口的计数,直到窗口早于水印,该水印落后于列中的当前事件时间“
·时间戳“10分钟。
·这是一个例子。

Watermarking in Update Mode

·如图所示,引擎跟踪的最大事件时间是蓝色虚线,而且在每一个触发开始时设置为(最大事件时间 - '10分钟')的水印是红线。
·例如,当引擎观察数据(12:14,dog)时,它会将下一次触发的水印设置为12:04。
·该水印使发动机保持中间状态另外10分钟,以容许计算延迟数据。
·例如,数据(12:09,cat)无序且迟到,它在Windows 12:00-12:10和12:05 - 12:15之间。
·由于它仍然在触发器中的水印12:04以前,因此引擎仍然将中间计数保持为状态而且正确地更新相关窗口的计数。
·可是,当水印更新为12:11时,窗口的中间状态(12:00-12:10)被清除,全部后续数据(例如(12:04,驴))被认为“太晚”,所以忽略了.
·注意,在每次触发以后,更新的计数(即紫色行)被写入接收器做为触发输出,如更新模式所指示的。
·某些接收器(例如文件)可能不支持更新模式所需的细粒度更新。
·为了使用它们,咱们还支持附加模式,其中只有最终计数被写入接收器。
·这以下图所示。
·请注意,在非流式数据集上使用withWatermark是no-op。
·因为水印不该以任何方式影响任何批量查询,咱们将直接忽略它。

Watermarking in Append Mode

·与以前的更新模式相似,引擎维护每一个窗口的中间计数。
·可是,部分计数不会更新到结果表,也不会写入接收器。
·引擎等待“10分钟”以计算延迟日期,而后丢弃窗口<水印的中间状态,并将最终计数附加到结果表/接收器。
·例如,仅在水印更新为12:11后,窗口12:00 - 12:10的最终计数才会附加到结果表中。
Conditions for watermarking to clean aggregation state(用于清除聚合状态的水印的条件)
·值得注意的是,在聚合查询中,水印清除状态必须知足如下条件(从Spark 2.1.1开始,未来可能会有变化)。
·输出模式必须为Append或Update。
·完整模式要求保留全部聚合数据,所以不能使用水印来下降中间状态。
·有关每种输出模式语义的详细说明,请参见“输出模式”部分。
·聚合必须具备事件时间列或事件时间列上的窗口。
·必须在与聚合中使用的时间戳列相同的列上调用withWatermark。
·例如,df.withWatermark(“time”,“1 min”)。groupBy(“time2”)。count()在Append输出模式中无效,由于水印是在与聚合列不一样的列上定义的。
·必须在聚合以前调用withWatermark才能使用水印细节。
·例如,df.groupBy(“time”)。count()。withWatermark(“time”,“1 min”)在追加输出模式下无效。
Semantic Guarantees of Aggregation with Watermarking(带水印聚合的语义保证)
·水印延迟(使用withWatermark设置)为“2小时”可确保引擎永远不会丢弃任何延迟小于2小时的数据。
·换句话说,任何不到2小时(在事件时间方面)的数据都保证汇总到那时处理的最新数据。
·可是,保证只在一个方向严格。
·延迟2小时以上的数据不能保证被丢弃;
·它可能会也可能不会聚合。
·更多延迟的是数据,发动机进行处理的可能性较小。

Join Operations(流静态链接)

·结构化流式传输支持将流式数据集/数据框架与静态数据集/数据框架以及另外一个流式数据集/数据框架链接起来。
·流链接的结果以递增方式生成,相似于上一节中的流聚合的结果。
·在本节中,咱们将探讨在上述状况下支持哪一种类型的链接(即内部,外部等)。
·请注意,在全部受支持的链接类型中,与流式数据集/数据框架的链接结果与使用包含流中相同数据的静态数据集/数据框架的结果彻底相同。

Stream-static Joins(流静态链接)

·自Spark 2.0引入以来,Structured Streaming支持流和静态DataFrame / Dataset之间的链接(内链接和某种类型的外链接)。
·这是一个简单的例子。
staticDf = spark.read. ... streamingDf = spark.readStream. ... streamingDf.join(staticDf, "type") # inner equi-join with a static DF streamingDf.join(staticDf, "type", "right_join") # right outer join with a static DF
·请注意,流静态链接不是有状态的,所以不须要进行状态管理。可是,尚不支持几种类型的流静态外链接。这些列在此加入部分的末尾。

Stream-stream Joins(流静态链接)

In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming Datasets/DataFrames. The challenge of generating join results between two data streams is that, at any point of time, the view of the dataset is incomplete for both sides of the join making it much harder to find matches between inputs. Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream. Hence, for both the input streams, we buffer past input as streaming state, so that we can match every future input with past input and accordingly generate joined results. Furthermore, similar to streaming aggregations, we automatically handle late, out-of-order data and can limit the state using watermarks. Let’s discuss the different types of supported stream-stream joins and how to use them.bootstrap

Inner Joins with optional Watermarking(内部联合可选水印)
·支持任何类型的列上的内链接以及任何类型的链接条件。
·可是,当流运行时,流状态的大小将无限增加,由于必须保存全部过去的输入,由于任何新输入均可以与过去的任何输入匹配。
·为了不无界状态,您必须定义其余链接条件,以便无限期旧输入没法与未来的输入匹配,所以能够从状态清除。
·换句话说,您必须在链接中执行如下附加步骤。
·定义两个输入上的水印延迟,以便引擎知道输入的延迟时间(相似于流聚合)
·在两个输入上定义事件时间的约束,使得引擎能够肯定什么时候不须要一个输入的旧行(即,将不知足时间约束)与另外一个输入匹配。
·能够用两种方式之必定义该约束。
·时间范围链接条件(例如...... JOE ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),
·加入事件时间窗口(例如...... JOIN ON leftTimeWindow = rightTimeWindow)。
·让咱们经过一个例子来理解这一点。
·假设咱们但愿加入一系列广告展现次数(展现广告时),并在广告上添加另外一个用户点击流,以便在展现次数达到可获利的点击时进行关联。
·要在此流 - 流链接中容许状态清理,您必须指定水印延迟和时间约束,以下所示。
·水印延迟:好比说,展现次数和相应的点击次数能够分别在事件时间内延迟/无序,最多2个小时和3个小时。
·事件时间范围条件:假设,在相应的印象后0秒到1小时的时间范围内可能发生咔嗒声。
·代码看起来像这样。
from pyspark.sql.functions import expr

impressions = spark.readStream. ... clicks = spark.readStream. ... # Apply watermarks on event-time columns impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours") clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours") # Join with event-time constraints impressionsWithWatermark.join( clicksWithWatermark, expr("""  clickAdId = impressionAdId AND  clickTime >= impressionTime AND  clickTime <= impressionTime + interval 1 hour  """) )
Semantic Guarantees of Stream-stream Inner Joins with Watermarking(具备水印的流内部链接的语义保证)
·这相似于经过聚合水印提供的保证。水印延迟“2小时”可确保发动机永远不会丢失任何延迟小于2小时的数据。但延迟2小时以上的数据可能会或可能不会获得处理。
Outer Joins with Watermarking(带水印的外部链接)
·虽然水印+事件时间约束对于内链接是可选的,但对于左外链接和右外链接,必须指定它们。这是由于为了在外链接中生成NULL结果,引擎必须知道输入行什么时候不会与未来的任何内容匹配。所以,必须指定水印+事件时间约束以生成正确的结果。
·所以,使用外部联接的查询看起来与以前的广告货币化示例很是类似,只是会有一个附加参数将其指定为外部联接
impressionsWithWatermark.join(
  clicksWithWatermark, expr("""  clickAdId = impressionAdId AND  clickTime >= impressionTime AND  clickTime <= impressionTime + interval 1 hour  """), "leftOuter" # can be "inner", "leftOuter", "rightOuter" )
Semantic Guarantees of Stream-stream Outer Joins with Watermarking(具备水印的流 - 流外链接的语义保证)

外链接与内部链接具备相同的保证,关于水印延迟以及数据是否会被丢弃。api

Caveats(注意事项)
·关于如何生成外部结果,有一些重要的特征须要注意。
·将生成外部NULL结果,延迟取决于指定的水印延迟和时间范围条件。
·这是由于引擎必须等待那么长时间以确保没有匹配,而且未来不会再有匹配。
·在微批量引擎的当前实现中,水印在微批次结束时前进,而且下一个微批次使用更新的水印来清理状态并输出外部结果。
·因为咱们仅在存在要处理的新数据时才触发微批处理,所以若是在流中没有接收到新数据,则外部结果的生成可能会延迟。
·简而言之,若是链接的两个输入流中的任何一个在一段时间内没有接收到数据,则外部(两种状况,左侧或右侧)输出可能会延迟
Support matrix for joins in streaming queries(支持流式查询中的链接矩阵)
Left Input Right Input Join Type  
Static Static All types Supported, since its not on streaming data even though it can be present in a streaming query
Stream Static Inner Supported, not stateful
Left Outer Supported, not stateful
Right Outer Not supported
Full Outer Not supported
Static Stream Inner Supported, not stateful
Left Outer Not supported
Right Outer Supported, not stateful
Full Outer Not supported
Stream Stream Inner Supported, optionally specify watermark on both sides + time constraints for state cleanup
Left Outer Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup
Right Outer Conditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup
Full Outer Not supported
       
·有关支持链接的其余详细信息
·链接能够级联,也就是说,你能够作df1.join(df2,...)。join(df3,...)。join(df4,....)。
·从Spark 2.3开始,只有在查询处于追加输出模式时才能使用链接。其余输出模式尚不支持。
·从Spark 2.3开始,在链接以前不能使用其余非相似地图的操做。如下是一些不能使用的例子。
·在加入以前没法使用流聚合。
·在链接以前,没法在更新模式下使用mapGroupsWithState和flatMapGroupsWithState。

Streaming Deduplication(流式重复数据删除)

·您可使用事件中的惟一标识符对数据流中的记录进行重复数据删除。
·这与使用惟一标识符列的静态重复数据删除彻底相同。
·该查询将存储来自先前记录的必要数据量,以便它能够过滤重复记录。
·与聚合相似,您可使用带或不带水印的重复数据删除。
·使用水印 - 若是重复记录的到达时间有上限,则能够在事件时间列上定义水印,并使用guid和事件时间列进行重复数据删除。
·该查询将使用水印从过去的记录中删除旧的状态数据,这些记录不会再被重复。
·这限制了查询必须维护的状态量。
·没有水印 - 因为重复记录可能到达时没有界限,查询未来自全部过去记录的数据存储为状态。
streamingDf = spark.readStream. ... # Without watermark using guid column streamingDf.dropDuplicates("guid") # With watermark using guid and eventTime columns streamingDf \ .withWatermark("eventTime", "10 seconds") \ .dropDuplicates("guid", "eventTime")

Policy for handling multiple watermarks(处理多个水印的政策)

·流式查询能够具备多个联合或链接在一块儿的输入流。
·每一个输入流能够具备不一样的后期数据阈值,这些阈值须要被容忍用于有状态操做。
·您能够在每一个输入流上使用withWatermarks(“eventTime”,delay)指定这些阈值。
·例如,考虑在inputStream1和inputStream2之间使用流 - 流链接的查询。
·inputStream1.withWatermark(“eventTime1”,“1小时”)。join(inputStream2.withWatermark(“eventTime2”,“2小时”),joinCondition)
·在执行查询时,Structured Streaming单独跟踪每一个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择单个全局水印用于有状态操做。
·默认状况下,选择最小值做为全局水印,由于它确保若是其中一个流落后于其余流(例如,其中一个流因上游故障而中止接收数据),则不会意外丢弃数据。
·换句话说,全局水印将以最慢流的速度安全地移动,而且查询输出将相应地延迟。
·可是,在某些状况下,您可能但愿得到更快的结果,即便这意味着从最慢的流中删除数据。
·从Spark 2.4开始,您能够设置多个水印策略,经过将SQL配置spark.sql.streaming.multipleWatermarkPolicy设置为max(默认为min)来选择最大值做为全局水印。
·这使得全球水印以最快的速度发展。
·可是,做为反作用,来自较慢流的数据将被积极地丢弃。
·所以,明智地使用此配置。

Arbitrary Stateful Operations(任意有状态的行动)

·许多用例须要比聚合更高级的有状态操做。
·例如,在许多用例中,您必须从事件的数据流中跟踪会话。
·要进行此类会话,您必须将任意类型的数据保存为状态,并使用每一个触发器中的数据流事件对状态执行任意操做。
·从Spark 2.2开始,这可使用操做mapGroupsWithState和更强大的操做flatMapGroupsWithState来完成。
·这两个操做都容许您在分组数据集上应用用户定义的代码以更新用户定义的状态。
·有关更具体的详细信息,请查看API文档(Scala / Java)和示例(Scala / Java)。

Unsupported Operations(不支持的操做)

·流式DataFrames / Datasets不支持一些DataFrame / Dataset操做。其中一些以下。
·流数据集上尚不支持多个流聚合(即,流DF上的聚合链)。
·流数据集不支持限制和前N行。
·不支持对流数据集进行不一样的操做。
·仅在聚合和彻底输出模式以后,流数据集才支持排序操做。
·不支持流数据集上的几种外链接类型。
·有关详细信息,请参阅“链接操做”部分中的支持矩阵。
·此外,有一些数据集方法不适用于流数据集。
·它们是当即运行查询并返回结果的操做,这对流式数据集没有意义。
·相反,这些功能能够经过显式启动流式查询来完成(请参阅下一节)。
·count() - 没法从流数据集返回单个计数。
·相反,使用ds.groupBy()。count()返回包含运行计数的流数据集。
·foreach() - 而是使用ds.writeStream.foreach(...)(参见下一节)。
·show() - 而是使用控制台接收器(参见下一节)。
·若是您尝试这些操做中的任何一个,您将看到一个AnalysisException,例如“流数据框架/数据集不支持操做XYZ”。
·虽然其中一些可能在将来的Spark版本中获得支持,但还有一些基本上难以有效地实现流数据。
·例如,不支持对输入流进行排序,由于它须要跟踪流中接收的全部数据。
·所以,这基本上难以有效执行。

Starting Streaming Queries(启动流式查询)

·一旦定义了最终结果DataFrame / Dataset,剩下的就是开始流式计算。
·为此,您必须使用经过Dataset.writeStream()返回的DataStreamWriter(Scala / Java / Python文档)。
·您必须在此界面中指定如下一项或多项。
·输出接收器的详细信息:数据格式,位置等。
·输出模式:指定写入输出接收器的内容。
·查询名称:可选地,指定查询的惟一名称以进行标识。
·触发间隔:可选择指定触发间隔。
·若是未指定,则系统将在前一处理完成后当即检查新数据的可用性。
·若是因为先前的处理还没有完成而错过了触发时间,则系统将当即触发处理。
·检查点位置:对于能够保证端到端容错的某些输出接收器,请指定系统写入全部检查点信息的位置。
·这应该是与HDFS兼容的容错文件系统中的目录。
·检查点的语义将在下一节中详细讨论。

Output Modes(输出模式)

·有几种类型的输出模式。
·追加模式(默认) - 这是默认模式,其中只有自上次触发后添加到结果表的新行才会输出到接收器。
·仅支持那些添加到结果表中的行永远不会更改的查询。
·所以,此模式保证每行仅输出一次(假设容错接收器)。
·例如,仅使用select,where,map,flatMap,filter,join等的查询将支持Append模式。
·完成模式 - 每次触发后,整个结果表将输出到接收器。
·聚合查询支持此功能。
·更新模式 - (自Spark 2.1.1起可用)仅将结果表中自上次触发后更新的行输出到接收器。
·在未来的版本中添加更多信息。
·不一样类型的流式查询支持不一样的输出模式。
·这是兼容性矩阵。
Query Type   Supported Output Modes Notes
Queries with aggregation Aggregation on event-time with watermark Append, Update, Complete Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details. 

Update mode uses watermark to drop old aggregation state. 

Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.
Other aggregations Complete, Update Since no watermark is defined (only defined in other category), old aggregation state is not dropped. 

Append mode is not supported as aggregates can update thus violating the semantics of this mode.
Queries with mapGroupsWithState Update  
Queries with flatMapGroupsWithState Append operation mode Append Aggregations are allowed after flatMapGroupsWithState.
Update operation mode Update Aggregations not allowed after flatMapGroupsWithState.
Queries with joins Append Update and Complete mode not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.
Other queries Append, Update Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
       

Output Sinks(输出接收器)

有几种类型的内置输出接收器数组

  • 文件接收器 - 将输出存储到目录。
writeStream
    .format("parquet") // can be "orc", "json", "csv", etc. .option("path", "path/to/destination/dir") .start()
  • Kafka sink - 将输出存储到Kafka中的一个或多个主题
writeStream
    .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "updates") .start()
  • Foreach接收器 - 对输出中的记录运行任意计算。有关详细信息,请参阅本节后面的内容
writeStream
    .foreach(...)
    .start()
  • 控制台接收器(用于调试) - 每次触发时将输出打印到控制台/标准输出。支持Append和Complete输出模式。这应该用于低数据量的调试目的,由于在每次触发后收集整个输出并将其存储在驱动程序的内存中
writeStream
    .format("console") .start()
  • 内存接收器(用于调试) - 输出做为内存表存储在内存中。支持Append和Complete输出模式。这应该用于低数据量的调试目的,由于整个输出被收集并存储在驱动程序的内存中。所以,请谨慎使用。
writeStream
    .format("memory") .queryName("tableName") .start()
·某些接收器不具备容错能力,由于它们不保证输出的持久性,仅用于调试目的。请参阅前面的容错语义部分。如下是Spark中全部接收器的详细信息。
Sink Supported Output Modes Options Fault-tolerant Notes
File Sink Append path: path to the output directory, must be specified. 

For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for "parquet" format options see DataFrameWriter.parquet()
Yes (exactly-once) Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka Sink Append, Update, Complete See the Kafka Integration Guide Yes (at-least-once) More details in the Kafka Integration Guide
Foreach Sink Append, Update, Complete None Depends on ForeachWriter implementation More details in the next section
ForeachBatch Sink Append, Update, Complete None Depends on the implementation More details in the next section
Console Sink Append, Update, Complete numRows: Number of rows to print every trigger (default: 20) 
truncate: Whether to truncate the output if too long (default: true)
No  
Memory Sink Append, Complete None No. But in Complete Mode, restarted query will recreate the full table. Table name is the query name.
         
·请注意,您必须调用start()来实际开始执行查询。这将返回一个StreamingQuery对象,该对象是持续运行的执行的句柄。您可使用此对象来管理查询,咱们将在下一小节中讨论。
·如今,让咱们经过几个例子来理解这一切。
# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10") # Print new data to console noAggDF \ .writeStream \ .format("console") \ .start() # Write new data to Parquet files noAggDF \ .writeStream \ .format("parquet") \ .option("checkpointLocation", "path/to/checkpoint/dir") \ .option("path", "path/to/destination/dir") \ .start() # ========== DF with aggregation ========== aggDF = df.groupBy("device").count() # Print updated aggregations to console aggDF \ .writeStream \ .outputMode("complete") \ .format("console") \ .start() # Have all the aggregates in an in-memory table. The query name will be the table name aggDF \ .writeStream \ .queryName("aggregates") \ .outputMode("complete") \ .format("memory") \ .start() spark.sql("select * from aggregates").show() # interactively query in-memory table
Using Foreach and ForeachBatch(使用Foreach和ForeachBatch)
·foreach和foreachBatch操做容许您在流式查询的输出上应用任意操做和编写逻辑。它们的用例略有不一样 - 虽然foreach容许在每一行上自定义写入逻辑,foreachBatch容许在每一个微批量的输出上进行任意操做和自定义逻辑。让咱们更详细地了解他们的用法。
ForeachBatch
·foreachBatch(...)容许您指定在流式查询的每一个微批次的输出数据上执行的函数。从Spark 2.4开始,Scala,Java和Python都支持它。它须要两个参数:DataFrame或Dataset,它具备微批次的输出数据和微批次的惟一ID。
def foreach_batch_function(df, epoch_id): # Transform and write batchDF pass streamingDF.writeStream.foreachBatch(foreach_batch_function).start() 

使用foreachBatch,您能够执行如下操做。

  • 重用现有的批处理数据源 - 对于许多存储系统,可能尚未可用的流式接收器,但可能已经存在用于批量查询的数据写入器。使用foreachBatch,您能够在每一个微批次的输出上使用批处理数据编写器。
  • 写入多个位置 - 若是要将流式查询的输出写入多个位置,则能够简单地屡次写入输出DataFrame / Dataset。可是,每次写入尝试都会致使从新计算输出数据(包括可能从新读取输入数据)。要避免从新计算,您应该缓存输出DataFrame / Dataset,将其写入多个位置,而后将其解除。这是一个大纲。

    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.persist() batchDF.write.format(…).save(…) // location 1 batchDF.write.format(…).save(…) // location 2 batchDF.unpersist() }

  • 应用其余DataFrame操做 - 流式DataFrame中不支持许多DataFrame和Dataset操做,由于Spark不支持在这些状况下生成增量计划。使用foreachBatch,您能够在每一个微批输出上应用其中一些操做。可是,您必须本身解释执行该操做的端到端语义。

Note:

  • 默认状况下,foreachBatch仅提供至少一次写保证。可是,您可使用提供给该函数的batchId做为重复数据删除输出并得到一次性保证的方法。
  • foreachBatch不适用于连续处理模式,由于它从根本上依赖于流式查询的微批量执行。若是以连续模式写入数据,请改用foreach。
Foreach
·若是foreachBatch不是一个选项(例如,相应的批处理数据写入器不存在,或连续处理模式),那么您可使用foreach表达自定义编写器逻辑。具体来讲,您能够经过将数据划分为三种方法来表达数据写入逻辑:打开,处理和关闭。
·从Spark 2.4开始,foreach可用于Scala,Java和Python。
·在Python中,您能够经过两种方式调用foreach:在函数中或在对象中。该函数提供了一种表达处理逻辑的简单方法,可是当故障致使某些输入数据的从新处理时,不容许您对生成的数据进行重复数据删除。对于这种状况,您必须在对象中指定处理逻辑。
  1. 该函数将一行做为输入。
      def process_row(row): # Write row to storage pass query = streamingDF.writeStream.foreach(process_row).start() 
  1. 该对象有一个处理方法和可选的打开和关闭方法:
      class ForeachWriter:
          def open(self, partition_id, epoch_id): # Open connection. This method is optional in Python. pass def process(self, row): # Write row to connection. This method is NOT optional in Python. pass def close(self, error): # Close the connection. This method in optional in Python. pass query = streamingDF.writeStream.foreach(ForeachWriter()).start() 

执行语义启动流式查询时,Spark如下列方式调用函数或对象的方法:

  • 此对象的单个副本负责查询中单个任务生成的全部数据。换句话说,一个实例负责处理以分布式方式生成的数据的一个分区。
  • 此对象必须是可序列化的,由于每一个任务都将得到所提供对象的新的序列化反序列化副本。所以,强烈建议在调用open()方法以后完成用于写入数据的任何初始化(例如,打开链接或启动事务),这表示任务已准备好生成数据
  • 方法的生命周期以下:

    • 对于partition_id的每一个分区:

      • 对于epoch_id的流数据的每一个批次/纪元:

        • 方法open(partitionId,epochId)被调用。

        • 若是open(...)返回true,则对于分区和批处理/纪元中的每一行,将调用方法进程(行)

        • 调用方法close(错误),在处理行时看到错误(若是有)。

  • 若是open()方法存在而且成功返回(无论返回值),则调用close()方法(若是存在),除非JVM或Python进程在中间崩溃。

  • Note: 当失败致使某些输入数据的从新处理时,open()方法中的partitionId和epochId可用于对生成的数据进行重复数据删除。这取决于查询的执行模式。若是以微批处理模式执行流式查询,则保证由惟一元组(partition_id,epoch_id)表示的每一个分区具备相同的数据。所以,(partition_id,epoch_id)可用于对数据进行重复数据删除和/或事务提交,并实现一次性保证。可是,若是正在以连续模式执行流式查询,则此保证不成立,所以不该用于重复数据删除。

Triggers(触发器)

流式查询的触发器设置定义了流式数据处理的时间,查询是做为具备固定批处理间隔的微批量查询仍是做为连续处理查询来执行。如下是支持的各类触发器。
Trigger Type Description
unspecified (default) 若是未明确指定触发设置,则默认状况下,查询将以微批处理模式执行,一旦前一个微批处理完成处理,将当即生成微批处理。
Fixed interval micro-batches 查询将以微批处理模式执行,其中微批处理将以用户指定的间隔启动。
  • 若是前一个微批次在该间隔内完成,那么引擎将等待该间隔结束,而后开始下一个微批次
  • 若是前一个微批次须要的时间长于完成的间隔(即若是错过了间隔边界),则下一个微批次将在前一个完成后当即开始(即,它不会等待下一个间隔边界)。
  • 若是没有可用的新数据,则不会启动微批次
One-time micro-batch
查询将执行*仅一个*微批处理全部可用数据,而后自行中止。这在您但愿按期启动集群,处理自上一个时间段以来可用的全部内容,而后关闭集群的方案中很是有用。在某些状况下,这可能会显着节省成本。
Continuous with fixed checkpoint interval
(experimental)
查询将以新的低延迟,连续处理模式执行。
·在下面的连续处理部分中阅读更多相关信息

如下是一些代码示例:

# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
  .format("console") \ .start() # ProcessingTime trigger with two-seconds micro-batch interval df.writeStream \ .format("console") \ .trigger(processingTime='2 seconds') \ .start() # One-time trigger df.writeStream \ .format("console") \ .trigger(once=True) \ .start() # Continuous trigger with one-second checkpointing interval df.writeStream .format("console") .trigger(continuous='1 second') .start()

Managing Streaming Queries(管理流式查询)

启动查询时建立的StreamingQuery对象可用于监视和管理查询

query = df.writeStream.format("console").start() # get the query object query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data query.runId() # get the unique id of this run of the query, which will be generated at every start/restart query.name() # get the name of the auto-generated or user-specified name query.explain() # print detailed explanations of the query query.stop() # stop the query query.awaitTermination() # block until query is terminated, with stop() or with error query.exception() # the exception if the query has been terminated with error query.recentProgress() # an array of the most recent progress updates for this query query.lastProgress() # the most recent progress update of this streaming query
·您能够在单个SparkSession中启动任意数量的查询。它们将同时运行,共享群集资源。您可使用sparkSession.streams()来获取可用于管理当前活动查询的StreamingQueryManager(Scala / Java / Python文档)
spark = ...  # spark session

spark.streams().active # get the list of currently active streaming queries spark.streams().get(id) # get a query object by its unique id spark.streams().awaitAnyTermination() # block until any one of them terminates

Monitoring Streaming Queries(监视流式查询)

·有多种方法能够监控活动的流式查询。您可使用Spark的Dropwizard Metrics支持将指标推送到外部系统,也能够经过编程方式访问它们。

Reading Metrics Interactively(以交互方式阅读度量标准)

·您可使用streamingQuery.lastProgress()和streamingQuery.status()直接获取活动查询的当前状态和指标。lastProgress()返回Scala和Java中的StreamingQueryProgress对象以及Python中具备相同字段的字典。它包含有关在流的最后一次触发中所取得进展的所 有信息 - 处理了哪些数据,处理速率,延迟等等。还有streamingQuery.recentProgress,它返回最后几个进展的数组。此外,streamingQuery.status()返回Scala和Java中的StreamingQueryStatus对象以及Python中具备相同字段的字典。它提供了有关查询当即执行操做的信息 - 触发器是否处于活动状态,是否正在处理数据等。
·这里有一些例子。
query = ...  # a StreamingQuery
print(query.lastProgress) ''' Will print something like the following. {u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}} ''' print(query.status) ''' Will print something like the following. {u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False} '''

Reporting Metrics programmatically using Asynchronous APIs(使用异步API以编程方式报告度量标准)

您还能够经过附加StreamingQueryListener(Scala / Java文档)异步监视与SparkSession关联的全部查询。使用sparkSession.streams.attachListener()附加自定义StreamingQueryListener对象后,将在启动和中止查询以及活动查询中取得进展时得到回调。
这是一个例子,
Not available in Python.

Reporting Metrics using Dropwizard(使用Dropwizard报告指标)

Spark支持使用Dropwizard库报告指标。要同时报告结构化流式查询的指标,您必须在SparkSession中显式启用配置spark.sql.streaming.metricsEnabled。
spark.conf.set("spark.sql.streaming.metricsEnabled", "true") # or spark.sql("SET spark.sql.streaming.metricsEnabled=true")

All queries started in the SparkSession after this configuration has been enabled will report metrics through Dropwizard to whatever sinks have been configured (e.g. Ganglia, Graphite, JMX, etc.).

Recovering from Failures with Checkpointing(经过检查点从故障中恢复)

若是发生故障或故意关机,您能够恢复先前查询的先前进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。您可使用检查点位置配置查询,查询将保存全部进度信息(即每一个触发器中处理的偏移范围)和运行聚合(例如快速示例中的字数)到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,而且能够在启动查询时设置为DataStreamWriter中的选项。
aggDF \
    .writeStream \
    .outputMode("complete") \ .option("checkpointLocation", "path/to/HDFS/dir") \ .format("memory") \ .start()

Recovery Semantics after Changes in a Streaming Query(流式查询中更改后的恢复语义)

在从同一检查点位置从新启动之间容许对流查询进行哪些更改存在限制。如下是一些不容许的更改,或者更改的效果未明肯定义。
  • 术语“容许”意味着您能够执行指定的更改,但其效果的语义是否明肯定义取决于查询和更改.

  • 术语“不容许”意味着您不该该执行指定的更改,由于从新启动的查询可能会因不可预测的错误而失败。sdf表示使用sparkSession.readStream生成的流式DataFrame / Dataset

Types of changes(变化的类型)

  • 输入源的数量或类型(即不一样来源)的变化:这是不容许的。

  • 输入源参数的更改:是否容许此更改以及更改的语义是否明肯定义取决于源和查询。这里有一些例子。
    • 容许添加/删除/修改速率限制: spark.readStream.format("kafka").option("subscribe", "topic") to  spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
    • 不容许对订阅的主题/文件进行更改,由于结果是不可预测:spark.readStream.format("kafka").option("subscribe", "topic") to  spark.readStream.format("kafka").option("subscribe", "newTopic")
  • 输出接收器类型的变化:容许几个特定接收器组合之间的变化。这须要根据具体状况进行验证。这里有一些例子。
    • File sink to Kafka sink is allowed. Kafka will see only the new data.

    • Kafka sink to file sink is not allowed.

    • Kafka sink changed to foreach, or vice versa is allowed.

  • Changes in the parameters of output sink: Whether this is allowed and whether the semantics of the change are well-defined depends on the sink and the query. Here are a few examples.

    • Changes to output directory of a file sink is not allowed: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")

    • Changes to output topic is allowed: sdf.writeStream.format("kafka").option("topic", "someTopic") to sdf.writeStream.format("kafka").option("topic", "anotherTopic")

    • Changes to the user-defined foreach sink (that is, the ForeachWriter code) is allowed, but the semantics of the change depends on the code.

  • *Changes in projection / filter / map-like operations**: Some cases are allowed. For example:

    • Addition / deletion of filters is allowed: sdf.selectExpr("a") to sdf.where(...).selectExpr("a").filter(...).

    • Changes in projections with same output schema is allowed: sdf.selectExpr("stringColumn AS json").writeStream to sdf.selectExpr("anotherStringColumn AS json").writeStream

    • Changes in projections with different output schema are conditionally allowed: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream is allowed only if the output sink allows the schema change from "a" to "b".

  • Changes in stateful operations: Some operations in streaming queries need to maintain state data in order to continuously update the result. Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, HDFS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains same across restarts. This means that any changes (that is, additions, deletions, or schema modifications) to the stateful operations of a streaming query are not allowed between restarts. Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery:

    • Streaming aggregation: For example, sdf.groupBy("a").agg(...). Any change in number or type of grouping keys or aggregates is not allowed.

    • Streaming deduplication: For example, sdf.dropDuplicates("a"). Any change in number or type of grouping keys or aggregates is not allowed.

    • Stream-stream join: For example, sdf1.join(sdf2, ...) (i.e. both inputs are generated with sparkSession.readStream). Changes in the schema or equi-joining columns are not allowed. Changes in join type (outer or inner) not allowed. Other changes in the join condition are ill-defined.

    • Arbitrary stateful operation: For example, sdf.groupByKey(...).mapGroupsWithState(...) or sdf.groupByKey(...).flatMapGroupsWithState(...). Any change to the schema of the user-defined state and the type of timeout is not allowed. Any change within the user-defined state-mapping function are allowed, but the semantic effect of the change depends on the user-defined logic. If you really want to support state schema changes, then you can explicitly encode/decode your complex state data structures into bytes using an encoding/decoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query restarts as the binary state will always be restored successfully.

Continuous Processing(连续处理)

[Experimental]

连续处理是Spark 2.3中引入的一种新的实验性流执行模式,可实现低(~1 ms)端到端延迟,而且至少具备一次容错保证。将其与默认的微批处理引擎相比较,该引擎能够实现一次性保证,但最多可实现~100ms的延迟。
对于某些类型的查询(在下面讨论),您能够选择执行它们的模式而无需修改应用程序逻辑(即不更改DataFrame / Dataset操做)。
要在连续处理模式下运行支持的查询,您只需指定一个连续触发器,并将所需的检查点间隔做为参数。
例如:
spark \
  .readStream \
  .format("kafka") \ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ .option("subscribe", "topic1") \ .load() \ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ .option("topic", "topic1") \ .trigger(continuous="1 second") \ # only change in query .start()

A checkpoint interval of 1 second means that the continuous processing engine will records the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.

Supported Queries(支持的查询)

从Spark 2.3开始,连续处理模式仅支持如下类型的查询。

  • 操做:在连续模式下仅支持相似地图的数据集/数据框操做,即仅投影(select,map,flatMap,mapPartitions等)和选择(where,filter等)。
    • 除了聚合函数(由于尚不支持聚合),current_timestamp()和current_date()(使用时间的肯定性计算具备挑战性)以外,支持全部SQL函数。
  • Sources:
    • Kafka来源:支持全部选项。
    • Rate source: Good for testing. Only options that are supported in the continuous mode are numPartitions and rowsPerSecond.
  • Sinks:
    • Kafka sink:支持全部选项。
    • Memory sink: Good for debugging.
    • Console sink: Good for debugging. All options are supported. Note that the console will print every checkpoint interval that you have specified in the continuous trigger.
有关它们的更多详细信息,请参阅输入源和输出接收器部分。虽然控制台接收器很是适合测试,可是使用Kafka做为源和接收器能够最好地观察到端到端的低延迟处理,由于这容许引擎处理数据并使结果在输出主题中可用
输入主题中可用的毫秒输入数据。

Caveats(注意事项)

  • 连续处理引擎启动多个长时间运行的任务,这些任务不断从源中读取数据,处理数据并连续写入接收器。查询所需的任务数取决于查询能够并行从源读取的分区数。所以,在开始连续处理查询以前,必须确保群集中有足够的核心并行执行全部任务。
    例如,若是您正在读取具备10个分区的Kafka主题,则群集必须至少具备10个核心才能使查询取得进展。
  • 中止连续处理流可能会产生虚假的任务终止警告。这些能够安全地忽略。
  • 目前没有自动重试失败的任务。任何失败都将致使查询中止,而且须要从检查点手动从新启动。

Additional Information(附加信息)

Further Reading(进一步阅读)

Talks

  • Spark Summit Europe 2017
  • Spark Summit 2016