VIA: https://github.com/nathanmarz/storm/wiki/Tutorial
通过这个手册,你可以了解到如何去创建Storm topologies并把它们部署到一个Storm集群。主要用到了Java语言,但是为了说明Storm的多语言支持的特性也使用了Python去实现一些例子。
这个手册中使用的例子来自于storm-starter工程。建议你同步下来此项目,并根据例子来学习。阅读Setting up development environment和Creating a new Storm project两篇文章去搭建你的环境。
一个Sotrm集群表面上看起来和Hadoop集群很像。在Hadoop上你运行的时"MapReduce Job",在Storm上运行的是"topologies"。"Job"和"topologies"之间还是有很大不同的 --最关键的一点是Mapreduce job最终是会完成的,而topology将会持续的执行(直到你终止它)。
Storm集群中有两种不同的节点:Master和Worker。Master节点启动的守护进程叫做"Numbus",这点和Hadoop的"JobTracker"很相似。Nimbus在集群众发布指令,给其他机器分配作业,并且监控失败。
每一个Worker启动的守护进程叫做"Supervisor"。Supervisor接收Nimbus分发给它的工作去启动或者停止真正的工作进程。每个工作进程执行一个topology的子集,topology是执行在多个worker进程乃至许多机器上的。
Nimbus和Supervisors之间是通过Zookeeper集群来协作的,Nimbus守护进程和Supervisor守护进程是fail-fast和无状态的;所有的状态都存在Zookeeper和本地磁盘中。这意味着你可以随意kill -9 Nimbus或者Supervisors,并且当它们再次启动的时候就像什么都没有发生过一样。这样的设计保证了Storm集群的稳定性。
为了在Storm上做实时计算,你需要创建一个"topologies"。Topology是一个计算图。Topology每个节点包含了处理逻辑,节点之间的连接表明了数据应该怎样在它们之间传输。
启动一个topology还是非常简单的。首先,你需要把你的代码打包成一个jar文件。之后,你可以用下面的命令启动它:
$ storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这条指令带着参数arg1和arg2运行了backtype.storm.MyTopology这个类。这个类的main函数定义了toplogy并且提交到Nimbus。storm jar负责连接到Nimbus并上传你的jar文件。
因为topology只是Thrift结构的定义,并且Nimbus是一个Thrift服务,所以你可以使用任意的编程语言去创建提交topologies。之前的例子是用基于JVM的语言完成的。可以从Running topologies on a production cluster来了解关于启动和停止topologies的更多信息。
Storm里最核心的概念就是"Stream"。Stream是一个没有边界的元组序列。Storm为了将一个stream转换为另一个新的stream提供了一些分布式的并且可靠的元语。例如你可以把tweets的信息流传送到话题的流。
Storm提供的流转换的基础元语是"spouts"和"bolt"。为了运行你的应用逻辑,你必须去实现Spouts和bolt的一些接口。
Spout是streams的源头。举个例子,Spout可以从Kestrel读取一组数据并把它们当作stream发送出去。或者spout也能调用Twitter的API去获取tweets并将它们作为stream发送出去。
Bolt去处理输入的所有的stream,并且也能发出新的stream。在复杂的stream传输过程中,像从tweets中计算stream的话题趋势,需要多个阶段,从而存在多个bolt。Bolt可以通过执行,过滤,聚合,关联,读写数据库等操作来做任何事情。
Spouts和bolt被打包成一个"topology",它是一个高度抽象的概念,你可以把它提交给Storm集群去执行。Toplogy是一个Spouts和Bolt之间传输stream的图。在这个图中的箭头表明了bolt订阅了哪些stream。当一个spout或者一个bolt发送一组数据到stream时,所有订阅这个stream的bolt都可以收到这组数据。
topology中节点之间的连接表明了数据传输的流向。举个例子,如果Spout A和Bolt B相连,Spout A又和Bolt C相连,Bolt B也和Bolt C相连,每当Spout A发送数据时,它都将发送给Bolt B和BoltC,当Bolt B发送出的数据也都会发送给Bolt C。
Storm中的每个topology节点的执行都是并行的。在你的topology中,你可以指定你想要每个节点执行的并行数,Storm将会在集群内启动这么多的线程去执行。
Topology的执行不会退出,除非你手动kill掉它。Storm将会自动分配失败的任务。另外,即使在机器宕机消息被丢失的情况下,storm也会保证不会丢失数据。
Storm用元组作为其数据模型。一个元组是一组命名的数值列表,一个元组可以表示任意的类型。Storm支持所有的原生类型,字符串,字节,数组都可以作为元组的值。如果你想使用其他的类型,你只需要去实现这个类型的序列化的方法。
Topology中的每个节点必须声明它的输出字段。举个例子,Bolt声明它发出的两个元组分别是"double"和"triple"。
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
declareOutputFields
函数声明了输出字段["double", "triple"]。剩下的函数将会在下面的章节里介绍。
让我们看一眼一个简单的topology去探索一下它的定义和它的代码结构。我们先看下它的定义ExclamationTopology
。这个例子来自storm-starter
:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
这个Topology包含了一个spout和两个bolt。Spout发送出一个单词,每个bolt在输入的字符串后面加上"!!!"。这些节点连成一条线:spout发给第一个bolt,第一个bolt再发给第二个。如果spout发出的元组是["bob"]和p["john"],那么第二个bolt将会输出单词["bob!!!!!!"]和["john!!!!!!"]。
代码里定义节点使用方法setSpout
和setBolt
。这些方法用一个用户指定的id,一个处理逻辑的函数,还有一个你想启动的并发数作为参数。在这个例子里,spout给定的id是"words",两个bolt给定的id分别是"exclaim1"和"exclaim2"。
这个对象的处理逻辑实现了IRichSpout和IRichBolt的接口。
最后一个参数是指定你想启动多少并行的实例,这个参数是可选的。它实际上是指定在集群中需要启动多少线程。如果你忽略它,Storm将会默认分配一个线程。
setBolt
返回一个InputDeclarer对象,用来定义Bolt的输出。这里bolt"exclami1"声明它需要通过"shuffle grouping的方式去获取"spout"words"的所有输出,"shuffle grouping"意味着元组是以随机分发到各个bolt的任务上的。各个组件之间可以由很多不同的方式来分发这些数据,后面我们将详细介绍这点。
如果你期望bolt "exclaim2"去读所有发自spout "words"和bolt "exclaim1"的数据,你可以这么定义"exclaim2":
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
这里你就可以看到可以声明多一个源作为一个Bolt的输入。
让我们深入了解下这个topology的spout和bolt的实现。Spout负责发送新的消息到topology。在这个topology中,TestWordSpout
每100ms发送list["nathan", "mike", "jackson", "golda", "bertels"]内随机的一个单词作为元组。TestWordSpout
的nextTuple()
实现就像下面这样:
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
你可以看到这个实现是非常的直接了当的。
ExclamationBolt
在输入字符串后添加"!!!"。让我们看下它的所有实现:
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
public Map getComponentConfiguration() {
return null;
}
}
prepare
方法给bolt提供了OutputCollector
,它用来从bolt中发送元组。Bolt可以在任意时刻发送元组 -- 在prepare
, execute
, 或者cleanup
方法中都可以,甚至可以在另外的线程中异步发送。这个prepare
的实现仅仅简单的保存了OutputCollector
的实例,为了待会儿在execute
方法中使用。
execute
方法从bolt的输入中收到了一个元组。ExclamationBolt
从元组中获取第一个字段,并且发送一个追加了"!!!"的字符串的新元组,如果你实现了一个bolt订阅了多个输入源,你可以使用Tuple#getSourceComponent
方法来知道它是来自于哪个组件。
在execute
中还有几个东西需要介绍,输入的元组作为emit
的第一参数被传进来,还有在最后一行会送一个ack。这是storm来保证不丢数据实现的API的一部分,后面我们将详细介绍。
cleanup
方法在Bolt被关闭的时候调用,它清理我们打开的所有资源。这里没有保证这个方法一定会在集群中被调用:举个例子,如果机器上的任务正在爆发性的增长,那么就不能调用这个方法。cleanup
方法更倾向于你使用本地模式时执行,你可以执行和结束掉很多topology来避免资源的泄漏。
declareOutputFields
方法声明了ExclamationBolt
会发送一个携带一个字段名为"word"的元组。
getComponentConfiguration
方法允许你配置组件执行的各个方面。这是一个高端的话题,在Configuration你可以了解到更多。
像cleanup
和getComponentConfiguration
两个方法通常时不需要在bolt中实现的。你可以通过使用基类默认的实现更加简洁的定义bolt。ExclamationBolt
可以简单的继承于BaseRichBolt
,就像这样:
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
让我们看看怎么在本地模式运行ExclamationTopology
,观察下它时如何工作的。
Storm由两个工作模式:本地模式和分布式。在本地模式中,Storm的执行完全是在一个进程内,用多个线程来模拟多个节点。本地模式对于topology的测试和开发来说是非常有用的。当你照着storm-starter中的例子运行时,它就启动的是本地模式,你可以看到每个组件发出的消息。你可以从Local mode了解到更多的知识。
在分布式模式下,Storm作为一个集群来工作。当你提交一个topology到master时,你也提交了所有topology需要的代码。Master将会分发你的代码并分配worker去执行你的topology。如果worker宕机了,Master将会重新分配它们到其他地方。你可以从Running topologies on a production cluster这里了解到更多关于topology在集群环境下执行的知识。
ExclamationTopology
在本地模式执行的代码:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
首先,通过创建一个LocalCluster
对象,定义了一个进程内集群。提交topology到这个虚拟的集群和提交到分布式集群的方法是一样的。它通过submitTopology
提交了一个topology到LocaoCluster
,它有三个参数,一个是执行的topology的名字,还有它的配置,以及topology的实例。
这个名字是用来让你kill掉topology时使用的,topology永远不会停止执行,直到你杀死它。
这个配置时用来调整topology的各个方面的。下面两个配置是经常被用到的:
TOPOLOGY_WORKERS
(使用setNumWorkers
来设置) 指定集群分配多少进程来执行topology。每个组件都将会以多线程模式执行。给组件分配线程的数目可以通过setBolt
和setSpout
方法来配置。每个worker进程包含了多个线程,具体点来说,你可以在配置中设置300个线程,并指定使用50个worker进程。那么你的每个worker进程将会执行6个线程,它们可能会属于不同的组件。你可以调整这些数目来提高性能。
TOPOLOGY_DEBUG
(使用setDebug
来设置) 当被设置喂true
时,意味着storm将会记录所有发送的消息。这个对本地模式调试时非常有用,但是你不应该在集群模式中使用它。
你还可以对topology设置更多的配置,不同配置的详细信息可以从the Javadoc for Config了解到。
你可以从这里Creating a new Storm project学习更多的关于开发环境部署以便可以让你在eclipse中执行本地模式。
stream的分组指名了两个组件间的数据流通是以什么样的方式进行的。需要注意的一点是,spout和bolt是并行执行在整个集群上的多个任务。从任务层面我们看它们之间的关系,就像下面这样:
有一个Bolt A向Bolt B发送一个元组的任务,它应该将元组发给哪个任务?
stream分组描述了任务之间元组发送的方式。在我们深入了解不同stream分组前,让我们先看一下storm-starter中的另外一个topology。WordCountTopology从spout中读取一句话,输出stream到WordCountBolt
,来统计句子中单词总共出现的次数:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));
SplitSentence
把它收到的巨资拆分成单词发送出去,WordCount
在内存中构造了一个单词和个数的对照表来统计每个单词出现的次数。每当WordCount
收到一个单词,它都会去更新它的状态并发送出一个新的单词计数。
这里在stream分钟上有一点点的不同。
最简单的分组类型被叫做"shuffle 分组",它随机的分发元组到各个任务上。它被用在WordCountTopology
从RandomSentenceSpout
发送给SplitSentence
bolt的过程里。它可以有效的把元组平均分发给SplitSentence
任务。
一个有趣的分组类型是"fields分组"。在SplitSentence
bolt和WordCount
bolt间使用的就是Fields分组。它让相同的字段总是发送到相同的任务上去。否则不同的任务将会看到相同的单词,它们会由于没有收到完整的信息而发送出一个错误的结果。Fields分组使用字段的子集分组stream。相同字段值将会发送给相同的任务。WordCount
使用Fields分组订阅了SplitSentence
的输出,并且使用了"word"字段。相同的单词将会总是发送到相同的任务上去出去,从而是bolt得到一个正确的输出。
Fields分组是实现Stream join和Stream aggregations的基础。Fields分组的机制是给予mod hash的。
你还可以在Concepts这里找到其他stream分组的介绍。
Bolt可以使用任意语言来定义,使用其他语言写的Bolt将作为子进程执行,Storm通过标准输入输出使用JSON格式的消息和它们进行通讯。通信协议只需要100行左右的适配器库,并且storm已经提供了ruby,python和fancy三种语言的库。
这个是WordCountTopology
的SplitSentence
的定义:
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
SplitSentence
重写了ShellBolt
方法,还声明了它是用python
执行splitsentence.py
的。下面是splitsentence.py
的实现:
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
SplitSentenceBolt().run()
你可以从Using non-JVM languages with Storm了解到更多关于使用其他语言实现spout和bolt的相关信息。
文章的开头我们跳过了关于发送元组的部分内容。你可以从Guaranteeing message processing这里了解到Storm是怎么去保证每条消息都会被处理的,以及作为用户应该怎样利用Storm的高可靠性。
Storm保证了每个消息至少会在topology内传输一次。我们可能会问:"如果在Storm上做计数,你怎么去保证它不会重复统计呢?"。Storm有事务性的topology,它可以保证准确的一次消息通知。从这儿可以了解更多。
这片手册讲解了Storm基础的流式处理。基于Storm还可以做更多的事情。RPC就是其中一个最有意思的应用。点击这儿可以了解更多关于RPC的知识。
手册给出了一个关于开发,测试,部署storm topology的概览。剩下的文档将会带你去更深入的了解Storm使用中的各个方面。