整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

Apache Storm v2.0入门项目的开发、测试和运行(IDEA/Maven)

一个Apache Storm v2.0流计算入门项目的开发、测试和运行(IDEA/Maven)

关于流计算框架Apache Storm最新版的安装,可以参考:

流计算框架-最新版Apache Storm v2.0单机模式安装详细步骤

流计算框架Apache Storm核心概念、架构设计

一、基于IDEA/Maven创建一个Storm应用

应用名称:firststorm

二、添加storm-client的Maven jar包依赖

storm-client 依赖包信息,添加到项目的pom.xml文件中。

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-client</artifactId>

<version>2.0.0</version>

</dependency>

maven会自动下载相关依赖并放到Maven Dependencies下,这些jar包可以点击下拉查看,并且会自动添加到项目classpath中,作为编译使用,等jar包全部下载完毕,现在开始编写具体的计算逻辑了,在这个项目中我们把所有的类都建立在包com.rickie.bigdata.firststorm下。

storm提供了两种运行模式:本地模式(Local Mode)和分布式模式。本地模式针对开发调试storm topologies非常有用。

因为多数程序开发者都是使用windows系统进行程序开发,如果在本机不安装storm环境的情况下,如何在本地开发、调试storm程序呢?你可以参考本文提供的解决方案。

如下来自Storm 官方文档:

http://storm.apache.org/releases/2.0.0-SNAPSHOT/Local-mode.html

Local Mode

Local mode simulates a Storm cluster in process and is useful for developing and testing topologies. Running topologies in local mode is similar to running topologies on a cluster.

To run a topology in local mode you have two options. The most common option is to run your topology with storm local instead of storm jar.

This will bring up a local simulated cluster and force all interactions with nimbus to go through the simulated cluster instead of going to a separate process.

If you want to do some automated testing but without actually launching a storm cluster you can use the same classes internally that storm local does.

To do this you first need to pull in the dependencies needed to access these classes. For the java API you should depend on storm-server as a test dependency.

To create an in-process cluster, simply use the LocalCluster class.

如上文所述,使用本地模式(Local Mode),需要先引入storm-server 依赖包。

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-server</artifactId>

<version>2.0.0</version>

<scope>test</scope>

</dependency>

引入的storm-server 依赖包。

在本地模式上运行topology类似在一个集群上运行topology。

创建一个本地集群,大致代码如下所示:

  • import org.apache.storm.LocalCluster;
  • LocalCluster cluster = new LocalCluster();
  • 提交集群使用submitTopology,
  • 杀死集群使用killTopology
  • 关闭一个本地集群使用cluster.shutdown();

完整代码可以参考下面。

三、Storm 应用的代码逻辑开发

(1)首先建立RandomSpout类作为数据源,并且继承于父类BaseRichSpout,确定后可以看到系统自动补全3个方法:nextTuple,open和declareOutputFields。

我们现在就需要重写这3个方法,open方法是数据源的初始化,nextTuple的作用是把Tuple发送至下游,declareOutputFields用来定义输出字段,下面我们手动分配一个数组,并且随机取里面的元素,代码如下:

package com.rickie.bigdata;

import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichSpout;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

import java.util.Map;

import java.util.Random;

public class RandomSpout extends BaseRichSpout {

private SpoutOutputCollector collector;

private static String[] words = {"Rickie", "Hadoop", "MapReduce", "Storm", "Spark", "Spark Streaming", "Flink"};

@Override

public void open(Map<String, Object> map, TopologyContext topologyContext,

SpoutOutputCollector spoutOutputCollector) {

this.collector = spoutOutputCollector;

}

@Override

public void nextTuple() {

String word = words[new Random().nextInt(words.length)];

collector.emit(new Values(word));

}

@Override

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

outputFieldsDeclarer.declare(new Fields("randomString"));

}

}

(2)然后新建一个类SenqueceBolt,继承于BaseBasicBolt类,并且重写方法execute和declareOutputFields。

这个类就是用于执行具体的作业,准确的说是execute方法用来执行相关的计算,这里只是简单的输出,代码如下:

package com.rickie.bigdata;

import org.apache.storm.topology.BasicOutputCollector;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseBasicBolt;

import org.apache.storm.tuple.Tuple;

public class SequenceBolt extends BaseBasicBolt {

@Override

public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {

String word = (String) tuple.getValue(0);

String out = "Hello " + word +"!";

System.out.println(out);

}

@Override

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}

}

(3)最后建立一个类FirstStorm。

这个类是主类,在main方法中定义Topology,并且综合设置Spout和Bolt,从而调用其中的方法,这里流式计算时间设置为30s,代码如下:

package com.rickie.bigdata;

import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.StormSubmitter;

import org.apache.storm.topology.TopologyBuilder;

import org.apache.storm.utils.Utils;

public class FirstStorm

{

public static void main( String[] args ) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSpout());

builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");

Config conf = new Config();

conf.setDebug(false);

String name = "firststorm";

if(args != null && args.length >0){

name = args[0];

conf.setNumWorkers(3);

try {

StormSubmitter.submitTopology(name, conf, builder.createTopology());

} catch (Exception e) {

e.printStackTrace();

}

} else {

try(LocalCluster cluster = new LocalCluster()) {

cluster.submitTopology("firststorm", conf, builder.createTopology());

Utils.sleep(10000);

cluster.killTopology("firststorm");

cluster.shutdown();

}

}

System.out.println( "Well done!" );

}

}

四、运行调试Storm应用

可以用本地模式运行,在IDEA中直接运行即可,方便开发调试。

在Console可以看到如下输出信息:

接下来我们将这个项目放到Storm服务器集群中运行。

storm jar 命令用于启动一个Topology。

(1)以本地模式(Local Mode)运行

storm local firststorm-1.0-SNAPSHOT.jar com.rickie.bigdata.FirstStorm

运行结果和之前IDEA本地模式运行输出类似。

(2)以分布式模式运行

storm jar firststorm-1.0-SNAPSHOT.jar com.rickie.bigdata.FirstStorm RickieStorm

可以查看worker日志,看到SequenceBolt 线程输出的信息。只有kill 这个topology,日志输出信息才会终止。

在Storm架构中,Topology代表的并不是确定的作业,而是持续的计算过程,在确定的业务逻辑处理框架下,输入数据源源不断地进入系统,经过流式处理后以较低的延迟产生输出。如果不主动结束这个Topology或者关闭Storm集群,那么数据处理的过程就会持续地进行下去。

另外需要注意的是:由于在分布式模式下运行,worker工作在独立的进程中,因此无法直接在storm jar命令行输出窗口,看到上述SequenceBolt组件的输出信息。

(3)storm list 查看正在运行的topologies和它们的状态

storm list

也可以通过访问http://192.168.56.103:8080/ storm server,查看并操作正在运行的 topology。

(4)kill正在运行的topology

storm kill [storm name]

运行“storm kill”这个命令,仅仅只是调用Nimbus的Thirft接口去kill掉相对应的Topology。

Nimbus接受到kill命令,会将”kill”事务应用到topology上,修改Topology的状态为”killed”以及将“remove”事件列入到未来几秒钟的计划中,即未来几秒后会触发remove时间。这里的kill实际上停止相关的Worker。

默认kill的等待时间是Topology消息的超时时间,但是可以通过storm kill命令中的-w标志对其进行重写,设置了以上参数之后,topology会在指定的等待时间停止运行。这样给了Topology一个机会在shutdown workers之后完成当前没有处理完成的任务;删除Topology以及清理zookeeper中的分配信息和静态信息;清理存储在本地的心跳dir和jar/configs。

现在,第一个Storm入门项目的开发和测试运行都完毕了,更复杂的流计算逻辑模式也基本相同,主要就是Maven项目中出现了更复杂的模块和调用,整个运行的流程其实都是差不多的。恭喜你,现在算是步入Storm流式计算的殿堂的大门了。

五、Storm架构是如何解决Hadoop架构瓶颈的?

  • Storm的Topology只需初始化一次。在将Topology提交到Storm集群的时候,集群会针对该Topology做一次初始化的工作。此后,在Topology运行过程中,对于输入数据而言,是没有计算框架初始化耗时的,有效避免了计算框架初始化的时间损耗。
  • Storm使用Netty作为底层的消息队列来传递消息,保证消息能够得到快速的处理。
  • 同时Storm采用内存计算模式,无需借助文件存储,直接通过网络直传中间计算结果,避免了组件之间传输数据的大量时间损耗。

六、Apache Storm中的核心概念

  • Topology:一个实时计算任务被称作为Topology,包含Spout和Bolt。

  • Tuple:数据模型,代表处理单元,可以包含多个Field,K/V的Map。
  • Worker:一个topology可能会在一个或者多个worker(工作进程)里面执行,每个worker是一个物理JVM并且执行整个topology的一部分。比如,对于并行度是300的topology来说,如果我们使用50个工作进程worker来执行,那么每个工作进程会处理其中的6个tasks。Storm会尽量均匀的工作分配给所有的worker,setBolt 的最后一个参数是你想为bolts的并行量。
  • Spouts
  1. 消息源Spout是Storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。Spout可以是可靠的也可以是不可靠的,如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
  2. 消息源可以发射多条消息流stream。使用OutputFieldsDeclarer。declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。代码上是这样的:collector.emit(new Values(str));
  3. Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。
  • Bolts
  1. 所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。
  2. Bolts可以简单的做消息流的传递(来一个元组,调用一次execute)。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量,第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。
  3. Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
  4. Bolts的主要方法是execute,它以一个tuple作为输入,bolts使用OutputCollector来发射tuple(spout使用SpoutOutputCollector来发射指定的stream),bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。一般的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

程链接:https://www.itwangzi.cn/4907.html

Storm 实现了低延迟,还做不到高吞吐,也不能在故障发生时准确地处理计算状态;Spark Streaming通过采用微批处理方法实现了高吞吐和容错性,但是牺牲了低延迟和实时处理能力,也不能使窗口与自然时间相匹配,并且表现力欠佳。而flink就是目前为止的最佳答案。

我们在选择一个新的技术框架的时候,首先考虑的是他的应用场景,再牛逼的框架没有应用场景也是一无是处,当然牛逼的框架大多都是基于某一个或者某一类应用场景而产生,而flink主要应用于以下三个场景:

Flink 主要应用场景有三类:

1.Event-driven Applications【事件驱动】

2.Data Analytics Applications【分析】

3.Data Pipeline Applications【管道式ETL】

Event-driven Applications

上图包含两块:

Traditional transaction Application(传统事务应用)

Event-driven Applications(事件驱动应用)

Traditional transaction Application执行流程:

比如点击流Events可以通过Application写入Transaction DB(数据库),同时也可以通过Application从Transaction DB将数据读出,并进行处理,当处理结果达到一个预警值就会触发一个Action动作,这种方式一般为事后诸葛亮。

Event-driven Applications执行流程:

比如采集的数据Events可以不断的放入消息队列,Flink应用会不断ingest(消费)消息队列中的数据,Flink 应用内部维护着一段时间的数据(state),隔一段时间会将数据持久化存储(Persistent sstorage),防止Flink应用死掉。Flink应用每接受一条数据,就会处理一条数据,处理之后就会触发(trigger)一个动作(Action),同时也可以将处理结果写入外部消息队列中,其他Flink应用再消费。

典型的事件驱动类应用:

1.欺诈检测(Fraud detection)

2.异常检测(Anomaly detection)

3.基于规则的告警(Rule-based alerting)

4.业务流程监控(Business process monitoring)

5.Web应用程序(社交网络)

Data Analytics Applications

Data Analytics Applications包含Batch analytics(批处理分析)和Streaming analytics(流处理分析)。

Batch analytics可以理解为周期性查询:

比如Flink应用凌晨从Recorded Events中读取昨天的数据,然后做周期查询运算,最后将数据写入Database或者HDFS,或者直接将数据生成报表供公司上层领导决策使用。

Streaming analytics可以理解为连续性查询:

比如实时展示双十一天猫销售GMV,用户下单数据需要实时写入消息队列,Flink 应用源源不断读取数据做实时计算,然后不断的将数据更新至Database或者K-VStore,最后做大屏实时展示。

Data Pipeline Applications

Data Pipeline Applications包含Periodic (周期性)ETL和Data Pipeline(管道)

Periodic ETL:比如每天凌晨周期性的启动一个Flink ETL Job,读取传统数据库中的数据,然后做ETL,最后写入数据库和文件系统。

Data Pipeline:比如启动一个Flink 实时应用,数据源(比如数据库、Kafka)中的数据不断的通过Flink Data Pipeline流入或者追加到数据仓库(数据库或者文件系统),或者Kafka消息队列。


阿里Flink应用场景

阿里在Flink的应用主要包含四个模块:实时监控、实时报表、流数据分析和实时仓库

实时监控:

  1. 用户行为预警、app crash 预警、服务器攻击预警

  2. 对用户行为或者相关事件进行实时监测和分析,基于风控规则进行预警

实时报表:

  1. 双11、双12等活动直播大屏

  2. 对外数据产品:生意参谋等

  3. 数据化运营

流数据分析:

  1. 实时计算相关指标反馈及时调整决策

  2. 内容投放、无线智能推送、实时个性化推荐等

实时仓库:

  1. 数据实时清洗、归并、结构化

  2. 数仓的补充和优化

欺诈检测

背景:

假设你是一个电商公司,经常搞运营活动,但收效甚微,经过细致排查,发现原来是羊毛党在薅平台的羊毛,把补给用户的补贴都薅走了,钱花了不少,效果却没达到。怎么办呢?

你可以做一个实时的异常检测系统,监控用户的高危行为,及时发现高危行为并采取措施,降低损失。

系统流程:

1.用户的行为经由app 上报或web日志记录下来,发送到一个消息队列里去;

2.然后流计算订阅消息队列,过滤出感兴趣的行为,比如:购买、领券、浏览等;

3.流计算把这个行为特征化;

4.流计算通过UDF调用外部一个风险模型,判断这次行为是否有问题(单次行为);

5.流计算里通过CEP功能,跨多条记录分析用户行为(比如用户先做了a,又做了b,又做了3次c),整体识别是否有风险;

6.综合风险模型和CEP的结果,产出预警信息。

《大数据和人工智能交流》头条号向广大初学者新增C 、Java 、Python 、Scala、javascript 等目前流行的计算机、大数据编程语言,希望大家以后关注本头条号更多的内容。


一、搭建zookeeper集群

注意:为了大家学习的方便,这里在一台机器上搭建zookeeper集群,在一个机器搭建集群和在多台机器搭建集群原理是相同的。搭建单节点(一台主机放3个服务【1个leader,2个flower】)zk集群

在/home建立zk1、zk2、zk3三个目录,3个目录搭建过程类似。

(一)搭建过程

【1】搭建节点1

(1)在zk1建立zkdata ,在zkdata下存放myid文件

[root@node1 zkdata]# more myid

1

(2) tar -zxvf zookeeper-3.4.5.tar.gz

[root@node1 zk1]# ls

zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz

(3)修改配置文件

[root@node1 zk1]# cd zookeeper-3.4.5

[root@node1 zookeeper-3.4.5]# cd conf

[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg

[root@node1 conf]# vi zoo.cfg

注意:修改的配置信息

dataDir=/tmp/zookeeper

# the port at which the clients will connect

clientPort=2181

dataDir=/home/zk1/zkdata

server.1=node1:2888:3888

server.2=node1:2889:3889

server.3=node1:2890:3890

【2】搭建节点2

(1)在zk1建立zkdata ,在zkdata下存放myid文件

[root@node1 zkdata]# more myid

2

(2) tar -zxvf zookeeper-3.4.5.tar.gz

[root@node1 zk1]# ls

zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz

(3)修改配置文件

[root@node1 zk1]# cd zookeeper-3.4.5

[root@node1 zookeeper-3.4.5]# cd conf

[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg

[root@node1 conf]# vi zoo.cfg

注意:修改的配置信息

dataDir=/tmp/zookeeper

# the port at which the clients will connect

clientPort=2182

dataDir=/home/zk2/zkdata

server.1=node1:2888:3888

server.2=node1:2889:3889

server.3=node1:2890:3890

【3】搭建节点3

(1)在zk1建立zkdata ,在zkdata下存放myid文件

[root@node1 zkdata]# more myid

3

(2) tar -zxvf zookeeper-3.4.5.tar.gz

[root@node1 zk1]# ls

zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz

(3)修改配置文件

[root@node1 zk1]# cd zookeeper-3.4.5

[root@node1 zookeeper-3.4.5]# cd conf

[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg

[root@node1 conf]# vi zoo.cfg

注意:修改的配置信息

dataDir=/tmp/zookeeper

# the port at which the clients will connect

clientPort=2183

dataDir=/home/zk3/zkdata

server.1=node1:2888:3888

server.2=node1:2889:3889

server.3=node1:2890:3890


(二)、启动集群

1、启动各个

[root@node1 bin]# ./zkServer.sh start

JMX enabled by default

Using config: /home/zk1/zookeeper-3.4.5/bin/../conf/zoo.cfg

Starting zookeeper ... ./zkServer.sh: line 103: [: /tmp/zookeeper: binary operator expected

STARTED

[root@node1 bin]#

2、在zk1、zk2、zk3各个目录查看节点状态

[root@node1 bin]# ./zkServer.sh status

JMX enabled by default

Using config: /home/zk1/zookeeper-3.4.5/bin/../conf/zoo.cfg

Mode: follower


二、搭建storm集群

(一)、准备工作

1、zk必须正常,3个节点联网通信正常。我这里是3个节点:

192.168.100.1是主节点

192.168.100.2是从节点

192.168.100.3是从节点

将包上传-->解压 -->改名-->这里改为storm

[root@node1 storm001]# ls

apache-storm-0.9.3.tar.gz storm

2、修改配置文件

[root@node1 storm001]# cd storm

[root@node1 storm]# ls

bin conf examples lib logback NOTICE README.markdown SECURITY.md

CHANGELOG.md DISCLAIMER external LICENSE logs public RELEASE storm.jar

[root@node1 storm]# cd conf

[root@node1 conf]# ls

storm_env.ini storm.yaml

[root@node1 conf]# vi storm.yaml

storm.zookeeper.servers:

- "192.168.100.1"

- "192.168.100.1"

- "192.168.100.1"

nimbus.host: "192.168.100.1"

storm.local.dir: "/usr/local/storm/tmp"

supervisor.slots.ports:

- 6700

- 6701

- 6702

- 6703

解释:

(1)storm.local.dir:存储目录

(2)nimbus.host: "192.168.100.1"代表选择主机

(3)supervisor.slots.ports:对于每个supervisor机器,通过这项配置可以决定运行多少个worker进程在

这台机器上,每个worker使用一个单独的port来接受消息。supervisor并不会立即启动这个4个worker进程,

当接收到分配的任务的时候会启动,具体启动多少个worker根据我们Topology在这个supervisor需要几个worker

来确定

3、把storm目录拷贝到其它2个节点

[root@node1 storm001]# ls

apache-storm-0.9.3.tar.gz storm

scp -r storm node2:/home/storm001

scp -r storm node3:/home/storm001

(二)、启动集群

1、启动主节点nimbus

[root@node1 bin]# ls

storm storm.cmd storm-config.cmd

[root@node1 bin]# ./storm nimbus

可以使用linux命令将其放到后台./storm nimbus >/dev/null 2>&1 &

在主节点通过web前端访问的进程

./storm ui

./storm ui >/dev/null 2>&1 &

2、启动从节点node2

./storm supervisor

./storm supervisor >/dev/null 2>&1 &

从节点需要启动日志进程

./storm logviewer

3、启动从节点node3

./storm supervisor

./storm supervisor >/dev/null 2>&1 &

从节点需要启动日志进程

./storm logviewer

4、进入storm的ui界面:

http://192.168.100.1:8080/index.html

5、将java开发的storm应用打包上传服务器

bin/storm jar storm.jar com.test.A001



《大数据和人工智能交流》的宗旨

1、将大数据和人工智能的专业数学:概率数理统计、线性代数、决策论、优化论、博弈论等数学模型变得通俗易懂。

2、将大数据和人工智能的专业涉及到的数据结构和算法:分类、聚类 、回归算法、概率等算法变得通俗易懂。

3、最新的高科技动态:数据采集方面的智能传感器技术;医疗大数据智能决策分析;物联网智慧城市等等。

根据初学者需要会有C语言、Java语言、Python语言、Scala函数式等目前主流计算机语言。

根据读者的需要有和人工智能相关的计算机科学与技术、电子技术、芯片技术等基础学科通俗易懂的文章。