
Spark: Narrow vs Wide Transformation

A skilled data engineer thinks about reducing data movement (data shuffle) to improve query performance.

In distributed systems such as Apache Spark, wide transformations are more time-consuming than narrow transformations because moving data between nodes takes much longer than reading from the local file system. Therefore, minimising the amount of data shuffled between nodes in the cluster is essential.

In terms of cost (i.e., time taken), from highest to lowest:

  1. Moving data between nodes in the cluster
  2. Reading data from the file system
  3. Processing data in memory

Narrow transformations do not require data movement between the nodes in the cluster. They are usually performed per row, independently of other rows.

For example, the following query derives a new column from values within each row, so each row can be processed on its own.

USE tpch;

SELECT
  orderkey,
  linenumber,
  ROUND(extendedprice * (1 - discount) * (1 + tax), 2) AS totalprice
FROM
  lineitem
LIMIT
  10;

In the above example, each node independently processes its portion of the data: the rows are read from disk, processed, and the results are stored locally, without any data being exchanged between nodes.

Wide transformations involve moving data between nodes in the cluster. You can think of a wide transformation as any operation that requires data from multiple rows, potentially stored on different nodes.

For example, joining two tables requires rows from both tables with matching join keys to be moved to the same machine so that they can be combined into the joined result, as in the sketch below.
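The following query is a minimal sketch against the TPC-H schema: rows from orders and lineitem that share an orderkey must end up on the same node before they can be matched.

USE tpch;

-- Rows from both tables with the same orderkey are shuffled to the
-- same node so that the match can be performed locally.
SELECT
  o.orderkey,
  o.orderpriority,
  l.linenumber,
  l.extendedprice
FROM
  orders AS o
  JOIN lineitem AS l ON o.orderkey = l.orderkey
LIMIT
  10;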

Similarly, grouping the lineitem table by orderkey requires all rows with the same orderkey to be on the same node, since those rows must be aggregated together.

A key concept to understand in a wide transformation is data shuffling, also known as data exchange. Data shuffling refers to the movement of data across the nodes in a cluster.

The OLAP database applies a hash function to the GROUP BY or join column(s) to determine which node each row should be sent to.

Note: In distributed systems, a hash function maps a given column's (or columns') values to a deterministic identifier. Hashing is used when performing a join or group by to identify the rows that need to be processed together: rows with the same key hash to the same value and are therefore routed to the same node.
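As an illustration only (this is not the engine's internal implementation), many SQL engines expose a hash function that can be called directly. The sketch below assumes Spark SQL's hash() and pmod() functions and shows how each orderkey could be mapped to one of four hypothetical nodes:

USE tpch;

-- Illustrative only: map each orderkey to one of 4 hypothetical nodes.
-- hash() and pmod() are built into Spark SQL; other engines expose
-- similar functions under different names.
SELECT
  orderkey,
  pmod(hash(orderkey), 4) AS target_node
FROM
  orders
LIMIT
  10;

The query below is a typical wide transformation: rows are shuffled by orderpriority before being aggregated.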

USE tpch;

SELECT
  orderpriority,
  ROUND(SUM(totalprice)/1000, 2) AS total_price_thousands
FROM
  orders
GROUP BY
  orderpriority
ORDER BY
  orderpriority;

We can minimise data movement by ensuring that the data to be shuffled is as small as possible: apply filters before joining and read only the necessary columns, as in the sketch below.
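As a rough sketch (the exact plan depends on the engine's optimiser, which may push these steps down automatically), filtering orders and keeping only the join key before the join means fewer and narrower rows have to be shuffled. The '1-URGENT' literal assumes the standard TPC-H orderpriority values.

USE tpch;

-- Filter first and keep only the columns needed for the join, so
-- fewer and narrower rows are shuffled across the cluster.
SELECT
  o.orderkey,
  ROUND(SUM(l.extendedprice * (1 - l.discount)), 2) AS revenue
FROM
  (SELECT orderkey FROM orders WHERE orderpriority = '1-URGENT') AS o
  JOIN lineitem AS l ON o.orderkey = l.orderkey
GROUP BY
  o.orderkey
LIMIT
  10;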

Summary:

The basic principles of query optimisation in any distributed data processing system are as follows:

  • Reducing the amount of data transferred between cluster nodes during queries.
  • Reducing the amount of data that needs to be processed by the OLAP engine.

Quiz

Check your knowledge of this article by answering the questions below.

Q.1. Wide Transformation

Question: What function does the OLAP database apply to the GROUP BY or join column(s) to determine which node to send the data to, thereby reducing the amount of data transferred between cluster nodes during queries?

Select one answer from the options below

Surrogate key function

Hash function

It skips using a function and reads data directly from the file system, as it's the fastest method

Q.2. Wide Transformation

A wide transformation requires data to be shuffled across nodes (partitions) because the operation depends on data from multiple partitions.

Question: Which operations require shuffling data across partitions?

Select one answer from the options below

map, filter, and flatMap

collect, count, and take

join, groupBy, and reduceByKey

