IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]Storm实例:实时单词计数

    fansy1990发表于 2015-03-27 17:23:29
    love 0

    软件版本:Storm:0.9.3 ,Redis:2.8.19;jedis:2.6.2;

    代码及Jedis下载:Storm实时单词计数

    Storm应用场景--实时单词计数,有点类似《Getting Started with Storm》中的chapter6的real-life app。

    场景描述:

    1. 使用一个java程序每间隔一定时间向Redis数据库A中存入数据;

    2. Storm的Spout读取Redis数据库A中的数据,读取后删除Redis中的数据;

    3. Storm的SplitBolt读取Spout的输出,对其进行解析,并输出;

    4. Storm的CountBolt对SplitBolt的数据进行计数,并每隔一定间隔把数据存储在Redis数据库B中;

    5. 另外的java程序定时读取Redis数据库B中的数据,并打印;

    具体实现:

    1. Java定时向Redis发送数据

    while(true){// 每次发送3个数据
    			try {
    				Thread.sleep(200);// 每200毫秒产生一次数据
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			} 
    			interval ++; 
    			int index = random.nextInt(normal.length);
    			if(!jedis.exists("0")){// 如果不存在key说明已经被取走了,就再次产生,否则不产生
    				jedis.set("0",normal[index]);
    			}
    				index = random.nextInt(normal.length);
    			if(!jedis.exists("1")){	
    				jedis.set("1", normal[index]);
    			}
    			index = random.nextInt(normal.length);
    			if(!jedis.exists("2")){
    				jedis.set("2", normal[index]);
    			}
    			
    			if(interval*200/1000==2*60) {// 每间隔200毫秒产生数据后,产生了2分钟,共2*60*1000/200*3 个数据记录
    				// 暂停 5分钟
    				System.out.println(new java.util.Date()+":数据暂定5分钟产生...");
    				try {
    					interval=0;
    					Thread.sleep(5*60*1000);
    					
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    				System.out.println(new java.util.Date()+":5分钟暂停完成,继续产生数据...");
    			
    			}
    		}

    这里使用一个固定的字符串数组,每次从里面随机抽取三个字符串,使用Jedis存储到Redis的数据库中;

    2. Spout读取Redis数据

    @Override
    	public void nextTuple() {
    		long interval =0;
    		while(true){// 获取数据
    			interval++;
    			String zero = getItem("0");
    			String one = getItem("1");
    			String two =  getItem("2");
    			
    			try {
    				Thread.sleep(200);// 每200毫秒发送一次数据
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			} 
    			if(zero==null||one==null||two==null){
    				// do nothing
    				// 没有数据
    //				if(interval%15==0){
    //				}
    			}else{
    				String tmpStr =zero+","+one+","+two;
    				if(thisTaskId==tmpStr.hashCode()%numTasks){ // spout负载均衡
    					this.collector.emit(new Values(tmpStr));
    				
    					if(interval%15==0&&"fast".equals(slow_fast)){
    						System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),
    								taskId, "Spout:["+zero+","+one+","+two+"]"));
    					}else if("slow".equals(slow_fast)){
    						System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),
    								taskId, "Spout:["+zero+","+one+","+two+"]"));
    					}else{
    						new RuntimeException("Wrong argument!");
    					}
    				}
    			}
    			
    		}	
    	}
    这里使用了负载均衡,Spout处理的数据按task进行分隔。

    getItem用于从Redis中获取数据,并删除对应的数据,代码如下:

    /**
    	 * Redis中获取键值并删除对应的键
    	 * @param index
    	 */
    	private String getItem(String index){
    		if(!jedis.exists(index)){
    			return null;
    		}
    		String val = jedis.get(index);
    //		if(val==null||"null".equals("null")){
    //			return ;
    //		}
    		
    		jedis.del(index);
    		return val;
    	}
    3. SplitBolt就是一般的单词分隔代码:

    public void execute(Tuple input, BasicOutputCollector collector) {
            interval++;
    		String sentence = input.getString(0);
    		if(interval%15==0&&"fast".equals(slow_fast)){
    //        System.out.println(new java.util.Date()+":ConponentId:"+conponentId+",taskID:"+taskId+
    //        		"splitBolt:"+sentence);
    			System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "splitBolt:"+sentence));
    		}else if("slow".equals(slow_fast)){
    			System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "splitBolt:"+sentence));
    		}
            String[] words = sentence.split(",");
            for(String word : words){
                word = word.trim();
                if(!word.isEmpty()){
                    word = word.toLowerCase();
                    collector.emit(new Values(word));
                }
            }
    	}

    4. CountBolt进行单词计数,并向Redis数据库中存储单词的计数

    public void execute(Tuple input, BasicOutputCollector collector) {
    		interval++;
    		String str = input.getString(0);
    		/**
    		 * If the word dosn't exist in the map we will create
    		 * this, if not We will add 1 
    		 */
    		if(!counters.containsKey(str)){
    			counters.put(str, 1);
    		}else{
    			Integer c = counters.get(str) + 1;
    			counters.put(str, c);
    		}
    		
    		// 每records条数据则向向数据库中更新
    		if(interval%records==0){
    			for(Map.Entry m :counters.entrySet()){	
    				jedis.set(m.getKey(), String.valueOf(m.getValue()));// 
    				
    			}
    		}
    }
    5. Java程序定时读取Redis中单词计数,并打印

    private void read() {
    		System.out.println("数据获取开始。。。,10s后打印。。。");
    		long interval =0;
    		while(true){// 获取数据
    			interval++;
    			Set keys = jedis.keys("*");
    			for(String key:keys){
    				push2Map(key);
    			}
    //			push2Map("one");
    			try {
    				Thread.sleep(200);// 每200毫秒获取一次数据
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			} 
    			if(interval*200/1000==10) {// 每10秒打印一次
    				interval=0;
    				printMap();
    			}
    		}		
    	}

    Storm作为实时大数据处理框架,从这个小例子中就可以感受一二。


    ps:相关调用接口:

    System.out.println("\nwc.redis.WCTopology  " +
    					"    "+
    					" ");
    打包使用storm jar命令运行的时候,其中的参数解释如下:

    storeFrequent : CountBolt每多少条记录往Redis数据库中存储一次数据;

    num_works : worker的数量;

    parallel_spout :Spout并行的数量;

    parallel_split_bolt :SplitBolt并行的数量;

    parallel_count_bolt :CountBolt并行的数量;

    slow|fast :在Spout/SplitBolt/CountBolt中日志打印的频率;

    printWC:在CountBolt中的日志是否打印(true|false);


    分享,成长,快乐

    脚踏实地,专注

    转载请注明blog地址:http://blog.csdn.net/fansy1990




沪ICP备19023445号-2号
友情链接