
    [Original] First Experience with Storm

    Posted by liuzhoulong on 2014-01-09 18:21:22

    Storm is an open-source big-data processing system. Unlike most other systems, it is designed for distributed real-time processing and is language-agnostic. For more background, and for the installation steps, please search the web yourself.

    Here is a simple word-count example I put together.

    package mapstorm;
    
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    public class StormMain {
    
    	public static void main(String[] args) throws Exception {
    
    		// Wire the topology: the spout emits lines from a file, the
    		// normalizer splits them into lowercase words, and the counter
    		// tallies occurrences per word.
    		TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("word-reader", new WordReader());
            builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
            // fieldsGrouping on "word" sends every tuple carrying the same word
            // to the same WordCounter task, keeping each word's count in one place.
            builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word"));
    
            //Configuration: pass the words-file path to the spout via topology config
            Config conf = new Config();
            conf.put("wordsFile", args[0]);
            conf.setDebug(true);
            
            StormSubmitter.submitTopology("wordCounterTopology", conf, builder.createTopology());
            // (see the local-mode sketch after this class for running without a cluster)
    	}
    
    }
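
    For quick testing without a running cluster, the same topology can also be
    submitted to an in-process LocalCluster instead of via StormSubmitter. A
    minimal sketch, assuming the same backtype.storm 0.9.x API as above (add
    import backtype.storm.LocalCluster;), replacing the submitTopology call in main:

            //Topology run in local mode
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCounterTopology", conf, builder.createTopology());
            Thread.sleep(2000);   // give the topology a moment to process the file
            cluster.shutdown();   // tear down the in-process cluster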

    package mapstorm;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;
    
    public class WordCounter extends BaseBasicBolt {
    	private static final long serialVersionUID = 5678586644899822142L;
    	Integer id;
        String name;
        // running count per word for this bolt task
        Map<String, Integer> counters;
    
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		  String str = input.getString(0);
    	        System.out.println("WordCounter word "+ str);
    	        if(!counters.containsKey(str)){
    	            counters.put(str, 1);
    	        }else{
    	            Integer c = counters.get(str) + 1;
    	            counters.put(str, c);
    	        }
    		
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		// terminal bolt: nothing is emitted downstream
    	}
    
    	@Override
        public void cleanup() {
            System.out.println("-- Word Counter ["+name+"-"+id+"] --");
            for(Map.Entry<String, Integer> entry : counters.entrySet()){
                System.out.println(entry.getKey()+": "+entry.getValue());
            }
            System.out.println("finish-----------");
        }
     
    
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.counters = new HashMap<String, Integer>();
            this.name = context.getThisComponentId();
            this.id = context.getThisTaskId();
        }
    }
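
    Note that the counts live only in each task's memory and are printed in
    cleanup(), which Storm reliably calls only in local mode; on a real cluster a
    worker can be killed without cleanup() ever running. A hypothetical variant,
    not part of the original example, would declare an output field and emit
    running totals so downstream bolts could consume them (this also needs
    backtype.storm.tuple.Fields and Values imported):

        // hypothetical: emit (word, count) instead of only printing at shutdown
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word = input.getString(0);
            Integer c = counters.containsKey(word) ? counters.get(word) + 1 : 1;
            counters.put(word, c);
            collector.emit(new Values(word, c)); // running total for this word
        }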
    



    package mapstorm;
    
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class WordNormalizer extends BaseBasicBolt {
    	
    	public void cleanup() {
    		System.out.println("finish");
    	}
    
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		String sentence = input.getString(0);
    		String[] words = sentence.split(" ");
    		System.out.println("WordNormalizer receive "+ sentence);
    		// split the line into words, normalize to lowercase,
    		// and emit one tuple per non-empty word
    		for(String word : words){
    			word = word.trim();
    			if(!word.isEmpty()){
    				word = word.toLowerCase();
    				System.out.println("WordNormalizer emit word "+ word);
    				collector.emit(new Values(word));
    			}
    		}
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("word"));
    		
    	}
    
    }
    



    package mapstorm;
    
    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.util.Map;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    public class WordReader extends BaseRichSpout {
    	
    	private SpoutOutputCollector collector;
        private FileReader fileReader;
        private String filePath;
        private boolean completed = false;
       
        // called back by Storm when a tuple tree completes or fails;
        // the message ID here is the original line
        public void ack(Object msgId) {
            System.out.println("OK:"+msgId);
        }
        public void close() {}

        public void fail(Object msgId) {
            System.out.println("FAIL:"+msgId);
        }
    
    	@Override
    	public void open(Map conf, TopologyContext context,
    			SpoutOutputCollector collector) {
    		this.filePath = conf.get("wordsFile").toString();
    		try {
    			this.fileReader = new FileReader(this.filePath);
    		} catch (FileNotFoundException e) {
    			throw new RuntimeException("Error reading file ["+this.filePath+"]", e);
    		}
    		this.collector = collector;
    
    	}
    
    	@Override
    	public void nextTuple() {
    		// the file is read only once; after that, idle briefly so the
    		// spout thread does not busy-spin
    		if(completed){
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    			}
    			return;
    		}
    		String str;
    		BufferedReader reader = new BufferedReader(fileReader);
    		try{
    			while((str = reader.readLine()) != null){
    				System.out.println("WordReader read "+ str);
    				// anchor each line with itself as the message ID so that
    				// ack()/fail() are called back on this spout
    				this.collector.emit(new Values(str), str);
    				System.out.println("WordReader out "+ str);
    			}
    		}catch(Exception e){
    			throw new RuntimeException("Error reading tuple",e);
    		}finally{
    			completed = true;
    		}
    
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("line"));
    
    	}
    
    }
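
    Because every line is emitted with a message ID, Storm's ackers can track the
    tuple tree and call ack() or fail() back on this spout. A minimal sketch of
    the related settings in main(), assuming the standard backtype.storm.Config
    setters:

            conf.setNumAckers(1);            // acker tasks that track each tuple tree
            conf.setMessageTimeoutSecs(30);  // fail tuples not fully acked within 30s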



    When it is done, package it as storm.jar.

    Start it with storm jar storm.jar mapstorm.StormMain /data/words.txt. Note: words.txt must be copied to the corresponding directory on every Supervisor node, since the spout reads it from the local filesystem.

    In the Storm UI you can then see the new topology listed.

    To stop the topology, run storm kill <name>; here that is storm kill wordCounterTopology.


