2 Star 2 Fork 0

余二六呀 / mystorm

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
ClusterDoubleAndTripleBoltApp.java 4.82 KB
一键复制 编辑 原始数据 按行查看 历史
余二六呀 提交于 2018-11-13 09:50 . 在生产上运行我们的应用
/**
* Copyright (C), 2015-2018, XXX有限公司
* FileName: DoubleAndTripleBoltApp
* Author: An-Il
* Date: 2018/11/9 9:19
* Description: 平方三次方
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改时间 版本号 描述
*/
package com.blog.storm.example.cluster;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* 在生产上面运行我们本地开发的代码,使用StormSubmitter将拓扑提交到集群
* 可以参考此url
* @url http://storm.apache.org/releases/1.2.2/Running-topologies-on-a-production-cluster.html
*/
public class ClusterDoubleAndTripleBoltApp {
/**
* 数据源参数的Spout
*/
public static class DataSourceSpout extends BaseRichSpout {
SpoutOutputCollector collector;
Integer val = 1;
/**
* 初始化
*
* @param map 配置参数
* @param topologyContext 上下文
* @param spoutOutputCollector 数据发射器
*/
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
/**
* 此方法会死循环
*/
@Override
public void nextTuple() {
System.out.println("send val:" + val);
this.collector.emit(new Values(val++));
Utils.sleep(3000);
}
/**
* 声明输出字段
*
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("val"));
}
}
/**
* 接受数据并处理
*/
public static class DoubleAndTripleBolt extends BaseRichBolt {
/**
* 初始化
*
* @param stormConf 配置参数
* @param context 上下文
* @param collector 数据发射器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
/**
* 这里通过上面定义的Fields进行获取
* 也可以通过下标获取:input.getInteger(0)
*/
Integer _val = input.getIntegerByField("val");
System.err.println(_val + "*2=" + _val * 2);
System.err.println(_val + "*3=" + _val * 3);
//TODO:可以继续发送数据到下流
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
/**
* 构建本地Topology
*
* @param args
* @url http://storm.apache.org/releases/1.2.2/Local-mode.html
* http://storm.apache.org/releases/1.2.2/Running-topologies-on-a-production-cluster.html
*/
public static void main(String[] args) {
/**
* 定义拓扑,在生产群集上运行拓扑与在本地模式下运行类似
* Topology需要指定相关的Spout和Bolt的执行顺序
* shuffleGrouping("dataSourceSpout"):Tuples以一种随机分布方式在Bolt的任务中,每个Bolt都保证获得相同数量的Tuples
*/
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("dataSourceSpout", new DataSourceSpout());
builder.setBolt("doubleAndTripleBolt", new DoubleAndTripleBolt()).shuffleGrouping("dataSourceSpout");
StormTopology topology = builder.createTopology();
/**
* 提交到集群上运行
* @url http://storm.apache.org/releases/1.2.2/Running-topologies-on-a-production-cluster.html
*/
String name = ClusterDoubleAndTripleBoltApp.class.getSimpleName();
Config conf = new Config();
try {
StormSubmitter.submitTopology(name, conf, topology);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Java
1
https://gitee.com/yy1122/mystorm.git
git@gitee.com:yy1122/mystorm.git
yy1122
mystorm
mystorm
master

搜索帮助