Denny Lee

Optimizing Joins running on HDInsight Hive on Azure at GFS

Introduction

To analyze hardware utilization within their data centers, Microsoft’s Online Services Division Global Foundation Services (GFS) team is working with Hadoop/Hive via HDInsight on Azure.  A common scenario is to perform joins between the various tables of data.  This quick blog post provides a little context on how we managed to take a query from more than 2 hours down to under 10 minutes, and the thinking behind it.

Note that this post was published on April 26th, 2013. This information may not be current, but I have kept this post for posterity.

“…to look at the stars always makes me dream, as simply as I dream over the black dots of a map representing towns and villages…”

— Vincent Van Gogh

Image Source: Vincent Van Gogh Painting Tilt Shifted: http://coolvibe.com/2011/16-van-gogh-paintings-tilt-shifted/tilt-shift-van-gogh-15/

Background

This scenario is a three-column join between a large fact table (~1.2 B rows/day) and a smaller dimension table (~300 K rows).  A single day of compressed source files is ~4.2 GB; decompressed, it is ~120 GB.  When performing a regular join (in Hive parlance, a “common join”), the query created ~230 GB of intermediary files.  On a 4-node HDInsight on Azure cluster, taking a 1/6th sample of the large table for a single day of data, the query took 2h 24min.

SELECT
colA, colB, … , colN
FROM FactTable f
LEFT OUTER JOIN DimensionTable d
ON d.colC = f.colC
AND d.colD = f.colD
AND d.colE = f.colE
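
For context, the tables here were created as external Hive tables over the compressed source files, which matters later when we discuss bucketed map joins.  Below is a minimal sketch of what that DDL could look like; the column types, delimiter, and ASV location are illustrative assumptions, not the actual GFS schema:

-- Hypothetical external table over the gzip-compressed, delimited source files;
-- Hive reads .gz text files transparently, decompressing them at query time.
CREATE EXTERNAL TABLE FactTable (
  colA STRING,
  colB STRING,
  colC STRING,
  colD STRING,
  colE STRING
  -- ... remaining columns up to colN elided ...
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION 'asv://container@storageaccount.blob.core.windows.net/gfs/facttable/';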

Join Categories

There are a number of options to improve performance, per Join Strategies in Hive:

  • Common Joins: The default join in Hive; rows from both tables are shuffled to the reducers, where the join is performed. It works for any tables but moves a lot of data between the map and reduce phases.
    • Query Notes: 2h 24min on the 1/6 dataset
  • MapJoins: Designed for joins between a large table and a small table; pinning the small table into memory improves query performance.
    • Query Notes: This should work perfectly in this scenario.
  • Bucketed Map Joins: Great for joining large tables together, where you create buckets for the tables so the joins occur between buckets (see the sketch after this list).
    • Query Notes: Not optimal for this situation since we had created external Hive tables and wanted to avoid the additional step of creating bucketed tables.
  • Skewed Joins: A hint to tell Hive that the data is skewed so it can optimize the query accordingly (see the sketch after this list).
    • Query Notes: Not applicable; the data is not skewed, as it is evenly distributed across 38 buckets.
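
For reference, below is a minimal sketch of how the two strategies we did not use would be enabled.  These are standard Hive settings; the bucket column and count are illustrative, echoing the 38 buckets noted above:

-- Bucketed map join: both tables must be created with compatible buckets,
-- e.g. CLUSTERED BY (colC) INTO 38 BUCKETS; then enable:
set hive.optimize.bucketmapjoin=true;

-- Skewed join: have Hive handle heavily skewed keys in a separate map join
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000; -- rows per join key before it is treated as skewed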

Query Path

Below is the thought process we followed to maximize query performance.

Test Run                     | Duration | Mappers | Reducers
Base Query*                  | 2:23:59  | 23      | 1
Compression*                 | 1:24:38  | 23      | 1
Configure Reducer Task Size* | 0:21:39  | 23      | 30
Full Dataset                 | 2:01:56  | 134     | 182
Increase Nodes (4 to 10)     | 1:10:57  | 134     | 182
Map Joins                    | 0:09:58  | 132     | 0

* sample data size of 1/6 full daily dataset
Test Run                     | Files Bytes Read (map) | Files Bytes Read (reduce) | Files Bytes Written (map) | Files Bytes Written (reduce)
Base Query*                  | 43,370,646,355         | 78,930,287,557            | 67,577,746,322            | 59,748,935,558
Compression*                 | 1,727,983,197          | 39,441,385,351            | 2,695,972,976             | 20,259,915,184
Configure Reducer Task Size* | 3,285,339,403          | 38,775,855,507            | 2,677,260,304             | 19,595,626,728
Full Dataset                 | 106,420,783,433        | 255,327,019,090           | 17,460,501,681            | 128,929,981,208
Increase Nodes (4 to 10)     | 106,420,795,137        | 255,327,093,479           | 17,460,513,463            | 128,930,072,938
Map Joins                    | 540,664                | 0                         | 7,212,269                 | 0

Map/reduce file bytes read/written for each query path

Base Query

As noted above, on just 1/6 of the data, the regular query above took 2h 24min.

Compressing the Intermediate Files and Output

As noted earlier, the common join generated ~230 GB of intermediary files.  Compressing the intermediate files (using the set commands below) improved query performance (down to 1:24:38) and sharply reduced the file bytes read and written.

set mapred.compress.map.output=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set hive.exec.compress.intermediate=true;

Note that HDInsight currently supports the gzip and bz2 codecs; we chose gzip to match the gzip-compressed source files.
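
The settings above cover the map output and the intermediate files between MapReduce stages.  If you also want to compress the final query output, Hive has an analogous pair of settings; they are shown here as a hedged aside, since they were not needed for the results above:

set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;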

Configure Reducer Task Size

In the previous two runs, only one reducer was in operation; increasing the number of reducers (up to a point) should improve query performance as well.  Adding the reducer configuration below brought the query down to 0:21:39.

set hive.exec.reducers.bytes.per.reducer=25000000;
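
As a rough sketch of what this setting controls: Hive estimates the reducer count as approximately the total input size divided by hive.exec.reducers.bytes.per.reducer, capped by hive.exec.reducers.max, so lowering the bytes-per-reducer value raises reducer parallelism.  If you prefer, the count can also be pinned explicitly; this is an alternative sketch, not what we actually ran:

-- reducers ≈ min(ceil(input bytes / hive.exec.reducers.bytes.per.reducer),
--                hive.exec.reducers.max)
set mapred.reduce.tasks=30; -- explicit override; 30 matches the reducer count above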

Applying optimizations to the full dataset

While this improved performance, once we switched back to the full dataset with the above configuration, the job took 134 mappers and 182 reducers and completed in 2:01:56.  Increasing the number of nodes from four to ten dropped the query duration to 1:10:57.

MapJoins

The great thing about mapjoins is that they are perfect for this type of situation: a large table joined to a small table.  By using the configuration below, we managed to take a query that took 1:10:57 down to 00:09:58.  In this case, there are no reducers because the join completes during the map phase, with far less data movement.

set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=50000000;

One important note: do not forget to set hive.mapjoin.smalltable.filesize.  By default, it is 25 MB, and in this case the smaller table was 43 MB.  Because I had forgotten to set it to 50 MB, all of my original tests had reverted back to common joins.
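
As an aside, if you would rather not rely on auto-conversion (and its small-table size threshold), the older explicit hint form should also work; this is a sketch against the simplified query from the Background section, not the exact production query:

SELECT /*+ MAPJOIN(d) */
colA, colB, … , colN
FROM FactTable f
LEFT OUTER JOIN DimensionTable d
ON d.colC = f.colC
AND d.colD = f.colD
AND d.colE = f.colE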

Verifying MapJoins are Happening

There are a few ways to verify that mapjoins are happening (vs. common joins):

1. With a mapjoin, there are no reducers because the join operates at the map level
2. The command line reports that a mapjoin is being performed because it is pushing the smaller table into memory (as noted in the “Dump the hashtable” line)
3. And right at the end, there is a call-out that it is converting the join into a MapJoin

Below is the command line output of a mapjoin:

2013-04-26 10:52:41 Starting to launch local task to process map join;
maximum memory = 932118528
2013-04-26 10:52:45 Processing rows: 200000 Hashtable size: 199999
Memory usage: 145227488 rate: 0.156
2013-04-26 10:52:47 Processing rows: 300000 Hashtable size: 299999
Memory usage: 183032536 rate: 0.196
2013-04-26 10:52:49 Processing rows: 330936 Hashtable size: 330936
Memory usage: 149795152 rate: 0.161
2013-04-26 10:52:49 Dump the hashtable into file: file:/tmp/msgbigdata/hive_
2013-04-26_22-52-34_959_3143934780177488621/-local-10002/HashTable-Stage-4/MapJo
in-mapfile01--.hashtable
2013-04-26 10:52:56 Upload 1 File to: file:/tmp/msgbigdata/hive_2013-04-26_2
2-52-34_959_3143934780177488621/-local-10002/HashTable-Stage-4/MapJoin-mapfile01
--.hashtable File size: 39687547
2013-04-26 10:52:56 End of local task; Time Taken: 14.203 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 2

“Join” our discussion

By compressing intermediary/map output files and configuring the mapjoin correctly (and adding some extra cores), we took a join query that originally needed more than 2 hours to complete down to under 10 minutes.  For this particular situation, mapjoins were perfect, but it is important to analyze your data first to see whether you have any skew, whether the smaller table fits in memory, and so on.

set mapred.compress.map.output=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set hive.exec.compress.intermediate=true;
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=50000000;

Accolades

Many thanks to Kamen Penev, Pedro Urbina Escos, and Dilan Hewage.

Updates (August, 2023)

To learn how to optimize joins for Apache Spark 3.0, check out the session Xiao Li and I presented: What’s new in Apache Spark 3.0.

6 responses to “Optimizing Joins running on HDInsight Hive on Azure at GFS”

1. Excellent post! Love the how-to nature of it.

    1. Thanks! Glad you enjoyed it!

2. Thanks for the post. One question:
    What impact does mapred.compress.map.output have on Hive MapReduce jobs? Why isn’t the hive.exec.compress.intermediate setting enough?

    1. Thanks Arthur – actually Patrick provided a good response on the difference between the two at: https://groups.google.com/a/cloudera.org/forum/?fromgroups=#!topic/cdh-user/plDwzWCZdaw.

Basically, the former should be enough in map joins because it is just the map output. The latter affects things like shuffles and sorts (between the map and reduce phases).

      HTH!

  3. […] Beeswax to execute Hive queries from a nice web UI. As noted in Hadoop compression codecs and optimizing Hive joins (and using compression to do it), using compression gives you more space and in many cases can […]
