果你不了解python,可以先了解python的简单用法。不过人邮君相信,在座的各位都是大佬,我们直接介绍操作。
python 与 mysql 实现交互的过程,通常分为:建立连接、把sql语句定义为字符串,提交指令、关闭连接。
核心的技能在于 sql语句;除了定义sql语句,其余3个处理都是固定的写法。接下来,人邮君结合《MySQL是怎样运行的》这本书,以Linux环境为主,为大家进行说明。
MySQL是怎样运行的 从根儿上理解MySQL
首先来看第一步,安装 MySQL 数据库:
如果你想要使用python操作MySQL数据库,就必须先要安装pymysql库,这个库的安装很简单;
第二步,pymysql 模块安装与使用:
MySQL-python驱动,是python 操作mysql必不可少的模块。
下载MySQL-python-1.2.5.zip 文件之后直接解压。进入MySQL-python-1.2.5目录:
>>python setup.py install
下载地址:https://pypi.python.org/pypi/MySQL-python/
第三步,python与mysql的交互实现:
1)连接
pymysql .connect () 函数:连接数据库
使用 pymysql 的 connect() 方法连接数据库,涉及到几个参数,具体代表意义如下:
host:MySQL服务的地址,若数据库在本地上,使用 localhost 或者127.0.0.1。如果在其它服务器上,则写对应的 IP地址
port:服务的端口号,默认为3306,不写则为默认值。
user:登录数据库的用户名
passwd:登录 MySQL 的密码
db:数据库名
charset:设置为 utf8 编码,解决存汉字乱码问题
eg:
# 导入模块
import pymysql
# 打开数据库连接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
print(conn)
print(type(conn))
输出结果显示如下:表面数据库连接成功
详细可以参考
https://www.cnblogs.com/qjj19931230/p/12550384.html?utm_source=tuicool
这里要强调的是,除了上面的连接方式,还有其他的连接。在《MySQL是怎样运行的》这本书中,介绍到,mysql连接分为内连接和外连接。内外连接的根本区别是在驱动表中记录不符合ON子句中的连接条件时,内连接不会把该记录加入到最后的结果集中,而外连接会。外连接分为左(外)连接和右(外)连接。
三种链接方式如下图所示:
2)获取游标
conn.cursor():获取游标
对数据库进行操作,只连接数据库是不够的,还需要获取操作数据库的游标,才能进行后续的操作。游标的主要作用是用来接收数据库操作后的返回结果,比如数据查询、插入和删除等。通过获取到的数据库连接实例 conn 下的 cursor() 方法来创建游标,如下:
# 导入模块
import pymysql
# 打开数据库连接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# print(conn)
# print(type(conn))
# 获取连接下的游标
cursor_test=conn.cursor()
print(cursor_test)
3)数据库操作
import pymysql
# 打开数据库连接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# 获取连接下的游标
cursor_test=conn.cursor()
# 使用 execute() 方法执行 SQL,如果表存在则删除
cursor_test.execute("DROP TABLE IF EXISTS EMPLOYEE")
# 使用预处理语句创建表
sql="""CREATE TABLE user1 (
FIRST_NAME CHAR(20) NOT NULL,
LAST_NAME CHAR(20),
AGE INT,
SEX CHAR(1),
INCOME FLOAT )"""
cursor_test.execute(sql)
# 关闭数据库连接
conn.close()
如下所示数据库表创建成功:
mysql> desc user1;
+------------+----------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------+----------+------+-----+---------+-------+
| FIRST_NAME | char(20) | NO | | NULL | |
| LAST_NAME | char(20) | YES | | NULL | |
| AGE | int(11) | YES | | NULL | |
| SEX | char(1) | YES | | NULL | |
| INCOME | float | YES | | NULL | |
+------------+----------+------+-----+---------+-------+
5 rows in set (0.00 sec)
import pymysql
# 打开数据库连接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# 获取连接下的游标
cursor_test=conn.cursor()
# 使用预处理语句创建表
sql="""INSERT INTO user1(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Fei', 'Fei', 20, 'M', 1000)"""
try:
# 执行sql语句
cursor_test.execute(sql)
# 提交到数据库执行
conn.commit()
except:
# 如果发生错误则回滚
conn.rollback()
# 关闭数据库连接
conn.close()
import pymysql
# 打开数据库连接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# 获取连接下的游标
cursor_test=conn.cursor()
sql="""
select * from user1"""
try:
# 执行 sql 语句
cursor_test.execute(sql)
# 显示出所有数据
data_result=cursor_test.fetchall()
for row in data_result:
fname=row[0]
lname=row[1]
age=row[2]
sex=row[3]
income=row[4]
# 打印结果
print("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
(fname, lname, age, sex, income))
except:
print("Error: unable to fetch data")
# 关闭数据库连接
conn.close()
# 导入模块
import pymysql
# 打开数据库连接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# print(conn)
# print(type(conn))
# 获取连接下的游标
cursor_test=conn.cursor()
sql="DELETE * FROM user1"
try:
# 执行SQL语句
cursor_test.execute(sql)
# 提交到数据库执行
conn.commit()
except:
# 发生错误时回滚
conn.rollback()
# 关闭数据库连接
conn.close()
在《MySQL是怎样运行的》,作者小孩子4919强调,嵌套循环连接算法是指驱动表只访问一次,但被驱动表却可能会访问多次,访问次数取决于驱动表执行单表查询后的结果集中有多少条记录,大致过程如下:
步骤1,选取驱动表,使用与驱动表相关的过滤条件,选取代价最低的单表访问方法来执行对驱动表的单表查询;
步骤2,对步骤1中查询驱动表得到的结果集中的每一条记录,都分别到被驱动表中查找匹配的记录。
由于被驱动表可能会访问多次,因此可以为被驱动表建立合适的索引以加快查询速度。
所以,如果被驱动表非常大,即需要完成大量的数据交换,多次访问被驱动表可能导致很多次的磁盘I/O读取操作,此时可以使用基于块的嵌套循环连接算法来缓解由此造成的性能损耗。Mysql的设计者,提出了名为Join Buffer(连接缓冲区)的概念:
有兴趣的同学,建议根据书中详细描述走一遍。
此外,人邮君特别建议大家看看《MySQL是怎样运行的》,它解决了“为什么这个SQL语句执行得这么慢?为什么我明明建立了索引,但是查询计划显示没用?为什么IN查询中的参数一多就不使用索引了?为什么我的数据显示成了乱码?”等等每一位DBA和后端开发人员在与MySQL打交道时,所遇到的很多常见问题。除此之外,索引结构、MVCC、隔离级别的实现、锁的使用等知识,也是求职人员在MySQL面试中躲不过去的高频问题,作者都在书中给出了很详细的介绍。
MySQL是怎样运行的 从根儿上理解MySQL
这本书的初稿最初是以小册的形式发布在掘金平台上的,一经发布便得到大家的青睐,十分火爆!历经两年,现在终于成书,有兴趣的小伙伴也可以去掘金围观~(小孩子4919 的个人主页)
从底层到应用,从基础到进阶,关于MySQL的一切,作者都在书中讲解得非常清楚,帮助你从根儿上理解MySQL。
在现代Web应用程序中,一个强大的后端服务器是必不可少的。而Node.js作为一种轻量级高效的后端开发语言,已经被广泛运用于各种应用场景。
在本文中,我们将探讨如何使用Node.js和MySQL数据库进行交互,以便构建一个强大的后端服务器,并使其能够支持复杂的数据操作。
在开始之前,我们需要确保已经安装了Node.js和MySQL服务器。如果你还没有安装,可以按照以下步骤进行:
完成以上两个步骤后,我们需要创建一个新的Node.js项目。在终端中执行以下命令来创建一个空白的Node.js项目:
bash复制代码mkdir my-project
cd my-project
npm init -y
接下来,我们需要安装一些必要的依赖项,包括mysql和express:
bash复制代码npm install mysql express
在这里,我们使用了Express框架来创建我们的Web服务器。而MySQL则是用于连接和操作数据库的一种流行的关系型数据库。
首先,我们需要创建一个MySQL数据库并创建一张表来存储一些数据。在终端中执行以下命令来连接到MySQL服务器:
bash复制代码mysql -u root -p
这里,-u参数用于指定用户名,而-p参数则用于提示输入密码。如果你的MySQL服务器没有设置密码,则可以省略-p参数。
接下来,我们可以创建一个新的数据库和表。在MySQL客户端中执行以下命令:
sql复制代码CREATE DATABASE my_database;
USE my_database;
CREATE TABLE users (
id INT(11) NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
password VARCHAR(255) NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
这个表将用于存储用户信息。它有三个字段:name、email和password。
现在,我们已经准备好连接到MySQL服务器并开始使用它了。在我们的Node.js应用程序中,我们需要使用以下代码来连接到MySQL:
js复制代码const mysql=require('mysql');
const connection=mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'your_password',
database: 'my_database'
});
connection.connect((error)=> {
if (error) {
console.error('Error connecting to MySQL: ' + error.stack);
return;
}
console.log('Connected to MySQL as ID ' + connection.threadId);
});
在这里,我们使用createConnection()函数来创建一个新的MySQL连接对象,并传入一些必要的参数。这些参数包括MySQL服务器的主机名、用户名、密码和数据库名称。
一旦连接到MySQL服务器,我们可以使用connection.query()函数来向服务器发送SQL查询。接下来我们将会探讨如何使用connection.query()函数来执行常用的SQL操作。
现在,我们已经连接到MySQL服务器了,接下来让我们看看如何从数据库中获取数据。在这里,我们将使用SELECT语句来查询用户信息。
js复制代码app.get('/users', (req, res)=> {
connection.query('SELECT * FROM users', (error, results, fields)=> {
if (error) throw error;
res.json(results);
});
});
在这里,我们使用Express框架创建了一个路由处理程序,该处理程序将处理HTTP GET请求并返回所有用户数据。当我们收到GET请求时,我们将使用connection.query()函数来执行SELECT * FROM users语句,并将结果作为JSON格式发送回客户端。如果发生错误,我们将抛出一个异常。
results参数包含从MySQL服务器返回的结果集。这是一个数组,其中每个元素都表示表中的一行数据。而fields参数则包含结果集的字段信息。
现在,我们可以使用浏览器或其他HTTP客户端来访问/users路由,并获得所有用户数据了。
除了查询数据外,我们还需要能够向数据库中插入新数据。在这里,我们将使用INSERT INTO语句来插入新用户数据。
js复制代码app.post('/users', (req, res)=> {
const { name, email, password }=req.body;
connection.query('INSERT INTO users SET ?', { name, email, password }, (error, results, fields)=> {
if (error) throw error;
res.send('User added successfully');
});
});
在这里,我们创建了一个路由处理程序,该处理程序将处理HTTP POST请求并将新用户数据插入到数据库中。当收到POST请求时,我们首先从请求正文中提取name、email和password数据。然后,我们使用connection.query()函数执行INSERT INTO语句,并将新用户数据作为对象传递给它。最后,我们向客户端发送一条成功消息。
更新数据与插入数据类似。在这里,我们使用UPDATE语句来更新数据库中的现有数据。
javascript复制代码js
app.put('/users/:id', (req, res)=> {
const { name, email, password }=req.body;
const id=req.params.id;
connection.query('UPDATE users SET name=?, email=?, password=? WHERE id=?', [name, email, password, id], (error, results, fields)=> {
if (error) throw error;
res.send('User updated successfully');
});
});
在这里,我们创建了一个路由处理程序,该处理程序将处理HTTP PUT请求并更新指定ID的用户数据。我们首先从请求正文中提取name、email和password数据,然后从URL参数中提取用户ID。接下来,我们使用connection.query()函数执行UPDATE语句,并将新的用户数据和用户ID作为参数传递给它。
最后,我们需要能够从数据库中删除现有数据。在这里,我们将使用DELETE语句来删除指定ID的用户数据。
js复制代码app.delete('/users/:id', (req, res)=> {
const id=req.params.id;
connection.query('DELETE FROM users WHERE id=?', [id], (error, results, fields)=> {
if (error) throw error;
res.send('User deleted successfully');
});
});
在这里,我们创建了一个路由处理程序,该处理程序将处理HTTP DELETE请求并删除指定ID的用户数据。我们从URL参数中提取用户ID,并使用connection.query()函数执行DELETE语句,将用户ID作为参数传递给它。
在本文中,我们探讨了如何使用Node.js和MySQL数据库进行交互,以便构建一个强大的后端服务器,并使其能够支持复杂的数据操作。我们学习了一些常用的SQL操作,包括查询、插入、更新和删除数据。通过将这些技术应用于你的下一个项目中,你将能够创建出一个高效、可靠并且易于维护的Web应用程序。
距离上一篇文章发布又过去了两周,这次先填掉上一篇秒杀系统文章结尾处开的坑,介绍一下数据库中间件Canal的使用。
「Canal用途很广,并且上手非常简单,小伙伴们在平时完成公司的需求时,很有可能会用到。」
举个例子:
公司目前有多个开发人员正在开发一套服务,为了缩短调用延时,对部分接口数据加入了缓存。一旦这些数据在数据库中进行了更新操作,缓存就成了旧数据,必须及时删除。
删除缓存的代码「理所当然可以写在更新数据的业务代码里」,但有时候者写操作是在别的项目代码里,你可能无权修改,亦或者别人不愿你在他代码里写这种业务之外的代码。(毕竟多人协作中间会产生各种配合问题)。又或者就是单纯的删除缓存的操作失败了,缓存依然是旧数据。
正如上篇文章缓存与数据库双写一致性实战里面所说,我们可以将缓存更新操作完全独立出来,形成一套单独的系统。「Canal正是这么一个很好的帮手。」 能帮我们实现像下图这样的系统:
「本篇文章的要点如下:」
?
欢迎关注我的个人公众号获取最全的原创文章:「后端技术漫谈」(二维码见文章底部)
?
众所周知,阿里是国内比较早地大量使用MySQL的互联网企业(去IOE化:去掉IBM的小型机、Oracle数据库、EMC存储设备,代之以自己在开源软件基础上开发的系统),并且基于阿里巴巴/淘宝的业务,从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
Canal应运而生,它通过伪装成数据库的从库,读取主库发来的binlog,用来实现「数据库增量订阅和消费业务需求」。
「Canal用途:」
开源项目地址:
https://github.com/alibaba/canal
在这里就不再摘抄项目简介了,提炼几个值得注意的点:
Canal实际是将自己伪装成数据库的从库,来读取Binlog。我们先补习下关于「MySQL数据库主从数据库」的基础知识,这样就能更快的理解Canal。
为了应对高并发场景,MySQL支持把一台数据库主机分为单独的一台写主库(主要负责写操作),而把读的数据库压力分配给读的从库,而且读从库可以变为多台,这就是读写分离的典型场景。
实现数据库的读写分离,是通过数据库主从同步,让从数据库监听主数据库Binlog实现的。大体流程如下图:
?
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
?
详细主从同步原理在这里就不展开细说了。
可以看到,这种架构下会有一个问题,「数据库主从同步会存在延迟,那么就会有短暂的时间,主从数据库的数据是不一致的。」
这种不一致大多数情况下非常短暂,很多时候我们可以忽略他。
但一旦要求数据一致,就会引申出如何解决这个问题的思考。
我们通常使用MySQL主从复制来解决MySQL的单点故障问题,其通过逻辑复制的方式把主库的变更同步到从库,主备之间无法保证严格一致的模式,
于是,MySQL的主从复制带来了主从“数据一致性”的问题。「MySQL的复制分为:异步复制、半同步复制、全同步复制。」
MySQL默认的复制即是异步复制,主库在执行完客户端提交的事务后会立即将结果返给给客户端,并不关心从库是否已经接收并处理,这样就会有一个问题,「主如果crash掉了,此时主上已经提交的事务可能并没有传到从库上,如果此时,强行将从提升为主,可能导致新主上的数据不完整。」
?
主库将事务 Binlog 事件写入到 Binlog 文件中,此时主库只会通知一下 Dump 线程发送这些新的 Binlog,然后主库就会继续处理提交操作,而此时不会保证这些 Binlog 传到任何一个从库节点上。
?
指当主库执行完一个事务,所有的从库都执行了该事务才返回给客户端。「因为需要等待所有从库执行完该事务才能返回」,所以全同步复制的性能必然会收到严重的影响。
?
当主库提交事务之后,所有的从库节点必须收到、APPLY并且提交这些事务,然后主库线程才能继续做后续操作。但缺点是,主库完成一个事务的时间会被拉长,性能降低。
?
是介于全同步复制与全异步复制之间的一种,「主库只需要等待至少一个从库节点收到」并且 Flush Binlog 到 Relay Log 文件即可,主库不需要等待所有从库给主库反馈。同时,「这里只是一个收到的反馈,而不是已经完全完成并且提交的反馈」,如此,节省了很多时间。
?
介于异步复制和全同步复制之间,主库在执行完客户端提交的事务后不是立刻返回给客户端,而是等待至少一个从库接收到并写到relay log中才返回给客户端。相对于异步复制,半同步复制提高了数据的安全性,「同时它也造成了一定程度的延迟,这个延迟最少是一个TCP/IP往返的时间。所以,半同步复制最好在低延时的网络中使用。」
?
「事实上,半同步复制并不是严格意义上的半同步复制,MySQL半同步复制架构中,主库在等待备库ack时候,如果超时会退化为异步后,也可能导致“数据不一致”。」
?
当半同步复制发生超时时(由rpl_semi_sync_master_timeout参数控制,单位是毫秒,默认为10000,即10s),会暂时关闭半同步复制,转而使用异步复制。当master dump线程发送完一个事务的所有事件之后,如果在rpl_semi_sync_master_timeout内,收到了从库的响应,则主从又重新恢复为半同步复制。
?
关于半同步复制的详细原理分析可以看这篇引申文章,在此不展开:
https://www.cnblogs.com/ivictor/p/5735580.html
回顾了数据库从库的数据同步原理,理解Canal十分简单,直接引用官网原文:
这个步骤我在之前的文章教你使用Binlog日志恢复误删的MySQL数据已经提到过,这里完善了一下,再贴一下,方便大家。
首先进入数据库控制台,运行指令:
mysql> show variables like'log_bin%';
+---------------------------------+-------+
| Variable_name | Value |
+---------------------------------+-------+
| log_bin | OFF |
| log_bin_basename | |
| log_bin_index | |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+---------------------------------+-------+
5 rows in set (0.00 sec)
可以看到我们的binlog是关闭的,都是OFF。接下来我们需要修改Mysql配置文件,执行命令:
sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf
在文件末尾添加:
log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW
保存文件,重启mysql服务:
sudo service mysql restart
重启完成后,查看下mysql的状态:
systemctl status mysql.service
这时,如果你的mysql版本在5.7或更高版本,就会报错:
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190791Z 0 [Warning] Changed limits: max_open_files: 1024 (requested 5000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190839Z 0 [Warning] Changed limits: table_open_cache: 431 (requested 2000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.359713Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (se
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.361395Z 0 [Note] /usr/sbin/mysqld (mysqld 5.7.28-0ubuntu0.16.04.2-log) starting as process 5930 ...
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363017Z 0 [ERROR] You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363747Z 0 [ERROR] Aborting
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363922Z 0 [Note] Binlog end
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.364108Z 0 [Note] /usr/sbin/mysqld: Shutdown complete
Jan 06 15:49:58 VM-0-11-ubuntu systemd[1]: mysql.service: Main process exited, code=exited, status=1/FAILURE
「You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server」
之前我们的配置,对于5.7以下版本应该是可以的。但对于高版本,我们需要指定server-id。
我们给这个MySQL指定为2(只要不与其他库id重复):
server-id=2
mysql> select user, host from user;
+------------------+-----------+
| user | host |
+------------------+-----------+
| root | % |
| debian-sys-maint | localhost |
| mysql.session | localhost |
| mysql.sys | localhost |
| root | localhost |
+------------------+-----------+
5 rows in set
CREATE USER canal IDENTIFIED BY 'xxxx'; (填写密码)
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
show grants for 'canal'
去Github下载最近的Canal稳定版本包:
解压缩:
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
配置文件设置:
主要有两个文件配置,一个是conf/canal.properties一个是conf/example/instance.properties。
为了快速运行Demo,只修改conf/example/instance.properties里的数据库连接账号密码即可
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=xxxxxxx
canal.instance.connectionCharset=UTF-8
请先确保机器上有JDK,接着运行Canal启动脚本:
sh bin/startup.sh
下图即成功运行:
我在秒杀系统系列文章的代码仓库里(miaosha-job)编写了如下客户端代码
仓库源码地址:https://github.com/qqxx6661/miaosha
package job;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class CanalClient {
private static final Logger LOGGER=LoggerFactory.getLogger(CanalClient.class);
public static void main(String[] args) {
// 第一步:与canal进行连接
CanalConnector connector=CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
connector.connect();
// 第二步:开启订阅
connector.subscribe();
// 第三步:循环订阅
while (true) {
try {
// 每次读取 1000 条
Message message=connector.getWithoutAck(1000);
long batchID=message.getId();
int size=message.getEntries().size();
if (batchID==-1 || size==0) {
LOGGER.info("当前暂时没有数据,休眠1秒");
Thread.sleep(1000);
} else {
LOGGER.info("-------------------------- 有数据啦 -----------------------");
printEntry(message.getEntries());
}
connector.ack(batchID);
} catch (Exception e) {
LOGGER.error("处理出错");
} finally {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 获取每条打印的记录
*/
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
// 第一步:拆解entry 实体
Header header=entry.getHeader();
EntryType entryType=entry.getEntryType();
// 第二步: 如果当前是RowData,那就是我需要的数据
if (entryType==EntryType.ROWDATA) {
String tableName=header.getTableName();
String schemaName=header.getSchemaName();
RowChange rowChange=null;
try {
rowChange=RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
EventType eventType=rowChange.getEventType();
LOGGER.info(String.format("当前正在操作表 %s.%s, 执行操作=%s", schemaName, tableName, eventType));
// 如果是‘查询’ 或者 是 ‘DDL’ 操作,那么sql直接打出来
if (eventType==EventType.QUERY || rowChange.getIsDdl()) {
LOGGER.info("执行了查询语句:[{}]", rowChange.getSql());
return;
}
// 第三步:追踪到 columns 级别
rowChange.getRowDatasList().forEach((rowData) -> {
// 获取更新之前的column情况
List<Column> beforeColumns=rowData.getBeforeColumnsList();
// 获取更新之后的 column 情况
List<Column> afterColumns=rowData.getAfterColumnsList();
// 当前执行的是 删除操作
if (eventType==EventType.DELETE) {
printColumn(beforeColumns);
}
// 当前执行的是 插入操作
if (eventType==EventType.INSERT) {
printColumn(afterColumns);
}
// 当前执行的是 更新操作
if (eventType==EventType.UPDATE) {
printColumn(afterColumns);
// 进行删除缓存操作
deleteCache(afterColumns, tableName, schemaName);
}
});
}
}
}
/**
* 每个row上面的每一个column 的更改情况
* @param columns
*/
public static void printColumn(List<Column> columns) {
columns.forEach((column) -> {
String columnName=column.getName();
String columnValue=column.getValue();
String columnType=column.getMysqlType();
// 判断 该字段是否更新
boolean isUpdated=column.getUpdated();
LOGGER.info(String.format("数据列:columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated));
});
}
/**
* 秒杀下单接口删除库存缓存
*/
public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
AtomicInteger id=new AtomicInteger();
columns.forEach((column) -> {
String columnName=column.getName();
String columnValue=column.getValue();
if ("id".equals(columnName)) {
id.set(Integer.parseInt(columnValue));
}
});
// TODO: 删除缓存
LOGGER.info("Canal删除stock表id:[{}] 的库存缓存", id);
}
}
}
代码中有详细的注释,就不做解释了。
我们跑起代码,紧接着我们在数据库中进行更改UPDATE操作,把法外狂徒张三改成张三1,然后再改回张三,见下图。
Canal成功收到了两条更新操作:
紧接着我们模拟一个删除Cache缓存的业务,在代码中有:
/**
* 秒杀下单接口删除库存缓存
*/
public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
AtomicInteger id=new AtomicInteger();
columns.forEach((column) -> {
String columnName=column.getName();
String columnValue=column.getValue();
if ("id".equals(columnName)) {
id.set(Integer.parseInt(columnValue));
}
});
// TODO: 删除缓存
LOGGER.info("Canal删除stock表id:[{}] 的库存缓存", id);
}
}
「在上面的代码中,在收到m4a_miaosha.stock表的更新操作后,我们刷新库存缓存。效果如下:」
简单的Canal使用就介绍到这里,剩下的发挥空间留给各位读者大大们。
本文总结了Canal的基本原理和简单的使用。
「总结如下几点:」
「希望大家多多支持我的原创技术文章公众号:后端技术漫谈,我最全的原创文章都在这里首发。」
我是一名后端开发工程师。主要关注后端开发,数据安全,爬虫,物联网,边缘计算等方向,欢迎交流。
个人公众号:后端技术漫谈
「如果文章对你有帮助,不妨收藏,转发,在看起来~」
*请认真填写需求信息,我们会在24小时内与您取得联系。