整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

Flink 1.9 实战:使用 SQL 读取 Kaf

Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

周六在深圳分享了《Flink SQL 1.9.0 技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。希望对于 Flink SQL 的初学者能有所帮助。

这份代码主要由两部分组成:1) 能用来提交 SQL 文件的 SqlSubmit 实现。2) 用于演示的 SQL 示例、Kafka 启动停止脚本、 一份测试数据集、Kafka 数据源生成器。

通过本实战,你将学到:

  1. 如何使用 Blink Planner
  2. 一个简单的 SqlSubmit 是如何实现的
  3. 如何用 DDL 创建一个 Kafka 源表和 MySQL 结果表
  4. 运行一个从 Kafka 读取数据,计算 PVUV,并写入 MySQL 的作业
  5. 设置调优参数,观察对作业的影响

SqlSubmit 的实现

笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句。所以笔者就只好自己写了个简单的提交脚本。后来想想,也挺好的,可以让听众同时了解如何通过 SQL 的方式,和编程的方式使用 Flink SQL。

SqlSubmit 的主要任务是执行和提交一个 SQL 文件,实现非常简单,就是通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则会调用 tEnv.sqlUpdate(...)。如果是 SET 开头,则会将配置设置到 TableConfig 上。其核心代码主要如下所示:

使用 DDL 连接 Kafka 源表

在 flink-sql-submit 项目中,我们准备了一份测试数据集(来自阿里云天池公开数据集,特别鸣谢),位于 src/main/resources/user_behavior.log。数据以 JSON 格式编码,大概长这个样子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

为了模拟真实的 Kafka 数据源,笔者还特地写了一个 source-generator.sh 脚本(感兴趣的可以看下源码),会自动读取 user_behavior.log 的数据并以默认每毫秒1条的速率灌到 Kafka 的 user_behavior topic 中。

有了数据源后,我们就可以用 DDL 去创建并连接这个 Kafka 中的 topic(详见 src/main/resources/q1.sql)。

注:可能有用户会觉得其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。

使用 DDL 连接 MySQL 结果表

连接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

PV UV 计算

假设我们的需求是计算每小时全网的用户访问量,和独立用户数。很多用户可能会想到使用滚动窗口来计算。但这里我们介绍另一种方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
 DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
 COUNT(*) AS pv,
 COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 这个内置函数,将日志时间归一化成“年月日小时”的字符串格式,并根据这个字符串进行分组,即根据每小时分组,然后通过 COUNT(*) 计算用户访问量(PV),通过 COUNT(DISTINCT user_id) 计算独立用户数(UV)。这种方式的执行模式是每收到一条数据,便会进行基于之前计算的值做增量计算(如+1),然后将最新结果输出。所以实时性很高,但输出量也大。

我们将这个查询的结果,通过 INSERT INTO 语句,写到了之前定义的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我们有对这种查询的性能调优做了深度的介绍。

实战演示

环境准备

本实战演示环节需要安装一些必须的服务,包括:

  • Flink 本地集群:用来运行 Flink SQL 任务。
  • Kafka 本地集群:用来作为数据源。
  • MySQL 数据库:用来作为结果表。
  • Flink 本地集群安装

1.下载 Flink 1.9.0 安装包并解压:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz

2.下载以下依赖 jar 包,并拷贝到 flink-1.9.0/lib/ 目录下。因为我们运行时需要依赖各个 connector 实现。

  • flink-sql-connector-kafka_2.11-1.9.0.jar
  • http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
  • http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
  • http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
  • https://dev.mysql.com/downloads/connector/j/5.1.html

3.将 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因为我们的演示任务可能会消耗多于1个的 slot。

4.在 flink-1.9.0 目录下执行 ./bin/start-cluster.sh,启动集群。

运行成功的话,可以在 http://localhost:8081 访问到 Flink Web UI。

另外,还需要将 Flink 的安装路径填到 flink-sql-submit 项目的 env.sh 中,用于后面提交 SQL 任务,如我的路径是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安装

下载 Kafka 2.2.0 安装包并解压:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

将安装路径填到 flink-sql-submit 项目的 env.sh 中,如我的路径是

KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

在 flink-sql-submit 目录下运行 ./start-kafka.sh 启动 Kafka 集群。

在命令行执行 jps,如果看到 Kafka 进程和 QuorumPeerMain 进程即表明启动成功。

MySQL 安装

可以在官方页面下载 MySQL 并安装:

https://dev.mysql.com/downloads/mysql/

如果有 Docker 环境的话,也可以直接通过 Docker 安装

https://hub.docker.com/_/mysql

$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

然后在 MySQL 中创建一个 flink-test 的数据库,并按照上文的 schema 创建 pvuv_sink 表。

提交 SQL 任务

1.在 flink-sql-submit 目录下运行 ./source-generator.sh,会自动创建 user_behavior topic,并实时往里灌入数据。

2.在 flink-sql-submit 目录下运行 ./run.sh q1, 提交成功后,可以在 Web UI 中看到拓扑。

在 MySQL 客户端,我们也可以实时地看到每个小时的 pv uv 值在不断地变化

结尾

本文带大家搭建基础集群环境,并使用 SqlSubmit 提交纯 SQL 任务来学习了解如何连接外部系统。flink-sql-submit/src/main/resources/q1.sql 中还有一些注释掉的调优参数,感兴趣的同学可以将参数打开,观察对作业的影响。关于这些调优参数的原理,可以看下我在 深圳 Meetup 上的分享《Flink SQL 1.9.0 技术内幕和最佳实践》。

作者:巴蜀真人


lt;table> 表格的样式:


表格 <table>标签的定义:

HTML 表格:<table> 标签定义 。

简单的 HTML 表格包括: table 元素,一个或多个 tr、th 以及 td 元素。


<table>表格结构:

<table border=1px;>
 <tr> <th> row1,col1 </th><th>row1,col2 </th> </tr>
 <tr> <th> row2,col1 </th><td>row2,col2 </td> </tr>
</table>

<table>表格元素:

tr(行标签)、 th(表头单元格标签)、 td(普通单元格标签) caption(表格标题)、 col(定义列)、 colgroup(对表格中的列进行组合)、thead(组合表头的内容) tbody(组合表格的主题内容) 、tfoot(组合表格的脚注内容) 等

<td>(普通单元格标签) : 元素定义表格单元格

	<td>11,980</td>

<th>(表头单元格标签) : 元素定义表格表头

<th scope="row">工资</th>

<tr>(行标签) : 元素定义表格行

<tr>
		<th scope="row">工资</th>
		<td>11,980</td>
		<td>12,650</td>
		<td>9,700</td>
		<td>10,600</td>
 <td rowspan="5">工作</td>
	</tr>

<caption>(表格标题):元素定义表格标题

<caption class="c1">月度收入4月 - 7月 </caption>

还有一些复杂的表格才能用到的元素 :<col>、 <colgroup>、<thead>、 <tbody> 、<tfoot>等.

  • thead主要用来存放表格的表头的;
  • tbody 用来存放真正的数据的;
  • tfoot是表格的脚部, 主要是用来对表格做总结性的统计, 备注等.

由于thead, tbody, tfoot是从语义 上来划分 表格结构的, 主要用于比较复杂的表格中。


<colspan>合并行元素 :定义<table>中的行的合并

 <td colspan="5">153,629</td>

<rowspan>合并列元素 :定义<table>中的列的合并

 <td rowspan="5">工作</td>

<table>表格的嵌套:

<tr>
		<th scope="row">总计</th>
		<td>36,060</td>
		<td>38,759</td>
		<td>38,110</td>
		<td>40,700</td>
 <td class="ct">
 <table id="t2"> 
 <tr><td></td></tr>
 </table>
 </td>
	</tr>

嵌套


HTML基础表格的应用,上面的简单元素就可以满足,运用表格逻辑思维去思考,可以更快的掌握<table>表格标签。

本文部分图片来自网络,如有侵权,请联系修改。

lt;table>和<form>结合效果图:

在HTML/CSS 中,我们经常用HTML来布局和填充内容,用CSS来添加效果,修饰内容和布局,使整个页面变得更好看。


HTML和CSS的配合方法:

即在<head></head>标签内添加CSS样式表的链接:

代码展示如下:

<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
	<title>Make a Table Frame</title>
 <link rel="stylesheet" type="text/css" href="CSS/tableframe.css" >
</head>

CSS选择器

定义:

对带有指定属性的 HTML 元素设置样式。

注意:

只有在规定了 !DOCTYPE 时,IE7 和 IE8 才支持属性选择器。在 IE6 及更低的版本中,不支持属性选择。

功能:

“选择器”指明了{}中的“样式”的作用对象,也就是“样式”作用于网页中的哪些元素。

CSS中的选择器有三个:

标签选择器、class类选择器、id选择器

1.标签选择器样式表:a{}、 div{}、table{} ...

{对全局所有的选中类型标签的样式修改}

2.class类选择器 样式表: .class{}

{在HTML中每个标签都可以同时绑定多个类名,每个标签都可以设置class;同一个界面中class是不可重复}

3. id选择器样式表: #d1 {}

{每个标签都可有id,每个页面不可重复id,}

【一个HTML标签只能绑定一个id属性,一个HTML标签可以绑定多个class属性】

单纯选择<div>标签的时候 是对全局的的(所有的)<div>进行修饰;

选择器优先级:

id选择器>class类选择器>标签选择器

所以有id和class 选择器的标签将不会被覆盖。交叉时是按照优先级覆盖显示的。


<table>部分:电脑配件管理表2018年5月-8月

单纯的HTML 表格表单内容

标题<caption>标签:

表格的标题<caption>的内容填充(HTML)

<caption> 表格标题</caption> 

标题<caption>的样式修饰(CSS)

table.formdata caption
{
	text-shadow: #FF00ff;
	text-align: center;
	padding-bottom: 6px;
	font-weight: bold;
}

其他<table>标签相关内容可参考 HTML中表格的实例应用 一文。


<form>部分:

form在网页中主要负责数据采集功能。

一个表单有三个基本组成部分:

(1)表单标签:包含了处理表单数据所用CGI程序的URL以及数据提交到服务器的方法。

(2)表单域:包含了文本框、密码框、隐藏域、多行文本框、复选框、单选框、下拉选择框和文件上传框等。

(3)表单按钮:提交按钮、复位按钮和一般按钮;用于将数据传送到服务器上的CGI脚本或者取消输入。还可以用表单按钮来控制其他定义了处理脚本的处理工作。

<input>标签

定义:

<input> 标签规定用户可输入数据的输入字段。

根据不同的 type 属性,输入字段有多种形态。输入字段可以是文本字段、复选框、密码字段、单选按钮、按钮等等

语法代码:

<input type="value" >

实例代码:

<td><input type="text" name="Mainboard 6月" id="Mainboard 6月"></td>

关系展示:


<input>中 submit属性 和reset属性

实例代码:

<p>

<input type="submit" name="btnSubmit" id="btnSubmit" value="Add Data" class="btn">

<input type="reset" value="Reset All" class="btn">

</p>

实例展示:

<input>标签的其他属性值:


<input>标签外是否添加<form>标签?

input标签外是否添加form标签需要按情形区分:

应用场景的区别:

1.所有向后台提交数据(包括原生和ajax提交)的<input>都建议用<form>包裹.

2.如果只是用来做前台交互效果则不推荐使用form包裹。

但提交数据时,其实也可以不用form包裹input标签:

1.如果有form标签,在点击提交铵钮时,浏览器自动收集参数,并打包一个http请求到服务器,完成表单提交。在这一过程中,浏览器会根据method的不同,将参数编码后,放在urI中(get),或者放在请求的data中(post)。然后发送到服务器。

2.如果没有form,post方式的提交要使用ajax手工完成。get方式的提交需要自己拼接url。


<form>表单其他相关内容可参考 HTML中 表单 的应用实例 一文。


最后,附带一下该可输入的EXCEL表格的源码。

HTML code:

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
	<title>Make a Table Frame</title>
 <link rel="stylesheet" type="text/css" href="CSS/tableframe.css" >
</head>
<body image="">
 <form method="psot">
	<table border="1px" class="formdata">
 	<caption>电脑配件管理表2018年5月-8月</caption>
 	<tr>
 		<th></th>
 		<th scope="col">5月</th>
 		<th scope="col">6月</th>
 		<th scope="col">7月</th>
 		<th scope="col">8月</th>
 	</tr>
 	<tr>
 		<th scope="row">Hard Disk</th>
 		<td><input type="text" name="Hard Disk 5月" id="Hard Disk 5月"></td>
 		<td><input type="text" name="Hard Disk 6月" id="Hard Disk 6月"></td>
 		<td><input type="text" name="Hard Disk 7月" id="Hard Disk 7月"></td>
 		<td><input type="text" name="Hard Disk 8月" id="Hard Disk 8月"></td>
 	</tr>
 	<tr>
 		<th scope="row">Mainboard</th>
 		<td><input type="text" name="Mainboard 5月" id="Mainboard 5月"></td>
 <td><input type="text" name="Mainboard 6月" id="Mainboard 6月"></td>
 <td><input type="text" name="Mainboard 7月" id="Mainboard 7月"></td>
 <td><input type="text" name="Mainboard 8月" id="Mainboard 8月"></td>
 	</tr>
 	<tr>
 		<th scope="row">Case</th>
 		<td><input type="text" name="Case 5月" id="Case 5月"></td>
 <td><input type="text" name="Case 6月" id="Case 6月"></td>
 <td><input type="text" name="Case 7月" id="Case 7月"></td>
 <td><input type="text" name="Case 8月" id="Case 8月"></td>
 	</tr>
 	<tr>
 	 <th scope="row">Power</th>
 	 <td><input type="text" name="Power 5月" id="Power 5月"></td>
 <td><input type="text" name="Power 6月" id="Power 6月"></td>
 <td><input type="text" name="Power 7月" id="Power 7月"></td>
 <td><input type="text" name="Power 8月" id="Power 8月"></td>	
 	</tr>
 	<tr>
 		<th scope="row">CPU Fan</th>
 		<td><input type="text" name="CPU Fan 5月" id="CPU Fan 5月"></td>
 <td><input type="text" name="CPU Fan 6月" id="CPU Fan 6月"></td>
 <td><input type="text" name="CPU Fan 7月" id="CPU Fan 7月"></td>
 <td><input type="text" name="CPU Fan 8月" id="CPU Fan 8月"></td> 
 	</tr>
 	<tr>
 		<th scope="row">Total</th>
 		<td><input type="text" name="Total 5月" id="Total 5月"></td>
 <td><input type="text" name="Total 6月" id="Total 6月"></td>
 <td><input type="text" name="Total7月" id="Total 7月"></td>
 <td><input type="text" name="Total 8月" id="Total 8月"></td>
 	</tr>
	</table>
 <p>
 <input type="submit" name="btnSubmit" id="btnSubmit" value="Add Data" class="btn">
 <input type="reset" value="Reset All" class="btn">
 </p>
 </form>
</body>
</html>

CSS code :

body
{
	font-family: Arial;
	/*background-image: url(image/mainroad.jpg) no-repeat;*/
	background-color: #00ff00;
	background-size: 100%;
}
table.formdata
{
	width: 300px;
	height: 150px;
	border: 2px solid #F00;
	border-collapse: collapse;
	font-family: Arial;
}
table.formdata caption
{
	text-shadow: #FF00ff;
	text-align: center;
	padding-bottom: 6px;
	font-weight: bold;
}
table.formdata th
{
	border:1px solid #be34hc;
	background-color: #E2E2E2;
	color:#000000;
	text-aglin:center;
	font-weight: normal;
	padding: 2px 8px 2px 6px;
	margin: 0px;
}
table.formdata input
{
	width: 100px;
	padding: 1px 3px 1px 3px;
	margin: 0px;
	border:none;
	font-family: Arial;
}
.btn
{
	width:100px;
	background-color: #FF00ee;
	border:1px solid #00f2f2;
	font-family: Arial;
}

本文部分内容来自网络,如有侵权,请联系修改。