storm-simpleTest
Storm Sharing
Deployment Structure
Module Structure (Topology)
Data Model: Tuple
What passes between Spouts and Bolts is a Stream made up of tuples.
A tuple is a list of values, and a field in a tuple can be an object of any type. Out of the box, Storm supports all the primitive types, strings, and byte arrays as tuple field values. To use an object of another type, you just need to implement a serializer for the type.
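In other words, a tuple is a list whose values have already been assigned. Its fields can hold not only primitive types, String, and byte[], but also user-defined types for which a serializer has been implemented.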
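To make the "list of values" idea concrete, here is a minimal, Storm-free sketch. SimpleTuple and its methods are hypothetical names chosen for illustration; this is not Storm's Tuple class, only an approximation of how declared field names line up with positional values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Storm tuple: declared field names plus a positional list of values.
final class SimpleTuple {
    private final List<String> fields;
    private final List<Object> values;

    SimpleTuple(List<String> fields, Object... values) {
        if (fields.size() != values.length) {
            throw new IllegalArgumentException("one value per declared field is required");
        }
        this.fields = fields;
        this.values = Arrays.asList(values);
    }

    // Positional access, in the spirit of tuple.getString(0) in the WordCount bolt.
    String getString(int index) {
        return (String) values.get(index);
    }

    // Access by declared field name.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    int size() {
        return values.size();
    }
}
```

With the fields "word" and "count" and the values "storm" and 3, getString(0) would return "storm" and getValueByField("count") would return the Integer 3.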
Code Implementation
Spout code (entry point)
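// Package names below are for Storm 1.0+; earlier releases used backtype.storm.* instead.
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class RandomSentenceSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;

    // Called once per spout task; stores the collector and creates the random source.
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    // Called repeatedly by Storm; emits one random sentence roughly every 100 ms.
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
        };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

    @Override
    public void ack(Object id) { }

    @Override
    public void fail(Object id) { }

    // The single output field carries a whole sentence, although it is named "word".
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}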
Bolt code (processing class)
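import java.util.HashMap;
import java.util.Map;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Declared static because it is nested inside the topology class.
public static class WordCount extends BaseBasicBolt {
    // Running count per word, local to this bolt task.
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        // Emit the word together with its updated count.
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}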
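The counting step in WordCount.execute() can be tried without a Storm cluster. The sketch below is a hypothetical, Storm-free rewrite of that logic (RunningWordCount, count, and trace are illustrative names, not part of Storm); it records the (word, count) pair that would be emitted for each incoming word.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free version of the counting logic in WordCount.execute().
final class RunningWordCount {
    private final Map<String, Integer> counts = new HashMap<>();

    // Returns the updated count, i.e. the value the bolt would emit next to the word.
    int count(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    // Feeds the words in order and records each emitted "word=count" pair.
    static List<String> trace(String... words) {
        RunningWordCount counter = new RunningWordCount();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            emitted.add(word + "=" + counter.count(word));
        }
        return emitted;
    }
}
```

Calling RunningWordCount.trace("the", "cow", "the") yields the=1, cow=1, the=2: every input word produces an output, and the count is a running total rather than a final one.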
Topology code (stream-processing logic)
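import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // Sentence source with a parallelism hint of 5.
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    // SplitSentence is not listed in this article; it is assumed to emit one "word" field per word.
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    // Group on "word" so that equal words always reach the same WordCount task.
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
    // Wiring only; createTopology() and submission are left out of this snippet.
}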
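Why the count bolt uses fieldsGrouping rather than shuffleGrouping: each WordCount task keeps its own HashMap, so the totals are only correct if every occurrence of a word reaches the same task. The sketch below illustrates the idea with a plain hash-modulo rule. FieldsGroupingSketch and chooseTask are hypothetical names, and Storm's actual hashing may differ in detail; only the property "equal field value, same task" is the point.

```java
// Hypothetical illustration of fields grouping: pick a task from the hash of the grouping field.
final class FieldsGroupingSketch {
    // Maps a word to one of numTasks task indexes; equal words always map to the same index.
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int chooseTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }
}
```

With 12 count tasks, as in the topology, chooseTask("the", 12) returns the same index on every call, so a single task owns the counter for "the". Under shuffle grouping the same word could land on different tasks, and each task would report only a partial count.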
© Copyright notice: this is an original article from 【翰林小院】 (huhanlin.com). Please credit the source when reposting.