Hive Query Optimization

Learn Hive query optimization techniques, including execution plan analysis, partition pruning, predicate pushdown, join optimization, and data skew handling.

1. Query Optimization Overview

Query optimization is the key technique for improving Hive query performance. Because Hive processes large-scale data, optimization can significantly reduce query execution time and raise system throughput.

1.1 Why Query Optimization Matters

  • Shorter execution time: an optimized query finishes sooner, which improves the user experience
  • Lower resource consumption: an optimized query uses less CPU, memory, and I/O
  • Higher system throughput: the system can serve more query requests at the same time
  • Lower cost: using fewer resources reduces hardware and maintenance costs

1.2 Levels of Query Optimization

Hive query optimization can be divided into the following levels:

  1. SQL syntax optimization: writing efficient SQL statements
  2. Logical query optimization: rewriting the query logic to improve the execution plan
  3. Physical query optimization: choosing the best physical execution plan
  4. System configuration optimization: tuning Hive and Hadoop configuration parameters
  5. Data storage optimization: improving the storage format and the way data is organized

2. Execution Plan Analysis

The execution plan is the foundation of Hive query optimization. Analyzing it shows how a query is executed and where its performance bottlenecks are.

2.1 Viewing the Execution Plan

Use the EXPLAIN statement to view the execution plan of a Hive query:

-- View the basic execution plan
hive> EXPLAIN SELECT * FROM users WHERE age > 18;

-- View the extended execution plan
hive> EXPLAIN EXTENDED SELECT * FROM users WHERE age > 18;

-- View the formatted execution plan (Hive 2.2.0+)
hive> EXPLAIN FORMATTED SELECT * FROM users WHERE age > 18;

-- View the inputs (tables and partitions) the query depends on
hive> EXPLAIN DEPENDENCY SELECT * FROM users JOIN orders ON users.id = orders.user_id;

2.2 Execution Plan Structure

A Hive execution plan mainly contains the following components:

  • STAGE: a phase of query execution; each stage corresponds to one or more MapReduce jobs
  • OPERATOR: an operator in the plan, such as TableScan, Filter, Join, or GroupBy
  • ESTIMATES: estimated row counts, data sizes, and execution time
  • PROPERTIES: the properties and configuration of each operator

2.3 Analyzing the Execution Plan

Key steps when analyzing an execution plan:

  1. Check how many stages the query is split into; more stages usually mean a longer execution time
  2. Inspect the operators in each stage for optimization opportunities
  3. Look for data skew, that is, an operator that processes a disproportionately large amount of data
  4. Check for unnecessary full table scans
  5. Check whether the join order and join types are reasonable
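
The first two steps can be partly automated once the plan is saved as text. The Python sketch below counts stages and tallies a few operator names. The sample plan is a hand-written, abbreviated stand-in shaped like EXPLAIN output, not output captured from a real cluster, and the function name summarize_plan and the OPERATORS list are hypothetical choices for illustration.

```python
import re
from collections import Counter

# Hand-written, abbreviated stand-in for EXPLAIN output (not real cluster output).
SAMPLE_PLAN = """\
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: users
            Filter Operator
              predicate: (age > 18)
              Select Operator
  Stage: Stage-0
    Fetch Operator
"""

# Operator names to look for; extend the tuple as needed (illustrative subset).
OPERATORS = ("TableScan", "Filter Operator", "Select Operator",
             "Join Operator", "Group By Operator", "Fetch Operator")


def summarize_plan(plan_text):
    """Return (list of stage names, Counter of operator occurrences)."""
    # Lines of the form "Stage: Stage-N" open one stage in the STAGE PLANS part.
    stages = re.findall(r"^[ \t]*Stage: (Stage-\d+)[ \t]*$", plan_text,
                        flags=re.MULTILINE)
    counts = Counter()
    for line in plan_text.splitlines():
        stripped = line.strip()
        for op in OPERATORS:
            if stripped.startswith(op):
                counts[op] += 1
    return stages, counts


stages, counts = summarize_plan(SAMPLE_PLAN)
print(len(stages), "stages:", stages)
print(dict(counts))
```

A plan with many stages or with a TableScan that has no accompanying Filter Operator is a natural place to start looking for the problems listed in steps 1 and 4.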

3. Data Storage Optimization

3.1 Choosing a Suitable File Format

The file format has a large effect on query performance:

File format   Characteristics                                                Typical use
TextFile      Plain text; easy to read; low compression ratio                Data import, temporary tables
SequenceFile  Binary key-value format; suited to structured data             Intermediate result storage
ORC           Columnar; high compression ratio; supports predicate pushdown  Large-scale data storage and queries
Parquet       Columnar; supports nested data structures                      Complex data types, nested structures
3.2 Using Compression

Compression reduces the volume of stored data and of network transfer, which improves query performance:

-- Enable compression of intermediate results
hive> SET hive.exec.compress.intermediate=true;
hive> SET mapreduce.map.output.compress=true;
hive> SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Enable compression of the final output
hive> SET hive.exec.compress.output=true;
hive> SET mapreduce.output.fileoutputformat.compress=true;
hive> SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Specify the compression codec when creating a table
-- (ORC has its own binary layout, so no ROW FORMAT DELIMITED clause is needed)
CREATE TABLE sales (
    id INT,
    product STRING,
    amount DECIMAL(10,2)
)
STORED AS ORC
TBLPROPERTIES (
    'orc.compress'='SNAPPY'
);

3.3 Partitioning and Bucketing

Sensible use of partitioning and bucketing can significantly improve query performance:

  • Partitioning: splits data into separate directories by column value, which reduces the amount of data scanned at query time
  • Bucketing: splits data into separate files by the hash of a column, which helps optimize join operations

-- Create a partitioned table
CREATE TABLE logs (
    id INT,
    message STRING,
    ip STRING
) PARTITIONED BY (dt STRING)
STORED AS ORC;

-- Create a bucketed table
CREATE TABLE users (
    id INT,
    username STRING,
    email STRING
) CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC;
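
To make the two layouts concrete, here is a small Python sketch of where a row ends up. It assumes the simplified bucketing rule in which an integer key hashes to itself and the bucket is the non-negative hash modulo the bucket count; newer Hive versions can use a different hash function, so treat the bucket numbers as illustrative. The function names and sample rows are made up for the example.

```python
def partition_dir(table, dt):
    """Directory for one partition, following the key=value directory layout."""
    return f"{table}/dt={dt}"


def bucket_for(key, num_buckets):
    """Bucket index for an integer key, assuming hash(key) == key (simplified)."""
    return (key & 0x7FFFFFFF) % num_buckets


# Sample rows (made up): (id, username)
rows = [(1, "alice"), (9, "bob"), (10, "carol"), (17, "dave")]

for user_id, name in rows:
    print(user_id, name, "-> bucket", bucket_for(user_id, 8))

print(partition_dir("logs", "2025-01-01"))  # logs/dt=2025-01-01
```

Rows with ids 1, 9, and 17 share a bucket because they are equal modulo 8, which is what later allows a join to compare only matching buckets.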

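4. SQL Syntax Optimization

4.1 Avoiding Full Table Scans

Full table scans are one of the main query performance bottlenecks and should be avoided where possible:

  • Use a WHERE clause to filter data
  • Rely on partition pruning so that only the relevant partitions are read
  • Create indexes on frequently queried columns (Hive indexes were removed in Hive 3.0; on newer versions, rely on the built-in indexes of columnar formats such as ORC instead)

-- Poor: full table scan
hive> SELECT * FROM logs;

-- Better: read only the partition that is needed
hive> SELECT * FROM logs WHERE dt = '2025-01-01';

-- Better still: read only the columns that are needed
hive> SELECT id, message FROM logs WHERE dt = '2025-01-01';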

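The effect of partition pruning can be pictured with a few lines of Python. This is only a toy model: the partition sizes are invented numbers, and bytes_scanned is a hypothetical helper, not a Hive API.

```python
# Hypothetical partition sizes in bytes, keyed by the dt partition value.
partitions = {
    "2024-12-30": 400,
    "2024-12-31": 350,
    "2025-01-01": 500,
}


def bytes_scanned(partitions, dt=None):
    """Bytes read for a query; dt=None models a query without a partition filter."""
    if dt is None:
        return sum(partitions.values())  # every partition directory is read
    return partitions.get(dt, 0)         # only the matching directory is read


print(bytes_scanned(partitions))                # 1250
print(bytes_scanned(partitions, "2025-01-01"))  # 500
```

The saving grows with the number of partitions, since a filter on dt always touches a single directory no matter how many days of logs exist.
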
4.2 Predicate Pushdown

Predicate pushdown applies filter conditions to the data as early as possible, which reduces the amount of data that later operators have to process:

-- Filter written after the JOIN
hive> SELECT u.name, o.order_id
    > FROM users u
    > JOIN orders o ON u.id = o.user_id
    > WHERE u.age > 18;

-- Filter written before the JOIN
hive> SELECT u.name, o.order_id
    > FROM (SELECT * FROM users WHERE age > 18) u
    > JOIN orders o ON u.id = o.user_id;

-- Hive performs predicate pushdown automatically (hive.optimize.ppd is enabled by default), so for this inner join both forms behave the same
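
Why filtering early helps can be seen in a small Python model of the two query shapes. The rows are invented sample data and the helper names are hypothetical; the point is only that both orders return the same result while the join handles fewer rows when the filter runs first.

```python
# Invented sample data.
users = [
    {"id": 1, "name": "a", "age": 17},
    {"id": 2, "name": "b", "age": 30},
    {"id": 3, "name": "c", "age": 45},
]
orders = [
    {"order_id": 100, "user_id": 1},
    {"order_id": 101, "user_id": 2},
    {"order_id": 102, "user_id": 2},
    {"order_id": 103, "user_id": 3},
]


def join(user_rows, order_rows):
    """Inner join of users and orders on users.id = orders.user_id."""
    by_id = {u["id"]: u for u in user_rows}
    return [(by_id[o["user_id"]], o) for o in order_rows if o["user_id"] in by_id]


def join_then_filter():
    joined = join(users, orders)
    result = [(u["name"], o["order_id"]) for u, o in joined if u["age"] > 18]
    return result, len(joined)  # len(joined) = rows produced by the join


def filter_then_join():
    adults = [u for u in users if u["age"] > 18]
    joined = join(adults, orders)
    return [(u["name"], o["order_id"]) for u, o in joined], len(joined)


late_result, late_rows = join_then_filter()
early_result, early_rows = filter_then_join()
print(late_result == early_result)  # True
print(late_rows, early_rows)        # 4 3
```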

4.3 Using JOIN Sensibly

JOIN is one of the most common operations in Hive queries, and optimizing it can significantly improve query performance:

  • Small table JOIN large table: put the small table on the left side of the JOIN; Hive loads the small table into memory
  • Bucket on the join key: if both tables are bucketed on the same column, Hive can perform a bucket join
  • Use MAPJOIN: when a small table is joined to a large table, MAPJOIN avoids the reduce phase
  • Avoid Cartesian products: make sure the JOIN condition includes every required join key

-- Enable automatic conversion to MAPJOIN
hive> SET hive.auto.convert.join=true;

-- Small table JOIN large table
hive> SELECT u.name, o.order_id
    > FROM users u -- small table
    > JOIN orders o ON u.id = o.user_id; -- large table

-- Explicit MAPJOIN hint (honored only when hive.ignore.mapjoin.hint=false)
hive> SELECT /*+ MAPJOIN(u) */ u.name, o.order_id
    > FROM users u
    > JOIN orders o ON u.id = o.user_id;

4.4 Optimizing Aggregations

Aggregation is another common performance bottleneck, and optimizing it can improve query performance:

  • Include the partition key in GROUP BY: this reduces the amount of data each reducer processes
  • Collect statistics with ANALYZE TABLE ... COMPUTE STATISTICS: table statistics help the optimizer choose a better execution plan
  • Avoid complex expressions in GROUP BY: compute the expression in a subquery first and group by the resulting column

-- Collect table statistics
hive> ANALYZE TABLE users COMPUTE STATISTICS;
hive> ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, age;

-- Before: complex expression in GROUP BY
hive> SELECT SUBSTR(email, INSTR(email, '@') + 1) AS domain, COUNT(*) AS cnt
    > FROM users
    > GROUP BY SUBSTR(email, INSTR(email, '@') + 1);

-- After: compute the expression in a subquery first
hive> SELECT domain, COUNT(*) AS cnt
    > FROM (
    >     SELECT SUBSTR(email, INSTR(email, '@') + 1) AS domain
    >     FROM users
    > ) t
    > GROUP BY domain;

5. Join Optimization Techniques

5.1 MapJoin

MapJoin loads the small table into memory and completes the JOIN in the map phase. This avoids the reduce phase and improves JOIN performance.

-- Automatic MapJoin configuration
hive> SET hive.auto.convert.join=true; -- convert to MapJoin automatically
hive> SET hive.mapjoin.smalltable.filesize=25000000; -- small-table threshold, default 25 MB

-- Large table JOIN several small tables
hive> SELECT /*+ MAPJOIN(d, c) */ u.name, d.department_name, c.city_name
    > FROM users u
    > JOIN departments d ON u.dept_id = d.id
    > JOIN cities c ON u.city_id = c.id;

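The idea behind MapJoin is an ordinary hash join: build an in-memory lookup table from the small side, then stream the large side past it. The Python sketch below shows that pattern on invented rows; map_join is a hypothetical helper that illustrates the technique, not Hive's implementation.

```python
def map_join(small_rows, big_rows, small_key, big_key):
    """Hash join: index the small side in memory, then stream the big side once."""
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[small_key], []).append(row)
    for big in big_rows:
        # Rows whose key is missing from the lookup table produce no output.
        for small in lookup.get(big[big_key], []):
            yield small, big


# Invented sample data: departments is the small table, users the large one.
departments = [
    {"id": 1, "department_name": "eng"},
    {"id": 2, "department_name": "ops"},
]
users = [
    {"name": "a", "dept_id": 1},
    {"name": "b", "dept_id": 2},
    {"name": "c", "dept_id": 1},
    {"name": "d", "dept_id": 3},
]

result = [(u["name"], d["department_name"])
          for d, u in map_join(departments, users, "id", "dept_id")]
print(result)  # [('a', 'eng'), ('b', 'ops'), ('c', 'eng')]
```

Because the large side is only streamed, no shuffle by key is needed, which is why the small side must fit in memory.
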
5.2 Bucket Join

A bucket join applies when both tables are bucketed on the same column and their bucket counts are multiples of each other. The JOIN then only needs to match corresponding buckets, which reduces data transfer.

-- Enable bucket join
hive> SET hive.optimize.bucketmapjoin=true;
hive> SET hive.optimize.bucketmapjoin.sortedmerge=true;

-- Create bucketed tables
CREATE TABLE users (
    id INT,
    name STRING
) CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC;

CREATE TABLE orders (
    id INT,
    user_id INT,
    amount DECIMAL(10,2)
) CLUSTERED BY (user_id) INTO 8 BUCKETS
STORED AS ORC;

-- Bucket join query
hive> SELECT u.name, o.amount
    > FROM users u
    > JOIN orders o ON u.id = o.user_id;

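A bucket join saves work because rows can only match rows in the corresponding bucket. The Python sketch below models this with the simplified rule bucket = key mod bucket count and counts how many row pairs are compared. The data and helper names are invented for illustration.

```python
from collections import defaultdict


def bucketize(rows, key, num_buckets):
    """Group rows by bucket index (simplified rule: key modulo bucket count)."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[row[key] % num_buckets].append(row)
    return buckets


def bucket_join(left, right, left_key, right_key, num_buckets):
    """Join bucket by bucket; returns (matched pairs, number of comparisons)."""
    left_b = bucketize(left, left_key, num_buckets)
    right_b = bucketize(right, right_key, num_buckets)
    out, comparisons = [], 0
    for b in range(num_buckets):
        for l in left_b.get(b, []):
            for r in right_b.get(b, []):
                comparisons += 1
                if l[left_key] == r[right_key]:
                    out.append((l, r))
    return out, comparisons


# Invented sample data.
users = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 9, "name": "c"}]
orders = [
    {"user_id": 1, "amount": 10},
    {"user_id": 9, "amount": 20},
    {"user_id": 2, "amount": 5},
    {"user_id": 3, "amount": 7},
]

pairs, comparisons = bucket_join(users, orders, "id", "user_id", 8)
print([(u["name"], o["amount"]) for u, o in pairs])
print(comparisons, "comparisons instead of", len(users) * len(orders))
```

On this data the bucketed version compares 5 row pairs, while comparing every user with every order would take 12.
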
5.3 Sort Merge Bucket Join

Sort Merge Bucket (SMB) join is an enhanced form of bucket join. It requires the data in each bucket to be sorted by the join key, so that the JOIN can use a merge algorithm over the sorted buckets and improve performance further.

-- Enable Sort Merge Bucket join
hive> SET hive.optimize.bucketmapjoin.sortedmerge=true;
hive> SET hive.auto.convert.sortmerge.join=true;
hive> SET hive.auto.convert.sortmerge.join.noconditionaltask=true;

-- Create sorted, bucketed tables
CREATE TABLE users (
    id INT,
    name STRING
) CLUSTERED BY (id) SORTED BY (id) INTO 8 BUCKETS
STORED AS ORC;

CREATE TABLE orders (
    id INT,
    user_id INT,
    amount DECIMAL(10,2)
) CLUSTERED BY (user_id) SORTED BY (user_id) INTO 8 BUCKETS
STORED AS ORC;

-- Sort Merge Bucket join query
hive> SELECT u.name, o.amount
    > FROM users u
    > JOIN orders o ON u.id = o.user_id;

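The merge step that sorted buckets make possible is the classic sort-merge join: two cursors advance through inputs that are already ordered by the join key, so neither side has to be held in a hash table. Below is a minimal Python sketch of that algorithm on one pair of buckets; the rows and the name merge_join are invented for the example.

```python
def merge_join(left, right, left_key, right_key):
    """Inner join of two lists that are already sorted by their join keys."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][left_key], right[j][right_key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows sharing this key ...
            j_end = j
            while j_end < len(right) and right[j_end][right_key] == lk:
                j_end += 1
            # ... and pair it with every left row that has the same key.
            while i < len(left) and left[i][left_key] == lk:
                for r in right[j:j_end]:
                    out.append((left[i], r))
                i += 1
            j = j_end
    return out


# Invented sample data, each list sorted by its join key.
users = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 4, "name": "d"}]
orders = [
    {"user_id": 1, "amount": 10},
    {"user_id": 1, "amount": 15},
    {"user_id": 3, "amount": 7},
    {"user_id": 4, "amount": 20},
]

joined = [(u["name"], o["amount"]) for u, o in merge_join(users, orders, "id", "user_id")]
print(joined)  # [('a', 10), ('a', 15), ('d', 20)]
```
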
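6. Handling Data Skew

Data skew is a common problem in Hive queries. When data is unevenly distributed, some reducers receive a very large share of it, and the query takes too long to finish.

6.1 Symptoms of Data Skew

  • The query takes far too long to run
  • The progress of some reducers stalls
  • Some reducers process far more data than the others
  • OOM (Out of Memory) errors occur

6.2 Causes of Data Skew

  • Too many NULLs: many records have a NULL join key, so they are all assigned to the same reducer
  • Hot values: some join key values occur very frequently, so their records are all assigned to the same reducer
  • Data type mismatch: when the join keys have different data types, Hive cannot distribute the data correctly
  • Complex expressions: using a complex expression as the join key can lead to an uneven distribution

6.3 Solutions for Data Skew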

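6.3.1 Handling NULLs

Convert NULL keys into distinct values so that the NULL rows are not all assigned to the same reducer:

-- Solution 1: replace NULL keys with random values
-- (keys are cast to STRING so both sides of the comparison have the same type;
--  the random values do not match any real key, so NULL rows still find no partner)
SELECT u.name, o.order_id
FROM users u
JOIN orders o
  ON COALESCE(CAST(u.id AS STRING), CONCAT('random_', CAST(RAND() AS STRING)))
   = COALESCE(CAST(o.user_id AS STRING), CONCAT('random_', CAST(RAND() AS STRING)));

-- Solution 2: filter NULL keys out before the join
-- (NULL keys can never match in an inner join, so dropping them early loses nothing)
SELECT u.name, o.order_id
FROM (
    SELECT * FROM users WHERE id IS NOT NULL
) u
JOIN (
    SELECT * FROM orders WHERE user_id IS NOT NULL
) o ON u.id = o.user_id;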

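6.3.2 Handling Hot Values

Treat hot values specially so that all rows for a hot key are not assigned to the same reducer:

-- Solution: key salting ('hot_key1' and 'hot_key2' are placeholders for the skewed keys)
-- Hot rows in users get a random suffix 0-9; hot rows in orders are replicated once
-- per suffix, so every salted user row still meets each of its orders exactly once.
SELECT u.name, o.order_id
FROM (
    SELECT u1.*, CONCAT(CAST(u1.id AS STRING), '_', CAST(FLOOR(RAND() * 10) AS STRING)) AS salt_id
    FROM users u1
    WHERE u1.id IN ('hot_key1', 'hot_key2')
    UNION ALL
    SELECT u2.*, CAST(u2.id AS STRING) AS salt_id
    FROM users u2
    WHERE u2.id NOT IN ('hot_key1', 'hot_key2')
) u
JOIN (
    -- explode() cannot be mixed with other columns in SELECT, so use LATERAL VIEW
    SELECT o1.*, CONCAT(CAST(o1.user_id AS STRING), '_', s.suffix) AS salt_id
    FROM orders o1
    LATERAL VIEW explode(array('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')) s AS suffix
    WHERE o1.user_id IN ('hot_key1', 'hot_key2')
    UNION ALL
    SELECT o2.*, CAST(o2.user_id AS STRING) AS salt_id
    FROM orders o2
    WHERE o2.user_id NOT IN ('hot_key1', 'hot_key2')
) o ON u.salt_id = o.salt_id;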
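
The effect of salting on reducer load can be simulated in a few lines of Python. This is a toy model under stated assumptions: toy_hash (sum of character codes) stands in for the real hash partitioner, the workload is invented, and reducer_loads is a hypothetical helper. It only counts rows per reducer and does not perform the join itself.

```python
import random
from collections import Counter


def toy_hash(key):
    """Toy stand-in for a hash partitioner: sum of the key's character codes."""
    return sum(ord(ch) for ch in str(key))


def reducer_loads(keys, num_reducers, hot_keys=(), salt_buckets=10, seed=0):
    """Count rows per reducer, appending a random suffix to keys in hot_keys."""
    rng = random.Random(seed)  # seeded so the simulation is repeatable
    loads = Counter()
    for key in keys:
        if key in hot_keys:
            key = f"{key}_{rng.randrange(salt_buckets)}"
        loads[toy_hash(key) % num_reducers] += 1
    return loads


# Invented workload: one hot key with 9000 rows plus 1000 keys with one row each.
keys = ["hot"] * 9000 + [f"k{i}" for i in range(1000)]

before = reducer_loads(keys, num_reducers=10)
after = reducer_loads(keys, num_reducers=10, hot_keys={"hot"})

print("max rows on one reducer before salting:", max(before.values()))
print("max rows on one reducer after salting: ", max(after.values()))
```

Without salting, all 9000 hot rows land on one reducer; with ten salt values they are split across reducers. The price is that the other side of the join must be replicated once per salt value, as in the query above.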

6.3.3 Other Solutions

  • Use MAPJOIN: load the small table into memory, which avoids skew in the reduce phase
  • Increase the number of reducers: use SET mapreduce.job.reduces=N; to raise the reducer count
  • Use skew join: Hive's built-in mechanism for handling data skew

-- Enable skew join
hive> SET hive.optimize.skewjoin=true;
hive> SET hive.skewjoin.key=100000; -- skew threshold (rows per key), default 100000

-- With skew join enabled, Hive detects skewed keys at run time, so the query needs no hint
hive> SELECT u.name, o.order_id
    > FROM users u
    > JOIN orders o ON u.id = o.user_id;

7. System Configuration Optimization

Tuning Hive and Hadoop configuration parameters can improve query performance.

7.1 Hive Configuration

-- Enable local mode (suited to small data sets)
hive> SET hive.exec.mode.local.auto=true;
hive> SET hive.exec.mode.local.auto.inputbytes.max=50000000;

-- Parallel execution
hive> SET hive.exec.parallel=true;
hive> SET hive.exec.parallel.thread.number=8;

-- Enable vectorized query execution (ORC format)
hive> SET hive.vectorized.execution.enabled=true;
hive> SET hive.vectorized.execution.reduce.enabled=true;

-- Dynamic partitioning
hive> SET hive.exec.dynamic.partition=true;
hive> SET hive.exec.dynamic.partition.mode=nonstrict;

-- Optimizer configuration
hive> SET hive.cbo.enable=true; -- enable the cost-based optimizer
hive> SET hive.compute.query.using.stats=true; -- answer queries from stored statistics where possible
hive> SET hive.stats.fetch.column.stats=true;
hive> SET hive.stats.fetch.partition.stats=true;

7.2 MapReduce Configuration

-- Map task configuration
hive> SET mapreduce.map.memory.mb=2048;
hive> SET mapreduce.map.java.opts=-Xmx1638m;
hive> SET mapreduce.task.io.sort.mb=512;

-- Reduce task configuration
hive> SET mapreduce.reduce.memory.mb=4096;
hive> SET mapreduce.reduce.java.opts=-Xmx3276m;
hive> SET mapreduce.reduce.shuffle.parallelcopies=16;

-- Enable speculative execution
hive> SET mapreduce.map.speculative=true;
hive> SET mapreduce.reduce.speculative=true;

-- Compression configuration
hive> SET mapreduce.map.output.compress=true;
hive> SET mapreduce.output.fileoutputformat.compress=true;

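The -Xmx values above work out to about 80% of the corresponding container size, which leaves headroom for non-heap JVM memory. That ratio is inferred from the numbers in the listing and is a common rule of thumb rather than a fixed requirement. A short Python sketch of the arithmetic, with heap_opts as a hypothetical helper name:

```python
def heap_opts(container_mb, heap_fraction=0.8):
    """JVM -Xmx flag sized as a fraction of the container memory (in MB)."""
    return f"-Xmx{int(container_mb * heap_fraction)}m"


print(heap_opts(2048))  # -Xmx1638m, matches mapreduce.map.java.opts above
print(heap_opts(4096))  # -Xmx3276m, matches mapreduce.reduce.java.opts above
```

When a container size is changed, recomputing the heap flag the same way keeps the two settings consistent.
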
8. Best Practices Summary

8.1 Data Storage Best Practices

  • Use a columnar storage format (ORC or Parquet)
  • Enable Snappy or Zstandard compression
  • Use partitioning and bucketing sensibly
  • Collect table statistics regularly
  • Choose an appropriate file size (128 MB to 256 MB recommended)

8.2 SQL Writing Best Practices

  • Write concise, clear SQL statements
  • Avoid full table scans; filter data with a WHERE clause
  • For small table JOIN large table, put the small table on the left side of the JOIN
  • Use MAPJOIN to optimize small table JOIN large table
  • Avoid complex expressions in the WHERE clause
  • Use subqueries and views judiciously

8.3 System Configuration Best Practices

  • Adjust the number of MapReduce tasks to the size of the cluster
  • Enable parallel execution and vectorized query execution
  • Tune memory settings to avoid OOM errors
  • Enable speculative execution to deal with slow tasks
  • Monitor cluster performance regularly and adjust configuration parameters

8.4 Data Skew Best Practices

  • Preprocess data to reduce skew
  • Treat NULLs and hot values specially
  • Use key salting to spread hot data
  • Enable skew join to handle data skew
  • Increase the number of reducers to spread the processing

9. Summary

Hive query optimization is a complex process that has to be approached from several angles, including data storage, SQL syntax, execution plans, system configuration, and data skew handling.

The key steps in optimizing a Hive query are:

  1. Analyze the execution plan to understand how the query runs and where its bottlenecks are
  2. Optimize data storage by choosing a suitable file format and compression codec
  3. Write efficient SQL statements and avoid common performance problems
  4. Tune the system configuration to improve execution efficiency
  5. Handle data skew so that data is distributed evenly

With continued study and practice, mastering these techniques can significantly improve Hive query performance, lower resource consumption, and raise system throughput.