菜鸟笔记
提升您的技术认知

hive提升篇-ag真人游戏

阅读 : 420

简介

hive 默认是不允许数据更新操作的,毕竟它不擅长,即使在0.14版本后,做一些额外的配置便可开启hive数据更新操作。而在海量数据场景下做update、delete之类的行级数据操作时,效率并不如意。

简单使用

修改hive_home/conf/hive-site.xml,添加如下配置


    hive.support.concurrency
    true


    hive.exec.dynamic.partition.mode
    nonstrict


    hive.txn.manager
    org.apache.hadoop.hive.ql.lockmgr.dbtxnmanager


    hive.compactor.initiator.on
    true


    hive.compactor.worker.threads
    1

建表

create table if not exists accountinfo(
id int,
name string,
age int
)
clustered by (id) into 4 buckets 
stored as orc tblproperties ('transactional'='true');

建表须知

1 注意存储格式按orc方式
2 进行数据分桶
3 添加表属性:‘transactional’=‘true’

分发配置到其他hive节点。

测试如下

插入测试数据

更新数据(此处配置了hive on tez)

数据删除

hive作为数仓常用技术工具,更多的是用于数据的存储分析,而比较少涉及到数据更新。并且在olap场景下并不适合做原有数据更新,更不用说行级别的细粒度操作。记得在一些状态更新场景下会有缓慢渐变维的运用,可即使如此也要运用拉链表保存历史数据,很少将原有数据直接覆盖;你不知道被覆盖的数据蕴含着怎样的价值。

而在一些海量oltp场景中,也会运用hbase去替代传统rdb架构;若在运用中伴有大量的数据更新操作,我想hbase会是不错的选择。

hive事务原理简介

apache hive 0.13 版本引入了事务特性,能够在 hive 表上实现 acid 语义,包括 insert/update/delete/merge 语句、增量数据抽取等。hive 3.0 又对该特性进行了优化,包括改进了底层的文件组织方式,减少了对表结构的限制,以及支持条件下推和向量化查询。hive 事务表的介绍和使用方法可以参考 hive wiki 和 各类教程,本文将重点讲述 hive 事务表是如何在 hdfs 上存储的,及其读写过程是怎样的。

文件结构

插入数据

1
2
3
4
5
6
7
create table employee (id int, name string, salary int)
stored as orc tblproperties ('transactional' = 'true');
insert into employee values
(1, 'jerry', 5000),
(2, 'tom',   8000),
(3, 'kate',  6000);

insert 语句会在一个事务中运行。它会创建名为 delta 的目录,存放事务的信息和表的数据。

1
2
3
/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delta_0000001_0000001_0000/_orc_acid_version
/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

目录名称的格式为 delta_minwid_maxwid_stmtid,即 delta 前缀、写事务的 id 范围、以及语句 id。具体来说:

  • 所有 insert 语句都会创建 delta 目录。update 语句也会创建 delta 目录,但会先创建一个 delete 目录,即先删除、后插入。delete 目录的前缀是 delete_delta;
  • hive 会为所有的事务生成一个全局唯一的 id,包括读操作和写操作。针对写事务(insert、delete 等),hive 还会创建一个写事务 id(write id),该 id 在表范围内唯一。写事务 id 会编码到 delta 和 delete 目录的名称中;
  • 语句 id(statement id)则是当一个事务中有多条写入语句时使用的,用作唯一标识。

再看文件内容,_orc_acid_version 的内容是 2,即当前 acid 版本号是 2。它和版本 1 的主要区别是 update 语句采用了 split-update 特性,即上文提到的先删除、后插入。这个特性能够使 acid 表支持条件下推等功能,具体可以查看 hive-14035。bucket_00000 文件则是写入的数据内容。由于这张表没有分区和分桶,所以只有这一个文件。事务表都以 orc 格式存储的,我们可以使用 orc-tools 来查看文件的内容:

1
2
3
4
$ orc-tools data bucket_00000
{"operation":0,"originaltransaction":1,"bucket":536870912,"rowid":0,"currenttransaction":1,"row":{"id":1,"name":"jerry","salary":5000}}
{"operation":0,"originaltransaction":1,"bucket":536870912,"rowid":1,"currenttransaction":1,"row":{"id":2,"name":"tom","salary":8000}}
{"operation":0,"originaltransaction":1,"bucket":536870912,"rowid":2,"currenttransaction":1,"row":{"id":3,"name":"kate","salary":6000}}

输出内容被格式化为了一行行的 json 字符串,我们可以看到具体数据是在 row 这个键中的,其它键则是 hive 用来实现事务特性所使用的,具体含义为:

  • operation 0 表示插入,1 表示更新,2 表示删除。由于使用了 split-update,update 是不会出现的;
  • originaltransaction 是该条记录的原始写事务 id。对于 insert 操作,该值和 currenttransaction 是一致的。对于 delete,则是该条记录第一次插入时的写事务 id;
  • bucket 是一个 32 位整型,由 bucketcodec 编码,各个二进制位的含义为:
    • 1-3 位:编码版本,当前是 001
    • 4 位:保留;
    • 5-16 位:分桶 id,由 0 开始。分桶 id 是由 clustered by 子句所指定的字段、以及分桶的数量决定的。该值和 bucket_n 中的 n 一致;
    • 17-20 位:保留;
    • 21-32 位:语句 id;
    • 举例来说,整型 536936448 的二进制格式为 00100000000000010000000000000000,即它是按版本 1 的格式编码的,分桶 id 为 1;
  • rowid 是一个自增的唯一 id,在写事务和分桶的组合中唯一;
  • currenttransaction 当前的写事务 id;
  • row 具体数据。对于 delete 语句,则为 null

我们可以注意到,文件中的数据会按 (originaltransactionbucketrowid) 进行排序,这点对后面的读取操作非常关键。

这些信息还可以通过 row__id 这个虚拟列进行查看:

1
select row__id, id, name, salary from employee;

输出结果为:

1
2
3
{"writeid":1,"bucketid":536870912,"rowid":0}    1       jerry   5000
{"writeid":1,"bucketid":536870912,"rowid":1}    2       tom     8000
{"writeid":1,"bucketid":536870912,"rowid":2}    3       kate    6000

增量数据抽取 api v2

hive 3.0 还改进了先前的 增量抽取 api,通过这个 api,用户或第三方工具(flume 等)就可以利用 acid 特性持续不断地向 hive 表写入数据了。这一操作同样会生成 delta 目录,但更新和删除操作不再支持。

1
2
3
4
5
6
streamingconnection connection = hivestreamingconnection.newbuilder().connect();
connection.begintransaction();
connection.write("11,val11,asia,china".getbytes());
connection.write("12,val12,asia,india".getbytes());
connection.committransaction();
connection.close();

更新数据

1
update employee set salary = 7000 where id = 2;

这条语句会先查询出所有符合条件的记录,获取它们的 row__id 信息,然后分别创建 delete 和 delta 目录:

1
2
3
/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000
/user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000
/user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000

delete_delta_0000002_0000002_0000/bucket_00000 包含了删除的记录:

1
{"operation":2,"originaltransaction":1,"bucket":536870912,"rowid":1,"currenttransaction":2,"row":null}

delta_0000002_0000002_0000/bucket_00000 包含更新后的数据:

1
{"operation":0,"originaltransaction":2,"bucket":536870912,"rowid":0,"currenttransaction":2,"row":{"id":2,"name":"tom","salary":7000}}

delete 语句的工作方式类似,同样是先查询,后生成 delete 目录。

合并表

merge 语句和 mysql 的 insert on update 功能类似,它可以将来源表的数据合并到目标表中:

1
2
3
4
5
6
7
8
9
create table employee_update (id int, name string, salary int);
insert into employee_update values
(2, 'tom',  7000),
(4, 'mary', 9000);
merge into employee as a
using employee_update as b on a.id = b.id
when matched then update set salary = b.salary
when not matched then insert values (b.id, b.name, b.salary);

这条语句会更新 tom 的薪资字段,并插入一条 mary 的新记录。多条 when 子句会被视为不同的语句,有各自的语句 id(statement id)。insert 子句会创建 delta_0000002_0000002_0000 文件,内容是 mary 的数据;update 语句则会创建 delete_delta_0000002_0000002_0001 和 delta_0000002_0000002_0001 两个文件,删除并新增 tom 的数据。

1
2
3
4
/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delta_0000002_0000002_0000
/user/hive/warehouse/employee/delete_delta_0000002_0000002_0001
/user/hive/warehouse/employee/delta_0000002_0000002_0001

压缩

随着写操作的积累,表中的 delta 和 delete 文件会越来越多。事务表的读取过程中需要合并所有文件,数量一多势必会影响效率。此外,小文件对 hdfs 这样的文件系统也是不够友好的。因此,hive 引入了压缩(compaction)的概念,分为 minor 和 major 两类。

minor compaction 会将所有的 delta 文件压缩为一个文件,delete 也压缩为一个。压缩后的结果文件名中会包含写事务 id 范围,同时省略掉语句 id。压缩过程是在 hive metastore 中运行的,会根据一定阈值自动触发。我们也可以使用如下语句人工触发:

1
alter table employee compact 'minor';

以上文中的 merge 语句的结果举例,在运行了一次 minor compaction 后,文件目录结构将变为:

1
2
/user/hive/warehouse/employee/delete_delta_0000001_0000002
/user/hive/warehouse/employee/delta_0000001_0000002

在 delta_0000001_0000002/bucket_00000 文件中,数据会被排序和合并起来,因此文件中将包含两行 tom 的数据。minor compaction 不会删除任何数据。

major compaction 则会将所有文件合并为一个文件,以 base_n 的形式命名,其中 n 表示最新的写事务 id。已删除的数据将在这个过程中被剔除。row__id 则按原样保留。

1
/user/hive/warehouse/employee/base_0000002

需要注意的是,在 minor 或 major compaction 执行之后,原来的文件不会被立刻删除。这是因为删除的动作是在另一个名为 cleaner 的线程中执行的。因此,表中可能同时存在不同事务 id 的文件组合,这在读取过程中需要做特殊处理。

读取过程

我们可以看到 acid 事务表中会包含三类文件,分别是 basedelta、以及 delete。文件中的每一行数据都会以 row__id 作为标识并排序。从 acid 事务表中读取数据就是对这些文件进行合并,从而得到最新事务的结果。这一过程是在 orcinputformat 和 orcrawrecordmerger 类中实现的,本质上是一个合并排序的算法。

以下列文件为例,产生这些文件的操作为:插入三条记录,进行一次 major compaction,然后更新两条记录。1-0-0-1 是对 originaltransaction - bucketid - rowid - currenttransaction 的缩写。

1
2
3
4
5
6
7
 ----------      ----------      ---------- 
| base_1   |    | delete_2 |    | delta_2  |
 ----------      ----------      ---------- 
| 1-0-0-1  |    | 1-0-1-2  |    | 2-0-0-2  |
| 1-0-1-1  |    | 1-0-2-2  |    | 2-0-1-2  |
| 1-0-2-1  |     ----------      ---------- 
 ---------- 

合并过程为:

  • 对所有数据行按照 (originaltransactionbucketidrowid) 正序排列,(currenttransaction) 倒序排列,即:
    • 1-0-0-1
    • 1-0-1-2
    • 1-0-1-1
    • 2-0-1-2
  • 获取第一条记录;
  • 如果当前记录的 row__id 和上条数据一样,则跳过;
  • 如果当前记录的操作类型为 delete,也跳过;
    • 通过以上两条规则,对于 1-0-1-2 和 1-0-1-1,这条记录会被跳过;
  • 如果没有跳过,记录将被输出给下游;
  • 重复以上过程。

合并过程是流式的,即 hive 会将所有文件打开,预读第一条记录,并将 row__id 信息存入到 readerkey 类型中。该类型实现了 comparable 接口,因此可以按照上述规则自定义排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class recordidentifier implements writablecomparable {
  private long writeid;
  private int bucketid;
  private long rowid;
  protected int comparetointernal(recordidentifier other) {
    if (other == null) { return -1; }
    if (writeid != other.writeid) { return writeid < other.writeid ? -1 : 1; }
    if (bucketid != other.bucketid) { return bucketid < other.bucketid ? - 1 : 1; }
    if (rowid != other.rowid) { return rowid < other.rowid ? -1 : 1; }
    return 0;
  }
}
public class readerkey extends recordidentifier {
  private long currentwriteid;
  private boolean isdeleteevent = false;
  public int compareto(recordidentifier other) {
    int sup = comparetointernal(other);
    if (sup == 0) {
      if (other.getclass() == readerkey.class) {
        readerkey oth = (readerkey) other;
        if (currentwriteid != oth.currentwriteid) { return currentwriteid < oth.currentwriteid ?  1 : -1; }
        if (isdeleteevent != oth.isdeleteevent) { return isdeleteevent ? -1 :  1; }
      } else {
        return -1;
      }
    }
    return sup;
  }
}

然后,readerkey 会和文件句柄一起存入到 treemap 结构中。根据该结构的特性,我们每次获取第一个元素时就能得到排序后的结果,并读取数据了。

1
2
3
4
5
6
public class orcrawrecordmerger {
  private treemap readers = new treemap<>();
  public boolean next(recordidentifier recordidentifier, orcstruct prev) {
    map.entry entry = readers.pollfirstentry();
  }
}

选择文件

上文中提到,事务表目录中会同时存在多个事务的快照文件,因此 hive 首先要选择出反映了最新事务结果的文件集合,然后再进行合并。举例来说,下列文件是一系列操作后的结果:两次插入,一次 minor compaction,一次 major compaction,一次删除。

1
2
3
4
5
delta_0000001_0000001_0000
delta_0000002_0000002_0000
delta_0000001_0000002
base_0000002
delete_delta_0000003_0000003_0000

过滤过程为:

  • 从 hive metastore 中获取所有成功提交的写事务 id 列表;
  • 从文件名中解析出文件类型、写事务 id 范围、以及语句 id;
  • 选取写事务 id 最大且合法的那个 base 目录,如果存在的话;
  • 对 delta 和 delete 文件进行排序:
    • minwid 较小的优先;
    • 如果 minwid 相等,则 maxwid 较大的优先;
    • 如果都相等,则按 stmtid 排序;没有 stmtid 的会排在前面;
  • 将 base 文件中的写事务 id 作为当前 id,循环过滤所有 delta 文件:
    • 如果 maxwid 大于当前 id,则保留这个文件,并以此更新当前 id;
    • 如果 id 范围相同,也会保留这个文件;
    • 重复上述步骤。

过滤过程中还会处理一些特别的情况,如没有 base 文件,有多条语句,包含原始文件(即不含 row__id 信息的文件,一般是通过 load data 导入的),以及 acid 版本 1 格式的文件等。具体可以参考 acidutils#getacidstate 方法。

并行执行

在 map-reduce 模式下运行 hive 时,多个 mapper 是并行执行的,这就需要将 delta 文件按一定的规则组织好。简单来说,base 和 delta 文件会被分配到不同的分片(split)中,但所有分片都需要能够读取所有的 delete 文件,从而根据它们忽略掉已删除的记录。

向量化查询

当 向量化查询 特性开启时,hive 会尝试将所有的 delete 文件读入内存,并维护一个特定的数据结构,能够快速地对数据进行过滤。如果内存放不下,则会像上文提到的过程一样,逐步读取 delete 文件,使用合并排序的算法进行过滤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class vectorizedorcacidrowbatchreader {
  private final deleteeventregistry deleteeventregistry;
  protected static interface deleteeventregistry {
    public void finddeletedrecords(columnvector[] cols, int size, bitset selectedbitset);
  }
  static class columnizeddeleteeventregistry implements deleteeventregistry {}
  static class sortmergeddeleteeventregistry implements deleteeventregistry {}
  public boolean next(nullwritable key, vectorizedrowbatch value) {
    bitset selectedbitset = new bitset(vectorizedrowbatchbase.size);
    this.deleteeventregistry.finddeletedrecords(innerrecordidcolumnvector,
        vectorizedrowbatchbase.size, selectedbitset);
    for (int setbitindex = selectedbitset.nextsetbit(0), selecteditr = 0;
        setbitindex >= 0;
        setbitindex = selectedbitset.nextsetbit(setbitindex 1),   selecteditr) {
      value.selected[selecteditr] = setbitindex;
    }
  }
}

事务管理

为了实现 acid 事务机制,hive 还引入了新的事务管理器 dbtxnmanager,它能够在查询计划中分辨出 acid 事务表,联系 hive metastore 打开新的事务,完成后提交事务。它也同时实现了过去的读写锁机制,用来支持非事务表的情形。

hive metastore 负责分配新的事务 id。这一过程是在一个数据库事务中完成的,从而避免多个 metastore 实例冲突的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
abstract class txnhandler {
  private list opentxns(connection dbconn, statement stmt, opentxnrequest rqst) {
    string s = sqlgenerator.addforupdateclause("select ntxn_next from next_txn_id");
    s = "update next_txn_id set ntxn_next = "   (first   numtxns);
    for (long i = first; i < first   numtxns; i  ) {
      txnids.add(i);
      rows.add(i   ","   quotechar(txn_open)   ","   now   ","   now   ","
            quotestring(rqst.getuser())   ","   quotestring(rqst.gethostname())   ","   txntype.getvalue());
    }
    list queries = sqlgenerator.createinsertvaluesstmt(
        "txns (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows);
  }
}

ps: 向量化查询

向量化查询执行是hive特性,可以大大减少典型查询操作(如扫描,过滤器,聚合和连接)的cpu使用率。一个标准的查询执行系统一次处理一行。这涉及在执行的内部循环中长的代码路径和重要的元数据解释。目前hive也严重依赖于惰性的反序列化,数据列通过一层对象检查器来识别列类型,反序列化数据并在内部循环中确定合适的表达式例程。这些虚拟方法调用层进一步减慢了处理速度。向量化的查询执行通过一次处理1024行的数据块来简化操作。在块内,每一列都被存储为一个向量(一个基本数据类型的数组)。算术和比较等简单操作是通过在紧密循环中快速迭代向量来完成的,在循环内没有或很少有函数调用或条件分支。这些循环以简化的方式进行编译,使用相对较少的指令,并通过有效地使用处理器流水线和高速缓存存储器,以较少的时钟周期完成每条指令

网站地图