IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]Storm安装配置及Real-Life实例

    fansy1990发表于 2015-03-24 19:08:15
    love 0

    最近找工作看到很多大数据处理的基本都是要求Hadoop、MapReduce之类的,其中如果有熟悉Storm、Spark的会有加分。Storm、Spark属于大数据实时处理的框架,MapReduce是属于离线的、非实时的。这几天都在看Storm,(Spark之前有看过,不过当时耽搁了,不是很深入,之前看的是《Fast Data Processing with Spark》)想记录一下自己的学习过程,希望可以和同学们共同探讨。。。

    看一个技术框架,或许首先应该先google下,看下别人写的博客,技术文章等,大概了解下,如果英语过关,可以直接看官网文档(不过个人感觉storm的文档有点不是很好,根据其提供的例子弄了好些天才搞定,当然或许是自己能力太差也说不定);然后就是搭建集群(Storm集群、当然单机也可以学习的);运行自己的第一个“Word Count”程序(大数据的“hello world”?);结合一些讲原理的书籍或者博客看WordCount的代码,对照理解;尝试自己写代码,并运行,总结经验;之后就是慢慢的积累过程了!(以上纯属个人观点)

    Storm简介:http://www.searchtb.com/2012/09/introduction-to-storm.html , 一篇阿里的技术博客,感觉不错(有一定Hadoop基础看着理解会好点)。里面讲到了Storm记录级容错的原理,看的不是很明白(能力不够呀!)。

    一、Storm安装配置

    首先参考了官网的配置:https://storm.apache.org/documentation/Setting-up-development-environment.html ,跑着有问题。接着,网上找了一篇:http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/,这个感觉靠谱点(我就是参考这个配置的)。

    集群配置:

    node101 : CentOS6.5 64bit、2G 内存、nimbus、ui、zookeeper(虚拟机)

    node102 : CentOS6.5 64bit、1G 内存、supervisor(虚拟机)

    node103 : CentOS6.5 64bit、1G 内存、supervisor(虚拟机)

    版本: Storm:0.9.3 、Zookeeper:zookeeper-3.4.6

    1. 安装并启动Zookeeper

    1)下载Zookeeper,解压到/opt文件夹(这个文件夹可以自定义)

    2)进入解压后的bin目录执行./zkServer.sh start ;

    3)查看是否启动:

    [root@node101 bin]# ./zkServer.sh status
    JMX enabled by default
    Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Mode: standalone
    2. 安装并配置Storm:

    (jdk不用说了,自己安装吧)

    1)下载ZeroMQ、JZMQ并安装

    下载地址:https://github.com/downloads/saltstack/salt/zeromq-2.1.7-1.el6.x86_64.rpm 、https://s3.amazonaws.com/cdn.michael-noll.com/rpms/jzmq-2.1.0.el6.x86_64.rpm。 这两个rpm官网没有说要安装,不安装的话,后面可能会有问题。(我现在都不清楚后面出现的问题是否是因为这个没有安装)

    yum install zero*
    yum install jzmq*

    2)下载Storm,并配置;

    在Storm官网下载0.9.3版本,并解压到/opt目录。修改conf/storm.yaml如下:

    ########### These MUST be filled in for a storm configuration
     storm.zookeeper.servers:
         - "node101"
    #     - "server2"
    #
    #
     storm.local.dir: "/opt/data/storm" 
     nimbus.host: "node101"
    
     supervisor.slots.ports:
            - 6700
            - 6701
            - 6702
            - 6703

    然后使用scp把这个文件拷贝到node102、node103机器对应的文件夹中

    3)启动storm相关进程

    可以在bin目录建立一个start.sh文件,并赋予执行权限;在nimbus机器建立的文件内容如下:

    #!/bin/bash
    
    /opt/apache-storm-0.9.3/bin/storm nimbus > /dev/null 2>&1 &
    /opt/apache-storm-0.9.3/bin/storm ui > /dev/null 2>&1 &
    在supervisor机器建立的文件如下:

    #!/bin/bash
    
    /opt/apache-storm-0.9.3/bin/storm supervisor > /dev/null 2>&1 &
    当然,也可以参考http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/使用supervision把Storm的相关进程管理起来,这样就可以直接使用service命令了。

    分别在node101、node102、node103中执行./start.sh 即可启动Storm集群。

    4) ui监控

    浏览器访问:http://node101:8080 ,即可看到集群的监控,如下:



    第一次启动的时候在Topology summary中不会有内容。

    这样集群就启动了!

    二、Storm第一个程序WordCount

    WordCount程序参考《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》

    1) 下载https://github.com/storm-book/examples-ch02-getting_started/zipball/master ,导入到自建的java工程中,新建的java工程需要导入Storm的相关包。

    2)修改WordCount使其可以在集群中运行:

    a. 修改TopologyMain.java

    package main;
    import spouts.WordReader;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import bolts.WordCounter;
    import bolts.WordNormalizer;
    
    
    public class TopologyMain {
    	public static void main(String[] args) throws InterruptedException {
            
    		if(args.length!=4){
    			System.out.println("   ");
    			System.exit(-1);
    		}
    		
            //Topology definition
    		TopologyBuilder builder = new TopologyBuilder();
    		builder.setSpout("word-reader",new WordReader());
    		builder.setBolt("word-normalizer", new WordNormalizer())
    			.shuffleGrouping("word-reader");
    		builder.setBolt("word-counter", new WordCounter(),Integer.parseInt(args[2]))
    			.fieldsGrouping("word-normalizer", new Fields("word"));
    		
            //Configuration
    		Config conf = new Config();
    		conf.put("wordsFile", args[0]);
    		conf.setDebug(false);
            //Topology run
    		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    		if(args[3].equals("local")){
    			LocalCluster cluster = new LocalCluster();
    			cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
    		try{
    			Thread.sleep(Long.parseLong(args[1]));
    		}	catch(Exception e){
    			e.printStackTrace();	
    		}
    			
    			cluster.shutdown();
    		}else if ("distribute".equals(args[3])){
    			try {
    				StormSubmitter.submitTopology("wc1-1", conf,
    						 builder. createTopology());
    			} catch (AlreadyAliveException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			} catch (InvalidTopologyException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    		}else{
    			System.out.println("Wrong mode!");
    			System.exit(-1);
    		}
    		System.out.println(new java.util.Date()+": 任务已经提交到Strom集群!");
    	}
    }
    

    这里的提交方式可以选择两种,一种是Local模式一种是分布式的。

    b. WordCounter.java 修改

    @Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		String str = input.getString(0);
    		/**
    		 * If the word dosn't exist in the map we will create
    		 * this, if not We will add 1 
    		 */
    		if(!counters.containsKey(str)){
    			counters.put(str, 1);
    		}else{
    			Integer c = counters.get(str) + 1;
    			counters.put(str, c);
    		}
    		System.out.println("Counter.size:"+counters.size()+",input:"+str);
    	}

    修改其exec方法,在最后加上一句打印即可;

    3) 打包运行:

    使用eclipse的export把代码打包到jar文件中,在node101中使用storm jar提交任务:

    storm jar /opt/storm_user_lib/wc1.1.jar main.TopologyMain /opt/wc.txt 5000 2 distribute
    这里需要先在node102、node103机器的/opt目录下新建wc.txt文档,不然会报文件找不到的错误(其实只用在运行的节点上运行即可);

    wc.txt:(可以自定义)

    we are the future
    see me 
    ok
    see you 
    because what i am doing
    what the fuck

    4)查看结果:

    在storm 监控界面找到名字为 wc1-1的Topology,找到其wordcount的bolt,查看其运行节点及worker端口,如下

    在节点node102中的logs中查看work-6702.log文件:

    但是这里并没有看到cleanup的打印信息代码,这个是因为cleanup会在集群关闭的时候调用,所以,如果你把集群关闭就可以看到打印的结果了。(如何关闭集群?jps 查看相关的进程,kill -9直接杀掉)。

    三、Storm Real-Life 例子

    参考《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》 chapter6

    版本:Storm:0.9.3 ,Redis:2.8.19; Nodejs:0.12.0;jedis:2.6.2;

    1)下载、编译、安装Redis、Nodejs:

    redis
    下载地址:http://redis.io/download
    安装出错:http://www.cnblogs.com/la-isla-bonita/p/3582751.html
    nodejs
    下载地址:https://nodejs.org/download/

    jedis

    下载地址:https://github.com/xetorthio/jedis/releases ,下载源码使用mvn编译(使用mvn clean install -Dmaven.test.skip=true命令)

    2)下载源码,并打包:

    下载地址:https://github.com/storm-book/examples-ch06-real-life-app。

    下载源码并导入后,如果没有使用mvn构建的话,会报错。首先把jedis jar包加入classpath路径中,同时也需要把jedis jar包加入到storm_home/lib/下面(三个节点都需要,其实应该只用两个supervisor的就行吧);

    由于版本的原因,所以会有些地方报错,一般包括下面三个部分:

    a. TopologyStarter.java :

    修改redis、nodejs的机器名

    public final static String REDIS_HOST = "node101";
    	public final static int REDIS_PORT = 6379;
    	public final static String WEBSERVER = "http://node101:3000/news";
    去掉下面这句,应该是在21行左右;

    //        Logger.getRootLogger().removeAllAppenders();

    由于虚拟机资源有限,所以把所有的并行都改成了2;

    builder.setSpout("read-feed", new UsersNavigationSpout(), 2);
            
            builder.setBolt("get-categ", new GetCategoryBolt(), 2)
            				.shuffleGrouping("read-feed");
            
            builder.setBolt("user-history", new UserHistoryBolt(), 2)
            				.fieldsGrouping("get-categ", new Fields("user"));
            
            builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(), 2)
            				.fieldsGrouping("user-history", new Fields("product"));
    提交任务改为集群提交:

    //        LocalCluster cluster = new LocalCluster();
    //        cluster.submitTopology("analytics", conf, builder.createTopology());
            StormSubmitter.submitTopology("analytics", conf,
    				 builder. createTopology());
    b. ProductCategoriesCounterBolt.java、utilities/ProductsReader.java

    由于jedis使用中如果有超时,那就会报下面的错误:

    redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    或
    java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to [B
    一种改法是加大time out的时间,修改上面两个类中的reconnect方法
    public void reconnect() {
    //		this.jedis = new Jedis(host, port);
    		this.jedis = new Jedis(host, port,10000); // 修改默认time out 到10s
    	}

    c. NewsNotifierBolt.java

    经过b。a的修改后,还会报错,需要修改上面这个类,把http相关的包改为storm下面的http包相关类

    import org.apache.storm.http.HttpResponse;
    import org.apache.storm.http.client.HttpClient;
    import org.apache.storm.http.client.methods.HttpPost;
    import org.apache.storm.http.entity.StringEntity;
    import org.apache.storm.http.impl.client.DefaultHttpClient;

    经过a、b、c的修改,现在可以把源码打成jar包并运行了!


    3) 运行,并查看结果

    在node101中运行redis: nohup redis-server & ;

    运行jar包:storm jar real-life1.1.jar storm.analytics.TopologyStarter ;

    运行Nodejs:node webapp/app.js;

    在浏览器打开http://node101:3000,即可看到下图


    点击某个产品,接着查看相关条目,多点击几次,就会有数据了,如下:


    同时,查看Storm的监控,可以看到Storm的spout和bolt模块的传输数据记录:


    实例来自《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》。

    例子跑完后,可以开始具体分析各个步骤了,暂时不想分析real-life的例子,有点复杂,可以先分析wordcount,同时结合《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》这本电子书。




    分享,成长,快乐

    脚踏实地,专注

    转载请注明blog地址:http://blog.csdn.net/fansy1990





沪ICP备19023445号-2号
友情链接