Storm 集群搭建及编写WordCount
moboyou 2025-07-06 14:13 6 浏览
准备工作
1. 下载zookeeper-3.4.7
2. 下载Storm apache-storm-0.9.3
3. 安装JDK 1.7
注:
Storm0.9之前的版本,底层通讯用的是ZeroMQ,所以在安装0.9之前的版本需要安装0MQ,但是在0.9之后 我们直接安装就可以了。
因为在storm被移交到apache之后,这块用java的NIO矿建Netty代替了。
首先建立hadoop用户,我把和大数据相关的框架和软件都放在hadoop用户中。
安装ZK
1. 登陆到10.10.113.41并解压tar包
2. 建立zookeeper的data目录,
/home/hadoop/zookeeper/data
mkdir -p /home/hadoop/zookeeper/data
3. 建立zk集群的myid文件 (单机版可以跳过该步)
cd /home/hadoop/zookeeper/data
echo 1 > myid
4. 拷贝zookeeper的conf/zoo_sample.cfg并重命名为zoo.cfg,修改如下:
dataDir=/home/hadoop/zookeeper/data
server.1=10.10.113.41:2888:3888
server.2=10.10.113.42:2888:3888
server.3=10.10.113.43:2888:3888
dataDir是配置zk的数据目录的
server.A=B:C:D是集群zk使用的。如果你只想用单个zk,可以不配置。
A - 是一个数字,表示这是第几号服务器。与/var/tmp/zkdata下的myid文件内容一致
B - 是该服务器的IP地址
C - 表示该服务器与集群中的Leader服务器交换信息的端口
D - 表示如果万一集群中的Leader服务器挂了,需要各服务器重新选举时所用的通讯端口
5. (Optional)将zk的bin目录路径加入环境变量
修改/etc/profile文件,在尾部添加如下:
#zookeeper
export ZOOKEEPER==/home/hadoop/zookeeper
PATH=$PATH:$ZOOKEEPER/bin
6. 启动zk
zkServer.sh start
在剩下两台机器重复以上步骤,注意myid要对应
6.查看zk的运行状态
zkServer.sh status
安装Storm
1. 解压tar包并赋予执行权限
2. 将Storm的bin目录加入系统路径
修改/etc/profile文件,在尾部加入如下:
PATH=$PATH:/home/hadoop/storm
使其生效
3. 创建一个Storm的本地数据目录
mkdir -p /home/hadoop/storm/data
以上步骤在Storm的集群上的其他机器上重复执行,然后进行配置:
a. 配置storm.yaml
修改storm的conf/storm.yaml文件如下:
storm.zookeeper.servers: #zk地址
- "10.10.113.41"
- "10.10.113.42"
- "10.10.113.43"
nimbus.host: "10.10.113.41" #master 节点地址
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.local.dir: "/home/hadoop/storm/data" #数据存放地址
注意:
在每个配置项前面必须留有空格,否则会无法识别。
启动集群
1. 启动nimbus
在nimbus机器的Storm的bin目录下执行
nohup bin/storm nimbus >/dev/null 2>&1 & #启动主节点
nohup bin/storm ui >/dev/null 2>&1 & #启动stormUI
nohup bin/storm logviewer >/dev/null 2>&1 & #启动logviewer 功能
2. 启动supervisor
在supervisor机器的Storm的bin目录下执行,所有supervisor节点都使用如下命令
nohup bin/storm supervisor >/dev/null 2>&1 &
nohup bin/storm logviewer >/dev/null 2>&1 &
3. 检查
打开Storm UI 页面。
http://10.10.113.41:8080/index.html
默认是启在8080端口上,如果你想改成其他的,如8089,直接修改nimbus的storm.yaml文件,添加
ui.port=8089
部署程序
1. 这里我使用 Intellij IDEA + maven来开发一个wordcount的Demo
2. 添加maven依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3<
1. 这里我使用 Intellij IDEA + maven来开发一个wordcount的Demo部署程序
2. 添加maven依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>
3. 新建项目,编写程序
package cn.oraclers.storm;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class WordCount {
public static class SpoutSource extends BaseRichSpout {
Map map;
TopologyContext topologyContext;
SpoutOutputCollector spoutOutputCollector;
Random random;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
map = map;
topologyContext = topologyContext;
spoutOutputCollector = spoutOutputCollector;
random = random;
}
String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
@Override
public void nextTuple() {
Utils.sleep(1000);
for (String sentence:sentences){
spoutOutputCollector.emit(new Values(sentence));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
public static class SplitBoltSource extends BaseRichBolt{
Map map;
TopologyContext topologyContext;
OutputCollector outputCollector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
map = map;
topologyContext = topologyContext;
outputCollector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word:words){
this.outputCollector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
}
public static class SumBoltSource extends BaseRichBolt{
Map map;
TopologyContext topologyContext;
OutputCollector outputCollector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.map = map;
this.topologyContext = topologyContext;
this.outputCollector = outputCollector;
}
Map<String,Integer> mapCount = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Integer count = mapCount.get(word);
if(count == null){
count=0;
}
count++;
mapCount.put(word,count);
outputCollector.emit(new Values(word,count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("data_source",new SpoutSource());
builder.setBolt("bolt_split",new SplitBoltSource()).shuffleGrouping("data_source");
builder.setBolt("bolt_sum",new SplitBoltSource()).fieldsGrouping("bolt_split",new Fields("word"));
try {
Config stormConf = new Config();
stormConf.setDebug(true);
StormSubmitter.submitTopology("Clustertopology", stormConf,builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}
}
4. 打包部署topology
./storm jar storm jar sd-1.0-SNAPSHOT.jar cn.oraclers.storm.WordCount
5. 查看结果
两种方式,
a. 查看StormUI
注意:一定不要使用IE内核的浏览器,否则看不到Topology Summary 下面的东西!!!
b. storm的bin目录下运行
Topology_name Status Num_tasks Num_workers Uptime_secs
-------------------------------------------------------------------
test ACTIVE 28 3 5254
Clustertopology ACTIVE 4 1 83
mytopo ACTIVE 6 3 555
6. 关闭topology
a. StormUI上面点选要关闭的topology,如test,然后在新页面的Topology actions中选kill
b. 运行./storm kill test
相关推荐
- C#与Docker完美结合:容器化部署实战,让你的应用秒级上线!
-
在当今快速迭代的软件开发环境中,高效的部署流程对于产品的成功至关重要。容器化技术,尤其是Docker,已成为实现快速、可靠部署的首选方案。对于C#开发者而言,将C#应用与Docker相结合,能够显著提...
- 我找到了最适合NAS的记账应用,开源自托管,适合国人的记账方式
-
「亲爱的粉丝朋友们好啊!今天熊猫又来介绍好玩有趣的Docker项目了,喜欢的记得点个关注哦!」引言其实记账软件熊猫之前也发过几个,不过使用起来都不是很理想,要么界面设计不太好看,要么就是项目过于复杂了...
- 手搓各种软件!手把手教学!(如何搓手)
-
shocked!太炸裂了!大家看下这个星标就知道了,youdefinitelyneedtoputittoyourfavorites!这是GitHub上排名第二的开源项目,它能手把手教你...
- 看了《碟中谍8》之后,才发现特工们的黑科技我们NAS用户也能拥有
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:Stark-C#头条兴趣联欢会#哈喽小伙伴们好,我是Stark-C~最近《碟中谍8:最终清算》正在热播,作为碟中谍的老粉,我前几天带着家人...
- NAS原来这么有用:利用docker 一键部署mstream私人云音乐
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:熊猫不是猫QAQ前言你是否面临以下困扰:曾经想听以前某首音乐,但在网上已经找不到了;即使找到了,因版权原因无法在线听,甚至无法下载;有些曲子...
- 一见钟情!这就是你在寻找的Docker界面!优雅而不简单—Arcane
-
「亲爱的粉丝朋友们好啊!今天熊猫又来介绍好玩有趣的Docker项目了,喜欢的记得点个关注哦!」引言要说DockerUI项目,知名的其实就那么几个,目前感觉备用的比较多的还是老牌的portainer,...
- 大神级产品:手机装 Linux 运行 Docker 如此简单
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:灵昱Termux作为一个强大的Android终端模拟器,能够运行多种Linux环境。然而,直接在Termux上运行Docker并不可行,需要...
- 在 Docker 中运行 Mac OS 是什么样的体验
-
大家好,我是你们的章鱼猫。Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器或Windows机器上,也可以实现...
- Docker 容器的 5 个实践案例(每天5分钟玩转docker容器技术)
-
Docker是一个开源平台,可以轻松地为任何应用创建一个轻量级的、可移植的、自给自足的容器。大多数Docker容器的核心是在虚拟化环境中运行的轻量级Linux服务器。DockerLinu...
- 使用 Docker Model Runner 在本地构建 GenAI 应用程序
-
想要在本地运行大型语言模型(LLM)?以下是在您自己的桌面上设置DockerModelRunner并访问LLM的方法。DockerModelRunner是DockerDeskt...
- docker部署一个证件照工具HivisionIDPhotos,非常好用!
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:略懂的大龙猫今天给大家推荐一个很好玩的开源应用HivisionIDPhotos。这个工具原理是利用AI模型对照片进行一系列处理:智能抠...
- 好好看,好好学!Docker玩法深度教学,小白也能轻松上手
-
Docker对于大多数NAS玩家来说想必都非常熟悉,即便没用过,也应该接触过相关的一些内容。就我个人来说,对于评价一台NAS是否好用,Docker功能占据了不小的比重。8月份终于是又更新了一个大版本,...
- 开源&Docker:自动录制小姐姐,docker部署各平台的自动录制工具
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:可爱的小cherry前言大家好,这里是可爱的Cherry。今天给大家分享一下小姐姐们录屏工具,配置完成以后可以自动监听直播并实现自动录播。支...
- 目前发现的一些有趣的docker容器—第十一弹
-
前言本期又是整理汇总的文章,依然是针对NAS下可搭建的一些docker容器的汇总,涉及可能会有介绍过的容器和没介绍过但不太好用的容器,感兴趣的可以翻翻之前的文章哦!!!也欢迎留言你发现的一些有趣的或者...
- 数人科技王璞:Docker与Mesos的结合应用
-
2015年4月16-18日,由CSDN主办、CSDN专家顾问团支持的OpenCloud2015大会将在北京国家会议中心拉开帷幕。为期三天的大会,以推进行业应用中的云计算核心技术发展为主旨,聚焦技术...
- 一周热门
- 最近发表
-
- C#与Docker完美结合:容器化部署实战,让你的应用秒级上线!
- 我找到了最适合NAS的记账应用,开源自托管,适合国人的记账方式
- 手搓各种软件!手把手教学!(如何搓手)
- 看了《碟中谍8》之后,才发现特工们的黑科技我们NAS用户也能拥有
- NAS原来这么有用:利用docker 一键部署mstream私人云音乐
- 一见钟情!这就是你在寻找的Docker界面!优雅而不简单—Arcane
- 大神级产品:手机装 Linux 运行 Docker 如此简单
- 在 Docker 中运行 Mac OS 是什么样的体验
- Docker 容器的 5 个实践案例(每天5分钟玩转docker容器技术)
- 使用 Docker Model Runner 在本地构建 GenAI 应用程序
- 标签列表
-
- 外键约束 oracle (36)
- oracle的row number (32)
- 唯一索引 oracle (34)
- oracle in 表变量 (28)
- oracle导出dmp导出 (28)
- oracle两个表 (20)
- oracle 数据库 字符集 (20)
- oracle安装补丁 (19)
- matlab化简多项式 (20)
- 多线程的创建方式 (29)
- 多线程 python (30)
- java多线程并发处理 (32)
- 宏程序代码一览表 (35)
- c++需要学多久 (25)
- css class选择器用法 (25)
- css样式引入 (30)
- html5和css3新特性 (19)
- css教程文字移动 (33)
- php简单源码 (36)
- php个人中心源码 (25)
- 网站管理平台php源码 (19)
- php小说爬取源码 (23)
- 云电脑app源码 (22)
- html画折线图 (24)
- docker好玩的应用 (28)