storm的本地模式demo怎么实现
admin
2023-02-21 01:00:07
0

SimpleTopology.java

package com.zgl.helloword;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

/**
 * 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。
 * 
 * @author Administrator
 *
 */
public class SimpleTopology {
    public static void main(String[] args) {
        try {
            // 实例化TopologyBuilder类。
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
            topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1);
            // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。
            topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout");
            Config config = new Config();
            config.setDebug(false);
            if (args != null && args.length > 0) {
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } else {
                // 这里是本地模式下运行的启动代码。
                config.setMaxTaskParallelism(1);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("simple", config, topologyBuilder.createTopology());
            }
            
        } catch (Exception e) {
            e.printStackTrace(); 
        }
    }
}

SimpleSpout.java

package com.zgl.helloword;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务
 * 
 * @author Administrator
 *
 */

public class SimpleSpout extends BaseRichSpout{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    //用来发射数据的工具类
    private SpoutOutputCollector collector;
    private static String[] info = new String[]{
        "comaple\t,12424,44w46,654,12424,44w46,654,",
        "lisi\t,435435,6537,12424,44w46,654,",
        "lipeng\t,45735,6757,12424,44w46,654,",
        "hujintao\t,45735,6757,12424,44w46,654,",
        "jiangmin\t,23545,6457,2455,7576,qr44453",
        "beijing\t,435435,6537,12424,44w46,654,",
        "xiaoming\t,46654,8579,w3675,85877,077998,",
        "xiaozhang\t,9789,788,97978,656,345235,09889,",
        "ceo\t,46654,8579,w3675,85877,077998,",
        "cto\t,46654,8579,w3675,85877,077998,",
        "zhansan\t,46654,8579,w3675,85877,077998,"};
    
    Random random=new Random();
    
    /**
     * 初始化collector
     */
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    
    /**
     * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用
     */
    
    public void nextTuple() {
        try {
            String msg = info[random.nextInt(11)];
            // 调用发射方法
            collector.emit(new Values(msg));
            // 模拟等待100ms
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
     * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构
     */
   
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应
    }

}

SimpleBolt.java

package com.zgl.helloword;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 接收喷发节点(Spout)发送的数据进行简单的处理后,发射出去。
 * 
 * @author Administrator
 * 
 */
@SuppressWarnings("serial")
public class SimpleBolt extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            String msg = input.getString(0);
            if (msg != null){
                System.out.println("msg="+msg);
                collector.emit(new Values(msg + "msg is processed!"));
            }
                
        } catch (Exception e) {
            e.printStackTrace(); 
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("info"));
    }

}

pom.xml


    4.0.0

    strom-zgl
    storm-zgl
    0.0.1-SNAPSHOT
    jar

    storm-zgl
    http://maven.apache.org

    
        UTF-8
    

    
        
            junit
            junit
            3.8.1
            test
        
        
            org.apache.storm
            storm-core
            0.9.1-incubating
        
    

相关内容

热门资讯

萧美琴称年花6000元新台币保... 海峡导报综合报道 岛内朝野对8年1.25万亿元(新台币,下同)的“防务特别条例”仍无共识,台湾地区副...
巴基斯坦消息人士:美伊谈判似已... 巴基斯坦权威消息人士6日表示,美国和伊朗通过巴基斯坦进行的谈判似已显现希望,“低调的谈判有望转化为切...
雷暴大风、短时强降水、局地冰雹... 今天上午我省多地晴朗在线、升温持续,10点京广线及以东地区升至25-27℃,预计午后最高气温除了西部...
无障碍阅读“建设蓝图”出炉!多... 【大河财立方消息】 5月6日消息,中国残联、教育部、文化和旅游部、国家新闻出版署、共青团中央、全国妇...
蚂蚁集团等入股大晓机器人 【大河财立方消息】天眼查App显示,近日,大晓机器人关联公司上海大晓无限机器人有限公司发生工商变更,...
南航扭亏的AB面:物流压舱、子... 【大河财立方 记者 陈诗昂】4月30日,南方航空举行2026年一季度业绩发布会。一季报显示,南航营收...
“五一”超800场好戏燃动全城... “五一” 假期落幕,全国文旅市场持续火爆、亮点频现。山东临沂琅琊古城凭借沉浸式国风演艺、全时段互动体...
Siri升级延期,苹果或赔17... 来源:市场资讯 (来源:界面新闻) 据多家外媒报道,当地时间5月5日,美国苹果公司就一起集体诉讼达成...
时代电气获得发明专利授权:“列... 证券之星消息,根据天眼查APP数据显示时代电气(688187)新获得一项发明专利授权,专利名为“列车...
高市早苗在澳大利亚下跪献花,网... 据日本外务省网站消息,当地时间5月4日,日本首相高市早苗对澳大利亚进行访问时,前往位于堪培拉的澳大利...