<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/"
    xmlns:atom="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" version="2.0">
    <channel>
        
        <title>
            <![CDATA[ AWS Glue - freeCodeCamp.org ]]>
        </title>
        <description>
            <![CDATA[ Browse thousands of programming tutorials written by experts. Learn Web Development, Data Science, DevOps, Security, and get developer career advice. ]]>
        </description>
        <link>https://www.freecodecamp.org/news/</link>
        <image>
            <url>https://cdn.freecodecamp.org/universal/favicons/favicon.png</url>
            <title>
                <![CDATA[ AWS Glue - freeCodeCamp.org ]]>
            </title>
            <link>https://www.freecodecamp.org/news/</link>
        </image>
        <generator>Eleventy</generator>
        <lastBuildDate>Wed, 24 Jun 2026 20:18:33 +0000</lastBuildDate>
        <atom:link href="https://www.freecodecamp.org/news/tag/aws-glue/rss.xml" rel="self" type="application/rss+xml" />
        <ttl>60</ttl>
        
            <item>
                <title>
                    <![CDATA[ How to Optimize PySpark Jobs: Real-World Scenarios for Understanding Logical Plans ]]>
                </title>
                <description>
                    <![CDATA[ In the world of big data, performance isn't just about bigger clusters – it's about smarter code. Spark is deceptively simple to write but notoriously difficult to optimize, because what you write isn't what Spark executes. Between your transformatio... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-optimize-pyspark-jobs-handbook/</link>
                <guid isPermaLink="false">69851d7be613661950e00d8f</guid>
                
                    <category>
                        <![CDATA[ General Programming ]]>
                    </category>
                
                    <category>
                        <![CDATA[ data-engineering ]]>
                    </category>
                
                    <category>
                        <![CDATA[ data analysis ]]>
                    </category>
                
                    <category>
                        <![CDATA[ spark ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Python ]]>
                    </category>
                
                    <category>
                        <![CDATA[ PySpark ]]>
                    </category>
                
                    <category>
                        <![CDATA[ AWS Glue ]]>
                    </category>
                
                    <category>
                        <![CDATA[ handbook ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Sameer Shukla ]]>
                </dc:creator>
                <pubDate>Thu, 05 Feb 2026 22:45:15 +0000</pubDate>
                <media:content url="https://cdn.hashnode.com/res/hashnode/image/upload/v1770331493095/d569e168-d3ba-40e0-a500-7f682bbef693.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>In the world of big data, performance isn't just about bigger clusters – it's about smarter code. Spark is deceptively simple to write but notoriously difficult to optimize, because what you write isn't what Spark executes. Between your transformations and actual computation lies an invisible translation layer – the logical plan – that determines whether your job runs in minutes or hours.</p>
<p>Most engineers never look at this layer, which is why they spend days tuning configurations that don't address the real problem: inefficient transformations that generate bloated plans.</p>
<p>This handbook teaches you to read, interpret, and control those plans, transforming you from someone who writes PySpark code into someone who architects efficient data pipelines with precision and confidence.</p>
<h2 id="heading-table-of-contents">Table of Contents</h2>
<ol>
<li><p><a class="post-section-overview" href="#heading-background-information">Background Information</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-chapter-1-the-spark-mindset-why-plans-matter">Chapter 1: The Spark Mindset: Why Plans Matter</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-chapter-2-understanding-the-spark-execution-flow">Chapter 2: Understanding the Spark Execution Flow</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-chapter-3-reading-and-debugging-plans-like-a-pro">Chapter 3: Reading and Debugging Plans Like a Pro</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-chapter-4-writing-efficient-transformations">Chapter 4: Writing Efficient Transformations</a></p>
<ul>
<li><p><a class="post-section-overview" href="#heading-scenario-1-rename-in-one-pass-withcolumnrenamed-vs-todf">Scenario 1: Rename in One Pass: withColumnRenamed() vs toDF()</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-2-reusing-expressions">Scenario 2: Reusing expressions</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-3-batch-column-ops">Scenario 3: Batch column ops</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-4-early-filter-vs-late-filter">Scenario 4: Early Filter vs Late Filter</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-5-column-pruning">Scenario 5: Column Pruning</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-6-filter-pushdown-vs-full-scan">Scenario 6: Filter pushdown vs full scan</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-7-de-duplicate-right">Scenario 7: De-duplicate right</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-8-count-smarter">Scenario 8: Count Smarter</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-9-window-wisely">Scenario 9: Window wisely</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-10-incremental-aggregations-with-cache-and-persist">Scenario 10: Incremental Aggregations with Cache and Persist</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-11-reduce-shuffles">Scenario 11: Reduce shuffles</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-12-know-your-shuffle-triggers">Scenario 12: Know Your Shuffle Triggers</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-13-tune-parallelism-shuffle-partitions-amp-aqe">Scenario 13: Tune Parallelism: Shuffle Partitions &amp; AQE</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-14-handle-skew-smartly">Scenario 14: Handle Skew Smartly</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-15-sort-efficiently-orderby-vs-sortwithinpartitions">Scenario 15: Sort Efficiently (orderBy vs sortWithinPartitions)</a></p>
</li>
</ul>
</li>
<li><p><a class="post-section-overview" href="#heading-conclusion">Conclusion</a></p>
</li>
</ol>
<h2 id="heading-background-information">Background Information</h2>
<h3 id="heading-what-this-handbook-is-really-about">What This Handbook is Really About</h3>
<p>This is not a tutorial about Spark internals, cluster tuning, or PySpark syntax or APIs.</p>
<p>This is a handbook about writing PySpark code that generates efficient logical plans.</p>
<p>Because when your code produces clean, optimized plans, Spark pushes filters correctly, shuffles reduce instead of multiply, projections stay shallow, and the DAG (<a target="_blank" href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">Directed Acyclic Graph</a>) becomes predictable, lean, and fast.</p>
<p>When your code produces messy plans, Spark shuffles more than necessary, and projections pile up into deep, expensive stacks. Filters arrive late instead of early, joins explode into wide, slow operations, and the DAG becomes tangled and expensive.</p>
<p>The difference between a fast job and a slow job is not “faster hardware.” It’s the structure of the plan Spark generates from your code. This handbook teaches you to shape that plan deliberately through scenarios.</p>
<h3 id="heading-who-this-handbook-is-for">Who This Handbook Is For</h3>
<p>This handbook is written for:</p>
<ul>
<li><p><strong>Data engineers</strong> building production ETL pipelines who want to move beyond trial-and-error tuning and understand <em>why</em> jobs perform the way they do</p>
</li>
<li><p><strong>Analytics engineers</strong> working with large datasets in Databricks, EMR, or Glue who need to optimize Spark jobs but don't have time for thousand-page reference manuals</p>
</li>
<li><p><strong>Data scientists</strong> transitioning from pandas to PySpark who find themselves writing code that technically runs but takes forever</p>
</li>
<li><p><strong>Anyone</strong> who has stared at the Spark UI, seen mysterious "Exchange" nodes in the DAG, and wondered, <em>"Why is this shuffling so much data?"</em></p>
</li>
</ul>
<p>You should already be comfortable writing basic PySpark code: creating DataFrames, applying transformations, and running aggregations. This handbook won't teach you Spark syntax. Instead, it teaches you how to write transformations that work <em>with</em> the optimizer, not against it.</p>
<h3 id="heading-how-this-handbook-is-structured">How This Handbook Is Structured</h3>
<p>We’ll start with foundations, then move on to real-world scenarios.</p>
<p>Chapters 1-3 build your mental model. You'll learn what logical plans are, how they connect to physical execution, and how to read the plan output that Spark shows you. These chapters are short and focused – just enough theory to make the practical scenarios meaningful.</p>
<p>Chapter 4 is the heart of the handbook. It contains 15 real-world scenarios, organized by category. Each scenario shows you a common performance problem, explains what's happening in the logical plan, and demonstrates the better approach. You'll see before-and-after code, plan comparisons, and clear explanations of why one approach outperforms another.</p>
<h3 id="heading-what-youll-learn">What You'll Learn</h3>
<p>By the end of this handbook, you'll be able to:</p>
<ul>
<li><p>Read and interpret Spark's logical, optimized, and physical plans</p>
</li>
<li><p>Identify expensive operations before running your code</p>
</li>
<li><p>Restructure transformations to minimize shuffles</p>
</li>
<li><p>Choose the right join strategies for your data</p>
</li>
<li><p>Avoid common pitfalls that cause memory issues and slow performance</p>
</li>
<li><p>Debug production issues by examining execution plans</p>
</li>
</ul>
<p>More importantly, you'll develop a Spark mindset, an intuition for how your code translates to cluster operations. You'll stop writing code that "should work" and start writing code that you <em>know</em> will work efficiently.</p>
<h3 id="heading-technical-prerequisites">Technical Prerequisites</h3>
<p>I assume that you’re familiar with the following concepts before proceeding:</p>
<ol>
<li><p>Python fundamentals</p>
</li>
<li><p>PySpark basics</p>
<ul>
<li><p>Creating DataFrames and reading data from files</p>
</li>
<li><p>Basic DataFrame operations: select, filter, withColumn, groupBy, join</p>
</li>
<li><p>Writing DataFrames back to storage</p>
</li>
</ul>
</li>
<li><p>Basic Spark concepts</p>
<ul>
<li><p>Basic understanding of Spark applications, jobs, stages, and tasks</p>
</li>
<li><p>Basic understanding of the difference between transformations and actions</p>
</li>
<li><p>Understanding of partitions and shuffles</p>
</li>
</ul>
</li>
<li><p>AWS Glue (Good to have)</p>
</li>
</ol>
<h2 id="heading-chapter-1-the-spark-mindset-why-plans-matter">Chapter 1: The Spark Mindset: Why Plans Matter</h2>
<p>This chapter isn’t about Spark theory or internals. It’s about understanding Spark Plans, and seeing Spark the way the engine sees your code. Once you understand how Spark builds and optimizes a logical plan, optimization stops being trial and error and becomes intentional engineering.</p>
<p>Behind every simple transformation, Spark quietly redraws its internal blueprint. Every transformation you write from "<em>withColumn</em>" to join changes that plan. When the plan is efficient, Spark flies, but when it’s messy, Spark crawls.</p>
<h3 id="heading-the-invisible-layer-behind-every-transformation">The Invisible Layer Behind Every Transformation</h3>
<p>When you write PySpark code, it feels like you’re chaining operations step by step. In reality, Spark isn’t executing those lines. It’s quietly building a blueprint, a logical plan describing <em>what</em> to do, not <em>how</em>.</p>
<p>Once this plan is built, the Catalyst Optimizer analyzes it, rearranges operations, eliminates redundancies, and produces an optimized plan. Catalyst is Spark’s query optimization engine.</p>
<p>Every DataFrame or SQL operation we write, such as select, filter, join, or groupBy, is first converted into a logical plan. Catalyst then analyzes and transforms this plan using a set of optimization rules, such as predicate pushdown, column pruning, constant folding, and join reordering. The result is an optimized logical plan, which Spark then translates into a physical plan: the operations your cluster actually runs. This invisible planning layer often affects the job’s performance more than any configuration setting.</p>
<h3 id="heading-from-logical-to-optimized-to-physical-plans">From Logical to Optimized to Physical Plans</h3>
<p>When you run <code>df.explain(True)</code>, Spark actually shows you four stages of reasoning:</p>
<h4 id="heading-1-logical-plan">1. Logical Plan</h4>
<p>The parsed logical plan is the first stage. Spark translates your code into a tree that shows which operations need to happen, without worrying about how to execute them efficiently. It’s a blueprint of the query’s logic before any optimization or physical planning occurs.</p>
<p>This:</p>
<pre><code class="lang-python">df.filter(col(<span class="hljs-string">'age'</span>) &gt; <span class="hljs-number">25</span>) \
  .select(<span class="hljs-string">'firstname'</span>, <span class="hljs-string">'country'</span>) \
  .groupby(<span class="hljs-string">'country'</span>) \
  .count() \
  .explain(<span class="hljs-literal">True</span>)
</code></pre>
<p>results in the following logical plan:</p>
<pre><code class="lang-python">== Parsed Logical Plan ==
<span class="hljs-string">'Aggregate ['</span>country], [<span class="hljs-string">'country, '</span>count(<span class="hljs-number">1</span>) AS count<span class="hljs-comment">#108]</span>
+- Project [firstname<span class="hljs-comment">#95, country#97]</span>
   +- Filter (age<span class="hljs-comment">#96L &gt; cast(25 as bigint))</span>
      +- LogicalRDD [firstname<span class="hljs-comment">#95, age#96L, country#97], false</span>
</code></pre>
<h4 id="heading-2-analyzed-logical-plan">2. Analyzed Logical Plan</h4>
<p>The analyzed logical plan is the second stage. Here Spark validates the query by checking that the tables and columns actually exist in the Catalog and by resolving all references. It converts the unresolved logical plan into a resolved one, with correct data types and column bindings, before optimization.</p>
<h4 id="heading-3-optimized-logical-plan">3. Optimized Logical Plan</h4>
<p>The optimized logical plan is where Spark's Catalyst optimizer improves the logical plan by applying smart rules like filtering data early, removing unnecessary columns, and combining operations to reduce computation. It's the smarter, more efficient version of your original plan that will execute faster and use fewer resources.</p>
<p>Let’s understand using a simple code example:</p>
<pre><code class="lang-python">df.select(<span class="hljs-string">'firstname'</span>, <span class="hljs-string">'country'</span>) \
  .groupby(<span class="hljs-string">'country'</span>) \
  .count() \
  .filter(col(<span class="hljs-string">'country'</span>) == <span class="hljs-string">'USA'</span>) \
  .explain(<span class="hljs-literal">True</span>)
</code></pre>
<p>Here’s the parsed logical plan:</p>
<pre><code class="lang-python">== Parsed Logical Plan ==
<span class="hljs-string">'Filter '</span>`=`(<span class="hljs-string">'country, USA)
+- Aggregate [country#97], [country#97, count(1) AS count#122L]
   +- Project [firstname#95, country#97]
      +- LogicalRDD [firstname#95, age#96L, country#97], false</span>
</code></pre>
<p>What this means:</p>
<ul>
<li><p>Spark first projects firstname and country</p>
</li>
<li><p>Then aggregates by country</p>
</li>
<li><p>Then applies the filter country = 'USA' <strong>after</strong> aggregation</p>
</li>
</ul>
<p>(because that’s how you wrote it).</p>
<p>Here’s the optimized logical plan:</p>
<pre><code class="lang-python">== Optimized Logical Plan ==
Aggregate [country<span class="hljs-comment">#97], [country#97, count(1) AS count#122L]</span>
+- Project [country<span class="hljs-comment">#97]</span>
   +- Filter (isnotnull(country<span class="hljs-comment">#97) AND (country#97 = USA))</span>
      +- LogicalRDD [firstname<span class="hljs-comment">#95, age#96L, country#97], false</span>
</code></pre>
<p>Key improvements Catalyst applied:</p>
<ul>
<li><p>Filter pushdown: The filter country = 'USA' is pushed below the aggregation, so Spark only groups U.S. rows.</p>
</li>
<li><p>Column pruning: “firstname” is automatically removed because it’s never used in the final output.</p>
</li>
<li><p>Cleaner projection: Intermediate columns are dropped early, reducing I/O and in-memory footprint.</p>
</li>
</ul>
<h4 id="heading-4-physical-plan">4. Physical Plan</h4>
<p>The physical plan is Spark's final execution blueprint that shows exactly how the query will run: which specific algorithms to use, how to distribute work across machines, and the order of low-level operations. It's the concrete, executable version of the optimized logical plan, translated into actual Spark operations like “ShuffleExchange”, “HashAggregate”, and “FileScan” that will run on your cluster.</p>
<p>Catalyst may, for example:</p>
<ul>
<li><p>Fold constants (lit(2) * lit(3) → 6)</p>
</li>
<li><p>Push filters closer to the data source</p>
</li>
<li><p>Replace a regular join with a broadcast join when data fits in memory</p>
</li>
</ul>
<p>Once the physical plan is finalized, Spark’s scheduler converts it into a DAG of stages and tasks that run across the cluster. Understanding that lineage, from your code → plan → DAG, is what separates fast jobs from slow ones.</p>
<h3 id="heading-how-to-read-a-logical-plan">How to Read a Logical Plan</h3>
<p>A logical plan prints as a tree: the bottom is your data source, and each higher node represents a transformation.</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Node</strong></td><td><strong>Meaning</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Relation / LogicalRDD</td><td>Data source, the initial DataFrame</td></tr>
<tr>
<td>Project</td><td>Column selection and transformation (select, withColumn)</td></tr>
<tr>
<td>Filter</td><td>Row filtering based on conditions (where, filter)</td></tr>
<tr>
<td>Join</td><td>Combining two DataFrames on a condition (join); union shows up as a separate Union node</td></tr>
<tr>
<td>Aggregate</td><td>GroupBy and aggregation operations (groupBy, agg)</td></tr>
<tr>
<td>Exchange</td><td>Shuffle operation (data redistribution across partitions); appears in the physical plan</td></tr>
<tr>
<td>Sort</td><td>Ordering data (orderBy, sort)</td></tr>
</tbody>
</table>
</div><p>Each node represents a transformation. Execution flows from the bottom up. Let's understand with a basic example:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> *
<span class="hljs-keyword">from</span> pyspark.sql.types <span class="hljs-keyword">import</span> *

spark = SparkSession.builder.appName(<span class="hljs-string">"Practice"</span>).getOrCreate()

employees_data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Doe"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">80000</span>, <span class="hljs-number">28</span>, <span class="hljs-string">"2020-01-15"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Jane"</span>, <span class="hljs-string">"Smith"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>, <span class="hljs-number">32</span>, <span class="hljs-string">"2019-03-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Johnson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">60000</span>, <span class="hljs-number">25</span>, <span class="hljs-string">"2021-06-10"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Brown"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>, <span class="hljs-number">35</span>, <span class="hljs-string">"2018-07-01"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Charlie"</span>, <span class="hljs-string">"Wilson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">65000</span>, <span class="hljs-number">29</span>, <span class="hljs-string">"2020-11-05"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">6</span>, <span class="hljs-string">"David"</span>, <span class="hljs-string">"Lee"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">55000</span>, <span class="hljs-number">27</span>, <span class="hljs-string">"2021-01-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">7</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Davis"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">95000</span>, <span class="hljs-number">40</span>, <span class="hljs-string">"2017-04-12"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">8</span>, <span class="hljs-string">"Frank"</span>, <span class="hljs-string">"Miller"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">70000</span>, <span class="hljs-number">33</span>, <span class="hljs-string">"2019-09-25"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">9</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"Taylor"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">58000</span>, <span class="hljs-number">26</span>, <span class="hljs-string">"2021-08-15"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">10</span>, <span class="hljs-string">"Henry"</span>, <span class="hljs-string">"Anderson"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">88000</span>, <span class="hljs-number">31</span>, <span class="hljs-string">"2020-02-28"</span>, <span class="hljs-string">"USA"</span>)
]

df = spark.createDataFrame(employees_data,  
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])
</code></pre>
<h4 id="heading-version-a-withcolumn-filter">Version A: withColumn → filter</h4>
<p>In this version, we add a derived column with "withColumn" and then apply a filter. This ordering is logically correct and produces the expected result. The example shows what the logical plan looks like when Spark is asked to compute a new column before any rows are eliminated.</p>
<pre><code class="lang-python"><span class="hljs-comment"># explain() prints the plans and returns None, so keep the DataFrame in its own variable</span>
df_filtered = df \
    .withColumn(<span class="hljs-string">'bonus'</span>, col(<span class="hljs-string">'salary'</span>) * <span class="hljs-number">82</span>) \
    .filter(col(<span class="hljs-string">'age'</span>) &gt; <span class="hljs-number">35</span>)

df_filtered.explain(<span class="hljs-literal">True</span>)
</code></pre>
<h4 id="heading-parsed-logical-plan-simplified">Parsed Logical Plan (Simplified)</h4>
<pre><code class="lang-python">Filter (age &gt; <span class="hljs-number">35</span>)
└─ Project [*, (salary * <span class="hljs-number">82</span>) AS bonus]
   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>So what’s going on here? Execution flows from the bottom up.</p>
<ul>
<li><p>Spark first reads the LogicalRDD.</p>
</li>
<li><p>Then applies the Project node, keeping all columns and adding bonus.</p>
</li>
<li><p>Finally, the Filter removes rows where age ≤ 35.</p>
</li>
</ul>
<p>Read literally, this plan computes the bonus for every employee, even those who are later filtered out. It's harmless here, but on millions of rows it means wasted computation on rows that never reach the output, unless the optimizer manages to move the filter down for you.</p>
<h4 id="heading-version-b-filter-project">Version B: Filter → Project</h4>
<p>In this version, we apply the filter before introducing the derived column. The idea is to show how pushing row-reducing operations earlier allows Catalyst to produce a leaner logical plan. Compared to Version A, this example demonstrates that the same logic, written in a different order, can significantly reduce the amount of work Spark needs to perform.</p>
<pre><code class="lang-python"><span class="hljs-comment"># explain() prints the plans and returns None, so keep the DataFrame in its own variable</span>
df_filtered = df \
    .filter(col(<span class="hljs-string">'age'</span>) &gt; <span class="hljs-number">35</span>) \
    .withColumn(<span class="hljs-string">'bonus'</span>, col(<span class="hljs-string">'salary'</span>) * <span class="hljs-number">82</span>)

df_filtered.explain(<span class="hljs-literal">True</span>)
</code></pre>
<h4 id="heading-parsed-logical-plan-simplified-1">Parsed Logical Plan (Simplified)</h4>
<pre><code class="lang-python">Project [*, (salary * <span class="hljs-number">82</span>) AS bonus]
└─ Filter (age &gt; <span class="hljs-number">35</span>)
   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>So what’s going on here?</p>
<ul>
<li><p>Spark starts from the LogicalRDD.</p>
</li>
<li><p>It immediately applies the Filter, reducing the dataset to only employees with age &gt; 35.</p>
</li>
<li><p>Then the Project node adds the derived column bonus for this smaller subset.</p>
</li>
</ul>
<p>Now the Filter sits below the Project in the plan, cutting data movement and minimizing computation. Spark prunes data first, then derives new columns. This order reduces both the volume of data processed and the amount transferred, leading to a lighter and faster plan.</p>
<h3 id="heading-why-you-should-look-at-the-plan-every-time-by-running-dfexplaintrue">Why You Should Look at the Plan Every Time by running <code>df.explain(True)</code></h3>
<p>This is the quickest way to spot performance issues <em>before</em> they hit production. It shows:</p>
<ul>
<li><p>Whether filters sit in the right place.</p>
</li>
<li><p>How many Project nodes exist (each adds overhead).</p>
</li>
<li><p>Where Exchange nodes appear (these are shuffle boundaries).</p>
</li>
<li><p>If Catalyst pushed filters or rewrote joins as expected.</p>
</li>
</ul>
<p>A quick <code>explain()</code> takes seconds, while debugging a bad shuffle in production takes hours. Run <code>explain()</code> whenever you add or reorder transformations. The plan never lies.</p>
<h4 id="heading-what-spark-does-under-the-hood">What Spark Does Under the Hood</h4>
<p>Catalyst can sometimes reorder simple filters automatically, but once you use UDFs, nested logic, or joins, it often can’t. That’s why the best habit is to write transformations in a way that already makes sense to the optimizer. Filter early, avoid redundant projections, and keep plans as shallow as possible.</p>
<p>Optimizing Spark isn’t about tuning cluster configs – it’s about writing code that yields efficient plans. If your plan shows late filters, too many projections, or multiple Exchange nodes, it’s already explaining why your job will run slowly.</p>
<h2 id="heading-chapter-2-understanding-the-spark-execution-flow">Chapter 2: Understanding the Spark Execution Flow</h2>
<p>In Chapter 1, you learned how Spark interprets your transformations into logical plans – blueprints of what the job intends to do.</p>
<p>But Spark doesn't stop there. It must translate those plans into distributed actions across a cluster of executors, coordinate data movement, and handle any failures that may occur.</p>
<p>This chapter reveals what happens when that plan leaves the driver: how Spark breaks your job into stages, tasks, and a directed acyclic graph (DAG) that actually runs.</p>
<p>By the end, you’ll understand why some operations shuffle terabytes while others fly, and how to predict it before execution begins.</p>
<h3 id="heading-from-plans-to-stages-to-tasks">From Plans to Stages to Tasks</h3>
<p>A Spark job evolves through three conceptual layers:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Layer</strong></td><td><strong>What It Represents</strong></td><td><strong>Example View</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Plan</td><td>The optimized logical + physical representation of your query</td><td>Read → Filter → Join → Aggregate</td></tr>
<tr>
<td>Stage</td><td>A contiguous set of operations that can run without shuffling data</td><td>“Map Stage” or “Reduce Stage”</td></tr>
<tr>
<td>Task</td><td>The smallest unit of work, one per partition per stage</td><td>“Process Partition 7 of Stage 3”</td></tr>
</tbody>
</table>
</div><h4 id="heading-the-execution-trigger-actions-vs-transformations">The Execution Trigger: Actions vs Transformations</h4>
<p>Here's the critical distinction that determines when execution actually begins:</p>
<pre><code class="lang-python"><span class="hljs-comment"># Transformations only: each line adds a node to the logical plan</span>
df1 = spark.read.parquet(<span class="hljs-string">"data.parquet"</span>)
df2 = df1.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">25</span>)
df3 = df2.groupBy(<span class="hljs-string">"city"</span>).count()
</code></pre>
<p>Nothing executes yet! Spark just builds up the logical plan, adding each transformation as a node in the plan tree. No data is read, no filters run, no shuffles happen.</p>
<h4 id="heading-actions-trigger-execution">Actions Trigger Execution</h4>
<p>Spark transformations are lazy. When a sequence of DataFrame operations is defined, a logical plan is created, but no computation takes place. It’s only when Spark encounters an action, an operation that needs a result to be returned to the driver or written out, that execution takes place.</p>
<p>For example:</p>
<pre><code class="lang-python">result = df3.collect()
</code></pre>
<p>At this stage, Spark takes the logical plan, applies optimizations, creates a physical plan, and executes the job. Until Spark is asked to <strong>act</strong>, for example through collect(), count(), or a write, it’s just describing what it needs to do – but it’s not actually doing it.</p>
<h4 id="heading-the-complete-execution-flow">The Complete Execution Flow</h4>
<p>Execution starts when an action such as collect() is called. The driver submits the optimized physical plan through the SparkContext to the DAG Scheduler, which analyzes it to find the shuffle boundaries created by wide operations such as <em>groupBy</em> or <em>orderBy</em>.</p>
<p>The plan is then divided at those boundaries into stages, each containing only narrow operations. Each stage is sent to the Task Scheduler as a TaskSet with one task per partition.</p>
<p>The tasks are then assigned to executor cores based on data locality and run. A stage starts only after the stages it depends on have completed, and the results of the final stage are returned to the driver or written to storage.</p>
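<p>The flow above can be sketched in a few lines of plain Python. This is a simplified model under stated assumptions, not the DAG Scheduler's real logic: the plan is treated as a linear list, the <code>WIDE_OPS</code> set and both function names are made up for the example, and the task counts assume 8 input partitions and 200 shuffle partitions.</p>

```python
# Operations assumed to need a shuffle in this toy model.
WIDE_OPS = {"groupBy", "orderBy", "join", "distinct", "repartition"}

def split_into_stages(operations):
    """Cut a linear list of operations into stages at each wide operation."""
    stages, current = [], []
    for op in operations:
        if op in WIDE_OPS:
            # The shuffle ends the current stage...
            stages.append(current)
            # ...and the wide operation's reduce side opens the next one.
            current = [op]
        else:
            current.append(op)
    stages.append(current)
    return stages

def tasks_per_stage(stages, input_partitions, shuffle_partitions):
    """One task per partition per stage: the first stage follows the input
    partitioning, later stages follow the shuffle partition count."""
    return [input_partitions if i == 0 else shuffle_partitions
            for i in range(len(stages))]

ops = ["read", "filter", "groupBy", "select", "orderBy"]
stages = split_into_stages(ops)
task_counts = tasks_per_stage(stages, input_partitions=8, shuffle_partitions=200)
```

<p>Two wide operations give three stages here, and every stage after the first runs one task per shuffle partition.</p>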
<h4 id="heading-what-triggers-a-shuffle">What Triggers a Shuffle</h4>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769457412199/308bc894-66a9-4c01-aae1-9ae42e64d32c.png" alt="Comparison of Spark shuffle behavior before and after groupBy" class="image--center mx-auto" width="1920" height="992" loading="lazy"></p>
<p>A shuffle occurs when Spark needs to redistribute data across partitions, typically because the operation requires grouping, joining, or repartitioning data in a way that can’t be done locally within existing partitions.</p>
<p>Common shuffle triggers:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Operation</strong></td><td><strong>Why it Shuffles</strong></td></tr>
</thead>
<tbody>
<tr>
<td>groupBy(), reduceByKey()</td><td>Data with the same key must co-locate for aggregation</td></tr>
<tr>
<td>join()</td><td>Matching keys may reside in different partitions</td></tr>
<tr>
<td>orderBy() / sort()</td><td>Requires global ordering across all partitions</td></tr>
<tr>
<td>distinct()</td><td>Needs comparison of all values across partitions</td></tr>
<tr>
<td>repartition(n)</td><td>Explicit redistribution to a new number of partitions</td></tr>
</tbody>
</table>
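</div><pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> functions <span class="hljs-keyword">as</span> F

df.groupBy(<span class="hljs-string">"user_id"</span>) \
  .agg(F.sum(<span class="hljs-string">"amount"</span>))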
</code></pre>
<p>In Stage 1 (Map), each task performs a partial aggregation on its partition and writes a shuffle file to disk. During the shuffle, each executor retrieves these files across the network such that all records with the same hash(user_id) % numPartitions are colocated.</p>
<p>In Stage 2 (Reduce), each task performs a final aggregation on its partitioned data and writes back to disk. Because Spark has tracked this process as a DAG, a failed task can re-read only the affected shuffle files instead of re-computing the entire DAG.</p>
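<p>The map, shuffle, and reduce steps described above can be imitated in plain Python. Treat this as a hedged sketch: <code>zlib.crc32</code> is only a deterministic stand-in for Spark's own hash function, the partition count of 4 is arbitrary, and all function names are invented for the example.</p>

```python
import zlib
from collections import defaultdict

NUM_SHUFFLE_PARTITIONS = 4  # illustrative value only

def target_partition(key):
    # Deterministic stand-in for hash(user_id) % numPartitions.
    return zlib.crc32(key.encode("utf-8")) % NUM_SHUFFLE_PARTITIONS

def map_side(partition):
    """Stage 1: partial sum per key inside one input partition."""
    partial = defaultdict(int)
    for user_id, amount in partition:
        partial[user_id] += amount
    return dict(partial)

def shuffle(partials):
    """Route every partial result to the partition that owns its key."""
    buckets = defaultdict(list)
    for partial in partials:
        for user_id, subtotal in partial.items():
            buckets[target_partition(user_id)].append((user_id, subtotal))
    return buckets

def reduce_side(bucket):
    """Stage 2: final sum over the partial sums that arrived for each key."""
    totals = defaultdict(int)
    for user_id, subtotal in bucket:
        totals[user_id] += subtotal
    return dict(totals)

input_partitions = [
    [("u1", 10), ("u2", 5), ("u1", 7)],
    [("u2", 1), ("u3", 4)],
    [("u1", 3), ("u3", 6)],
]

partials = [map_side(p) for p in input_partitions]
buckets = shuffle(partials)
totals = {}
for bucket in buckets.values():
    totals.update(reduce_side(bucket))
```

<p>Because the map side aggregates first, only one record per key per input partition crosses the "network", rather than every raw row.</p>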
<p>In practice, a healthy job has 2-6 stages. Seeing 20+ stages for such simple logic usually means unnecessary shuffles or bad partitioning.</p>
<h4 id="heading-why-shuffles-create-stage-boundaries">Why Shuffles Create Stage Boundaries</h4>
<p>Shuffles force data to move across the network between executors. Spark cannot continue processing until:</p>
<ul>
<li><p>All tasks in the current stage write their shuffle output to disk</p>
</li>
<li><p>The shuffle data is available for the next stage to read over the network</p>
</li>
</ul>
<p>This dependency creates a natural boundary – so a new stage begins after every shuffle. The DAG Scheduler uses these boundaries to determine where stages must wait for previous stages to complete.</p>
<h4 id="heading-common-performance-bottlenecks">Common Performance Bottlenecks</h4>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Bottleneck Type</strong></td><td><strong>Symptom</strong></td><td><strong>Solution</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Data skew</td><td>Few tasks run much longer</td><td>Use salting, split hot keys, or AQE skew join</td></tr>
<tr>
<td>Small files</td><td>Too many tasks, high overhead</td><td>Coalesce or repartition after read</td></tr>
<tr>
<td>Large shuffle</td><td>High network I/O, spill to disk</td><td>Filter early, broadcast small tables, reduce cardinality</td></tr>
<tr>
<td>Unnecessary stages</td><td>Extra Exchange nodes in plan</td><td>Combine operations, remove redundant repartitions</td></tr>
<tr>
<td>Inefficient file formats</td><td>Slow reads, no predicate pushdown</td><td>Use Parquet or ORC with partitioning</td></tr>
<tr>
<td>Complex data types</td><td>Serialization overhead, large objects</td><td>Use simple types, cache in serialized form</td></tr>
</tbody>
</table>
</div><p>Let’s ground this with a small but realistic pattern using the same employees DataFrame. <strong>Goal:</strong> average salary per department and country, only for employees older than 30.</p>
<p>Naïve approach:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, when, avg

df_dept_country = df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>).distinct()

df_result = (
    df.withColumn(
        <span class="hljs-string">"age_group"</span>,
        when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
        .when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>, <span class="hljs-string">"mid"</span>)
        .otherwise(<span class="hljs-string">"senior"</span>)
    )
    .join(df_dept_country, [<span class="hljs-string">"department"</span>], <span class="hljs-string">"inner"</span>)
    .groupBy(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
    .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p>This looks harmless, but:</p>
<ul>
<li><p>The join on "department" introduces a wide dependency → shuffle #1.</p>
</li>
<li><p>The groupBy("department", "country") introduces another wide dependency → shuffle #2.</p>
</li>
</ul>
<p>So we have two shuffles for what should be a simple aggregation. If you run df_result.explain(), you’ll see an Exchange node for each of them, marking a shuffle and a stage boundary.</p>
<h4 id="heading-optimized-approach">Optimized Approach</h4>
<p>We can do better by filtering early, broadcasting the small dimension (df_dept_country), and keeping only one global shuffle for aggregation.</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> broadcast

df_dept_country = df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>).distinct()

df_result_optimized = (
    df.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">30</span>)
        .join(broadcast(df_dept_country), [<span class="hljs-string">"department"</span>], <span class="hljs-string">"inner"</span>)
        .groupBy(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
        .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p>What changed:</p>
<ul>
<li><p>filter(col("age") &gt; 30) is narrow and runs before any shuffle.</p>
</li>
<li><p>broadcast(df_dept_country) avoids a shuffle for the join.</p>
</li>
<li><p>Only the groupBy("department", "country") causes a single shuffle.</p>
</li>
</ul>
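<p>Now explain() shows a BroadcastHashJoin in place of the shuffle join, and the groupBy is the only shuffle left on the main DataFrame’s path.</p>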
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Version</strong></td><td><strong>Shuffles</strong></td><td><strong>Stages</strong></td><td><strong>Notes</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Naïve</td><td>2</td><td>~4 (2 map + 2 reduce)</td><td>Join shuffle + groupBy shuffle = double overhead</td></tr>
<tr>
<td>Optimized</td><td>1</td><td>~2 (1 map + 1 reduce)</td><td>Broadcast join avoids shuffle. Only groupBy shuffles</td></tr>
</tbody>
</table>
</div><h2 id="heading-chapter-3-reading-and-debugging-plans-like-a-pro">Chapter 3: Reading and Debugging Plans Like a Pro</h2>
<p>As explained in Chapter 1, Spark executes transformations based on three levels: the logical plan, the optimized logical plan (Catalyst), and the physical plan. This chapter will expand on this explanation and concentrate on the impact of the logical plan on shuffle and execution performance.</p>
<p>By now, you understand how Spark builds and <em>executes</em> plans. But reading those plans and instantly spotting inefficiencies is the real superpower of a performance-focused data engineer.</p>
<p>Spark’s explain() output isn’t random jargon. It’s a precise log of Spark’s thought process. Once you learn to read it, every optimization becomes obvious.</p>
<h3 id="heading-three-layers-in-spark"><strong>Three Layers in Spark</strong></h3>
<p>As we talked about above, every Spark plan has three key views, printed when you call df.explain(True). Let’s review them now:</p>
<ol>
<li><p>Parsed Logical Plan: The raw intent Spark inferred from your code. It may include unresolved column names or expressions.</p>
</li>
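<li><p>Analyzed / Optimized Logical Plan: The analyzed plan resolves column names and types against the schema. The optimized plan is what remains after Spark applies Catalyst optimizations: constant folding, predicate pushdown, column pruning, and plan rearrangements.</p>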
</li>
<li><p>Physical Plan: What your executors actually run: joins, shuffles, exchanges, scans, and code-generated operators.</p>
</li>
</ol>
<p>Each layer narrows the gap between what you <em>asked</em> Spark to do and what Spark decides to do.</p>
<pre><code class="lang-python">df_avg = (
    df.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">30</span>)
      .groupBy(<span class="hljs-string">"department"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)

df_avg.explain(<span class="hljs-literal">True</span>)
</code></pre>
<p><strong>1. Parsed Logical Plan</strong></p>
<pre><code class="lang-python">== Parsed Logical Plan ==
'Aggregate ['department], ['department, 'avg('salary) AS avg_salary#8]
+- Filter (age#5L &gt; cast(30 as bigint))
   +- LogicalRDD [id#0L, firstname#1, lastname#2, department#3, salary#4L, age#5L, hire_date#6, country#7], false
</code></pre>
<p>How to read this</p>
<ul>
<li><p>Bottom → data source (LogicalRDD).</p>
</li>
<li><p>Middle → Filter: Spark hasn’t yet optimized column references.</p>
</li>
<li><p>Top → Aggregate: high-level grouping intent.</p>
</li>
</ul>
<p>At this stage, the plan may include unresolved symbols (like 'department or 'avg('salary)), meaning Spark hasn’t yet validated column existence or data types.</p>
<p><strong>2. Optimized Logical Plan</strong></p>
<pre><code class="lang-python">
== Optimized Logical Plan ==
Aggregate [department<span class="hljs-comment">#3], [department#3, avg(salary#4L) AS avg_salary#8]</span>
+- Project [department<span class="hljs-comment">#3, salary#4L]</span>
   +- Filter (isnotnull(age<span class="hljs-comment">#5L) AND (age#5L &gt; 30))</span>
      +- LogicalRDD [id<span class="hljs-comment">#0L, firstname#1, lastname#2, department#3, salary#4L, age#5L, hire_date#6, country#7], false</span>
</code></pre>
<p>Here, Catalyst has done its job:</p>
<ul>
<li><p>Column IDs (#3, #4L) are resolved.</p>
</li>
<li><p>Unused columns are pruned – no need to carry them forward.</p>
</li>
<li><p>The plan now accurately reflects Spark’s optimized logical intent.</p>
</li>
</ul>
<p>If you ever wonder whether Spark pruned columns or pushed filters, this is the section to check.</p>
<p><strong>3. Physical Plan</strong></p>
<pre><code class="lang-python">== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department<span class="hljs-comment">#3], functions=[avg(salary#4L)], output=[department#3, avg_salary#8])</span>
   +- Exchange hashpartitioning(department<span class="hljs-comment">#3, 200), ENSURE_REQUIREMENTS, [plan_id=19]</span>
      +- HashAggregate(keys=[department<span class="hljs-comment">#3], functions=[partial_avg(salary#4L)], output=[department#3, sum#20, count#21L])</span>
         +- Project [department<span class="hljs-comment">#3, salary#4L]</span>
            +- Filter (isnotnull(age<span class="hljs-comment">#5L) AND (age#5L &gt; 30))</span>
               +- Scan ExistingRDD[id<span class="hljs-comment">#0L,firstname#1,lastname#2,department#3,salary#4L,age#5L,hire_date#6,country#7]</span>
</code></pre>
<p><strong>Breakdown</strong></p>
<ul>
<li><p>Scan ExistingRDD → Spark reading from the in-memory DataFrame.</p>
</li>
<li><p>Filter → narrow transformation, no shuffle.</p>
</li>
<li><p>HashAggregate → partial aggregation per partition.</p>
</li>
<li><p>Exchange → wide dependency: data is shuffled by department.</p>
</li>
<li><p>Top HashAggregate → final aggregation after shuffle.</p>
</li>
</ul>
<p>This structure – partial agg → shuffle → final agg – is Spark’s default two-phase aggregation pattern.</p>
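<p>The <code>sum#20, count#21L</code> columns in the partial HashAggregate output hint at why avg needs two phases: an average can't be merged directly, but a (sum, count) pair can. A small pure-Python sketch of that idea follows. The function names are illustrative and the salaries are sample values, not Spark internals.</p>

```python
def partial_avg(values):
    """Per-partition state for avg: a (sum, count) pair, not an average."""
    return (sum(values), len(values))

def merge(state_a, state_b):
    # (sum, count) pairs can be combined in any order after the shuffle.
    return (state_a[0] + state_b[0], state_a[1] + state_b[1])

def final_avg(state):
    total, count = state
    return total / count

# Salaries for one department spread over three partitions.
partitions = [[80000, 85000], [90000], [95000, 88000]]

states = [partial_avg(p) for p in partitions]
combined = states[0]
for state in states[1:]:
    combined = merge(combined, state)

avg_salary = final_avg(combined)

# Averaging the per-partition averages weights every partition equally,
# which gives a different (wrong) answer when partitions differ in size.
naive = sum(sum(p) / len(p) for p in partitions) / len(partitions)
```

<p>The merged state yields the true average, while the average of averages drifts because the partitions hold different numbers of rows.</p>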
<h4 id="heading-recognizing-common-nodes">Recognizing Common Nodes</h4>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Node / Operator</strong></td><td><strong>Meaning</strong></td><td><strong>Optimization Hint</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Project</td><td>Column selection or computed fields</td><td>Combine multiple withColumn() into one select()</td></tr>
<tr>
<td>Filter</td><td>Predicate on rows</td><td>Push filters as low as possible in the plan</td></tr>
<tr>
<td>Join</td><td>Combine two DataFrames</td><td>Broadcast smaller side if &lt; 10 MB</td></tr>
<tr>
<td>Aggregate</td><td>GroupBy, sum, avg, count</td><td>Filter before aggregating to reduce cardinality</td></tr>
<tr>
<td>Exchange</td><td>Shuffle / data redistribution</td><td>Minimize by filtering early, using broadcast join</td></tr>
<tr>
<td>Sort</td><td>OrderBy, sort</td><td>Avoid global sorts; use within partitions if possible</td></tr>
<tr>
<td>Window</td><td>Windowed analytics (row_number, rank)</td><td>Partition on selective keys to reduce shuffle</td></tr>
</tbody>
</table>
</div><p>Repeated invocations of withColumn stack multiple Project nodes, which increases the plan depth. Instead, combine these invocations using select.</p>
<p>Multiple Exchange nodes imply repeated data shuffles. You can eliminate these by broadcasting the data or filtering.</p>
<p>Multiple scans of the same table within a single job suggest that an intermediate result worth caching is being recomputed.</p>
<p>And frequent SortMergeJoin operations can mean that Spark is sorting and shuffling data unnecessarily. You can often eliminate these by broadcasting the smaller DataFrame or by bucketing.</p>
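<p>Why broadcasting removes the shuffle is easier to see in a toy model. The sketch below is plain Python with hypothetical names and sample rows. It assumes an inner join on a single key and only illustrates the idea of probing a full copy of the small side from each partition of the large side.</p>

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Join each partition of the large side against a full copy of the
    small side, so no row of the large side has to change partition."""
    # Build the lookup once; in Spark this copy is shipped to every executor.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined_partitions = []
    for partition in large_partitions:
        out = []
        for row in partition:
            for match in lookup.get(row[key], []):  # inner join semantics
                out.append({**match, **row})
        joined_partitions.append(out)
    return joined_partitions

employees = [
    [{"name": "John", "department": "Engineering"},
     {"name": "Alice", "department": "Sales"}],
    [{"name": "David", "department": "HR"},
     {"name": "Zed", "department": "Legal"}],
]
departments = [
    {"department": "Engineering", "floor": 3},
    {"department": "Sales", "floor": 1},
    {"department": "HR", "floor": 2},
]

joined = broadcast_hash_join(employees, departments, "department")
```

<p>The large side keeps its original two partitions, and the row with no matching department is simply dropped. A sort-merge join would instead have to repartition both sides by the join key first.</p>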
<h4 id="heading-debugging-strategy-read-plans-from-top-to-bottom">Debugging Strategy: Read Plans from Top to Bottom</h4>
<p>Remember: Spark <em>executes</em> plans from bottom up (from data source to final result). But when you're debugging, you read from the top down (from the output schema back to the root cause). This reversal is intentional: you start with what's wrong at the output level, then trace backward through the plan to find where the inefficiency was introduced.</p>
<p>When debugging a slow job:</p>
<ul>
<li><p>Start at the top: Identify output schema and major operators (HashAggregate, Join, and so on).</p>
</li>
<li><p>Scroll for Exchanges: Count them. Each = stage boundary. Ask “Why do I need this shuffle?”</p>
</li>
<li><p>Trace backward: See if filters or projections appear below or above joins.</p>
</li>
<li><p>Look for duplication: Same scan twice? Missing cache? Re-derived columns?</p>
</li>
<li><p>Check join strategy: If it’s SortMergeJoin but one table is small, force a broadcast().</p>
</li>
<li><p>Re-run explain after optimization: You should literally see the extra nodes disappear.</p>
</li>
</ul>
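<p>Counting operators by eye gets tedious on long plans, so part of the checklist above can be scripted. The helper below is a rough sketch that works on plan text you have already captured as a string (for example by redirecting standard output around <code>explain(True)</code>). The function names are hypothetical, and the sample is abridged from the plan shown earlier.</p>

```python
import re

def split_plan_sections(explain_text):
    """Split explain(True)-style text into {section title: [lines]}."""
    sections, current = {}, None
    for line in explain_text.splitlines():
        header = re.fullmatch(r"== (.+) ==", line.strip())
        if header:
            current = header.group(1)
            sections[current] = []
        elif current is not None and line.strip():
            sections[current].append(line)
    return sections

def count_operator(plan_lines, operator):
    """Count plan nodes whose operator name is exactly `operator`."""
    # Skip the tree-drawing prefix ("+- ", ":  ", "|"), then match the name.
    pattern = re.compile(r"^[\s:+\-|]*" + re.escape(operator) + r"\b")
    return sum(1 for line in plan_lines if pattern.match(line))

sample = """\
== Optimized Logical Plan ==
Aggregate [department#3], [department#3, avg(salary#4L) AS avg_salary#8]
+- Project [department#3, salary#4L]
   +- Filter (isnotnull(age#5L) AND (age#5L > 30))
      +- LogicalRDD [id#0L, department#3, salary#4L, age#5L], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department#3], functions=[avg(salary#4L)])
   +- Exchange hashpartitioning(department#3, 200), ENSURE_REQUIREMENTS, [plan_id=19]
      +- HashAggregate(keys=[department#3], functions=[partial_avg(salary#4L)])
         +- Project [department#3, salary#4L]
            +- Filter (isnotnull(age#5L) AND (age#5L > 30))
               +- Scan ExistingRDD[id#0L,department#3,salary#4L,age#5L]
"""

sections = split_plan_sections(sample)
physical = sections["Physical Plan"]
exchanges = count_operator(physical, "Exchange")
aggregates = count_operator(physical, "HashAggregate")
```

<p>Running the same count before and after a change gives a quick check that an Exchange really disappeared.</p>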
<h4 id="heading-catalyst-optimizer-in-action">Catalyst Optimizer in Action</h4>
<p>Catalyst applies dozens of rules automatically. Knowing a few helps you interpret what changed:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Optimization Rule</strong></td><td><strong>Example Transformation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Predicate Pushdown</td><td>Moves filters below joins and into scans</td></tr>
<tr>
<td>Constant Folding</td><td>Replaces a constant expression such as 60 * 60 with 3600</td></tr>
<tr>
<td>Column Pruning</td><td>Drops unused columns early</td></tr>
<tr>
<td>Combine Filters</td><td>Merges consecutive filters into one</td></tr>
<tr>
<td>Simplify Casts</td><td>Removes redundant type casts</td></tr>
<tr>
<td>Reorder Joins / Join Reordering</td><td>Changes join order for cheaper plan</td></tr>
</tbody>
</table>
</div><p>Putting it all together, every plan tells a story:</p>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769458525411/64fa30a4-b16e-4aed-8c04-d12b476d9ae6.png" alt="Spark Plans and Stages" class="image--center mx-auto" width="1920" height="404" loading="lazy"></p>
<p>As you progress through the practical scenarios in Chapter 4, read every plan before and after. Your goal isn't memorization – it's intuition.</p>
<h2 id="heading-chapter-4-writing-efficient-transformations">Chapter 4: Writing Efficient Transformations</h2>
<p>Every Spark job tells a story, not in code, but in plans. By now, you've seen how Spark interprets transformations (Chapter 1), how it executes them through stages and tasks (Chapter 2), and how to read plans like a detective (Chapter 3). Now comes the part where you apply that knowledge: writing transformations that yield efficient logical plans.</p>
<p>This chapter is the heart of the handbook. It's where we move from understanding Spark's mind to writing code that speaks its language fluently.</p>
<h3 id="heading-why-transformations-matter">Why Transformations Matter</h3>
<p>In PySpark, most performance issues don’t start in clusters or configurations. They start in transformations: the way we chain, filter, rename, or join data. Every transformation reshapes the logical plan, influencing how Spark optimizes, when it shuffles, and whether the final DAG is streamlined or tangled.</p>
<p>A good transformation sequence:</p>
<ul>
<li><p>Keeps plans shallow, not nested.</p>
</li>
<li><p>Applies filters early, not after computation.</p>
</li>
<li><p>Reduces data movement, not just data size.</p>
</li>
<li><p>Lets Catalyst and AQE optimize freely, without user-induced constraints.</p>
</li>
</ul>
<p>A bad one can double runtime, and you won't see it in your code, only in your plan.</p>
<h3 id="heading-the-goal-of-this-chapter">The Goal of this Chapter</h3>
<p>We’ll explore a series of real-world optimization scenarios, drawn from production ETL and analytical pipelines, each showing how a small change in code can completely reshape the logical plan and execution behavior.</p>
<p>Each scenario is practical and short, following a consistent structure. By the end of this chapter, you’ll be able to <em>see</em> optimization opportunities the moment you write code, because you’ll know exactly how they alter the logical plan beneath.</p>
<h3 id="heading-before-you-dive-in">Before You Dive In:</h3>
<p>Open a Spark shell or notebook. Load your familiar employees DataFrame. Run every example, and compare the explain("formatted") output before and after the fix. Because in this chapter, performance isn’t about more theory, it’s about seeing the difference in the plan and feeling the difference in execution time.</p>
<h3 id="heading-scenario-1-rename-in-one-pass-withcolumnrenamed-vs-todf">Scenario 1: Rename in One Pass: withColumnRenamed() vs toDF()</h3>
<p>If you’ve worked with PySpark DataFrames, you’ve probably had to rename columns, either by calling withColumnRenamed() repeatedly or by using toDF() in one shot.</p>
<p>At first glance, both approaches produce identical results: the columns have the new names you wanted. But beneath the surface, Spark treats them very differently – and that difference shows up directly in your logical plan.</p>
<pre><code class="lang-python">df_renamed = (df.withColumnRenamed(<span class="hljs-string">"id"</span>, <span class="hljs-string">"emp_id"</span>)
    .withColumnRenamed(<span class="hljs-string">"firstname"</span>, <span class="hljs-string">"first_name"</span>)
    .withColumnRenamed(<span class="hljs-string">"lastname"</span>, <span class="hljs-string">"last_name"</span>)
    .withColumnRenamed(<span class="hljs-string">"department"</span>, <span class="hljs-string">"dept"</span>)
    .withColumnRenamed(<span class="hljs-string">"salary"</span>, <span class="hljs-string">"base_salary"</span>)
    .withColumnRenamed(<span class="hljs-string">"age"</span>, <span class="hljs-string">"age_years"</span>)
    .withColumnRenamed(<span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"hired_on"</span>)
    .withColumnRenamed(<span class="hljs-string">"country"</span>, <span class="hljs-string">"country_code"</span>)
)
</code></pre>
<p>This is simple and readable. But Spark builds the plan step by step, adding one Project node for every rename. Each Project node copies all existing columns, plus the newly renamed one. In large schemas (hundreds of columns), this silently bloats the plan.</p>
<h4 id="heading-logical-plan-impact">Logical Plan Impact:</h4>
<pre><code class="lang-python">Project [emp_id, first_name, last_name, dept, base_salary, age_years, hired_on, country_code]

└─ Project [emp_id, first_name, last_name, dept, base_salary, age_years, hired_on, country]

└─ Project [emp_id, first_name, last_name, dept, base_salary, age_years, hire_date, country]

└─ Project [emp_id, first_name, last_name, dept, base_salary, age, hire_date, country]

└─ Project [emp_id, first_name, last_name, dept, salary, age, hire_date, country]

└─ Project [emp_id, first_name, last_name, department, salary, age, hire_date, country]

└─ Project [emp_id, first_name, lastname, department, salary, age, hire_date, country]

└─ Project [emp_id, firstname, lastname, department, salary, age, hire_date, country]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Each rename adds a new Project layer, deepening the logical plan, and Spark has to resolve each intermediate projection before applying the next one. You can see this by running: <em>df_renamed.explain(True).</em></p>
<h4 id="heading-the-better-approach-rename-once-with-todf">The Better Approach: Rename Once with toDF()</h4>
<p>Instead of chaining multiple renames, rename all columns in a single pass:</p>
<pre><code class="lang-python"><span class="hljs-comment"># Same target names as the chained version, in the DataFrame's column order</span>
new_cols = [<span class="hljs-string">"emp_id"</span>, <span class="hljs-string">"first_name"</span>, <span class="hljs-string">"last_name"</span>, <span class="hljs-string">"dept"</span>,
            <span class="hljs-string">"base_salary"</span>, <span class="hljs-string">"age_years"</span>, <span class="hljs-string">"hired_on"</span>, <span class="hljs-string">"country_code"</span>]

df_renamed = df.toDF(*new_cols)
</code></pre>
<h4 id="heading-logical-plan-impact-1">Logical Plan Impact:</h4>
<pre><code class="lang-python">Project [emp_id, first_name, last_name, dept, base_salary, age_years, hired_on, country_code]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Now there’s just one Project node, which means one projection over the source data. This gives us a flatter, more efficient plan.</p>
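<p>One practical wrinkle is that toDF() is positional: it expects a name for every column, in order. When you only want to rename a few columns, a small helper can build that full list from a mapping. This is a hedged sketch. <code>renamed_columns</code> is a hypothetical helper name, and the column list stands in for <code>df.columns</code>.</p>

```python
def renamed_columns(columns, mapping):
    """Return the full, ordered column list that toDF() expects.

    `columns` is the current column order (for example df.columns) and
    `mapping` maps old names to new names; unmapped columns keep their name.
    """
    unknown = set(mapping) - set(columns)
    if unknown:
        # Fail loudly instead of silently ignoring a typo in an old name.
        raise KeyError(f"columns not found: {sorted(unknown)}")
    return [mapping.get(name, name) for name in columns]

current = ["id", "firstname", "lastname", "department",
           "salary", "age", "hire_date", "country"]
renames = {"firstname": "first_name", "lastname": "last_name",
           "hire_date": "hired_on"}

new_cols = renamed_columns(current, renames)
# With a real DataFrame:
#   df_renamed = df.toDF(*renamed_columns(df.columns, renames))
```

<p>Deriving the list from the existing column order avoids the main risk of a hand-written toDF() call, which is assigning a name to the wrong position.</p>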
<h4 id="heading-under-the-hood-what-spark-actually-does">Under the Hood: What Spark Actually Does</h4>
<p>Every time you call withColumnRenamed(), Spark rewrites the entire projection list. Catalyst treats the rename as a full column re-selection from the previous node, not as a lightweight alias update. When you chain several renames, Catalyst duplicates internal column metadata for each intermediate step.</p>
<p>By contrast, toDF() rebases the schema in a single step. Catalyst interprets it as a single schema rebinding, so no redundant metadata trees are created.</p>
<h4 id="heading-real-world-timing-glue-job-benchmark">Real-World Timing: Glue Job Benchmark</h4>
<p>To see if chained withColumnRenamed calls add real overhead, here's a simple timing test performed on a Glue job using a DataFrame with 1M rows. First using withColumnRenamed:</p>
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession

spark = SparkSession.builder.appName(<span class="hljs-string">"MillionRowsRenameTest"</span>).getOrCreate()

employees_data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Doe"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">80000</span>, <span class="hljs-number">28</span>, <span class="hljs-string">"2020-01-15"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Jane"</span>, <span class="hljs-string">"Smith"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>, <span class="hljs-number">32</span>, <span class="hljs-string">"2019-03-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Johnson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">60000</span>, <span class="hljs-number">25</span>, <span class="hljs-string">"2021-06-10"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Brown"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>, <span class="hljs-number">35</span>, <span class="hljs-string">"2018-07-01"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Charlie"</span>, <span class="hljs-string">"Wilson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">65000</span>, <span class="hljs-number">29</span>, <span class="hljs-string">"2020-11-05"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">6</span>, <span class="hljs-string">"David"</span>, <span class="hljs-string">"Lee"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">55000</span>, <span class="hljs-number">27</span>, <span class="hljs-string">"2021-01-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">7</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Davis"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">95000</span>, <span class="hljs-number">40</span>, <span class="hljs-string">"2017-04-12"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">8</span>, <span class="hljs-string">"Frank"</span>, <span class="hljs-string">"Miller"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">70000</span>, <span class="hljs-number">33</span>, <span class="hljs-string">"2019-09-25"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">9</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"Taylor"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">58000</span>, <span class="hljs-number">26</span>, <span class="hljs-string">"2021-08-15"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">10</span>, <span class="hljs-string">"Henry"</span>, <span class="hljs-string">"Anderson"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">88000</span>, <span class="hljs-number">31</span>, <span class="hljs-string">"2020-02-28"</span>, <span class="hljs-string">"USA"</span>)
]

multiplied_data = [(i, <span class="hljs-string">f"firstname_<span class="hljs-subst">{i}</span>"</span>, <span class="hljs-string">f"lastname_<span class="hljs-subst">{i}</span>"</span>,
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">3</span>],  <span class="hljs-comment"># department</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">4</span>],  <span class="hljs-comment"># salary</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">5</span>],  <span class="hljs-comment"># age</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">6</span>],  <span class="hljs-comment"># hire_date</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">7</span>])  <span class="hljs-comment"># country</span>
                   <span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">1</span>, <span class="hljs-number">1</span>_000_001)]

df = spark.createDataFrame(multiplied_data,
                           [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
df1 = (df
       .withColumnRenamed(<span class="hljs-string">"firstname"</span>, <span class="hljs-string">"first_name"</span>)
       .withColumnRenamed(<span class="hljs-string">"lastname"</span>, <span class="hljs-string">"last_name"</span>)
       .withColumnRenamed(<span class="hljs-string">"department"</span>, <span class="hljs-string">"dept_name"</span>)
       .withColumnRenamed(<span class="hljs-string">"salary"</span>, <span class="hljs-string">"annual_salary"</span>)
       .withColumnRenamed(<span class="hljs-string">"age"</span>, <span class="hljs-string">"emp_age"</span>)
       .withColumnRenamed(<span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"hired_on"</span>)
       .withColumnRenamed(<span class="hljs-string">"country"</span>, <span class="hljs-string">"work_country"</span>))

print(<span class="hljs-string">"withColumnRenamed Count:"</span>, df1.count())
print(<span class="hljs-string">"withColumnRenamed time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"seconds"</span>)
</code></pre>
<p>Using toDF:</p>
<pre><code class="lang-python">start = time.time()
df2 = df.toDF(<span class="hljs-string">"id"</span>, <span class="hljs-string">"first_name"</span>, <span class="hljs-string">"last_name"</span>, <span class="hljs-string">"dept_name"</span>, <span class="hljs-string">"annual_salary"</span>, <span class="hljs-string">"emp_age"</span>, <span class="hljs-string">"hired_on"</span>, <span class="hljs-string">"work_country"</span>)
print(<span class="hljs-string">"toDF Count:"</span>, df2.count())
print(<span class="hljs-string">"toDF time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"seconds"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Number of Project Nodes</strong></td><td><strong>Glue Execution Time (1M rows)</strong></td><td><strong>Plan Complexity</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Chained withColumnRenamed()</td><td>8 nodes</td><td>~12 seconds</td><td>Deep, nested</td></tr>
<tr>
<td>Single toDF()</td><td>1 node</td><td>~8 seconds</td><td>Flat, simple</td></tr>
</tbody>
</table>
</div><p>The difference matters more at larger sizes or in complex pipelines, especially on managed runtimes such as AWS Glue (where planning overhead is noticeable) or when tens of millions of rows are involved. Each additional Project adds column resolution, metadata work, and plan depth. Renaming all columns in one go with toDF() gives Spark a single projection to resolve, which results in a flatter plan: one rename, one projection, and, in this benchmark, faster execution.</p>
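<p>Single-run timings like the ones above can be noisy, and whichever variant runs first may absorb warm-up costs. If you repeat the experiment, a small harness that discards warm-up runs and reports the median is one way to reduce that noise. The sketch below is an assumption-laden helper rather than part of the original benchmark: <code>time_action</code> is a hypothetical name, and <code>fake_count</code> stands in for a call such as <code>df1.count()</code>.</p>

```python
import statistics
import time

def time_action(action, repeats=5, warmups=1):
    """Run `action` several times and return the median wall-clock seconds.

    Warm-up runs are executed but not timed, so one-off costs such as
    session start-up or first-time data loading do not skew the result.
    """
    for _ in range(warmups):
        action()
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)

# Stand-in workload; with Spark this would be e.g. lambda: df1.count()
calls = []

def fake_count():
    calls.append(1)
    return sum(range(10_000))

median_seconds = time_action(fake_count, repeats=5, warmups=2)
```

<p>Timing both rename variants through the same harness, in both orders, makes it easier to tell a real planning cost from run-to-run variation.</p>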
<h3 id="heading-scenario-2-reusing-expressions">Scenario 2: Reusing Expressions</h3>
<p>Sometimes Spark jobs run slower, not because of shuffles or joins, but because the same computation is performed repeatedly within the logical plan. Every time you repeat an expression, say, col("salary") * 0.1 in multiple places, Spark treats it as a <em>new</em> derived column, expanding the logical plan and forcing redundant work.</p>
<h4 id="heading-the-problem-repeated-expressions">The Problem: Repeated Expressions</h4>
<p>Let’s say we’re calculating bonus and total compensation for employees:</p>
<pre><code class="lang-python">df_expr = (
    df.withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.10</span>)
      .withColumn(<span class="hljs-string">"total_comp"</span>, col(<span class="hljs-string">"salary"</span>) + (col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.10</span>))
)
</code></pre>
<p>At first glance, it’s simple enough. But Spark’s optimizer doesn’t automatically know that the (col("salary") * 0.10) in the second column is identical to the one computed in the first. Both get evaluated separately in the logical plan.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Project [id, firstname, lastname, department,

salary, age, hire_date, country,

(salary * <span class="hljs-number">0.10</span>) AS bonus,

(salary + (salary * <span class="hljs-number">0.10</span>)) AS total_comp]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>While this looks compact, Spark must compute (salary * 0.10) twice: once for bonus, and again inside total_comp. For a large dataset (say, 100 million rows), that’s two full column evaluations. The waste compounds when the expression is complex: imagine parsing JSON, applying UDFs, or running date arithmetic multiple times.</p>
<h4 id="heading-the-better-approach-compute-once-reuse-everywhere">The Better Approach: Compute Once, Reuse Everywhere</h4>
<p>Compute the expression once, store it as a column, and reference it later:</p>
<pre><code class="lang-python">df_expr = (
    df.withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.10</span>)
      .withColumn(<span class="hljs-string">"total_comp"</span>, col(<span class="hljs-string">"salary"</span>) + col(<span class="hljs-string">"bonus"</span>))
)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Project [id, firstname, lastname, department,

salary, age, hire_date, country,

(salary * <span class="hljs-number">0.10</span>) AS bonus,

(salary + bonus) AS total_comp]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Now Spark calculates (salary * 0.10) once, stores it in the bonus column, and reuses that column when computing total_comp. This single change cuts CPU cost and memory usage.</p>
<h4 id="heading-under-the-hood-why-repetition-hurts">Under the Hood: Why Repetition Hurts</h4>
<p>Spark’s Catalyst optimizer doesn’t automatically factor out repeated expressions across different columns. Each withColumn() creates a new Project node with its own expression tree. If multiple nodes reuse the same arithmetic or function, Catalyst re-evaluates them independently.</p>
<p>On small DataFrames, this cost is invisible. On wide, computation-heavy jobs (think feature engineering pipelines), it can add hundreds of milliseconds per task.</p>
<p>Each redundant expression increases:</p>
<ul>
<li><p>Catalyst’s internal expression resolution time</p>
</li>
<li><p>The size of generated Java code in WholeStageCodegen</p>
</li>
<li><p>CPU cycles per row, since Spark cannot share intermediate results between columns in the same node</p>
</li>
</ul>
<h4 id="heading-real-world-benchmark-aws-glue">Real-World Benchmark: AWS Glue</h4>
<p>We tested this pattern on AWS Glue (Spark 3.3) with 10 million rows and a simulated expensive computation, using a dataset similar to the one from Scenario 1.</p>
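<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, exp, log, sqrt

df = spark.createDataFrame(multiplied_data,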
                           [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

expr = sqrt(exp(log(col(<span class="hljs-string">"salary"</span>) + <span class="hljs-number">1</span>)))

start = time.time()

df_repeated = (
    df.withColumn(<span class="hljs-string">"metric_a"</span>, expr)
      .withColumn(<span class="hljs-string">"metric_b"</span>, expr * <span class="hljs-number">2</span>)
      .withColumn(<span class="hljs-string">"metric_c"</span>, expr / <span class="hljs-number">10</span>)
)

df_repeated.count()
time_repeated = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()

df_reused = (
    df.withColumn(<span class="hljs-string">"metric"</span>, expr)
      .withColumn(<span class="hljs-string">"metric_a"</span>, col(<span class="hljs-string">"metric"</span>))
      .withColumn(<span class="hljs-string">"metric_b"</span>, col(<span class="hljs-string">"metric"</span>) * <span class="hljs-number">2</span>)
      .withColumn(<span class="hljs-string">"metric_c"</span>, col(<span class="hljs-string">"metric"</span>) / <span class="hljs-number">10</span>)
)

df_reused.count()

print(<span class="hljs-string">"Repeated expr time:"</span>, time_repeated, <span class="hljs-string">"seconds"</span>)
print(<span class="hljs-string">"Reused expr time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"seconds"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Project Nodes</strong></td><td><strong>Execution Time (10M rows)</strong></td><td><strong>Expression Evaluations</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Repeated expression</td><td>Multiple (nested)</td><td>~18 seconds</td><td>3x per row</td></tr>
<tr>
<td>Compute once, reuse</td><td>Single</td><td>~11 seconds</td><td>1x per row</td></tr>
</tbody>
</table>
</div><p>The performance gap widens further with genuinely expensive expressions (like regex extraction, JSON parsing, or UDFs).</p>
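<p>Single wall-clock measurements like the ones above are sensitive to warm-up and background noise, so it can be worth repeating each action and comparing medians. The following is a minimal sketch in plain Python. The helper time_action is hypothetical, and the workload is a stand-in so the example runs without a cluster.</p>

```python
import statistics
import time


def time_action(action, repeats=3, warmup=1):
    """Run `action` several times and return the median wall-clock seconds."""
    for _ in range(warmup):
        action()  # Warm-up runs are executed but not timed.
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


# Stand-in workload so the sketch runs without Spark.
calls = []
median_seconds = time_action(lambda: calls.append(sum(range(10_000))))
print(f"median: {median_seconds:.4f}s over {len(calls) - 1} timed runs")
# With Spark: time_action(df_repeated.count) vs time_action(df_reused.count)
```

<p>Passing the bound method (for example df_reused.count) keeps the timed region limited to the action itself.</p>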
<h4 id="heading-physical-plan-implication">Physical Plan Implication</h4>
<p>In the physical plan, repeated expressions expand into multiple Java blocks within the same WholeStageCodegen node:</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) Project [sqrt(exp(log(salary + <span class="hljs-number">1</span>))) AS metric_a,

(sqrt(exp(log(salary + <span class="hljs-number">1</span>))) * <span class="hljs-number">2</span>) AS metric_b,

(sqrt(exp(log(salary + <span class="hljs-number">1</span>))) / <span class="hljs-number">10</span>) AS metric_c, ...]
</code></pre>
<p>Spark literally embeds three copies of the same logic.</p>
<p>Each is JIT-compiled separately, leading to:</p>
<ul>
<li><p>Larger generated Java classes</p>
</li>
<li><p>Higher CPU utilization</p>
</li>
<li><p>Longer code-generation time before tasks even start</p>
</li>
</ul>
<p>When reusing a column, Spark generates one expression and references it by name, dramatically shrinking the codegen footprint. If you have complex transformations (nested when, UDFs, regex extractions, and so on), compute them once and reuse them with col("alias"). For even heavier expressions that appear across multiple pipelines, consider persisting the intermediate DataFrame:</p>
<pre><code class="lang-python">df_features = df.withColumn(<span class="hljs-string">"complex_feature"</span>, complex_logic)

df_features.cache()
</code></pre>
<p>That cache can save multiple recomputations across downstream steps.</p>
<h3 id="heading-scenario-3-batch-column-ops">Scenario 3: Batch Column Ops</h3>
<p>Most PySpark pipelines don’t die because of one big, obvious mistake. They slow down from a thousand tiny cuts: one extra withColumn() here, another there, until the logical plan turns into a tall stack of projections.</p>
<p>On its own, withColumn() is fine. The problem is how we use it:</p>
<ul>
<li><p>10–30 chained calls in a row</p>
</li>
<li><p>Re-deriving similar expressions</p>
</li>
<li><p>Spreading logic across many tiny steps</p>
</li>
</ul>
<p>This scenario shows how batching column operations into a single select() produces a flatter, cleaner logical plan that scales better and is easier to reason about.</p>
<h4 id="heading-the-problem-chaining-withcolumn-forever">The Problem: Chaining withColumn() Forever</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, concat_ws, when, lit, upper

df_transformed = (
    df.withColumn(<span class="hljs-string">"full_name"</span>, concat_ws(<span class="hljs-string">" "</span>, col(<span class="hljs-string">"firstname"</span>), col(<span class="hljs-string">"lastname"</span>)))
      .withColumn(<span class="hljs-string">"is_senior"</span>, when(col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">35</span>, lit(<span class="hljs-number">1</span>)).otherwise(lit(<span class="hljs-number">0</span>)))
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000.0</span>)
      .withColumn(<span class="hljs-string">"experience_band"</span>,
                  when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
                  .when((col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">30</span>) &amp; (col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>), <span class="hljs-string">"mid"</span>)
                  .otherwise(<span class="hljs-string">"senior"</span>))
      .withColumn(<span class="hljs-string">"country_upper"</span>, upper(col(<span class="hljs-string">"country"</span>)))
)
</code></pre>
<p>It reads nicely, it runs, and everyone moves on. But under the hood, Spark builds this as multiple Project nodes, one per withColumn() call.</p>
<p><strong>Simplified Logical Plan (Chained, Conceptual):</strong></p>
<pre><code class="lang-python">Project [..., country_upper]

└─ Project [..., experience_band]

   └─ Project [..., salary_k]

      └─ Project [..., is_senior]

         └─ Project [..., full_name]

            └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Each layer re-selects all existing columns, adds one more derived column, and deepens the plan.</p>
<h4 id="heading-the-better-approach-batch-with-select">The Better Approach: Batch with select()</h4>
<p>Instead of incrementally patching the schema, build it once.</p>
<pre><code class="lang-python">df_transformed = df.select(
    col(<span class="hljs-string">"id"</span>),
    col(<span class="hljs-string">"firstname"</span>),
    col(<span class="hljs-string">"lastname"</span>),
    col(<span class="hljs-string">"department"</span>),
    col(<span class="hljs-string">"salary"</span>),
    col(<span class="hljs-string">"age"</span>),
    col(<span class="hljs-string">"hire_date"</span>),
    col(<span class="hljs-string">"country"</span>),
    concat_ws(<span class="hljs-string">" "</span>, col(<span class="hljs-string">"firstname"</span>), col(<span class="hljs-string">"lastname"</span>)).alias(<span class="hljs-string">"full_name"</span>),
    when(col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">35</span>, lit(<span class="hljs-number">1</span>)).otherwise(lit(<span class="hljs-number">0</span>)).alias(<span class="hljs-string">"is_senior"</span>),
    (col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000.0</span>).alias(<span class="hljs-string">"salary_k"</span>),
    when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
        .when((col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">30</span>) &amp; (col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>), <span class="hljs-string">"mid"</span>)
        .otherwise(<span class="hljs-string">"senior"</span>).alias(<span class="hljs-string">"experience_band"</span>),
    upper(col(<span class="hljs-string">"country"</span>)).alias(<span class="hljs-string">"country_upper"</span>)  <span class="hljs-comment"># upper() is imported from pyspark.sql.functions</span>
)
</code></pre>
<p><strong>Simplified Logical Plan (Batched):</strong></p>
<pre><code class="lang-python">Project [id, firstname, lastname, department, salary, age, hire_date, country,

         full_name, is_senior, salary_k, experience_band, country_upper]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>One Project. All derived columns <em>are</em> defined together. Flatter DAG. Cleaner plan.</p>
<h4 id="heading-under-the-hood-why-this-matters">Under the Hood: Why This Matters</h4>
<p>Each withColumn() is syntactic sugar for: “Take the previous plan, and create a new Project on top of it.” So 10 withColumn() calls = 10 projections wrapped on top of each other.</p>
<p>Catalyst can sometimes collapse adjacent Project nodes, but:</p>
<ul>
<li><p>Not always (especially when aliases shadow each other).</p>
</li>
<li><p>Not when expressions become complex or interdependent.</p>
</li>
<li><p>Not when UDFs or analysis barriers appear.</p>
</li>
</ul>
<p>Batching with select():</p>
<ul>
<li><p>Gives Catalyst a single, complete view of all expressions.</p>
</li>
<li><p>Enables more aggressive optimizations (constant folding, expression reuse, pruning).</p>
</li>
<li><p>Keeps expression trees shallower and codegen output smaller.</p>
</li>
</ul>
<p>Think of it as the difference between editing a sentence 10 times in a row and writing the final sentence once, cleanly.</p>
<h4 id="heading-real-world-example-using-the-employees-df-at-scale">Real-World Example: Using the Employees DataFrame at Scale</h4>
<p>Chained version (many withColumn()):</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, concat_ws, when, lit, upper
<span class="hljs-keyword">import</span> time

start = time.time()
df_chain = (
    df.withColumn(<span class="hljs-string">"full_name"</span>, concat_ws(<span class="hljs-string">" "</span>, col(<span class="hljs-string">"firstname"</span>), col(<span class="hljs-string">"lastname"</span>)))
      .withColumn(<span class="hljs-string">"is_senior"</span>, when(col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">35</span>, <span class="hljs-number">1</span>).otherwise(<span class="hljs-number">0</span>))
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000.0</span>)
      .withColumn(<span class="hljs-string">"high_earner"</span>, when(col(<span class="hljs-string">"salary"</span>) &gt;= <span class="hljs-number">90000</span>, <span class="hljs-number">1</span>).otherwise(<span class="hljs-number">0</span>))
      .withColumn(<span class="hljs-string">"experience_band"</span>,
                  when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
                  .when((col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">30</span>) &amp; (col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>), <span class="hljs-string">"mid"</span>)
                  .otherwise(<span class="hljs-string">"senior"</span>))
      .withColumn(<span class="hljs-string">"country_upper"</span>, upper(col(<span class="hljs-string">"country"</span>)))
)

df_chain.count()
time_chain = round(time.time() - start, <span class="hljs-number">2</span>)
</code></pre>
<p>Batched version (single select()):</p>
<pre><code class="lang-python">start = time.time()
df_batch = df.select(
    <span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>,
    concat_ws(<span class="hljs-string">" "</span>, col(<span class="hljs-string">"firstname"</span>), col(<span class="hljs-string">"lastname"</span>)).alias(<span class="hljs-string">"full_name"</span>),
    when(col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">35</span>, <span class="hljs-number">1</span>).otherwise(<span class="hljs-number">0</span>).alias(<span class="hljs-string">"is_senior"</span>),
    (col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000.0</span>).alias(<span class="hljs-string">"salary_k"</span>),
    when(col(<span class="hljs-string">"salary"</span>) &gt;= <span class="hljs-number">90000</span>, <span class="hljs-number">1</span>).otherwise(<span class="hljs-number">0</span>).alias(<span class="hljs-string">"high_earner"</span>),
    when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
        .when((col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">30</span>) &amp; (col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>), <span class="hljs-string">"mid"</span>)
        .otherwise(<span class="hljs-string">"senior"</span>).alias(<span class="hljs-string">"experience_band"</span>),
    upper(col(<span class="hljs-string">"country"</span>)).alias(<span class="hljs-string">"country_upper"</span>)
)

df_batch.count()
time_batch = round(time.time() - start, <span class="hljs-number">2</span>)
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Logical Shape</strong></td><td><strong>Glue Execution Time (1M rows)</strong></td><td><strong>Notes</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Chained withColumn()</td><td>6 nested Projects</td><td>~14 seconds</td><td>Deep plan, more Catalyst work</td></tr>
<tr>
<td>Single select()</td><td>1 Project</td><td>~9 seconds</td><td>Flat planning, cleaner DAG</td></tr>
</tbody>
</table>
</div><p>The distinction is most evident when there are more derived columns, more complex expressions (UDFs, window functions), or when executing on managed runtimes such as AWS Glue.</p>
<p>In the chained cases, there are more Project nodes, code generation is fragmented, and expression evaluation is less amenable to global optimization.</p>
<p>In the batched cases, Spark generates a single Project node, more work is consolidated into a single WholeStageCodegen pipeline, code generation is reduced, the JVM is less stressed, and the plan is flatter and more amenable to optimization. This is not only cleaner, but it’s also faster, more reliable, and friendlier to Spark’s optimizer.</p>
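<p>One way to check how deep a plan really is on your own job is to count operator lines in the text printed by df.explain(True). The sketch below is plain Python. The helper count_plan_nodes is hypothetical, and the two sample strings are the conceptual plans from this scenario rather than captured Spark output, so real plans may look different after optimization.</p>

```python
import re

# Leading tree-drawing characters and an optional codegen marker such as "*(1) ".
_TREE_PREFIX = re.compile(r"^[\s:|+\-└─├│]*(?:\*\(\d+\)\s*)?")


def count_plan_nodes(plan_text, node_name):
    """Count lines of a textual plan whose operator is `node_name`."""
    pattern = re.compile(rf"{re.escape(node_name)}\b")
    count = 0
    for line in plan_text.splitlines():
        operator = _TREE_PREFIX.sub("", line, count=1)
        if pattern.match(operator):
            count += 1
    return count


# Conceptual plans from this scenario, not captured Spark output.
chained_plan = """\
Project [..., country_upper]
└─ Project [..., experience_band]
   └─ Project [..., salary_k]
      └─ Project [..., is_senior]
         └─ Project [..., full_name]
            └─ LogicalRDD [id, firstname, lastname]
"""
batched_plan = """\
Project [id, firstname, lastname, full_name, is_senior]
└─ LogicalRDD [id, firstname, lastname]
"""

print(count_plan_nodes(chained_plan, "Project"))  # 5
print(count_plan_nodes(batched_plan, "Project"))  # 1
```

<p>Comparing the count for the parsed plan against the optimized plan shows how many projections Catalyst managed to collapse for a given pipeline.</p>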
<h3 id="heading-scenario-4-early-filter-vs-late-filter">Scenario 4: Early Filter vs Late Filter</h3>
<p>Many pipelines apply transformations first, adding columns, joining datasets, or calculating derived metrics, before filtering records. That order looks harmless in code but can double or triple the workload at execution.</p>
<h4 id="heading-problem-late-filtering">Problem: Late Filtering</h4>
<pre><code class="lang-python">df_late = (
    df.withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.1</span>)
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000</span>)
      .filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">35</span>)
)
</code></pre>
<p>This means Spark first computes all columns for every employee, then discards most rows.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Filter (age &gt; <span class="hljs-number">35</span>)

└─ Project [id, firstname, lastname, department, salary, age, hire_date, country,

            (salary * <span class="hljs-number">0.1</span>) AS bonus,

            (salary / <span class="hljs-number">1000</span>) AS salary_k]

   └─ LogicalRDD [...]
</code></pre>
<p>Catalyst can sometimes reorder this automatically, but when it can't (due to UDFs or complex logic), you're doing unnecessary work on data that's thrown away.</p>
<h4 id="heading-better-approach-early-filtering">Better Approach: Early Filtering</h4>
<pre><code class="lang-python">df_early = (
    df.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">35</span>)
      .withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.1</span>)
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000</span>)
)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Project [id, firstname, lastname, department, salary, age, hire_date, country,

         (salary * <span class="hljs-number">0.1</span>) AS bonus,

         (salary / <span class="hljs-number">1000</span>) AS salary_k]

└─ Filter (age &gt; <span class="hljs-number">35</span>)

   └─ LogicalRDD [...]
</code></pre>
<p>Now Spark prunes the dataset first, then applies transformations. The result: smaller intermediate data, fewer rows for each projection to evaluate, and a smaller shuffle footprint in any downstream stage.</p>
<h4 id="heading-real-world-benchmark-aws-glue-1">Real-World Benchmark: AWS Glue</h4>
<p>Late Filtering:</p>
<pre><code class="lang-python">df = spark.createDataFrame(
    multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]
)

start_late = time.time()

df_late = (
    df.withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.1</span>)
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000</span>)
      .filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">35</span>)   
)

df_late.count()
time_late = round(time.time() - start_late, <span class="hljs-number">2</span>)
</code></pre>
<p>Early Filtering:</p>
<pre><code class="lang-python">start_early = time.time()

df_early = (
    df.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">35</span>)    
      .withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.1</span>)
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000</span>)
)

df_early.count()
time_early = round(time.time() - start_early, <span class="hljs-number">2</span>)

print(<span class="hljs-string">"Late Filter Time:"</span>, time_late, <span class="hljs-string">"seconds"</span>)
print(<span class="hljs-string">"Early Filter Time:"</span>, time_early, <span class="hljs-string">"seconds"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Rows Processed Before Filter</strong></td><td><strong>Execution Time (approx)</strong></td><td><strong>Notes</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Late filter</td><td>1,000,000 (all rows)</td><td>~14 seconds</td><td>Computes bonus and salary_k for all rows, then filters</td></tr>
<tr>
<td>Early filter</td><td>300,000 (filtered subset)</td><td>~9 seconds</td><td>Filters first, computes only for age &gt; 35</td></tr>
</tbody>
</table>
</div><p>The early filter approach processes significantly less data before the projection, leading to faster execution and less memory pressure.</p>
<p>Always filter as early as possible, before joins, aggregations, expensive transformations (such as UDFs or window functions), and even during file reads via Parquet/ORC pushdown, since filtering at the source touches fewer partitions and leads to faster jobs.</p>
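<p>The effect of filter placement can be illustrated without Spark at all. The toy sketch below uses plain Python lists and a hypothetical expensive_transform function that counts its own calls. It only mirrors the "rows processed before filter" idea from the table and says nothing about what Catalyst does to a specific query.</p>

```python
def expensive_transform(row, counter):
    """Stand-in for a costly derived column; counts how often it runs."""
    counter["calls"] += 1
    return {**row, "bonus": row["salary"] * 0.1}


# Ten toy employee rows with ages 26, 28, ..., 44.
rows = [{"id": i, "age": 26 + 2 * i, "salary": 50_000 + 1_000 * i}
        for i in range(10)]

# Late filter: transform every row, then discard the ones that fail the predicate.
late_counter = {"calls": 0}
late = [r for r in (expensive_transform(r, late_counter) for r in rows)
        if r["age"] > 35]

# Early filter: discard first, transform only the survivors.
early_counter = {"calls": 0}
early = [expensive_transform(r, early_counter) for r in rows if r["age"] > 35]

assert late == early  # Same result either way.
print(late_counter["calls"], early_counter["calls"])  # 10 5
```

<p>Both orderings return the same rows; only the number of transform calls differs, which is the cost the early filter avoids.</p>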
<h3 id="heading-scenario-5-column-pruning">Scenario 5: Column Pruning</h3>
<p>When working with Spark DataFrames, convenience often wins over correctness, and nothing feels more convenient than select("*"). It’s quick, flexible, and perfect for exploration.</p>
<p>But in production pipelines, that little star silently costs CPU, memory, network bandwidth, and runtime efficiency. Every time you write select("*"), Spark expands it into <em>every</em> column from your schema, even if you’re using just one or two later.</p>
<p>Those extra attributes flow through every stage of the plan, from filters and joins to aggregations and shuffles. The result: inflated logical plans, bigger shuffle files, and slower queries.</p>
<h4 id="heading-the-problem-the-lazy-star">The Problem: “The Lazy Star”</h4>
<pre><code class="lang-python">df_star = (
    df.select(<span class="hljs-string">"*"</span>)
      .filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .groupBy(<span class="hljs-string">"country"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p>At first glance, this seems harmless. The problem is that only three columns are needed (department for the filter, country and salary for the aggregation), yet Spark carries all eight (id, firstname, lastname, department, salary, age, hire_date, country) through every transformation.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Aggregate [country], [avg(salary) AS avg_salary]

└─ Filter (department = Engineering)

   └─ Project [id, firstname, lastname, department, salary, age, hire_date, country]

      └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Every node in this tree carries all columns. Catalyst can’t prune them because you explicitly asked for "*". The excess attributes are serialized, shuffled, and deserialized across the cluster, even though they serve no purpose in the final result.</p>
<h4 id="heading-the-fix-select-only-what-you-need">The Fix: Select Only What You Need</h4>
<p>Be deliberate with your projections. Select the minimal schema required for the task.</p>
<pre><code class="lang-python">df_pruned = (
    df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>)
      .filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .groupBy(<span class="hljs-string">"country"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Aggregate [country], [avg(salary) AS avg_salary]

└─ Filter (department = Engineering)

   └─ Project [department, salary, country]

      └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Now Spark reads and processes only the three required columns: department, salary, and country. The plan is narrower, the DAG simpler, and execution faster.</p>
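<p>To keep the projection minimal as a query evolves, the column list can be derived from the columns each step actually uses. This is a plain-Python sketch under that assumption. The helper required_columns is hypothetical and not part of PySpark.</p>

```python
def required_columns(available, *column_groups):
    """Return the de-duplicated columns a query needs, in first-seen order."""
    needed = []
    for group in column_groups:
        for name in group:
            if name not in available:
                raise KeyError(f"Unknown column: {name}")
            if name not in needed:
                needed.append(name)
    return needed


schema = ["id", "firstname", "lastname", "department",
          "salary", "age", "hire_date", "country"]

needed = required_columns(
    schema,
    ["department"],  # used by the filter
    ["country"],     # used by groupBy
    ["salary"],      # used by the aggregation
)
print(needed)  # ['department', 'country', 'salary']
# With a real DataFrame: df.select(*required_columns(df.columns, ...))
```

<p>Listing the groups per step makes it obvious when a column can be dropped because the step that needed it was removed.</p>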
<h4 id="heading-real-world-benchmark-aws-glue-2">Real-World Benchmark: AWS Glue</h4>
<p>Wide Projection:</p>
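<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> avg, col

df = spark.createDataFrame(multiplied_data,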
                           [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
df_star = (
    df.select(<span class="hljs-string">"*"</span>)
      .filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .groupBy(<span class="hljs-string">"country"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)

df_star.count()
time_star = round(time.time() - start, <span class="hljs-number">2</span>)
</code></pre>
<p>Pruned Projection:</p>
<pre><code class="lang-python">start = time.time()

df_pruned = (
    df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>)
      .filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .groupBy(<span class="hljs-string">"country"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)

df_pruned.count()
time_pruned = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">f"select('*') time: <span class="hljs-subst">{time_star}</span>s"</span>)
print(<span class="hljs-string">f"pruned columns time: <span class="hljs-subst">{time_pruned}</span>s"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Columns Processed</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>select("*")</td><td>8</td><td>~26.54 s</td><td>Spark carries all columns through the plan.</td></tr>
<tr>
<td>Pruned projection</td><td>3</td><td>~2.21 s</td><td>Only needed columns processed → faster and lighter.</td></tr>
</tbody>
</table>
</div><h4 id="heading-under-the-hood-how-catalyst-handles-columns">Under the Hood: How Catalyst Handles Columns</h4>
<p>When you call select("*"), Catalyst resolves <em>every attribute</em> into the logical plan. Each subsequent transformation inherits that full attribute list, increasing plan depth and overhead.</p>
<p>Catalyst includes a rule called ColumnPruning, which removes unused attributes, but it only works when Spark <em>can see</em> which columns are necessary. If you use "*" or dynamically reference df.columns, Catalyst loses visibility.</p>
<p><strong>Works:</strong></p>
<pre><code class="lang-python">df \
    .select(<span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>) \
    .groupBy(<span class="hljs-string">"country"</span>) \
    .agg(avg(<span class="hljs-string">"salary"</span>))
</code></pre>
<p><strong>Doesn’t Work:</strong></p>
<pre><code class="lang-python">cols = df.columns

df.select(cols) \
  .groupBy(<span class="hljs-string">"country"</span>) \
  .agg(avg(<span class="hljs-string">"salary"</span>))
</code></pre>
<p>In the second case, Catalyst can’t prune anything because cols might include everything.</p>
<h4 id="heading-physical-plan-differences">Physical Plan Differences</h4>
<p>Wide Projection (select("*")):</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) HashAggregate(keys=[country], functions=[avg(salary)])

+- *(<span class="hljs-number">1</span>) Project [id, firstname, lastname, department, salary, age, hire_date, country]

   +- *(<span class="hljs-number">1</span>) Filter (department = Engineering)

      +- *(<span class="hljs-number">1</span>) Scan parquet ...
</code></pre>
<p>Pruned Projection:</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) HashAggregate(keys=[country], functions=[avg(salary)])

+- *(<span class="hljs-number">1</span>) Project [department, salary, country]

   +- *(<span class="hljs-number">1</span>) Filter (department = Engineering)

      +- *(<span class="hljs-number">1</span>) Scan parquet [department, salary, country]
</code></pre>
<p>Notice the last line: Spark physically scans only the three referenced columns from Parquet. That’s genuine I/O reduction, not just logical simplification. Using select("*") increases shuffle file sizes, memory usage during serialization, Catalyst planning time, and I/O and network traffic. The fix requires nothing more than naming the columns you need.</p>
<p>In managed environments like AWS Glue or Databricks, this simple practice can greatly reduce ETL time, particularly for Parquet or Delta files, because explicit projection lets column pruning take effect. It’s one of the easiest and highest-impact Spark optimization techniques, and it starts with typing fewer asterisks.</p>
<h3 id="heading-scenario-6-filter-pushdown-vs-full-scan">Scenario 6: Filter Pushdown vs Full Scan</h3>
<p>When a Spark job feels slow right from the start, even before joins or aggregations, the culprit is often hidden at the data-read layer. Spark spends seconds (or minutes) scanning every record, even though most rows are useless for the query.</p>
<p>That’s where filter pushdown comes in. It tells Spark to <em>push your filter logic down to the file reader</em> so that Parquet / ORC / Delta formats return only the relevant rows from disk. Done right, this optimization can reduce scan size significantly. Done wrong, Spark performs a full scan, reading everything before filtering in memory.</p>
<h4 id="heading-the-problem-late-filters-and-full-scans">The Problem: Late Filters and Full Scans</h4>
<pre><code class="lang-python">employees_df = spark.read.parquet(<span class="hljs-string">"s3://data/employee_data/"</span>)

df_full = (
    employees_df
        .select(<span class="hljs-string">"*"</span>)  <span class="hljs-comment"># reads all columns</span>
        .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"Canada"</span>)
)
</code></pre>
<p>Looks fine, right? But Spark can’t push this filter to the Parquet reader because it’s applied <em>after</em> the select("*") projection step. Catalyst sees the filter as operating on a projected DataFrame, not the raw scan, so the pushdown boundary is lost.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Filter (country = Canada)

└─ Project [id, firstname, lastname, department, salary, age, hire_date, country]

   └─ Scan parquet employee_data [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Every record from every Parquet file is read into memory before the filter executes. In large tables, this means scanning terabytes when you only need megabytes.</p>
<h4 id="heading-the-fix-filter-early-and-project-light">The Fix: Filter Early and Project Light</h4>
<p>Move filters as close as possible to the data source and limit columns before Spark reads them:</p>
<pre><code class="lang-python">df_pushdown = (
    spark.read.parquet(<span class="hljs-string">"s3://data/employee_data/"</span>)
        .select(<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>)
        .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"Canada"</span>)
)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Project [id, firstname, department, salary, country]

└─ Scan parquet employee_data [id, firstname, department, salary, country]

   PushedFilters: [country = Canada]
</code></pre>
<p>Notice the difference: PushedFilters appears in the plan. That means the Parquet reader handles the predicate, returning only matching blocks and rows.</p>
<h4 id="heading-under-the-hood-what-actually-happens">Under the Hood: What Actually Happens</h4>
<p>When Spark performs filter pushdown, it leverages the Parquet metadata (min/max statistics and row-group indexes) stored in file footers.</p>
<ul>
<li><p>Spark inspects file-level metadata for the predicate column (country).</p>
</li>
<li><p>It skips any row group whose min/max range cannot contain the target value (Canada).</p>
</li>
<li><p>It reads only the necessary row groups and columns from disk.</p>
</li>
<li><p>Those records enter the DAG directly – no in-memory filtering required.</p>
</li>
</ul>
<p>This optimization happens entirely before Spark begins executing stages, reducing both I/O and network transfer.</p>
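<p>To make the row-group skipping idea concrete, here is a toy simulation in plain Python. It is not how Spark or Parquet is implemented: RowGroup, may_contain, and scan_with_pushdown are hypothetical names, and the three row groups are made-up sample data. It only illustrates how min/max statistics let a reader rule out a row group without touching its rows.</p>

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroup:
    """Toy stand-in for one Parquet row group and its footer statistics."""
    min_country: str
    max_country: str
    rows: List[dict]


def may_contain(group: RowGroup, value: str) -> bool:
    # Min/max statistics can only rule a group out, never confirm a match.
    return group.min_country <= value <= group.max_country


def scan_with_pushdown(groups: List[RowGroup], value: str):
    """Return (matching rows, number of row groups actually read)."""
    matched, groups_read = [], 0
    for group in groups:
        if not may_contain(group, value):
            continue  # skipped using metadata only; rows are never touched
        groups_read += 1
        matched.extend(r for r in group.rows if r["country"] == value)
    return matched, groups_read


groups = [
    RowGroup("Canada", "Canada",
             [{"id": 7, "country": "Canada"}, {"id": 9, "country": "Canada"}]),
    RowGroup("UK", "USA",
             [{"id": 3, "country": "UK"}, {"id": 1, "country": "USA"}]),
    RowGroup("Brazil", "France",
             [{"id": 11, "country": "Brazil"}, {"id": 12, "country": "France"}]),
]

rows, groups_read = scan_with_pushdown(groups, "Canada")
print(len(rows), groups_read)  # 2 2
```

<p>The second group is skipped outright because "Canada" falls outside its UK to USA range. The third group is still read, since "Canada" sorts between Brazil and France, even though it holds no matching rows. That is why data sorted or clustered on the filter column tends to benefit most from pushdown.</p>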
<h4 id="heading-real-world-benchmark-aws-glue-3">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col

spark = SparkSession.builder.appName(<span class="hljs-string">"FilterPushdownBenchmark"</span>).getOrCreate()

start = time.time()
df_full = (
    spark.read.parquet(<span class="hljs-string">"s3://data/employee_data/"</span>)
        .select(<span class="hljs-string">"*"</span>)                         <span class="hljs-comment"># all columns</span>
        .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"Canada"</span>)  
)
df_full.count()
time_full = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()
df_pushdown = (
    spark.read.parquet(<span class="hljs-string">"s3://data/employee_data/"</span>)
        .select(<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>)
        .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"Canada"</span>)  
)
df_pushdown.count()
time_push = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">"Full Scan Time:"</span>, time_full, <span class="hljs-string">"sec"</span>)
print(<span class="hljs-string">"Filter Pushdown Time:"</span>, time_push, <span class="hljs-string">"sec"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Full Scan</td><td>14.2 s</td><td>All files scanned and filtered in memory.</td></tr>
<tr>
<td>Filter Pushdown</td><td>3.8 s</td><td>Only relevant row groups and columns read.</td></tr>
</tbody>
</table>
</div><p><strong>Physical Plan Comparison</strong></p>
<p>Full Scan:</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) Filter (country = Canada)

+- *(<span class="hljs-number">1</span>) ColumnarToRow

   +- *(<span class="hljs-number">1</span>) FileScan parquet [id, firstname, lastname, department, salary, age, hire_date, country]

      Batched: true, DataFilters: [], PushedFilters: []
</code></pre>
<p>Pushdown:</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) ColumnarToRow

+- *(<span class="hljs-number">1</span>) FileScan parquet [id, firstname, department, salary, country]

   Batched: true, DataFilters: [isnotnull(country)], PushedFilters: [country = Canada]
</code></pre>
<p>The difference is clear: PushedFilters confirms that Spark applied predicate pushdown, skipping unnecessary row groups at the scan stage.</p>
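<p>If you want to check for pushed filters in a script rather than by eye, a small text parser is enough. The sketch below works on plan text you have already captured as a string (for example, by redirecting standard output around df.explain()). The function names are hypothetical, and the sample plans are shortened versions of the two plans shown above. It assumes filter entries do not themselves contain square brackets.</p>

```python
import re
from typing import List


def _split_top_level(body: str) -> List[str]:
    """Split on commas that are not nested inside parentheses."""
    parts, current, depth = [], [], 0
    for ch in body:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    parts.append("".join(current).strip())
    return [part for part in parts if part]


def pushed_filters(plan_text: str) -> List[str]:
    """Collect the entries of every `PushedFilters: [...]` list in a plan."""
    found: List[str] = []
    for body in re.findall(r"PushedFilters:\s*\[([^\]]*)\]", plan_text):
        found.extend(_split_top_level(body))
    return found


full_scan_plan = (
    "FileScan parquet [id, country] "
    "Batched: true, DataFilters: [], PushedFilters: []"
)
pushdown_plan = (
    "FileScan parquet [id, country] Batched: true, "
    "DataFilters: [isnotnull(country)], PushedFilters: [country = Canada]"
)

print(pushed_filters(full_scan_plan))  # []
print(pushed_filters(pushdown_plan))   # ['country = Canada']
```

<p>An empty list is a signal to look at how the filter is written before trusting the job at scale.</p>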
<h4 id="heading-reflection-why-pushdown-matters">Reflection: Why Pushdown Matters</h4>
<p>Pushdown isn’t a micro-optimization. It’s often the single biggest performance lever in Spark ETL. In data lakes with hundreds of files, full scans waste hours and inflate AWS S3 I/O costs. By filtering and projecting early, Spark prunes both rows and columns before execution even begins.</p>
<p>Apply filters as early as possible in the read pipeline, combine filter pushdown with column pruning, verify PushedFilters in explain("formatted"), avoid UDFs and select("*") at read time, and let pushdown turn “read everything and discard most” into “read only what you need.”</p>
<h3 id="heading-scenario-7-de-duplicate-right">Scenario 7: De-duplicate Right</h3>
<h4 id="heading-the-problem-all-row-deduplication-and-why-it-hurts">The Problem: “All-Row Deduplication” and Why It Hurts</h4>
<p>When we use this:</p>
<pre><code class="lang-python">df.dropDuplicates()
</code></pre>
<p>Spark removes identical rows across all columns. It sounds simple, but this operation forces Spark to treat every column as part of the deduplication key.</p>
<p>Internally, it means:</p>
<ul>
<li><p>Every attribute is serialized and hashed.</p>
</li>
<li><p>Every unique combination of all columns is shuffled across the cluster to ensure global uniqueness.</p>
</li>
<li><p>Even small changes in a non-essential field (like hire_date) cause new keys and destroy aggregation locality.</p>
</li>
</ul>
<p>In wide tables, df.dropDuplicates() is one of the heaviest shuffle operations Spark can perform.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Aggregate [id, firstname, lastname, department, salary, age, hire_date, country], [first(id) AS id, ...]

└─ Exchange hashpartitioning(id, firstname, lastname, department, salary, age, hire_date, country, <span class="hljs-number">200</span>)

   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Notice the Exchange: that’s a full shuffle across all columns. Spark must send every record to the partition responsible for its unique combination of all fields. This is slow, memory-intensive, and scales poorly as columns grow.</p>
<h4 id="heading-the-better-approach-key-based-deduplication">The Better Approach: Key-Based Deduplication</h4>
<p>In most real datasets, duplicates are determined by a primary or business key, not all attributes. For example, if id uniquely identifies an employee, we only need to keep one record per id.</p>
<pre><code class="lang-python">df.dropDuplicates([<span class="hljs-string">"id"</span>])
</code></pre>
<p>Now Spark deduplicates based only on the id column.</p>
<pre><code class="lang-python">Aggregate [id], [first(id) AS id, first(firstname) AS firstname, ...]

└─ Exchange hashpartitioning(id, <span class="hljs-number">200</span>)

   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>The shuffle is dramatically narrower. Instead of hashing across all columns, Spark redistributes data only by id. The result: fewer bytes, smaller shuffle files, and a faster reduce stage.</p>
<h4 id="heading-real-world-benchmark-aws-glue-4">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession

spark = SparkSession.builder.appName(<span class="hljs-string">"DedupBenchmark"</span>).getOrCreate()

employees_data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Doe"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">80000</span>, <span class="hljs-number">28</span>, <span class="hljs-string">"2020-01-15"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Jane"</span>, <span class="hljs-string">"Smith"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>, <span class="hljs-number">32</span>, <span class="hljs-string">"2019-03-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Johnson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">60000</span>, <span class="hljs-number">25</span>, <span class="hljs-string">"2021-06-10"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Brown"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>, <span class="hljs-number">35</span>, <span class="hljs-string">"2018-07-01"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Charlie"</span>, <span class="hljs-string">"Wilson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">65000</span>, <span class="hljs-number">29</span>, <span class="hljs-string">"2020-11-05"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">6</span>, <span class="hljs-string">"David"</span>, <span class="hljs-string">"Lee"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">55000</span>, <span class="hljs-number">27</span>, <span class="hljs-string">"2021-01-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">7</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Davis"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">95000</span>, <span class="hljs-number">40</span>, <span class="hljs-string">"2017-04-12"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">8</span>, <span class="hljs-string">"Frank"</span>, <span class="hljs-string">"Miller"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">70000</span>, <span class="hljs-number">33</span>, <span class="hljs-string">"2019-09-25"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">9</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"Taylor"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">58000</span>, <span class="hljs-number">26</span>, <span class="hljs-string">"2021-08-15"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">10</span>, <span class="hljs-string">"Henry"</span>, <span class="hljs-string">"Anderson"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">88000</span>, <span class="hljs-number">31</span>, <span class="hljs-string">"2020-02-28"</span>, <span class="hljs-string">"USA"</span>)
]

multiplied_data = [(i, <span class="hljs-string">f"firstname_<span class="hljs-subst">{i}</span>"</span>, <span class="hljs-string">f"lastname_<span class="hljs-subst">{i}</span>"</span>,
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">3</span>],   <span class="hljs-comment"># department</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">4</span>],   <span class="hljs-comment"># salary</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">5</span>],   <span class="hljs-comment"># age</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">6</span>],   <span class="hljs-comment"># hire_date</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">7</span>]    <span class="hljs-comment"># country</span>
                    )
                   <span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">1</span>, <span class="hljs-number">1</span>_000_001)]

df = spark.createDataFrame(
    multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]
)

start = time.time()
dedup_full = df.dropDuplicates()
dedup_full.count()
time_full = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()
dedup_key = df.dropDuplicates([<span class="hljs-string">"id"</span>])
dedup_key.count()
time_key = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">f"Full-row dedup time: <span class="hljs-subst">{time_full}</span>s"</span>)
print(<span class="hljs-string">f"Key-based dedup time: <span class="hljs-subst">{time_key}</span>s"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Full-Row Dedup</td><td>27.6 s</td><td>Shuffle across all attributes, large hash table</td></tr>
<tr>
<td>Key-Based Dedup (["id"])</td><td>2.06 s</td><td>About 13× faster, minimal shuffle width</td></tr>
</tbody>
</table>
</div><h4 id="heading-under-the-hood-what-catalyst-does">Under the Hood: What Catalyst Does</h4>
<p>When you specify a key list, Catalyst rewrites dropDuplicates(keys) into a partial + final aggregate plan, just like a groupBy:</p>
<p>HashAggregate(keys=[id], functions=[first(...)])</p>
<p>This allows Spark to:</p>
<ul>
<li><p>Perform map-side partial aggregation on each partition (before shuffle).</p>
</li>
<li><p>Shuffle rows using only the grouping key (id) for partitioning.</p>
</li>
<li><p>Perform a final aggregation on the reduced data.</p>
</li>
</ul>
<p>The all-column version can’t do that optimization because every column participates in uniqueness, so Spark must ensure <em>complete</em> data redistribution.</p>
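<p>The partial, exchange, and final steps can be imitated in plain Python to see why fewer rows cross the network. This is a toy model, not Spark’s implementation: the three-column schema, the function names, and the two sample partitions are all assumptions made for illustration.</p>

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str, str]  # (id, firstname, department); toy schema


def partial_dedup(partition: List[Row]) -> List[Row]:
    """Keep the first row seen for each id within one partition."""
    first_seen: Dict[int, Row] = {}
    for row in partition:
        first_seen.setdefault(row[0], row)
    return list(first_seen.values())


def shuffle_by_key(partitions: List[List[Row]], num_partitions: int) -> List[List[Row]]:
    """Route each surviving row to the partition that owns its id."""
    shuffled: List[List[Row]] = [[] for _ in range(num_partitions)]
    for partition in partitions:
        for row in partition:
            shuffled[hash(row[0]) % num_partitions].append(row)
    return shuffled


def drop_duplicates_by_id(partitions: List[List[Row]], num_partitions: int = 2):
    """Return (deduplicated rows, number of rows sent through the shuffle)."""
    partial = [partial_dedup(p) for p in partitions]    # map side, before shuffle
    shuffled = shuffle_by_key(partial, num_partitions)  # partitioned by id only
    final = [partial_dedup(p) for p in shuffled]        # reduce side, after shuffle
    rows_shuffled = sum(len(p) for p in partial)
    return [row for p in final for row in p], rows_shuffled


partitions = [
    [(1, "John", "Engineering"), (1, "John", "Engineering"), (2, "Jane", "Engineering")],
    [(2, "Jane", "Engineering"), (3, "Alice", "Sales"), (3, "Alice", "Sales")],
]

result, rows_shuffled = drop_duplicates_by_id(partitions)
print(sorted(row[0] for row in result), rows_shuffled)  # [1, 2, 3] 4
```

<p>Six input rows shrink to four before the shuffle, and the final pass removes the one duplicate (id 2) that lived in two different partitions.</p>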
<h4 id="heading-best-practices-for-deduplication">Best Practices for Deduplication</h4>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Practice</strong></td><td><strong>Why It Matters</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Always deduplicate by key columns</td><td>Reduces shuffle width and data movement</td></tr>
<tr>
<td>Use deterministic keys (id, email, ssn)</td><td>Ensures predictable grouping</td></tr>
<tr>
<td>Avoid dropDuplicates() without arguments</td><td>Forces global shuffle across all attributes</td></tr>
<tr>
<td>Combine with column pruning</td><td>Keep only necessary fields before deduplication</td></tr>
<tr>
<td>For “latest record” logic, use window functions</td><td>Allows targeted deduplication (row_number() with order)</td></tr>
<tr>
<td>Cache intermediate datasets if reused</td><td>Avoids recomputation of expensive dedup stages</td></tr>
</tbody>
</table>
</div><h4 id="heading-combining-deduplication-amp-aggregation">Combining Deduplication &amp; Aggregation</h4>
<p>You can merge deduplication with aggregation for even better results:</p>
<pre><code class="lang-python">df_dedup_agg = (
    df.dropDuplicates([<span class="hljs-string">"id"</span>])
        .groupBy(<span class="hljs-string">"department"</span>)
        .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p>Spark now reuses the same shuffle partitioning for both operations, one shuffle instead of two. The plan will show:</p>
<pre><code class="lang-python">HashAggregate(keys=[department], functions=[avg(salary)])

└─ HashAggregate(keys=[id], functions=[first(...), first(department)])

   └─ Exchange hashpartitioning(id, <span class="hljs-number">200</span>)
</code></pre>
<p>Prefer dropDuplicates(["key_col"]) over dropDuplicates() to deduplicate by business or surrogate keys rather than the entire schema. Combine deduplication with projection to reduce I/O, and remember that one narrow shuffle is always better than a wide shuffle. Deduplication isn’t just cleanup – it’s an optimization strategy. Choose your keys wisely, and Spark will reward you with faster jobs and lighter DAGs.</p>
<h3 id="heading-scenario-8-count-smarter">Scenario 8: Count Smarter</h3>
<p>In production, one of the most common performance pitfalls is the simplest line of code:</p>
<pre><code class="lang-python"><span class="hljs-keyword">if</span> df.count() &gt; <span class="hljs-number">0</span>:
</code></pre>
<p>At first glance, this seems harmless. You just want to know whether the DataFrame has any data before writing, joining, or aggregating. But in Spark, count() is not a metadata lookup. It’s a full cluster-wide job.</p>
<p><strong>What Really Happens with count()</strong><br>When you call df.count(), Spark executes a complete action:</p>
<ul>
<li><p>It scans every partition.</p>
</li>
<li><p>Deserializes every row.</p>
</li>
<li><p>Counts records locally on each executor.</p>
</li>
<li><p>Reduces the counts to the driver.</p>
</li>
</ul>
<p>That means your “empty check” runs a full distributed computation, even when the dataset has billions of rows or lives in S3.</p>
<pre><code class="lang-python">df.count()
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) HashAggregate(keys=[], functions=[count(<span class="hljs-number">1</span>)])

+- *(<span class="hljs-number">1</span>) ColumnarToRow

   +- *(<span class="hljs-number">1</span>) FileScan parquet [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Every record is read, aggregated, and returned just to produce a single integer.</p>
<p>Now imagine this runs in the middle of your Glue job, before a write, before a filter, or inside a loop. You’ve just added a full-table scan to your DAG for no reason.</p>
<h4 id="heading-the-smarter-way-limit1-or-head1">The Smarter Way: limit(1) or head(1)</h4>
<p>If all you need to know is whether data exists, you don’t need to count every record. You just need to know if there’s <em>at least one</em>.</p>
<p>Two efficient alternatives:</p>
<pre><code class="lang-python">df.head(<span class="hljs-number">1</span>)
<span class="hljs-comment"># or</span>
df.limit(<span class="hljs-number">1</span>).collect()
</code></pre>
<p>Both execute a lazy scan that stops as soon as one record is found.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">TakeOrderedAndProject(limit=<span class="hljs-number">1</span>)

└─ *(<span class="hljs-number">1</span>) FileScan parquet [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<ul>
<li><p>No global aggregation.</p>
</li>
<li><p>No shuffle.</p>
</li>
<li><p>No full scan.</p>
</li>
</ul>
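<p>In practice, the existence check is easiest to reuse when wrapped in a tiny helper. The sketch below assumes df is a PySpark DataFrame, where head(1) returns a list holding at most one Row. The name has_rows is hypothetical.</p>

```python
def has_rows(df) -> bool:
    """Return True if `df` yields at least one row.

    `df` is assumed to be a PySpark DataFrame; `head(1)` returns a list
    with at most one Row, so an empty list means there is no data.
    """
    return len(df.head(1)) > 0
```

<p>The original guard then becomes if has_rows(df): instead of if df.count() &gt; 0:, with the rest of the job unchanged.</p>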
<h4 id="heading-real-world-benchmark-aws-glue-5">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession

<span class="hljs-comment"># Initialize Spark session</span>
spark = SparkSession.builder.appName(<span class="hljs-string">"CountVsLimitBenchmark"</span>).getOrCreate()

<span class="hljs-comment"># Base dataset (10 sample employees)</span>
employees_data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Doe"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">80000</span>, <span class="hljs-number">28</span>, <span class="hljs-string">"2020-01-15"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Jane"</span>, <span class="hljs-string">"Smith"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>, <span class="hljs-number">32</span>, <span class="hljs-string">"2019-03-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Johnson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">60000</span>, <span class="hljs-number">25</span>, <span class="hljs-string">"2021-06-10"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Brown"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>, <span class="hljs-number">35</span>, <span class="hljs-string">"2018-07-01"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Charlie"</span>, <span class="hljs-string">"Wilson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">65000</span>, <span class="hljs-number">29</span>, <span class="hljs-string">"2020-11-05"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">6</span>, <span class="hljs-string">"David"</span>, <span class="hljs-string">"Lee"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">55000</span>, <span class="hljs-number">27</span>, <span class="hljs-string">"2021-01-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">7</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Davis"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">95000</span>, <span class="hljs-number">40</span>, <span class="hljs-string">"2017-04-12"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">8</span>, <span class="hljs-string">"Frank"</span>, <span class="hljs-string">"Miller"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">70000</span>, <span class="hljs-number">33</span>, <span class="hljs-string">"2019-09-25"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">9</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"Taylor"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">58000</span>, <span class="hljs-number">26</span>, <span class="hljs-string">"2021-08-15"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">10</span>, <span class="hljs-string">"Henry"</span>, <span class="hljs-string">"Anderson"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">88000</span>, <span class="hljs-number">31</span>, <span class="hljs-string">"2020-02-28"</span>, <span class="hljs-string">"USA"</span>)
]

<span class="hljs-comment"># Create 1 million rows</span>
multiplied_data = [
    (i, <span class="hljs-string">f"firstname_<span class="hljs-subst">{i}</span>"</span>, <span class="hljs-string">f"lastname_<span class="hljs-subst">{i}</span>"</span>,
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">3</span>],
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">4</span>],
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">5</span>],
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">6</span>],
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">7</span>])
    <span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">1</span>, <span class="hljs-number">1</span>_000_001)
]

<span class="hljs-comment"># Create DataFrame</span>
df = spark.createDataFrame(
    multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]
)

start = time.time()
df.count()
count_time = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()
df.limit(<span class="hljs-number">1</span>).collect()
limit_time = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()
df.head(<span class="hljs-number">1</span>)
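head_time = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">f"count() time: <span class="hljs-subst">{count_time}</span>s"</span>)
print(<span class="hljs-string">f"limit(1) time: <span class="hljs-subst">{limit_time}</span>s"</span>)
print(<span class="hljs-string">f"head(1) time: <span class="hljs-subst">{head_time}</span>s"</span>)

spark.stop()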
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Method</strong></td><td><strong>Plan Type</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Notes</strong></td></tr>
</thead>
<tbody>
<tr>
<td>count()</td><td>HashAggregate + Exchange</td><td>26.33 s</td><td>Full scan + aggregation</td></tr>
<tr>
<td>limit(1)</td><td>TakeOrderedAndProject</td><td>0.62 s</td><td>Stops after first record</td></tr>
<tr>
<td>head(1)</td><td>TakeOrderedAndProject</td><td>0.42 s</td><td>Fastest, single partition</td></tr>
</tbody>
</table>
</div><p>The difference is significant for the same logical check.</p>
<p>So why does this difference exist? Spark’s execution model treats every action as a trigger for computation. count() is an aggregation action that requires global communication, whereas limit(1) and head(1) short-circuit the job after fetching the first record. Catalyst generates a TakeOrderedAndProject node instead of HashAggregate, and the scheduler terminates once one task finishes.</p>
<p><strong>Plan comparison:</strong></p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Action</strong></td><td><strong>Simplified Plan</strong></td><td><strong>Type</strong></td><td><strong>Behavior</strong></td></tr>
</thead>
<tbody>
<tr>
<td>count()</td><td>HashAggregate → Exchange → FileScan</td><td>Global</td><td>Full scan, wide dependency</td></tr>
<tr>
<td>limit(1)</td><td>TakeOrderedAndProject → FileScan</td><td>Local</td><td>Early stop, narrow dependency</td></tr>
<tr>
<td>head(1)</td><td>TakeOrderedAndProject → FileScan</td><td>Local</td><td>Early stop, single task</td></tr>
</tbody>
</table>
</div><p>Avoid using count() to check emptiness, since it triggers a full scan. Use limit(1) or head(1) for lightweight existence checks, and reserve count() for cases where the total is actually required, because Spark will always process all data unless explicitly told to stop.</p>
<p>Other alternatives:</p>
<div class="hn-table">
<table>
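<thead>
<tr>
<td><strong>Method</strong></td><td><strong>Behavior</strong></td></tr>
</thead>
<tbody>
<tr>
<td><code>df.take(1)</code></td><td>Similar to head(1); returns a list of Rows</td></tr>
<tr>
<td><code>df.first()</code></td><td>Returns the first Row, or None if the DataFrame is empty</td></tr>
<tr>
<td><code>df.isEmpty()</code></td><td>Returns True if the DataFrame has no rows (PySpark 3.3+)</td></tr>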
<tr>
<td><code>df.rdd.isEmpty()</code></td><td>RDD-level check</td></tr>
</tbody>
</table>
</div><h3 id="heading-scenario-9-window-wisely">Scenario 9: Window Wisely</h3>
<p>Window functions (rank(), dense_rank(), lag(), avg() with over(), and so on) are essential in analytics. They let you calculate running totals, rankings, or time-based metrics.</p>
<p>But in Spark, they’re not cheap, because they rely on shuffles and ordering.</p>
<p>Each window operation:</p>
<ul>
<li><p>Requires all rows for the same partition key to be co-located on the same node.</p>
</li>
<li><p>Requires sorting those rows by the orderBy() clause within each partition.</p>
</li>
</ul>
<p>If you omit partitionBy(), Spark moves the entire dataset into a single partition and sorts it there. A partition key that is too broad (only a handful of distinct values) has a similar, milder effect: a few huge partitions end up doing most of the work.</p>
<h4 id="heading-global-window-the-wrong-way">Global Window: The Wrong Way</h4>
<p>Let’s compute employee rankings by salary without partitioning:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.window <span class="hljs-keyword">import</span> Window
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> rank, col

window_spec = Window.orderBy(col(<span class="hljs-string">"salary"</span>).desc())

df_ranked = df.withColumn(<span class="hljs-string">"salary_rank"</span>, rank().over(window_spec))
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Window [rank() windowspecdefinition(orderBy=[salary DESC]) AS salary_rank]

└─ Sort [salary DESC], false

   └─ Exchange SinglePartition

      └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Without a partition key, Spark has to move every row into one partition and sort it there. A single task ends up processing the whole dataset, and all of the data must travel through the network to reach it.</p>
<h4 id="heading-partition-by-a-selective-key-the-better-way">Partition by a Selective Key: The Better Way</h4>
<p>Most analytics don’t need a global ranking. You likely want rankings within a department or group, not across the entire company.</p>
<pre><code class="lang-python">window_spec = Window.partitionBy(<span class="hljs-string">"department"</span>).orderBy(col(<span class="hljs-string">"salary"</span>).desc())

df_ranked = df.withColumn(<span class="hljs-string">"salary_rank"</span>, rank().over(window_spec))
</code></pre>
<p>Now Spark builds separate windows per department. Rows are hash-distributed by department, so each department is sorted independently and the work spreads across tasks instead of landing on one.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Window [rank() windowspecdefinition(partitionBy=[department], orderBy=[salary DESC]) AS salary_rank]

└─ Sort [department ASC, salary DESC], false

   └─ Exchange hashpartitioning(department, <span class="hljs-number">200</span>)

      └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>The Exchange now hash-partitions data by department. Spark still shuffles, but each task sorts only its own departments, which means smaller sorts, fewer comparisons, and lower spill risk.</p>
<h4 id="heading-real-world-benchmark-aws-glue-6">Real-World Benchmark: AWS Glue</h4>
<p>We can run both window functions on the same 1 million row dataset:</p>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
[<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>,
 <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
window_global = Window.orderBy(col(<span class="hljs-string">"salary"</span>).desc())
df_global = df.withColumn(<span class="hljs-string">"salary_rank"</span>, rank().over(window_global))
df_global.count()
global_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f'global_time:<span class="hljs-subst">{global_time}</span>'</span>)

start = time.time()
window_local = Window.partitionBy(<span class="hljs-string">"department"</span>).orderBy(col(<span class="hljs-string">"salary"</span>).desc())
df_local = df.withColumn(<span class="hljs-string">"salary_rank"</span>, rank().over(window_local))
df_local.count()
local_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f'local_time:<span class="hljs-subst">{local_time}</span>'</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Stage Count</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Global Window (no partition)</td><td>5</td><td>30.21 s</td><td>Full dataset shuffle + global sort</td></tr>
<tr>
<td>Partitioned Window (by department)</td><td>3</td><td>1.74 s</td><td>Localized sort, fewer shuffle files</td></tr>
</tbody>
</table>
</div><p>Partitioning the window cut the runtime from 30.21 s to 1.74 s on this dataset. The gap widens as data scales, because the global window funnels every row through a single sort.</p>
<h4 id="heading-under-the-hood-what-spark-actually-does-1">Under the Hood: What Spark Actually Does</h4>
<p>Each Window transformation adds a physical plan node like:</p>
<p><code>WindowExec [rank() windowspecdefinition(...)], frame=RangeFrame</code></p>
<p>This node is non-pipelined – it materializes input partitions before computing window metrics. Catalyst can push a filter below WindowExec only when the filter references the window’s partition columns; any other filter or projection stays above it, which means:</p>
<ul>
<li><p>If you rank before filtering, Spark computes ranks for all rows.</p>
</li>
<li><p>If you order globally, Spark must sort everything before starting.</p>
</li>
</ul>
<p>That’s why window placement in your code matters almost as much as partition keys.</p>
<h4 id="heading-common-anti-patterns">Common Anti-Patterns:</h4>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Anti-Pattern</strong></td><td><strong>Why It Hurts</strong></td><td><strong>Fix</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Missing partitionBy()</td><td>Global sort across dataset</td><td>Partition by key columns</td></tr>
<tr>
<td>Overly broad partition key</td><td>Creates too many small partitions</td><td>Use selective, not unique keys</td></tr>
<tr>
<td>Wide, unbounded window frame</td><td>Retains all rows in memory per key</td><td>Use bounded ranges (for example, rowsBetween(-3, 0))</td></tr>
<tr>
<td>Filtering after window</td><td>Computes unnecessary metrics</td><td>Filter first, then window</td></tr>
<tr>
<td>Multiple chained windows</td><td>Each triggers new sort</td><td>Combine window metrics in one spec</td></tr>
</tbody>
</table>
</div><p>Partition on selective keys to reduce shuffle volume, and avoid global windows that force full sorts and shuffles. Prefer bounded frames to keep state in memory and limit disk spill, and filter early while combining metrics to minimize unnecessary data flowing through WindowExec. Windows are powerful, but unbounded ones can silently crush performance. In Spark, partitioning isn’t optional. It’s the line between analytics and overhead.</p>
<h3 id="heading-scenario-10-incremental-aggregations-with-cache-and-persist">Scenario 10: Incremental Aggregations with Cache and Persist</h3>
<p>When multiple actions depend on the same expensive base computation, don’t recompute it every time. Materialize it once with cache() or persist(), then reuse it. Most Spark teams get this wrong in two ways:</p>
<ul>
<li><p>They never cache, so Spark recomputes long lineages (filters, joins, window ops) for every action.</p>
</li>
<li><p>They cache everything, blowing executor memory and making things worse.</p>
</li>
</ul>
<p>This scenario shows how to do it intelligently.</p>
<h4 id="heading-the-problem-recomputing-the-same-work-for-every-metric">The Problem: Recomputing the Same Work for Every Metric</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, avg, max <span class="hljs-keyword">as</span> max_, count

base = (
    df.filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"USA"</span>)
      .filter(col(<span class="hljs-string">"salary"</span>) &gt; <span class="hljs-number">70000</span>)
)

avg_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
max_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(max_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"max_salary"</span>))
cnt_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(count(<span class="hljs-string">"*"</span>).alias(<span class="hljs-string">"cnt"</span>))

</code></pre>
<p>Looks totally fine at a glance. But remember: Spark is lazy. Every time you trigger an action:</p>
<pre><code class="lang-python">avg_salary.show()
max_salary.show()
cnt_salary.show()
</code></pre>
<p>Spark walks back to the same base definition and re-runs all filters and shuffles for each metric – unless you persist.</p>
<p>So instead of one filtered dataset reused three times, you effectively get:</p>
<ul>
<li><p>3 jobs</p>
</li>
<li><p>3 scans / filter chains</p>
</li>
<li><p>3 groupBy shuffles</p>
</li>
</ul>
<p>for the same input slice.</p>
<p><strong>Simplified Logical Plan Shape (Without Cache):</strong></p>
<pre><code class="lang-python">HashAggregate [department], [avg/max/count]

└─ Exchange hashpartitioning(department)

   └─ Filter (department = <span class="hljs-string">'Engineering'</span> AND country = <span class="hljs-string">'USA'</span> AND salary &gt; <span class="hljs-number">70000</span>)

      └─ Scan ...
</code></pre>
<p>And Spark builds this three times. Even though the filter logic is identical, each action triggers a new job with:</p>
<ul>
<li><p>new stages,</p>
</li>
<li><p>new shuffles, and</p>
</li>
<li><p>new scans.</p>
</li>
</ul>
<p>On large datasets (hundreds of GBs), this is brutal.</p>
<h4 id="heading-the-better-approach-cache-the-shared-base">The Better Approach: Cache the Shared Base</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark <span class="hljs-keyword">import</span> StorageLevel

base = (
    df.filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"USA"</span>)
      .filter(col(<span class="hljs-string">"salary"</span>) &gt; <span class="hljs-number">70000</span>)
)

base = base.persist(StorageLevel.MEMORY_AND_DISK)

base.count()

avg_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
max_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(max_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"max_salary"</span>))
cnt_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(count(<span class="hljs-string">"*"</span>).alias(<span class="hljs-string">"cnt"</span>))

avg_salary.show()
max_salary.show()
cnt_salary.show()

base.unpersist()
</code></pre>
<p>Now, the filters and initial scan run once, the results are cached, and all subsequent aggregates read from cached data instead of recomputing upstream logic.</p>
<p><strong>Logical Plan Shape (With Cache):</strong></p>
<p>Before materialization (base.count()), the plan still shows the lineage. Afterward, subsequent actions operate off the cached node.</p>
<pre><code class="lang-python">InMemoryRelation [department, salary, country, ...]

   └─ * Cached <span class="hljs-keyword">from</span>:

      Filter (department = <span class="hljs-string">'Engineering'</span> AND country = <span class="hljs-string">'USA'</span> AND salary &gt; <span class="hljs-number">70000</span>)

      └─ Scan parquet employees_large ...
</code></pre>
<p>Then:</p>
<pre><code class="lang-python">HashAggregate [department], [avg/max/count]

└─ InMemoryRelation [...]
</code></pre>
<p>One heavy pipeline, many cheap reads. The DAG becomes flatter:</p>
<ul>
<li><p>Expensive scan &amp; filter: once.</p>
</li>
<li><p>Cheap aggregations: N times from memory/disk (each groupBy still shuffles, but only the cached rows).</p>
</li>
</ul>
<h4 id="heading-real-world-benchmark-aws-glue-7">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
[<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>,
<span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

base = (
    df.filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"USA"</span>)
      .filter(col(<span class="hljs-string">"salary"</span>) &gt; <span class="hljs-number">85000</span>)
)


start = time.time()

avg_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
max_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(max_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"max_salary"</span>))
cnt = base.groupBy(<span class="hljs-string">"department"</span>).agg(count(<span class="hljs-string">"*"</span>).alias(<span class="hljs-string">"emp_count"</span>))

print(<span class="hljs-string">"---- Without Cache ----"</span>)
avg_salary.show()
max_salary.show()
cnt.show()

no_cache_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f"Total time without cache: <span class="hljs-subst">{no_cache_time}</span> seconds"</span>)


<span class="hljs-keyword">from</span> pyspark <span class="hljs-keyword">import</span> StorageLevel

base_cached = base.persist(StorageLevel.MEMORY_AND_DISK)
base_cached.count()  <span class="hljs-comment"># materialize cache</span>

start = time.time()

avg_salary_c = base_cached.groupBy(<span class="hljs-string">"department"</span>).agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
max_salary_c = base_cached.groupBy(<span class="hljs-string">"department"</span>).agg(max_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"max_salary"</span>))
cnt_c = base_cached.groupBy(<span class="hljs-string">"department"</span>).agg(count(<span class="hljs-string">"*"</span>).alias(<span class="hljs-string">"emp_count"</span>))

print(<span class="hljs-string">"---- With Cache ----"</span>)
avg_salary_c.show()
max_salary_c.show()
cnt_c.show()

cache_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f"Total time with cache: <span class="hljs-subst">{cache_time}</span> seconds"</span>)

<span class="hljs-comment"># Cleanup</span>
base_cached.unpersist()

print(<span class="hljs-string">"\n==== Summary ===="</span>)
print(<span class="hljs-string">f"Without cache: <span class="hljs-subst">{no_cache_time}</span>s | With cache: <span class="hljs-subst">{cache_time}</span>s"</span>)
print(<span class="hljs-string">"================="</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Execution Time (1M rows)</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Without Cache</td><td>30.75 s</td></tr>
<tr>
<td>With Cache</td><td>3.34 s</td></tr>
</tbody>
</table>
</div><h4 id="heading-under-the-hood-why-this-works">Under the Hood: Why This Works</h4>
<p>Using cache() or persist() in Spark inserts an InMemoryRelation / InMemoryTableScanExec node so that expensive intermediate results are stored in executor memory (or memory+disk). This allows future jobs to reuse cached blocks instead of re-scanning sources or re-computing shuffles. This shortens downstream logical plans, reduces repeated shuffles, and lowers load on systems like S3, HDFS, or JDBC.</p>
<p>Without caching, every action replays the full lineage and Spark recomputes the data unless another operator or AQE optimization has already materialized part of it. But caching should not become “cache everything”. Rather, you should avoid caching very large DataFrames used only once, wide raw inputs instead of filtered/aggregated subsets, or long-lived caches that are never unpersisted.</p>
<p>A good rule of thumb is to cache only when the DataFrame is expensive to recompute (joins, filters, windows, UDFs), is used at least twice, and is reasonably sized after filtering so it can fit in memory or work with MEMORY_AND_DISK. Otherwise, allow Spark to recompute.</p>
<p>Conceptually, caching converts a tall, repetitive DAG such as repeated “HashAggregate → Exchange → Filter → Scan” sequences into a hub-and-spoke design where one heavy cached hub feeds multiple lightweight downstream aggregates.</p>
<p>When multiple actions depend on the same expensive computation, cache or persist the shared base to flatten the DAG, eliminate repeated scans and shuffles, and improve end-to-end performance. Be intentional about it: cache only when reuse is real and the data size is safe, and always call <code>unpersist()</code> when done.</p>
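<p>One way to make the unpersist() step hard to forget is to wrap the cache lifetime in a context manager. This is a sketch under assumptions: the name <code>cached</code> is hypothetical, and it relies only on the DataFrame methods <code>persist()</code>, <code>count()</code>, and <code>unpersist()</code> already used above:</p>

```python
from contextlib import contextmanager


@contextmanager
def cached(df, storage_level=None):
    """Persist df for the duration of a with-block, then release it."""
    # With no explicit level, persist() falls back to Spark's default level
    persisted = df.persist() if storage_level is None else df.persist(storage_level)
    try:
        persisted.count()  # action that materializes the cache
        yield persisted
    finally:
        persisted.unpersist()  # runs even if an aggregate inside the block raises
```

<p>With this in place, the three aggregates could run inside <code>with cached(base, StorageLevel.MEMORY_AND_DISK) as hot:</code>, and the cache is released when the block exits, whether or not an error occurred.</p>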
<p>Don’t make Spark re-solve the same puzzle three times. Let it solve it once, remember the answer, and move on.</p>
<h3 id="heading-scenario-11-reduce-shuffles">Scenario 11: Reduce Shuffles</h3>
<p>Shuffles are Spark’s invisible tax collectors. Every time your data crosses executors, you pay in CPU, disk I/O, and network bandwidth.</p>
<p>Two of the most common yet misunderstood transformations that trigger or avoid shuffles are coalesce() and repartition(). Both change partition counts, but they do it in fundamentally different ways.</p>
<h4 id="heading-the-problem">The Problem</h4>
<p>It is tempting to write <code>df_result = df.repartition(10)</code> and think, “I’m just changing partitions, so Spark won’t move data unnecessarily.” That assumption is wrong. <code>repartition()</code> always performs a full shuffle, even when:</p>
<ul>
<li><p>You are reducing partitions (from 200 → 10), or</p>
</li>
<li><p>You are increasing partitions (from 10 → 200).</p>
</li>
</ul>
<p>In both cases, Spark redistributes every row across the cluster under a new partitioning scheme (round-robin when you pass only a number, hash partitioning when you pass columns). So even if your data is already partitioned optimally, repartition() will still reshuffle it, adding a stage boundary.</p>
<p><strong>Logical Plan:</strong></p>
<pre><code class="lang-python">Exchange RoundRobinPartitioning(10)

└─ LogicalRDD [...]
</code></pre>
<p>That Exchange node signals a wide dependency: Spark spills intermediate data to disk, transfers it over the network, and reloads it before the next stage. In short: repartition() = "new shuffle, no matter what."</p>
<h4 id="heading-the-better-approach-coalesce">The Better Approach: coalesce()</h4>
<p>If your goal is to reduce the number of partitions (for example, before writing results to S3 or Snowflake), use coalesce() instead.</p>
<p><code>df_result = df.coalesce(10)</code></p>
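<p>coalesce() merges existing partitions without a shuffle stage. It uses a narrow dependency, meaning each output partition is built from a fixed set of existing partitions, and Spark tries to group partitions that already live <em>on the same node</em>. The trade-off is that coalesce() can only reduce the partition count, and a very small target also limits the parallelism of the stage that feeds it.</p>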
<pre><code class="lang-python">Coalesce

└─ LogicalRDD [...]
</code></pre>
<ul>
<li><p>No Exchange.</p>
</li>
<li><p>No network shuffle.</p>
</li>
<li><p>Just local merges – fast and cheap.</p>
</li>
</ul>
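<p>The choice between the two calls can be sketched as a small helper that shuffles only when it has to. The name <code>resize_partitions</code> is hypothetical; it assumes a DataFrame exposing <code>rdd.getNumPartitions()</code>, <code>coalesce()</code>, and <code>repartition()</code>:</p>

```python
def resize_partitions(df, target):
    """Return df with `target` partitions, shuffling only when needed."""
    current = df.rdd.getNumPartitions()
    if current == target:
        return df                       # already at the requested count
    if current > target:
        return df.coalesce(target)      # shrinking: narrow dependency, no Exchange
    return df.repartition(target)       # growing: requires a full shuffle
```

<p>This is a starting point rather than a rule: when the existing partitions are badly skewed, paying for repartition() can still be the better choice even while shrinking.</p>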
<h4 id="heading-real-world-benchmark-aws-glue-8">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
[<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
df_repart = df.repartition(<span class="hljs-number">10</span>)
df_repart.count()
print(<span class="hljs-string">"Repartition time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"sec"</span>)

start = time.time()
df_coalesced = df.coalesce(<span class="hljs-number">10</span>)
df_coalesced.count()
print(<span class="hljs-string">"Coalesce time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"sec"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Operation</strong></td><td><strong>Plan Node</strong></td><td><strong>Shuffle Triggered</strong></td><td><strong>Glue Runtime</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>repartition(10)</td><td>Exchange</td><td>Yes</td><td>18.2 s</td><td>Full cluster reshuffle</td></tr>
<tr>
<td>coalesce(10)</td><td>Coalesce</td><td>No</td><td>1.99 s</td><td>Local partition merge only</td></tr>
</tbody>
</table>
</div><p>Even though both ended with 10 partitions, repartition() took roughly 9× longer (18.2 s versus 1.99 s), all because of the unnecessary shuffle.</p>
<h4 id="heading-why-this-matters">Why This Matters</h4>
<p>Each Exchange node in your logical plan creates a new stage in your DAG, meaning:</p>
<ul>
<li><p>Extra disk I/O</p>
</li>
<li><p>Extra serialization</p>
</li>
<li><p>Extra network transfer</p>
</li>
</ul>
<p>That’s why avoiding just one shuffle in a Glue ETL pipeline can save seconds to minutes per run, especially on wide datasets.</p>
<p><strong>When to use which:</strong></p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Goal</strong></td><td><strong>Transformation</strong></td><td><strong>Reasoning</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Increase parallelism for heavy groupBy or join</td><td>repartition()</td><td>Distributes data evenly across executors</td></tr>
<tr>
<td>Reduce file count before writing</td><td>coalesce()</td><td>Avoids shuffle, merges partitions locally</td></tr>
<tr>
<td>Rebalance skewed data before a join</td><td>repartition("key")</td><td>Enables better key distribution</td></tr>
<tr>
<td>Optimize output after aggregation</td><td>coalesce()</td><td>Prevents too many small output files</td></tr>
</tbody>
</table>
</div><h4 id="heading-aqe-and-auto-coalescing">AQE and Auto Coalescing</h4>
<p>You can enable Adaptive Query Execution (AQE) in AWS Glue 3.0+ to let Spark merge small shuffle partitions automatically:</p>
<p><code>spark.conf.set("spark.sql.adaptive.enabled", "true")</code></p>
<p><code>spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")</code></p>
<p>With AQE, Spark dynamically combines small partitions <em>after</em> shuffle to balance performance and I/O.</p>
<p>repartition() always triggers a shuffle, while coalesce() avoids one and is ideal for local merges before writes. Always inspect Exchange nodes to identify shuffle points. In the AWS Glue benchmark above, avoiding a single shuffle yielded a ~9× runtime improvement at the 1M-row scale (18.2 s versus 1.99 s). Finally, use AQE to enable dynamic partition coalescing in larger workflows.</p>
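<p>As a hedged sketch, the AQE switches can be kept in one place together with the advisory partition size that AQE aims for when it merges small shuffle partitions. The helper name <code>apply_aqe_settings</code> is hypothetical, and the 64MB value is an assumption to tune for your workload rather than a recommendation from the benchmarks here:</p>

```python
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Size AQE targets when merging small shuffle partitions (assumed value, tune per job)
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "64MB",
}


def apply_aqe_settings(spark, settings=AQE_SETTINGS):
    """Apply each setting to the session and return what the session now reports."""
    for key, value in settings.items():
        spark.conf.set(key, value)
    return {key: spark.conf.get(key) for key in settings}
```

<p>Calling <code>apply_aqe_settings(spark)</code> at the start of a job and logging the returned dictionary makes it easy to confirm which values the session actually picked up.</p>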
<h3 id="heading-scenario-12-know-your-shuffle-triggers">Scenario 12: Know Your Shuffle Triggers</h3>
<p>Much of Spark's performance cost comes from invisible data movement. Every shuffle boundary adds a new stage, a new write–read cycle, and sometimes minutes of extra execution time.</p>
<p>In Spark, any operation that requires rearranging data between partitions introduces a wide dependency, represented in the logical plan as an Exchange node.</p>
<p>Common shuffle triggers:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Operation</strong></td><td><strong>Why It Shuffles</strong></td><td><strong>Plan Node</strong></td></tr>
</thead>
<tbody>
<tr>
<td>join()</td><td>Records with the same key must be co-located for matching</td><td>Exchange (on join keys)</td></tr>
<tr>
<td>groupBy() / agg()</td><td>Rows with the same key must gather in the same partition for aggregation</td><td>Exchange</td></tr>
<tr>
<td>distinct()</td><td>Spark must compare all values across partitions</td><td>Exchange</td></tr>
<tr>
<td>orderBy()</td><td>Requires global ordering of data</td><td>Exchange</td></tr>
<tr>
<td>repartition()</td><td>Explicit reshuffle for partition balancing</td><td>Exchange</td></tr>
</tbody>
</table>
</div><p>Each Exchange means a shuffle stage: Spark writes partition data to disk, transfers it over the network, and reads it back into memory on the next stage. That’s your hidden performance cliff.</p>
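<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> sum <span class="hljs-keyword">as</span> sum_

df_result = (
    df.groupBy(<span class="hljs-string">"department"</span>)
      .agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))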
      .join(df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
            .distinct(), <span class="hljs-string">"department"</span>)
      .orderBy(<span class="hljs-string">"total_salary"</span>, ascending=<span class="hljs-literal">False</span>)
)

df_result.explain(<span class="hljs-string">"formatted"</span>)
</code></pre>
<p><strong>Logical Plan Simplified:</strong></p>
<pre><code class="lang-python">Sort [total_salary DESC]

└─ Exchange (<span class="hljs-keyword">global</span> sort)

   └─ SortMergeJoin [department]

      ├─ Exchange (groupBy shuffle)

      │   └─ HashAggregate (sum salary)

      └─ Exchange (distinct shuffle)

          └─ Aggregate (department, country)
</code></pre>
<p>We can see three Exchange nodes, one for the aggregation, one for the distinct join, and one for the global sort. That’s three separate shuffles, three full dataset transfers.</p>
<h4 id="heading-better-approach">Better Approach</h4>
<p>Whenever possible, combine wide transformations into a single stage before an action. For instance, you can compute aggregates and join results in one consistent shuffle domain:</p>
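<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> sum <span class="hljs-keyword">as</span> sum_

agg_df = (
    df.groupBy(<span class="hljs-string">"department"</span>)
      .agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
)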

country_df = df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>).distinct()

df_result = (
    agg_df.join(country_df, <span class="hljs-string">"department"</span>)
          .sortWithinPartitions(<span class="hljs-string">"total_salary"</span>, ascending=<span class="hljs-literal">False</span>)
)
</code></pre>
<p><strong>Logical Plan Simplified:</strong></p>
<pre><code class="lang-python">SortWithinPartitions [total_salary DESC]

└─ SortMergeJoin [department]

   ├─ Exchange (shared shuffle <span class="hljs-keyword">for</span> join)

   └─ Exchange (shared shuffle <span class="hljs-keyword">for</span> distinct)
</code></pre>
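<p>sortWithinPartitions() orders rows inside each partition, so the Exchange that a global orderBy() needs disappears, while the shuffles that feed the join remain. The trade-off is that the output is sorted only within each partition, so use it when a total order across the whole result isn’t required.</p>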
<h4 id="heading-real-world-benchmark-aws-glue-1m">Real-World Benchmark: AWS Glue (1M)</h4>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
[<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]).repartition(<span class="hljs-number">20</span>)

<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> sum <span class="hljs-keyword">as</span> sum_

start = time.time()

dept_salary = (
    df.groupBy(<span class="hljs-string">"department"</span>)
      .agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
)

dept_country = (
    df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
      .distinct()
)

naive_result = (
    dept_salary.join(dept_country, <span class="hljs-string">"department"</span>, <span class="hljs-string">"inner"</span>)
               .orderBy(col(<span class="hljs-string">"total_salary"</span>).desc())
)

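naive_count = naive_result.count()
naive_time = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">"Naive result count:"</span>, naive_count)
print(<span class="hljs-string">"Naive pipeline time:"</span>, naive_time, <span class="hljs-string">"sec"</span>)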


start = time.time()

dept_country_once = (
    df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
      .distinct()
)

optimized = (
    df.groupBy(<span class="hljs-string">"department"</span>)
      .agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
      .join(dept_country_once, <span class="hljs-string">"department"</span>, <span class="hljs-string">"inner"</span>)
      .sortWithinPartitions(col(<span class="hljs-string">"total_salary"</span>).desc())
      <span class="hljs-comment"># local ordering, avoids extra global shuffle</span>
)

opt_count = optimized.count()
opt_time = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">"Optimized result count:"</span>, opt_count)
print(<span class="hljs-string">"Optimized pipeline time:"</span>, opt_time, <span class="hljs-string">"sec"</span>)

print(<span class="hljs-string">"\nOptimized plan:"</span>)
optimized.explain(<span class="hljs-string">"formatted"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Pipeline</strong></td><td><strong># of Shuffles</strong></td><td><strong>Glue Runtime (sec)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Naive: groupBy + distinct + orderBy</td><td>3</td><td>28.99 s</td><td>Multiple wide stages</td></tr>
<tr>
<td>Optimized: combined agg + join + sortWithinPartitions</td><td>1</td><td>3.52 s</td><td>Single wide stage</td></tr>
</tbody>
</table>
</div><p>By merging compatible stages and using sortWithinPartitions() instead of global orderBy(), the job ran roughly 8× faster on the same dataset (28.99 s versus 3.52 s), with fewer Exchange nodes and shorter lineage. Run df.explain() and search for Exchange: each one signals a full shuffle. You can also check Spark UI → SQL tab → Exchange Read/Write Size to see exactly how much data moved.</p>
<p>Every Exchange represents a shuffle, adding serialization, network I/O, and stage overhead, so avoid chaining wide operations back-to-back by combining them under a consistent partition key. Prefer sortWithinPartitions() over global orderBy() when ordering is local, monitor plan depth to catch consecutive wide dependencies, and note that in AWS Glue eliminating even one shuffle in a 1M-row job can significantly reduce runtime.</p>
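<p>Searching a plan for Exchange can be automated with a few lines. The helpers below are a sketch with hypothetical names (<code>plan_text</code>, <code>count_exchanges</code>); they assume that <code>df.explain()</code> prints the plan to standard output, which is how PySpark 3.x behaves as far as I know, and that the default "simple" mode lists each operator once:</p>

```python
import io
import re
from contextlib import redirect_stdout


def plan_text(df, mode="simple"):
    """Capture what df.explain() prints and return it as a string."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def count_exchanges(plan):
    """Count standalone Exchange operators in a plan string.

    The word boundary skips names such as BroadcastExchange or
    ReusedExchange, which are not new shuffles of their own.
    """
    return len(re.findall(r"\bExchange\b", plan))
```

<p>A check such as <code>count_exchanges(plan_text(df_result))</code> can then be logged, or compared before and after a refactor, to confirm that a shuffle really disappeared.</p>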
<h3 id="heading-scenario-13-tune-parallelism-shuffle-partitions-amp-aqe">Scenario 13: Tune Parallelism: Shuffle Partitions &amp; AQE</h3>
<p>Most Spark jobs are either over-parallelized (thousands of tiny tasks doing almost nothing, flooding the driver and filesystem) or under-parallelized (a handful of huge tasks doing all the work, causing slow stages and skew-like behavior). Both waste resources. We can control this behavior using spark.sql.shuffle.partitions and Adaptive Query Execution (AQE).</p>
<p>In many environments, <code>spark.sql.shuffle.partitions</code> defaults to 200 (you can check with <code>spark.conf.get("spark.sql.shuffle.partitions")</code>). That means every shuffle (groupBy, join, distinct, and so on) starts from 200 shuffle partitions, regardless of data size. Whether this default is reasonable depends entirely on the workload:</p>
<ul>
<li><p>If you’re processing 2 GB, 200 partitions might be great.</p>
</li>
<li><p>If you’re processing 5 MB, 200 partitions is comedy – 200 tiny tasks, overhead &gt; work.</p>
</li>
<li><p>If you’re processing 2 TB, 200 partitions might be too few – tasks become huge and slow.</p>
</li>
</ul>
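<p>To make these three cases concrete, here is a minimal plain-Python sketch (no Spark required) that computes how much data each task would read with the default 200 partitions. The helper name <code>bytes_per_partition</code> is hypothetical, and the numbers assume data is spread evenly across partitions, which real shuffles rarely achieve.</p>
<pre><code class="lang-python">DEFAULT_SHUFFLE_PARTITIONS = 200

MB = 1024 ** 2
GB = 1024 ** 3
TB = 1024 ** 4


def bytes_per_partition(total_bytes, num_partitions=DEFAULT_SHUFFLE_PARTITIONS):
    """Average bytes per shuffle partition, assuming an even spread (hypothetical helper)."""
    return total_bytes / num_partitions


workloads = {"5 MB": 5 * MB, "2 GB": 2 * GB, "2 TB": 2 * TB}
per_task_mb = {name: bytes_per_partition(size) / MB for name, size in workloads.items()}

for name, mb in per_task_mb.items():
    print(f"{name} over 200 partitions: about {mb:.3f} MB per task")
</code></pre>
<p>Under these assumptions, the 5 MB input leaves each task with only about 25 KB, the 2 GB input gives roughly 10 MB per task, and the 2 TB input hands each task around 10 GB.</p>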
<h4 id="heading-example-a-the-default-plan-too-many-tiny-tasks">Example A: The Default Plan (Too Many Tiny Tasks)</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> sum <span class="hljs-keyword">as</span> sum_

spark = SparkSession.builder.appName(<span class="hljs-string">"ParallelismExample"</span>).getOrCreate()

spark.conf.get(<span class="hljs-string">"spark.sql.shuffle.partitions"</span>)  <span class="hljs-comment"># '200'</span>

data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">75000</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">72000</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">65000</span>),
]

df = spark.createDataFrame(data, [<span class="hljs-string">"id"</span>, <span class="hljs-string">"name"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>])

agg_df = df.groupBy(<span class="hljs-string">"department"</span>).agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
agg_df.explain(<span class="hljs-string">"formatted"</span>)
</code></pre>
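<p>Even though there are only 3 departments, the static plan still uses 200 shuffle partitions, meaning 200 tasks for 3 groups of data. (On Spark 3.2+, where AQE is enabled by default, these may be coalesced at runtime; the static behavior described here assumes AQE is off.)</p>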
<p><strong>Effect:</strong> Each task has almost nothing to do. Spark spends more time planning and scheduling than actually computing.</p>
<h4 id="heading-example-b-tuned-plan-balanced-parallelism">Example B: Tuned Plan (Balanced Parallelism)</h4>
<pre><code class="lang-python">spark.conf.set(<span class="hljs-string">"spark.sql.shuffle.partitions"</span>, <span class="hljs-string">"8"</span>)
agg_df = df.groupBy(<span class="hljs-string">"department"</span>).agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
agg_df.explain(<span class="hljs-string">"formatted"</span>)
</code></pre>
<p>Now Spark plans only <strong>8 shuffle partitions</strong>: still parallel, but not wasteful. Even in this small example the difference is visible in the plan: one configuration change, and the Exchange node drops from 200 partitions to 8.</p>
<h4 id="heading-the-real-problem-static-tuning-doesnt-scale">The Real Problem: Static Tuning Doesn’t Scale</h4>
<p>In production, job sizes vary:</p>
<ul>
<li><p>Today: 10 GB</p>
</li>
<li><p>Tomorrow: 500 GB</p>
</li>
<li><p>Next week: 200 MB (sampling run)</p>
</li>
</ul>
<p>Manually changing shuffle partitions for each run is neither practical nor reliable. That’s where Adaptive Query Execution (AQE) steps in.</p>
<h4 id="heading-adaptive-query-execution-aqe-smarter-dynamic-parallelism">Adaptive Query Execution (AQE): Smarter, Dynamic Parallelism</h4>
<p>AQE doesn’t guess. It measures actual shuffle statistics at runtime and rewrites the plan <em>while the job is running.</em></p>
<pre><code class="lang-python">spark.conf.set(<span class="hljs-string">"spark.sql.adaptive.enabled"</span>, <span class="hljs-string">"true"</span>)
spark.conf.set(<span class="hljs-string">"spark.sql.adaptive.coalescePartitions.enabled"</span>, <span class="hljs-string">"true"</span>)
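<span class="hljs-comment"># Lower bound on the size of a coalesced partition</span>
spark.conf.set(<span class="hljs-string">"spark.sql.adaptive.coalescePartitions.minPartitionSize"</span>, <span class="hljs-string">"64m"</span>)
<span class="hljs-comment"># Target size AQE aims for when merging partitions (Spark has no coalescePartitions.maxPartitionSize setting)</span>
spark.conf.set(<span class="hljs-string">"spark.sql.adaptive.advisoryPartitionSizeInBytes"</span>, <span class="hljs-string">"256m"</span>)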
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Configuration</strong></td><td><strong>Shuffle Partitions</strong></td><td><strong>Task Distribution</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Default</td><td>200</td><td>200 tasks / 3 groups</td><td>Too granular, mostly idle</td></tr>
<tr>
<td>Tuned</td><td>8</td><td>8 tasks / 3 groups</td><td>Balanced execution</td></tr>
</tbody>
</table>
</div><p>AQE merges tiny shuffle partitions, and can split skewed ones in joins, based on <strong>runtime shuffle statistics</strong>, not pre-set assumptions.</p>
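<p>The merging idea can be sketched in plain Python. This is a simplified model for intuition, not Spark's actual implementation: the function name <code>coalesce_partitions</code>, the greedy strategy, and the partition sizes are all illustrative assumptions. It walks adjacent shuffle partitions and groups them until adding the next one would exceed a target size.</p>
<pre><code class="lang-python">def coalesce_partitions(sizes, target_bytes):
    """Greedily group adjacent partitions so each group stays at or under target_bytes.

    Simplified model of partition coalescing; a single partition larger than
    the target still gets its own group.
    """
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(sizes):
        # Close the current group if the next partition would push it over the target
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


KB = 1024
MB = 1024 * KB

# Hypothetical shuffle output: 200 partitions of 400 KB each
sizes = [400 * KB] * 200
groups = coalesce_partitions(sizes, target_bytes=64 * MB)
print(len(sizes), "partitions coalesced into", len(groups))
</code></pre>
<p>In this toy case 200 tiny partitions collapse into 2 groups. Real AQE results depend on the actual shuffle statistics and on the configured minimum and advisory sizes.</p>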
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time

df = spark.createDataFrame(multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>,
     <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
agg_df = df.groupBy(<span class="hljs-string">"department"</span>).agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
agg_df.count()

print(<span class="hljs-string">f'Num Partitions df: <span class="hljs-subst">{df.rdd.getNumPartitions()}</span>'</span>)
print(<span class="hljs-string">f'Num Partitions aggdf: <span class="hljs-subst">{agg_df.rdd.getNumPartitions()}</span>'</span>)
print(<span class="hljs-string">"Execution time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"sec"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Stage</strong></td><td><strong>Without AQE</strong></td><td><strong>With AQE</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Stage 3 (Aggregation)</td><td>200 shuffle partitions, each reading KBs</td><td>8–12 coalesced partitions</td></tr>
<tr>
<td>Stage 4 (Join Output)</td><td>200 shuffle files</td><td>Merged into balanced partitions</td></tr>
<tr>
<td><strong>Result</strong></td><td>Many small tasks, high overhead</td><td>Fewer, balanced tasks, faster runtime</td></tr>
</tbody>
</table>
</div><h4 id="heading-understanding-the-plan"><strong>Understanding the Plan</strong></h4>
<p>Before AQE (static):</p>
<p><code>Exchange hashpartitioning(department, 200)</code></p>
<p>With AQE: AdaptiveSparkPlan (coalesced)</p>
<p><code>HashAggregate(keys=[department], functions=[sum(salary)])</code></p>
<p><code>Exchange hashpartitioning(department, 200)</code>  <em># runtime coalesced to 12</em></p>
<p>The logical plan remains the same, but the physical execution plan is rewritten during runtime. Spark intelligently reduces or merges shuffle partitions based on data volume.</p>
<p>Spark’s default of 200 shuffle partitions often misfits real workloads. Static tuning may work for predictable pipelines, but it fails when data volume varies. AQE, by contrast, uses shuffle statistics to coalesce partitions dynamically at runtime. Because coalescing starts from the configured partition count and only merges downward, set that count to a sensible ceiling (for example, 400 partitions), and always verify in the Spark UI to catch over-partitioning (many tasks reading KBs) or under-partitioning (few tasks reading GBs).</p>
<h3 id="heading-scenario-14-handle-skew-smartly">Scenario 14: Handle Skew Smartly</h3>
<p>In an ideal Spark world, all partitions contain roughly equal amounts of data. But real datasets are rarely that kind. If one key (say "USA", "2024", or "customer_123") holds millions of rows while others have only a few, Spark ends up with one or two massive partitions. Those partitions take disproportionately longer to process, leaving other executors idle. That’s data skew: the silent killer of parallelism.</p>
<p>You’ll often spot it in Spark UI:</p>
<ul>
<li><p>198 tasks finish quickly.</p>
</li>
<li><p>2 tasks take 10× longer.</p>
</li>
<li><p>Stage stays stuck at 98% for minutes.</p>
</li>
</ul>
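<p>One rough way to quantify that pattern is to compare the slowest task with the median task. The sketch below is plain Python over hypothetical task durations (the helper <code>skew_ratio</code> is not a Spark API); in practice you would read the durations from the stage page in the Spark UI.</p>
<pre><code class="lang-python">from statistics import median


def skew_ratio(task_seconds):
    """Slowest task divided by the median task; values near 1 mean balanced work."""
    return max(task_seconds) / median(task_seconds)


# Hypothetical stage: 198 tasks take 3 s each, 2 straggler tasks take 30 s
durations = [3.0] * 198 + [30.0] * 2
ratio = skew_ratio(durations)
print(f"slowest task is {ratio:.1f}x the median")
</code></pre>
<p>The median is used instead of the mean so that the stragglers themselves do not hide the imbalance.</p>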
<h4 id="heading-example-a-the-skew-problem">Example A: The Skew Problem</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession, functions <span class="hljs-keyword">as</span> F

spark = SparkSession.builder.appName(<span class="hljs-string">"DataSkewDemo"</span>).getOrCreate()

<span class="hljs-comment"># Create skewed dataset</span>
df = spark.range(<span class="hljs-number">0</span>, <span class="hljs-number">10000</span>).toDF(<span class="hljs-string">"id"</span>) \
    .withColumn(<span class="hljs-string">"department"</span>,
        F.when(F.col(<span class="hljs-string">"id"</span>) &lt; <span class="hljs-number">8000</span>, <span class="hljs-string">"Engineering"</span>)  <span class="hljs-comment"># 80% of data</span>
         .when(F.col(<span class="hljs-string">"id"</span>) &lt; <span class="hljs-number">9000</span>, <span class="hljs-string">"Sales"</span>)
         .otherwise(<span class="hljs-string">"HR"</span>)) \
    .withColumn(<span class="hljs-string">"salary"</span>, (F.rand() * <span class="hljs-number">100000</span>).cast(<span class="hljs-string">"int"</span>))

df.groupBy(<span class="hljs-string">"department"</span>).count().show()
</code></pre>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769464257950/6963171b-92de-4721-9bb3-6951c68a2775.png" alt="6963171b-92de-4721-9bb3-6951c68a2775" class="image--center mx-auto" width="594" height="446" loading="lazy"></p>
<p>Spark hashes every “Engineering” row to the same reducer partition, making that partition far heavier than the others. That single task becomes the bottleneck: the other tasks finish quickly, but the stage cannot complete until the one lagging task does.</p>
<h4 id="heading-example-b-the-solution-salting-hot-keys">Example B: The Solution: Salting Hot Keys</h4>
<p>To handle skew, we split the hot key (Engineering) into multiple pseudo-keys using a random salt. This redistributes that large partition across multiple reducers.</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> rand, lit

salt_buckets = <span class="hljs-number">10</span>

df_salted = (
    df.withColumn(
        <span class="hljs-string">"department_salted"</span>,
        F.when(F.col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>,
            F.concat(F.col(<span class="hljs-string">"department"</span>), lit(<span class="hljs-string">"_"</span>),
                     (F.floor(rand() * salt_buckets))))
         .otherwise(F.col(<span class="hljs-string">"department"</span>))
    )
)

df_salted.groupBy(<span class="hljs-string">"department_salted"</span>).agg(F.avg(<span class="hljs-string">"salary"</span>))
</code></pre>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769464242395/c4ec0bc6-67bf-488c-b619-7130ceef878e.png" alt="c4ec0bc6-67bf-488c-b619-7130ceef878e" class="image--center mx-auto" width="536" height="468" loading="lazy"></p>
<p>Now “Engineering” isn’t one hot key – it’s <strong>10 smaller keys</strong> like Engineering_0, Engineering_1, ..., Engineering_9. Each one goes to a separate reducer partition, enabling parallel processing.</p>
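<p>To see the effect on key sizes without running Spark, here is a small plain-Python sketch that reproduces the same 80/10/10 split and counts rows per key before and after salting. As a stated substitution, it uses <code>row_id % SALT_BUCKETS</code> as a deterministic stand-in for <code>rand()</code>, so the counts are exact; the helper names are hypothetical.</p>
<pre><code class="lang-python">from collections import Counter

SALT_BUCKETS = 10


def department_for(row_id):
    """Same 80/10/10 split as the skewed dataset in Example A."""
    if row_id >= 9000:
        return "HR"
    if row_id >= 8000:
        return "Sales"
    return "Engineering"


def salted_key(row_id):
    """Append a salt to the hot key only; row_id % SALT_BUCKETS stands in for rand()."""
    dept = department_for(row_id)
    if dept == "Engineering":
        return f"{dept}_{row_id % SALT_BUCKETS}"
    return dept


rows = range(10000)
plain = Counter(department_for(i) for i in rows)
salted = Counter(salted_key(i) for i in rows)

print("largest key before salting:", max(plain.values()))
print("largest key after salting: ", max(salted.values()))
</code></pre>
<p>The largest key shrinks from 8,000 rows to 1,000, so no single reducer has to process most of the dataset on its own.</p>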
<h4 id="heading-example-c-post-aggregation-desalting">Example C: Post-Aggregation Desalting</h4>
<p>After aggregating, recombine salted keys to get the original department names:</p>
<pre><code class="lang-python">df_final = (
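    df_salted.groupBy(<span class="hljs-string">"department_salted"</span>)
        <span class="hljs-comment"># Partial sums and counts per salted key (averaging per-bucket averages would be biased)</span>
        .agg(F.sum(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"sum_salary"</span>),
             F.count(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"cnt_salary"</span>))
        <span class="hljs-comment"># Strip the salt suffix to recover the original department name</span>
        .withColumn(<span class="hljs-string">"department"</span>, F.split(F.col(<span class="hljs-string">"department_salted"</span>), <span class="hljs-string">"_"</span>)
            .getItem(<span class="hljs-number">0</span>))
        .groupBy(<span class="hljs-string">"department"</span>)
        .agg((F.sum(<span class="hljs-string">"sum_salary"</span>) / F.sum(<span class="hljs-string">"cnt_salary"</span>)).alias(<span class="hljs-string">"final_avg_salary"</span>))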
)
</code></pre>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769464321049/6349c2c3-a0e3-4f9e-be3e-c59639004128.png" alt="6349c2c3-a0e3-4f9e-be3e-c59639004128" class="image--center mx-auto" width="540" height="242" loading="lazy"></p>
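<p>One thing to watch when desalting an average: the mean of per-bucket means equals the true mean only when every bucket holds the same number of rows. The plain-Python sketch below uses deliberately uneven, made-up buckets to show the gap. With <code>rand()</code>-based salts the buckets are close in size, so the error is usually small, but carrying sums and counts through the second aggregation avoids it entirely.</p>
<pre><code class="lang-python"># Hypothetical per-bucket salaries after the first (salted) aggregation
buckets = {
    "Engineering_0": [100, 100, 100],
    "Engineering_1": [400],
}

# Correct: total salary divided by total row count
true_avg = sum(sum(v) for v in buckets.values()) / sum(len(v) for v in buckets.values())

# Biased: every bucket gets equal weight regardless of how many rows it holds
bucket_avgs = [sum(v) / len(v) for v in buckets.values()]
avg_of_avgs = sum(bucket_avgs) / len(bucket_avgs)

print("weighted (correct):", true_avg)
print("average of averages:", avg_of_avgs)
</code></pre>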
<h4 id="heading-when-to-use-salting">When to Use Salting</h4>
<p>Use salting when:</p>
<ul>
<li><p>You observe stage skew (one or few long tasks).</p>
</li>
<li><p>Shuffle read sizes vary drastically between tasks.</p>
</li>
<li><p>The skew originates from a few dominant key values.</p>
</li>
</ul>
<p>Avoid it when:</p>
<ul>
<li><p>The dataset is small (&lt; 1 GB).</p>
</li>
<li><p>You already use partitioning or bucketing keys with uniform distribution.</p>
</li>
</ul>
<p><strong>Alternative approaches:</strong></p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Technique</strong></td><td><strong>Use Case</strong></td><td><strong>Pros</strong></td><td><strong>Cons</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Salting (manual)</td><td>Skewed joins/aggregations</td><td>Full control</td><td>Requires extra logic to merge</td></tr>
<tr>
<td>Skew hints (/*+ SKEW('table') */)</td><td>Joins on Databricks Runtime (not an open-source Spark hint)</td><td>No extra columns needed</td><td>Works only on joins; vendor-specific</td></tr>
<tr>
<td>Broadcast smaller side</td><td>One table ≪ other</td><td>Avoids shuffle on big side</td><td>Limited by broadcast size</td></tr>
<tr>
<td>AQE skew optimization</td><td>Skewed joins in Spark 3.0+</td><td>Automatic handling</td><td>Needs AQE enabled; applies to joins only</td></tr>
</tbody>
</table>
</div><h4 id="heading-glue-specific-tip">Glue-Specific Tip</h4>
<p>AWS Glue 3.0+ includes Spark 3.x, meaning you can also enable AQE’s built-in skew optimization:</p>
<p><code>spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")</code></p>
<p><code>spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "128m")</code></p>
<p>Spark will automatically detect oversized shuffle partitions in joins and split them, effectively auto-salting hot keys at runtime. Note that this optimization targets joins; it does not rebalance a skewed groupBy aggregation.</p>
<p>Data skew causes uneven shuffle sizes across tasks and can be detected in the Spark UI or via shuffle read/write metrics. Mitigate heavy-key skew with manual salting (recombined later), or rely on AQE skew join optimization for mild cases. Always validate improvements in the Spark UI SQL tab by checking “Shuffle Read Size.”</p>
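<p>The detection rule behind AQE's skew join handling can be sketched in plain Python: a partition counts as skewed when it is larger than a multiple of the median partition size and also larger than the byte threshold. This is a simplified model with a hypothetical helper name. The 128 MB threshold matches the setting above, while the factor of 5 is an assumption here (check <code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> in your Spark version).</p>
<pre><code class="lang-python">from statistics import median

MB = 1024 ** 2


def skewed_partitions(sizes, factor=5, threshold_bytes=128 * MB):
    """Indexes of partitions larger than both factor x median and the byte threshold."""
    mid = median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > factor * mid and size > threshold_bytes]


# Hypothetical shuffle: seven 20 MB partitions and one 900 MB hot partition
sizes = [20 * MB] * 7 + [900 * MB]
print(skewed_partitions(sizes))
</code></pre>
<p>Both conditions must hold, so a partition that is large relative to the median but still below the threshold is left alone.</p>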
<h3 id="heading-scenario-15-sort-efficiently-orderby-vs-sortwithinpartitions">Scenario 15: Sort Efficiently (orderBy vs sortWithinPartitions)</h3>
<p>Most Spark jobs need sorted data at some point – for window functions, for writing ordered files, or for downstream processing. The instinct is to reach for orderBy(). But that instinct costs you a full shuffle every single time.</p>
<h4 id="heading-the-problem-global-sort-when-you-dont-need-it">The Problem: Global Sort When You Don't Need It</h4>
<p>Let's say you want to write employee data partitioned by department, sorted by salary within each department:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col

<span class="hljs-comment"># Naive approach: global sort</span>
df_sorted = df.orderBy(col(<span class="hljs-string">"department"</span>), col(<span class="hljs-string">"salary"</span>).desc())

df_sorted.write.partitionBy(<span class="hljs-string">"department"</span>).parquet(<span class="hljs-string">"s3://output/employees/"</span>)
</code></pre>
<p>This looks reasonable. You're sorting by department and salary, then writing partitioned files. Clean and simple. But here's what Spark actually does:</p>
<p><strong>Simplified Physical Plan:</strong></p>
<pre><code class="lang-python">Sort [department ASC, salary DESC], true

└─ Exchange rangepartitioning(department ASC, salary DESC, <span class="hljs-number">200</span>)

   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>That Exchange <code>rangepartitioning</code> is a full shuffle. So Spark:</p>
<ul>
<li><p>Samples the data to determine range boundaries</p>
</li>
<li><p>Redistributes every row across 200 partitions based on sort keys</p>
</li>
<li><p>Sorts each partition locally</p>
</li>
<li><p>Produces globally ordered output</p>
</li>
</ul>
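<p>The four steps above can be imitated in a few lines of plain Python. This is only a toy model of range partitioning (the function name, sample size, and salary values are illustrative assumptions, and real Spark samples and shuffles across executors), but it shows why every row has to move before the local sorts add up to a global order.</p>
<pre><code class="lang-python">import bisect
import random


def range_partition_sort(values, num_partitions, sample_size=20, seed=42):
    """Sample, pick range boundaries, route each value to its range, then sort locally."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(values, min(sample_size, len(values))))
    # num_partitions - 1 boundaries taken at evenly spaced positions in the sample
    step = len(sample) / num_partitions
    boundaries = [sample[int(step * i)] for i in range(1, num_partitions)]

    partitions = [[] for _ in range(num_partitions)]
    for value in values:  # the "shuffle": every value moves to its range's partition
        partitions[bisect.bisect_right(boundaries, value)].append(value)

    return [sorted(part) for part in partitions]  # local sort per partition


salaries = [72000, 90000, 65000, 85000, 75000, 58000, 99000, 61000]
parts = range_partition_sort(salaries, num_partitions=3)
flattened = [v for part in parts for v in part]
print(flattened == sorted(salaries))
</code></pre>
<p>Because each partition covers a disjoint, increasing range, reading the partitions in order yields globally sorted output.</p>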
<p>You just shuffled 1 million rows across the cluster to achieve global ordering – even though you're immediately partitioning by department on write, which destroys that global order anyway.</p>
<h4 id="heading-why-this-hurts">Why This Hurts</h4>
<p>Range partitioning for global sort is one of the most expensive shuffles Spark performs:</p>
<ul>
<li><p>Sampling overhead: Spark must scan data twice (once to sample, once to process)</p>
</li>
<li><p>Network transfer: Every row moves to a new executor based on range boundaries</p>
</li>
<li><p>Disk I/O: Shuffle files written and read from disk</p>
</li>
<li><p>Wasted work: Global ordering across departments is meaningless when you partition by department</p>
</li>
</ul>
<p>For 1M rows on AWS Glue, this added roughly 8-12 seconds of pure shuffle overhead in our runs.</p>
<h4 id="heading-the-better-approach-sort-locally-within-partitions">The Better Approach: Sort Locally Within Partitions</h4>
<p>If you only need ordering <em>within</em> each department (or within each output partition), use sortWithinPartitions():</p>
<pre><code class="lang-python"><span class="hljs-comment"># Optimized approach: local sort only</span>
df_sorted = df.sortWithinPartitions(col(<span class="hljs-string">"department"</span>), col(<span class="hljs-string">"salary"</span>).desc())
df_sorted.write.partitionBy(<span class="hljs-string">"department"</span>).parquet(<span class="hljs-string">"s3://output/employees/"</span>)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Sort [department ASC, salary DESC], false

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<ul>
<li><p>No Exchange.</p>
</li>
<li><p>No shuffle.</p>
</li>
<li><p>Just local sorting within existing partitions.</p>
</li>
</ul>
<p>Spark sorts each partition in-place, without moving data across the network. The false flag in the Sort node indicates this is a local sort, not a global one.</p>
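<p>As a plain-Python illustration of what “local” means here (a toy model with a hypothetical helper, not Spark code): each partition is sorted on its own, and nothing guarantees order across partitions.</p>
<pre><code class="lang-python">def sort_within_partitions(partitions):
    """Sort each partition independently; no rows move between partitions."""
    return [sorted(part) for part in partitions]


# Hypothetical layout: rows already sit in two partitions
partitions = [[90000, 65000, 75000], [85000, 58000, 72000]]
local = sort_within_partitions(partitions)
flattened = [v for part in local for v in part]

print(local)
print("globally sorted:", flattened == sorted(flattened))
</code></pre>
<p>Each inner list comes back ordered, but the concatenation does not, which is exactly the trade-off: no data movement, no global order.</p>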
<h4 id="heading-real-world-benchmark-aws-glue-9">Real-World Benchmark: AWS Glue</h4>
<p>Let's measure the difference on 1 million employee records. First, the global sort with orderBy():</p>
<pre><code class="lang-python">print(<span class="hljs-string">"\n--- Testing orderBy() (global sort) ---"</span>)

start = time.time()

df_global = df.orderBy(col(<span class="hljs-string">"department"</span>), col(<span class="hljs-string">"salary"</span>).desc())
df_global.write.mode(<span class="hljs-string">"overwrite"</span>).parquet(<span class="hljs-string">"/tmp/global_sort_output"</span>)

global_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f"orderBy() time: <span class="hljs-subst">{global_time}</span>s"</span>)
</code></pre>
<p>Next, the local sort with sortWithinPartitions():</p>
<pre><code class="lang-python">print(<span class="hljs-string">"\n--- Testing sortWithinPartitions() (local sort) ---"</span>)

start = time.time()

df_local = df.sortWithinPartitions(col(<span class="hljs-string">"department"</span>), col(<span class="hljs-string">"salary"</span>).desc())
df_local.write.mode(<span class="hljs-string">"overwrite"</span>).parquet(<span class="hljs-string">"/tmp/local_sort_output"</span>)

local_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f"sortWithinPartitions() time: <span class="hljs-subst">{local_time}</span>s"</span>)
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Plan Type</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>orderBy()</td><td>Exchange rangepartitioning</td><td>10.34 s</td><td>Full shuffle for global sort</td></tr>
<tr>
<td>sortWithinPartitions()</td><td>Local Sort (no Exchange)</td><td>2.18 s</td><td>In-place sorting, no network transfer</td></tr>
</tbody>
</table>
</div><p><strong>Physical Plan Differences:</strong></p>
<p><strong>orderBy() Physical Plan:</strong></p>
<pre><code class="lang-python">*(<span class="hljs-number">2</span>) Sort [department ASC NULLS FIRST, salary DESC NULLS LAST], true, <span class="hljs-number">0</span>

+- Exchange rangepartitioning(department ASC NULLS FIRST, salary DESC NULLS LAST, <span class="hljs-number">200</span>)

   +- *(<span class="hljs-number">1</span>) Project [id, firstname, lastname, department, salary, age, hire_date, country]

      +- *(<span class="hljs-number">1</span>) Scan ExistingRDD[id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>The Exchange rangepartitioning node marks the shuffle boundary. Spark must:</p>
<ul>
<li><p>Sample data to determine range splits</p>
</li>
<li><p>Redistribute all rows across executors</p>
</li>
<li><p>Sort within each range partition</p>
</li>
</ul>
<p><strong>sortWithinPartitions() Physical Plan:</strong></p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) Sort [department ASC NULLS FIRST, salary DESC NULLS LAST], false, <span class="hljs-number">0</span>

+- *(<span class="hljs-number">1</span>) Project [id, firstname, lastname, department, salary, age, hire_date, country]

   +- *(<span class="hljs-number">1</span>) Scan ExistingRDD[id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>No Exchange. The false flag in Sort indicates local sorting only. Each partition is sorted independently, in parallel, without any data movement.</p>
<p><strong>When to Use Which:</strong></p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Use Case</strong></td><td><strong>Method</strong></td><td><strong>Why</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Writing partitioned files (Parquet, Delta)</td><td>sortWithinPartitions()</td><td>Partition-level order is sufficient; global order wasted</td></tr>
<tr>
<td>Window functions with ROWS BETWEEN</td><td>sortWithinPartitions()</td><td>Only need order within each window partition</td></tr>
<tr>
<td>Top-N per group (rank, dense_rank)</td><td>sortWithinPartitions()</td><td>Ranking is local to each partition key</td></tr>
<tr>
<td>Final output must be globally ordered</td><td>orderBy()</td><td>Need total order across all partitions</td></tr>
<tr>
<td>Downstream system requires strict ordering</td><td>orderBy()</td><td>For example, time-series data for sequential processing</td></tr>
<tr>
<td>Sorting when using coalesce() for fewer output files</td><td>sortWithinPartitions() after coalesce()</td><td>coalesce() concatenates partitions, so sort after merging to keep each output partition ordered</td></tr>
</tbody>
</table>
</div><h4 id="heading-common-anti-pattern">Common Anti-Pattern</h4>
<pre><code class="lang-python">df.orderBy(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>) \
  .write.partitionBy(<span class="hljs-string">"department"</span>) \
  .parquet(<span class="hljs-string">"output/"</span>)
</code></pre>
<p><strong>Problem:</strong> You're globally sorting by department, then immediately partitioning by department. The global order is destroyed during partitioning.</p>
<p>Here’s the fix:</p>
<pre><code class="lang-python">df.sortWithinPartitions(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>) \
  .write.partitionBy(<span class="hljs-string">"department"</span>) \
  .parquet(<span class="hljs-string">"output/"</span>)
</code></pre>
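<p>If you also want each department's rows colocated (typically one output file per department), repartition by the partition column before sorting. This costs one hash shuffle, which is usually still cheaper than the range-partitioned global sort. Note that <code>DataFrameWriter.sortBy()</code> is not an alternative here: it only works together with <code>bucketBy()</code> and <code>saveAsTable()</code>, not with a plain <code>parquet()</code> write.</p>
<pre><code class="lang-python"><span class="hljs-comment"># Colocate each department, then sort locally before the partitioned write</span>
df.repartition(<span class="hljs-string">"department"</span>) \
    .sortWithinPartitions(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>) \
    .write.partitionBy(<span class="hljs-string">"department"</span>) \
    .parquet(<span class="hljs-string">"output/"</span>)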
</code></pre>
<p>orderBy() triggers an expensive full shuffle using range partitioning, while sortWithinPartitions() sorts data locally without a shuffle and was 4–5× faster in the benchmark above. Use it when writing partitioned files, computing window functions with partitionBy(), or when order is needed only within groups, and reserve orderBy() strictly for true global ordering. In most production ETL, the best sort is the one that doesn’t shuffle.</p>
<h2 id="heading-conclusion">Conclusion</h2>
<p>You began this handbook likely wondering why your Spark application was slow, and now you see that the answer was both clear and not so clear: your problem was never your Spark application, your configuration, or your version of Spark. It was your plan all along.</p>
<p>You now understand that Spark runs plans, not code, that transformation order affects logical plans, that shuffles generate stages and are key to runtime performance, and that examining your physical plans allows you to directly link your application performance issues back to your problematic line of code.</p>
<p>And you’ve seen this pattern repeat across many scenarios: problem, plan, solution, improved plan, and so forth, until optimization feels less like a dark art and more like a certainty.</p>
<p>This is the Spark optimization mindset: read plans before you write code, and challenge every single Exchange. Engineers who write high-performance Spark jobs minimize shuffles, filter early, project narrowly, deal with skew carefully, and validate everything via explain() and the Spark UI. Once you learn to read the plan, Spark performance becomes mechanical.</p>
 ]]>
                </content:encoded>
            </item>
        
    </channel>
</rss>
