Storm Programming Guide


This article covers basic Storm programming. For Trident programming, see the ??? guide.

This example uses Storm to run the classic word-count program. The topology is:
sentence-spout —> split-bolt —> count-bolt —> report-bolt
These components respectively generate sentences, split them into words, count the words, and output the results.
The complete code is at https://github.com/jinhong-lu/stormdemo
Below is an analysis of the key code.

(1) Creating the spout

// Imports assume the pre-1.0 "backtype.storm" packages; on Storm 1.x+ use "org.apache.storm" instead.
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = { "when i was young i'd listen to the radio",
            "waiting for my favorite songs", "when they played i'd sing along",
            "it make me smile",
            "those were such happy times and not so long ago",
            "how i wondered where they'd gone",
            "but they're back again just like a long lost friend",
            "all the songs i love so well", "every shalala every wo'wo",
            "still shines.", "every shing-a-ling-a-ling",
            "that they're starting", "to sing so fine"};

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            //e.printStackTrace();
        }
    }
}

The class above emits the sentences in the String array one by one. Its main methods are:
(1) open() performs the spout's initialization work, similar to a bolt's prepare() method.
(2) declareOutputFields() defines the names and number of the emitted fields; the method has the same name in bolts.
(3) nextTuple() is invoked for each piece of data to be processed, similar to a bolt's execute() method. It is the core of the whole processing logic: emit() sends the data to the next node in the topology.
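The wrap-around logic in nextTuple() replays the array indefinitely. It can be checked in isolation with a plain-Java sketch (no Storm dependency; the SentenceCycler name is ours):

```java
// Emulates the index-cycling logic of SentenceSpout.nextTuple()
public class SentenceCycler {
    private int index = 0;
    private final String[] sentences;

    public SentenceCycler(String[] sentences) {
        this.sentences = sentences;
    }

    public String next() {
        String s = sentences[index];
        index++;
        if (index >= sentences.length) {
            index = 0; // wrap around: replay the array from the start
        }
        return s;
    }

    public static void main(String[] args) {
        SentenceCycler c = new SentenceCycler(new String[] { "a", "b", "c" });
        for (int i = 0; i < 5; i++) {
            System.out.print(c.next() + " "); // a b c a b
        }
        System.out.println();
    }
}
```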

(2) Creating the split-bolt

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void execute(Tuple input) {
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            this.collector.emit(new Values(word));
            //System.out.println(word);
        }
    }
}

The three methods have the same meaning as in the spout. This class splits each received sentence on spaces into individual words and then emits the words one by one.
input.getStringByField("sentence") retrieves the content by the field name declared at the upstream node.
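One caveat worth noting: split(" ") keeps empty tokens when a sentence contains consecutive spaces, whereas splitting on the \s+ pattern collapses whitespace runs. A plain-Java check (class name is ours):

```java
public class SplitDemo {
    public static void main(String[] args) {
        // Note the double space between "when" and "i"
        String sentence = "when  i was young";

        // split(" ") emits an empty token for each extra space
        String[] naive = sentence.split(" ");
        // trim() + split("\\s+") collapses runs of whitespace instead
        String[] robust = sentence.trim().split("\\s+");

        System.out.println(naive.length);  // 5 (one token is "")
        System.out.println(robust.length); // 4
    }
}
```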

(3) Creating the wordcount-bolt

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt{
    private OutputCollector collector;
    private Map<String,Long> counts = null;

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if(count == null){
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word,count));
        //System.out.println(count);
    }
}

This class counts the occurrences of each received word and emits the result.
This bolt emits two fields:

declarer.declare(new Fields("word", "count"));
this.collector.emit(new Values(word, count));
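The get-or-zero-then-increment pattern in execute() can be exercised in isolation; a plain-Java sketch (no Storm dependency; class and method names are ours):

```java
import java.util.HashMap;
import java.util.Map;

public class CountDemo {
    // Same per-task map as WordCountBolt
    static Map<String, Long> counts = new HashMap<String, Long>();

    // Same get-or-zero-then-increment logic as WordCountBolt.execute()
    static long countWord(String word) {
        Long count = counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        counts.put(word, count);
        return count; // running total, emitted downstream in the real bolt
    }

    public static void main(String[] args) {
        for (String w : new String[] { "a", "b", "a" }) {
            System.out.println(w + " : " + countWord(w)); // a : 1, b : 1, a : 2
        }
    }
}
```

Because the topology routes tuples with fieldsGrouping on "word", every occurrence of a given word reaches the same bolt task, so a per-task map like this yields correct totals.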

(4) Creating the report-bolt

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt{
    private Map<String, Long> counts;

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.counts = new HashMap<String,Long>();
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        counts.put(word, count);
    }

    public void cleanup() {
        System.out.println("Final output");
        Iterator<Entry<String, Long>> iter = counts.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, Long> entry = iter.next();
            String word = (String) entry.getKey();
            Long count = (Long) entry.getValue();
            System.out.println(word + " : " + count);
        }
        
        super.cleanup();
    }    
    
}

This class outputs the data received from wordcount-bolt.
The results are first accumulated in a map; when the topology is killed, cleanup() is called and the map's contents are printed. Note that cleanup() is guaranteed to run only in local mode; on a cluster, a worker may be killed without cleanup() ever being invoked.
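Since counts is a HashMap, the iteration order in cleanup() is arbitrary. A plain-Java sketch (no Storm dependency; names are ours) of how the final output could be sorted alphabetically instead:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class ReportDemo {
    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<String, Long>();
        counts.put("young", 2L);
        counts.put("i", 5L);
        counts.put("radio", 1L);

        // Wrapping in a TreeMap yields alphabetical key order,
        // unlike the unspecified iteration order of HashMap
        for (Map.Entry<String, Long> e : new TreeMap<String, Long>(counts).entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
        // i : 5
        // radio : 1
        // young : 2
    }
}
```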

(5) Creating the topology

// Imports assume the pre-1.0 "backtype.storm" packages; on Storm 1.x+ use "org.apache.storm" instead.
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(
                SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID,
                new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(
                COUNT_BOLT_ID);

        Config conf = new Config();

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();

            cluster.submitTopology(TOPOLOGY_NAME, conf,
                    builder.createTopology());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            try {
                StormSubmitter.submitTopology(args[0], conf,builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }

        }
    }
}

The key steps are:
(1) Create a TopologyBuilder and register the spout and bolts with it. shuffleGrouping distributes the spout's tuples randomly across the split bolt's tasks; fieldsGrouping routes tuples with the same "word" value to the same counting task; globalGrouping sends every count to the single reporting task.

builder.setSpout(SENTENCE_SPOUT_ID, spout);
builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
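setSpout() and setBolt() also accept an optional parallelism hint (number of executors), and setNumTasks() fixes the task count per component. A sketch reusing the IDs above (the numbers are illustrative, not tuned values); this is wiring code, so it only runs inside a topology:

```java
// 2 executors for the spout; 4 executors backed by 8 tasks for the split bolt
builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 4).setNumTasks(8)
        .shuffleGrouping(SENTENCE_SPOUT_ID);
```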

(2) Create the Config object

Config conf = new Config();

This object specifies topology-related settings such as parallelism and the Nimbus address.
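A few commonly used Config setters (standard Storm API; the values here are illustrative). As configuration, this fragment only takes effect when passed to submitTopology():

```java
Config conf = new Config();
conf.setDebug(true);           // log every emitted tuple (verbose; for local testing)
conf.setNumWorkers(2);         // number of worker JVMs when running on a cluster
conf.setMaxSpoutPending(1000); // cap on pending (unacked) tuples per spout task
```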
(3) Create and run the topology; two approaches are used here.
First, when no arguments are given, a LocalCluster is created and the topology runs directly on the local machine; after 10 seconds the cluster is shut down:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf,builder.createTopology());
Thread.sleep(10000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();

Second, when an argument is provided, the topology is submitted to the cluster:

StormSubmitter.submitTopology(args[0], conf,builder.createTopology());

The first parameter is the name of the topology.

(6) Running locally
Run it directly in Eclipse; the output appears in the console.

(7) Running on a cluster
(1) Compile and package:

mvn clean package

(2) Upload the packaged jar to the Nimbus machine, then submit the topology to the cluster with:

storm jar com.ljh.storm.5_stormdemo  com.ljh.storm.wordcount.WordCountTopology  topology_name

The general form is storm jar <jar-file> <main-class> [args]: the first argument is the jar file, the second is the fully qualified class containing main(), and any remaining arguments are passed through to main() (here, the topology name).