Storm 集群搭建及编写WordCount
moboyou 2025-07-06 14:13 17 浏览
准备工作
1. 下载zookeeper-3.4.7
2. 下载Storm apache-storm-0.9.3
3. 安装JDK 1.7
注:
Storm0.9之前的版本,底层通讯用的是ZeroMQ,所以在安装0.9之前的版本需要安装0MQ,但是在0.9之后 我们直接安装就可以了。
因为在storm被移交到apache之后,这块用java的NIO矿建Netty代替了。
首先建立hadoop用户,我把和大数据相关的框架和软件都放在hadoop用户中。
安装ZK
1. 登陆到10.10.113.41并解压tar包
2. 建立zookeeper的data目录,
/home/hadoop/zookeeper/data
mkdir -p /home/hadoop/zookeeper/data
3. 建立zk集群的myid文件 (单机版可以跳过该步)
cd /home/hadoop/zookeeper/data
echo 1 > myid
4. 拷贝zookeeper的conf/zoo_sample.cfg并重命名为zoo.cfg,修改如下:
dataDir=/home/hadoop/zookeeper/data
server.1=10.10.113.41:2888:3888
server.2=10.10.113.42:2888:3888
server.3=10.10.113.43:2888:3888
dataDir是配置zk的数据目录的
server.A=B:C:D是集群zk使用的。如果你只想用单个zk,可以不配置。
A - 是一个数字,表示这是第几号服务器。与/var/tmp/zkdata下的myid文件内容一致
B - 是该服务器的IP地址
C - 表示该服务器与集群中的Leader服务器交换信息的端口
D - 表示如果万一集群中的Leader服务器挂了,需要各服务器重新选举时所用的通讯端口
5. (Optional)将zk的bin目录路径加入环境变量
修改/etc/profile文件,在尾部添加如下:
#zookeeper
export ZOOKEEPER==/home/hadoop/zookeeper
PATH=$PATH:$ZOOKEEPER/bin
6. 启动zk
zkServer.sh start
在剩下两台机器重复以上步骤,注意myid要对应
6.查看zk的运行状态
zkServer.sh status
安装Storm
1. 解压tar包并赋予执行权限
2. 将Storm的bin目录加入系统路径
修改/etc/profile文件,在尾部加入如下:
PATH=$PATH:/home/hadoop/storm
使其生效
3. 创建一个Storm的本地数据目录
mkdir -p /home/hadoop/storm/data
以上步骤在Storm的集群上的其他机器上重复执行,然后进行配置:
a. 配置storm.yaml
修改storm的conf/storm.yaml文件如下:
storm.zookeeper.servers: #zk地址
- "10.10.113.41"
- "10.10.113.42"
- "10.10.113.43"
nimbus.host: "10.10.113.41" #master 节点地址
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.local.dir: "/home/hadoop/storm/data" #数据存放地址
注意:
在每个配置项前面必须留有空格,否则会无法识别。
启动集群
1. 启动nimbus
在nimbus机器的Storm的bin目录下执行
nohup bin/storm nimbus >/dev/null 2>&1 & #启动主节点
nohup bin/storm ui >/dev/null 2>&1 & #启动stormUI
nohup bin/storm logviewer >/dev/null 2>&1 & #启动logviewer 功能
2. 启动supervisor
在supervisor机器的Storm的bin目录下执行,所有supervisor节点都使用如下命令
nohup bin/storm supervisor >/dev/null 2>&1 &
nohup bin/storm logviewer >/dev/null 2>&1 &
3. 检查
打开Storm UI 页面。
http://10.10.113.41:8080/index.html
默认是启在8080端口上,如果你想改成其他的,如8089,直接修改nimbus的storm.yaml文件,添加
ui.port=8089
部署程序
1. 这里我使用 Intellij IDEA + maven来开发一个wordcount的Demo
2. 添加maven依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3<
1. 这里我使用 Intellij IDEA + maven来开发一个wordcount的Demo部署程序
2. 添加maven依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>
3. 新建项目,编写程序
package cn.oraclers.storm;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class WordCount {
public static class SpoutSource extends BaseRichSpout {
Map map;
TopologyContext topologyContext;
SpoutOutputCollector spoutOutputCollector;
Random random;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
map = map;
topologyContext = topologyContext;
spoutOutputCollector = spoutOutputCollector;
random = random;
}
String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
@Override
public void nextTuple() {
Utils.sleep(1000);
for (String sentence:sentences){
spoutOutputCollector.emit(new Values(sentence));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
public static class SplitBoltSource extends BaseRichBolt{
Map map;
TopologyContext topologyContext;
OutputCollector outputCollector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
map = map;
topologyContext = topologyContext;
outputCollector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word:words){
this.outputCollector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
}
public static class SumBoltSource extends BaseRichBolt{
Map map;
TopologyContext topologyContext;
OutputCollector outputCollector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.map = map;
this.topologyContext = topologyContext;
this.outputCollector = outputCollector;
}
Map<String,Integer> mapCount = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Integer count = mapCount.get(word);
if(count == null){
count=0;
}
count++;
mapCount.put(word,count);
outputCollector.emit(new Values(word,count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("data_source",new SpoutSource());
builder.setBolt("bolt_split",new SplitBoltSource()).shuffleGrouping("data_source");
builder.setBolt("bolt_sum",new SplitBoltSource()).fieldsGrouping("bolt_split",new Fields("word"));
try {
Config stormConf = new Config();
stormConf.setDebug(true);
StormSubmitter.submitTopology("Clustertopology", stormConf,builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}
}
4. 打包部署topology
./storm jar storm jar sd-1.0-SNAPSHOT.jar cn.oraclers.storm.WordCount
5. 查看结果
两种方式,
a. 查看StormUI
注意:一定不要使用IE内核的浏览器,否则看不到Topology Summary 下面的东西!!!
b. storm的bin目录下运行
Topology_name Status Num_tasks Num_workers Uptime_secs
-------------------------------------------------------------------
test ACTIVE 28 3 5254
Clustertopology ACTIVE 4 1 83
mytopo ACTIVE 6 3 555
6. 关闭topology
a. StormUI上面点选要关闭的topology,如test,然后在新页面的Topology actions中选kill
b. 运行./storm kill test
相关推荐
- Excel技巧:SHEETSNA函数一键提取所有工作表名称批量生产目录
-
首先介绍一下此函数:SHEETSNAME函数用于获取工作表的名称,有三个可选参数。语法:=SHEETSNAME([参照区域],[结果方向],[工作表范围])(参照区域,可选。给出参照,只返回参照单元格...
- Excel HOUR函数:“小时”提取器_excel+hour函数提取器怎么用
-
一、函数概述HOUR函数是Excel中用于提取时间值小时部分的日期时间函数,返回0(12:00AM)到23(11:00PM)之间的整数。该函数在时间数据分析、考勤统计、日程安排等场景中应用广泛。语...
- Filter+Search信息管理不再难|多条件|模糊查找|Excel函数应用
-
原创版权所有介绍一个信息管理系统,要求可以实现:多条件、模糊查找,手动输入的内容能去空格。先看效果,如下图动画演示这样的一个效果要怎样实现呢?本文所用函数有Filter和Search。先用filter...
- FILTER函数介绍及经典用法12:FILTER+切片器的应用
-
EXCEL函数技巧:FILTER经典用法12。FILTER+切片器制作筛选按钮。FILTER的函数的经典用法12是用FILTER的函数和切片器制作一个筛选按钮。像左边的原始数据,右边想要制作一...
- office办公应用网站推荐_office办公软件大全
-
以下是针对Office办公应用(Word/Excel/PPT等)的免费学习网站推荐,涵盖官方教程、综合平台及垂直领域资源,适合不同学习需求:一、官方权威资源1.微软Office官方培训...
- WPS/Excel职场办公最常用的60个函数大全(含卡片),效率翻倍!
-
办公最常用的60个函数大全:从入门到精通,效率翻倍!在职场中,WPS/Excel几乎是每个人都离不开的工具,而函数则是其灵魂。掌握常用的函数,不仅能大幅提升工作效率,还能让你在数据处理、报表分析、自动...
- 收藏|查找神器Xlookup全集|一篇就够|Excel函数|图解教程
-
原创版权所有全程图解,方便阅读,内容比较多,请先收藏!Xlookup是Vlookup的升级函数,解决了Vlookup的所有缺点,可以完全取代Vlookup,学完本文后你将可以应对所有的查找难题,内容...
- 批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数
-
批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数在电商运营、物流对账等工作中,经常需要统计快递“揽收到签收”的耗时——比如判断某快递公司是否符合“3天内送达”的服务承...
- Excel函数公式教程(490个实例详解)
-
Excel函数公式教程(490个实例详解)管理层的财务人员为什么那么厉害?就是因为他们精通excel技能!财务人员在日常工作中,经常会用到Excel财务函数公式,比如财务报表分析、工资核算、库存管理等...
- Excel(WPS表格)Tocol函数应用技巧案例解读,建议收藏备用!
-
工作中,经常需要从多个单元格区域中提取唯一值,如体育赛事报名信息中提取唯一的参赛者信息等,此时如果复制粘贴然后去重,效率就会很低。如果能合理利用Tocol函数,将会极大地提高工作效率。一、功能及语法结...
- Excel中的SCAN函数公式,把计算过程理清,你就会了
-
Excel新版本里面,除了出现非常好用的xlookup,Filter公式之外,还更新一批自定义函数,可以像写代码一样写公式其中SCAN函数公式,也非常强大,它是一个循环函数,今天来了解这个函数公式的计...
- Excel(WPS表格)中多列去重就用Tocol+Unique组合函数,简单高效
-
在数据的分析和处理中,“去重”一直是绕不开的话题,如果单列去重,可以使用Unique函数完成,如果多列去重,如下图:从数据信息中可以看到,每位参赛者参加了多项运动,如果想知道去重后的参赛者有多少人,该...
- Excel(WPS表格)函数Groupby,聚合统计,快速提高效率!
-
在前期的内容中,我们讲了很多的统计函数,如Sum系列、Average系列、Count系列、Rank系列等等……但如果用一个函数实现类似数据透视表的功能,就必须用Groupby函数,按指定字段进行聚合汇...
- Excel新版本,IFS函数公式,太强大了!
-
我们举一个工作实例,现在需要计算业务员的奖励数据,右边是公司的奖励标准:在新版本的函数公式出来之前,我们需要使用IF函数公式来解决1、IF函数公式IF函数公式由三个参数组成,IF(判断条件,对的时候返...
- Excel不用函数公式数据透视表,1秒完成多列项目汇总统计
-
如何将这里的多组数据进行汇总统计?每组数据当中一列是不同菜品,另一列就是该菜品的销售数量。如何进行汇总统计得到所有的菜品销售数量的求和、技术、平均、最大、最小值等数据?不用函数公式和数据透视表,一秒就...
- 一周热门
- 最近发表
-
- Excel技巧:SHEETSNA函数一键提取所有工作表名称批量生产目录
- Excel HOUR函数:“小时”提取器_excel+hour函数提取器怎么用
- Filter+Search信息管理不再难|多条件|模糊查找|Excel函数应用
- FILTER函数介绍及经典用法12:FILTER+切片器的应用
- office办公应用网站推荐_office办公软件大全
- WPS/Excel职场办公最常用的60个函数大全(含卡片),效率翻倍!
- 收藏|查找神器Xlookup全集|一篇就够|Excel函数|图解教程
- 批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数
- Excel函数公式教程(490个实例详解)
- Excel(WPS表格)Tocol函数应用技巧案例解读,建议收藏备用!
- 标签列表
-
- 外键约束 oracle (36)
- oracle的row number (32)
- 唯一索引 oracle (34)
- oracle in 表变量 (28)
- oracle导出dmp导出 (28)
- 多线程的创建方式 (29)
- 多线程 python (30)
- java多线程并发处理 (32)
- 宏程序代码一览表 (35)
- c++需要学多久 (25)
- css class选择器用法 (25)
- css样式引入 (30)
- css教程文字移动 (33)
- php简单源码 (36)
- php个人中心源码 (25)
- php小说爬取源码 (23)
- 云电脑app源码 (22)
- html画折线图 (24)
- docker好玩的应用 (28)
- linux有没有pe工具 (34)
- 可以上传视频的网站源码 (25)
- 随机函数如何生成小数点数字 (31)
- 随机函数excel公式总和不变30个数据随机 (33)
- 所有excel函数公式大全讲解 (22)
- 有动图演示excel函数公式大全讲解 (32)
