整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

0069-如何使用Java连接Kerberos的Ka

0069-如何使用Java连接Kerberos的Kafka

馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

1.文档编写目的


Kafka从0.8版本以后出了新的API接口,用于异步方式发送消息,性能优于旧的API,本篇文章主要使用新的API接口进行测试。继上一篇文章

如何通过Cloudera Manager为Kafka启用Kerberos及使用

,本篇文章主要讲述如何使用Java连接Kerberos的Kafka集群生产和消费消息。

  • 内容概述

1.环境准备

2.创建Java工程

3.编写生产消息代码

4.编写消费消息代码

5.测试

  • 测试环境

1.RedHat7.2

2.CM和CDH版本为5.11.2

3.Kafka2.2.0-0.10.2

  • 前置条件

1.Intellij已安装且正常运行

2.Maven环境正常

2.环境准备


1.创建topic,test3有3个replication,3个partition

[ec2-user@ip-172-31-22-86~]$ kafka-topics --create --zookeeper ip-172-31-22-86.ap-southeast-1.compute.internal:2181 --replication-factor 3 --partitions 3 --topic test3

2.krb5.conf配置(直接使用CDH集群的Kerberos配置)

# Configuration snippets may beplaced in this directory as well

includedir /etc/krb5.conf.d/

[logging]

default=FILE:/var/log/krb5libs.log

kdc=FILE:/var/log/krb5kdc.log

admin_server=FILE:/var/log/kadmind.log

[libdefaults]

dns_lookup_realm=false

ticket_lifetime=24h

renew_lifetime=7d

forwardable=true

rdns=false

default_realm=CLOUDERA.COM

#default_ccache_name=KEYRING:persistent:%{uid}

[realms]

CLOUDERA.COM={

kdc=ip-172-31-22-86.ap-southeast-1.compute.internal

admin_server=ip-172-31-22-86.ap-southeast-1.compute.internal

}

[domain_realm]

.ip-172-31-22-86.ap-southeast-1.compute.internal=CLOUDERA.COM

ip-172-31-22-86.ap-southeast-1.compute.internal=CLOUDERA.COM

3.Kerberos的keytab文件

使用kadmin为Kerberos账号生成keytab,fayson.keytab文件生成在当前目录下。

[ec2-user@ip-172-31-22-86~]$ sudo kadmin.local

Authenticating as principal hdfs/admin@CLOUDERA.COM with password.

kadmin.local: xst -norandkey -k fayson.keytab fayson@CLOUDERA.COM

...

kadmin.local: exit

[ec2-user@ip-172-31-22-86~]$

4.jaas-cache.conf配置文件

KafkaClient{

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

keyTab="/Volumes/Transcend/keytab/fayson.keytab"

principal="fayson@CLOUDERA.COM";

};

5.在当前开发环境下配置集群的主机信息到hosts文件

在/etc/hosts文件中添加

提示:Fayson使用的AWS环境,所以使用公网IP和hostname对应。如果你的开发环境可以直连Hadoop集群,可以直接配置Hadoop内网IP和hostname对应即可。

3.创建Java工程


1.使用Intellij创建Java Maven工程

2.在pom.xml配置文件中增加Kafka API的Maven依赖

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.2.0</version>
</dependency>

4.编写生产消息代码


package com.cloudera;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**

* Created by fayson on 2017/10/24.

*/

public class MyProducer {

public static String TOPIC_NAME ="test3";

public static void main(String[] args){

System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");

System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");

System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

// System.setProperty("sun.security.krb5.debug","true");

Properties props=new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");

props.put(ProducerConfig.ACKS_CONFIG, "all");

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.kerberos.service.name", "kafka");

Producer<String,String> producer=new KafkaProducer<String,String>(props);

for (int i=0; i < 10; i++) {

String key="key-"+ i;

String message="Message-"+ i;

ProducerRecord record=new ProducerRecord<String, String>(TOPIC_NAME, key, message);

producer.send(record);

System.out.println(key + "----"+ message);

}

producer.close();

}

}

5.编写消费消息代码


package com.cloudera;

import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;

import java.util.Properties;

/**

* Created by fayson on 2017/10/24.

*/

public class MyConsumer {

private static String TOPIC_NAME ="test3";

public static void main(String[] args){

System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");

System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");

System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

Properties props=new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.kerberos.service.name", "kafka");

KafkaConsumer<String,String> consumer=new KafkaConsumer<String,String>(props);

TopicPartition partition0=new TopicPartition(TOPIC_NAME, 0);

TopicPartition partition1=new TopicPartition(TOPIC_NAME, 1);

TopicPartition partition2=new TopicPartition(TOPIC_NAME, 2);

consumer.assign(Arrays.asList(partition0,partition1, partition2));

ConsumerRecords<String,String> records=null;

while (true){

try {

Thread.sleep(10000l);

System.out.println();

records=consumer.poll(Long.MAX_VALUE);

for (ConsumerRecord<String, String> record : records) {

System.out.println("Receivedmessage: (" + record.key() + "," + record.value() + ") at offset " + record.offset());

}

} catch (InterruptedException e){

e.printStackTrace();

}

}

}

}

6.代码测试


1.执行消费程序,消费topic为test3的所有partition消息

启动成功,等待消费test3的消息

2.执行生产消息程序,向test3的topic生产消息

向test3的topic发送的消息

3.查看消费程序读取到的消息

7.总结


在开发环境下通过Java代码直接连接到已启用Kerberos的Kafka集群时,则需要将krb5.conf和jaas.conf配置加载到程序运行环境中。至于使用Kerberos密码的方式Fayson也不会。

测试使用的topic有3个partiton,如果没有将所有的broker列表配置到bootstrap.servers中,会导致部分消息丢失。

参考文档:

http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

http://kafka.apache.org/documentation/#api

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。


您可能还想看

安装


CENTOS6.5安装CDH5.12.1(一)

CENTOS6.5安装CDH5.12.1(二)

CENTOS7.2安装CDH5.10和Kudu1.2(一)

CENTOS7.2安装CDH5.10和Kudu1.2(二)

如何在CDH中安装Kudu&Spark2&Kafka

如何升级Cloudera Manager和CDH

如何卸载CDH(附一键卸载github源码)

如何迁移Cloudera Manager节点

如何在Windows Server2008搭建DNS服务并配置泛域名解析

安全


如何在CDH集群启用Kerberos

如何在Hue中使用Sentry

如何在CDH启用Kerberos的情况下安装及使用Sentry(一)

如何在CDH启用Kerberos的情况下安装及使用Sentry(二)

如何在CDH未启用认证的情况下安装及使用Sentry

如何使用Sentry管理Hive外部表权限

如何使用Sentry管理Hive外部表(补充)

如何在Kerberos与非Kerberos的CDH集群BDR不可用时复制数据

Windows Kerberos客户端配置并访问CDH

数据科学


如何在CDSW中使用R绘制直方图

如何使用Python Impyla客户端连接Hive和Impala

如何在CDH集群安装Anaconda&搭建Python私有源

如何使用CDSW在CDH中分布式运行所有R代码

如何使用CDSW在CDH集群通过sparklyr提交R的Spark作业

如何使用R连接Hive与Impala

如何在Redhat中安装R的包及搭建R的私有源

如何在Redhat中配置R环境

什么是sparklyr

其他


CDH网络要求(Lenovo参考架构)

大数据售前的中年危机

如何实现CDH元数据库MySQL的主备

如何在CDH中使用HPLSQL实现存储过程

如何在Hive&Impala中使用UDF

Hive多分隔符支持示例


推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

ava 中的注释详解

不管是那种编程语言, 代码的注释都是必备的语法功能, 并且一个好的程序的指标之一,就是能有一个好的注释。 那 Java 中的注释是怎么定义的呢? 我们来说说。

Java 中 3 种注释类型

  • 单行注释
  • 多行注释
  • 文档注释

单行注释

单行注释: Java 中最简单的注释方法, 使用两个反斜杠 // 就可以了。注释的内容从 // 开始。

举个例子:

单行注释不仅可以注释备注信息, 并且也可以注释代码内容。

  // 返回一个字符串
  //return "苗子说全栈 ";
  return "";

在学习初期可以使用 System.out 输出字符串进行一些调试, 初学可以考虑这个, 后期学习日志框架之后, 推荐使用日志的方式输出调试信息。这里只是演示单行注释的用法。

单行注释的推荐写法就是写在代码的上一行中。

多行注释

多行注释:Java 中使用以 /* 开始, 以 */ 结束的注释方式。

举个例子:

需要注意:

对于多行注释开始和结束中间不要再有结束符。为了好看会在注释内容首字母写入 * 举例说明:

//正确的
/*
 * 多行注释
 */

//错误的实例
/*
 *这个是多行注释
/*
 *多行注释内容
 */
*/  这里就错误了。 

匹配规则就是 /* 与之对应的最近的 */ 结束符。

除了单行注释和多行注释,还有一种是文档注释。

文档注释

编写任何的代码,都少不了编写程序的文档, 怎么高效的编写文档内容,并且文档的内容能够随着版本的更新进行更新。Java 语言中有一种注释规则, 就是文档注释类型。对于文档注释的内容,是可以通过 Java 内置命令行工具 javadoc 生成对应的文档内容的。并且Java 的 API 也是基于这种机制生成的文档。

使用文档注释的方式 以 /** 开头 */ 结尾。包含在这之内的内容就是文档注释的内容。 而且针对文档注释有一系列的标记。该注释一般会放在 类、方法、变量、常量上。

演示案例:

这里就使用了文档的注释方法, 并且使用了 @author 作者标记, @since 该类从哪个版本开始实现的。

在方法上我们使用了 @param 参数标记, 主要是给参数加上一个说明。

标记分类 “类标记” 和 “方法标记”常用的有以下列表,只是通用这样并不是非要这样写,这样写可以减少再次沟通的成本。

类标记

  • @author 代表的是作者是谁,如果有多个,可以写多个标记
  • @since 从哪个版本开始支持该类的功能
  • @version 当前类的版本信息
  • @see 该标记可用在类与方法上,表示参考的类或方法。

方法标记

  • @param 参数的描述信息
  • @return 当前方法返回的描述信息
  • @exception 异常的描述信息,于描述方法 throws 对应的异常
  • @throws 描述该方法可能抛出的异常信息,和 @exception 联合使用
  • @see 该标记可用在类与方法上,表示参考的类或方法。

当然也有通用的用法, 可以标记上面说的四种类型。

  • @code 内容使用代码格式。不会转移显示。
  • @link 用于快速的定位到定义的相关类的代码上。使用格式: @link 包名.类名#方法名(参数类型)

把上面的源码生成以下文档, 我们看一下大概的内容。

javadoc -encoding utf-8 -d docs Hello.java

进入到 docs 打开index.html。效果如下:

这样你开发的这个文件的开发文档就编写完成。非常 easy, 非常 nice ! 我们来看看 Java 17 的 Object.java 的源码。

看一下 Java 17 的源码注释

找到安装路径, 然后根目录中找到 lib/src.zip 这个压缩包就是 JDK 17 的源码。

解压这个 src.zip 文件。 因为现在 JDK 都是基于模块开发。 找到基础模块 java.base 并进入到 java.lang 目录, 并找到 Object.java 类。这个是类中之王。 除了基础的类所有的类都默认实现该类。

不需要理解这里面的含义, 只需要理解注释的使用方式。 package 是包名, 后续会针对这个单独开一章, 你可以认为这是一个文件的层次划分。主要是为了解决类名重复的问题。package 上的内容是针对 Java 的开源协议的一个说明。 以后你如果想开源软件, 可以参考这种方式编写你的开源协议内容。

进入到 Java 17 的文档 API 官方页面。

地址为:https://docs.oracle.com/en/java/javase/17/docs/api/index.html

同样的方式进入到 java.base 的模块中。 Java.base 是 Java SE 的基础 API。


进入到 Package 的 java.lang 中。并翻阅到 All Classes and Interfaces 中, 找到 Object。

或者直接进入该地址: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Object.html

可以看到如下界面:

可以看到从版面UI, 还是风格都和 javadoc 生成的一模一样, 从这里也能看出来 Java 的 api 文档,就是使用这种方式进行生成的。 从源码中直接获得注释, 这样你的注释才显得可靠有据。

从该类的描述中我们也能看到 说是 Object 的这个类是所有的类的根类。 也就是后面说所有的对象类都有一个超类。也都继承了该类的相关方法和实现。

扯得有点远了。 先知道有 3 种注释方法, 知道有 Java 的 API 存在, 这篇内容对于初学的你来说。 也就足够了。

后续更多内容。 关注我。

Kafka 最佳实践,涉及

  1. 典型使用场景
  2. Kafka 使用的最佳实践

Kafka 典型使用场景

Data Streaming

Kafka 能够对接到 Spark、Flink、Flume 等多个主流的流数据处理技术。利用 Kafka 高吞吐量的特点,客户可以通过 Kafka 建立传输通道,把应用侧的海量数据传输到流数据处理引擎中,数据经过处理分析后,可支持后端大数据分析,AI 模型训练等多种业务。



日志平台

Kafka 最常用也是我最熟悉的场景是日志分析系统。典型的实现方式是在客户端部署 日志收集器(如 Fluentd、Filebeat 或者 Logstash 等)进行日志采集,并将数据发送到 Kafka,之后通过后端的 ES 等进行数据运算,再搭建一个展示层如 Kibana 进行统计分析数据的展示。

物联网

随着有价值的用例的出现,物联网(IoT)正得到越来越多的关注。然而,一个关键的挑战是整合设备和机器来实时和大规模地处理数据。Apache Kafka?及其周边的生态系统,包括Kafka Connect、Kafka Streams,已经成为集成和处理这类数据集的首选技术。

Kafka 已经被用于许多物联网部署,包括消费者物联网和工业物联网(IIoT)。大多数场景都需要可靠、可伸缩和安全的端到端集成,从而支持实时的双向通信和数据处理。一些具体的用例是:

  • 联网的汽车基础设施
  • 智能城市和智能家居
  • 智能零售和客户360
  • 智能制造

具体的实现架构如下图所示:

使用的最佳实践

可靠性最佳实践

基于生产者和消费者配置满足不同的可靠性

生产者 At Least Once

生产者需要设置 request.required.acks=ALL,服务端主节点写成功且备节点同步成功才 返回 Response。

消费者 At Least Once

消费者接收消息后,应先进行对应业务操作,随后再进行 commit 标识消息已被处理,通过这种处理方式可以确保一条消息在业务处理失败时,能够重新被消费。注意消费者的 enable.auto.commit 参数需要设置为 False,确保 commit 动作手工控制。

生产者 At Most Once

保障一条消息最多投放一次,需要设置 request.required.acks=0,同时设置 retries=0。这里的原理是生产者遇到任何异常都不重试,并且不考虑 broker 是否响应写入成功。

消费者 At Most Once

保障一条消息最多被消费一次,需要消费者在收到消息后先进行 commit 标识消息已被处理,随后再进行对应业务操作。这里的原理是消费者不需要管实际业务的处理结果,拿到消息以后立刻 commit 告诉 broker 消息处理成功。 注意消费者的 enable.auto.commit 参数需要设置为 False,确保 commit 动作手工控制。

生产者 Exactly-once

Kafka 0.11 版本起新增了幂等消息的语义,通过设置 enable.idempotence=true 参数,可以实现单个分区的消息幂等。

如果 Topic 涉及多个分区或者需要多条消息封装成一个事务保障幂等,则需要增加 Transaction 控制,样例如下:

// 开启幂等控制参数
producerProps.put("enbale.idempotence", "true");
// 初始化事务
producer.initTransactions();
// 设置事务 ID
producerProps.put("transactional.id", "id-001");

try{
  // 开始事务,并在事务中发送 2 条消息
  producer.beginTranscation();
  producer.send(record0);
  producer.send(record1);
  // 提交事务
  producer.commitTranscation();
} catch( Exception e ) {
  producer.abortTransaction();
  producer.close();
}

消费者 Exactly-once

需要设置 isolation.level=read_committed,并设置 enable.auto.commit=false,确保消费者只消费生产者已经提交事务的消息,消费者业务需要确保事务性避免重复处理消息,比如说把消息持久化到数据库,然后向服务端提交 commit。

根据业务场景选用合适的语义

使用 At Least Once 语义支撑可接受少量消息重复的业务

At Least Once 是最常用的语义,可确保消息只多不少的发送和消费,性能和可靠性上有较好的平衡,可以作为默认选用的模式。业务侧也可以通过在消息体加入唯一的业务主键自行保障幂等性,在消费侧确保同一个业务主键的消息只被处理一次。

使用 Exactly Once 语义支撑需要强幂等性业务

Exactly Once 语义一般用绝对不容许重复的关键业务,典型案例是订单和支付相关场景

使用 At Most Once 语义支撑非关键业务

At Most Once 语义一般用在非关键业务,业务对于消息丢失并不敏感,只需要尽量确保消息成功生产消费即可。典型使用 At Most Once 语义的场景是消息通知,出现少量遗漏消息影响不大,相比之下重复发送通知会造成较坏的用户体验。

性能调优最佳实践

合理设置 Topic 的 partition 数量

以下汇总了通过 partition 调优性能建议考虑的维度,建议您根据理论分析配合压力测试对系统整体性能进行调优。

考虑维度

说明

吞吐量

增加 partition 的数量可以消息消费的并发度,当系统瓶颈在于消费端,而消费端又可以水平扩展的时候,增加 partition 可以增加系统吞吐量。 在 Kafka 内部每个 Topic 下的每个 partition 都是一个独立的消息处理通道 , 一个 partition 内的消息只能被同时被一个 consumer group 消费,当 consumer group 数量多于partition的数量时,多余的 consumer group 会出现空闲。

消息顺序

Kafka 可以保障一个 partition 内的消息顺序性,partition 之间的消息顺序无法保证,增加 partition 的时候需要考虑消息顺序对业务的影响。

实例 Partition 上限

Partition 增加会消耗底层更多的内存,IO 和文件句柄等资源。在规划 Topic 的 partition 数量时需要考虑 Kafka 集群能支持的 partition 上限。

生产者,消费者与 partition 的关系说明。

合理设置 batch 大小

如果 Topic 设置了多个分区,生产者发送消息时需要先确认往哪个分区发送。在给同一个分区发送多条消息时,Producer 客户端会将相关消息打包成一个 Batch,批量发送到服务端。一般情况下,小 Batch 会导致 Producer 客户端产生大量请求,造成请求队列在客户端和服务端的排队,从而整体推高了消息发送和消费延迟。

一个合适的 batch 大小,可以减少发送消息时客户端向服务端发起的请求次数,在整体上提高消息发送的吞吐和延迟。

Batch 参数说明如下:

参数

说明

batch.size

发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)。达到设置的数值时,就会触发一次网络请求,然后 Producer 客户端把消息批量发往服务器。

linger.ms

每条消息在缓存中的最长时间。若超过这个时间,Producer 客户端就会忽略 batch.size 的限制,立即把消息发往服务器。

buffer.memory

所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器,此时会忽略 batch.size 和 linger.ms 的限制。buffer.memory 的默认数值是 32MB,对于单个 Producer 而言,可以保证足够的性能。

Batch 相关参数值的选择并没有通用的方法,建议针对性能敏感的业务场景进行压测调优。

使用粘性分区处理大批量发送

Kafka 生产者与服务端发送消息时有批量发送的机制,只有发送到相同 Partition 的消息才会被放到同一个 Batch 中。在大批量发送场景,如果消息散落到多个 Partition 当中就可能会形成多个小 Batch,导致批量发送机制失效而降低性能。

Kafka 默认选择分区的策略如下

场景

策略

消息指定 Key

对消息的 Key 进行哈希,然后根据哈希结果选择分区,保证相同 Key 的消息会发送到同一个分区。

消息没有指定 Key

默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。

从默认机制可见 partition 的选择随机性很强,因此在大批量传输的场景下,推荐设置 partitioner.class参数,指定自定义的分区选择算法实现 粘性分区

其中一种实现方法是在固定的时间段内使用同一个 partition,过一段时间切换到下一个分区,避免数据散落到多个不同 partition。

通用最佳实践

Kafka 对消息顺序的保障

Kafka 会在同一个 partition 内保障消息顺序,如果 Topic 存在多个 partition 则无法确保全局顺序。如果需要保障全局顺序,则需要控制 partition 数量为 1 个。

对消息设置唯一的 Key

消息队列 Kafka 的消息有 Key(消息标识)和 Value(消息内容)两个字段。为了便于追踪,建议为消息设置一个唯一的 Key。之后可以通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的生产和消费情况。

合理设置队列的重试策略

分布式环境下,由于网络等原因,消息偶尔会出现发送失败的情况,其原因可能是消息已经发送成功但是 ACK 机制失败或者消息确实没有发送成功。默认的参数能满足大部分场景,但可以根据业务需求,按需设置以下重试参数:

参数

说明

retries

重试次数,默认值为 3,但对于数据丢失零容忍的应用而言,请考虑设置为 Integer.MAX_VALUE(有效且最大)。

retry.backoff.ms

重试间隔,建议设置为 1000。

? 注意:

如果希望实现 At Most Once 语义,重试需要关闭。

接入最佳实践

Spark Streaming 接入 Kafka

Spark Streaming 是 Spark Core 的一个扩展,用于高吞吐且容错地处理持续性的数据,目前支持的外部输入有 Kafka、Flume、HDFS/S3、Kinesis、Twitter 和 TCP socket。

Spark Streaming 将连续数据抽象成 DStream(Discretized Stream),而 DStream 由一系列连续的 RDD(弹性分布式数据集)组成,每个 RDD 是一定时间间隔内产生的数据。使用函数对 DStream 进行处理其实即为对这些 RDD 进行处理。

使用 Spark Streaming 作为 Kafka 的数据输入时,可支持 Kafka 稳定版本与实验版本:

Kafka Version

spark-streaming-kafka-0.8

spark-streaming-kafka-0.10

Broker Version

0.8.2.1 or higher

0.10.0 or higher

Api Maturity

Deprecated

Stable

Language Support

Scala、Java、Python

Scala、Java

Receiver DStream

Yes

No

Direct DStream

Yes

Yes

SSL / TLS Support

No

Yes

Offset Commit Api

No

Yes

Dynamic Topic Subscription

No

Yes

本次实践使用 0.10.2.1 版本的 Kafka 依赖。

操作步骤

步骤1:创建 Kafka 集群及 Topic

创建 Kafka 集群的步骤略,再创建一个名为 test 的 Topic。

步骤2:准备服务器环境

Centos6.8 系统

package

version

sbt

0.13.16

hadoop

2.7.3

spark

2.1.0

protobuf

2.5.0

ssh

CentOS 默认安装

Java

1.8

具体安装步骤略,包括以下步骤:

  1. 安装 sbt
  2. 安装 protobuf
  3. 安装 Hadoop
  4. 安装 Spark

步骤3:对接 Kafka

向 Kafka 中生产消息

这里使用 0.10.2.1 版本的 Kafka 依赖。

  1. build.sbt 添加依赖:
name :="Producer Example"
version :="1.0"
scalaVersion :="2.11.8"
libraryDependencies +="org.apache.kafka" % "kafka-clients" % "0.10.2.1"
  1. 配置 producer_example.scala
import java.util.Properties
import org.apache.kafka.clients.producer._
object ProducerExample extends App {
    val  props=new Properties()
    props.put("bootstrap.servers", "172.0.0.1:9092") //实例信息中的内网 IP 与端口

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer=new KafkaProducer[String, String](props)
    val TOPIC="test"  //指定要生产的 Topic
    for(i<- 1 to 50){
        val record=new ProducerRecord(TOPIC, "key", s"hello $i") //生产 key 是"key",value 是 hello i 的消息
        producer.send(record)
    }
    val record=new ProducerRecord(TOPIC, "key", "the end "+new java.util.Date)
    producer.send(record)
    producer.close() //最后要断开
}

更多有关 ProducerRecord 的用法请参考 ProducerRecord 文档。

从 Kafka 消费消息

####### DirectStream

  1. build.sbt 添加依赖:
name :="Consumer Example"
version :="1.0"
scalaVersion :="2.11.8"
libraryDependencies +="org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies +="org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies +="org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
  1. 配置 DirectStream_example.scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
    def main(args: Array[String]) {
        val kafkaParams=Map[String, Object](
            "bootstrap.servers" -> "172.0.0.1:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "spark_stream_test1",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> "false"
        )

        val sparkConf=new SparkConf()
        sparkConf.setMaster("local")
        sparkConf.setAppName("Kafka")
        val ssc=new StreamingContext(sparkConf, Seconds(5))
        val topics=Array("spark_test")

        val offsets : Map[TopicPartition, Long]=Map()

        for (i <- 0 until 3){
            val tp=new TopicPartition("spark_test", i)
            offsets.updated(tp , 0L)
        }
        val stream=KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )
        println("directStream")
        stream.foreachRDD{ rdd=>
	        //输出获得的消息
            rdd.foreach{iter=>
                val i=iter.value
                println(s"${i}")
            }
            //获得offset
            val offsetRanges=rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.foreachPartition { iter=>
                val o: OffsetRange=offsetRanges(TaskContext.get.partitionId)
                println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
            }
        }

        // Start the computation
        ssc.start()
        ssc.awaitTermination()
    }
}

####### RDD

  1. 配置build.sbt(配置同上,单击查看)。
  2. 配置RDD_example
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
    def main(args: Array[String]) {
        val kafkaParams=Map[String, Object](
            "bootstrap.servers" -> "172.0.0.1:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "spark_stream",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val sc=new SparkContext("local", "Kafka", new SparkConf())
        val java_kafkaParams : java.util.Map[String, Object]=kafkaParams
        //按顺序向 parition 拉取相应 offset 范围的消息,如果拉取不到则阻塞直到超过等待时间或者新生产消息达到拉取的数量
        val offsetRanges=Array[OffsetRange](
            OffsetRange("spark_test", 0, 0, 5),
            OffsetRange("spark_test", 1, 0, 5),
            OffsetRange("spark_test", 2, 0, 5)
        )
        val range=KafkaUtils.createRDD[String, String](
            sc,
            java_kafkaParams,
            offsetRanges,
            PreferConsistent
        )
        range.foreach(rdd=>println(rdd.value))
        sc.stop()
    }
}

更多 kafkaParams 用法参考 kafkaParams 文档。

Flume接入 Kafka

Apache Flume 是一个分布式、可靠、高可用的日志收集系统,支持各种各样的数据来源(如 HTTP、Log 文件、JMS、监听端口数据等),能将这些数据源的海量日志数据进行高效收集、聚合、移动,最后存储到指定存储系统中(如 Kafka、分布式文件系统、Solr 搜索服务器等)。

Flume 基本结构如下:

Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM,单个 agent 由 Source、Sink 和 Channel 三大组件构成。

Flume 与 Kafka

把数据存储到 HDFS 或者 HBase 等下游存储模块或者计算模块时需要考虑各种复杂的场景,例如并发写入的量以及系统承载压力、网络延迟等问题。Flume 作为灵活的分布式系统具有多种接口,同时提供可定制化的管道。
在生产处理环节中,当生产与处理速度不一致时,Kafka 可以充当缓存角色。Kafka 拥有 partition 结构以及采用 append 追加数据,使 Kafka 具有优秀的吞吐能力;同时其拥有 replication 结构,使 Kafka 具有很高的容错性。
所以将 Flume 和 Kafka 结合起来,可以满足生产环境中绝大多数要求。

准备工作

  • 下载 Apache Flume (1.6.0以上版本兼容 Kafka)
  • 下载 Kafka工具包 (0.9.x以上版本,0.8已经不支持)
  • 确认 Kafka 的 Source、 Sink 组件已经在 Flume 中。

接入方式

Kafka 可作为 Source 或者 Sink 端对消息进行导入或者导出。

Kafka Source

配置 kafka 作为消息来源,即将自己作为消费者,从 Kafka 中拉取数据传入到指定 Sink 中。主要配置选项如下:

配置项

说明

channels

自己配置的 Channel

type

必须为:org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers

Kafka Broker 的服务器地址

kafka.consumer.group.id

作为 Kafka 消费端的 Group ID

kafka.topics

Kafka 中数据来源 Topic

batchSize

每次写入 Channel 的大小

batchDurationMillis

每次写入最大间隔时间

示例:

tier1.sources.source1.type=org.apache.flume.source.kafka.KafkaSource 
tier1.sources.source1.channels=channel1
tier1.sources.source1.batchSize=5000
tier1.sources.source1.batchDurationMillis=2000
tier1.sources.source1.kafka.bootstrap.servers=localhost:9092
tier1.sources.source1.kafka.topics=test1, test2
tier1.sources.source1.kafka.consumer.group.id=custom.g.id

更多内容请参考 Apache Flume 官网。

Kafka Sink

配置 Kafka 作为内容接收方,即将自己作为生产者,推到 Kafka Server 中等待后续操作。主要配置选项如下:

配置项

说明

channel

自己配置的 Channel

type

必须为:org.apache.flume.sink.kafka.KafkaSink

kafka.bootstrap.servers

Kafka Broker 的服务器

kafka.topics

Kafka 中数据来源 Topic

kafka.flumeBatchSize

每次写入的 Bacth 大小

kafka.producer.acks

Kafka 生产者的生产策略

示例:

a1.sinks.k1.channel=c1
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=mytopic
a1.sinks.k1.kafka.bootstrap.servers=localhost:9092
a1.sinks.k1.kafka.flumeBatchSize=20
a1.sinks.k1.kafka.producer.acks=1

更多内容请参考 Apache Flume 官网。

Storm 接入 Kafka

Storm 是一个分布式实时计算框架,能够对数据进行流式处理和提供通用性分布式 RPC 调用,可以实现处理事件亚秒级的延迟,适用于对延迟要求比较高的实时数据处理场景。

Storm 工作原理

在 Storm 的集群中有两种节点,控制节点Master Node和工作节点Worker NodeMaster Node上运行Nimbus进程,用于资源分配与状态监控。Worker Node上运行Supervisor进程,监听工作任务,启动executor执行。整个 Storm 集群依赖zookeeper负责公共数据存放、集群状态监听、任务分配等功能。

用户提交给 Storm 的数据处理程序称为topology,它处理的最小消息单位是tuple,一个任意对象的数组。topologyspoutbolt构成,spout是产生tuple的源头,bolt可以订阅任意spoutbolt发出的tuple进行处理。

Storm with Kafka

Storm 可以把 Kafka 作为spout,消费数据进行处理;也可以作为bolt,存放经过处理后的数据提供给其它组件消费。

Centos6.8系统

package

version

maven

3.5.0

storm

2.1.0

ssh

5.3

Java

1.8

前提条件

  • 下载并安装 JDK 8。具体操作,请参见 Download JDK 8。
  • 下载并安装 Storm,参考 Apache Storm downloads。
  • 已创建 Kafka 集群。

操作步骤

步骤1:创建 Topic

步骤2:添加 Maven 依赖

pom.xml 配置如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>storm</name> 
     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>ExclamationTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

步骤3:生产消息

使用 spout/bolt

topology 代码:

//TopologyKafkaProducerSpout.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerSpout {
    //申请的kafka实例ip:port
    private final static String BOOTSTRAP_SERVERS="xx.xx.xx.xx:xxxx";
    //指定要将消息写入的topic
    private final static String TOPIC="storm_test";
    public static void main(String[] args) throws Exception {
        //设置producer属性
        //函数参考:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        //属性参考:http://kafka.apache.org/0102/documentation.html
        Properties properties=new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //创建写入kafka的bolt,默认使用fields("key" "message")作为生产消息的key和message,也可以在FieldNameBasedTupleToKafkaMapper()中指定
        KafkaBolt kafkaBolt=new KafkaBolt()
                .withProducerProperties(properties)
                .withTopicSelector(new DefaultTopicSelector(TOPIC))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        TopologyBuilder builder=new TopologyBuilder();
        //一个顺序生成消息的spout类,输出field是sentence
        SerialSentenceSpout spout=new SerialSentenceSpout();
        AddMessageKeyBolt bolt=new AddMessageKeyBolt();
        builder.setSpout("kafka-spout", spout, 1);
        //为tuple加上生产到kafka所需要的fields
        builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
        //写入kafka
        builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");
    
        Config config=new Config();
        if (args !=null && args.length > 0) {
            //集群模式,用于打包jar,并放到storm运行
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
        } else {
            //本地模式
            LocalCluster cluster=new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    
    }
}

创建一个顺序生成消息的 spout 类:

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class SerialSentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector spoutOutputCollector;
    
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector=spoutOutputCollector;
    }
    
    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        //生产一个UUID字符串发送给下一个组件
        spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}

tuple 加上 key、message 两个字段,当 key 为 null 时,生产的消息均匀分配到各个 partition,指定了 key 后将按照 key 值 hash 到特定 partition 上:

//AddMessageKeyBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AddMessageKeyBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //取出第一个filed值
        String messae=tuple.getString(0);
        //System.out.println(messae);
        //发送给下一个组件
        basicOutputCollector.emit(new Values(null, messae));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //创建发送给下一个组件的schema
        outputFieldsDeclarer.declare(new Fields("key", "message"));
    }
}

使用 trident

使用 trident 类生成 topology:

//TopologyKafkaProducerTrident.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerTrident {
    //申请的kafka实例ip:port
    private final static String BOOTSTRAP_SERVERS="xx.xx.xx.xx:xxxx";
    //指定要将消息写入的topic
    private final static String TOPIC="storm_test";
    public static void main(String[] args) throws Exception {
        //设置producer属性
        //函数参考:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        //属性参考:http://kafka.apache.org/0102/documentation.html
        Properties properties=new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //设置Trident
        TridentKafkaStateFactory stateFactory=new TridentKafkaStateFactory()
                .withProducerProperties(properties)
                .withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
                //设置使用fields("key", "value")作为消息写入  不像FieldNameBasedTupleToKafkaMapper有默认值
                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
        TridentTopology builder=new TridentTopology();
        //一个批量产生句子的spout,输出field为sentence
        builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
                .each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
                .partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());

        Config config=new Config();
        if (args !=null && args.length > 0) {
            //集群模式,用于打包jar,并放到storm运行
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
        } else {
            //本地模式
            LocalCluster cluster=new LocalCluster();
            cluster.submitTopology("test", config, builder.build());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    
    }
    
    private static class AddMessageKey extends BaseFunction {
    
        @Override
        public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
            //取出第一个filed值
            String messae=tridentTuple.getString(0);
            //System.out.println(messae);
            //发送给下一个组件
            //tridentCollector.emit(new Values(Integer.toString(messae.hashCode()), messae));
            tridentCollector.emit(new Values(null, messae));
        }
    }
}

创建一个批量生成消息的 spout 类:

//TridentSerialSentenceSpout.java
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class TridentSerialSentenceSpout implements IBatchSpout {

    private final int batchCount;
    
    public TridentSerialSentenceSpout(int batchCount) {
        this.batchCount=batchCount;
    }
    
    @Override
    public void open(Map map, TopologyContext topologyContext) {
    
    }
    
    @Override
    public void emitBatch(long l, TridentCollector tridentCollector) {
        Utils.sleep(1000);
        for(int i=0; i < batchCount; i++){
            tridentCollector.emit(new Values(UUID.randomUUID().toString()));
        }
    }
    
    @Override
    public void ack(long l) {
    
    }
    
    @Override
    public void close() {
    
    }
    
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf=new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }
    
    @Override
    public Fields getOutputFields() {
        return new Fields("sentence");
    }
}

步骤4:消费消息

使用 spout/bolt

//TopologyKafkaConsumerSpout.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;

public class TopologyKafkaConsumerSpout {
    //申请的kafka实例ip:port
    private final static String BOOTSTRAP_SERVERS="xx.xx.xx.xx:xxxx";
    //指定要将消息写入的topic
    private final static String TOPIC="storm_test";

    public static void main(String[] args) throws Exception {
        //设置重试策略
        KafkaSpoutRetryService kafkaSpoutRetryService=new KafkaSpoutRetryExponentialBackoff(
                KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE,
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
        );
        ByTopicRecordTranslator<String, String> trans=new ByTopicRecordTranslator<>(
                (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                new Fields("topic", "partition", "offset", "key", "value"));
        //设置consumer参数
        //函数参考http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        //参数参考http://kafka.apache.org/0102/documentation.html
        KafkaSpoutConfig spoutConfig=KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                .setProp(new HashMap<String, Object>(){{
                    put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //设置group
                    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //设置session超时
                    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //设置请求超时
                }})
                .setOffsetCommitPeriodMs(10_000) //设置自动确认时间
                .setFirstPollOffsetStrategy(LATEST) //设置拉取最新消息
                .setRetry(kafkaSpoutRetryService)
                .setRecordTranslator(trans)
                .build();
    
        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt", new BaseRichBolt(){
            private OutputCollector outputCollector;
            @Override
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
            }
    
            @Override
            public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
                this.outputCollector=outputCollector;
            }
    
            @Override
            public void execute(Tuple tuple) {
                System.out.println(tuple.getStringByField("value"));
                outputCollector.ack(tuple);
            }
        }, 1).shuffleGrouping("kafka-spout");
    
        Config config=new Config();
        config.setMaxSpoutPending(20);
        if (args !=null && args.length > 0) {
            config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
        }
        else {
            LocalCluster cluster=new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(20000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}

使用 trident

//TopologyKafkaConsumerTrident.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;


public class TopologyKafkaConsumerTrident {
    //申请的kafka实例ip:port
    private final static String BOOTSTRAP_SERVERS="xx.xx.xx.xx:xxxx";
    //指定要将消息写入的topic
    private final static String TOPIC="storm_test";

    public static void main(String[] args) throws Exception {
        ByTopicRecordTranslator<String, String> trans=new ByTopicRecordTranslator<>(
                (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                new Fields("topic", "partition", "offset", "key", "value"));
        //设置consumer参数
        //函数参考http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        //参数参考http://kafka.apache.org/0102/documentation.html
        KafkaTridentSpoutConfig spoutConfig=KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                .setProp(new HashMap<String, Object>(){{
                    put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //设置group
                    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //设置自动确认
                    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //设置session超时
                    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //设置请求超时
                }})
                .setFirstPollOffsetStrategy(LATEST) //设置拉取最新消息
                .setRecordTranslator(trans)
                .build();
    
        TridentTopology builder=new TridentTopology();
//      Stream spoutStream=builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); //事务型
        Stream spoutStream=builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
        spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
            @Override
            public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
                System.out.println(tridentTuple.getStringByField("value"));
                tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
            }
        }, new Fields("message"));

        Config conf=new Config();
        conf.setMaxSpoutPending(20);conf.setNumWorkers(1);
        if (args !=null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
        }
        else {
            StormTopology stormTopology=builder.build();
            LocalCluster cluster=new LocalCluster();
            cluster.submitTopology("test", conf, stormTopology);
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();stormTopology.clear();
        }
    }
}

步骤5:提交 Storm

使用 mvn package 编译后,可以提交到本地集群进行 debug 测试,也可以提交到正式集群进行运行。

storm jar your_jar_name.jar topology_name
storm jar your_jar_name.jar topology_name tast_name

Logstash 接入 Kafka

Logstash 是一个开源的日志处理工具,可以从多个源头收集数据、过滤收集的数据并对数据进行存储作为其他用途。

Logstash 灵活性强,拥有强大的语法分析功能,插件丰富,支持多种输入和输出源。Logstash 作为水平可伸缩的数据管道,与 Elasticsearch 和 Kibana 配合,在日志收集检索方面功能强大。

Logstash 工作原理

Logstash 数据处理可以分为三个阶段:inputs → filters → outputs。

  1. inputs:产生数据来源,例如文件、syslog、redis 和 beats 此类来源。
  2. filters:修改过滤数据, 在 Logstash 数据管道中属于中间环节,可以根据条件去对事件进行更改。一些常见的过滤器包括:grok、mutate、drop 和 clone 等。
  3. outputs:将数据传输到其他地方,一个事件可以传输到多个 outputs,当传输完成后这个事件就结束。Elasticsearch 就是最常见的 outputs。

同时 Logstash 支持编码解码,可以在 inputs 和 outputs 端指定格式。



Logstash 接入 Kafka 的优势

  • 可以异步处理数据:防止突发流量。
  • 解耦:当 Elasticsearch 异常的时候不会影响上游工作。

? 注意:
Logstash 过滤消耗资源,如果部署在生产 server 上会影响其性能。

操作步骤

准备工作

  • 下载并安装 Logstash,参考 Download Logstash。
  • 下载并安装 JDK 8,参考 Download JDK 8。
  • 已创建 Kafka 集群。

步骤1:创建 Topic

创建一个名为 logstash_test的 Topic。

步骤2:接入 Kafka

作为 inputs 接入

  1. 执行 bin/logstash-plugin list,查看已经支持的插件是否含有 logstash-input-kafka
  2. .bin/ 目录下编写配置文件 input.conf
    此处将标准输出作为数据终点,将 Kafka 作为数据来源。
  3. input { kafka { bootstrap_servers=> "xx.xx.xx.xx:xxxx" // kafka 实例接入地址 group_id=> "logstash_group" // kafka groupid 名称 topics=> ["logstash_test"] // kafka topic 名称 consumer_threads=> 3 // 消费线程数,一般与 kafka 分区数一致 auto_offset_reset=> "earliest" } } output { stdout{codec=>rubydebug} }
  4. 执行以下命令启动 Logstash,进行消息消费。
  5. ./logstash -f input.conf
  6. 会看到刚才 Topic 中的数据被消费出来。

作为 outputs 接入

  1. 执行 bin/logstash-plugin list,查看已经支持的插件是否含有 logstash-output-kafka
  2. 在.bin/目录下编写配置文件 output.conf
    此处将标准输入作为数据来源,将 Kafka 作为数据目的地。
  3. input { input { stdin{} } } output { kafka { bootstrap_servers => "xx.xx.xx.xx:xxxx" // ckafka 实例接入地址 topic_id => "logstash_test" // ckafka topic 名称 } }
  4. 执行如下命令启动 Logstash,向创建的 Topic 发送消息。
  5. ./logstash -f output.conf
  6. 启动 Kafka 消费者,检验上一步的生产数据。
  7. ./kafka-console-consumer.sh --bootstrap-server 172.0.0.1:9092 --topic logstash_test --from-begging --new-consumer

Filebeats 接入 Kafka

Beats 平台 集合了多种单一用途数据采集器。这些采集器安装后可用作轻量型代理,从成百上千或成千上万台机器向目标发送采集数据。



Beats 有多种采集器,您可以根据自身的需求下载对应的采集器。本文以 Filebeat(轻量型日志采集器)为例,向您介绍 Filebeat 接入 Kafka 的操作指方法,及接入后常见问题的解决方法。

前提条件

  • 下载并安装 Filebeat(参见 Download Filebeat)
  • 下载并安装JDK 8(参见 Download JDK 8)
  • 已 创建 Kafka 集群

操作步骤

步骤1:创建 Topic

创建一个名为 test的 Topic。

步骤2:准备配置文件

进入 Filebeat 的安装目录,创建配置监控文件 filebeat.yml。

##=======Filebeat prospectors==========filebeat.prospectors:
- input_type: log 
## 此处为监听文件路径
  paths:
    - /var/log/messages

##=======Outputs=========##------------------ kafka -------------------------------------
output.kafka:
  version:0.10.2 // 根据不同 Kafka 集群版本配置
  # 设置为Kafka实例的接入地址
  hosts: ["xx.xx.xx.xx:xxxx"]
  # 设置目标topic的名称
  topic: 'test'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: none
  max_message_bytes: 1000000

  # SASL 需要配置下列信息,如果不需要则下面两个选项可不配置
  username: "yourinstance#yourusername"  //username 需要拼接实例ID和用户名
  password: "yourpassword"

步骤4:Filebeat 发送消息

  1. 执行如下命令启动客户端。
  2. sudo ./filebeat -e -c filebeat.yml
  3. 为监控文件增加数据(示例为写入监听的 testlog 文件)。
  4. echo ckafka1 >> testlog echo ckafka2 >> testlog echo ckafka3 >> testlog
  5. 开启 Consumer 消费对应的 Topic,获得以下数据。
{"@timestamp":"2017-09-29T10:01:27.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka1","offset":500,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
{"@timestamp":"2017-09-29T10:01:30.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka2","offset":508,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
{"@timestamp":"2017-09-29T10:01:33.937Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka3","offset":516,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}

SASL/PLAINTEXT 模式

如果您需要进行 SALS/PLAINTEXT 配置,则需要配置用户名与密码。 在 Kafka 配置区域新增加 username 和 password 配置即可。