Storm入门秘籍

0、需求

为了分析广告投放的效果,为了计算投放广告的收益,我们需要实时收集用户的请求日志,曝光日志,点击日志,我们需要一个实时计算平台。

Storm是开源的、分布式、流式计算系统。

什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务,把这个多机的细节给屏蔽,对外提供同一个接口、同一个服务,这样的系统就是分布式系统,如:storm集群、redis集群、mysql集群等等。

本篇文章,我们仅介绍storm的快速入门,基础案例,后续我们会分享:storm的集群架构、并行度、可靠性、在广告系统中的应用

一、引言

大数据一直是近年的热点话题,随着数据量的急速增长,数据处理的规模也从GB 级别增长到TB 级别,很多图像应用领域已经开始处理PB 级别的数据分析。大数据的核心目标是提升业务的竞争力,找到一些可以采取行动的洞察(Actionable Insight),数据分析就是其中的核心技术,包括数据收集、处理、建模和分析,最后找到改进业务的方案。

整个数据分析的基础架构通常分为以下几类。

之前我们讲过了Hadoop基础实例,这篇文章,我们重点讲下storm的基础入门,为后续进行日志分析做好理论铺垫。欢迎持续关注本头条号,后面精彩不容错过……也会讲解druid

storm如何实现高效的处理(初识实时流处理Storm)(1)

二、编程模型&数据处理流程图

1、编程模型

storm如何实现高效的处理(初识实时流处理Storm)(2)

如图所示:这样的一个Topology(拓扑),在Storm中,就称为用户的一个作业)。这个拓扑包含了许多的节点,以及这些节点之间的边。这些点有两种:数据源节点(Spout)、计算节点(Bolt),点之间的边称为数据流(stream),数据流又由很多Tuple组成。

在图中这个Topology里面,我们看到了两个Spout和5个Bolt,在实际运行的时候,每个Spout节点都可能有很多个实例,每个Bolt也有可能有很多个实例,下篇文章我们会详细介绍下storm的并行度。

2、数据处理流程图

结合上面的变成模型,便有了下面的数据处理流程图:

storm如何实现高效的处理(初识实时流处理Storm)(3)

每一个“水龙头”表示一个Spout,它会不间断地发送一些Tuple给下游的Bolt,这些Bolt经过处理,再发送一个Tuple给下一个Bolt,最后,在这些Bolt里面是可以执行一些写数据到外部存储(如数据库)等操作的。

3、专业术语概览

1、Topology

实时应用程序的逻辑被封装在 Storm topology(拓扑)中. Storm topology(拓扑)类似于 MapReduce 作业. 两者之间关键的区别是 MapReduce 作业最终会完成, 而 topology(拓扑)任务会永远运行(除非 kill 掉它). 一个拓扑是 Spout 和 Bolt 通过 stream groupings 连接起来的有向无环图.

2:Stream

拓扑中的消息流,传输的对象是Tuple,由一系列连续的 Tuple 组成。

stream 是 Storm 中的核心概念.一个 stream 是一个无界的、以分布式方式并行创建和处理的 Tuple 序列. 默认情况下 Tuple 可以包含 integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays 等数据类型.你也可以定义自己的 serializers, 以至于可以在 Tuple 中使用自定义的类型.

每一个流在声明的时候会赋予一个 ID. 由于只包含一个 stream 的 Spout 和 Bolt 比较常见, OutputFieldsDeclarer有更方便的方法可以定义一个单一的 stream 而不用指定ID. 这个 stream 被赋予一个默认的 ID, "default".

3:Spout

Spout 是一个 topology(拓扑)中 streams 的源头. 通常 Spout 会从外部数据源读取 Tuple,然后把他们发送到拓扑中(如 Kestel 队列, 或者 Twitter API).

4:Bolt

Storm编程模型中的处理组件,定义execute方法进行实际的数据逻辑处理。拓扑中所有的业务处理都在 Bolts 中完成. Bolt 可以做很多事情,过滤, 函数, 聚合, 关联, 与数据库交互等.

Bolt 可以做简单 stream 转换. 复杂的 stream 转换一般需要多个步骤).

5:Tuple

一次消息传递的基本单元

6:Stream Groupings数据流分组策略

topology(拓扑)定义中有一部分是为每一个 bolt 指定输入的 streams . stream grouping 定义了stream 如何在 Bolts tasks 之间分区.

Bolt 和 Sport 都是多线程的,数据在流动的时候,在多个线程中决定流向哪一个线程 就需要Stream Groupings。有些类似SQL中的Group By,用来制定这些计算是怎么分组的。

Shuffle Grouping:随机分组,保证bolt接受的tuple数据相同。

Fileds Grouping:按字段分组,相同tuple会分到一个bolt中。同一个单词会指定到同一个线程里。

Global Grouping:全局分组,所有tuple发送给task_id最小的bolt。

三、流处理编程结构介绍

其实,几乎都是标准模板结构:我们往里套就OK了

1、spout

在 Storm 的结构中 spout 承担了消息的生成功能。常用方法如下:

/** * Called when a task for this component is initialized within a worker on the cluster. * It provides the spout with the environment in which the spout executes. */ void open(Map conf, TopologyContext context, SpoutOutputCollector collector); ​ /** * When this method is called, Storm is requesting that the Spout emit tuples to the * output collector. This method should be non-blocking, so if the Spout has no tuples * to emit, this method should return. nextTuple, ack, and fail are all called in a tight * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous * to have nextTuple sleep for a short amount of time (like a single millisecond) * so as not to waste too much CPU. */ void nextTuple(); ​ /** * Declare the output schema for all the streams of this topology. */ void declareOutputFields(OutputFieldsDeclarer declarer); ​

上面是原文介绍,这里做些简单总结:

2、bolt

Bolt 是基本的消息处理单元,可以处理所有的逻辑操作。常用方法如下:

/** * Called when a task for this component is initialized within a worker on the cluster. * It provides the bolt with the environment in which the bolt executes. */ void prepare(Map stormConf, TopologyContext context, OutputCollector collector); ​ /** * Process a single tuple of input. The Tuple object contains metadata on it * about which component/stream/task it came from. The values of the Tuple can * be accessed using Tuple#getValue. The IBolt does not have to process the Tuple * immediately. It is perfectly fine to hang onto a tuple and process it later * (for instance, to do an aggregation or join). * * Tuples should be emitted using the OutputCollector provided through the prepare method. * It is required that all input tuples are acked or failed at some point using the OutputCollector. * Otherwise, Storm will be unable to determine when tuples coming off the spouts * have been completed. * * For the common case of acking an input tuple at the end of the execute method, * see IBasicBolt which automates this. */ void execute(Tuple input); ​ /** * Declare the output schema for all the streams of this topology. @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream */ void declareOutputFields(OutputFieldsDeclarer declarer);

上面是原文介绍,这里做些简单总结:

四、Storm基础实例

代码下载地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git

项目名:spring-boot-storm

1、求和:1 2 3……n

1)topology架构图:

storm如何实现高效的处理(初识实时流处理Storm)(4)

2)代码实现

2、wordcount:单词计数

源数据:

welcome to visit our website jikeh and take a look at our premium courses welcome to visit our toutiao and take a look at our courses welcome to visit our toutiao and take a look at our articles

1)topology架构图:

storm如何实现高效的处理(初识实时流处理Storm)(5)

2)代码实现

,