一个Apache Storm v2.0流计算入门项目的开发、测试和运行(IDEA/Maven)
关于流计算框架Apache Storm最新版的安装,可以参考:
流计算框架-最新版Apache Storm v2.0单机模式安装详细步骤
流计算框架Apache Storm核心概念、架构设计
应用名称:firststorm
storm-client 依赖包信息,添加到项目的pom.xml文件中。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
<version>2.0.0</version>
</dependency>
maven会自动下载相关依赖并放到Maven Dependencies下,这些jar包可以点击下拉查看,并且会自动添加到项目classpath中,作为编译使用,等jar包全部下载完毕,现在开始编写具体的计算逻辑了,在这个项目中我们把所有的类都建立在包com.rickie.bigdata.firststorm下。
storm提供了两种运行模式:本地模式(Local Mode)和分布式模式。本地模式针对开发调试storm topologies非常有用。
因为多数程序开发者都是使用windows系统进行程序开发,如果在本机不安装storm环境的情况下,如何在本地开发、调试storm程序呢?你可以参考本文提供的解决方案。
如下来自Storm 官方文档:
http://storm.apache.org/releases/2.0.0-SNAPSHOT/Local-mode.html
Local Mode
Local mode simulates a Storm cluster in process and is useful for developing and testing topologies. Running topologies in local mode is similar to running topologies on a cluster.
To run a topology in local mode you have two options. The most common option is to run your topology with storm local instead of storm jar.
This will bring up a local simulated cluster and force all interactions with nimbus to go through the simulated cluster instead of going to a separate process.
If you want to do some automated testing but without actually launching a storm cluster you can use the same classes internally that storm local does.
To do this you first need to pull in the dependencies needed to access these classes. For the java API you should depend on storm-server as a test dependency.
To create an in-process cluster, simply use the LocalCluster class.
如上文所述,使用本地模式(Local Mode),需要先引入storm-server 依赖包。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-server</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>
引入的storm-server 依赖包。
在本地模式上运行topology类似在一个集群上运行topology。
创建一个本地集群,大致代码如下所示:
完整代码可以参考下面。
(1)首先建立RandomSpout类作为数据源,并且继承于父类BaseRichSpout,确定后可以看到系统自动补全3个方法:nextTuple,open和declareOutputFields。
我们现在就需要重写这3个方法,open方法是数据源的初始化,nextTuple的作用是把Tuple发送至下游,declareOutputFields用来定义输出字段,下面我们手动分配一个数组,并且随机取里面的元素,代码如下:
package com.rickie.bigdata;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
public class RandomSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private static String[] words = {"Rickie", "Hadoop", "MapReduce", "Storm", "Spark", "Spark Streaming", "Flink"};
@Override
public void open(Map<String, Object> map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
String word = words[new Random().nextInt(words.length)];
collector.emit(new Values(word));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomString"));
}
}
(2)然后新建一个类SenqueceBolt,继承于BaseBasicBolt类,并且重写方法execute和declareOutputFields。
这个类就是用于执行具体的作业,准确的说是execute方法用来执行相关的计算,这里只是简单的输出,代码如下:
package com.rickie.bigdata;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
public class SequenceBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String word = (String) tuple.getValue(0);
String out = "Hello " + word +"!";
System.out.println(out);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
(3)最后建立一个类FirstStorm。
这个类是主类,在main方法中定义Topology,并且综合设置Spout和Bolt,从而调用其中的方法,这里流式计算时间设置为30s,代码如下:
package com.rickie.bigdata;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
public class FirstStorm
{
public static void main( String[] args ) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSpout());
builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(false);
String name = "firststorm";
if(args != null && args.length >0){
name = args[0];
conf.setNumWorkers(3);
try {
StormSubmitter.submitTopology(name, conf, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
try(LocalCluster cluster = new LocalCluster()) {
cluster.submitTopology("firststorm", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("firststorm");
cluster.shutdown();
}
}
System.out.println( "Well done!" );
}
}
可以用本地模式运行,在IDEA中直接运行即可,方便开发调试。
在Console可以看到如下输出信息:
接下来我们将这个项目放到Storm服务器集群中运行。
storm jar 命令用于启动一个Topology。
(1)以本地模式(Local Mode)运行
storm local firststorm-1.0-SNAPSHOT.jar com.rickie.bigdata.FirstStorm
运行结果和之前IDEA本地模式运行输出类似。
(2)以分布式模式运行
storm jar firststorm-1.0-SNAPSHOT.jar com.rickie.bigdata.FirstStorm RickieStorm
可以查看worker日志,看到SequenceBolt 线程输出的信息。只有kill 这个topology,日志输出信息才会终止。
在Storm架构中,Topology代表的并不是确定的作业,而是持续的计算过程,在确定的业务逻辑处理框架下,输入数据源源不断地进入系统,经过流式处理后以较低的延迟产生输出。如果不主动结束这个Topology或者关闭Storm集群,那么数据处理的过程就会持续地进行下去。
另外需要注意的是:由于在分布式模式下运行,worker工作在独立的进程中,因此无法直接在storm jar命令行输出窗口,看到上述SequenceBolt组件的输出信息。
(3)storm list 查看正在运行的topologies和它们的状态
storm list
也可以通过访问http://192.168.56.103:8080/ storm server,查看并操作正在运行的 topology。
(4)kill正在运行的topology
storm kill [storm name]
运行“storm kill”这个命令,仅仅只是调用Nimbus的Thirft接口去kill掉相对应的Topology。
Nimbus接受到kill命令,会将”kill”事务应用到topology上,修改Topology的状态为”killed”以及将“remove”事件列入到未来几秒钟的计划中,即未来几秒后会触发remove时间。这里的kill实际上停止相关的Worker。
默认kill的等待时间是Topology消息的超时时间,但是可以通过storm kill命令中的-w标志对其进行重写,设置了以上参数之后,topology会在指定的等待时间停止运行。这样给了Topology一个机会在shutdown workers之后完成当前没有处理完成的任务;删除Topology以及清理zookeeper中的分配信息和静态信息;清理存储在本地的心跳dir和jar/configs。
现在,第一个Storm入门项目的开发和测试运行都完毕了,更复杂的流计算逻辑模式也基本相同,主要就是Maven项目中出现了更复杂的模块和调用,整个运行的流程其实都是差不多的。恭喜你,现在算是步入Storm流式计算的殿堂的大门了。
程链接:https://www.itwangzi.cn/4907.html
Storm 实现了低延迟,还做不到高吞吐,也不能在故障发生时准确地处理计算状态;Spark Streaming通过采用微批处理方法实现了高吞吐和容错性,但是牺牲了低延迟和实时处理能力,也不能使窗口与自然时间相匹配,并且表现力欠佳。而flink就是目前为止的最佳答案。
我们在选择一个新的技术框架的时候,首先考虑的是他的应用场景,再牛逼的框架没有应用场景也是一无是处,当然牛逼的框架大多都是基于某一个或者某一类应用场景而产生,而flink主要应用于以下三个场景:
Flink 主要应用场景有三类:
1.Event-driven Applications【事件驱动】
2.Data Analytics Applications【分析】
3.Data Pipeline Applications【管道式ETL】
上图包含两块:
Traditional transaction Application(传统事务应用)
Event-driven Applications(事件驱动应用)
Traditional transaction Application执行流程:
比如点击流Events可以通过Application写入Transaction DB(数据库),同时也可以通过Application从Transaction DB将数据读出,并进行处理,当处理结果达到一个预警值就会触发一个Action动作,这种方式一般为事后诸葛亮。
Event-driven Applications执行流程:
比如采集的数据Events可以不断的放入消息队列,Flink应用会不断ingest(消费)消息队列中的数据,Flink 应用内部维护着一段时间的数据(state),隔一段时间会将数据持久化存储(Persistent sstorage),防止Flink应用死掉。Flink应用每接受一条数据,就会处理一条数据,处理之后就会触发(trigger)一个动作(Action),同时也可以将处理结果写入外部消息队列中,其他Flink应用再消费。
典型的事件驱动类应用:
1.欺诈检测(Fraud detection)
2.异常检测(Anomaly detection)
3.基于规则的告警(Rule-based alerting)
4.业务流程监控(Business process monitoring)
5.Web应用程序(社交网络)
Data Analytics Applications包含Batch analytics(批处理分析)和Streaming analytics(流处理分析)。
Batch analytics可以理解为周期性查询:
比如Flink应用凌晨从Recorded Events中读取昨天的数据,然后做周期查询运算,最后将数据写入Database或者HDFS,或者直接将数据生成报表供公司上层领导决策使用。
Streaming analytics可以理解为连续性查询:
比如实时展示双十一天猫销售GMV,用户下单数据需要实时写入消息队列,Flink 应用源源不断读取数据做实时计算,然后不断的将数据更新至Database或者K-VStore,最后做大屏实时展示。
Data Pipeline Applications包含Periodic (周期性)ETL和Data Pipeline(管道)
Periodic ETL:比如每天凌晨周期性的启动一个Flink ETL Job,读取传统数据库中的数据,然后做ETL,最后写入数据库和文件系统。
Data Pipeline:比如启动一个Flink 实时应用,数据源(比如数据库、Kafka)中的数据不断的通过Flink Data Pipeline流入或者追加到数据仓库(数据库或者文件系统),或者Kafka消息队列。
阿里在Flink的应用主要包含四个模块:实时监控、实时报表、流数据分析和实时仓库。
实时监控:
用户行为预警、app crash 预警、服务器攻击预警
对用户行为或者相关事件进行实时监测和分析,基于风控规则进行预警
实时报表:
双11、双12等活动直播大屏
对外数据产品:生意参谋等
数据化运营
流数据分析:
实时计算相关指标反馈及时调整决策
内容投放、无线智能推送、实时个性化推荐等
实时仓库:
数据实时清洗、归并、结构化
数仓的补充和优化
欺诈检测
背景:
假设你是一个电商公司,经常搞运营活动,但收效甚微,经过细致排查,发现原来是羊毛党在薅平台的羊毛,把补给用户的补贴都薅走了,钱花了不少,效果却没达到。怎么办呢?
你可以做一个实时的异常检测系统,监控用户的高危行为,及时发现高危行为并采取措施,降低损失。
系统流程:
1.用户的行为经由app 上报或web日志记录下来,发送到一个消息队列里去;
2.然后流计算订阅消息队列,过滤出感兴趣的行为,比如:购买、领券、浏览等;
3.流计算把这个行为特征化;
4.流计算通过UDF调用外部一个风险模型,判断这次行为是否有问题(单次行为);
5.流计算里通过CEP功能,跨多条记录分析用户行为(比如用户先做了a,又做了b,又做了3次c),整体识别是否有风险;
6.综合风险模型和CEP的结果,产出预警信息。
《大数据和人工智能交流》头条号向广大初学者新增C 、Java 、Python 、Scala、javascript 等目前流行的计算机、大数据编程语言,希望大家以后关注本头条号更多的内容。
一、搭建zookeeper集群
注意:为了大家学习的方便,这里在一台机器上搭建zookeeper集群,在一个机器搭建集群和在多台机器搭建集群原理是相同的。搭建单节点(一台主机放3个服务【1个leader,2个flower】)zk集群
在/home建立zk1、zk2、zk3三个目录,3个目录搭建过程类似。
(一)搭建过程
【1】搭建节点1
(1)在zk1建立zkdata ,在zkdata下存放myid文件
[root@node1 zkdata]# more myid
1
(2) tar -zxvf zookeeper-3.4.5.tar.gz
[root@node1 zk1]# ls
zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz
(3)修改配置文件
[root@node1 zk1]# cd zookeeper-3.4.5
[root@node1 zookeeper-3.4.5]# cd conf
[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg
[root@node1 conf]# vi zoo.cfg
注意:修改的配置信息
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
dataDir=/home/zk1/zkdata
server.1=node1:2888:3888
server.2=node1:2889:3889
server.3=node1:2890:3890
【2】搭建节点2
(1)在zk1建立zkdata ,在zkdata下存放myid文件
[root@node1 zkdata]# more myid
2
(2) tar -zxvf zookeeper-3.4.5.tar.gz
[root@node1 zk1]# ls
zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz
(3)修改配置文件
[root@node1 zk1]# cd zookeeper-3.4.5
[root@node1 zookeeper-3.4.5]# cd conf
[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg
[root@node1 conf]# vi zoo.cfg
注意:修改的配置信息
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2182
dataDir=/home/zk2/zkdata
server.1=node1:2888:3888
server.2=node1:2889:3889
server.3=node1:2890:3890
【3】搭建节点3
(1)在zk1建立zkdata ,在zkdata下存放myid文件
[root@node1 zkdata]# more myid
3
(2) tar -zxvf zookeeper-3.4.5.tar.gz
[root@node1 zk1]# ls
zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz
(3)修改配置文件
[root@node1 zk1]# cd zookeeper-3.4.5
[root@node1 zookeeper-3.4.5]# cd conf
[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg
[root@node1 conf]# vi zoo.cfg
注意:修改的配置信息
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2183
dataDir=/home/zk3/zkdata
server.1=node1:2888:3888
server.2=node1:2889:3889
server.3=node1:2890:3890
(二)、启动集群
1、启动各个
[root@node1 bin]# ./zkServer.sh start
JMX enabled by default
Using config: /home/zk1/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... ./zkServer.sh: line 103: [: /tmp/zookeeper: binary operator expected
STARTED
[root@node1 bin]#
2、在zk1、zk2、zk3各个目录查看节点状态
[root@node1 bin]# ./zkServer.sh status
JMX enabled by default
Using config: /home/zk1/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: follower
二、搭建storm集群
(一)、准备工作
1、zk必须正常,3个节点联网通信正常。我这里是3个节点:
192.168.100.1是主节点
192.168.100.2是从节点
192.168.100.3是从节点
将包上传-->解压 -->改名-->这里改为storm
[root@node1 storm001]# ls
apache-storm-0.9.3.tar.gz storm
2、修改配置文件
[root@node1 storm001]# cd storm
[root@node1 storm]# ls
bin conf examples lib logback NOTICE README.markdown SECURITY.md
CHANGELOG.md DISCLAIMER external LICENSE logs public RELEASE storm.jar
[root@node1 storm]# cd conf
[root@node1 conf]# ls
storm_env.ini storm.yaml
[root@node1 conf]# vi storm.yaml
storm.zookeeper.servers:
- "192.168.100.1"
- "192.168.100.1"
- "192.168.100.1"
nimbus.host: "192.168.100.1"
storm.local.dir: "/usr/local/storm/tmp"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
解释:
(1)storm.local.dir:存储目录
(2)nimbus.host: "192.168.100.1"代表选择主机
(3)supervisor.slots.ports:对于每个supervisor机器,通过这项配置可以决定运行多少个worker进程在
这台机器上,每个worker使用一个单独的port来接受消息。supervisor并不会立即启动这个4个worker进程,
当接收到分配的任务的时候会启动,具体启动多少个worker根据我们Topology在这个supervisor需要几个worker
来确定
3、把storm目录拷贝到其它2个节点
[root@node1 storm001]# ls
apache-storm-0.9.3.tar.gz storm
scp -r storm node2:/home/storm001
scp -r storm node3:/home/storm001
(二)、启动集群
1、启动主节点nimbus
[root@node1 bin]# ls
storm storm.cmd storm-config.cmd
[root@node1 bin]# ./storm nimbus
可以使用linux命令将其放到后台./storm nimbus >/dev/null 2>&1 &
在主节点通过web前端访问的进程
./storm ui
./storm ui >/dev/null 2>&1 &
2、启动从节点node2
./storm supervisor
./storm supervisor >/dev/null 2>&1 &
从节点需要启动日志进程
./storm logviewer
3、启动从节点node3
./storm supervisor
./storm supervisor >/dev/null 2>&1 &
从节点需要启动日志进程
./storm logviewer
4、进入storm的ui界面:
http://192.168.100.1:8080/index.html
5、将java开发的storm应用打包上传服务器
bin/storm jar storm.jar com.test.A001
《大数据和人工智能交流》的宗旨
1、将大数据和人工智能的专业数学:概率数理统计、线性代数、决策论、优化论、博弈论等数学模型变得通俗易懂。
2、将大数据和人工智能的专业涉及到的数据结构和算法:分类、聚类 、回归算法、概率等算法变得通俗易懂。
3、最新的高科技动态:数据采集方面的智能传感器技术;医疗大数据智能决策分析;物联网智慧城市等等。
根据初学者需要会有C语言、Java语言、Python语言、Scala函数式等目前主流计算机语言。
根据读者的需要有和人工智能相关的计算机科学与技术、电子技术、芯片技术等基础学科通俗易懂的文章。
*请认真填写需求信息,我们会在24小时内与您取得联系。