In this blog post, we’ll discuss the dichotomy between distributed and centralised computation frameworks, specifically in relation to batch processing. This is quite a well-known topic and, while for years the answer has pretty much been “distribute everything”, recent changes in the data landscape have blurred a previously well-defined line.
Before going further, I should introduce myself. My name is Antonio, and I am a Data Engineer at Adevinta, part of the Data Foundations team for the Benelux region. I regularly deal with distributed batch processing and wanted to share some points I have noticed about properly supporting this kind of data application, especially regarding the architecture to use.
To better understand the topic, I’ll start with a quick historical overview of distributed systems. Then I will discuss the changes that have taken place recently. Finally, we’ll take a look at one of the most widely adopted batch technologies, Spark, and pit it against some alternative solutions that may be more appropriate for certain use cases.
Keep in mind that this post revolves around batch data processing frameworks. If we were to consider other kinds of applications, like message queues or databases, this post would become a discount version of “Designing Data-Intensive Applications” by Kleppmann, so we are sticking to a smaller scope.
There’s quite a bit to go over, so let’s get started.
PART 1: HISTORICAL OVERVIEW AND STATE OF THE ART
In this section, we will give a brief overview of how the centralised vs. distributed debate was born and how it has evolved over the years.
Our starting point is fully centralised systems: big servers (by late-1990s and early-2000s standards) and vertical scaling. You have a big machine to do a job; if the machine is no longer up to it, you get a bigger one. Although this approach worked at the time, it is very outdated in today’s data world. As resource needs grow, the required machine specification moves away from general-purpose hardware towards specialised hardware, with a very steep increase in costs. As you can probably imagine, this is not ideal.
As requirements moved towards specialised hardware, the cost of commodity hardware started falling. This opened the door to an alternative: instead of getting a bigger machine, we could acquire several smaller machines and distribute the workload amongst them. This shifts the approach from scale-up to scale-out, or from vertical to horizontal scaling.
For the rest of this article we will refer to each machine in a distributed system as a node.
TRADE-OFFS
Now, while economically preferable, this approach still has its drawbacks. Distributed computation frameworks have a number of issues that must be addressed:
- Faulty networks: communication between nodes has to happen over a network, and networks are generally unreliable. We need to handle communication failures and latency (more on this below), and either provide exactly-once processing or, if we can’t, put mechanisms in place to mitigate the risk of errors. The list of potential network issues is pretty long.
- Unbounded network latency: in most cases, the network connecting the nodes offers no upper bound on how long a message can take to arrive. This creates a number of issues. How can we tell a crashed node apart from general network congestion? How can we handle the Two Generals’ Problem? Or, if some nodes may behave arbitrarily or even maliciously, how can we handle the Byzantine Generals’ Problem?
- Failure recovery: what happens when a node crashes? What happens if a node that apparently died turns out to have just frozen, and “comes back to life” after a while? In a centralised framework, a crashed machine is a far more straightforward scenario than identifying a crashed node over an unreliable network.
- Distribution and replication: how do we decide which node will handle which data partition? Do we replicate the data on more than one node?
- I/O costs: moving data across nodes over the network (shuffling) is not free. It takes time, which means paying for extra uptime and having pipelines that take longer to complete.
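To make the “crashed or just slow?” question from the list above concrete, here is a toy timeout-based failure detector in Python. It is a minimal sketch, not how any particular framework implements failure detection; the node names, heartbeat times and the `TIMEOUT` value are hypothetical. Because the network gives no upper bound on latency, a node whose heartbeats are merely delayed looks exactly like a node that has crashed.

```python
from dataclasses import dataclass

# Hypothetical timeout: a node is suspected if no heartbeat arrived within this many seconds.
TIMEOUT = 5.0


@dataclass
class NodeState:
    last_heartbeat: float  # time (in seconds) at which the last heartbeat was received


def suspected_nodes(nodes: dict[str, NodeState], now: float, timeout: float = TIMEOUT) -> set[str]:
    """Return the names of nodes whose last heartbeat is older than `timeout`.

    The detector only observes *missing* heartbeats: it cannot tell whether a node
    crashed, froze, or simply has its messages stuck in a congested network.
    """
    return {name for name, state in nodes.items() if now - state.last_heartbeat > timeout}


# Hypothetical view of the cluster at t = 10s.
cluster = {
    "node-a": NodeState(last_heartbeat=9.0),  # healthy
    "node-b": NodeState(last_heartbeat=2.0),  # actually crashed at t = 2s
    "node-c": NodeState(last_heartbeat=3.0),  # alive, but its heartbeats are delayed
}

print(sorted(suspected_nodes(cluster, now=10.0)))  # → ['node-b', 'node-c']
```

Raising the timeout reduces false suspicions but makes genuine crashes slower to detect; no value removes the ambiguity, which is why a frozen node that “comes back to life” is so awkward to handle.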
On the other hand, distributing the workload also presents a number of positives:
- Economically sound: a number of smaller nodes will probably be cheaper than a single high-end (HPC-class) machine.
- Fault tolerance: if we can provide fault tolerance at the node level, we won’t have to reprocess as much data when something goes wrong (unless the failure hits the Driver of a Driver-Worker architecture like Spark’s: if the Driver fails, we need to rerun the whole operation). While more difficult to set up, distributed architectures are usually much more fault-tolerant than their centralised counterparts.
ROUND-UP
To sum up, while technically more challenging and far from problem-free, distributed computation frameworks have proven themselves to be the go-to solution for most use cases, as shown by the success of frameworks such as Spark and Flink.
INTRODUCING SPARK
For most people close to the data world, Spark needs no introduction but, for the sake of completeness, we will go over its core concepts here. For a more detailed introduction to Spark, please refer to these articles: An Introduction To Spark and SparkSQL Query Optimisation.
Spark was born as a more general alternative to MapReduce. It is a distributed computation framework based on the Driver-Worker architecture: the Driver node prepares the workload and the Worker nodes actually execute it. Workloads comprise transformations (evaluated lazily) and actions (evaluated eagerly). Transformations can either be narrow (not requiring a shuffle, like filtering) or wide (requiring a shuffle, like joins, ordering, groupings, etc.).
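The difference between lazy transformations and eager actions can be illustrated with a toy Python class that only records a plan until an action is called. The class and method names are hypothetical and merely imitate Spark’s style; both transformations used here are narrow (each record is handled on its own), and wide transformations are not modelled.

```python
from typing import Any, Callable, Iterable


class LazyDataset:
    """Toy, single-process imitation of Spark-style lazy evaluation.

    Transformations (map, filter) only append a step to the plan; nothing runs
    until an action (collect, count) is called. The source must be re-iterable
    (e.g. a list or range), because every action re-reads it from scratch.
    """

    def __init__(self, source: Iterable[Any], plan: tuple = ()):
        self._source = source
        self._plan = plan  # tuple of ("map" | "filter", function) steps

    # Transformations: lazy, they return a new dataset with a longer plan.
    def map(self, fn: Callable[[Any], Any]) -> "LazyDataset":
        return LazyDataset(self._source, self._plan + (("map", fn),))

    def filter(self, pred: Callable[[Any], bool]) -> "LazyDataset":
        return LazyDataset(self._source, self._plan + (("filter", pred),))

    # Actions: eager, they execute the whole recorded plan.
    def collect(self) -> list:
        out = []
        for record in self._source:
            keep = True
            for kind, fn in self._plan:
                if kind == "map":
                    record = fn(record)
                elif not fn(record):  # a "filter" step that rejects the record
                    keep = False
                    break
            if keep:
                out.append(record)
        return out

    def count(self) -> int:
        return len(self.collect())


calls = []


def square(x):
    calls.append(x)  # log every real execution of the function
    return x * x


ds = LazyDataset(range(6)).map(square).filter(lambda x: x % 2 == 0)
assert calls == []   # building the plan executed nothing
print(ds.collect())  # → [0, 4, 16]
print(len(calls))    # → 6
```

Calling `collect()` a second time re-runs the whole plan; Spark behaves similarly for datasets that have not been cached.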
Data is abstracted as RDDs (Resilient Distributed Datasets), which are split into partitions that are distributed across the workers. Workers process their partitions in parallel until a shuffle is needed (grouping, ordering, joins, etc.), at which point partitions are shuffled and redistributed. If a worker node is lost, only the partitions it held are recomputed, using their lineage (the recorded chain of transformations that produced them). If the Driver is lost, however, there is no fault tolerance: the whole application fails.
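The partition, shuffle and lineage mechanics described above can be mimicked in a single Python process. In this sketch, lists stand in for partitions living on different workers, the key/value records and `NUM_PARTITIONS` are hypothetical, and a stable hash (`zlib.crc32`) decides which partition owns each key. Unlike Spark, which keeps shuffle output on disk and re-runs only the missing upstream tasks, this toy naively recomputes the whole upstream chain; the point is only that a lost partition is rebuilt from its recorded recipe rather than from a replica.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3  # hypothetical number of partitions (one per worker)


def partition_for(key: str, n: int = NUM_PARTITIONS) -> int:
    # Stable hash partitioner (crc32 instead of Python's per-process salted hash()).
    return zlib.crc32(key.encode()) % n


# Input partitions, as if each lived on a different worker.
input_partitions = [
    [("nl", 3), ("be", 1)],
    [("nl", 2), ("lu", 5)],
    [("be", 4), ("nl", 1)],
]


def narrow_step(partition):
    # Narrow transformation: each partition is processed on its own (values doubled).
    return [(k, v * 2) for k, v in partition]


def shuffle(partitions, n: int = NUM_PARTITIONS):
    # Wide transformation: every record moves to the partition owning its key,
    # which in a real cluster means sending data over the network.
    out = [[] for _ in range(n)]
    for partition in partitions:
        for key, value in partition:
            out[partition_for(key, n)].append((key, value))
    return out


def reduce_by_key(partition):
    totals = defaultdict(int)
    for key, value in partition:
        totals[key] += value
    return dict(totals)


def compute_output_partition(i: int) -> dict:
    # Lineage: the recipe that rebuilds output partition i from the inputs.
    mapped = [narrow_step(p) for p in input_partitions]
    return reduce_by_key(shuffle(mapped)[i])


outputs = [compute_output_partition(i) for i in range(NUM_PARTITIONS)]

# Simulate losing one output partition and rebuilding only that one from its lineage.
lost = 1
outputs[lost] = None
outputs[lost] = compute_output_partition(lost)

merged = {k: v for part in outputs for k, v in part.items()}
print(sorted(merged.items()))  # → [('be', 10), ('lu', 10), ('nl', 12)]
```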
On top of these principles, further capabilities were built, such as SparkSQL, Spark Structured Streaming and SparkML.
While there are valid alternatives to Spark for distributed batch processing, Spark is currently the most popular in the data world.
PART 2: SHOULD WE DISTRIBUTE EVERYTHING?
The answer to this question is quite straightforward: nope. The reasons behind the ‘nope’ are quite interesting and stem from the nature of distributed systems themselves.
First, there is the cost of setting up a cluster environment. Startup costs do exist, so the amount of data processed has to justify them. To provide a practical example: if you had to read a 200-column .csv file, it would probably be wiser to just read it with Pandas rather than spin up a Spark cluster. Please note that this is an extreme case; usually the situation is a bit more complicated. Still, the core concept stands: sometimes a cluster of nodes is simply not needed.
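The startup-cost argument can be put into numbers with a back-of-the-envelope model in Python: total job time is a fixed startup overhead plus data volume divided by throughput. Every figure below is hypothetical and chosen only to show the shape of the trade-off; none of them are benchmarks of Spark, Pandas or any other tool.

```python
def job_time_s(data_gb: float, startup_s: float, throughput_gb_per_s: float) -> float:
    """Toy model: fixed startup overhead plus time proportional to data volume."""
    return startup_s + data_gb / throughput_gb_per_s


# Hypothetical figures, for illustration only (not benchmarks):
SINGLE_NODE = dict(startup_s=1.0, throughput_gb_per_s=0.5)  # e.g. a local DataFrame library
CLUSTER = dict(startup_s=180.0, throughput_gb_per_s=5.0)    # e.g. provisioning a cluster first


def break_even_gb(a: dict, b: dict) -> float:
    """Data volume at which option b (higher startup, higher throughput) starts beating a."""
    # Solve a.startup + x / a.throughput == b.startup + x / b.throughput for x.
    return (b["startup_s"] - a["startup_s"]) / (
        1 / a["throughput_gb_per_s"] - 1 / b["throughput_gb_per_s"]
    )


for gb in (0.01, 10, 500):
    single = round(job_time_s(gb, **SINGLE_NODE), 1)
    cluster = round(job_time_s(gb, **CLUSTER), 1)
    print(f"{gb} GB: single node {single} s, cluster {cluster} s")

print(round(break_even_gb(SINGLE_NODE, CLUSTER), 1))  # → 99.4
```

With these made-up figures the cluster only pays off above roughly 100 GB. The model ignores memory limits, shuffles and failures, but it shows why a small file sits firmly on the single-node side of the line.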
The second reason lies in networking. When you start relying on a network with unbounded delays for communication between computing nodes, the number of things that can go wrong increases quite quickly (for a comprehensive overview of this topic, I can’t recommend “Designing Data-Intensive Applications” by Kleppmann enough). You also introduce I/O costs linked to marshalling/unmarshalling (serialising/deserialising) data, as well as the overhead of control-plane components having to keep track of considerably more elements.
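The marshalling cost mentioned above can be observed with the Python standard library: every record that crosses a node boundary has to be turned into bytes and back. In this sketch `pickle` stands in for whatever wire format a framework really uses (Spark has its own serialisers), and the record layout is hypothetical; timings vary from machine to machine, so no expected output is shown.

```python
import pickle
import time

# A hypothetical batch of records that one node has to send to another during a shuffle.
records = [{"user_id": i, "country": "nl", "amount": i * 0.5} for i in range(100_000)]

start = time.perf_counter()
payload = pickle.dumps(records)  # marshalling: in-memory objects -> bytes for the wire
decoded = pickle.loads(payload)  # unmarshalling on the receiving node
elapsed = time.perf_counter() - start

assert decoded == records  # same data on both sides, but we paid CPU time and bytes to move it
print(f"{len(payload) / 1e6:.1f} MB serialised, round trip took {elapsed * 1000:.0f} ms")
```

A single-node engine working on data already in its own memory skips this step entirely, on top of avoiding the network transfer itself.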
THE “ONE SPARK FITS ALL” ANTIPATTERN
The widespread use of distributed frameworks, combined with their shortcomings, has given rise to an anti-pattern that I like to call “one Spark fits all”.
This anti-pattern sees distributed frameworks, such as Spark, deployed for use cases that could be handled with a centralised/single-node approach. In my experience, it tends to develop when big nodes are readily available and get used by default for batch computation in Spark.
In this situation, we incur the nasty drawbacks of distributed architectures mentioned earlier, in particular the I/O costs of shuffling and the overhead of cluster creation, when a single-node alternative would be enough.
WHAT ABOUT SINGLE NODE SPARK?
The first reaction to the point above could be to just downscale Spark and use a single node. While better than creating an unnecessary cluster, this still means setting up a “cluster” comprising a single node, adding costs that could be avoided with other single-node solutions.
Furthermore, the fault tolerance that characterises Spark is no longer part of the picture. If the Driver fails, everything that has not been saved to persistent storage is lost, and even the output the failed job had already saved can’t really be relied on, since it may be incomplete.
This approach impacts both the developer experience and system performance. Spark was not created with single-node processing in mind, and there are now alternatives that perform better in this situation. As a consequence, computations won’t be as fast as they could be, and the development cycle will be more cumbersome than it should be, increasing development costs.
Basically, why should we try to use a hammer as a screwdriver when we can just use a screwdriver?
PART 3: CURRENT ALTERNATIVES
Two alternatives to Spark stand out for scenarios where a single-node solution is preferable to a dedicated cluster: DuckDB and Polars.
POLARS
Polars is a DataFrame library built on the Apache Arrow columnar memory format, providing “blazingly fast” computation (in their words).
According to benchmarks from the development team, Polars easily outperforms Spark in a single-node setting. Part of the reason is that it is implemented in Rust, which offers optimisation opportunities the JVM doesn’t really provide. It also has a robust logical query optimiser based on expressions, which users can extend. Last, and perhaps most interestingly, it uses work stealing to distribute load: if a core finishes processing all of its data before the others, it steals part of the remaining workload from the other cores. According to the team behind Polars, this means that skew in how data is partitioned across cores is not really an issue.
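Work stealing is a general scheduling technique, and a small round-based simulation in Python shows why it tames skew. This is a toy model, not Polars’ actual scheduler: tasks have unit cost, each core runs one task per round from the front of its own deque, and an idle core first steals one task from the back of the longest other deque.

```python
from collections import deque


def simulate(queues: list[list[str]], steal: bool) -> int:
    """Run unit-cost tasks on len(queues) cores and return the number of rounds (makespan)."""
    cores = [deque(q) for q in queues]  # copy, so the caller's lists are not mutated
    rounds = 0
    while any(cores):
        for own in cores:
            if not own and steal:
                victim = max(cores, key=len)  # the core with the most pending tasks
                if victim:                    # an empty deque is falsy: nothing left to steal
                    own.append(victim.pop())  # steal from the back of the victim's deque
            if own:
                own.popleft()  # run one task from the front of our own deque
        rounds += 1
    return rounds


# Skewed partitioning: one core gets 8 tasks, three cores get 1 task each.
skewed = [[f"a{i}" for i in range(8)], ["b0"], ["c0"], ["d0"]]
print(simulate(skewed, steal=False))  # → 8
print(simulate(skewed, steal=True))   # → 3
```

Without stealing, the overloaded core dictates the runtime; with stealing, the eleven tasks end up spread almost evenly over the four cores. Taking work from the opposite end of the owner’s deque is the usual work-stealing convention, as it keeps owner and thief from competing for the same task.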
Of course, Polars is not perfect. First and foremost, it does not scale out: there is currently no production-ready way to distribute Polars, although the team might introduce distributed execution in the future. It must be noted that this functionality is still very far from public release, as shown by the fact that the physical optimisation step needed to shuffle data across nodes is currently non-existent.
Another downside lies in the APIs: they are only available in Python and Rust. While this covers most applications, it may reduce the number of potential use cases.
Last, but not least, the APIs themselves differ syntactically from Spark’s, so there is a learning curve to take into account.
While not a viable choice for scenarios that involve large volumes of data (hundreds of GBs or even TBs), and despite its noticeable learning curve, Polars is a valid alternative to single-node Spark (or Spark over small data).
DUCKDB
DuckDB is an in-process Database Management System (DBMS) aimed at OLAP (OnLine Analytical Processing) queries. A quick read of their “Why DuckDB” page will reveal its most interesting features, from its query execution engine and index usage to its portability.
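To give a feel for why OLAP engines such as DuckDB organise data by column, here is a generic Python illustration using the standard `array` module. It is not DuckDB’s internals, and the table and its columns are hypothetical. The point is that an analytical query, such as an average over one column, only needs to scan that column’s contiguous, uniformly typed buffer, while a row layout drags every field of every record along.

```python
from array import array

# The same hypothetical table in two layouts.
rows = [
    {"listing_id": 1, "category": "cars", "price": 12000.0},
    {"listing_id": 2, "category": "bikes", "price": 300.0},
    {"listing_id": 3, "category": "cars", "price": 8500.0},
]

columns = {
    "listing_id": array("q", [1, 2, 3]),            # 64-bit signed integers
    "category": ["cars", "bikes", "cars"],
    "price": array("d", [12000.0, 300.0, 8500.0]),  # contiguous C doubles
}

# Row layout: the query walks every record, touching all of its fields.
avg_row = sum(r["price"] for r in rows) / len(rows)

# Column layout: the same query reads a single buffer and ignores the other columns.
price = columns["price"]
avg_col = sum(price) / len(price)

# A filtered aggregate combines two columns position by position.
cars_total = sum(p for p, c in zip(columns["price"], columns["category"]) if c == "cars")

print(avg_row == avg_col, cars_total)  # → True 20500.0
```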
Quite importantly, DuckDB integrates with Python and Rust, and also offers APIs for a number of other languages.
Of course, this is not a perfect solution either. DuckDB does not really scale beyond a single machine, and it is focused on OLAP workloads, which will not fit most use cases.
A very interesting feature was announced at PyData Amsterdam 2023: the DuckDB team is working on a set of Spark-like APIs that would let users write code as if they were using PySpark, while DuckDB actually executes the workload. This would bypass the API syntax problem seen with Polars, making it easier to switch between PySpark and DuckDB. However, right now quite a lot of features still need to be implemented before the same code can run on both Spark and DuckDB.
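The idea behind such an API can be illustrated with a toy facade: PySpark-style method calls are lazily compiled into SQL and executed by an in-process engine. In this sketch the standard library’s `sqlite3` stands in for DuckDB, and the `SparkLikeFrame`/`GroupedFrame` classes are hypothetical; they are not DuckDB’s actual Spark-compatible API, and conditions are pasted into the SQL without any escaping, so this is for illustration only.

```python
import sqlite3


class SparkLikeFrame:
    """Hypothetical PySpark-flavoured facade that compiles method calls to SQL.

    sqlite3 stands in for an in-process engine such as DuckDB. Inputs are not
    escaped, so never feed it untrusted strings.
    """

    def __init__(self, conn: sqlite3.Connection, query: str):
        self._conn = conn
        self._query = query  # nothing is executed until collect()

    def filter(self, condition: str) -> "SparkLikeFrame":
        return SparkLikeFrame(self._conn, f"SELECT * FROM ({self._query}) WHERE {condition}")

    def select(self, *cols: str) -> "SparkLikeFrame":
        return SparkLikeFrame(self._conn, f"SELECT {', '.join(cols)} FROM ({self._query})")

    def groupBy(self, col: str) -> "GroupedFrame":
        return GroupedFrame(self, col)

    def collect(self) -> list[tuple]:
        return self._conn.execute(self._query).fetchall()


class GroupedFrame:
    def __init__(self, frame: SparkLikeFrame, col: str):
        self._frame, self._col = frame, col

    def count(self) -> SparkLikeFrame:
        sql = (
            f'SELECT {self._col}, COUNT(*) AS "count" FROM ({self._frame._query}) '
            f"GROUP BY {self._col} ORDER BY {self._col}"
        )
        return SparkLikeFrame(self._frame._conn, sql)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ads (id INTEGER, category TEXT, price REAL)")
conn.executemany(
    "INSERT INTO ads VALUES (?, ?, ?)",
    [(1, "cars", 12000.0), (2, "bikes", 300.0), (3, "cars", 8500.0), (4, "bikes", 150.0)],
)

df = SparkLikeFrame(conn, "SELECT * FROM ads")
result = df.filter("price > 200").groupBy("category").count().collect()
print(result)  # → [('bikes', 1), ('cars', 2)]
```

Each call simply wraps the previous query in a new SELECT and leaves optimisation to the engine. Covering the full PySpark surface this way is a large amount of work, which fits with how much still needs to be implemented.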
PART 4: WHEN TO USE WHAT?
So far we have covered the historical and engineering roots of the problem, explored the most popular solution and introduced the alternatives. The question that should spring to mind now is: how do I choose the right tool for my situation?
There are multiple factors to consider, including:
- Scalability: if you intend to create a pipeline that could potentially access large amounts of data, distributed approaches such as Spark are still the way to go. If, instead, you are working on analytics or are sure that the data volume won’t be massive, single-node approaches should be considered.
- Costs in adoption: are the APIs available in the language being used by the developers? How different is the syntax from Spark? Is the learning curve an acceptable cost for the optimisation obtained?
- Project maturity: do you trust the new tools to be used in production?
- Architectural concerns: is the tool actually usable in your stack?
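One possible way to turn the checklist above into a repeatable review is sketched below in Python. The field names and the ordering (scalability as a hard constraint, the other factors as gates that keep you on your current framework) are assumptions made for this sketch, not a fixed rule; real decisions involve judgement that a handful of booleans can’t capture.

```python
from dataclasses import dataclass


@dataclass
class PipelineProfile:
    """Answers to the checklist for one pipeline (field names are hypothetical)."""
    may_outgrow_one_machine: bool    # Scalability
    api_in_team_language: bool       # Costs in adoption: API available in our language?
    learning_curve_acceptable: bool  # Costs in adoption: is the new syntax worth it?
    trusted_in_production: bool      # Project maturity
    fits_our_stack: bool             # Architectural concerns


def suggest_engine(p: PipelineProfile) -> tuple[str, str]:
    """Return (suggestion, deciding factor) for one pipeline."""
    # Scalability is treated as a hard constraint: large or growing data stays distributed.
    if p.may_outgrow_one_machine:
        return ("distributed", "scalability")
    # The remaining factors act as gates: the first failing one decides.
    gates = [
        ("costs in adoption", p.api_in_team_language and p.learning_curve_acceptable),
        ("project maturity", p.trusted_in_production),
        ("architectural concerns", p.fits_our_stack),
    ]
    for factor, passed in gates:
        if not passed:
            return ("keep current framework", factor)
    return ("single-node", "all checks passed")


analytics_job = PipelineProfile(False, True, True, True, True)
growing_pipeline = PipelineProfile(True, True, True, True, True)
print(suggest_engine(analytics_job))     # → ('single-node', 'all checks passed')
print(suggest_engine(growing_pipeline))  # → ('distributed', 'scalability')
```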
CONCLUSIONS
In this post, we have gone through a rough history of distributed computation frameworks, how they came to be and why they are the go-to solution right now. We have also introduced the market leader in distributed batch processing, Spark, and the recent developments that have highlighted its shortcomings in the current landscape.
Next, we discussed Polars and DuckDB, two alternatives that should be taken into account in single-node, low-data-volume scenarios.
Lastly, we outlined the things you should consider when switching from Spark, or any other distributed computing framework, to centralised ones, specifically DuckDB and Polars.
Hopefully, this post has given you some insight into Spark alternatives and when to use them.
That’s it for today,
Cheers and see you in the next post 🙂