Back to all tech blogs

Six tried and tested ways to turbocharge Databricks SQL

Best practices for maximising the cost efficiency of your Databricks SQL investment

By Holly Smith (Senior Resident Solutions Architect at Databricks), Lu Pu, (Data Engineer at Adevinta) and Wenhe Ye (Software Development Manager for Data Engineering at Adevinta)

On 25 June 2021 Adevinta completed its acquisition of eBay Classifieds Group. Data covering 20 markets and 1 billion people with 3 billion monthly visits no longer belonged to eBay and couldn’t be run on its purpose built Hadoop cluster. It had to be moved. And fast.

Data amassed over the last five years ran into petabytes; just one Google Analytics table is over 500 TB in size, growing by roughly 500GB a day. Analysts need to review and test this data on a daily basis to understand the efficacy of their digital marketing campaigns. Bake offs were run, proofs of concepts built and it was decided that Databricks SQL would be the tool of choice for these analytical workloads.

Unfortunately the initial implementation did not go quite as planned. Run times were 100–250% longer than the original system, jobs were failing midway and user feedback was far from positive.

Thus ensued a heroic cross team effort between Adevinta, Valcon and Databricks Field & Engineering teams. Together we were able to speed up run times by 300%, significantly improve availability and ultimately win over the analysts.

But this blog isn’t about inflating our ego, it’s to share those learnings with you, dear reader, so you can improve Databricks SQL performance with much less intervention.

#0 Use Delta

Delta has enormous benefits; fast query performance from a columnar storage layout and sophisticated optimisation techniques, it scales to handle trillions of rows and terabytes of data, and guarantees ACID transactions, to name but a few. Delta 2.0 is now fully open source and has been widely adopted into the wider data ecosystem, so there are no concerns about vendor lock-in. For pipelines of any meaningful size, you must use Delta.

So how to change your Parquet files into Delta? There’s the handy syntax CONVERT TO DELTA, which cycles through your parquet files to add on a Delta log and compute the relevant file statistics that make Delta so performant. In our instance, adding those statistics was inefficient to do at this stage as they would be recomputed later when we ran OPTIMIZE (more on that later). So for now CONVERT TO DELTA NO STATISTICS is the syntax we need to speed up our conversion by 95%.

Code

A screenshot of syntax to convert a 1TB parquet table to Delta, taking half a minute

Machine type is important here, and you’re likely to experience bottlenecks on the network. This can be validated by investigating the Ganglia metrics. When this happens, ensure you are selecting a machine that has guaranteed Network Bandwidth (instead of the ‘up to’ associated with smaller sizes). These can be checked on cloud provider sites AWS | Azure | GCP.

#1 Be intentional with your file layouts

Often overlooked, but hugely important for using Delta, Databricks SQL or just Databricks in general is how data is arranged on disk.

Partitions on analytics tables need to be optimised for how the data is read. There is an argument to partition on columns that users might regularly filter by, for example, date. But that introduces performance overheads if you wanted to aggregate over, say, a months’ worth of brand data. Well, what if you also partitioned by brand? Now you might have sped up that one query, but for anyone else querying neither of these things, it will really slow their queries down. Partitions can be useful for data > 100TB, but only if they match end user demands.

The choice of partition is crucial, because not all columns are suitable for partitioning and more partitions doesn’t mean better performance. Previously we had five partition columns for our Google Analytics table, with a whopping 20000+ leaf partition directories as the only way to reduce the data scan was to rely on the predicate pushdown and partition pruning mechanism in Hive/Spark.

Thanks to the dynamic file pruning and z-ordering feature from Delta, we can finally reduce the number of partition columns from five to two. We made decisions based on the statistics collected on historical queries. Here is an illustration of how we performed some in depth work to understand what filters or join keys consumers most commonly used. Based on the same historical data, we also determined the columns for Z-ordering to minimise the data scan (explained later).

Table

I can hear the collective groan as you see the recommendation to repartition data — it can be a big job to simply reorganise that data. In general, it’s less intensive to condense existing partitions (so A, B, C -> A) rather than select a new partition (A, B, C -> D).

If you’re experiencing scalability issues, instead of doing a CTAS statement, try creating an empty table first then loading in partition by partition.

Code

The next piece of advice is to follow medallion architecture, where data quality improves as it moves through the layers with an emphasis on incremental processing and replayability.

Medaillon architecture

Bronze is your raw data in Delta format, Silver is where you start to clean and join data, Gold is analytics focused tables. In our case, we’re dealing with analytics, so our Gold tables should be designed with usage patterns in mind. How much historical data is really needed? Is every column going to be used? Can common joins or aggregations be pre-computed?

Not only will this reduce the compute costs for querying your data, it’ll give your end users a better experience, too.

#2 Compact your small files

The small files problem has plagued Spark and Parquet users for years, and Delta tackles this head on. For the uninitiated, having lots of small files means more time is spent processing the overhead of each file, (opening, reading metadata etc.) rather than processing the data itself. To compact our small files we use the OPTIMIZE command.

Code

A screenshot of the metrics created from the OPTIMIZE command. In this instance, 9074 files were compacted into 3635 files. Comparing the “avg” sizes, we can see the files have 2.5x in size. 2257 files remained untouched.

During this file compaction process, Delta has another trick up its sleeve: ZORDER. This co-locates similar data from the specified columns to allow for fewer files to be read.

Code

A screenshot of the metrics created from the OPTIMIZE command when zOrder is included. zOrderStats contains detailed information about the colocation of data.

On the face of it, it’s fairly straightforward to use but for those looking to squeeze out every performance gain, there are a few additional considerations:

OPTIMIZE target file size

To minimise the need for manual tuning, Delta automatically tunes file size based on the size of the table; smaller file sizes for smaller tables and larger file sizes for larger tables. However, if these tables are smaller, analytics ready gold tables (<1TB), you may want to opt for even smaller file sizes of 32Mb — 64Mb.

OPTIMIZE & Graviton Machine type

The Databricks docs diplomatically state:

“AWS claims that instance types with [graviton] processors have the best price-to-performance ratio of any instance type on Amazon EC2”

This is me, Holly Smith, Senior Resident Solutions Architect at Databricks, stating that for OPTIMIZE workloads Graviton is the way to go for the best price performance, saving up to 50% in DBU and VM costs.

OPTIMIZE & Graviton Machine type … & Photon?

Photon is a vectorized query engine that will definitely speed up OPTIMIZE run time, but not always by enough to keep the price performance the same. It works especially well for wider tables with complex column types, but not so much for skinny IoT-type data. For large, regular or time critical workloads it’s worth experimenting with. Turn it on at cluster creation.

ZORDER column selection

Not all columns are created equal, and some are better than others as candidates for ZORDERing. As a reminder, the aim of ZORDER is to colocate data so fewer files are read as Databricks scans for minimums, maximums and counts in the file metadata.

  1. Select up to five columns. For each column that is added, there is a drop in effectiveness.
  2. Select columns that are used as a predicate within where & join statements. These are the types of queries that benefit from file skipping.
  3. Select columns with high cardinality, aka, lots of distinct values. For example, if there are only two country codes, then it won’t help speed up anything if half of the files fit your search criteria. Good candidates are unique identifiers.
  4. But …don’t get too clever with unique values. Columns with long strings of text, maps, arrays and structs are not good candidates due to the overhead in searching in them.
  5. Make sure the columns are not correlated with other ZORDER columns or partition columns. It’s essentially the same sort.
  6. And finally, select columns that have statistics collected on them! By default, this is the first 32 columns in a table, but more on this later.

I tried to put the above in order of importance, but failed terribly. They are all important.

Optimise your OPTIMIZE as part of regular runs

OPTIMIZE needs to be run on a regular basis as part of pipeline maintenance. If it’s taking a long time to run, here are some recommendations:

  • Prevent tiny files from being written in the first place with the settings for auto optimize. Great for streaming, but does add some latency as it writes in a smarter way and performs a mini OPTIMIZE.
  • Reduce the batch size and control the number of tasks in the OPTIMIZE commit
    set spark.databricks.delta.optimize.tasksPerCommit = 100
    set spark.databricks.delta.optimize.maxThreads = 64

And again — be mindful of your file layouts!

OPTIMIZE cannot compact files across a partition on disk — if you over partition your data, OPTIMIZE and ZORDER cannot save you!

#3 Don’t waste statistics collections on the wrong columns

By default, statistics are calculated on the first 32 columns. Well, ‘columns’. If a five layer nested structure is one of those columns, then it counts as five columns, not one. So, for example, say you have a map with 100 key-value pairs in it, it’s going to count as 100 columns, quickly hitting the 32 column limit. It’s not just map types to be wary of; structs, arrays and long strings all have a similar impact.

Code

At Adevinta, we had three options to address this:

  1. Move the giant map column further right in the table, outside the first 32
  2. Explode the map column for commonly used key-value pairs
  3. Increase the statistics count to 250+ with delta.dataSkippingNumIndexedCols

Ultimately we went with the first as it was the easiest to implement and gave enormous performance gains by itself. The next one would have required more overhead as we noticed there weren’t consistently popular pairs. The last option would be a very expensive operation — if you wanted performance at literally any cost, this could be an option.

#4 Use Serverless SQL

Databricks SQL has three warehouse types for analysts to run workloads on; classic, pro and serverless. Classic runs on i3 instances on AWS, and unfortunately, there is a shortage in Adevinta’s chosen cloud region. This caused reliability issues, long spin up times and unhappy users.

Switching to serverless allows for instant start up times and gives a consistent experience. There’s also the additional benefits of predictive IO to speed up read time, and workflows integration to allow for scheduling of queries and chaining them with other Databricks tasks like running notebooks, alerts or visualisations.

Happy users.

#5 Tune the S3 storage underneath

Ok, now we’re really getting into specifics here.

The first way to remove any latency introduced by networking: make sure your Databricks deployment is in the same region as the S3 bucket, and that a VPC endpoint is used to connect to S3 so traffic isn’t going via the internet. These can be checked in the AWS console.

For S3 specific settings:

  • Avoid hotspots in the metadata by randomising file prefixes in the Delta table properties using delta.randomizeFilePrefixes=true
  • Avoid listing bottlenecks created by S3 versioning by purging old versions of files using a lifecycle policy
  • If you are still facing issues, separate gold tables into a dedicated S3 bucket, separating it from bronze and silver processing. In some instances where thousands of requests are being sent per second to just one table, it may need its own dedicated S3 bucket.

And again — be mindful of your file layouts!

Small files will cause bottlenecks, remove them with appropriately sized partitions, and OPTIMIZE regularly.

Tl:dr — use Delta, optimize & z-order regularly on the best columns, use serverless, sort your networking, make users happy.

Related techblogs

Discover all techblogs

Data Analyst Enablers Community: A co-leadership journey

Read more about Data Analyst Enablers Community: A co-leadership journey
Meeting

From Glitches to Grins: The Support Superhero Squad’s Epic Journey to SLO Success!

Read more about From Glitches to Grins: The Support Superhero Squad’s Epic Journey to SLO Success!
Breakroom

Lessons learned from organising our first global AI hackathon

Read more about Lessons learned from organising our first global AI hackathon
AI hackathon