最近找工作看到很多大数据处理的基本都是要求Hadoop、MapReduce之类的,其中如果有熟悉Storm、Spark的会有加分。Storm、Spark属于大数据实时处理的框架,MapReduce是属于离线的、非实时的。这几天都在看Storm,(Spark之前有看过,不过当时耽搁了,不是很深入,之前看的是《Fast Data Processing with Spark》)想记录一下自己的学习过程,希望可以和同学们共同探讨。。。
看一个技术框架,或许首先应该先google下,看下别人写的博客,技术文章等,大概了解下,如果英语过关,可以直接看官网文档(不过个人感觉storm的文档有点不是很好,根据其提供的例子弄了好些天才搞定,当然或许是自己能力太差也说不定);然后就是搭建集群(Storm集群、当然单机也可以学习的);运行自己的第一个“Word Count”程序(大数据的“hello world”?);结合一些讲原理的书籍或者博客看WordCount的代码,对照理解;尝试自己写代码,并运行,总结经验;之后就是慢慢的积累过程了!(以上纯属个人观点)
Storm简介:http://www.searchtb.com/2012/09/introduction-to-storm.html , 一篇阿里的技术博客,感觉不错(有一定Hadoop基础看着理解会好点)。里面讲到了Storm记录级容错的原理,看的不是很明白(能力不够呀!)。
一、Storm安装配置
首先参考了官网的配置:https://storm.apache.org/documentation/Setting-up-development-environment.html ,跑着有问题。接着,网上找了一篇:http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/,这个感觉靠谱点(我就是参考这个配置的)。
集群配置:
node101 : CentOS6.5 64bit、2G 内存、nimbus、ui、zookeeper(虚拟机)
node102 : CentOS6.5 64bit、1G 内存、supervisor(虚拟机)
node103 : CentOS6.5 64bit、1G 内存、supervisor(虚拟机)
版本: Storm:0.9.3 、Zookeeper:zookeeper-3.4.6
1. 安装并启动Zookeeper
1)下载Zookeeper,解压到/opt文件夹(这个文件夹可以自定义)
2)进入解压后的bin目录执行./zkServer.sh start ;
3)查看是否启动:
[root@node101 bin]# ./zkServer.sh status JMX enabled by default Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: standalone2. 安装并配置Storm:
(jdk不用说了,自己安装吧)
1)下载ZeroMQ、JZMQ并安装
下载地址:https://github.com/downloads/saltstack/salt/zeromq-2.1.7-1.el6.x86_64.rpm 、https://s3.amazonaws.com/cdn.michael-noll.com/rpms/jzmq-2.1.0.el6.x86_64.rpm。 这两个rpm官网没有说要安装,不安装的话,后面可能会有问题。(我现在都不清楚后面出现的问题是否是因为这个没有安装)
yum install zero* yum install jzmq*
2)下载Storm,并配置;
在Storm官网下载0.9.3版本,并解压到/opt目录。修改conf/storm.yaml如下:
########### These MUST be filled in for a storm configuration storm.zookeeper.servers: - "node101" # - "server2" # # storm.local.dir: "/opt/data/storm" nimbus.host: "node101" supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
然后使用scp把这个文件拷贝到node102、node103机器对应的文件夹中
3)启动storm相关进程可以在bin目录建立一个start.sh文件,并赋予执行权限;在nimbus机器建立的文件内容如下:
#!/bin/bash /opt/apache-storm-0.9.3/bin/storm nimbus > /dev/null 2>&1 & /opt/apache-storm-0.9.3/bin/storm ui > /dev/null 2>&1 &在supervisor机器建立的文件如下:
#!/bin/bash /opt/apache-storm-0.9.3/bin/storm supervisor > /dev/null 2>&1 &当然,也可以参考http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/使用supervision把Storm的相关进程管理起来,这样就可以直接使用service命令了。
分别在node101、node102、node103中执行./start.sh 即可启动Storm集群。
4) ui监控
浏览器访问:http://node101:8080 ,即可看到集群的监控,如下:
第一次启动的时候在Topology summary中不会有内容。
这样集群就启动了!
二、Storm第一个程序WordCount
WordCount程序参考《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》
1) 下载https://github.com/storm-book/examples-ch02-getting_started/zipball/master ,导入到自建的java工程中,新建的java工程需要导入Storm的相关包。
2)修改WordCount使其可以在集群中运行:
a. 修改TopologyMain.java
package main; import spouts.WordReader; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import bolts.WordCounter; import bolts.WordNormalizer; public class TopologyMain { public static void main(String[] args) throws InterruptedException { if(args.length!=4){ System.out.println(""); System.exit(-1); } //Topology definition TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader",new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(),Integer.parseInt(args[2])) .fieldsGrouping("word-normalizer", new Fields("word")); //Configuration Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(false); //Topology run conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); if(args[3].equals("local")){ LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); try{ Thread.sleep(Long.parseLong(args[1])); } catch(Exception e){ e.printStackTrace(); } cluster.shutdown(); }else if ("distribute".equals(args[3])){ try { StormSubmitter.submitTopology("wc1-1", conf, builder. createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else{ System.out.println("Wrong mode!"); System.exit(-1); } System.out.println(new java.util.Date()+": 任务已经提交到Strom集群!"); } }
这里的提交方式可以选择两种,一种是Local模式一种是分布式的。
b. WordCounter.java 修改@Override public void execute(Tuple input, BasicOutputCollector collector) { String str = input.getString(0); /** * If the word dosn't exist in the map we will create * this, if not We will add 1 */ if(!counters.containsKey(str)){ counters.put(str, 1); }else{ Integer c = counters.get(str) + 1; counters.put(str, c); } System.out.println("Counter.size:"+counters.size()+",input:"+str); }
3) 打包运行:
使用eclipse的export把代码打包到jar文件中,在node101中使用storm jar提交任务:
storm jar /opt/storm_user_lib/wc1.1.jar main.TopologyMain /opt/wc.txt 5000 2 distribute这里需要先在node102、node103机器的/opt目录下新建wc.txt文档,不然会报文件找不到的错误(其实只用在运行的节点上运行即可);
wc.txt:(可以自定义)
we are the future see me ok see you because what i am doing what the fuck
4)查看结果:
在storm 监控界面找到名字为 wc1-1的Topology,找到其wordcount的bolt,查看其运行节点及worker端口,如下
在节点node102中的logs中查看work-6702.log文件:
但是这里并没有看到cleanup的打印信息代码,这个是因为cleanup会在集群关闭的时候调用,所以,如果你把集群关闭就可以看到打印的结果了。(如何关闭集群?jps 查看相关的进程,kill -9直接杀掉)。
三、Storm Real-Life 例子
参考《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》 chapter6
版本:Storm:0.9.3 ,Redis:2.8.19; Nodejs:0.12.0;jedis:2.6.2;
1)下载、编译、安装Redis、Nodejs:
redis
下载地址:http://redis.io/download
安装出错:http://www.cnblogs.com/la-isla-bonita/p/3582751.html
nodejs
下载地址:https://nodejs.org/download/
jedis
下载地址:https://github.com/xetorthio/jedis/releases ,下载源码使用mvn编译(使用mvn clean install -Dmaven.test.skip=true命令)
2)下载源码,并打包:
下载地址:https://github.com/storm-book/examples-ch06-real-life-app。
下载源码并导入后,如果没有使用mvn构建的话,会报错。首先把jedis jar包加入classpath路径中,同时也需要把jedis jar包加入到storm_home/lib/下面(三个节点都需要,其实应该只用两个supervisor的就行吧);
由于版本的原因,所以会有些地方报错,一般包括下面三个部分:
a. TopologyStarter.java :
修改redis、nodejs的机器名
public final static String REDIS_HOST = "node101"; public final static int REDIS_PORT = 6379; public final static String WEBSERVER = "http://node101:3000/news";去掉下面这句,应该是在21行左右;
// Logger.getRootLogger().removeAllAppenders();
由于虚拟机资源有限,所以把所有的并行都改成了2;
builder.setSpout("read-feed", new UsersNavigationSpout(), 2); builder.setBolt("get-categ", new GetCategoryBolt(), 2) .shuffleGrouping("read-feed"); builder.setBolt("user-history", new UserHistoryBolt(), 2) .fieldsGrouping("get-categ", new Fields("user")); builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(), 2) .fieldsGrouping("user-history", new Fields("product"));提交任务改为集群提交:
// LocalCluster cluster = new LocalCluster(); // cluster.submitTopology("analytics", conf, builder.createTopology()); StormSubmitter.submitTopology("analytics", conf, builder. createTopology());b. ProductCategoriesCounterBolt.java、utilities/ProductsReader.java
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out 或 java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to [B一种改法是加大time out的时间,修改上面两个类中的reconnect方法
public void reconnect() { // this.jedis = new Jedis(host, port); this.jedis = new Jedis(host, port,10000); // 修改默认time out 到10s }
经过b。a的修改后,还会报错,需要修改上面这个类,把http相关的包改为storm下面的http包相关类
import org.apache.storm.http.HttpResponse; import org.apache.storm.http.client.HttpClient; import org.apache.storm.http.client.methods.HttpPost; import org.apache.storm.http.entity.StringEntity; import org.apache.storm.http.impl.client.DefaultHttpClient;
3) 运行,并查看结果
在node101中运行redis: nohup redis-server & ;
运行jar包:storm jar real-life1.1.jar storm.analytics.TopologyStarter ;
运行Nodejs:node webapp/app.js;
同时,查看Storm的监控,可以看到Storm的spout和bolt模块的传输数据记录:
实例来自《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》。
例子跑完后,可以开始具体分析各个步骤了,暂时不想分析real-life的例子,有点复杂,可以先分析wordcount,同时结合《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》这本电子书。
分享,成长,快乐
脚踏实地,专注
转载请注明blog地址:http://blog.csdn.net/fansy1990