Dinky 是一个开箱即用、易扩展,以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架的一站式实时计算平台,致力于流批一体和湖仓一体的探索与实践。
Dinky 作为 Apache Flink 的 FlinkSQL 的实时计算平台,具有以下核心特点。
支持 Flink 原生语法、连接器、UDF 等: 几乎零成本将 Flink 作业迁移至 Dinky。
增强 FlinkSQL 语法: 表值聚合函数、全局变量、CDC多源合并、执行环境、语句合并等。
支持 Flink 多版本: 支持作为多版本 FlinkSQL Server 的能力以及 OpenApi。
支持外部数据源的 DB SQL 操作: 如 ClickHouse、Doris、Hive、Mysql、Oracle、Phoenix、PostgreSql、Presto、SqlServer、StarRocks 等。
支持实时任务运维: 作业上线下线、作业信息、集群信息、作业快照、异常信息、作业日志、数据地图、即席查询、历史版本、报警记录等。
Dinky 实时计算平台开发模块包括 数据开发、运维中心、注册中心 和 系统设置 四大模块。
数据开发包括作业管理、作业配置和运维管理等
注册中心包括集群管理、Jar管理、数据源管理、报警管理和文档管理
系统设置包括用户管理和Flink设置
通过 dinky-mysql-server 和 dinky-standalone-server 镜像快速体验 Flink 实时计算平台。
-启动该镜像提供 Dinky 的 Mysql 业务库能力。
docker run --name dinky-mysql dinkydocker/dinky-mysql-server:0.7.2
-启动该镜像提供 Dinky 实时计算平台。
docker run --restart=always -p 8888:8888 -p 8081:8081 -e MYSQL_ADDR=dinky-mysql:3306 --name dinky --link dinky-mysql:dinky-mysql dinkydocker/dinky-standalone-server:0.7.2-flink14
IP:8888 地址打开平台并 admin/admin 登录,创建 功能示例 目录,创建 HelloWorld 的 FlinkSQL 作业。
执行模式选择 Local 并输入以下语句:
CREATE TABLE Orders ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'number-of-rows'='50' ); select order_number,price,first_name,last_name,order_time from Orders
点击 执行按钮(执行当前的SQL),下方切换至 结果 选项卡,点击 获取最新数据 ,即可查看 Select 语句的执行结果。
-FlinkSQL 操作步骤?
1.进入 Dinky 的 Data Studio
2.在左侧菜单栏,右键 目录
3.新建目录或作业
4.在新建文件的对话框,填写作业信息
参数说明备注作业名称作业名称在当前项目中必须保持唯一作业类型流作业和批作业均支持以下作业类型:FlinkSQL:支持SET、DML、DDL语法FlinkSQLEnv:支持SET、DDL语法FlinkSQLEnv 场景适用于所有作业的SET、DDL语法统一管理的场景,当前FlinkSQLEnv 在SQL编辑器的语句限制在1000行以内
5.在作业开发 SQL 编辑器,编写 DDL 和 DML 代码
示例代码如下:
-创建源表datagen_source CREATE TABLE datagen_source( id BIGINT, name STRING ) WITH ( 'connector'='datagen' ); -创建结果表blackhole_sink CREATE TABLE blackhole_sink( id BIGINT, name STRING ) WITH ( 'connector'='blackhole' ); -将源表数据插入到结果表 INSERT INTO blackhole_sink SELECT id , name from datagen_source;
新建作业如下图:
6.在作业开发页面右侧 执行配置,填写配置信息
类型配置项备注作业配置执行模式区别详见用户手册数据开发中的作业概述作业配置集群实例Standalone 和 Session 执行模式需要选择集群实例,详见集群实例管理作业配置集群配置Per-Job 和 Application 执行模式需要选择集群配置,详见集群配置管理作业配置FlinkSQL 环境选择已创建的 FlinkSQLEnv,如果没有则不选作业配置任务并行度指定作业级任务并行度,默认为 1作业配置Insert 语句集默认禁用,开启后将 SQL编辑器中编写的多个 Insert 语句合并为一个 JobGraph 进行提交作业配置全局变量默认禁用,开启后可以使用数据源连接配置变量、自定义变量等作业配置批模式默认禁用,开启后启用 Batch Mode作业配置SavePoint 策略默认禁用,策略包括:最近一次最早一次指定一次作业配置报警组报警组配置详见报警管理作业配置其他配置其他的 Flink 作业配置,具体可选参数,请参考 Flink 官网
作业配置如下图:
-功能说明
执行当前的 SQL: 提交执行未保存的作业配置,并可以同步获取 SELECT、SHOW 等执行结果,常用于 Local、Standalone、Session 执行模式;
异步提交: 提交执行最近保存的作业配置,可用于所有执行模式;
发布 发布当前作业的最近保存的作业配置,发布后无法修改;
上线 提交已发布的作业配置,可触发报警;
下线 停止已上线的作业,并触发 SavePoint;
停止 只停止已提交的作业;
维护 使已发布的作业进入维护状态,可以被修改;
注销 标记作业为注销不可用状态。
-常用场景
查看 FlinkSQL 执行结果: 执行当前的 SQL。
提交作业: 异步提交。
上线作业: SavePoint 最近一次 + 上线。
下线作业: 下线。
升级作业: 下线后上线。
全新上线作业: SavePoint 禁用 + 上线。
-Flink作业启动操作步骤
1.首先登录Dinky数据开发控制台
2.在左侧菜单栏选择目录 > 作业名称 > 检查当前的FlinkSql > 发布 > 上线
或者选择目录 > 作业名称 > 检查当前的FlinkSql > 异步提交
作业启动异步提交如下图:
作业启动发布上线如下图:
可以选择使用 Standalone 或 Session 集群在开发测试环境对作业调试,如作业运行、检查结果等。配置 Standalone 或 Session 集群请参考注册中心中集群管理的集群实例管理。
也可以调试普通的 DB SQL 作业。
FlinkSQL作业调试步骤
1.进入 Data Studio
2.点击 目录 > 新建目录 > 新建作业
3.填写完作业信息后,单击 确认,作业类型选择 FlinkSQL
4.编写完整的 FlinkSQL 语句,包含 CREATE TABLE 等
示例代码如下:
-创建源表datagen_source CREATE TABLE datagen_source( id BIGINT, name STRING ) WITH ( 'connector'='datagen' ); -将源表数据插入到结果表 SELECT id BIGINT, name STRING from datagen_source
5.单击保存
6.单击语法检查
7.配置执行配置
配置项 说明
预览结果 默认开启,可预览 FlinkSQL 的执行结果
打印流 默认禁用,开启后将展示 ChangeLog
最大行数 默认 100,可预览的执行结果最大的记录数
自动停止 默认禁用,开启后达到最大行数将停止作业
注意: 预览聚合结果如 count 等操作时,关闭打印流可合并最终结果。
8.单击执行当前的SQL
9.结果 或者 历史 > 预览数据 可以手动查询最新的执行结果
1.执行模式必须是 Local、Standalone、Yarn Session、Kubernetes Session 其中的一种;
2.必须关闭 Insert 语句集;
3.除 SET 和 DDL 外,必须只提交一个 SELECT 或 SHOW 或 DESC 语句;
4.必须开启 预览结果;
5.作业必须是提交成功并且返回 JID,同时在远程集群可以看到作业处于 RUNNING 或 FINISHED 状态;
6.Dinky 重启后,之前的预览结果将失效
选择对应数据源,并书写其 sql 执行即可。
Dinky 是基于 Flink 的流批一体化数据汇聚、数据同步的实时计算平台,通过阅读本文档,您将可以零基础上手实时计算平台 Dinky 。
首先,登录 Dlinky,选择注册中心>>集群管理>>集群实例管理或集群配置管理,点击新建 Flink 集群
-创建作业
选择数据开发>>目录,首先点击创建目录,点击创建好的目录右键即可创建作业
Dinky 推荐您在使用 Yarn Session、K8s Session、StandAlone 采用集群实例的方式注册集群。
1.可通过数据开发中的快捷引导注册集群实例。或者通过注册中心中的集群管理注册集群实例。
2.添加 Flink 集群
-集群配置
Dinky 推荐您在使用 Yarn Per Job、Yarn Application、K8s Application 采用集群配置的方式注册集群。
1.可通过数据开发中的快捷引导注册集群配置。或者通过注册中心中的集群管理注册集群配置。
2.添加集群配置
-创建集群完成后,就可进一步开发 FlinkSQL 作业
-脚本选用 Flink 官网提供的 SQL 脚本,参考链接如下:
https://github.com/ververica/flink-sql-cookbook
-下载 flink-faker 放入$FLINK_HOME/lib下及Dlinky的plugins下
https://github.com/knaufk/flink-faker/releases
下面创建一个作业名称为"test66"的作业
创建完成后,即可在"test66"作业下写 SQL 及 配置作业参数
FlinkSQL 作业编写,分为三部分内容,分别是 SET 参数设置、DDL 语句编写、DML 语句编写。下面以Inserting Into Tables 为例。
当 FlinkSQL 编写完成后,即可进行作业的配置。作业配置的详细说明详见用户手册的作业基础配置
在作业配置中,您可以选择作业执行模式、Flink 集群、SavePoint策略等配置,对作业进行提交前的配置。
上述 FlinkSQL 作业配置完成后,可以对 SQL 做查询预览。
点击执行配置,开启打印流,保存。点击执行当前的SQL。即可获取到最新结果。
在数据写入 Sink 端时,Dlinky 提供了异步提交 和 上线发布功能,将其作业提交到远程集群
当作业提交到远程集群后,您可以在运维中心查看作业的运行情况。
选择注册中心>>数据源管理>>新建,假设您连接Doris。
测试创建成功后,显示如下
点击数据开发>>目录>>右键,出现创建作业菜单。作业类型选择Doris
作业创建完成后,在最右侧会出现数据源,选择连接的数据源
外部数据源可以创建 DDL、DML语句对其进行ETL开发。
当 ETL 开发结束 或者做即席查询时,可以点击保存>>语法检查>>运行当前的SQL 将 SQL 提交。
目前通过 FlinkCDC 进行会存在诸多问题,如需要定义大量的 DDL 和编写大量的 INSERT INTO,更为严重的是会占用大量的数据库连接,对 Mysql 和网络造成压力。
Dinky 定义了 CDCSOURCE 整库同步的语法,该语法和 CDAS 作用相似,可以直接自动构建一个整库入仓入湖的实时任务,并且对 source 进行了合并,不会产生额外的 Mysql 及网络压力,支持对任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等
?
?Dinky 采用的是只构建一个 source,然后根据 schema、database、table 进行分流处理,分别 sink 到对应的表。
Dinky 是通过自身的数据源中心的元数据功能捕获源库的元数据信息,并同步构建 sink 阶段 datastream 或 tableAPI 所使用的 FlinkDDL。
Dinky 提供了各式各样的 sink 方式,通过修改语句参数可以实现不同的 sink 方式。Dinky 支持通过 DataStream 来扩展新的 sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。
禁用全局变量、禁用语句集、禁用批模式。
目前 dlink-client-1.14 内的整库同步能力最多且主要维护,如果要使用其他 flink 版本的整库同步,如果 SQLSink 不满足需求,需要DataStreamSink 支持,请手动仿照 dlink-client-1.14 扩展相应代码实现,很简单。
目前 dlink-client-1.14 内默认实现常用的 Flink CDC,如 MysqlCDC、OracleCDC、PostgresCDC 和 SQLServerCDC,如果要使用其他 FlinkCDC,请在 Dinky 源码中仿照 MysqlCDC 进行扩展,很简单。
由于 CDCSOURCE 是 Dinky 封装的新功能,Apache Flink 源码不包含,非 Application 模式提交需要在远程 Flink 集群所使用的依赖里添加一下依赖:
# 将下面 Dinky根目录下 整库同步依赖包放置 $FLINK_HOME/lib下
lib/dlink-client-base-${version}.jar
lib/dlink-common-${version}.jar
plugins/flink-${flink-version}/dlink-client-${version}.jar
目前已经支持 application ,需提前准备好相关jar包,或者和 add jar语法并用。以 mysqlcdc-2.3.0 和 flink-1.14 为例,需要以下 jar
flink-shaded-guava-18.0-13.0.jar
HikariCP-4.0.3.jar
druid-1.2.8.jar
dlink-metadata-mysql-0.7.2.jar
dlink-metadata-base-0.7.2.jar
jackson-datatype-jsr310-2.13.4.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
dlink-client-1.14-0.7.2.jar
cdcsource_example.png
一个 FlinkSQL 任务只能写一个 CDCSOURCE,CDCSOURCE 前可写 set、add jar 和 ddl 语句。
配置项中的英文逗号前不能加空格,需要紧随右单引号。
配置项 是否必须 默认值 说明 connector 是 无 指定要使用的连接器 hostname 是 无 数据库服务器的 IP 地址或主机名 port 是 无 数据库服务器的端口号 username 是 无 连接到数据库服务器时要使用的数据库的用户名 password 是 无 连接到数据库服务器时要使用的数据库的密码 scan.startup.mode 否 latest-offset 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” database-name 否 无 此参数非必填 table-name 否 无 只支持正则,示例:"test\.student,test\.score",所有表示例:"test\..*" source.* 否 无 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 checkpoint 否 无 单位 ms parallelism 否 无 任务并行度 sink.connector 是 无 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 sink.sink.db 否 无 目标数据源的库名,不指定时默认使用源数据源的库名 sink.table.prefix 否 无 目标表的表名前缀,如 ODS 即为所有的表名前拼接 ODS sink.table.suffix 否 无 目标表的表名后缀 sink.table.upper 否 false 目标表的表名全大写 sink.table.lower 否 false 目标表的表名全小写 sink.auto.create 否 false 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 sink.timezone 否 UTC 指定目标数据源的时区,在数据类型转换时自动生效 sink.column.replace.line-break 否 false 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\n', '') sink.* 否 无 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 sink[N].* 否 无 N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置.
该示例为将 mysql 整库同步到另一个 mysql 数据库,写入 test 库,表名前缀 test_,表名全小写,开启自动建表。
EXECUTE CDCSOURCE cdc_mysql WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='jdbc', 'sink.url'='jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false', 'sink.username'='root', 'sink.password'='123456', 'sink.sink.db'='test', 'sink.table.prefix'='test_', 'sink.table.lower'='true', 'sink.table-name'='${tableName}', 'sink.driver'='com.mysql.jdbc.Driver', 'sink.sink.buffer-flush.interval'='2s', 'sink.sink.buffer-flush.max-rows'='100', 'sink.sink.max-retries'='5', 'sink.auto.create'='true'
该示例将 Oracle 数据库 TEST 下所有表同步到该数据库的 TEST2下。
EXECUTE CDCSOURCE cdc_oracle WITH ( 'connector'='oracle-cdc', 'hostname'='127.0.0.1', 'port'='1521', 'username'='root', 'password'='123456', 'database-name'='ORCL', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='TEST\..*', 'connector'='jdbc', 'url'='jdbc:oracle:thin:@127.0.0.1:1521:orcl', 'username'='root', 'password'='123456', 'table-name'='TEST2.${tableName}' )
汇总到一个 topic
当指定 sink.topic 参数时,所有 Change Log 会被写入这一个 topic。
EXECUTE CDCSOURCE cdc_kafka_one WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='datastream-kafka', 'sink.topic'='cdctest', 'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092' )
同步到对应 topic?
当不指定 sink.topic 参数时,所有 Change Log 会被写入对应库表名的 topic。
EXECUTE CDCSOURCE cdc_kafka_mul WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='datastream-kafka', 'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092' )
使用 FlinkSQL 同步到对应 topic
EXECUTE CDCSOURCE cdc_upsert_kafka WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='upsert-kafka', 'sink.topic'='${tableName}', 'sink.properties.bootstrap.servers'='bigdata2:9092,bigdata3:9092,bigdata4:9092', 'sink.key.format'='avro', 'sink.value.format'='avro' )
EXECUTE CDCSOURCE cdc_clickhouse WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='clickhouse', 'sink.url'='clickhouse://127.0.0.1:8123', 'sink.username'='default', 'sink.password'='123456', 'sink.sink.db'='test', 'sink.table.prefix'='test_', 'sink.table.lower'='true', 'sink.database-name'='test', 'sink.table-name'='${tableName}', 'sink.sink.batch-size'='500', 'sink.sink.flush-interval'='1000', 'sink.sink.max-retries'='3' )
EXECUTE CDCSOURCE jobname WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='test\.student,test\.score', 'sink[0].connector'='doris', 'sink[0].fenodes'='127.0.0.1:8030', 'sink[0].username'='root', 'sink[0].password'='dw123456', 'sink[0].sink.batch.size'='1', 'sink[0].sink.max-retries'='1', 'sink[0].sink.batch.interval'='60000', 'sink[0].sink.db'='test', 'sink[0].table.prefix'='ODS_', 'sink[0].table.upper'='true', 'sink[0].table.identifier'='${schemaName}.${tableName}', 'sink[0].sink.label-prefix'='${schemaName}_${tableName}_1', 'sink[0].sink.enable-delete'='true', 'sink[1].connector'='datastream-kafka', 'sink[1].topic'='cdc', 'sink[1].brokers'='127.0.0.1:9092' )
在搭建Dinky开发环境之前请确保你已经安装如下软件
环境版本npm7.19.0node.js14.17.0jdk1.8maven3.6.0+lombokIDEA插件安装mysql5.7+
请通过 git 管理工具从 GitHub 中拉取 Dinky 源码
mkdir workspace
cd workspace
git clone https://github.com/DataLinkDC/dlink.git
IDEA 提供了插件设置来安装 Lombok 插件。如果尚未安装,请在导入 Dlink 之前按照以下说明来进行操作以启用对 Lombok 注解的支持:
安装 node.js, 安装 npm
因 node.js 安装后 npm 版本较高,因此需要可用版本 7.19.0,升级npm命令如下:
npm install npm@7.19.0 -g
初始化依赖
npm install --force
IDEA 里 Build → Build Project
mvn clean install -Dmaven.test.skip=true
# 如若修改版本,按以下指定即可。flink可支持多版本(1.11-1.16)
mvn clean install -Dmaven.test.skip=true -P pord,scala-2.11,flink-1.14,flink-1.15
# 如若不需要web编译,-P 后面加: `!web`
mvn clean install -Dmaven.test.skip=true -P !web,pord,scala-2.11,flink-1.14,flink-1.15
需要修改 dlink根目录下的pom文件,下面以本地开发为例,修改如下:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <!-- `provided` for product environment ,`compile` for dev environment --> <scope.runtime>compile</scope.runtime> </properties>
修改dlink根目录下/dlink-admin/src/main/resources/application.ym文件
配置数据库连接信息:
spring: datasource: url: jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true username: dlink password: dlink driver-class-name: com.mysql.cj.jdbc.Driver
在MySQL数据库创建 dlink 用户并在 dlink 数据库中执行 script/sql/dinky-mysql.sql 文件。此外 script/sql/upgrade 目录下存放了了各版本的升级 sql 请依次按照版本号执行。
启动 dlink-admin 下的 Dlink 启动类,可见 8888 端口。
稍微等待一会,即可访问 127.0.0.1:8888 可见登录页。
登录用户/密码: admin/admin
下载最新的编译包,对比一下 安装目录/config 下的文件,主要是 application.yml ,如果没用到最新特性,在最新版中修改一下mysql连接配置即可, 如果需要使用新特性,相关配置看相关文档描述即可
-- 注意: 按照版本号依次升级 切不可跨版本升级 ${version} 代表的是你目前的 dinky版本+1 依次往下执行
- 其中 /opt/dinky 是你dinky安装的目录
mysql> source /opt/dinky/sql/upgrade/${version}_schema/mysql/dinky_ddl.sql
-- 表的ddl
mysql> source /opt/dinky/sql/upgrade/${version}_schema/mysql/dinky_dml.sql
-- 表初始化数据 (部分版本无)
完
超市、地铁、车站等很多场景中,人脸识别已经被广泛应用,但是这个功能究竟是怎么实现的?
在本文中,将以 pico.js 库为例,分享实现轻量级人脸识别功能的具体开发过程 。
pico.js 是一个只有 200 行纯 JavaScript 代码的人脸检测库,具备实时检测功能(在实际环境中可达到200+ FPS),压缩后仅 2kB 。
开源代码地址:https://github.com/tehnokv/picojs;
简介
本文将介绍pico.js,这一由JavaScript编写的用于人脸检测的代码库,并展示其工作原理。尽管现已有类似的项目,但我们的目标是提供更小、计算效率更高的替代方案。
在深入考究其细节前,建议各位用计算机的网络摄像头体验一下人脸检测的实时演示(也适用于移动设备)。注意,所有进程都是在客户端完成的,即不向服务端发送图像。因此,各位无需担心在运行这段代码时的隐私问题。
在接下来的篇幅里,我将阐述pico.js的理论背景及其工作原理。
Pico对象监测框架
2013年,Markus团队在一个技术报告中介绍了这一由JavaScript实现的pico.js代码库。它是参考C语言实现的,我们可在GitHub上获取其源码:https://github.com/nenadmarkus/pico。我们密切关注其实现方法,因为我们不打算复制学习过程,而仅关注它的运行。这背后的原因是,我们最好学习带有官方代码的检测器,将其加载到JavaScript中并执行进程,如此就带有独特的优势(比如跨操作系统与设备的强大的可移植性)。
Pico对象检测框架是流行的Viola-Jones方法的一个改进。
Viola-Jones方法是基于区域分类的概念。这意味着在图像的每个合理位置和尺度上都使用分类器。这个区域枚举过程的可视化如下图所示:
该分类器试图判断当前区域是否存在人脸。最后,获取到的人脸区域将根据重叠程度进行聚类。鉴于每张图像都有很多区域,在这实时进程中有两个小技巧:
分类级联由一系列分类器组成。这些分类器中的每一个都能正确识别几乎所有的人脸,并丢弃一小部分非人脸区域。如果一个图像区域通过了级联的所有成员,那么它就被认定为人脸。通过(设计)序列中靠前的分类器比靠后的分类器更简单,这种效果得到了进一步放大。级联分类算法如下图所示:
每个阶段包括一个分类器Cn,它既可以拒绝图像区域(R),也可以接受图像区域(A)。一旦被拒绝,该区域将不会进入下一级联成员。如果没有一个分类器拒绝该区域,我们认为它是一张人脸。
在Viola-Jones框架中,每个分类器Cn都基于Haar-like特性。这使得每个区域可通过名为积分图像的预算结构来进行O(1)计算时间。
然而,积分图像也有一些缺点。最明显的缺点是,这种数据结构需要额外的内存来储存:通常是unit8输入图像的4倍。另外一个问题是构建一个完整的图像所需的时间(也与输入的像素数有关)。在功能有限的小型硬件上处理大的图像也可能会有问题。这种方法的一个更微妙的问题是它的优雅性:随之而来的问题是我们是否能够创建一个不需要这种结构、并且具有所有重要属性的框架。
Pico框架对每个分类器Cn用像素对比测试取代了Haar-like特性,形式如下:
其中R是一个图像区域,(Xi,Yi)表示用于比较像素值的位置。注意,这种测试可以应用于各种尺寸的区域,而不需要任何专门的数据结构,这与Haar-like的特性不同。这是通过将位置(Xi,Yi)存储在标准化坐标中(例如,(Xi,Yi)在[?1,1]×[?1,1]中),并乘以当前区域的比例。这就是pico实现多尺度检测功能的思路。
由于此类测试很简单,又因混叠和噪声而存在潜在问题,我们有必要将大量测试应用于该区域,以便对其内容进行推理。在pico框架中,这是通过
这可以用数学符号表示,如下:
其中Tt(R)表示决策树Tt在输入区域R上生成的标量输出。由于每个决策树都由若干个像素比较测试组成,这些测试可以根据需要调整大小,因此运行分类阶段Cn的计算复杂度与区域大小无关。
每个Cn决策树都是AdaBoost的变体。接下来以这种方式将阈值设置为Cn的输出,以获取期望的真阳率(例如0.995)。所有得分低于这个阈值的区域都不认为是人脸。添加级联的新成员,直到达到预期的假阳率。请参阅原出版物学习相关细节内容。
正如简介中说的那样,我们不会复制pico的学习过程,而仅关注它的运行。如果您想学习自定义对象/人脸检测器,请使用官方的实现方法。Pico.js能够加载二进制级联文件并有效地处理图像。接下来的小节将解释如何使用pico.js来检测图像中的人脸。
pico.js的组件
库的组成部分如下:
通过<script src="pico.js"></script>(或它的压缩版本) 引入并进行一些预处理后,就可以使用这些工具了。我们将讨论对图像进行人脸检测的JS代码(GitHub repo中的代码)。但愿这能详尽说明使用该库的方法。实时演示也有说明。
实例化区域分类器
区域分类器应识别图像区域是否为人脸。其思路是在整个图像中运行这个分类器,以获得其中的所有面孔(稍后详细介绍)。Pico.js的区域分类过程封装在一个函数中,其原型如下:
function(r, c, s, pixels, ldim) { /* ... */} /* ... */ }
前三个参数(r、c和s)指定区域的位置(其中心的行和列)及其大小。pixels阵列包含图像的灰度强度值。参数ldim规定从图像的一行移动到下一行的方式(在诸如OpenCV的库中称为stride)。也就是说,从代码中可以看出(r,c)位置的像素强度为[r*ldim + c]像素。该函数会返回一个浮点值,表示该区域的得分。如果分数大于或等于0.0,则该区域认定为人脸。如果分数低于0.0,则该区域认定为非人脸,即属于背景类。
Pico.js中pico.unpack_cascade过程将二进制的级联作为参数,将其解压并返回一个带有分类过程和分类器数据的闭包函数。我们用它初始化区域分类过程,以下是详细说明。
官方pico的人脸检测级联称为facefinder。它由近450个决策树组成,每个决策树的深度为6,它们集成一个25级联。该级联将在我们是实验中用到,它能对正脸图像以适当的检测速率进行实时处理,正如实时演示看到的那样。
facefinder级联可以直接从官方的github库上下载,代码写为:
var facefinder_classify_region=function(r, c, s, pixels, ldim) {return -1.0;};var cascadeurl='https://raw.githubusercontent.com/nenadmarkus/pico/c2e81f9d23cc11d1a612fd21e4f9de0921a5d0d9/rnt/cascades/facefinder';fetch(cascadeurl).then(function(response) { response.arrayBuffer().then(function(buffer) { var bytes=new Int8Array(buffer); facefinder_classify_region=pico.unpack_cascade(bytes); console.log('* cascade loaded'); })})function(r, c, s, pixels, ldim) {return -1.0;}; var cascadeurl='https://raw.githubusercontent.com/nenadmarkus/pico/c2e81f9d23cc11d1a612fd21e4f9de0921a5d0d9/rnt/cascades/facefinder'; fetch(cascadeurl).then(function(response) { response.arrayBuffer().then(function(buffer) { var bytes=new Int8Array(buffer); facefinder_classify_region=pico.unpack_cascade(bytes); console.log('* cascade loaded'); }) })
首先,将facefinder_classify_region初始化,即任何图像区域先认定为非人脸(它总是返回-1.0)。接下来,我们使用Fetch API从cascadeurl URL中获取级联二进制数据。这是一个异步调用,我们不能即刻获取到数据。最后,在获取到响应数据后,将其转换为int8数组并传递给pico.unpack_cascade,然后pico.unpack_cascade生成正确的facefinder_classify_region函数。
将facefinder_classify_region函数应用于图像中每个区域的合理位置和等级以便检测到所有的人脸。这个过程将在下一小节中解释。
在图像上运行分类器
假定HTML body内有一个canvas元素,一个image标签和一个带有onclick回调的button标签。用户一旦点击了人脸检测按钮,检测过程就开始了。
下面的JS代码用于绘制内容和图像,并获取原始像素值(红、绿、蓝+ alpha的格式):
var img=document.getElementById('image');var ctx=document.getElementById('canvas').getContext('2d');ctx.drawImage(img, 0, 0);var rgba=ctx.getImageData(0, 0, 480, 360).data; // the size of the image is 480x360 (width x height)document.getElementById('image'); var ctx=document.getElementById('canvas').getContext('2d'); ctx.drawImage(img, 0, 0); var rgba=ctx.getImageData(0, 0, 480, 360).data; // the size of the image is 480x360 (width x height)
下面,我们编写一个辅助函数,将输入的RGBA数组转换为灰度:
function rgba_to_grayscale(rgba, nrows, ncols) { var gray=new Uint8Array(nrows*ncols); for(var r=0; r<nrows; ++r) for(var c=0; c<ncols; ++c) // gray=0.2*red + 0.7*green + 0.1*blue gray[r*ncols + c]=(2*rgba[r*4*ncols+4*c+0]+7*rgba[r*4*ncols+4*c+1]+1*rgba[r*4*ncols+4*c+2])/10; return gray;} var gray=new Uint8Array(nrows*ncols); for(var r=0; r<nrows; ++r) for(var c=0; c<ncols; ++c) // gray=0.2*red + 0.7*green + 0.1*blue gray[r*ncols + c]=(2*rgba[r*4*ncols+4*c+0]+7*rgba[r*4*ncols+4*c+1]+1*rgba[r*4*ncols+4*c+2])/10; return gray; }
现在我们准备调用这个过程,它将在整个图像中运行facefinder_classify_region函数:
image={ "pixels": rgba_to_grayscale(rgba, 360, 480), "nrows": 360, "ncols": 480, "ldim": 480}params={ "shiftfactor": 0.1, // move the detection window by 10% of its size "minsize": 20, // minimum size of a face "maxsize": 1000, // maximum size of a face "scalefactor": 1.1 // for multiscale processing: resize the detection window by 10% when moving to the higher scale}// run the cascade over the image// dets is an array that contains (r, c, s, q) quadruplets// (representing row, column, scale and detection score)dets=pico.run_cascade(image, facefinder_classify_region, params);"pixels": rgba_to_grayscale(rgba, 360, 480), "nrows": 360, "ncols": 480, "ldim": 480 } params={ "shiftfactor": 0.1, // move the detection window by 10% of its size "minsize": 20, // minimum size of a face "maxsize": 1000, // maximum size of a face "scalefactor": 1.1 // for multiscale processing: resize the detection window by 10% when moving to the higher scale } // run the cascade over the image // dets is an array that contains (r, c, s, q) quadruplets // (representing row, column, scale and detection score) dets=pico.run_cascade(image, facefinder_classify_region, params);
注意,人脸的最小尺寸默认设置为20。这太小了,对于大部分应用程序来说都是不必要的。但还需要注意的是,运行速度在很大程度上取决于此参数。对于实时应用程序,应该将此值设置为100。但是,设置的最小尺寸需匹配示例图像。
检测过程完成后,数组dets包含表单(r,c,s,q),其中r,c,s指定人脸区域的位置(行,列)和大小,q表示检测分数。该地区得分越高,越有可能是人脸。
我们可以将得到的检测结果渲染到画布上:
qthresh=5.0for(i=0; i<dets.length; ++i) // check the detection score // if it's above the threshold, draw it if(dets[i][3]>qthresh) { ctx.beginPath(); ctx.arc(dets[i][1], dets[i][0], dets[i][2]/2, 0, 2*Math.PI, false); ctx.lineWidth=3; ctx.strokeStyle='red'; ctx.stroke(); }<dets.length; ++i) // check the detection score // if it's above the threshold, draw it if(dets[i][3]>qthresh) { ctx.beginPath(); ctx.arc(dets[i][1], dets[i][0], dets[i][2]/2, 0, 2*Math.PI, false); ctx.lineWidth=3; ctx.strokeStyle='red'; ctx.stroke(); }
我们需要根据经验设置变量qthresh(5.0刚好,适用于facefinder级联和静止图像中的人脸检测)。典型的检测结果是这样的:
我们可以看到每张脸周围都有多个探测器。这个问题用非极大值抑制来解决,在下一小节中解释。
原始检测的非极大值抑制(聚类)
非极大值抑制聚类的目的是将重叠的人脸区域融合在一起。每个集群的代表是其中得分最高的一次检测(该方法因此而得名)。它的分数更新为集群中所有检测分数的总和。
pico.js中的实现方式是:
dets=pico.cluster_detections(dets, 0.2); // set IoU threshold to 0.20.2); // set IoU threshold to 0.2
IoU阈值设置为0.2。这意味着两个重叠大于该值的检测将合并在一起。
现在的结果是这样的:
我们已经学习了使用pico.js检测静止图像中人脸的基本知识。值得注意的是,pico方法不如基于深度学习的现代人脸检测器强大。然而,pico非常快,这使得它成为许多应用程序的首选,比如那些需要实时处理的应用程序。
在视频中使用pico.js进行实时人脸检测
由于pico.js产生的检测噪声比较大,我们开发了一种时间记忆模块,在处理实时视频时可减轻少此问题。该方法用于上述实时演示中,显著提高了主观检测质量。
其思想是将几个连续帧的检测结合起来,以准确判断给定区域是否为人脸。这是通过实例化一个电路缓冲区来实现的,该缓冲区包含从最后一个f帧检测到的信号:
var update_memory=pico.instantiate_detection_memory(5); // f is set to 5 in this example// f is set to 5 in this example
update_memory闭包封装了电路缓冲区和刷新数据的代码。返回的数组包含来自最后f帧的检测。
现在我们不再从单帧中检测聚类,而是在聚类之前进行累加:
dets=pico.run_cascade(image, facefinder_classify_region, params);dets=update_memory(dets); // accumulates detections from last f framesdets=pico.cluster_detections(dets, 0.2); // set IoU threshold to 0.2 dets=update_memory(dets); // accumulates detections from last f frames dets=pico.cluster_detections(dets, 0.2); // set IoU threshold to 0.2
最终的分类阈值qthresh会显著提高,这会减少假阳性的数量,而不会显著影响到真阳率。
转载自:https://blog.csdn.net/csdnnews/article/details/92841099
随着 Web 的发展,用户对于 Web 的实时推送要求也越来越高 ,比如,工业运行监控、Web 在线通讯、即时报价系统、在线游戏等,都需要将后台发生的变化主动地、实时地传送到浏览器端,而不需要用户手动地刷新页面。本文对过去和现在流行的 Web 实时推送技术进行了比较与总结。
HTTP 协议有一个缺陷:通信只能由客户端发起。举例来说,我们想了解今天的天气,只能是客户端向服务器发出请求,服务器返回查询结果。HTTP 协议做不到服务器主动向客户端推送信息。这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。在WebSocket协议之前,有三种实现双向通信的方式:轮询(polling)、长轮询(long-polling)和iframe流(streaming)。
1.轮询(polling)
轮询是客户端和服务器之间会一直进行连接,每隔一段时间就询问一次。其缺点也很明显:连接数会很多,一个接受,一个发送。而且每次发送请求都会有Http的Header,会很耗流量,也会消耗CPU的利用率。
// 1.html
<div id="clock"></div>
<script>
let clockDiv=document.getElementById('clock');
setInterval(function(){
let xhr=new XMLHttpRequest;
xhr.open('GET','/clock',true);
xhr.onreadystatechange=function(){
if(xhr.readyState==4 && xhr.status==200){
console.log(xhr.responseText);
clockDiv.innerHTML=xhr.responseText;
}
}
xhr.send();
},1000);
</script>
//轮询 服务端
let express=require('express');
let app=express();
app.use(express.static(__dirname));
app.get('/clock',function(req,res){
res.end(new Date().toLocaleString());
});
app.listen(8080);
启动本地服务,打开http://localhost:8080/1.html,得到如下结果:
2.长轮询(long-polling)
长轮询是对轮询的改进版,客户端发送HTTP给服务器之后,看有没有新消息,如果没有新消息,就一直等待。当有新消息的时候,才会返回给客户端。在某种程度上减小了网络带宽和CPU利用率等问题。由于http数据包的头部数据量往往很大(通常有400多个字节),但是真正被服务器需要的数据却很少(有时只有10个字节左右),这样的数据包在网络上周期性的传输,难免对网络带宽是一种浪费。
// 2.html 服务端代码同上
<div id="clock"></div>
<script>
let clockDiv=document.getElementById('clock')
function send() {
let xhr=new XMLHttpRequest()
xhr.open('GET', '/clock', true)
xhr.timeout=2000 // 超时时间,单位是毫秒
xhr.onreadystatechange=function() {
if (xhr.readyState==4) {
if (xhr.status==200) {
//如果返回成功了,则显示结果
clockDiv.innerHTML=xhr.responseText
}
send() //不管成功还是失败都会发下一次请求
}
}
xhr.ontimeout=function() {
send()
}
xhr.send()
}
send()
</script>
3.iframe流(streaming)
iframe流方式是在页面中插入一个隐藏的iframe,利用其src属性在服务器和客户端之间创建一条长连接,服务器向iframe传输数据(通常是HTML,内有负责插入信息的javascript),来实时更新页面。
// 3.html
<body>
<div id="clock"></div>
<iframe src="/clock" style="display:none"></iframe>
</body>
//iframe流
let express=require('express')
let app=express()
app.use(express.static(__dirname))
app.get('/clock', function(req, res) {
setInterval(function() {
let date=new Date().toLocaleString()
res.write(`
<script type="text/javascript">
parent.document.getElementById('clock').innerHTML="${date}";//改变父窗口dom元素
</script>
`)
}, 1000)
})
app.listen(8080)
启动本地服务,打开http://localhost:8080/3.html,得到如下结果:
上述代码中,客户端只请求一次,然而服务端却是源源不断向客户端发送数据,这样服务器维护一个长连接会增加开销。
以上我们介绍了三种实时推送技术,然而各自的缺点很明显,使用起来并不理想,接下来我们着重介绍另一种技术--websocket,它是比较理想的双向通信技术。
1.什么是websocket
WebSocket是一种全新的协议,随着HTML5草案的不断完善,越来越多的现代浏览器开始全面支持WebSocket技术了,它将TCP的Socket(套接字)应用在了webpage上,从而使通信双方建立起一个保持在活动状态连接通道。
一旦Web服务器与客户端之间建立起WebSocket协议的通信连接,之后所有的通信都依靠这个专用协议进行。通信过程中可互相发送JSON、XML、HTML或图片等任意格式的数据。由于是建立在HTTP基础上的协议,因此连接的发起方仍是客户端,而一旦确立WebSocket通信连接,不论服务器还是客户端,任意一方都可直接向对方发送报文。
初次接触 WebSocket 的人,都会问同样的问题:我们已经有了 HTTP 协议,为什么还需要另一个协议?
2.HTTP的局限性
3.WebSocket的特点
相对于传统的HTTP每次请求-应答都需要客户端与服务端建立连接的模式,WebSocket是类似Socket的TCP长连接的通讯模式,一旦WebSocket连接建立后,后续数据都以帧序列的形式传输。在客户端断开WebSocket连接或Server端断掉连接前,不需要客户端和服务端重新发起连接请求。在海量并发和客户端与服务器交互负载流量大的情况下,极大的节省了网络带宽资源的消耗,有明显的性能优势,且客户端发送和接受消息是在同一个持久连接上发起,实时性优势明显。
接下来我看下websocket如何实现客户端与服务端双向通信:
// websocket.html
<div id="clock"></div>
<script>
let clockDiv=document.getElementById('clock')
let socket=new WebSocket('ws://localhost:9999')
//当连接成功之后就会执行回调函数
socket.onopen=function() {
console.log('客户端连接成功')
//再向服务 器发送一个消息
socket.send('hello') //客户端发的消息内容 为hello
}
//绑定事件是用加属性的方式
socket.onmessage=function(event) {
clockDiv.innerHTML=event.data
console.log('收到服务器端的响应', event.data)
}
</script>
// websocket.js
let express=require('express')
let app=express()
app.use(express.static(__dirname))
//http服务器
app.listen(3000)
let WebSocketServer=require('ws').Server
//用ws模块启动一个websocket服务器,监听了9999端口
let wsServer=new WebSocketServer({ port: 9999 })
//监听客户端的连接请求 当客户端连接服务器的时候,就会触发connection事件
//socket代表一个客户端,不是所有客户端共享的,而是每个客户端都有一个socket
wsServer.on('connection', function(socket) {
//每一个socket都有一个唯一的ID属性
console.log(socket)
console.log('客户端连接成功')
//监听对方发过来的消息
socket.on('message', function(message) {
console.log('接收到客户端的消息', message)
socket.send('服务器回应:' + message)
})
})
启动本地服务,打开http://localhost:3000/websocket.html,得到如下结果:
综上所述:Websocket协议不仅解决了HTTP协议中服务端的被动性,即通信只能由客户端发起,也解决了数据同步有延迟的问题,同时还带来了明显的性能优势,所以websocket 是Web 实时推送技术的比较理想的方案,但如果要兼容低版本浏览器,可以考虑用轮询来实现。
*请认真填写需求信息,我们会在24小时内与您取得联系。