Denny Lee

How Apache Spark™ performs a fast count using the Parquet metadata

Apache Spark fast count by reading Parquet Metadata

A common question I run into is how Apache Spark™ performs fast counts against Parquet files. TL;DR: Spark is able to read the Parquet metadata without needing to read the entire file. If you want to dive into the details, please continue!

The rest of this post covers the basics of how an Apache Spark row count uses the Parquet file metadata to determine the count (instead of scanning the entire file). For a query like spark.read.parquet(...).count(), the Parquet columns are not accessed; instead, the requested Parquet schema passed down to the VectorizedParquetRecordReader is simply an empty Parquet message. Thus, Spark calculates the count using the Parquet file footer metadata.
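As a minimal sketch of the query in question (the session settings and file path below are hypothetical, purely to make the example self-contained):

import org.apache.spark.sql.SparkSession

// Hypothetical local session and Parquet path for illustration.
val spark = SparkSession.builder()
  .appName("parquet-fast-count")
  .master("local[*]")
  .getOrCreate()

val df = spark.read.parquet("/tmp/events.parquet")

// The count itself; Spark can answer this from the Parquet footer metadata.
println(df.count())

// The physical plan should show a FileScan over parquet with an empty
// ReadSchema (i.e., no columns requested), feeding partial and final aggregates.
df.groupBy().count().explain()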

Note, this post is based on an earlier version (December 2016) of my GitHub note.

Let’s start with Jobs

When Apache Spark executes a query, it creates multiple jobs and associated stages, each with its own set of tasks, and those tasks are then distributed across multiple nodes. For the query spark.read.parquet(...).count(), the jobs and stages behind it can be seen in the Spark DAG (from the Spark UI) below.

Spark UI snippet showing the Spark jobs generated for a count query

By default, Spark generates two jobs to perform this calculation: the first job reads the file from the data source and the second job performs the aggregation.

  • The first job (job 0) reads the Parquet file from the data source.
  • The second job (job 1) has two stages that perform the aggregation.

Normally it’s all about WholeStageCodegen

In most aggregation cases with Apache Spark, we would dive into the WholeStageCodegen of job 1. At planning time, Spark generates Java code (compiled down to bytecode) for many Spark operations, including aggregations. The Spark engine’s Catalyst optimizer generates a logical plan and multiple physical plans, and uses cost-based optimization to choose among them. This is similar to how relational database systems operate. The last step is to produce the code via code generation.

Apache Spark Catalyst Optimizer from logical plan to code generation
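If you want to see these plans for yourself, a quick sketch (reusing the hypothetical spark session and df from the earlier snippet) is to ask Spark to print the full set of plans:

// Prints the parsed logical plan, analyzed logical plan, optimized logical
// plan, and the physical plan; operators handled by whole-stage code
// generation are prefixed with an asterisk in the physical plan.
df.groupBy().count().explain(true)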

If you want to dive deeper into this, here are some great resources:

Check out What’s new in Apache Spark 3.0 for a quick primer including Adaptive Query Execution (AQE). 

What’s new in Apache Spark 3.0 with Xiao Li and Denny Lee

Recall that Spark can perform a fast count using Parquet metadata instead of reading the data source (i.e., the Parquet files). So instead of diving into FileScanRDD (scanning the Parquet files) and then WholeStageCodegen (code generation), as we would for most queries in job 1, let’s take a look at the code flow of job 0.

Let’s look at the RecordReaderIterator

When reading the file(s) from the data source, the generated Java code from DataFrameReader interacts with the underlying data source ParquetFileFormat. This in turn utilizes the RecordReaderIterator, which is used internally by the Spark Data Source API. The following is the breakdown of the code flow:

org.apache.spark.sql.DataFrameReader.load (DataFrameReader.scala:145)
   https://github.com/apache/spark/blob/689de920056ae20fe203c2b6faf5b1462e8ea73c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L145

	|- DataSource.apply(L147)
		https://github.com/apache/spark/blob/689de920056ae20fe203c2b6faf5b1462e8ea73c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L147

		|- package org.apache.spark.sql.execution.datasources
			https://github.com/apache/spark/blob/689de920056ae20fe203c2b6faf5b1462e8ea73c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

			|- package org.apache.spark.sql.execution.datasources.parquet [[ParquetFileFormat.scala#L18]]
				https://github.com/apache/spark/blob/2f7461f31331cfc37f6cfa3586b7bbefb3af5547/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L18

    			|- class ParquetFileFormat [[ParquetFileFormat.scala#L51]]
    				https://github.com/apache/spark/blob/2f7461f31331cfc37f6cfa3586b7bbefb3af5547/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L51

Following the flow of the code, its job (pun intended) is to access the ParquetFileFormat data source when utilizing the DataFrameReader. This all happens within the context of the Data Source API, org.apache.spark.sql.execution.datasources.
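As a small illustration of that entry point (again using the hypothetical path from the first sketch), spark.read.parquet(...) is simply shorthand for the generic Data Source API call, where the "parquet" short name is resolved by DataSource to ParquetFileFormat:

// Equivalent to spark.read.parquet("/tmp/events.parquet"); the "parquet"
// format name resolves to ParquetFileFormat via DataSource.
val viaFormat = spark.read.format("parquet").load("/tmp/events.parquet")
println(viaFormat.count())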

RecordReader paths: (Non)VectorizedParquetRecordReader and InternalRow

One interaction path for the Java bytecode is via the VectorizedParquetRecordReader (and its non-vectorized counterpart); below is the code flow for the former.

|- package org.apache.spark.sql.execution.datasources.parquet [[ParquetFileFormat.scala#L18]]
  https://github.com/apache/spark/blob/2f7461f31331cfc37f6cfa3586b7bbefb3af5547/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L18

    |- class ParquetFileFormat [[ParquetFileFormat.scala#L51]]
      https://github.com/apache/spark/blob/2f7461f31331cfc37f6cfa3586b7bbefb3af5547/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L51

        |- val vectorizedReader = new VectorizedParquetRecordReader()
          https://github.com/apache/spark/blob/2f7461f31331cfc37f6cfa3586b7bbefb3af5547/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L376

            |- VectorizedParquetRecordReader.java#48
              https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L48

                  |- SpecificParquetRecordReaderBase.java#L151
                    https://github.com/apache/spark/blob/v2.0.2/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java#L151

                        |- totalRowCount

SpecificParquetRecordReaderBase.java (line 13 in the code flow above) references the “Improve Parquet scan performance when using flat schemas” commit, which was part of [SPARK-11787] Speed up parquet reader for flat schemas.
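To get a feel for where totalRowCount comes from, here is a minimal sketch (using the parquet-hadoop library that Spark bundles, with a hypothetical file path) that opens only the footer of a Parquet file: each row group records its row count in the footer, and summing them gives the total without touching any column data.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open the Parquet file footer (no column chunks are read here).
val inputFile = HadoopInputFile.fromPath(new Path("/tmp/events.parquet"), new Configuration())
val reader = ParquetFileReader.open(inputFile)
try {
  val footer = reader.getFooter
  // getRecordCount sums the per-row-group (block) row counts from the footer.
  println(s"row groups: ${footer.getBlocks.size()}, total rows: ${reader.getRecordCount}")
} finally {
  reader.close()
}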

Another path for the Java bytecode to interact with the underlying data source is through the RecordReaderIterator, which returns an InternalRow via the code flow below; a simplified sketch of this iterator pattern follows the code flow.

|- package org.apache.spark.sql.execution.datasources.parquet [[ParquetFileFormat.scala#L18]]
	https://github.com/apache/spark/blob/2f7461f31331cfc37f6cfa3586b7bbefb3af5547/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L18

		|- class ParquetFileFormat [[ParquetFileFormat.scala#L51]]
			https://github.com/apache/spark/blob/2f7461f31331cfc37f6cfa3586b7bbefb3af5547/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L51

			|- val iter = new RecordReaderIterator(parquetReader) [[ParquetFileFormat.scala#L399]]
				https://github.com/apache/spark/blob/2f7461f31331cfc37f6cfa3586b7bbefb3af5547/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L399

				|- class RecordReaderIterator[T] [[RecordReaderIterator.scala#L32]]
					https://github.com/apache/spark/blob/b03b4adf6d8f4c6d92575c0947540cb474bf7de1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L32

						|- import org.apache.hadoop.mapreduce.RecordReader [[RecordReaderIterator.scala#L22]]
							https://github.com/apache/spark/blob/b03b4adf6d8f4c6d92575c0947540cb474bf7de1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L22

						|- import org.apache.spark.sql.catalyst.InternalRow [[RecordReaderIterator.scala#L24]]
							https://github.com/apache/spark/blob/b03b4adf6d8f4c6d92575c0947540cb474bf7de1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L24
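Conceptually, RecordReaderIterator is a thin adapter from the Hadoop RecordReader API to a Scala Iterator. The following is a simplified sketch of that pattern, not Spark’s actual implementation (which also eagerly closes the underlying reader once the input is exhausted); it assumes the RecordReader has already been initialized.

import org.apache.hadoop.mapreduce.RecordReader

// Simplified sketch: wrap an initialized Hadoop RecordReader in a Scala Iterator.
// In Spark's Parquet path, T is an InternalRow (or a ColumnarBatch when vectorized).
class SimpleRecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      // Advance the underlying reader; nextKeyValue() returns false at end of input.
      finished = !rowReader.nextKeyValue()
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    rowReader.getCurrentValue
  }
}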

What is InternalRow?

The logic surrounding InternalRow via the RecordReaderIterator is:

  • No Parquet columns need to be read to calculate the count.
  • The Parquet schema passed to the VectorizedParquetRecordReader is actually an empty Parquet message.
  • The count is computed using the metadata stored in the Parquet file footers.

To work with the Parquet file format, Apache Spark internally wraps this logic in an iterator that returns an InternalRow; more information can be found in InternalRow.scala. Ultimately, the count() aggregate function interacts with the underlying Parquet data source using this iterator. This is true for both the vectorized and non-vectorized Parquet readers.

Bridging Apache Spark count with Parquet metadata

So how do we bridge the Apache Spark count (i.e., spark.read.parquet(...).count(), or more precisely Dataset.count()) with the Parquet metadata count calculated in the aggregation? After all, these two steps are performed in two separate jobs (job 0 and job 1).

Fast Apache Spark count calculated by reading Parquet metadata using InternalRow

To bridge these two jobs (i.e., the Dataset.count() with the Parquet metadata count via InternalRow):

  • The Dataset.count() call is planned into an aggregate operator with a single count() aggregate function.
  • Java code is generated (i.e., WholeStageCodegen) at planning time for the aggregate operator as well as for the count() aggregate function (see the sketch after this list).
  • The generated Java bytecode interacts with the underlying data source ParquetFileFormat through a RecordReaderIterator, which is used internally by the Spark Data Source API.
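If you want to look at the generated Java code yourself, a quick sketch (reusing the hypothetical df from the first snippet) is Spark’s debug helper:

// Prints the Java source produced by whole-stage code generation for each
// WholeStageCodegen subtree in the physical plan of the count query.
import org.apache.spark.sql.execution.debug._
df.groupBy().count().debugCodegen()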

Discussion

Ultimately, if the query is a row count, Spark will read the Parquet metadata to determine the count. If a query’s predicates can be fully satisfied by the min/max values in that metadata, that should work as well.
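As an illustration of that last point (the column name and date literal below are hypothetical), a filtered count with Parquet filter pushdown enabled (spark.sql.parquet.filterPushdown, on by default) lets the reader skip row groups whose footer min/max statistics rule out the predicate:

import org.apache.spark.sql.functions.col

// Row groups whose min/max statistics for event_date fall entirely before the
// threshold can be skipped without reading their column data.
val filteredCount = spark.read.parquet("/tmp/events.parquet")
  .filter(col("event_date") >= "2016-12-01")
  .count()
println(filteredCount)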

Attribution

I wanted to thank Cheng Lian and Nong Li for helping me dive deeper into how this process works.
