菜鸟笔记
提升您的技术认知

hive-ag真人游戏

阅读 : 499

yarn.nodemanager.resource.memory-mb

该参数的含义是,一个nodemanager节点分配给container使用的内存。该参数的配置,取决于nodemanager所在节点的总内存容量和该节点运行的其他服务的数量。

考虑上述因素,此处可将该参数设置为64g,如下:


    yarn.nodemanager.resource.memory-mb
    65536

yarn.nodemanager.resource.cpu-vcores

该参数的含义是,一个nodemanager节点分配给container使用的cpu核数。该参数的配置,同样取决于nodemanager所在节点的总cpu核数和该节点运行的其他服务。

考虑上述因素,此处可将该参数设置为16。尽量1cpu对应的4g内存


    yarn.nodemanager.resource.cpu-vcores
    16

yarn.scheduler.maximum-allocation-mb

该参数的含义是,单个container能够使用的最大内存。由于spark的yarn模式下,driver和executor都运行在container中,故该参数不能小于driver和executor的内存配置,推荐配置如下:


    yarn.scheduler.maximum-allocation-mb
    16384

yarn.scheduler.minimum-allocation-mb

该参数的含义是,单个container能够使用的最小内存,推荐配置如下:


    yarn.scheduler.minimum-allocation-mb
    512

修改$hadoop_home/etc/hadoop/yarn-site.xml文件


    yarn.nodemanager.resource.memory-mb
    65536


    yarn.nodemanager.resource.cpu-vcores
    16


    yarn.scheduler.maximum-allocation-mb
    16384


    yarn.scheduler.minimum-allocation-mb
    512

executor配置说明

executor cpu核数配置

单个executor的cpu核数,由spark.executor.cores参数决定,建议配置为4-6,具体配置为多少,视具体情况而定,原则是尽量充分利用资源。

此处单个节点共有16个核可供executor使用,则spark.executor.core配置为4最合适。原因是,若配置为5,则单个节点只能启动3个executor,会剩余1个核未使用;若配置为6,则只能启动2个executor,会剩余4个核未使用。

executor内存配置

spark在yarn模式下的executor内存模型如下图所示:

executor相关的参数有:spark.executor.memory和spark.executor.memoryoverhead。spark.executor.memory用于指定executor进程的堆内存大小,这部分内存用于任务的计算和存储;spark.executor.memoryoverhead用于指定executor进程的堆外内存,这部分内存用于jvm的额外开销,操作系统开销等。两者的和才算一个executor进程所需的总内存大小。默认情况下spark.executor.memoryoverhead的值等于spark.executor.memory*0.1。

以上两个参数的推荐配置思路是,先按照单个nodemanager的核数和单个executor的核数,计算出每个nodemanager最多能运行多少个executor。在将nodemanager的总内存平均分配给每个executor,最后再将单个executor的内存按照大约10:1的比例分配到spark.executor.memory和spark.executor.memoryoverhead。

根据上述思路,可得到如下关系:

(spark.executor.memory spark.executor.memoryoverhead)= yarn.nodemanager.resource.memory-mb * (spark.executor.cores/yarn.nodemanager.resource.cpu-vcores)

经计算,此处应做如下配置:

spark.executor.memory    14g
spark.executor.memoryoverhead    2g

executor个数配置

此处的executor个数是指分配给一个spark应用的executor个数,executor个数对于spark应用的执行速度有很大的影响,所以executor个数的确定十分重要。

一个spark应用的executor个数的指定方式有两种,静态分配动态分配

静态分配

可通过spark.executor.instances指定一个spark应用启动的executor个数。这种方式需要自行估计每个spark应用所需的资源,并为每个应用单独配置executor个数。

动态分配

动态分配可根据一个spark应用的工作负载,动态的调整其所占用的资源(executor个数)。这意味着一个spark应用程序可以在运行的过程中,需要时,申请更多的资源(启动更多的executor),不用时,便将其释放。

在生产集群中,推荐使用动态分配。动态分配相关参数如下:

#启动动态分配
spark.dynamicallocation.enabled    true
#启用spark shuffle服务
spark.shuffle.service.enabled    true
#executor个数初始值
spark.dynamicallocation.initialexecutors    1
#executor个数最小值
spark.dynamicallocation.minexecutors    1
#executor个数最大值
spark.dynamicallocation.maxexecutors    12
#executor空闲时长,若某executor空闲时间超过此值,则会被关闭
spark.dynamicallocation.executoridletimeout    60s
#积压任务等待时长,若有task等待时间超过此值,则申请启动新的executor
spark.dynamicallocation.schedulerbacklogtimeout    1s
#使用旧版的shuffle文件fetch协议
spark.shuffle.useoldfetchprotocol true

说明:spark shuffle服务的作用是管理executor中的各task的输出文件,主要是shuffle过程map端的输出文件。由于启用资源动态分配后,spark会在一个应用未结束前,将已经完成任务,处于空闲状态的executor关闭。executor关闭后,其输出的文件,也就无法供其他executor使用了。需要启用spark shuffle服务,来管理各executor输出的文件,这样就能关闭空闲的executor,而不影响后续的计算任务了。

driver主要配置内存即可,相关的参数有spark.driver.memory和spark.driver.memoryoverhead。

spark.driver.memory用于指定driver进程的堆内存大小,spark.driver.memoryoverhead用于指定driver进程的堆外内存大小。默认情况下,两者的关系如下:spark.driver.memoryoverhead=spark.driver.memory*0.1。两者的和才算一个driver进程所需的总内存大小。

一般情况下,按照如下经验进行调整即可:假定yarn.nodemanager.resource.memory-mb设置为x,

若x>50g,则driver可设置为12g,

若12g

若1g

此处yarn.nodemanager.resource.memory-mb为64g,则driver的总内存可分配12g,所以上述两个参数可配置为

spark.driver.memory    10g
spark.yarn.driver.memoryoverhead    2g

配置实操

修改spark-defaults.conf文件

修改$hive_home/conf/spark-defaults.conf

spark.master                               yarn
spark.eventlog.enabled                   true
spark.eventlog.dir    hdfs://mynameservice1/spark-history
spark.executor.cores    4
spark.executor.memory    14g
spark.executor.memoryoverhead    2g
spark.driver.memory    10g
spark.driver.memoryoverhead    2g
spark.dynamicallocation.enabled  true
spark.shuffle.service.enabled  true
spark.dynamicallocation.executoridletimeout  60s
spark.dynamicallocation.initialexecutors    1
spark.dynamicallocation.minexecutors  1
spark.dynamicallocation.maxexecutors  11
spark.dynamicallocation.schedulerbacklogtimeout 1s
spark.shuffle.useoldfetchprotocol    true

修改$hive_home/conf/hive-site.xml配置文件


    spark.dynamicallocation.enabled
    true

配置spark shuffle服务

spark shuffle服务的配置因cluster manager(standalone、mesos、yarn)的不同而不同。此处以yarn作为cluster manager。

(1)拷贝$spark_home/yarn/spark-3.0.0-yarn-shuffle.jar到

$hadoop_home/share/hadoop/yarn/lib

(2)分发$hadoop_home/share/hadoop/yarn/lib/yarn/spark-3.0.0-yarn-shuffle.jar

(3)修改$hadoop_home/etc/hadoop/yarn-site.xml文件


    yarn.nodemanager.aux-services
    mapreduce_shuffle,spark_shuffle


    yarn.nodemanager.aux-services.spark_shuffle.class
    org.apache.spark.network.yarn.yarnshuffleservice

(4)分发$hadoop_home/etc/hadoop/yarn-site.xml文件

(5)重启yarn

hive sql的执行计划,可由explain查看。

explain呈现的执行计划,由一系列stage组成,这一系列stage具有依赖关系,每个stage对应一个mapreduce job或者spark job,或者一个文件系统操作等。

每个stage由一系列的operator组成,一个operator代表一个逻辑操作,例如tablescan operator,select operator,join operator等。

stage与operator的对应关系如下图:

分组聚合优化 

select
    coupon_id,
    count(*)
from dwd_trade_order_detail_inc
where dt='2020-06-16'
group by coupon_id;

优化前执行计划

优化思路 

优化思路为map-side聚合。所谓map-side聚合,就是在map端维护一个hash table,利用其完成分区内的、部分的聚合,然后将部分聚合的结果,发送至reduce端,完成最终的聚合。map-side聚合能有效减少shuffle的数据量,提高分组聚合运算的效率。

map-side 聚合相关的参数如下:

--启用map-side聚合
set hive.map.aggr=true;
--hash map占用map端内存的最大比例
set hive.map.aggr.hash.percentmemory=0.5;

优化后执行计划

join优化

hive join算法概述

hive拥有多种join算法,包括common join,map join,bucket map join等。下面对每种join算法做简要说明:

common join

map端负责读取参与join的表的数据,并按照关联字段进行分区,将其发送到reduce端,reduce端完成最终的关联操作。

map join 

若参与join的表中,有n-1张表足够小,map端就会缓存小表全部数据,然后扫描另外一张大表,在map端完成关联操作。

bucket map join

若参与join的表均为分桶表,且关联字段为分桶字段,且大表的分桶数量是小表分桶数量的整数倍。此时,就可以以分桶为单位,为每个map分配任务了,此时map端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶。

map join优化

select
    *
from
(
    select
        *
    from dwd_trade_order_detail_inc
    where dt='2020-06-16'
)fact
left join
(
    select
        *
    from dim_sku_full
    where dt='2020-06-16'
)dim
on fact.sku_id=dim.id;

优化前执行计划

优化思路

上述参与join的两表一大一小,可考虑map join优化。

map join相关参数如下:

--启用map join自动转换
set hive.auto.convert.join=true;
--common join转map join小表阈值
set hive.auto.convert.join.noconditionaltask.size=1xxxx;

优化后执行计划

数据倾斜优化 

数据倾斜说明

数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同key的数据被发往一个reduce,进而导致该reduce所需的时间远超其他reduce,成为整个任务的瓶颈。

hive中的数据倾斜常出现在分组聚合和join操作的场景中,下面分别介绍在上述两种场景下的优化思路。

分组聚合导致的数据倾斜

select
    province_id,
    count(*)
from dwd_trade_order_detail_inc
where dt='2020-06-16'
group by province_id;

优化前执行计划

优化思路

由分组聚合导致的数据倾斜问题主要有以下两种优化思路:

启用map-side聚合 

--启用map-side聚合
set hive.map.aggr=true;
--hash map占用map端内存的最大比例
set hive.map.aggr.hash.percentmemory=0.5;

启用map-side聚合后的执行计划如下图所示

启用skew groupby优化 

--启用分组聚合数据倾斜优化
set hive.groupby.skewindata=true;

join导致的数据倾斜

优化思路

由join导致的数据倾斜问题主要有以下两种优化思路:

使用map join

--启用map join自动转换
set hive.auto.convert.join=true;
--common join转map join小表阈值
set hive.auto.convert.join.noconditionaltask.size

使用map join优化后执行计划如下图

启用skew join优化

其原理如下图

--启用skew join优化
set hive.optimize.skewjoin=true;
--触发skew join的阈值,若某个key的行数超过该参数值,则触发
set hive.skewjoin.key=100000;

需要注意的是,skew join只支持inner join。

启动skew join优化后的执行计划如下图所示:

任务并行度优化

map阶段并行度

map端的并行度,也就是map的个数。是由输入文件的切片数决定的。一般情况下,map端的并行度无需手动调整。map端的并行度相关参数如下:

--可将多个小文件切片,合并为一个切片,进而由一个map任务处理
set hive.input.format=org.apache.hadoop.hive.ql.io.combinehiveinputformat; 
--一个切片的最大值
set mapreduce.input.fileinputformat.split.maxsize=256000000;

reduce阶段并行度

reduce端的并行度,相对来说,更需要关注。默认情况下,hive会根据reduce端输入数据的大小,估算一个reduce并行度。但是在某些情况下,其估计值不一定是最合适的,此时则需要人为调整其并行度。

reduce并行度相关参数如下:

--指定reduce端并行度,默认值为-1,表示用户未指定
set mapreduce.job.reduces;
--reduce端并行度最大值
set hive.exec.reducers.max;
--单个reduce task计算的数据量,用于估算reduce并行度
set hive.exec.reducers.bytes.per.reducer;

reduce端并行度的确定逻辑为,若指定参数mapreduce.job.reduces的值为一个非负整数,则reduce并行度为指定值。否则,hive会自行估算reduce并行度,估算逻辑如下:

假设reduce端输入的数据量大小为totalinputbytes

参数hive.exec.reducers.bytes.per.reducer的值为bytesperreducer

参数hive.exec.reducers.max的值为maxreducers

则reduce端的并行度为:

其中,reduce端输入的数据量大小,是从reduce上游的operator的statistics(统计信息)中获取的。为保证hive能获得准确的统计信息,需配置如下参数:

--执行dml语句时,收集表级别的统计信息
set hive.stats.autogather=true;
--执行dml语句时,收集字段级别的统计信息
set hive.stats.column.autogather=true;
--计算reduce并行度时,从上游operator统计信息获得输入数据量
set hive.spark.use.op.stats=true;
--计算reduce并行度时,使用列级别的统计信息估算输入数据量
set hive.stats.fetch.column.stats=true;

小文件合并优化

优化说明

小文件合并优化,分为两个方面,分别是map端输入的小文件合并,和reduce端输出的小文件合并。

map端输入文件合并

合并map端输入的小文件,是指将多个小文件划分到一个切片中,进而由一个map task去处理。目的是防止为单个小文件启动一个map task,浪费计算资源。

相关参数为:

--可将多个小文件切片,合并为一个切片,进而由一个map任务处理
set hive.input.format=org.apache.hadoop.hive.ql.io.combinehiveinputformat; 

reduce输出文件合并

合并reduce端输出的小文件,是指将多个小文件合并成大文件。目的是减少hdfs小文件数量。

相关参数为:

--开启合并hive on spark任务输出的小文件
set hive.merge.sparkfiles=true;

其他优化

 cbo优化

优化说明

cbo是指cost based optimizer,即基于计算成本的优化。

在hive中,计算成本模型考虑到了:数据的行数、cpu、本地io、hdfs io、网络io等方面。hive会计算同一sql语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前cbo在hive中主要用于join的优化,例如多表join的join顺序。

相关参数为:

--是否启用cbo优化 
set hive.cbo.enable=true;

关闭cbo优化

--关闭cbo优化 
set hive.cbo.enable=false;
--为了测试效果更加直观,关闭map join自动转换
set hive.auto.convert.join=false;

谓词下推

谓词下推(predicate pushdown)是指,尽量将过滤操作前移,以减少后续计算步骤的数据量。

相关参数为:

--是否启动谓词下推(predicate pushdown)优化
set hive.optimize.ppd = true;

需要注意的是:

cbo优化也会完成一部分的谓词下推优化工作,因为在执行计划中,谓词越靠前,整个计划的计算成本就会越低

矢量化查询

hive的矢量化查询优化,依赖于cpu的矢量化计算,cpu的矢量化计算的基本原理如下图:

hive的矢量化查询,可以极大的提高一些典型查询场景(例如scans, filters, aggregates, and joins)下的cpu使用效率。

相关参数如下:

set hive.vectorized.execution.enabled=true;

 若执行计划中,出现“execution mode: vectorized”字样,即表明使用了矢量化计算。

参考资料:

1.https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/admin_hos_tuning.html#hos_tuning

2.hive on spark: getting started - apache hive - apache software foundation

网站地图