Author: Jay Kreps, principal engineer at LinkedIn. This article comes from a post he shared on LinkedIn. Original title: The Log: What every software engineer should know about real-time data’s unifying abstraction.
Part Three: Logs & Real-time Stream Processing
So far, I have only described what amounts to a fancy method of copying data from place to place. But shlepping bytes between storage systems is not the end of the story. It turns out that “log” is another word for “stream” and logs are at the heart of stream processing.
But, wait, what exactly is stream processing?
If you are a fan of late 90s and early 2000s database literature or semi-successful data infrastructure products, you likely associate stream processing with efforts to build a SQL engine or “boxes and arrows” interface for event-driven processing.
If you follow the explosion of open source data systems, you likely associate stream processing with some of the systems in this space—for example, Storm, Akka, S4, and Samza. But most people see these as a kind of asynchronous message processing system not that different from a cluster-aware RPC layer (and in fact some things in this space are exactly that).
Both these views are a little limited. Stream processing has nothing to do with SQL. Nor is it limited to real-time processing. There is no inherent reason you can’t process the stream of data from yesterday or a month ago using a variety of different languages to express the computation.
I see stream processing as something much broader: infrastructure for continuous data processing. I think the computational model can be as general as MapReduce or other distributed processing frameworks, but with the ability to produce low-latency results.
The real driver for the processing model is the method of data collection. Data which is collected in batch is naturally processed in batch. When data is collected continuously, it is naturally processed continuously.
The US census provides a good example of batch data collection. The census periodically kicks off and does a brute-force discovery and enumeration of US citizens by having people walk door to door. This made a lot of sense in 1790 when the census was first begun. Data collection at the time was inherently batch oriented: it involved riding around on horseback and writing down records on paper, then transporting this batch of records to a central location where humans added up all the counts. These days, when you describe the census process, one immediately wonders why we don’t keep a journal of births and deaths and produce population counts either continuously or with whatever granularity is needed.
This is an extreme example, but many data transfer processes still depend on taking periodic dumps and bulk transfer and integration. The only natural way to process a bulk dump is with a batch process. But as these processes are replaced with continuous feeds, one naturally starts to move towards continuous processing to smooth out the processing resources needed and reduce latency.
LinkedIn, for example, has almost no batch data collection at all. The majority of our data is either activity data or database changes, both of which occur continuously. In fact, when you think about any business, the underlying mechanics are almost always a continuous process—events happen in real-time, as Jack Bauer would tell us. When data is collected in batches, it is almost always due to some manual step or lack of digitization or is a historical relic left over from the automation of some non-digital process. Transmitting and reacting to data used to be very slow when the mechanics were mail and humans did the processing. A first pass at automation always retains the form of the original process, so this often lingers for a long time.
Production “batch” processing jobs that run daily are often effectively mimicking a kind of continuous computation with a window size of one day. The underlying data is, of course, always changing. These were actually so common at LinkedIn (and the mechanics of making them work in Hadoop so tricky) that we implemented a whole framework for managing incremental Hadoop workflows.
Seen in this light, it is easy to have a different view of stream processing: it is just processing that includes a notion of time in the underlying data and does not require a static snapshot of that data, so it can produce output at a user-controlled frequency instead of waiting for the “end” of the data set to be reached. In this sense, stream processing is a generalization of batch processing, and, given the prevalence of real-time data, a very important generalization.
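To make the “batch job is a window of one day” point concrete, here is a minimal Python sketch, assuming events are (timestamp_seconds, key) pairs arriving in time order. The function name `tumbling_counts` and the event shape are hypothetical, chosen only for illustration. Called with `window_seconds=86400` it behaves like a daily batch job; a smaller window produces output more often from the very same stream.

```python
from collections import Counter
from typing import Iterable, Iterator, Tuple

def tumbling_counts(
    events: Iterable[Tuple[int, str]], window_seconds: int
) -> Iterator[Tuple[int, dict]]:
    """Emit (window_start, counts) each time a tumbling window closes.

    Assumes events arrive in non-decreasing timestamp order.
    """
    current_start = None
    counts: Counter = Counter()
    for ts, key in events:
        start = ts - ts % window_seconds  # window this event belongs to
        if current_start is not None and start != current_start:
            # A newer window has begun, so the previous one is complete.
            yield current_start, dict(counts)
            counts = Counter()
        current_start = start
        counts[key] += 1
    if current_start is not None:
        # Flush the last (possibly partial) window when the input ends.
        yield current_start, dict(counts)

DAY = 86400
events = [(0, "click"), (3600, "view"), (7200, "click"), (DAY + 10, "click")]
print(list(tumbling_counts(events, DAY)))
# [(0, {'click': 2, 'view': 1}), (86400, {'click': 1})]
```

Only the window size separates the “daily batch” from the “hourly stream” version of this computation; the code path is identical.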
So why has the traditional view of stream processing been as a niche application? I think the biggest reason is that a lack of real-time data collection made continuous processing something of an academic concern.
I think the lack of real-time data collection is likely what doomed the commercial stream-processing systems. Their customers were still doing file-oriented, daily batch processing for ETL and data integration. Companies building stream processing systems focused on providing processing engines to attach to real-time data streams, but it turned out that at the time very few people actually had real-time data streams. Actually, very early in my career at LinkedIn, a company tried to sell us a very cool stream processing system, but since all our data was collected in hourly files at that time, the best application we could come up with was to pipe the hourly files into the stream system at the end of the hour! They noted that this was a fairly common problem. The exception actually proves the rule here: finance, the one domain where stream processing has met with some success, was exactly the area where real-time data streams were already the norm and processing had become the bottleneck.
Even in the presence of a healthy batch processing ecosystem, I think the actual applicability of stream processing as an infrastructure style is quite broad. I think it covers the gap in infrastructure between real-time request/response services and offline batch processing. For modern internet companies, I think around 25% of their code falls into this category.
It turns out that the log solves some of the most critical technical problems in stream processing, which I’ll describe, but the biggest problem that it solves is just making data available in real-time multi-subscriber data feeds. For those interested in more details, we have open-sourced Samza, a stream processing system explicitly built on many of these ideas. We describe a lot of these applications in more detail in the Samza documentation.
Data flow graphs
The most interesting aspect of stream processing has nothing to do with the internals of a stream processing system, but instead has to do with how it extends our idea of what a data feed is from the earlier data integration discussion. We discussed primarily feeds or logs of primary data: the events and rows of data produced in the execution of various applications. But stream processing allows us to also include feeds computed off other feeds. These derived feeds look no different to consumers than the feeds of primary data from which they are computed. These derived feeds can encapsulate arbitrary complexity.
Let’s dive into this a bit. A stream processing job, for our purposes, will be anything that reads from logs and writes output to logs or other systems. The logs they use for input and output join these processes into a graph of processing stages. Indeed, using a centralized log in this fashion, you can view all the organization’s data capture, transformation, and flow as just a series of logs and processes that write to them.
A stream processor need not have a fancy framework at all: it can be any process or set of processes that read from and write to logs, but additional infrastructure and support can be provided for helping manage processing code.
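Here is a minimal Python sketch of such a graph, assuming an in-memory list stands in for a durable log such as a Kafka topic. `Log`, `run_job`, and the two stage lambdas are hypothetical names for illustration. Each job reads from one log and appends to another, so a derived feed looks to its consumers exactly like a primary one.

```python
from typing import Callable, Iterable, List

class Log:
    """An append-only sequence of records addressed by offset (in-memory stand-in)."""
    def __init__(self, name: str) -> None:
        self.name = name
        self.records: List[object] = []

    def append(self, record: object) -> int:
        self.records.append(record)
        return len(self.records) - 1  # offset of the new record

    def read_from(self, offset: int) -> List[object]:
        return self.records[offset:]

def run_job(source: Log, sink: Log,
            fn: Callable[[object], Iterable[object]], offset: int = 0) -> int:
    """Read records from `source` starting at `offset`, append fn's outputs to `sink`.

    Returns the next offset to read, so the job can be resumed later.
    """
    for record in source.read_from(offset):
        for out in fn(record):
            sink.append(out)
        offset += 1
    return offset

# Primary feed: raw page-view events.
page_views = Log("page_views")
for user, page in [("alice", "/home"), ("bob", "/jobs"), ("alice", "/jobs")]:
    page_views.append({"user": user, "page": page})

# Stage 1: a derived feed containing only /jobs views.
job_views = Log("job_views")
run_job(page_views, job_views, lambda r: [r] if r["page"] == "/jobs" else [])

# Stage 2: a further feed derived from the derived feed.
job_viewers = Log("job_viewers")
run_job(job_views, job_viewers, lambda r: [r["user"]])

print(job_viewers.records)  # ['bob', 'alice']
```

Stage 2 neither knows nor cares that `job_views` is itself computed; it just reads a log.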
The purpose of the log in the integration is two-fold.
First, it makes each dataset multi-subscriber and ordered. Recall our “state replication” principle to remember the importance of order. To make this more concrete, consider a stream of updates from a database—if we re-order two updates to the same record in our processing we may produce the wrong final output. This order is more permanent than what is provided by something like TCP as it is not limited to a single point-to-point link and survives beyond process failures and reconnections.
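A small Python illustration of why order matters, assuming a change stream of (key, value) upserts where the last write for a key wins. `apply_updates` is a hypothetical helper. Swapping the two updates to the same record changes the final state, while moving an update to a different key does not.

```python
from typing import Dict, List, Tuple

def apply_updates(updates: List[Tuple[str, int]]) -> Dict[str, int]:
    """Replay (key, value) upserts in order; the last write per key wins."""
    table: Dict[str, int] = {}
    for key, value in updates:
        table[key] = value
    return table

in_order = [("user:1", 100), ("user:2", 7), ("user:1", 250)]
# Swap the two updates to user:1, as an out-of-order consumer might.
reordered = [("user:1", 250), ("user:2", 7), ("user:1", 100)]

print(apply_updates(in_order))   # {'user:1': 250, 'user:2': 7}
print(apply_updates(reordered))  # {'user:1': 100, 'user:2': 7}
```

This is why the ordering guarantee only needs to hold per key (or per partition of keys): updates to different records commute, updates to the same record do not.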
Second, the log provides buffering to the processes. This is very fundamental. If processing proceeds in an unsynchronized fashion, it is likely that an upstream data-producing job will produce data more quickly than a downstream job can consume it. When this occurs, processing must block, buffer, or drop data. Dropping data is likely not an option; blocking may cause the entire processing graph to grind to a halt. The log acts as a very, very large buffer that allows a process to be restarted or to fail without slowing down other parts of the processing graph. This isolation is particularly important when extending this data flow to a larger organization, where processing is done by jobs written by many different teams. We cannot have one faulty job cause back-pressure that stops the entire processing flow.
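A minimal Python sketch of the log as a buffer, assuming each consumer group keeps its own committed read offset. Kafka tracks consumer positions in a similar spirit, but `BufferedLog`, `poll`, and `commit` here are hypothetical names, not Kafka’s API. The producer never waits on consumers; a slow or restarted consumer simply resumes from its last committed offset.

```python
from typing import Dict, List

class BufferedLog:
    """Append-only log; each consumer group keeps its own committed offset."""
    def __init__(self) -> None:
        self.records: List[str] = []
        self.committed: Dict[str, int] = {}

    def append(self, record: str) -> None:
        # The producer only appends; it never waits for consumers.
        self.records.append(record)

    def poll(self, group: str, max_records: int) -> List[str]:
        """Return up to max_records starting at the group's committed offset."""
        start = self.committed.get(group, 0)
        return self.records[start:start + max_records]

    def commit(self, group: str, count: int) -> None:
        """Advance the group's offset after it has processed `count` records."""
        self.committed[group] = self.committed.get(group, 0) + count

log = BufferedLog()
for i in range(5):
    log.append(f"event-{i}")  # a fast producer writes 5 records

batch = log.poll("slow-consumer", 2)
log.commit("slow-consumer", len(batch))  # processed event-0 and event-1

# Suppose the consumer process restarts here. The committed offset lives with
# the log, not the consumer, so it resumes exactly where it left off.
print(log.poll("slow-consumer", 10))  # ['event-2', 'event-3', 'event-4']
print(log.poll("other-team", 1))      # ['event-0'] (independent offset)
```

Because every group has its own position, one team’s stalled job leaves the producer and every other consumer unaffected.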
Both Storm and Samza are built in this fashion and can use Kafka or other similar systems as their log.
Stateful Real-Time Processing
Some real-time stream processing is just stateless record-at-a-time transformation, but many of the uses are more sophisticated counts, aggregations, or joins over windows in the stream. One might, for example, want to enrich an event stream (say a stream of clicks) with information about the user doing the click—in effect joining the click stream to the user account database. Invariably, this kind of processing ends up requiring some kind of state to be maintained by the processor: for example, when computing a count, you have the count so far to maintain. How can this kind of state be maintained correctly if the processors themselves can fail?
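A minimal Python sketch of the click-enrichment example, assuming the user account data is already available locally as a dict keyed by user id, and that clicks are dicts with a `user_id` field. `enrich_clicks` and all field names are hypothetical. The running per-region count is the kind of state the processor must carry between records; if the process dies, that in-memory `Counter` is lost, which is exactly the problem raised above.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator

def enrich_clicks(
    clicks: Iterable[dict], users: Dict[str, dict], counts: Counter
) -> Iterator[dict]:
    """Join each click to its user record and keep a running count per region."""
    for click in clicks:
        user = users.get(click["user_id"], {})  # the lookup is the "join"
        region = user.get("region", "unknown")
        counts[region] += 1  # state that must survive between records
        yield {**click, "region": region}

users = {"u1": {"region": "EU"}, "u2": {"region": "US"}}
clicks = [{"user_id": "u1", "url": "/a"}, {"user_id": "u2", "url": "/b"},
          {"user_id": "u1", "url": "/c"}, {"user_id": "u9", "url": "/d"}]

counts: Counter = Counter()
enriched = list(enrich_clicks(clicks, users, counts))
print(enriched[0])   # {'user_id': 'u1', 'url': '/a', 'region': 'EU'}
print(dict(counts))  # {'EU': 2, 'US': 1, 'unknown': 1}
```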
The simplest alternative would be to keep state in memory. However, if the process crashed, it would lose its intermediate state. If state is only maintained over a window, the process could just fall back to the point in the log where the window began. However, if one is doing a count over an hour, this may not be feasible.
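A minimal Python sketch of the fall-back option, assuming the log is a list of (timestamp, value) records sorted by timestamp and the processor knows the start of its current window. `recover_window_sum` is a hypothetical helper that rebuilds the lost in-memory state by replaying from the first record in the window. The replay cost grows with the window length, which is why this stops being attractive for long windows over high-volume streams.

```python
from bisect import bisect_left
from typing import List, Tuple

def recover_window_sum(log: List[Tuple[int, int]], window_start: int, now: int) -> int:
    """Rebuild a windowed sum after a crash by replaying the log from window_start.

    Assumes records are sorted by timestamp. The probe tuple (window_start,)
    sorts before any (window_start, value) tuple, so bisect_left finds the
    first record whose timestamp is >= window_start.
    """
    first = bisect_left(log, (window_start,))
    total = 0
    for ts, value in log[first:]:
        if ts >= now:
            break
        total += value
    return total

log = [(0, 5), (30, 2), (60, 4), (90, 1), (120, 3)]
# The in-memory sum for the window [60, 120) was lost in a crash; replay it.
print(recover_window_sum(log, window_start=60, now=120))  # 5
```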
An alternative is to simply store all state in a remote storage system and join over the network to that store. The problem with this is that there is no locality of data and lots of network round-trips.
How can we support something like a “table” that is partitioned up with our processing?
Well, recall the discussion of the duality of tables and logs. This gives us exactly the tool to be able to convert streams to tables co-located with our processing, as well as a mechanism for handling fault tolerance for these tables.
A stream processor can keep its state in a local “table” or “index”: a bdb, leveldb, or even something more unusual such as a Lucene or fastbit index. The contents of this store are fed from its input streams (perhaps after first applying an arbitrary transformation). It can journal out a changelog for this local index so that it can restore its state in the event of a crash and restart. This provides a generic mechanism for keeping co-partitioned state, in arbitrary index types, local to the incoming stream data.
When the process fails, it restores its index from the changelog. In effect, the log turns the local state into a kind of incremental, record-at-a-time backup.
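A minimal Python sketch of the changelog idea, assuming a plain dict stands in for the local bdb/leveldb store and a Python list stands in for the changelog stream. `LocalStore` and its methods are hypothetical names, not Samza’s API. Every write to the local table is also journaled as a (key, value) record (with `None` as a deletion marker), and a fresh process restores the table by replaying that changelog in order.

```python
from typing import Dict, List, Optional, Tuple

Changelog = List[Tuple[str, Optional[int]]]

class LocalStore:
    """Local key-value state whose every mutation is journaled to a changelog."""
    def __init__(self, changelog: Changelog) -> None:
        self.table: Dict[str, int] = {}
        self.changelog = changelog

    def put(self, key: str, value: int) -> None:
        self.table[key] = value
        self.changelog.append((key, value))  # journal the change

    def delete(self, key: str) -> None:
        self.table.pop(key, None)
        self.changelog.append((key, None))  # None marks a deletion

    @classmethod
    def restore(cls, changelog: Changelog) -> "LocalStore":
        """Rebuild the table after a crash by replaying the changelog in order."""
        store = cls(changelog)
        for key, value in changelog:
            if value is None:
                store.table.pop(key, None)
            else:
                store.table[key] = value
        return store

changelog: Changelog = []
store = LocalStore(changelog)
# A stream processor counting page views per page, keeping counts locally.
for page in ["/home", "/jobs", "/home"]:
    store.put(page, store.table.get(page, 0) + 1)
store.delete("/jobs")

# The process crashes and its local table is lost; restore from the changelog.
recovered = LocalStore.restore(changelog)
print(recovered.table)  # {'/home': 2}
```

Since the changelog is just another log, other processors could subscribe to it as well, as the next paragraph describes.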
This approach to state management has the elegant property that the state of the processors is also maintained as a log. We can think of this log just like we would the log of changes to a database table. In fact, the processors have something very like a co-partitioned table maintained along with them. Since this state is itself a log, other processors can subscribe to it. This can actually be quite useful in cases when the goal of the processing is to update a final state and this state is the natural output of the processing.
When combined with the logs coming out of databases for data integration purposes, the power of the log/table duality becomes clear. A change log may be extracted from a database and indexed in different forms by various stream processors to join against event streams.
We give more detail on this style of managing stateful processing, along with a lot more practical examples, in the Samza documentation.
Log Compaction
Of course, we can’t hope to keep a complete log for all state changes for all time. Unless one wants to use infinite space, somehow the log must be cleaned up. I’ll talk a little about the implementation of this in Kafka to make it more concrete. In Kafka, cleanup has two options depending on whether the data contains keyed updates or event data. For event data, Kafka supports just retaining a window of data. Usually, this is configured to a few days, but the window can be defined in terms of time or space. For keyed data, though, a nice property of the complete log is that you can replay it to recreate the state of the source system (potentially recreating it in another system).
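A minimal Python sketch of window-based retention for event data, assuming each record carries its append time in milliseconds. `apply_time_retention` is a hypothetical helper. In Kafka the equivalent behavior is set through topic configuration (`retention.ms` for time, `retention.bytes` for size) and is applied to whole log segments rather than to individual records, which this sketch does not model.

```python
from typing import List, Tuple

def apply_time_retention(
    log: List[Tuple[int, str]], now_ms: int, retention_ms: int
) -> List[Tuple[int, str]]:
    """Drop records older than the retention window; keep the rest in order.

    Records are (append_time_ms, payload) pairs sorted by append time.
    """
    cutoff = now_ms - retention_ms
    return [(ts, payload) for ts, payload in log if ts >= cutoff]

DAY_MS = 24 * 60 * 60 * 1000
events = [(0, "a"), (2 * DAY_MS, "b"), (6 * DAY_MS, "c"), (7 * DAY_MS, "d")]
# Keep a window of a few days of events, as described for event data.
print(apply_time_retention(events, now_ms=8 * DAY_MS, retention_ms=3 * DAY_MS))
# [(518400000, 'c'), (604800000, 'd')]
```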
However, retaining the complete log will use more and more space as time goes by, and the replay will take longer and longer. Hence, in Kafka, we support a different type of retention. Instead of simply throwing away the old log, we remove obsolete records—i.e. records whose primary key has a more recent update. By doing this, we still guarantee that the log contains a complete backup of the source system, but now we can no longer recreate all previous states of the source system, only the more recent ones. We call this feature log compaction.
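A minimal Python sketch of log compaction, assuming records are (offset, key, value) triples with unique offsets. `compact` and `replay` are hypothetical helpers. Compaction keeps only the latest record for each key and preserves the surviving records’ original offsets and order, so replaying the compacted log yields the same final state as replaying the full log, while the intermediate states are gone. Kafka’s actual log cleaner runs in the background over log segments and also handles deletion markers, which this sketch omits.

```python
from typing import Dict, List, Tuple

Record = Tuple[int, str, str]  # (offset, key, value)

def compact(log: List[Record]) -> List[Record]:
    """Keep only the most recent record per key, preserving offsets and order."""
    latest: Dict[str, int] = {}
    for offset, key, _ in log:
        latest[key] = offset  # later records overwrite earlier ones
    return [rec for rec in log if latest[rec[1]] == rec[0]]

def replay(log: List[Record]) -> Dict[str, str]:
    """Rebuild the source table by applying records in log order."""
    state: Dict[str, str] = {}
    for _, key, value in log:
        state[key] = value
    return state

full = [(0, "u1", "a@x.com"), (1, "u2", "b@x.com"),
        (2, "u1", "a@y.com"), (3, "u3", "c@x.com"), (4, "u2", "b@y.com")]
compacted = compact(full)
print(compacted)  # [(2, 'u1', 'a@y.com'), (3, 'u3', 'c@x.com'), (4, 'u2', 'b@y.com')]
print(replay(compacted) == replay(full))  # True
```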
Original author: Jay Kreps. Translators: LitStone, super0555, 几点人, cmy00cmy, tnjin, 928171481, 黄劼, and others. Source: OSChina (开源中国).