Denny Lee

Computing Delta Lake State Quickly with Checkpoint Files

Over the past few months, we have explored what the Delta Lake transaction log is, examined the transaction log at the file level, and taken a peek into the transaction log itself. Those posts focused on a transaction log made up of individual JSON files. For large-scale or streaming systems, however, this leads to the “small-file” problem, where it becomes increasingly inefficient to query the transaction log folder (i.e., the _delta_log subdirectory).

Delta Lake Checkpoint File

To alleviate this issue, Delta Lake creates a checkpoint file in Parquet format after, by default, every 10th commit (i.e., transaction).

Delta Lake by default creates a checkpoint file every 10th commit.

These checkpoint files save the entire state of the table at a point in time, in native Parquet format that is quick and easy for any engine to read. They offer the reader a “shortcut” to fully reproducing the table’s state without reprocessing what could be thousands of tiny, inefficient JSON files.
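As a rough illustration of the default cadence, the sketch below lists which table versions would carry a checkpoint. It assumes a checkpoint is written whenever the version number is a non-zero multiple of the interval, which is consistent with the version 10 checkpoint file used in the scenario in this post. The function name `checkpoint_versions` is hypothetical and not part of any Delta Lake API.

```python
def checkpoint_versions(latest_version: int, interval: int = 10) -> list[int]:
    """Return the versions up to latest_version that would have a checkpoint.

    Assumption: a checkpoint is written when the version is a non-zero
    multiple of the interval (default 10, as in this post).
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    return list(range(interval, latest_version + 1, interval))


# Versions 0..12 (13 commits) contain a single checkpoint, at version 10.
print(checkpoint_versions(12))
# Versions 0..7 (8 commits) contain none.
print(checkpoint_versions(7))
```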

Recomputing State Example Scenario

In the following scenario, we will show how Apache Spark™ recomputes the state of the table.

Computing Delta Lake State | Spark Caching after 3rd Commit

When recomputing the state of the table, Spark will read and cache the available JSON files that make up the transaction log.  For example, if there have been only three committed operations or commits to the table (including the table creation), Spark will read all three files and cache the results into memory (i.e., cache version 2).

Computing Delta’s State | Spark Caching after 3rd Commit

Instead of continuously reading the transaction log, the Spark readers requesting data from this table can reference the in-memory cached copy of Delta’s state.
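To make "recomputing the state" concrete, here is a minimal sketch in plain Python (no Spark) that replays commits in version order and tracks which data files are still part of the table. It assumes each commit is a list of action dictionaries with an `add` or `remove` key carrying a `path`, mirroring the actions in the JSON log. The `replay` function and the file names are hypothetical, and the logic is greatly simplified compared with what Delta Lake actually does.

```python
def replay(commits: list[list[dict]]) -> set[str]:
    """Replay commits in order and return the set of active data file paths."""
    active: set[str] = set()
    for actions in commits:  # one entry per commit (version 0, 1, 2, ...)
        for action in actions:
            if "add" in action:
                active.add(action["add"]["path"])
            elif "remove" in action:
                active.discard(action["remove"]["path"])
            # Other actions (e.g., metaData, protocol, commitInfo) are ignored here.
    return active


# Three commits: create the table, append a file, then replace the first file.
commits = [
    [{"metaData": {"id": "example"}}, {"add": {"path": "part-0.parquet"}}],
    [{"add": {"path": "part-1.parquet"}}],
    [{"remove": {"path": "part-0.parquet"}}, {"add": {"path": "part-2.parquet"}}],
]

state_v2 = replay(commits)  # state as of version 2
print(sorted(state_v2))
```

The resulting set plays the role of the cached version 2 described above: later reads can consult it instead of replaying the log again.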

Computing Delta Lake State | Spark Caching after 8th Commit

As more commits are performed against the Delta table, more JSON files will be added to the _delta_log folder.

Computing Delta’s State | Spark Caching after 8th Commit

To continue this example, let’s say this table receives five additional commits; Spark will then cache version 7 of the table state. Note that, at this point, Spark will list all of the transactions from version 0, rather than starting from version 2, to ensure that earlier transactions have completed.

Computing Delta Lake State | Spark Caching after 13th Commit

By default, Delta Lake will create a checkpoint file (00010.checkpoint.parquet) after the 10th commit. Delta Lake will still listFrom version 0 so that it does not miss late transactions, and it will cache version 12.

Computing Delta’s State | Spark Caching after 13th Commit

Because the checkpoint file is a Parquet file, you can easily read it with Spark using the following command.

# Review the transaction log checkpoint file
chkpt0 = spark.read.parquet("/../_delta_log/00010.checkpoint.parquet")

The resulting metadata is a union of all of the previous transactions. This becomes apparent when you query for the add information.

# Add Information
display(chkpt0.select("add").where("add is not null"))
Add information in the Delta Lake checkpoint file

Note that the output figures use display, which is a Databricks-specific command. With open source Spark, you can use .show() instead (e.g., chkpt0.select("add").where("add is not null").show()).

Upon expanding cell 2 in the preceding image, notice that the stats from the original JSON file are stored as a string.

stats: "{\"numRecords\":7,\"minValues\":{\"addr_state\":\"IA\",\"count\":3,\"stream_no\":3},\"maxValues\":{\"addr_state\":\"TX\",\"count\":9,\"stream_no\":3},\"nullCount\":{\"addr_state\":0,\"count\":0,\"stream_no\":0}}"

As part of the checkpoint creation process, there is a stats_parsed column that contains the statistics as nested columns instead of strings. The nested columns allow for faster parsing in many engines and languages, while the string-format stats column is kept for backward compatibility.

stats_parsed:
{
   "numRecords": 7,
   "minValues": {
      "addr_state": "IA",
      "count": 3,
      "stream_no": 3
   },
   "maxValues": {
      "addr_state": "TX",
      "count": 9,
      "stream_no": 3
   },
   "nullCount": {
      "addr_state": 0,
      "count": 0,
      "stream_no": 0
   }
}

For example, Spark can read the statistics significantly faster by parsing nested columns instead of strings. This is especially important when you need to read the entire transaction log created for petabyte-scale lakehouses.
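The difference between the two representations can be seen without Spark. In this small sketch, the string form has to go through a JSON parser before any value can be used, which is the per-file cost that the nested stats_parsed column avoids. The string is the stats value shown earlier; the variable names are illustrative only.

```python
import json

# The stats value as stored in the JSON transaction log:
# a JSON document embedded inside a string.
stats = (
    '{"numRecords":7,'
    '"minValues":{"addr_state":"IA","count":3,"stream_no":3},'
    '"maxValues":{"addr_state":"TX","count":9,"stream_no":3},'
    '"nullCount":{"addr_state":0,"count":0,"stream_no":0}}'
)

# A reader has to parse the string before it can use any statistic.
parsed = json.loads(stats)

print(parsed["numRecords"])
print(parsed["minValues"]["addr_state"], parsed["maxValues"]["addr_state"])
```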

Computing Delta Lake State | Spark Caching after 15th Commit

Note that once a Parquet checkpoint file has been created, subsequent listFrom calls start from the checkpoint file instead of going back to version 0.

Computing Delta’s State | Spark Caching after 15th Commit

Thus, after 15 commits, Delta will cache version 14, but it only needs to listFrom the version 10 checkpoint file.
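The effect of a checkpoint on how much of the log has to be read can be sketched as follows. This is a simplified model, not Delta Lake's implementation: `plan_read` is a hypothetical helper that picks the newest checkpoint at or before the target version and then the JSON commits that come after it.

```python
from typing import Optional


def plan_read(
    target_version: int, checkpoints: list[int]
) -> tuple[Optional[int], list[int]]:
    """Return (checkpoint version to load or None, JSON commit versions to replay)."""
    usable = [v for v in checkpoints if v <= target_version]
    start = max(usable) if usable else None
    first_json = 0 if start is None else start + 1
    return start, list(range(first_json, target_version + 1))


# After 8 commits there is no checkpoint: replay versions 0 through 7.
print(plan_read(7, []))
# After 15 commits: load the version 10 checkpoint, then replay versions 11 through 14.
print(plan_read(14, [10]))
```

With the checkpoint in place, the number of JSON files to replay stays bounded by the checkpoint interval rather than growing with the full history of the table.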

Addendum

To dive into this further, enjoy Diving into Delta Lake: Unpacking the Transaction Log v2. Burak and I also have a fun gag/blooper around time travel as it was a virtual conference that year.

2 responses to “Computing Delta Lake State Quickly with Checkpoint Files”

  1. Hi, can you please share the property through which we can increase or decrease the number of transactions (default is 10) after which a checkpoint file is generated?

    1. Great question, Rakesh. The property is `checkpointInterval` (set on a table as `delta.checkpointInterval`). To dive deeper, you can see it in DeltaConfig.scala and/or Delta Kernel TableConfig.java. HTH!
