博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache Storm技术实战之1 -- WordCountTopology
阅读量:6430 次
发布时间:2019-06-23

本文共 4219 字,大约阅读时间需要 14 分钟。

欢迎转载,转载请注意出处,徽沪一郎。

“源码走读系列”从代码层面分析了storm的具体实现,接下来通过具体的实例来说明storm的使用。因为目前storm已经正式迁移到Apache,文章系列也由twitter storm转为apache storm.

WordCountTopology 使用storm来统计文件中的每个单词的出现次数。

通过该例子来说明tuple发送时的几个要素

  1. source component   发送源
  2. destination component 接收者
  3. stream 消息通道
  4. tuple    消息本身

本文涉及到的开发环境搭建可以参考前面的两篇博文。

awk实现

其实对文件中的单词进行统计是Linux下一个很常见的任务,用awk就可以轻松的解决(如果文件不是太大的话),下面是进行word counting的awk脚本,将其保存为名为wordcount.awk文件。

wordcount.awk

{  for (i = 1; i<=NF; i++)    freq[$i]++}END{  for (word in freq)    printf "%s\t%d\n",word,freq[word]}

运行该脚本,对文件中的单词进行统计

gawk -f wordcount.awk filename

原始版本

从github上复制内容

git clone https://github.com/nathanmarz/storm-starter.git

编译运行

lein depslein compilejava -cp $(lein classpath) WordCountTopology

main函数

main函数的主要内容

TopologyBuilder builder = new TopologyBuilder();    builder.setSpout("spout", new RandomSentenceSpout(), 5);    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

注意:grouping操作的时候,如果没有显示指定stream id,则使用的是default stream. 如shuffleGrouping("spout")表示从名为spout的component中接收从default stream发送过来的tuple.

改进版本

在原始版本中,spout不停的向split bolt随机发送句子,Count bolt统计每个单词出现的次数。

那么能不能让Spout在读取完文件之后,通知下游的bolt显示最柊的统计结果呢?

要想达到上述的改进目标,采用如上图所示的结构即可。改变的地方如下,

  1. 在Spout中添加一个SUCCESS_STREAM
  2. 添加只有一个运行实例的statistics bolt
  3. 当spout读取完文件内容之后,通过SUCCESS_STREAM告诉statistics bolt,文件已经处理完毕,可以打印当前的统计结果

RandomSentenceSpout.java

declareOutputFields

添加SUCCESS_STREAM

@Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {    declarer.declare(new Fields("word"));    declarer.declareStream("SUCCESS_STREAM",new Fields("word"));  }

nextTuple

使用SUCCESS_STREAM通知下游,文件处理完毕

@Override  public void nextTuple() {    Utils.sleep(100);    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };    if ( count == sentences.length )     {      System.out.println(count+" try to emit tuple by success_stream");      _collector.emit("SUCCESS_STREAM",new Values(sentences[0]));      count++;    }else if ( count < sentences.length ){      _collector.emit(new Values(sentences[count]));      count++;    }  }

WordCountTopology.java

添加静态类WordCount2

public static class WordCount2 extends BaseBasicBolt {    Map
counts = new HashMap
(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) { System.out.println("prepare to print the statistics"); for (String key : counts.keySet()) { System.out.println(key+"\t"+counts.get(key)); } System.out.println("finish printing"); }else { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); } }

 

main函数

将spout的并行数由5改为1

builder.setSpout("spout", new RandomSentenceSpout(), 1);

在原有的Topology中添加WordCount2 Bolt

builder.setBolt("count2", new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");

WordCount2 Bolt会接收从Count Bolt通过default stream发送的tuple,同时接收Spout通过SUCCESS_STREAM发送的tuple,也就是说wordcount2会接收从两个stream来的数据。

编译

编译修改后的源文件

cd $STROM_STARTER lein compile storm.starter

可能会出现以下异常信息,该异常可以忽略。

Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:

运行

在local模式下运行修改后的WordCountTopology

java -cp $(lein classpath) storm.starter.WordCountTopology

如果一切正常,日志如下所示,线程的名字可能会有所不同。

moon    1 score    1 cow    1 doctor    1 over    1 nature    1 snow    1 four    1 keeps    1 with    1 a    1 white    1 dwarfs    1 at    1 the    4 and    2 i    1 two    1 away    1 seven    2 apple    1 am    1 an    1 jumped    1 day    1 years    1 ago    1

 结果验证

可以将WordCountTopology的运行结果和awk脚本的运行结果相比对,结果应该是一致的。

小技巧

  1. awk脚本的执行结果存为一个文件result1.log, WordCountTopology的输出中单词统计部分存为result2.log
  2. 用vim打开result1.log,进行sorting,保存结果;用vim打开result2.log,进行sorting,保存。
  3. 然后用vimdiff来进行比较 vimdiff result1.log result2.log

转载于:https://www.cnblogs.com/hseagle/p/3505938.html

你可能感兴趣的文章
Android数字选择器-NumberPicker
查看>>
Android SDK Manager 更新失败的解决方法
查看>>
Java并发编程:volatile关键字解析
查看>>
Java实现分页数据获取CachedRowSet
查看>>
Lambda应用设计模式
查看>>
4.2. MuseScore
查看>>
C#使用OleDB操作ACCESS插入数据时提示:参数 @p_Contract 没有默认值
查看>>
HTML基础8--CSS、滑动门
查看>>
Oracle Database 9i/10g安装后的基本环境与服务
查看>>
IBatis 简易框架搭建
查看>>
冰血暴第一季/全集Fargo迅雷下载
查看>>
c语言_文件操作_FILE结构体解释_涉及对操作系统文件FCB操作的解释_
查看>>
解读Google分布式锁服务
查看>>
C#多线程教程(1):BeginInvoke和EndInvoke方法,解决主线程延时Thread.sleep柱塞问题(转)...
查看>>
requires a peer of grunt@&gt;=0.4.0
查看>>
[ACM_数据结构] 线段树模板
查看>>
2017双11技术揭秘—阿里巴巴数据库技术架构演进
查看>>
6.2. EnableWebSecurity
查看>>
路由器的基本设置
查看>>
5.5 进入编辑模式 5.6 vim命令模式 5.7 vim实践
查看>>