Storm is an open-source big data processing system. Unlike many other systems, it is designed for distributed real-time processing and is language-agnostic. Background material and installation instructions are easy to find online, so they are not covered here.

Below is a simple word-count example: a main class that builds the topology, a spout that reads lines from a file, and two bolts that normalize and count the words.
```java
package mapstorm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class StormMain {
    public static void main(String[] args) throws Exception {
        // Wire the topology: spout -> normalizer bolt -> counter bolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        // fieldsGrouping on "word" routes equal words to the same counter task
        builder.setBolt("word-counter", new WordCounter(), 1)
               .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration: pass the input file path through the topology config
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(true);

        // Submit to the cluster
        StormSubmitter.submitTopology("wordCounterTopology", conf, builder.createTopology());
    }
}
```
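For quick testing without a running cluster, the topology can also be run in local mode. Below is a minimal sketch, assuming the same `builder` and `conf` as in `StormMain` above; `LocalCluster` simulates a Storm cluster inside a single JVM.

```java
// Local-mode sketch (assumes the builder/conf from StormMain above).
// Requires: import backtype.storm.LocalCluster;
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounterTopology", conf, builder.createTopology());
Thread.sleep(10000);                         // give the topology time to process the file
cluster.killTopology("wordCounterTopology"); // triggers cleanup() on the bolts
cluster.shutdown();
```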
```java
package mapstorm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {
    private static final long serialVersionUID = 5678586644899822142L;
    Integer id;
    String name;
    Map<String, Integer> counters;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        System.out.println("WordCounter word " + str);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            counters.put(str, counters.get(str) + 1);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void cleanup() {
        // Print the final counts when the topology is killed
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
        System.out.println("finish-----------");
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }
}
```
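Note that `WordCounter` only prints its totals in `cleanup()`, i.e. when the topology is killed. If a downstream consumer needed the running counts, the bolt could emit them instead. The sketch below is a hypothetical variant, not part of the original example; the `count` output field and any consumer of it are assumptions.

```java
// Hypothetical variant: emit a running (word, count) pair on every update
// instead of only printing totals in cleanup(). Not part of the original bolt.
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    String word = input.getString(0);
    Integer count = counters.containsKey(word) ? counters.get(word) + 1 : 1;
    counters.put(word, count);
    collector.emit(new Values(word, count)); // requires backtype.storm.tuple.Values
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count")); // requires backtype.storm.tuple.Fields
}
```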
```java
package mapstorm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {
    @Override
    public void cleanup() {
        System.out.println("finish");
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Split each line into words; trim, lowercase, and emit one tuple per word
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        System.out.println("WordNormalizer receive " + sentence);
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                System.out.println("WordNormalizer emit word " + word);
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
```java
package mapstorm;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private String filePath;
    private boolean completed = false;

    @Override
    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    @Override
    public void close() {}

    @Override
    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.filePath = conf.get("wordsFile").toString();
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Once the file has been read, just sleep so we don't busy-spin
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Emit one tuple per line, using the line itself as the message id
            while ((str = reader.readLine()) != null) {
                System.out.println("WordReader read " + str);
                this.collector.emit(new Values(str), str);
                System.out.println("WordReader out " + str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
```
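Because the spout emits each line with the line itself as the message id, Storm calls `ack()` when the tuple tree completes and `fail()` when it fails or times out. The spout above only logs failures; below is an illustrative sketch of replaying failed lines, assuming `pending.put(str, str)` is added after the emit in `nextTuple()`. Both the `pending` map and the retry logic are assumptions, not part of the original spout.

```java
// Illustrative reliability sketch, not in the original spout:
// remember emitted lines so that failed ones can be replayed.
private final java.util.Map<Object, String> pending =
        new java.util.concurrent.ConcurrentHashMap<Object, String>();

@Override
public void ack(Object msgId) {
    System.out.println("OK:" + msgId);
    pending.remove(msgId); // fully processed; safe to forget
}

@Override
public void fail(Object msgId) {
    System.out.println("FAIL:" + msgId);
    String line = pending.get(msgId);
    if (line != null) {
        collector.emit(new Values(line), msgId); // replay the failed line
    }
}
```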
When the code compiles, package it as storm.jar.

Launch it with `storm jar storm.jar mapstorm.StormMain /data/words.txt`. Note: words.txt must be copied to the same path on every Supervisor node, since the spout reads it from the local filesystem.

Afterwards the Storm UI shows the new topology in its topology list.

To stop it, run `storm kill <topology-name>`, here `storm kill wordCounterTopology`. Killing the topology is what triggers `WordCounter.cleanup()`, which prints the final word counts.