Spark: Narrow vs Wide Transformation
A skilled data engineer thinks about reducing data movement (data shuffle) to improve query performance.
In distributed systems (Apache Spark), wide transformations are more time-consuming than narrow transformations because moving data between nodes takes much longer than simply reading from the file system. Therefore, minimising the amount of data shuffled between nodes in the cluster is essential.
In terms of cost (in terms of speed), from highest to lowest:
- 1.) Moving data between nodes in the cluster
- 2.) Reading data from the file system
- 3.) Processing data in memory
Narrow transformations do not require data movement between the nodes in the cluster. They are usually performed per row, independent of other rows.
For example, some operations only work on one row at a time.
USE tpch;
SELECT
orderkey,
Linenumber,
ROUND(extendedprice * (1 - discount) * (1 +tax), 2) AS totalprice
FROM
lineitem
LIMIT
10;
In the above example, Each node independently processes the data, which is read from the disk, processed, and stored locally.
Wide transformations involve moving data between nodes in the cluster. You can think of wide transformations as any operation that requires data from multiple rows.
For example, joining tables require rows from both tables with matching join keys to be moved to the same machine to get the necessary data from the joined result.
Grouping the lineitem
table by orderkey
will require the data with the same orderkey
to be on the same node since the rows with the same orderkey
need to be processed together.
A key concept to understand in a wide transformation is data shuffling, also known as data exchange. Data shuffling refers to the movement of data across the nodes in a cluster.
The OLAP DB uses a hash function on the group by or join column(s) to determine the node to send the data.
Note: In distributed systems, hash functions are used to create unique identifiers based on a given column or column's values. Hashing is utilised when performing a join or group to identify the rows that need to be processed together.
USE tpch;
SELECT
orderpriority,
ROUND(SUM(totalprice)/1000, 2) AS total_price_thousands
FROM
orders
GROUP BY
orderpriority
ORDER BY
orderpriority;
We can minimise data movement by ensuring that the data to be shuffled is as tiny as possible, applying filters before joining, and only reading the necessary columns.
Summary:
The basic principles of query optimisation in any distributed data processing system are as follows:
- Reducing the amount of data transferred between cluster nodes during queries.
- Reducing the amount of data that needs to be processed by the OLAP engine.
Quiz?
Check your knowledge from this article by answering the questions below.
Click on the answer you believe to be correct for each question to see if you are right or wrong!
Q.1. Wide Transformation
Question: What function does the OLAP database use on the group by or joining column(s) to determine the node to send the data to reduce the amount of data transferred between cluster nodes during queries?
Select one answer from the below
Surrogate key function
Hash function
It skips using a function and reads data directly from the file system, as it's the fastest method
Q.2. Wide Transformation
Wide transformation requires data to be shuffled across nodes (partitions) because the operation depends on data from multiple partitions.
Question: What are operations that require shuffling data across partitions?
Select one answer from the below
map, filter, and flatMap
collect, count, and take
join, groupBy, and reduceByKey
Post Tags: