本文作者:Jay Kreps,linkedin公司首席工程师;文章来自于他在linkedin上的分享;原文标题:The Log: What every software engineer should know about real-time data’s unifying abstraction。
文章内容非常干货,非常值得学习。文章将以四部分进行阐述,建议大家耐心看完。
到此为止,我只是描述从端到端数据复制的理想机制。但是在存储系统中搬运字节不是所要讲述内容的全部。最终我们发现日志是流的另一种说法,日志是流处理的核心。
但是,等等,什么是流处理呢?
如果你是90年代晚期或者21世纪初数据库文化或者数据基础架构产品的爱好者,那么你就可能会把流处理与建创SQL引擎或者创建“箱子和箭头”接口用于事件驱动的处理等联系起来。
如果你关注开源数据库系统的大量出现,你就可能把流处理和一些开源数据库系统关联起来,这些系统包括了:Storm,Akka,S4和Samza.但是大部分人会把这些系统作为异步消息处理系统,这些系统与支持群集的远程过程调用层的应用没什么差别(而事实上在开源数据库系统领域某些方面确实如此)。
这些视图都有一些局限性。流处理与SQL是无关的。它也局限于实时流处理。不存在内在的原因限制你不能处理昨天的或者一个月之前的流数据,且使用多种不同的语言表达计算。
我把流处理视为更广泛的概念:持续数据流处理的基础架构。我认为计算模型可以像MapReduce或者分布式处理架构一样普遍,但是有能力处理低时延的结果。
处理模型的实时驱动是数据收集方法。成批收集的数据是分批处理的。数据是不断收集的,它也是按顺序不断处理的。
美国的统计调查就是成批收集数据的良好典范。统计调查周期性的开展,通过挨门挨户的走访,使用蛮力发现和统计美国的公民信息。1790年统计调查刚刚开始时这种方式是奏效的。那时的数据收集是批处理的,它包括了骑着马悠闲的行进,把信息写在纸上,然后把成批的记录传送到人们统计数据的中心站点。现在,在描述这个统计过程时,人们立即会想到为什么我们不保留出生和死亡的记录,这样就可以产生人口统计信息这些信息或是持续的或者是其它维度的。
这是一个极端的例子,但是大量的数据传送处理仍然依赖于周期性的转储,批量转化和集成。处理大容量转储的唯一方法就是批量的处理。但是随着这些批处理被持续的供给所取代,人们自然而然的开始不间断的处理以平滑的处理所需资源并且消除延迟。
例如LinkedIn几乎没有批量数据收集。大部分的数据或者是活动数据或者是数据库变更,这两者都是不间断发生的。事实上,你可以想到的任何商业,正如:Jack Bauer告诉我们的,低层的机制都是实时发生的不间断的流程事件。数据是成批收集的,它总是会依赖于一些人为的步骤,或者缺少数字化或者是一些自动化的非数字化流程处理的遗留信息。当传送和处理这些数据的机制是邮件或者人工的处理时,这一过程是非常缓慢的。首轮自动化总是保持着最初的处理形式,它常常会持续相当长的时间。
每天运行的批量处理作业常常是模拟了一种一天的窗口大小的不间断计算。当然,低层的数据也经常变化。在LinkedIn,这些是司空见贯的,并且使得它们在Hadoop运转的机制是有技巧的,所以我们实施了一整套管理增量的Hadoop工作流的架构。
由此看来,对于流处理可以有不同的观点。流处理包括了在底层数据处理的时间概念,它不需要数据的静态快照,它可以产生用户可控频率的输出,而不用等待数据集的全部到达。从这个角度上讲,流处理就是广义上的批处理,随着实时数据的流行,会儿更加普遍。
这就是为什么从传统的视角看来流处理是利基应用。我个人认为最大的原因是缺少实时数据收集使得不间断的处理成为了学术性的概念。
我想缺少实时数据收集就像是商用流处理系统注定的命运。他们的客户仍然需要处理面向文件的、每日批量处理ETL和数据集成。公司建设流处理系统关注的是提供附着在实时数据流的处理引擎,但是最终当时极少数人真正使用了实时数据流。事实上,在我在LinkedIn工作的初期,有一家公司试图把一个非常棒的流处理系统销售给我们,但是因为当时我们的全部数据都按小时收集在的文件里,当时我们提出的最好的应用就是在每小时的最后把这些文件输入到流处理系统中。他们注意到这是一个普遍性的问题。这些异常证明了如下规则:流处理系统要满足的重要商业目标之一是:财务, 它是实时数据流已具备的基准,并且流处理已经成为了瓶颈。
甚至于在一个健康的批处理系统中,流处理作为一种基础架构的实际应用能力是相当广泛的。它跨越了实时数据请求-应答服务和离线批量处理之间的鸿沟。现在的互联网公司,大约25%的代码可以划分到这个类型中。
最终这些日志解决了流处理中绝大部分关键的技术问题。在我看来,它所解决的最大的问题是它使得多订阅者可以获得实时数据。对这些技术细节感兴趣的朋友,我们可以用开源的Samza,它是基于这些理念建设的一个流处理系统。这些应用的更多技术细节我们在此文档中有详细的描述。
流处理最有趣的角度是它与流处理系统内部无关,但是与之密切相关的是如何扩展了我们谈到的早期数据集成的数据获取的理念。我们主要讨论了基础数据的获取或日志–事件和各类系统执行中产生的数据等。但是流处理允许我们包括了计算其它数据的数据。这些衍生的数据在消费者看来与他们计算的原始数据没什么差别。这些衍生的数据可以按任意的复杂度进行压缩。
让我们再深入一步。我们的目标是:流处理作业可以读取任意的日志并把日志写入到日志或者其它的系统中。他们用于输入输出的日志把这些处理关联到一组处理过程中。事实上,使用这种样式的集中日志,你可以把组织全部的数据抓取、转化和工作流看成是一系列的日志和写入它们的处理过程。
流处理器根本不需要理想的框架:它可能是读写日志的任何处理器或者处理器集合,但是额外的基础设施和辅助可以提供帮助管理处理代码。
日志集成的目标是双重的:
首先,它确保每个数据集都有多个订阅者和有序的。让我们回顾一下状态复制原则来记住顺序的重要性。为了使这个更加具体,设想一下从数据库中更新数据流–如果在处理过程中我们把对同一记录的两次更新重新排序,可能会产生错误的输出。 TCP之类的链接仅仅局限于单一的点对点链接,这一顺序的持久性要优于TCP之类的链接,它可以在流程处理失败和重连时仍然存在。
第二,日志提供了流程的缓冲。这是非常基础的。如果处理流程是非同步的,那么上行生成流数据的作业比下行消费流数据的作业运行的更快。这将会导致处理流程阻塞,或者缓冲数据,或者丢弃数据。丢弃数据并不是可行的方法,阻塞将会导致整个流程图立即停止。 日志实际上是一个非常大的缓冲,它允许流程重启或者停止但不会影响流程图其它部分的处理速度。如果要把数据流扩展到更大规模的组织,如果处理作业是由多个不同的团队提供的,这种隔离性是极其重的。我们不能容忍一个错误的作业引发后台的压力,这种压力会使得整个处理流程停止。
Storm和Sama这两者都是按非同步方式设计的,可以使用Kafka或者其它类似的系统作为它们的日志。
一些实时流处理在转化时是无状态的记录。在流处理中大部分的应用会是相当复杂的统计、聚合、不同窗口之间的关联。例如有时人们想扩大包含用户操作信息的事件流(一系列的单击动作)–实际上关联了用户的单击动作流与用户的账户信息数据库。不变的是这类流程最终会需要由处理器维护的一些状态信息。例如数据统计时,你需要统计到目前为止需要维护的计数器。如果处理器本身失败了,如何正确的维护这些状态信息呢?
最简单的替换方案是把这些状态信息保存在内存中。但是如果流程崩溃,它就会丢失中间状态。如果状态是按窗口维护的,流程就会回退到日志中窗口开始的时间点上。但是,如果统计是按小时进行的,那么这种方式就会变得不可行。
另一个替换方案是简单的存储所有的状态信息到远程的存储系统,通过网络与这些存储关联起来。这种机制的问题是没有本地数据和大量的网络间通信。
我们如何支持处理过程可以像表一样分区的数据呢?
回顾一下关于表和日志二相性的讨论。这一机制提供了工具把数据流转化为与处理过程协同定位的表,同时也提供了这些表的容错处理的机制。
流处理器可以把它的状态保存在本地的表或索引–bdb,或者leveldb,甚至于类似于Lucene 或fastbit一样不常见的索引。这些内容存储在它的输入流中(或许是使用任意的转化)。生成的变更日志记录了本地的索引,它允许存储事件崩溃、重启等的状态信息。流处理提供了通用的机制用于在本地输入流数据的随机索引中保存共同分片的状态。
当流程运行失败时,它会从变更日志中恢复它的索引。每次备份时,日志把本地状态转化成一系列的增量记录。
这种状态管理的方法有一个优势是把处理器的状态也做为日志进行维护。我们可以把这些日志看成与数据库表相对应的变更日志。事实上,这些处理器同时维护着像共同分片表一样的表。因为这些状态它本身就是日志,其它的处理器可以订阅它。如果流程处理的目标是更新结点的最后状态,这种状态又是流程的输出,那么这种方法就显得尤为重要。
为了数据集成,与来自数据库的日志关联,日志和数据库表的二象性就更加清晰了。变更日志可以从数据库中抽取出来,日志可以由不同的流处理器(流处理器用于关联不同的事件流)按不同的方式进行索引。
我们可以列举在Samza中有状态流处理管理的更多细节和大量实用的例子。
当然,我们不能奢望保存全部变更的完整日志。除非想要使用无限空间,日志不可能完全清除。为了澄清它,我们再来聊聊Kafka的实现。在Kafka中,清理有两种选择,这取决于数据是否包括关键更新和事件数据。对于事件数据,Kafka支持仅维护一个窗口的数据。通常,配置需要一些时间,窗口可以按时间或空间定义。虽然对于关键数据而言,完整日志的重要特征是你可以重现源系统的状态信息,或者在其它的系统重现。
随着时间的推移,保持完整的日志会使用越来越多的空间,重现所耗费的时间越来越长。因些在Kafka中,我们支持不同类型的保留。我们移除了废弃的记录(这些记录的主键最近更新过)而不是简单的丢弃旧日志。我们仍然保证日志包含了源系统的完整备份,但是现在我们不再重现原系统的全部状态,而是仅仅重现最近的状态。我们把这一特征称为日志压缩。
英语原文:
Part Three: Logs & Real-time Stream Processing
So far, I have only described what amounts to a fancy method of copying data from place-to-place. But shlepping bytes between storage systems is not the end of the story. It turns out that “log” is another word for “stream” and logs are at the heart of stream processing.
But, wait, what exactly is stream processing?
If you are a fan of late 90s and early 2000s database literature or semi-successful data infrastructure products, you likely associate stream processing with efforts to build a SQL engine or “boxes and arrows” interface for event driven processing.
If you follow the explosion of open source data systems, you likely associate stream processing with some of the systems in this space—for example, Storm, Akka, S4, and Samza. But most people see these as a kind of asynchronous message processing system not that different from a cluster-aware RPC layer (and in fact some things in this space are exactly that).
Both these views are a little limited. Stream processing has nothing to do with SQL. Nor is it limited to real-time processing. There is no inherent reason you can’t process the stream of data from yesterday or a month ago using a variety of different languages to express the computation.
I see stream processing as something much broader: infrastructure for continuous data processing. I think the computational model can be as general as MapReduce or other distributed processing frameworks, but with the ability to produce low-latency results.
The real driver for the processing model is the method of data collection. Data which is collected in batch is naturally processed in batch. When data is collected continuously, it is naturally processed continuously.
The US census provides a good example of batch data collection. The census periodically kicks off and does a brute force discovery and enumeration of US citizens by having people walking around door-to-door. This made a lot of sense in 1790 when the census was first begun. Data collection at the time was inherently batch oriented, it involved riding around on horseback and writing down records on paper, then transporting this batch of records to a central location where humans added up all the counts. These days, when you describe the census process one immediately wonders why we don’t keep a journal of births and deaths and produce population counts either continuously or with whatever granularity is needed.
This is an extreme example, but many data transfer processes still depend on taking periodic dumps and bulk transfer and integration. The only natural way to process a bulk dump is with a batch process. But as these processes are replaced with continuous feeds, one naturally starts to move towards continuous processing to smooth out the processing resources needed and reduce latency.
LinkedIn, for example, has almost no batch data collection at all. The majority of our data is either activity data or database changes, both of which occur continuously. In fact, when you think about any business, the underlying mechanics are almost always a continuous process—events happen in real-time, as Jack Bauer would tell us. When data is collected in batches, it is almost always due to some manual step or lack of digitization or is a historical relic left over from the automation of some non-digital process. Transmitting and reacting to data used to be very slow when the mechanics were mail and humans did the processing. A first pass at automation always retains the form of the original process, so this often lingers for a long time.
Production “batch” processing jobs that run daily are often effectively mimicking a kind of continuous computation with a window size of one day. The underlying data is, of course, always changing. These were actually so common at LinkedIn (and the mechanics of making them work in Hadoop so tricky) that we implemented a whole framework for managing incremental Hadoop workflows.
Seen in this light, it is easy to have a different view of stream processing: it is just processing which includes a notion of time in the underlying data being processed and does not require a static snapshot of the data so it can produce output at a user-controlled frequency instead of waiting for the “end” of the data set to be reached. In this sense, stream processing is a generalization of batch processing, and, given the prevalence of real-time data, a very important generalization.
So why has the traditional view of stream processing been as a niche application? I think the biggest reason is that a lack of real-time data collection made continuous processing something of an academic concern.
I think the lack of real-time data collection is likely what doomed the commercial stream-processing systems. Their customers were still doing file-oriented, daily batch processing for ETL and data integration. Companies building stream processing systems focused on providing processing engines to attach to real-time data streams, but it turned out that at the time very few people actually had real-time data streams. Actually, very early at my career at LinkedIn, a company tried to sell us a very cool stream processing system, but since all our data was collected in hourly files at that time, the best application we could come up with was to pipe the hourly files into the stream system at the end of the hour! They noted that this was a fairly common problem. The exception actually proves the rule here: finance, the one domain where stream processing has met with some success, was exactly the area where real-time data streams were already the norm and processing had become the bottleneck.
Even in the presence of a healthy batch processing ecosystem, I think the actual applicability of stream processing as an infrastructure style is quite broad. I think it covers the gap in infrastructure between real-time request/response services and offline batch processing. For modern internet companies, I think around 25% of their code falls into this category.
It turns out that the log solves some of the most critical technical problems in stream processing, which I’ll describe, but the biggest problem that it solves is just making data available in real-time multi-subscriber data feeds. For those interested in more details, we have open sourced Samza, a stream processing system explicitly built on many of these ideas. We describe a lot of these applications in more detail in the documentation here.
Data flow graphs
The most interesting aspect of stream processing has nothing to do with the internals of a stream processing system, but instead has to do with how it extends our idea of what a data feed is from the earlier data integration discussion. We discussed primarily feeds or logs of primary data—the events and rows of data produced in the execution of various applications. But stream processing allows us to also include feeds computed off other feeds. These derived feeds look no different to consumers then the feeds of primary data from which they are computed. These derived feeds can encapsulate arbitrary complexity.
Let’s dive into this a bit. A stream processing job, for our purposes, will be anything that reads from logs and writes output to logs or other systems. The logs they use for input and output join these processes into a graph of processing stages. Indeed, using a centralized log in this fashion, you can view all the organization’s data capture, transformation, and flow as just a series of logs and processes that write to them.
A stream processor need not have a fancy framework at all: it can be any process or set of processes that read and write from logs, but additional infrastructure and support can be provided for helping manage processing code.
The purpose of the log in the integration is two-fold.
First, it makes each dataset multi-subscriber and ordered. Recall our “state replication” principle to remember the importance of order. To make this more concrete, consider a stream of updates from a database—if we re-order two updates to the same record in our processing we may produce the wrong final output. This order is more permanent than what is provided by something like TCP as it is not limited to a single point-to-point link and survives beyond process failures and reconnections.
Second, the log provides buffering to the processes. This is very fundamental. If processing proceeds in an unsynchronized fashion it is likely to happen that an upstream data producing job will produce data more quickly than another downstream job can consume it. When this occurs processing must block, buffer or drop data. Dropping data is likely not an option; blocking may cause the entire processing graph to grind to a halt. The log acts as a very, very large buffer that allows process to be restarted or fail without slowing down other parts of the processing graph. This isolation is particularly important when extending this data flow to a larger organization, where processing is happening by jobs made by many different teams. We cannot have one faulty job cause back-pressure that stops the entire processing flow.
Both Storm and Samza are built in this fashion and can use Kafka or other similar systems as their log.
Stateful Real-Time Processing
Some real-time stream processing is just stateless record-at-a-time transformation, but many of the uses are more sophisticated counts, aggregations, or joins over windows in the stream. One might, for example, want to enrich an event stream (say a stream of clicks) with information about the user doing the click—in effect joining the click stream to the user account database. Invariably, this kind of processing ends up requiring some kind of state to be maintained by the processor: for example, when computing a count, you have the count so far to maintain. How can this kind of state be maintained correctly if the processors themselves can fail?
The simplest alternative would be to keep state in memory. However if the process crashed it would lose its intermediate state. If state is only maintained over a window, the process could just fall back to the point in the log where the window began. However, if one is doing a count over an hour, this may not be feasible.
An alternative is to simply store all state in a remote storage system and join over the network to that store. The problem with this is that there is no locality of data and lots of network round-trips.
How can we support something like a “table” that is partitioned up with our processing?
Well recall the discussion of the duality of tables and logs. This gives us exactly the tool to be able to convert streams to tables co-located with our processing, as well as a mechanism for handling fault tolerance for these tables.
A stream processor can keep it’s state in a local “table” or “index”—a bdb, leveldb, or even something more unusual such as a Lucene or fastbit index. The contents of this this store is fed from its input streams (after first perhaps applying arbitrary transformation). It can journal out a changelog for this local index it keeps to allow it to restore its state in the event of a crash and restart. This mechanism allows a generic mechanism for keeping co-partitioned state in arbitrary index types local with the incoming stream data.
When the process fails, it restores its index from the changelog. The log is the transformation of the local state into a sort of incremental record at a time backup.
This approach to state management has the elegant property that the state of the processors is also maintained as a log. We can think of this log just like we would the log of changes to a database table. In fact, the processors have something very like a co-partitioned table maintained along with them. Since this state is itself a log, other processors can subscribe to it. This can actually be quite useful in cases when the goal of the processing is to update a final state and this state is the natural output of the processing.
When combined with the logs coming out of databases for data integration purposes, the power of the log/table duality becomes clear. A change log may be extracted from a database and indexed in different forms by various stream processors to join against event streams.
We give more detail on this style of managing stateful processing in Samza and a lot more practical examples here.
Log Compaction
Of course, we can’t hope to keep a complete log for all state changes for all time. Unless one wants to use infinite space, somehow the log must be cleaned up. I’ll talk a little about the implementation of this in Kafka to make it more concrete. In Kafka, cleanup has two options depending on whether the data contains keyed updates or event data. For event data, Kafka supports just retaining a window of data. Usually, this is configured to a few days, but the window can be defined in terms of time or space. For keyed data, though, a nice property of the complete log is that you can replay it to recreate the state of the source system (potentially recreating it in another system).
However, retaining the complete log will use more and more space as time goes by, and the replay will take longer and longer. Hence, in Kafka, we support a different type of retention. Instead of simply throwing away the old log, we remove obsolete records—i.e. records whose primary key has a more recent update. By doing this, we still guarantee that the log contains a complete backup of the source system, but now we can no longer recreate all previous states of the source system, only the more recent ones. We call this feature log compaction.
原文作者:Jay Kreps 译者:LitStone, super0555, 几点人, cmy00cmy, tnjin, 928171481, 黄劼等。来自:开源中国
End.