整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

深入解读HBase2.0新功能之Assignment

深入解读HBase2.0新功能之AssignmentManagerV2

AssignmentManager模块是HBase中一个非常重要的模块,Assignment Manager(之后简称AM)负责了HBase中所有region的Assign,UnAssign,以及split/merge过程中region状态变化的管理等等。在HBase-0.90之前,AM的状态全部存在内存中,自从HBASE-2485之后,AM把状态持久化到了Zookeeper上。在此基础上,社区对AM又修复了大量的bug和优化(见此文章),最终形成了用在HBase-1.x版本上的这个AM。

老Assignment Mananger的问题

相信深度使用过HBase的人一般都会被Region RIT的状态困扰过,长时间的region in transition状态简直令人抓狂。

除了一些确实是由于Region无法被RegionServer open的case,大部分的RIT,都是AM本身的问题引起的。总结一下HBase-1.x版本中AM的问题,主要有以下几点:

region状态变化复杂

这张图很好地展示了region在open过程中参与的组件和状态变化。可以看到,多达7个组件会参与region状态的变化。并且在region open的过程中多达20多个步骤!越复杂的逻辑意味着越容易出bug

region状态多处缓存

region的状态会缓存在多个地方,Master中RegionStates会保存Region的状态,Meta表中会保存region的状态,Zookeeper上也会保存region的状态,要保持这三者完全同步是一件很困难的事情。同时,Master和RegionServer都会修改Meta表的状态和Zookeeper的状态,非常容易导致状态的混乱。如果出现不一致,到底以哪里的状态为准?每一个region的transition流程都是各自为政,各自有各自的处理方法

重度依赖Zookeeper

在老的AM中,region状态的通知完全通过Zookeeper。比如说RegionServer打开了一个region,它会在Zookeeper把这个region的RIT节点改成OPEN状态,而不去直接通知Master。Master会在Zookeeper上watch这个RIT节点,通过Zookeeper的通知机制来通知Master这个region已经发生变化。Master再根据Zookeeper上读取出来的新状态进行一定的操作。严重依赖Zookeeper的通知机制导致了region的上线/下线的速度存在了一定的瓶颈。特别是在region比较多的时候,Zookeeper的通知会出现严重的滞后现象。

正是这些问题的存在,导致AM的问题频发。我本人就fix过多个AM导致region无法open的issue。比如说这三个相互关联的“连环”case:HBASE-17264,HBASE-17265,HBASE-17275。

Assignment Mananger V2

面对这些问题的存在,社区也在不断尝试解决这些问题,特别是当region的规模达到100w级别的时候,AM成为了一个严重的瓶颈。HBASE-11059中提出的ZK-less Region Assignment就是一个非常好的改良设计。在这个设计中,AM完全摆脱了Zookeeper的限制,在测试中,zk-less的assign比zk的assign快了一个数量级!

但是在这个设计中,它摒弃了Zookeeper这个持久化的存储,一些region transition过程中的中间状态无法被保存。因此,在此基础上,社区又更进了一步,提出了Assignment Mananger V2在这个方案。在这个方案中,仍然摒弃了Zookeeper参与Assignment的整个过程。但是,它引入了ProcedureV2这个持久化存储来保存Region transition中的各个状态,保证在master重启时,之前的assing/unassign,split等任务能够从中断点重新执行。具体的来说,AMv2方案中,主要的改进有以下几点:

Procedure V2

关于Procedure V2,我之后将独立写文章介绍。这里,我只大概介绍下ProcedureV2和引入它所带来的价值。

我们知道,Master中会有许多复杂的管理工作,比如说建表,region的transition。这些工作往往涉及到非常多的步骤,如果master在做中间某个步骤的时候宕机了,这个任务就会永远停留在了中间状态(RIT因为之前有Zookeeper做持久化因此会继续从某个状态开始执行)。比如说在enable/disable table时,如果master宕机了,可能表就停留在了enabling/disabling状态。需要一些外部的手段进行恢复。那么从本质上来说,ProcedureV2提供了一个持久化的手段(通过ProcedureWAL,一种类似RegionServer中WAL的日志持久化到HDFS上),使master在宕机后能够继续之前未完成的任务继续完成。同时,ProcedureV2提供了非常丰富的状态转换并支持回滚执行,即使执行到某一个步骤出错,master也可以按照用户的逻辑对之前的步骤进行回滚。比如建表到某一个步骤失败了,而之前已经在HDFS中创建了一些新region的文件夹,那么ProcedureV2在rollback的时候,可以把这些残留删除掉。

Procedure中提供了两种Procedure框架,顺序执行和状态机,同时支持在执行过程中插入subProcedure,从而能够支持非常丰富的执行流程。在AMv2中,所有的Assign,UnAssign,TableCreate等等流程,都是基于Procedure实现的。

去除Zookeeper依赖

有了Procedure V2之后,所有的状态都可以持久化在Procedure中,Procedure中每次的状态变化,都能够持久化到ProcedureWAL中,因此数据不会丢失,宕机后也能恢复。同时,AMv2中region的状态扭转(OPENING,OPEN,CLOSING,CLOSE等)都会由Master记录在Meta表中,不需要Zookeeper做持久化。再者,之前的AM使用的Zookeeper watch机制通知master region状态的改变,而现在每当RegionServer Open或者close一个region后,都会直接发送RPC给master汇报,因此也不需要Zookeeper来做状态的通知。综合以上原因,Zookeeper已经在AMv2中没有了存在的必要。

减少状态冲突的可能性

之前我说过,在之前的AM中,region的状态会同时存在于meta表,Zookeeper和master的内存状态。同时Master和regionserver都会去修改Zookeeper和meta表,维护状态统一的代价非常高,非常容易出bug。而在AMv2中,只有master才能去修改meta表。并在region整个transition中做为一个“权威”存在,如果regionserver汇报上来的region状态与master看到的不一致,则master会命令RegionServer abort。Region的状态,都以master内存中保存的RegionStates为准。

除了上述这些优化,AMv2中还有许多其他的优化。比如说AMv2依赖Procedure V2提供的一套locking机制,保证了对于一个实体,如一张表,一个region或者一个RegionServer同一时刻只有一个Procedure在执行。同时,在需要往RegionServer发送命令,如发送open,close等命令时,AMv2实现了一个RemoteProcedureDispatcher来对这些请求做batch,批量把对应服务器的指令一起发送等等。在代码结构上,之前处理相应region状态的代码散落在AssignmentManager这个类的各个地方,而在AMv2中,每个对应的操作,都有对应的Procedure实现,如AssignProcedure,DisableTableProcedure,SplitTableRegionProcedure等等。这样下来,使AssignmentManager这个之前杂乱的类变的清晰简单,代码量从之前的4000多行减到了2000行左右。

AssignProcedure

AMv2中有太多的Procedure对应各种不同的transition,这里不去详细介绍每个Procedure的操作。我将以AssignProcedure为例,讲解一下在AMv2中,一个region是怎么assign给一个RegionServer,并在对应的RS上Open的。

AssignProcedure是一个基于Procedure实现的状态机。它拥有3个状态:

  • REGION_TRANSITION_QUEUE: Assign开始时的状态。在这个状态时,Procedure会对region状态做一些改变和存储,并丢到AssignmentManager的assign queue中。对于单独region的assign,AssignmentManager会把他们group起来,再通过LoadBalancer分配相应的服务器。当这一步骤完成后,Procedure会把自己标为REGION_TRANSITION_DISPATCH,然后看是否已经分配服务器,如果还没有被分配服务器的话,则会停止继续执行,等待被唤醒。
  • REGION_TRANSITION_DISPATCH: 当AssignmentManager为这个region分配好服务器时,Procedure就会被唤醒。或者Procedure在执行完REGION_TRANSITION_QUEUE状态时master宕机,Procedure被恢复后,也会进入此步骤执行。所以在此步骤下,Procedure会先检查一下是否分配好了服务器,如果没有,则把状态转移回REGION_TRANSITION_QUEUE,否则的话,则把这个region交给RemoteProcedureDispatcher,发送RPC给对应的RegionServer来open这个region。同样的,RemoteProcedureDispatcher也会对相应的指令做一个batch,批量把一批region open的命令发送给某一台服务器。当命令发送完成之后,Procedure又会进入休眠状态,等待RegionServer成功OPen这个region后,唤醒这个Procedure
  • REGION_TRANSITION_FINISH: 当有RegionServer汇报了此region被打开后,会把Procedure的状态置为此状态,并唤醒Procedure执行。此时,AssignProcedure会做一些状态改变的工作,并修改meta表,把meta表中这个region的位置指向对应的RegionServer。至此,region assign的工作全部完成。

AMv2中提供了一个Web页面(Master页面中的‘Procedures&Locks’链接)来展示当前正在执行的Procedure和持有的锁。

其实通过log,我们也可以看到Assign的整个过程。

假设,一台server宕机,此时master会产生一个ServerCrashProcedure 来处理,在这个Procedure中,会做一系列的工作,比如WAL的restore。当这些前置的工作做完后,就会开始assign之前在宕掉服务器上的region,比如56f985a727afe80a184dac75fbf6860c。此时会在ServerCrashProcedure产生一系列的子任务:

2017-05-23 12:04:24,175 INFO [ProcExecWrkr-30] procedure2.ProcedureExecutor: Initialized subprocedures=[{pid=1178, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=bfd57f0b72fd3ca77e9d3c5e3ae48d76, target=ve0540.halxg.cloudera.com,16020,1495525111232}, {pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232}]

可以看到,ServerCrashProcedure的pid(Procedure ID)为1178,在此Procedure中产生的assign 56f985a727afe80a184dac75fbf6860c这个region的子Procedure的pid为1179,同时他的ppid(Parent Procedure ID)为1178。在AMv2中,通过追踪这些ID,就非常容易把一个region的transition整个过程全部串起来。

接下来,pid=1170这个Procedure开始执行,首先执行的是REGION_TRANSITION_QUEUE状态的逻辑,然后进入睡眠状态。

2017-05-23 12:04:24,241 INFO [ProcExecWrkr-30] assignment.AssignProcedure: Start pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OFFLINE, location=ve0540.halxg.cloudera.com,16020,1495525111232; forceNewPlan=false, retain=false

当target server被指定时,Procedure进入REGION_TRANSITION_DISPATCH状态,dispatch了region open的请求,同时把meta表中region的状态改成了OPENING,然后再次进入休眠状态

2017-05-23 12:04:24,494 INFO [ProcExecWrkr-38] assignment.RegionStateStore: pid=1179 updating hbase:meta row=IntegrationTestBigLinkedList,H\xE3@\x8D\x964\x9D\xDF\x8F@9'\x0F\xC8\xCC\xC2,1495566261066.56f985a727afe80a184dac75fbf6860c., regionState=OPENING, regionLocation=ve0540.halxg.cloudera.com,16020,1495525111232 2017-05-23 12:04:24,498 INFO [ProcExecWrkr-38] assignment.RegionTransitionProcedure: Dispatch pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_DISPATCH; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OPENING, location=ve0540.halxg.cloudera.com,16020,1495525111232

最后,当RegionServer打开了这个region后,会发RPC通知master,那么在通知过程中,这个Procedure再次被唤醒,开始执行REGION_TRANSITION_FINISH的逻辑,最后更新meta表,把这个region置为打开状态。

2017-05-23 12:04:26,643 DEBUG [RpcServer.default.FPBQ.Fifo.handler=46,queue=1,port=16000] assignment.RegionTransitionProcedure: Received report OPENED seqId=11984985, pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_DISPATCH; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OPENING, location=ve0540.halxg.cloudera.com,16020,1495525111232 2017-05-23 12:04:26,643 INFO [ProcExecWrkr-9] assignment.RegionStateStore: pid=1179 updating hbase:meta row=IntegrationTestBigLinkedList,H\xE3@\x8D\x964\x9D\xDF\x8F@9'\x0F\xC8\xCC\xC2,1495566261066.56f985a727afe80a184dac75fbf6860c., regionState=OPEN, openSeqNum=11984985, regionLocation=ve0540.halxg.cloudera.com,16020,1495525111232
2017-05-23 12:04:26,836 INFO [ProcExecWrkr-9] procedure2.ProcedureExecutor: Finish suprocedure pid=1179, ppid=1176, state=SUCCESS; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232

一路看下来,由于整个region assign的过程都是在Procedure中执行,整个过程清晰明了,非常容易追述,也没有了Zookeeper一些event事件的干扰。

总结

Assignment Mananger V2依赖Procedure V2实现了一套清晰明了的region transition机制。去除了Zookeeper依赖,减少了region状态冲突的可能性。整体上来看,代码的可读性更强,出了问题也更好查错。对于解决之前AM中的一系列“顽疾”,AMv2做了很好的尝试,也是一个非常好的方向。

AMv2之所以能保持简洁高效的一个重要原因就是重度依赖了Procedure V2,把一些复杂的逻辑都转移到了Procedure V2中。但是这样做的问题是:一旦ProcedureWAL出现了损坏,或者Procedure本身存在bug,这个后果就是灾难性的。事实上在我们的测试环境中,就出现过PRocedureWAL损坏导致region RIT的情况。

另外需要注意的是,截止目前为止,HBCK仍然无法支持AMv2,这会导致一旦出现问题,修复起来会比较困难。

当然,新的事务还是要有一段成熟期,相信经过一段时间的bug修复和完善后,我相信AMv2一定会完美解决之前的一些问题,给HBase的运维上带来一些不同的体验。愿世界不再被HBase的RIT困扰 :-)。

云端使用

阿里HBase目前已经在阿里云提供商业化服务,任何有需求的用户都可以在阿里云端使用深入改进的、一站式的HBase服务。云HBase版本与自建HBase相比在运维、可靠性、性能、稳定性、安全、成本等方面均有很多的改进。

同时,云HBase2.0 在2018年6月6日正式发布,点击了解更多!

文作者为京东算法服务部的张颖和段学浩,并由 Apache Hive PMC,阿里巴巴技术专家李锐帮忙校对。主要内容为:

1.背景

2.Flink SQL 的优化

3.总结

一、背景

目前,京东搜索推荐的数据处理流程如上图所示。可以看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm。

这就造成了以下现象:在一个业务引擎里,用户需要维护两套环境、两套代码,许多共性不能复用,数据的质量和一致性很难得到保障。且因为流批底层数据模型不一致,导致需要做大量的拼凑逻辑;甚至为了数据一致性,需要做大量的同比、环比、二次加工等数据对比,效率极差,并且非常容易出错。

而支持批流一体的 Flink SQL 可以很大程度上解决这个痛点,因此我们决定引入 Flink 来解决这种问题。

在大多数作业,特别是 Flink 作业中,执行效率的优化一直是 Flink 任务优化的关键,在京东每天数据增量 PB 级情况下,作业的优化显得尤为重要。

写过一些 SQL 作业的同学肯定都知道,对于 Flink SQL 作业,在一些情况下会造成同一个 UDF 被反复调用的情况,这对一些消耗资源的任务非常不友好;此外,影响执行效率大致可以从 shuffle、join、failover 策略等方面考虑;另外,Flink 任务调试的过程也非常复杂,对于一些线上机器隔离的公司来说尤甚。

为此,我们实现了内嵌式的 Derby 来作为 Hive 的元数据存储数据库 (allowEmbedded);在任务恢复方面,批式作业没有 checkpoint 机制来实现failover,但是 Flink 特有的 region 策略可以使批式作业快速恢复;此外,本文还介绍了对象重用等相关优化措施。

二、 Flink SQL 的优化

1. UDF 重用

在 Flink SQL 任务里会出现以下这种情况:如果相同的 UDF 既出现在 LogicalProject 中,又出现在 Where 条件中,那么 UDF 会进行多次调用 (见https://issues.apache.org/jira/browse/FLINK-20887)。但是如果该 UDF 非常耗 CPU 或者内存,这种多余的计算会非常影响性能,为此我们希望能把 UDF 的结果缓存起来下次直接使用。在设计的时候需要考虑:(非常重要:请一定保证 LogicalProject 和 where 条件的 subtask chain 到一起)

  • 一个 taskmanager 里面可能会有多个 subtask,所以这个 cache 要么是 thread (THREAD LOCAL) 级别要么是 tm 级别;
  • 为了防止出现一些情况导致清理 cache 的逻辑走不到,一定要在 close 方法里将 cache 清掉;
  • 为了防止内存无限增大,选取的 cache 最好可以主动控制 size;至于 “超时时间”,建议可以配置一下,但是最好不要小于 UDF 先后调用的时间;
  • 上文有提到过,一个 tm 里面可能会有多个 subtask,相当于 tm 里面是个多线程的环境。首先我们的 cache 需要是线程安全的,然后可根据业务判断需不需要锁。

根据以上考虑,我们用 guava cache 将 UDF 的结果缓存起来,之后调用的时候直接去cache 里面拿数据,最大可能降低任务的消耗。下面是一个简单的使用(同时设置了最大使用 size、超时时间,但是没有写锁):

public class RandomFunction extends ScalarFunction {
    private static Cache<String, Integer> cache=CacheBuilder.newBuilder()
            .maximumSize(2)
            .expireAfterWrite(3, TimeUnit.SECONDS)
            .build();

    public int eval(String pvid) {
        profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet());
        Integer result=cache.getIfPresent(pvid);
        if (null==result) {
            int tmp=(int)(Math.random() * 1000);
            cache.put("pvid", tmp);
            return tmp;
        }
        return result;
    }
    @Override
    public void close() throws Exception {
        super.close();
        cache.cleanUp();
    }
}

2. 单元测试

大家可能会好奇为什么会把单元测试也放到优化里面,大家都知道 Flink 任务调试过程非常复杂,对于一些线上机器隔离的公司来说尤甚。京东的本地环境是没有办法访问任务服务器的,因此在初始阶段调试任务,我们耗费了很多时间用来上传 jar 包、查看日志等行为。

为了降低任务的调试时间、增加代码开发人员的开发效率,实现了内嵌式的 Derby 来作为 Hive 的元数据存储数据库 (allowEmbedded),这算是一种优化开发时间的方法。具体思路如下:

首先创建 Hive Conf:

public static HiveConf createHiveConf() {
    ClassLoader classLoader=new HiveOperatorTest().getClass().getClassLoader();
    HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));

    try {
        TEMPORARY_FOLDER.create();
        String warehouseDir=TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
        String warehouseUri=String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);

        HiveConf hiveConf=new HiveConf();
        hiveConf.setVar(
                HiveConf.ConfVars.METASTOREWAREHOUSE,
                TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
        hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);

        hiveConf.set("datanucleus.connectionPoolingType", "None");
        hiveConf.set("hive.metastore.schema.verification", "false");
        hiveConf.set("datanucleus.schema.autoCreateTables", "true");
        return hiveConf;
    } catch (IOException e) {
        throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
    }
}

接下来创建 Hive Catalog:(利用反射的方式调用 embedded 的接口)

public static void createCatalog() throws Exception{
    Class clazz=HiveCatalog.class;
    Constructor c1=clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class});
    c1.setAccessible(true);
    hiveCatalog=(HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true});
    hiveCatalog.open();
}

创建 tableEnvironment:(同官网)

EnvironmentSettings settings=EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv=TableEnvironment.create(settings);
TableConfig tableConfig=tableEnv.getConfig();
Configuration configuration=new Configuration();
configuration.setInteger("table.exec.resource.default-parallelism", 1);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());

最后关闭 Hive Catalog:

public static void closeCatalog() {
    if (hiveCatalog !=null) {
        hiveCatalog.close();
    }
}

此外,对于单元测试,构建合适的数据集也是一个非常大的功能,我们实现了 CollectionTableFactory,允许自己构建合适的数据集,使用方法如下:

CollectionTableFactory.reset();
CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case")));
StringBuilder sbFilesSource=new StringBuilder();
sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + "  `pvid` string) with ('connector.type'='COLLECTION','is-bounded'='true')");
tableEnv.executeSql(sbFilesSource.toString());

3. join 方式的选择

传统的离线 Batch SQL (面向有界数据集的 SQL) 有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。


效率

空间

备注

Nested-loop Join

占用大


Sort-Merge Join

有sort merge开销

占用小

有序数据集的一种优化措施

Hash Join

占用大

适合大小表

  • Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 的时间效率以及空间效率都是最低的,可以使用:table.exec.disabled-operators:NestedLoopJoin 来禁用。以下两张图片是禁用前和禁用后的效果 (如果你的禁用没有生效,先看一下是不是 Equi-Join):
  • Sort-Merge Join 分为 Sort 和 Merge 两个阶段:首先将两个数据集进行分别排序,然后再对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。(Sort-Merge Join 要求对两个数据集进行排序,但是如果两个输入是有序的数据集,则可以作为一种优化方案)。
  • Hash Join 同样分为两个阶段:首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。第一阶段和第一个数据集分别称为 build 阶段和 build table;第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。Hash Join 效率较高但是对空间要求较大,通常是作为 Join 其中一个表为适合放入内存的小表的情况下的优化方案 (并不是不允许溢写磁盘)。

注意:Sort-Merge Join 和 Hash Join 只适用于 Equi-Join ( Join 条件均使用等于作为比较算子)。

Flink 在 join 之上又做了一些细分,具体包括:


特点

使用

Repartition-Repartition strategy

对数据集分别进行分区和shuffle,如果数据集大的时候效率极差

两个数据集相差不大

Broadcast-Forward strategy

将小表的数据全部发送到大表数据的机器上

两个数据集有较大的差距

  • Repartition-Repartition strategy:Join 的两个数据集分别对它们的 key 使用相同的分区函数进行分区,并经过网络发送数据;
  • Broadcast-Forward strategy:大的数据集不做处理,另一个比较小的数据集全部复制到集群中一部分数据的机器上。

众所周知,batch 的 shuffle 非常耗时间。

  • 如果两个数据集有较大差距,建议采用 Broadcast-Forward strategy;
  • 如果两个数据集差不多,建议采用 Repartition-Repartition strategy。

可以通过:table.optimizer.join.broadcast-threshold 来设置采用 broadcast 的 table 大小,如果设置为 “-1”,表示禁用 broadcast。

下图为禁用前后的效果:

4. multiple input

在 Flink SQL 任务里,降低 shuffle 可以有效的提高 SQL 任务的吞吐量,在实际的业务场景中经常遇到这样的情况:上游产出的数据已经满足了数据分布要求 (如连续多个 join 算子,其中 key 是相同的),此时 Flink 的 forward shuffle 是冗余的 shuffle,我们希望将这些算子 chain 到一起。Flink 1.12 引入了 mutiple input 的特性,可以消除大部分没必要的 forward shuffle,把 source 的算子 chain 到一起。

table.optimizer.multiple-input-enabled:true

下图为开了 multiple input 和没有开的拓扑图 ( operator chain 功能已经打开):?

5. 对象重用

上下游 operator 之间会经过序列化 / 反序列化 / 复制阶段来进行数据传输,这种行为非常影响 Flink SQL 程序的性能,可以通过启用对象重用来提高性能。但是这在 DataStream 里面非常危险,因为可能会发生以下情况:在下一个算子中修改对象意外影响了上面算子的对象。

但是 Flink 的 Table / SQL API 中是非常安全的,可以通过如下方式来启用:

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

或者是通过设置:pipeline-object-reuse:true

为什么启用了对象重用会有这么大的性能提升?在 Blink planner 中,同一任务的两个算子之间的数据交换最终将调用 BinaryString#copy,查看实现代码,可以发现 BinaryString#copy 需要复制底层 MemorySegment 的字节,通过启用对象重用来避免复制,可以有效提升效率。

下图为没有开启对象重用时相应的火焰图:

6. SQL 任务的 failover 策略

batch 任务模式下 checkpoint 以及其相关的特性全部都不可用,因此针对实时任务的基于 checkpoint 的 failover 策略是不能应用在批任务上面的,但是 batch 任务允许 Task 之间通过 Blocking Shuffle 进行通信,当一个 Task 因为任务未知的原因失败之后,由于 Blocking Shuffle 中存储了这个 Task 所需要的全部数据,所以只需要重启这个 Task 以及通过 Pipeline Shuffle 与其相连的全部下游任务即可:

jobmanager.execution.failover-strategy:region (已经 finish 的 operator 可直接恢复)

table.exec.shuffle-mode:ALL_EDGES_BLOCKING (shuffle 策略)。

7. shuffle

Flink 里的 shuffle 分为 pipeline shuffle 和 blocking shuffle。

  • pipeline shuffle 性能好,但是对资源的要求高,而且容错比较差 (会将该 operator 分到前面的一个 region 里面,对于 batch 任务来说,如果这个算子出问题,将从上一个 region 恢复);
  • blocking shuffle 就是传统的 batch shuffle,会将数据落盘,这种 shuffle 的容错好,但是会产生大量的磁盘、网络 io (如果为了省心的话,建议用 blocking suffle)。blocking shuffle 又分为 hash shuffle 和 sort shuffle,如果你的磁盘是 ssd 并且并发不太大的话,可以选择使用 hash shuffle,这种 shuffle 方式产生的文件多、随机读多,对磁盘 io 影响较大;如果你是 sata 并且并发比较大,可以选择用 sort-merge shuffle,这种 shuffle 产生的数据少,顺序读,不会产生大量的磁盘 io,不过开销会更大一些 (sort merge)。

相应的控制参数:

table.exec.shuffle-mode,该参数有多个参数,默认是 ALL_EDGES_BLOCKING,表示所有的边都会用 blocking shuffle,不过大家可以试一下 POINTWISE_EDGES_PIPELINED,表示 forward 和 rescale edges 会自动开始 pipeline 模式。

taskmanager.network.sort-shuffle.min-parallelism ,将这个参数设置为小于你的并行度,就可以开启 sort-merge shuffle;这个参数的设置需要考虑一些其他的情况,具体的可以按照官网设置。

三、总结

本文着重从 shuffle、join 方式的选择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 任务方面做的优化措施。另外,感谢京东实时计算研发部付海涛等全部同事的支持与帮助。

原文链接:http://click.aliyun.com/m/1000288770/

本文为阿里云原创内容,未经允许不得转载。

.创建主键

create table ESC_STOTE.TF_B_AIR_CONFIG(

TYPE_ID VARCHAR2(20) not null,

PROVINCE_CODE VARCHAR2(4) not null,

PROVINCE_TYPE VARCHAR2(2) not null,

LIMIT_NUM VARCHAR2(2) not null,

EFFECTIVE_FALG VARCHAR2 default '1',

UPDATE_TIME DATE default sysdate,

constraint TF_B_AIR_CONFIG_PK primary key(TYPE_ID)--单列主键

)

create table ECS_STORE.TF_B_AIR_CONFIG(

TYPE_ID VARCHAR2(20) not null,

PROVINCE_CODE VARCHAR2(4) not null,

PARAMETER_TYPE VARCHAR2(2) not null,

LIMIT_NUM VARCHAR2(4) not null,

EFFECTIVE_FALG VARCHAR2(2) default '1',

UPDATE_TIME DATE default sysdate,

constraint TF_B_AIR_CONFIG_PK primary key(TYPE_ID , PROVINCE_CODE)--复合主键

)

第二种:创建表后,再创建约束

alter table table_name add constraint constraint_name primary key(col1,col2,...coln);

示例:

----创建TF_B_AIR_CONFIG表

create table ECS_STORE.TF_B_AIR_CONFIG(

TYPE_ID VARCHAR2(20) not null,

PROVINCE_CODE VARCHAR2(4) not null,

PARAMETER_TYPE VARCHAR2(2) not null,

LIMIT_NUM VARCHAR2(4) not null,

EFFECTIVE_FALG VARCHAR2(2) default '1',

UPDATE_TIME DATE default sysdate

)

--单列主键

alter table ECS_STORE.TF_B_AIR_CONFIG add constraint TF_B_AIR_CONFIG_PK primary key (TYPE_ID);

--联合主键

alter table ECS_STORE.TF_B_AIR_CONFIG add constraint TF_B_AIR_CONFIG_PK primary key (TYPE_ID , PROVINCE_CODE);

2.禁用主键

alter table table_name disable constraint constraint_name;

alter table ECS_STORE.TF_B_AIR_CONFIG disable constraint TF_B_AIR_CONFIG_PK ;

3.启用主键

alter table table_name enable constraint constraint_name;

alter table ECS_STORE.TF_B_AIR_CONFIG enable constraint TF_B_AIR_CONFIG_PK ;

4.删除主键

alter table table_name drop constraint constraint_name;