<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/"
    xmlns:atom="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" version="2.0">
    <channel>
        
        <title>
            <![CDATA[ Sameer Shukla - freeCodeCamp.org ]]>
        </title>
        <description>
            <![CDATA[ Browse thousands of programming tutorials written by experts. Learn Web Development, Data Science, DevOps, Security, and get developer career advice. ]]>
        </description>
        <link>https://www.freecodecamp.org/news/</link>
        <image>
            <url>https://cdn.freecodecamp.org/universal/favicons/favicon.png</url>
            <title>
                <![CDATA[ Sameer Shukla - freeCodeCamp.org ]]>
            </title>
            <link>https://www.freecodecamp.org/news/</link>
        </image>
        <generator>Eleventy</generator>
        <lastBuildDate>Fri, 08 May 2026 22:31:29 +0000</lastBuildDate>
        <atom:link href="https://www.freecodecamp.org/news/author/sshukla/rss.xml" rel="self" type="application/rss+xml" />
        <ttl>60</ttl>
        
            <item>
                <title>
                    <![CDATA[ How to Optimize PySpark Jobs: Real-World Scenarios for Understanding Logical Plans ]]>
                </title>
                <description>
                    <![CDATA[ In the world of big data, performance isn't just about bigger clusters – it's about smarter code. Spark is deceptively simple to write but notoriously difficult to optimize, because what you write isn't what Spark executes. Between your transformatio... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-optimize-pyspark-jobs-handbook/</link>
                <guid isPermaLink="false">69851d7be613661950e00d8f</guid>
                
                    <category>
                        <![CDATA[ General Programming ]]>
                    </category>
                
                    <category>
                        <![CDATA[ data-engineering ]]>
                    </category>
                
                    <category>
                        <![CDATA[ data analysis ]]>
                    </category>
                
                    <category>
                        <![CDATA[ spark ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Python ]]>
                    </category>
                
                    <category>
                        <![CDATA[ PySpark ]]>
                    </category>
                
                    <category>
                        <![CDATA[ AWS Glue ]]>
                    </category>
                
                    <category>
                        <![CDATA[ handbook ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Sameer Shukla ]]>
                </dc:creator>
                <pubDate>Thu, 05 Feb 2026 22:45:15 +0000</pubDate>
                <media:content url="https://cdn.hashnode.com/res/hashnode/image/upload/v1770331493095/d569e168-d3ba-40e0-a500-7f682bbef693.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>In the world of big data, performance isn't just about bigger clusters – it's about smarter code. Spark is deceptively simple to write but notoriously difficult to optimize, because what you write isn't what Spark executes. Between your transformations and actual computation lies an invisible translation layer – the logical plan – that determines whether your job runs in minutes or hours.</p>
<p>Most engineers never look at this layer, which is why they spend days tuning configurations that don't address the real problem: inefficient transformations that generate bloated plans.</p>
<p>This handbook teaches you to read, interpret, and control those plans, transforming you from someone who writes PySpark code into someone who architects efficient data pipelines with precision and confidence.</p>
<h2 id="heading-table-of-contents">Table of Contents</h2>
<ol>
<li><p><a class="post-section-overview" href="#heading-background-information">Background Information</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-chapter-1-the-spark-mindset-why-plans-matter">Chapter 1: The Spark Mindset: Why Plans Matter</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-chapter-2-understanding-the-spark-execution-flow">Chapter 2: Understanding the Spark Execution Flow</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-chapter-3-reading-and-debugging-plans-like-a-pro">Chapter 3: Reading and Debugging Plans Like a Pro</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-chapter-4-writing-efficient-transformations">Chapter 4: Writing Efficient Transformations</a></p>
<ul>
<li><p><a class="post-section-overview" href="#heading-scenario-1-rename-in-one-pass-withcolumnrenamed-vs-todf">Scenario 1: Rename in One Pass: withColumnRenamed() vs toDF()</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-2-reusing-expressions">Scenario 2: Reusing expressions</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-3-batch-column-ops">Scenario 3: Batch column ops</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-4-early-filter-vs-late-filter">Scenario 4: Early Filter vs Late Filter</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-5-column-pruning">Scenario 5: Column Pruning</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-6-filter-pushdown-vs-full-scan">Scenario 6: Filter pushdown vs full scan</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-7-de-duplicate-right">Scenario 7: De-duplicate right</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-8-count-smarter">Scenario 8: Count Smarter</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-9-window-wisely">Scenario 9: Window wisely</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-10-incremental-aggregations-with-cache-and-persist">Scenario 10: Incremental Aggregations with Cache and Persist</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-11-reduce-shuffles">Scenario 11: Reduce shuffles</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-12-know-your-shuffle-triggers">Scenario 12: Know Your Shuffle Triggers</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-13-tune-parallelism-shuffle-partitions-amp-aqe">Scenario 13: Tune Parallelism: Shuffle Partitions &amp; AQE</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-14-handle-skew-smartly">Scenario 14: Handle Skew Smartly</a></p>
</li>
<li><p><a class="post-section-overview" href="#heading-scenario-15-sort-efficiently-orderby-vs-sortwithinpartitions">Scenario 15: Sort Efficiently (orderBy vs sortWithinPartitions)</a></p>
</li>
</ul>
</li>
<li><p><a class="post-section-overview" href="#heading-conclusion">Conclusion</a></p>
</li>
</ol>
<h2 id="heading-background-information">Background Information</h2>
<h3 id="heading-what-this-handbook-is-really-about">What This Handbook is Really About</h3>
<p>This is not a tutorial about Spark internals, cluster tuning, or PySpark syntax or APIs.</p>
<p>This is a handbook about writing PySpark code that generates efficient logical plans.</p>
<p>Because when your code produces clean, optimized plans, Spark pushes filters correctly, shuffles reduce instead of multiply, projections stay shallow, and the DAG (<a target="_blank" href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">Directed Acyclic Graph</a>) becomes predictable, lean, and fast.</p>
<p>When your code produces messy plans, Spark shuffles more than necessary, and projects pile up into deep, expensive stacks. Filters arrive late instead of early, joins explode into wide, slow operations, and the DAG becomes tangled and expensive.</p>
<p>The difference between a fast job and a slow job is not “faster hardware.” It’s the structure of the plan Spark generates from your code. This handbook teaches you to shape that plan deliberately through scenarios.</p>
<h3 id="heading-who-this-handbook-is-for">Who This Handbook Is For</h3>
<p>This handbook is written for:</p>
<ul>
<li><p><strong>Data engineers</strong> building production ETL pipelines who want to move beyond trial-and-error tuning and understand <em>why</em> jobs perform the way they do</p>
</li>
<li><p><strong>Analytics engineers</strong> working with large datasets in Databricks, EMR, or Glue who need to optimize Spark jobs but don't have time for thousand-page reference manuals</p>
</li>
<li><p><strong>Data scientists</strong> transitioning from pandas to PySpark who find themselves writing code that technically runs but takes forever</p>
</li>
<li><p><strong>Anyone</strong> who has stared at the Spark UI, seen mysterious "Exchange" nodes in the DAG, and wondered, <em>"Why is this shuffling so much data?"</em></p>
</li>
</ul>
<p>You should already be comfortable writing basic PySpark code: creating DataFrames, applying transformations, and running aggregations. This handbook won't teach you Spark syntax. Instead, it teaches you how to write transformations that work <em>with</em> the optimizer, not against it.</p>
<h3 id="heading-how-this-handbook-is-structured">How This Handbook Is Structured</h3>
<p>We’ll start with foundations, then move on to real-world scenarios.</p>
<p>Chapters 1-3 build your mental model. You'll learn what logical plans are, how they connect to physical execution, and how to read the plan output that Spark shows you. These chapters are short and focused – just enough theory to make the practical scenarios meaningful.</p>
<p>Chapter 4 is the heart of the handbook. It contains 15 real-world scenarios, organized by category. Each scenario shows you a common performance problem, explains what's happening in the logical plan, and demonstrates the better approach. You'll see before-and-after code, plan comparisons, and clear explanations of why one approach outperforms another.</p>
<h3 id="heading-what-youll-learn">What You'll Learn</h3>
<p>By the end of this handbook, you'll be able to:</p>
<ul>
<li><p>Read and interpret Spark's logical, optimized, and physical plans</p>
</li>
<li><p>Identify expensive operations before running your code</p>
</li>
<li><p>Restructure transformations to minimize shuffles</p>
</li>
<li><p>Choose the right join strategies for your data</p>
</li>
<li><p>Avoid common pitfalls that cause memory issues and slow performance</p>
</li>
<li><p>Debug production issues by examining execution plans</p>
</li>
</ul>
<p>More importantly, you'll develop a Spark mindset, an intuition for how your code translates to cluster operations. You'll stop writing code that "should work" and start writing code that you <em>know</em> will work efficiently.</p>
<h3 id="heading-technical-prerequisites">Technical Prerequisites</h3>
<p>I assume that you’re familiar with the following concepts before proceeding:</p>
<ol>
<li><p>Python fundamentals</p>
</li>
<li><p>PySpark basics</p>
<ul>
<li><p>Creating DataFrames and reading data from files</p>
</li>
<li><p>Basic DataFrame operations: select, filter, withColumn, groupBy, join</p>
</li>
<li><p>Writing DataFrames back to storage</p>
</li>
</ul>
</li>
<li><p>Basic Spark concepts</p>
<ul>
<li><p>Basic understanding of Spark applications, jobs, stages, and tasks</p>
</li>
<li><p>Basic understanding of the difference between transformations and actions</p>
</li>
<li><p>Understanding of partitions and shuffles</p>
</li>
</ul>
</li>
<li><p>AWS Glue (Good to have)</p>
</li>
</ol>
<h2 id="heading-chapter-1-the-spark-mindset-why-plans-matter">Chapter 1: The Spark Mindset: Why Plans Matter</h2>
<p>This chapter isn’t about Spark theory or internals. It’s about understanding Spark Plans, and seeing Spark the way the engine sees your code. Once you understand how Spark builds and optimizes a logical plan, optimization stops being trial and error and becomes intentional engineering.</p>
<p>Behind every simple transformation, Spark quietly redraws its internal blueprint. Every transformation you write, from <em>withColumn</em> to <em>join</em>, changes that plan. When the plan is efficient, Spark flies, but when it’s messy, Spark crawls.</p>
<h3 id="heading-the-invisible-layer-behind-every-transformation">The Invisible Layer Behind Every Transformation</h3>
<p>When you write PySpark code, it feels like you’re chaining operations step by step. In reality, Spark isn’t executing those lines. It’s quietly building a blueprint, a logical plan describing <em>what</em> to do, not <em>how</em>.</p>
<p>Once this plan is built, the Catalyst Optimizer analyzes it, rearranges operations, eliminates redundancies, and produces an optimized plan. Catalyst is Spark’s query optimization engine.</p>
<p>Every DataFrame or SQL operation we write, such as select, filter, join, or groupBy, is first converted into a logical plan. Catalyst then analyzes and transforms this plan using a set of rule-based optimizations, such as predicate pushdown, column pruning, constant folding, and join reordering. The result is an optimized logical plan, which Spark then converts into a physical execution plan: the plan your cluster actually runs. This invisible planning layer decides the job’s performance more than any configuration setting.</p>
<h3 id="heading-from-logical-to-optimized-to-physical-plans">From Logical to Optimized to Physical Plans</h3>
<p>When you run <code>df.explain(True)</code>, Spark actually shows you four stages of reasoning:</p>
<h4 id="heading-1-logical-plan">1. Logical Plan</h4>
<p>The logical plan is the first stage where the initial translation of the code results in a tree structure that shows what operations need to happen, without worrying about how to execute them efficiently. It’s a blueprint of the query’s logic before any optimization or physical planning occurs.</p>
<p>This:</p>
<pre><code class="lang-python">df.filter(col(<span class="hljs-string">'age'</span>) &gt; <span class="hljs-number">25</span>) \
  .select(<span class="hljs-string">'firstname'</span>, <span class="hljs-string">'country'</span>) \
  .groupby(<span class="hljs-string">'country'</span>) \
  .count() \
  .explain(<span class="hljs-literal">True</span>)
</code></pre>
<p>results in the following logical plan:</p>
<pre><code class="lang-python">== Parsed Logical Plan ==
<span class="hljs-string">'Aggregate ['</span>country], [<span class="hljs-string">'country, '</span>count(<span class="hljs-number">1</span>) AS count<span class="hljs-comment">#108]</span>
+- Project [firstname<span class="hljs-comment">#95, country#97]</span>
   +- Filter (age<span class="hljs-comment">#96L &gt; cast(25 as bigint))</span>
      +- LogicalRDD [firstname<span class="hljs-comment">#95, age#96L, country#97], false</span>
</code></pre>
<h4 id="heading-2-analyzed-logical-plan">2. Analyzed Logical Plan</h4>
<p>The analyzed logical plan is the second stage in Spark’s query optimization. In this stage, Spark validates the query by checking if tables and columns actually exist in the Catalog and resolving all references. It converts all the unresolved logical plans into a resolved one with correct data types and column bindings before optimization.</p>
<h4 id="heading-3-optimized-logical-plan">3. Optimized Logical Plan</h4>
<p>The optimized logical plan is where Spark's Catalyst optimizer improves the logical plan by applying smart rules like filtering data early, removing unnecessary columns, and combining operations to reduce computation. It's the smarter, more efficient version of your original plan that will execute faster and use fewer resources.</p>
<p>Let’s understand using a simple code example:</p>
<pre><code class="lang-python">df.select(<span class="hljs-string">'firstname'</span>, <span class="hljs-string">'country'</span>) \
  .groupby(<span class="hljs-string">'country'</span>) \
  .count() \
  .filter(col(<span class="hljs-string">'country'</span>) == <span class="hljs-string">'USA'</span>) \
  .explain(<span class="hljs-literal">True</span>)
</code></pre>
<p>Here’s the parsed logical plan:</p>
<pre><code class="lang-python">== Parsed Logical Plan ==
<span class="hljs-string">'Filter '</span>`=`(<span class="hljs-string">'country, USA)
+- Aggregate [country#97], [country#97, count(1) AS count#122L]
   +- Project [firstname#95, country#97]
      +- LogicalRDD [firstname#95, age#96L, country#97], false</span>
</code></pre>
<p>What this means:</p>
<ul>
<li><p>Spark first projects firstname and country</p>
</li>
<li><p>Then aggregates by country</p>
</li>
<li><p>Then applies the filter country = 'USA' <strong>after</strong> aggregation</p>
</li>
</ul>
<p>(because that’s how you wrote it).</p>
<p>Here’s the optimized logical plan:</p>
<pre><code class="lang-python">== Optimized Logical Plan ==
Aggregate [country<span class="hljs-comment">#97], [country#97, count(1) AS count#122L]</span>
+- Project [country<span class="hljs-comment">#97]</span>
   +- Filter (isnotnull(country<span class="hljs-comment">#97) AND (country#97 = USA))</span>
      +- LogicalRDD [firstname<span class="hljs-comment">#95, age#96L, country#97], false</span>
</code></pre>
<p>Key improvements Catalyst applied:</p>
<ul>
<li><p>Filter pushdown: The filter country = 'USA' is pushed below the aggregation, so Spark only groups U.S. rows.</p>
</li>
<li><p>Column pruning: “firstname” is automatically removed because it’s never used in the final output.</p>
</li>
<li><p>Cleaner projection: Intermediate columns are dropped early, reducing I/O and in-memory footprint.</p>
</li>
</ul>
<h4 id="heading-4-physical-plan">4. Physical Plan</h4>
<p>The physical plan is Spark's final execution blueprint that shows exactly how the query will run: which specific algorithms to use, how to distribute work across machines, and the order of low-level operations. It's the concrete, executable version of the optimized logical plan, translated into actual Spark operations like “ShuffleExchange”, “HashAggregate”, and “FileScan” that will run on your cluster.</p>
<p>Catalyst may, for example:</p>
<ul>
<li><p>Fold constants (col("x") * 1 → col("x"))</p>
</li>
<li><p>Push filters closer to the data source</p>
</li>
<li><p>Replace a regular join with a broadcast join when data fits in memory</p>
</li>
</ul>
<p>Once the physical plan is finalized, Spark’s scheduler converts it into a DAG of stages and tasks that run across the cluster. Understanding that lineage, from your code → plan → DAG, is what separates fast jobs from slow ones.</p>
<h3 id="heading-how-to-read-a-logical-plan">How to Read a Logical Plan</h3>
<p>A logical plan prints as a tree: the bottom is your data source, and each higher node represents a transformation.</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Node</strong></td><td><strong>Meaning</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Relation / LogicalRDD</td><td>Data source, the initial DataFrame</td></tr>
<tr>
<td>Project</td><td>Column selection and transformation (select, withColumn)</td></tr>
<tr>
<td>Filter</td><td>Row filtering based on conditions (where, filter)</td></tr>
<tr>
<td>Join</td><td>Combining two DataFrames (join, union)</td></tr>
<tr>
<td>Aggregate</td><td>GroupBy and aggregation operations (groupBy, agg)</td></tr>
<tr>
<td>Exchange</td><td>Shuffle operation (data redistribution across partitions)</td></tr>
<tr>
<td>Sort</td><td>Ordering data (orderBy, sort)</td></tr>
</tbody>
</table>
</div><p>Each node represents a transformation. Execution flows from the bottom up. Let's understand with a basic example:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> *
<span class="hljs-keyword">from</span> pyspark.sql.types <span class="hljs-keyword">import</span> *

spark = SparkSession.builder.appName(<span class="hljs-string">"Practice"</span>).getOrCreate()

employees_data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Doe"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">80000</span>, <span class="hljs-number">28</span>, <span class="hljs-string">"2020-01-15"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Jane"</span>, <span class="hljs-string">"Smith"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>, <span class="hljs-number">32</span>, <span class="hljs-string">"2019-03-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Johnson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">60000</span>, <span class="hljs-number">25</span>, <span class="hljs-string">"2021-06-10"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Brown"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>, <span class="hljs-number">35</span>, <span class="hljs-string">"2018-07-01"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Charlie"</span>, <span class="hljs-string">"Wilson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">65000</span>, <span class="hljs-number">29</span>, <span class="hljs-string">"2020-11-05"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">6</span>, <span class="hljs-string">"David"</span>, <span class="hljs-string">"Lee"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">55000</span>, <span class="hljs-number">27</span>, <span class="hljs-string">"2021-01-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">7</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Davis"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">95000</span>, <span class="hljs-number">40</span>, <span class="hljs-string">"2017-04-12"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">8</span>, <span class="hljs-string">"Frank"</span>, <span class="hljs-string">"Miller"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">70000</span>, <span class="hljs-number">33</span>, <span class="hljs-string">"2019-09-25"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">9</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"Taylor"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">58000</span>, <span class="hljs-number">26</span>, <span class="hljs-string">"2021-08-15"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">10</span>, <span class="hljs-string">"Henry"</span>, <span class="hljs-string">"Anderson"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">88000</span>, <span class="hljs-number">31</span>, <span class="hljs-string">"2020-02-28"</span>, <span class="hljs-string">"USA"</span>)
]

df = spark.createDataFrame(employees_data,  
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])
</code></pre>
<h4 id="heading-version-a-withcolumn-filter">Version A: withColumn → filter</h4>
<p>In this version, we add a derived column with withColumn and then apply a filter to the dataset. This ordering is logically correct and produces the expected result: it shows how introducing derived columns early affects the logical plan. In other words, it shows what happens when Spark is asked to compute a new column before any rows are eliminated.</p>
<pre><code class="lang-python">df_filtered = df \
.withColumn(<span class="hljs-string">'bonus'</span>, col(<span class="hljs-string">'salary'</span>) * <span class="hljs-number">82</span>) \
.filter(col(<span class="hljs-string">'age'</span>) &gt; <span class="hljs-number">35</span>) \
.explain(<span class="hljs-literal">True</span>)
</code></pre>
<h4 id="heading-parsed-logical-plan-simplified">Parsed Logical Plan (Simplified)</h4>
<pre><code class="lang-python">Filter (age &gt; <span class="hljs-number">35</span>)
└─ Project [*, (salary * <span class="hljs-number">82</span>) AS bonus]
   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>So what’s going on here? Execution flows from the bottom up.</p>
<ul>
<li><p>Spark first reads the LogicalRDD.</p>
</li>
<li><p>Then applies the Project node, keeping all columns and adding bonus.</p>
</li>
<li><p>Finally, the Filter removes rows where age ≤ 35.</p>
</li>
</ul>
<p>This means Spark computes the bonus for every employee, even those who are later filtered out. It's harmless here, but costly on millions of rows: more computation, more I/O, more shuffle volume.</p>
<h4 id="heading-version-b-filter-project">Version B: Filter → Project</h4>
<p>In this version, we apply the filter before introducing the derived column. The idea is to show how pushing row-reducing operations earlier allows Catalyst to produce a leaner logical plan. Compared to Version A, this example demonstrates that the same logic, written in a different order, can significantly reduce the amount of work Spark needs to perform.</p>
<pre><code class="lang-python">df_filtered = df \
.filter(col(<span class="hljs-string">'age'</span>) &gt; <span class="hljs-number">35</span>) \
.withColumn(<span class="hljs-string">'bonus'</span>, col(<span class="hljs-string">'salary'</span>) * <span class="hljs-number">82</span>) \
.explain(<span class="hljs-literal">True</span>)
</code></pre>
<h4 id="heading-parsed-logical-plan-simplified-1">Parsed Logical Plan (Simplified)</h4>
<pre><code class="lang-python">Project [*, (salary * <span class="hljs-number">82</span>) AS bonus]
└─ Filter (age &gt; <span class="hljs-number">35</span>)
   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>So what’s going on here?</p>
<ul>
<li><p>Spark starts from the LogicalRDD.</p>
</li>
<li><p>It immediately applies the Filter, reducing the dataset to only employees with age &gt; 35.</p>
</li>
<li><p>Then the Project node adds the derived column bonus for this smaller subset.</p>
</li>
</ul>
<p>Now the Filter sits below the Project in the plan, cutting data movement and minimizing computation. Spark prunes data first, then derives new columns. This order reduces both the volume of data processed and the amount transferred, leading to a lighter and faster plan.</p>
<h3 id="heading-why-you-should-look-at-the-plan-every-time-by-running-dfexplaintrue">Why You Should Look at the Plan Every Time by running <code>df.explain(True)</code></h3>
<p>This is the quickest way to spot performance issues <em>before</em> they hit production. It shows:</p>
<ul>
<li><p>Whether filters sit in the right place.</p>
</li>
<li><p>How many Project nodes exist (each adds overhead).</p>
</li>
<li><p>Where Exchange nodes appear (these are shuffle boundaries).</p>
</li>
<li><p>If Catalyst pushed filters or rewrote joins as expected.</p>
</li>
</ul>
<p>A quick <code>explain()</code> takes seconds, while debugging a bad shuffle in production takes hours. Run <code>explain()</code> whenever you add or reorder transformations. The plan never lies.</p>
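<p>If the full <code>explain(True)</code> dump feels noisy, Spark 3.x also accepts a mode string. A minimal sketch, reusing the employees DataFrame from above:</p>
<pre><code class="lang-python">query = df.filter(col("age") &gt; 35).groupBy("department").count()

# Extended output: parsed, analyzed, optimized, and physical plans
query.explain(True)

# Formatted output: a compact physical plan plus per-node details, often easier to scan
query.explain("formatted")

# Cost-based statistics for each node, where available
query.explain("cost")
</code></pre>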
<h4 id="heading-what-spark-does-under-the-hood">What Spark Does Under the Hood</h4>
<p>Catalyst can sometimes reorder simple filters automatically, but once you use UDFs, nested logic, or joins, it often can’t. That’s why the best habit is to write transformations in a way that already makes sense to the optimizer. Filter early, avoid redundant projections, and keep plans as shallow as possible.</p>
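<p>You can see this for yourself by comparing a filter written as a Python UDF with the equivalent built-in expression. This is a minimal sketch; the UDF name and threshold are illustrative:</p>
<pre><code class="lang-python">from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

# Opaque to Catalyst: the optimizer treats the Python function as a black box,
# so it can't push or rewrite this predicate
is_senior = udf(lambda age: age &gt; 35, BooleanType())
df.filter(is_senior(col("age"))).select("firstname").explain(True)

# Transparent to Catalyst: a plain column expression can be pushed down,
# merged with other filters, and used for column pruning
df.filter(col("age") &gt; 35).select("firstname").explain(True)
</code></pre>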
<p>Optimizing Spark isn’t about tuning cluster configs – it’s about writing code that yields efficient plans. If your plan shows late filters, too many projections, or multiple Exchange nodes, it’s already explaining why your job will run slow.</p>
<h2 id="heading-chapter-2-understanding-the-spark-execution-flow">Chapter 2: Understanding the Spark Execution Flow</h2>
<p>In Chapter 1, you learned how Spark interprets your transformations into logical plans – blueprints of what the job intends to do.</p>
<p>But Spark doesn't stop there. It must translate those plans into distributed actions across a cluster of executors, coordinate data movement, and handle any failures that may occur.</p>
<p>This chapter reveals what happens when that plan leaves the driver: how Spark breaks your job into stages, tasks, and a directed acyclic graph (DAG) that actually runs.</p>
<p>By the end, you’ll understand why some operations shuffle terabytes while others fly, and how to predict it before execution begins.</p>
<h3 id="heading-from-plans-to-stages-to-tasks">From Plans to Stages to Tasks</h3>
<p>A Spark job evolves through three conceptual layers:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Layer</strong></td><td><strong>What It Represents</strong></td><td><strong>Example View</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Plan</td><td>The optimized logical + physical representation of your query</td><td>Read → Filter → Join → Aggregate</td></tr>
<tr>
<td>Stage</td><td>A contiguous set of operations that can run without shuffling data</td><td>“Map Stage” or “Reduce Stage”</td></tr>
<tr>
<td>Task</td><td>The smallest unit of work, one per partition per stage</td><td>“Process Partition 7 of Stage 3”</td></tr>
</tbody>
</table>
</div><h4 id="heading-the-execution-trigger-actions-vs-transformations">The Execution Trigger: Actions vs Transformations</h4>
<p>Here's the critical distinction that determines when execution actually begins:</p>
<pre><code class="lang-python">df1 = spark.paraquet(<span class="hljs-string">"data.paraquet"</span>)
df2 = spark.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">25</span>)
df3 = spark.groupby(<span class="hljs-string">"city"</span>).count()
</code></pre>
<p>Nothing executes yet! Spark just builds up the logical plan, adding each transformation as a node in the plan tree. No data is read, no filters run, no shuffles happen.</p>
<h4 id="heading-actions-trigger-execution">Actions Trigger Execution</h4>
<p>Spark transformations are lazy. When a sequence of DataFrame operations is defined, a logical plan is created, but no computation takes place. It’s only when Spark encounters an action, an operation that needs a result to be returned to the driver or written out, that execution takes place.</p>
<p>For example:</p>
<pre><code class="lang-python">result = df3.collect()
</code></pre>
<p>At this stage, Spark materializes the logical plan, applies optimizations, creates a physical plan, and executes the job. Until Spark is asked to <strong>act</strong>, such as collect(), count(), or write(), it’s just describing what it needs to do – but it’s not actually doing it.</p>
<h4 id="heading-the-complete-execution-flow">The Complete Execution Flow</h4>
<p>Execution begins when an action such as collect() is called. The driver sends the optimized physical plan to the SparkContext, which hands it to the DAG Scheduler. The DAG Scheduler analyzes the physical plan to find the shuffle boundaries created by wide operations such as <em>groupBy</em> or <em>orderBy</em>.</p>
<p>The plan is then divided into stages of narrow operations. Each stage is submitted to the Task Scheduler as a TaskSet, with one task per partition.</p>
<p>Tasks are assigned to executor cores based on data locality and executed. Each stage starts only after the stages it depends on have completed, and the results of the final stage are returned to the driver or written to storage.</p>
<h4 id="heading-what-triggers-a-shuffle">What Triggers a Shuffle</h4>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769457412199/308bc894-66a9-4c01-aae1-9ae42e64d32c.png" alt="Comparison of Spark shuffle behavior before and after groupBy" class="image--center mx-auto" width="600" height="400" loading="lazy"></p>
<p>A shuffle occurs when Spark needs to redistribute data across partitions, typically because the operation requires grouping, joining, or repartitioning data in a way that can’t be done locally within existing partitions.</p>
<p>Common shuffle triggers:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Operation</strong></td><td><strong>Why it Shuffles</strong></td></tr>
</thead>
<tbody>
<tr>
<td>groupBy(), reduceByKey()</td><td>Data with the same key must co-locate for aggregation</td></tr>
<tr>
<td>join()</td><td>Matching keys may reside in different partitions</td></tr>
<tr>
<td>orderBy() / sort()</td><td>Requires global ordering across all partitions</td></tr>
<tr>
<td>distinct()</td><td>Needs comparison of all values across partitions</td></tr>
<tr>
<td>repartition(n)</td><td>Explicit redistribution to a new number of partitions</td></tr>
</tbody>
</table>
</div><pre><code class="lang-python">df.groupBy("user_id") \
  .agg(sum("amount"))
</code></pre>
<p>In Stage 1 (Map), each task performs a partial aggregation on its partition and writes a shuffle file to disk. During the shuffle, each executor retrieves these files across the network such that all records with the same hash(user_id) % numPartitions are colocated.</p>
<p>In Stage 2 (Reduce), each task performs a final aggregation on its partitioned data and writes back to disk. Because Spark has tracked this process as a DAG, a failed task can re-read only the affected shuffle files instead of re-computing the entire DAG.</p>
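<p>The <code>numPartitions</code> in that hash is controlled by <code>spark.sql.shuffle.partitions</code>, which defaults to 200. A minimal sketch of the two settings most relevant here; the values are illustrative, not recommendations:</p>
<pre><code class="lang-python"># Reduce-side partition count used by DataFrame shuffles
spark.conf.set("spark.sql.shuffle.partitions", "64")

# Adaptive Query Execution (on by default in recent Spark 3.x) can coalesce
# small shuffle partitions and adjust the plan at runtime using shuffle statistics
spark.conf.set("spark.sql.adaptive.enabled", "true")
</code></pre>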
<p>In practice, a healthy job has 2-6 stages. Seeing 20+ stages for such simple logic usually means unnecessary shuffles or bad partitioning.</p>
<h4 id="heading-why-shuffles-create-stage-boundaries">Why Shuffles Create Stage Boundaries</h4>
<p>Shuffles force data to move across the network between executors. Spark cannot continue processing until:</p>
<ul>
<li><p>All tasks in the current stage write their shuffle output to disk</p>
</li>
<li><p>The shuffle data is available for the next stage to read over the network</p>
</li>
</ul>
<p>This dependency creates a natural boundary – so a new stage begins after every shuffle. The DAG Scheduler uses these boundaries to determine where stages must wait for previous stages to complete.</p>
<h4 id="heading-common-performance-bottlenecks">Common Performance Bottlenecks</h4>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Bottleneck Type</strong></td><td><strong>Symptom</strong></td><td><strong>Solution</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Data skew</td><td>Few tasks run much longer</td><td>Use salting, split hot keys, or AQE skew join</td></tr>
<tr>
<td>Small files</td><td>Too many tasks, high overhead</td><td>Coalesce or repartition after read</td></tr>
<tr>
<td>Large shuffle</td><td>High network I/O, spill to disk</td><td>Filter early, broadcast small tables, reduce cardinality</td></tr>
<tr>
<td>Unnecessary stages</td><td>Extra Exchange nodes in plan</td><td>Combine operations, remove redundant repartitions</td></tr>
<tr>
<td>Inefficient file formats</td><td>Slow reads, no predicate pushdown</td><td>Use Parquet or ORC with partitioning</td></tr>
<tr>
<td>Complex data types</td><td>Serialization overhead, large objects</td><td>Use simple types, cache in serialized form</td></tr>
</tbody>
</table>
</div><p>Let’s ground this with a small but realistic pattern using the same employees DataFrame. <strong>Goal:</strong> average salary per department and country, only for employees older than 30.</p>
<p>Naïve approach:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, when, avg

df_dept_country = df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>).distinct()

df_result = (
    df.withColumn(
        <span class="hljs-string">"age_group"</span>,
        when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
        .when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>, <span class="hljs-string">"mid"</span>)
        .otherwise(<span class="hljs-string">"senior"</span>)
    )
    .join(df_dept_country, [<span class="hljs-string">"department"</span>], <span class="hljs-string">"inner"</span>)
    .groupBy(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
    .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p>This looks harmless, but:</p>
<ul>
<li><p>The join on "department" introduces a wide dependency → shuffle #1.</p>
</li>
<li><p>The groupBy("department", "country") introduces another wide dependency → shuffle #2.</p>
</li>
</ul>
<p>So we have two shuffles for what should be a simple aggregation. If you run explain() on df_result, you’ll see two Exchange nodes, each marking a shuffle and a stage boundary.</p>
<h4 id="heading-optimized-approach">Optimized Approach</h4>
<p>We can do better by filtering early, broadcasting the small dimension (df_dept_country), and keeping only one global shuffle for aggregation.</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> broadcast

df_dept_country = df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>).distinct()

df_result_optimized = (
    df.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">30</span>)
        .join(broadcast(df_dept_country), [<span class="hljs-string">"department"</span>], <span class="hljs-string">"inner"</span>)
        .groupBy(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
        .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p>What changed:</p>
<ul>
<li><p>filter(col("age") &gt; 30) is narrow and runs before any shuffle.</p>
</li>
<li><p>broadcast(df_dept_country) avoids a shuffle for the join.</p>
</li>
<li><p>Only the groupBy("department", "country") causes a single shuffle.</p>
</li>
</ul>
<p>Now explain shows just one Exchange.</p>
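<p>You don't have to count Exchange nodes by eye. Since <code>explain()</code> simply prints to stdout, one lightweight trick is to capture that output and search it. A minimal sketch; the helper name is mine, not a Spark API:</p>
<pre><code class="lang-python">import io
import contextlib

def shuffle_count(frame):
    # Capture the printed plans and count shuffle exchanges in the physical plan.
    # BroadcastExchange nodes are deliberately not counted: they broadcast, they don't shuffle.
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        frame.explain(True)
    return buf.getvalue().count("Exchange hashpartitioning")

print(shuffle_count(df_result))            # naive version: more shuffle boundaries
print(shuffle_count(df_result_optimized))  # optimized version: fewer
</code></pre>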
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Version</strong></td><td><strong>Shuffles</strong></td><td><strong>Stages</strong></td><td><strong>Notes</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Naïve</td><td>2</td><td>~4 (2 map + 2 reduce)</td><td>Join shuffle + groupBy shuffle = double overhead</td></tr>
<tr>
<td>Optimized</td><td>1</td><td>~2 (1 map + 1 reduce)</td><td>Broadcast join avoids shuffle. Only groupBy shuffles</td></tr>
</tbody>
</table>
</div><h2 id="heading-chapter-3-reading-and-debugging-plans-like-a-pro">Chapter 3: Reading and Debugging Plans Like a Pro</h2>
<p>As explained in Chapter 1, Spark processes your transformations through three levels of plans: the logical plan, the optimized logical plan (produced by Catalyst), and the physical plan. This chapter expands on that explanation and concentrates on how the logical plan drives shuffle and execution performance.</p>
<p>By now, you understand how Spark builds and <em>executes</em> plans. But reading those plans and instantly spotting inefficiencies is the real superpower of a performance-focused data engineer.</p>
<p>Spark’s explain() output isn’t random jargon. It’s a precise log of Spark’s thought process. Once you learn to read it, every optimization becomes obvious.</p>
<h3 id="heading-three-layers-in-spark"><strong>Three Layers in Spark</strong></h3>
<p>As we talked about above, every Spark plan has three key views, printed when you call df.explain(True). Let’s review them now:</p>
<ol>
<li><p>Parsed Logical Plan: The raw intent Spark inferred from your code. It may include unresolved column names or expressions.</p>
</li>
<li><p>Analyzed / Optimized Logical Plan: After Spark applies Catalyst optimizations: constant folding, predicate pushdown, column pruning, and plan rearrangements.</p>
</li>
<li><p>Physical Plan: What your executors actually run: joins, shuffles, exchanges, scans, and code-generated operators.</p>
</li>
</ol>
<p>Each stage narrows the gap between what you <em>asked</em> Spark to do and what Spark decides to do.</p>
<pre><code class="lang-python">df_avg = df.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">30</span>)
        .groupBy(<span class="hljs-string">"department"</span>)
        .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))

df_avg.explain(<span class="hljs-literal">True</span>)
</code></pre>
<p><strong>1. Parsed Logical Plan</strong></p>
<pre><code class="lang-python">== Parsed Logical Plan ==
<span class="hljs-string">'Aggregate ['</span>department], [<span class="hljs-string">'department, '</span>avg(<span class="hljs-string">'salary) AS avg_salary#8]
+- Filter (age#5L &gt; cast(30 as bigint))
   +- LogicalRDD [id#0L, firstname#1, lastname#2, department#3, salary#4L, age#5L, hire_date#6, country#7], false</span>
</code></pre>
<p>How to read this</p>
<ul>
<li><p>Bottom → data source (LogicalRDD).</p>
</li>
<li><p>Middle → Filter: Spark hasn’t yet optimized column references.</p>
</li>
<li><p>Top → Aggregate: high-level grouping intent.</p>
</li>
</ul>
<p>At this stage, the plan may include unresolved symbols (like 'department or 'avg('salary)), meaning Spark hasn’t yet validated column existence or data types.</p>
<p><strong>2. Optimized Logical Plan</strong></p>
<pre><code class="lang-python">
== Optimized Logical Plan ==
Aggregate [department<span class="hljs-comment">#3], [department#3, avg(salary#4L) AS avg_salary#8]</span>
+- Project [department<span class="hljs-comment">#3, salary#4L]</span>
   +- Filter (isnotnull(age<span class="hljs-comment">#5L) AND (age#5L &gt; 30))</span>
      +- LogicalRDD [id<span class="hljs-comment">#0L, firstname#1, lastname#2, department#3, salary#4L, age#5L, hire_date#6, country#7], false</span>
</code></pre>
<p>Here, Catalyst has done its job:</p>
<ul>
<li><p>Column IDs (such as department#3 and salary#4L) are resolved.</p>
</li>
<li><p>Unused columns are pruned – no need to carry them forward.</p>
</li>
<li><p>The plan now accurately reflects Spark’s optimized logical intent.</p>
</li>
</ul>
<p>If you ever wonder whether Spark pruned columns or pushed filters, this is the section to check.</p>
<p><strong>3. Physical Plan</strong></p>
<pre><code class="lang-python">== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department<span class="hljs-comment">#3], functions=[avg(salary#4L)], output=[department#3, avg_salary#8])</span>
   +- Exchange hashpartitioning(department<span class="hljs-comment">#3, 200), ENSURE_REQUIREMENTS, [plan_id=19]</span>
      +- HashAggregate(keys=[department<span class="hljs-comment">#3], functions=[partial_avg(salary#4L)], output=[department#3, sum#20, count#21L])</span>
         +- Project [department<span class="hljs-comment">#3, salary#4L]</span>
            +- Filter (isnotnull(age<span class="hljs-comment">#5L) AND (age#5L &gt; 30))</span>
               +- Scan ExistingRDD[id<span class="hljs-comment">#0L,firstname#1,lastname#2,department#3,salary#4L,age#5L,hire_date#6,country#7]</span>
</code></pre>
<p><strong>Breakdown</strong></p>
<ul>
<li><p>Scan ExistingRDD → Spark reading from the in-memory DataFrame.</p>
</li>
<li><p>Filter → narrow transformation, no shuffle.</p>
</li>
<li><p>HashAggregate → partial aggregation per partition.</p>
</li>
<li><p>Exchange → wide dependency: data is shuffled by department.</p>
</li>
<li><p>Top HashAggregate → final aggregation after shuffle.</p>
</li>
</ul>
<p>This structure – partial agg → shuffle → final agg – is Spark’s default two-phase aggregation pattern.</p>
<h4 id="heading-recognizing-common-nodes">Recognizing Common Nodes</h4>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Node / Operator</strong></td><td><strong>Meaning</strong></td><td><strong>Optimization Hint</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Project</td><td>Column selection or computed fields</td><td>Combine multiple withColumn() into one select()</td></tr>
<tr>
<td>Filter</td><td>Predicate on rows</td><td>Push filters as low as possible in the plan</td></tr>
<tr>
<td>Join</td><td>Combine two DataFrames</td><td>Broadcast smaller side if &lt; 10 MB</td></tr>
<tr>
<td>Aggregate</td><td>GroupBy, sum, avg, count</td><td>Filter before aggregating to reduce cardinality</td></tr>
<tr>
<td>Exchange</td><td>Shuffle / data redistribution</td><td>Minimize by filtering early, using broadcast join</td></tr>
<tr>
<td>Sort</td><td>OrderBy, sort</td><td>Avoid global sorts; use within partitions if possible</td></tr>
<tr>
<td>Window</td><td>Windowed analytics (row_number, rank)</td><td>Partition on selective keys to reduce shuffle</td></tr>
</tbody>
</table>
</div><p>Repeated invocations of withColumn stack multiple Project nodes, which increases the plan depth. Instead, combine these invocations using select.</p>
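<p>As a minimal sketch of that idea, using the employees DataFrame (the derived column names are illustrative):</p>
<pre><code class="lang-python"># Each withColumn adds its own Project node to the plan
df_deep = (df.withColumn("bonus", col("salary") * 0.10)
             .withColumn("total_comp", col("salary") + col("salary") * 0.10)
             .withColumn("age_next_year", col("age") + 1))

# The same derived columns expressed as a single select: one Project node
df_flat = df.select(
    "*",
    (col("salary") * 0.10).alias("bonus"),
    (col("salary") * 1.10).alias("total_comp"),
    (col("age") + 1).alias("age_next_year"),
)

df_deep.explain(True)
df_flat.explain(True)
</code></pre>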
<p>Multiple Exchange nodes imply repeated shuffles. You can often eliminate them by filtering earlier or by broadcasting the smaller side of a join.</p>
<p>Multiple scans of the same table within a single job suggest that a strategic intermediate result should be cached.</p>
<p>And frequent SortMergeJoin operations imply that Spark is sorting and shuffling data unnecessarily. You can eliminate these by broadcasting the smaller DataFrame or by bucketing, as sketched below.</p>
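<p>Broadcasting was shown in Chapter 2. Bucketing is the other option: if the same large tables are joined on the same key repeatedly, pre-bucketing both sides lets Spark skip the shuffle on each join. A minimal sketch, assuming a warehouse-backed catalog is available; the table names and bucket count are illustrative:</p>
<pre><code class="lang-python"># Write both sides bucketed and sorted on the join key
df.write.bucketBy(8, "department").sortBy("department") \
    .mode("overwrite").saveAsTable("employees_bucketed")

dept_info = df.select("department", "country").distinct()
dept_info.write.bucketBy(8, "department").sortBy("department") \
    .mode("overwrite").saveAsTable("departments_bucketed")

# Joining the bucketed tables on the bucketing key can avoid the Exchange entirely
emp = spark.table("employees_bucketed")
dept = spark.table("departments_bucketed")
emp.join(dept, "department").explain(True)
</code></pre>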
<h4 id="heading-debugging-strategy-read-plans-from-top-to-bottom">Debugging Strategy: Read Plans from Top to Bottom</h4>
<p>Remember: Spark <em>executes</em> plans from bottom up (from data source to final result). But when you're debugging, you read from the top down (from the output schema back to the root cause). This reversal is intentional: you start with what's wrong at the output level, then trace backward through the plan to find where the inefficiency was introduced.</p>
<p>When debugging a slow job:</p>
<ul>
<li><p>Start at the top: Identify output schema and major operators (HashAggregate, Join, and so on).</p>
</li>
<li><p>Scroll for Exchanges: Count them. Each = stage boundary. Ask “Why do I need this shuffle?”</p>
</li>
<li><p>Trace backward: See if filters or projections appear below or above joins.</p>
</li>
<li><p>Look for duplication: Same scan twice? Missing cache? Re-derived columns?</p>
</li>
<li><p>Check join strategy: If it’s SortMergeJoin but one table is small, force a broadcast() (see the sketch after this list).</p>
</li>
<li><p>Re-run explain after optimization: You should literally see the extra nodes disappear.</p>
</li>
</ul>
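<p>Here’s what the last two checks can look like in practice, reusing the employees DataFrame. This is a minimal sketch; the threshold tweak only exists to force a SortMergeJoin so the before-and-after plans differ:</p>
<pre><code class="lang-python">dept_dim = df.select("department", "country").distinct()

# Disable auto-broadcast so Spark falls back to its shuffle-based strategy
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
df.join(dept_dim, "department").explain()             # expect SortMergeJoin plus Exchange nodes

# An explicit broadcast hint overrides the threshold: the small side is copied
# to every executor and the shuffle for the join disappears
from pyspark.sql.functions import broadcast
df.join(broadcast(dept_dim), "department").explain()  # expect BroadcastHashJoin

# Restore the default threshold afterwards (10 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
</code></pre>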
<h4 id="heading-catalyst-optimizer-in-action">Catalyst Optimizer in Action</h4>
<p>Catalyst applies dozens of rules automatically. Knowing a few helps you interpret what changed:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Optimization Rule</strong></td><td><strong>Example Transformation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Predicate Pushdown</td><td>Moves filters below joins/scans</td></tr>
<tr>
<td>Constant Folding</td><td>Replaces salary * 1 with salary</td></tr>
<tr>
<td>Column Pruning</td><td>Drops unused columns early</td></tr>
<tr>
<td>Combine Filters</td><td>Merges consecutive filters into one</td></tr>
<tr>
<td>Simplify Casts</td><td>Removes redundant type casts</td></tr>
<tr>
<td>Reorder Joins / Join Reordering</td><td>Changes join order for cheaper plan</td></tr>
</tbody>
</table>
</div><p>Putting it all together: every plan tells a story:</p>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769458525411/64fa30a4-b16e-4aed-8c04-d12b476d9ae6.png" alt="Spark Plans and Stages" class="image--center mx-auto" width="600" height="400" loading="lazy"></p>
<p>As you progress through the practical scenarios in Chapter 4, read every plan before and after. Your goal isn't memorization – it's intuition.</p>
<h2 id="heading-chapter-4-writing-efficient-transformations">Chapter 4: Writing Efficient Transformations</h2>
<p>Every Spark job tells a story, not in code, but in plans. By now, you've seen how Spark interprets transformations (Chapter 1), how it executes them through stages and tasks (Chapter 2), and how to read plans like a detective (Chapter 3). Now comes the part where you apply that knowledge: writing transformations that yield efficient logical plans.</p>
<p>This chapter is the heart of the handbook. It's where we move from understanding Spark's mind to writing code that speaks its language fluently.</p>
<h3 id="heading-why-transformations-matter">Why Transformations Matter</h3>
<p>In PySpark, most performance issues don’t start in clusters or configurations. They start in transformations: the way we chain, filter, rename, or join data. Every transformation reshapes the logical plan, influencing how Spark optimizes, when it shuffles, and whether the final DAG is streamlined or tangled.</p>
<p>A good transformation sequence:</p>
<ul>
<li><p>Keeps plans shallow, not nested.</p>
</li>
<li><p>Applies filters early, not after computation.</p>
</li>
<li><p>Reduces data movement, not just data size.</p>
</li>
<li><p>Lets Catalyst and AQE optimize freely, without user-induced constraints.</p>
</li>
</ul>
<p>A bad one can double runtime, and you won't see it in your code, only in your plan.</p>
<h3 id="heading-the-goal-of-this-chapter">The Goal of this Chapter</h3>
<p>We’ll explore a series of real-world optimization scenarios, drawn from production ETL and analytical pipelines, each showing how a small change in code can completely reshape the logical plan and execution behavior.</p>
<p>Each scenario is practical and short, following a consistent structure. By the end of this chapter, you’ll be able to <em>see</em> optimization opportunities the moment you write code, because you’ll know exactly how they alter the logical plan beneath.</p>
<h3 id="heading-before-you-dive-in">Before You Dive In:</h3>
<p>Open a Spark shell or notebook. Load your familiar employees DataFrame. Run every example, and compare the explain("formatted") output before and after the fix. In this chapter, performance isn’t about more theory: it’s about seeing the difference in the plan and feeling the difference in execution time.</p>
<h3 id="heading-scenario-1-rename-in-one-pass-withcolumnrenamed-vs-todf">Scenario 1: Rename in One Pass: withColumnRenamed() vs toDF()</h3>
<p>If you’ve worked with PySpark DataFrames, you’ve probably had to rename columns, either by calling withColumnRenamed() repeatedly or by using toDF() in one shot.</p>
<p>At first glance, both approaches produce identical results: the columns have the new names you wanted. But beneath the surface, Spark treats them very differently – and that difference shows up directly in your logical plan.</p>
<pre><code class="lang-python">df_renamed = (df.withColumnRenamed(<span class="hljs-string">"id"</span>, <span class="hljs-string">"emp_id"</span>)
    .withColumnRenamed(<span class="hljs-string">"firstname"</span>, <span class="hljs-string">"first_name"</span>)
    .withColumnRenamed(<span class="hljs-string">"lastname"</span>, <span class="hljs-string">"last_name"</span>)
    .withColumnRenamed(<span class="hljs-string">"department"</span>, <span class="hljs-string">"dept"</span>)
    .withColumnRenamed(<span class="hljs-string">"salary"</span>, <span class="hljs-string">"base_salary"</span>)
    .withColumnRenamed(<span class="hljs-string">"age"</span>, <span class="hljs-string">"age_years"</span>)
    .withColumnRenamed(<span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"hired_on"</span>)
    .withColumnRenamed(<span class="hljs-string">"country"</span>, <span class="hljs-string">"country_code"</span>)
)
</code></pre>
<p>This is simple and readable. But Spark builds the plan step by step, adding one Project node for every rename. Each Project node copies all existing columns, plus the newly renamed one. In large schemas (hundreds of columns), this silently bloats the plan.</p>
<h4 id="heading-logical-plan-impact">Logical Plan Impact:</h4>
<pre><code class="lang-python">Project [emp_id, first_name, last_name, dept, base_salary, age_years, hired_on, country_code]

└─ Project [id, first_name, last_name, dept, base_salary, age_years, hired_on, country_code]

└─ Project [id, firstname, last_name, dept, base_salary, age_years, hired_on, country_code]

└─ Project [id, firstname, lastname, dept, base_salary, age_years, hire_date, country_code]

└─ Project [id, firstname, lastname, department, base_salary, age_years, hire_date, country]

└─ Project [id, firstname, lastname, department, salary, age_years, hire_date, country]

└─ Project [id, firstname, lastname, department, salary, age, hire_date, country]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Each rename adds a new Project layer, deepening the DAG. Spark now has to materialize intermediate projections before applying the next one. You can see this by running: <em>df.explain(True).</em></p>
<h4 id="heading-the-better-approach-rename-once-with-todf">The Better Approach: Rename Once with toDF()</h4>
<p>Instead of chaining multiple renames, rename all columns in a single pass:</p>
<pre><code class="lang-python">new_cols = [<span class="hljs-string">"id"</span>, <span class="hljs-string">"first_name"</span>, <span class="hljs-string">"last_name"</span>, <span class="hljs-string">"department"</span>,
            <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hired_on"</span>, <span class="hljs-string">"country"</span>]

df_renamed = df.toDF(*new_cols)
</code></pre>
<h4 id="heading-logical-plan-impact-1">Logical Plan Impact:</h4>
<pre><code class="lang-python">Project [id, first_name, last_name, department, salary, age, hired_on, country]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Now there’s just one Project node, which means one projection over the source data. This gives us a flatter, more efficient plan.</p>
<h4 id="heading-under-the-hood-what-spark-actually-does">Under the Hood: What Spark Actually Does</h4>
<p>Every time you call withColumnRenamed(), Spark rewrites the entire projection list. Catalyst treats the rename as a full column re-selection from the previous node, not as a light-weight alias update. When you chain several renames, Catalyst duplicates internal column metadata for each intermediate step.</p>
<p>By contrast, toDF() rebases the schema in a single action. Catalyst interprets it as a single schema rebinding, so no redundant metadata trees are created.</p>
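<p>When you only want to rename some columns, rather than re-specifying every column positionally as toDF() requires, a dictionary-driven select gives the same single-Project plan. A minimal sketch; the mapping is illustrative:</p>
<pre><code class="lang-python">rename_map = {
    "id": "emp_id",
    "firstname": "first_name",
    "lastname": "last_name",
    "hire_date": "hired_on",
}

# One select, one Project node, regardless of how many columns are renamed
df_renamed = df.select(
    [col(c).alias(rename_map.get(c, c)) for c in df.columns]
)
df_renamed.explain(True)
</code></pre>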
<h4 id="heading-real-world-timing-glue-job-benchmark">Real-World Timing: Glue Job Benchmark</h4>
<p>To see if chained withColumnRenamed calls add real overhead, here's a simple timing test performed on a Glue job using a DataFrame with 1M rows. First using withColumnRenamed:</p>
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession

spark = SparkSession.builder.appName(<span class="hljs-string">"MillionRowsRenameTest"</span>).getOrCreate()

employees_data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Doe"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">80000</span>, <span class="hljs-number">28</span>, <span class="hljs-string">"2020-01-15"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Jane"</span>, <span class="hljs-string">"Smith"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>, <span class="hljs-number">32</span>, <span class="hljs-string">"2019-03-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Johnson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">60000</span>, <span class="hljs-number">25</span>, <span class="hljs-string">"2021-06-10"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Brown"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>, <span class="hljs-number">35</span>, <span class="hljs-string">"2018-07-01"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Charlie"</span>, <span class="hljs-string">"Wilson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">65000</span>, <span class="hljs-number">29</span>, <span class="hljs-string">"2020-11-05"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">6</span>, <span class="hljs-string">"David"</span>, <span class="hljs-string">"Lee"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">55000</span>, <span class="hljs-number">27</span>, <span class="hljs-string">"2021-01-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">7</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Davis"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">95000</span>, <span class="hljs-number">40</span>, <span class="hljs-string">"2017-04-12"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">8</span>, <span class="hljs-string">"Frank"</span>, <span class="hljs-string">"Miller"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">70000</span>, <span class="hljs-number">33</span>, <span class="hljs-string">"2019-09-25"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">9</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"Taylor"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">58000</span>, <span class="hljs-number">26</span>, <span class="hljs-string">"2021-08-15"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">10</span>, <span class="hljs-string">"Henry"</span>, <span class="hljs-string">"Anderson"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">88000</span>, <span class="hljs-number">31</span>, <span class="hljs-string">"2020-02-28"</span>, <span class="hljs-string">"USA"</span>)
]

multiplied_data = [(i, <span class="hljs-string">f"firstname_<span class="hljs-subst">{i}</span>"</span>, <span class="hljs-string">f"lastname_<span class="hljs-subst">{i}</span>"</span>,
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">3</span>],  <span class="hljs-comment"># department</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">4</span>],  <span class="hljs-comment"># salary</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">5</span>],  <span class="hljs-comment"># age</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">6</span>],  <span class="hljs-comment"># hire_date</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">7</span>])  <span class="hljs-comment"># country</span>
                   <span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">1</span>, <span class="hljs-number">1</span>_000_001)]

df = spark.createDataFrame(multiplied_data,
                           [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
df1 = (df
       .withColumnRenamed(<span class="hljs-string">"firstname"</span>, <span class="hljs-string">"first_name"</span>)
       .withColumnRenamed(<span class="hljs-string">"lastname"</span>, <span class="hljs-string">"last_name"</span>)
       .withColumnRenamed(<span class="hljs-string">"department"</span>, <span class="hljs-string">"dept_name"</span>)
       .withColumnRenamed(<span class="hljs-string">"salary"</span>, <span class="hljs-string">"annual_salary"</span>)
       .withColumnRenamed(<span class="hljs-string">"age"</span>, <span class="hljs-string">"emp_age"</span>)
       .withColumnRenamed(<span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"hired_on"</span>)
       .withColumnRenamed(<span class="hljs-string">"country"</span>, <span class="hljs-string">"work_country"</span>))

print(<span class="hljs-string">"withColumnRenamed Count:"</span>, df1.count())
print(<span class="hljs-string">"withColumnRenamed time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"seconds"</span>)
</code></pre>
<p>Using toDF:</p>
<pre><code class="lang-python">start = time.time()
df2 = df.toDF(<span class="hljs-string">"id"</span>, <span class="hljs-string">"first_name"</span>, <span class="hljs-string">"last_name"</span>, <span class="hljs-string">"dept_name"</span>, <span class="hljs-string">"annual_salary"</span>, <span class="hljs-string">"emp_age"</span>, <span class="hljs-string">"hired_on"</span>, <span class="hljs-string">"work_country"</span>)
print(<span class="hljs-string">"toDF Count:"</span>, df2.count())
print(<span class="hljs-string">"toDF time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"seconds"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Number of Project Nodes</strong></td><td><strong>Glue Execution Time (1M rows)</strong></td><td><strong>Plan Complexity</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Chained withColumnRenamed()</td><td>8 nodes</td><td>~12 seconds</td><td>Deep, nested</td></tr>
<tr>
<td>Single toDF()</td><td>1 node</td><td>~8 seconds</td><td>Flat, simple</td></tr>
</tbody>
</table>
</div><p>The difference matters more at larger scale or in complex pipelines: each additional Project adds column resolution, metadata work, and DAG height, and the overhead is most visible on managed runtimes such as AWS Glue or when tens of millions of rows are involved. Since Spark can’t always collapse chained projections once column names change between them, renaming every column in one pass with toDF() produces a flatter logical and physical plan: one rename, one projection, and faster execution.</p>
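<p>As a related sketch: if only a handful of columns need new names, you can still keep a single projection by mapping over df.columns and passing the result to toDF() (rename_map here is illustrative, not from the original job):</p>
<pre><code class="lang-python"># Rename selected columns while leaving the rest untouched, in one projection
rename_map = {"firstname": "first_name", "lastname": "last_name", "hire_date": "hired_on"}

df_renamed = df.toDF(*[rename_map.get(c, c) for c in df.columns])
</code></pre>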
<h3 id="heading-scenario-2-reusing-expressions">Scenario 2: Reusing Expressions</h3>
<p>Sometimes Spark jobs run slower, not because of shuffles or joins, but because the same computation is performed repeatedly within the logical plan. Every time you repeat an expression, say, col("salary") * 0.1 in multiple places, Spark treats it as a <em>new</em> derived column, expanding the logical plan and forcing redundant work.</p>
<h4 id="heading-the-problem-repeated-expressions">The Problem: Repeated Expressions</h4>
<p>Let’s say we’re calculating bonus and total compensation for employees:</p>
<pre><code class="lang-python">df_expr = (
    df.withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.10</span>)
      .withColumn(<span class="hljs-string">"total_comp"</span>, col(<span class="hljs-string">"salary"</span>) + (col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.10</span>))
)
</code></pre>
<p>At first glance, it’s simple enough. But Spark’s optimizer doesn’t automatically know that the (col("salary") * 0.10) in the second column is identical to the one computed in the first. Both get evaluated separately in the logical plan.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Project [id, firstname, lastname, department,

salary, age, hire_date, country,

(salary * <span class="hljs-number">0.10</span>) AS bonus,

(salary + (salary * <span class="hljs-number">0.10</span>)) AS total_comp]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>While this looks compact, Spark must compute (salary * 0.10) twice: once for bonus and again inside total_comp. For a large dataset (say 100M rows), that’s two full column evaluations. The waste compounds when your expression is complex: imagine parsing JSON, applying a UDF, or running date arithmetic multiple times.</p>
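<p>As a purely hypothetical illustration, assume an events DataFrame with a payload string column holding JSON; repeating the extraction looks like this:</p>
<pre><code class="lang-python">from pyspark.sql.functions import get_json_object, col

# Hypothetical: "events" and its "payload" column (e.g. '{"base": 80000, "bonus_pct": 0.1}')
# are assumptions for illustration; the same JSON path is parsed twice per row
df_bad = (
    events.withColumn("base_salary",
                      get_json_object(col("payload"), "$.base").cast("double"))
          .withColumn("total_comp",
                      get_json_object(col("payload"), "$.base").cast("double") * 1.1)
)
# Every row pays for the JSON lookup twice; the fix below applies here just as well.
</code></pre>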
<h4 id="heading-the-better-approach-compute-once-reuse-everywhere">The Better Approach: Compute Once, Reuse Everywhere</h4>
<p>Compute the expression once, store it as a column, and reference it later:</p>
<pre><code class="lang-python">df_expr = (
    df.withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.10</span>)
      .withColumn(<span class="hljs-string">"total_comp"</span>, col(<span class="hljs-string">"salary"</span>) + col(<span class="hljs-string">"bonus"</span>))
)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Project [id, firstname, lastname, department,

salary, age, hire_date, country,

(salary * <span class="hljs-number">0.10</span>) AS bonus,

(salary + bonus) AS total_comp]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Now Spark calculates (salary * 0.10) once, stores it in the bonus column, and reuses that column when computing total_comp. This single change cuts CPU cost and memory usage.</p>
<h4 id="heading-under-the-hood-why-repetition-hurts">Under the Hood: Why Repetition Hurts</h4>
<p>Spark’s Catalyst optimizer doesn’t automatically factor out repeated expressions across different columns. Each withColumn() creates a new Project node with its own expression tree. If multiple nodes reuse the same arithmetic or function, Catalyst re-evaluates them independently.</p>
<p>On small DataFrames, this cost is invisible. On wide, computation-heavy jobs (think feature engineering pipelines), it can add hundreds of milliseconds per task.</p>
<p>Each redundant expression increases:</p>
<ul>
<li><p>Catalyst’s internal expression resolution time</p>
</li>
<li><p>The size of generated Java code in WholeStageCodegen</p>
</li>
<li><p>CPU cycles per row, since Spark cannot share intermediate results between columns in the same node</p>
</li>
</ul>
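<p>A quick way to see this duplication for yourself is to print the generated code for the projection. Here’s a minimal sketch using the df_expr DataFrame from above (the "codegen" explain mode is available from Spark 3.0 onward):</p>
<pre><code class="lang-python"># Prints the WholeStageCodegen output; repeated expressions show up as
# duplicated blocks of generated Java code
df_expr.explain(mode="codegen")
</code></pre>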
<h4 id="heading-real-world-benchmark-aws-glue">Real-World Benchmark: AWS Glue</h4>
<p>We tested this pattern on AWS Glue (Spark 3.3) with 10 million rows and a simulated expensive computation, using a dataset similar to the one from Scenario 1.</p>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
                           [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

expr = sqrt(exp(log(col(<span class="hljs-string">"salary"</span>) + <span class="hljs-number">1</span>)))

start = time.time()

df_repeated = (
    df.withColumn(<span class="hljs-string">"metric_a"</span>, expr)
      .withColumn(<span class="hljs-string">"metric_b"</span>, expr * <span class="hljs-number">2</span>)
      .withColumn(<span class="hljs-string">"metric_c"</span>, expr / <span class="hljs-number">10</span>)
)

df_repeated.count()
time_repeated = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()

df_reused = (
    df.withColumn(<span class="hljs-string">"metric"</span>, expr)
      .withColumn(<span class="hljs-string">"metric_a"</span>, col(<span class="hljs-string">"metric"</span>))
      .withColumn(<span class="hljs-string">"metric_b"</span>, col(<span class="hljs-string">"metric"</span>) * <span class="hljs-number">2</span>)
      .withColumn(<span class="hljs-string">"metric_c"</span>, col(<span class="hljs-string">"metric"</span>) / <span class="hljs-number">10</span>)
)

df_reused.count()

print(<span class="hljs-string">"Repeated expr time:"</span>, time_repeated, <span class="hljs-string">"seconds"</span>)
print(<span class="hljs-string">"Reused expr time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"seconds"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Project Nodes</strong></td><td><strong>Execution Time (10M rows)</strong></td><td><strong>Expression Evaluations</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Repeated expression</td><td>Multiple (nested)</td><td>~18 seconds</td><td>3x per row</td></tr>
<tr>
<td>Compute once, reuse</td><td>Single</td><td>~11 seconds</td><td>1x per row</td></tr>
</tbody>
</table>
</div><p>The performance gap widens further with genuinely expensive expressions (like regex extraction, JSON parsing, or UDFs).</p>
<h4 id="heading-physical-plan-implication">Physical Plan Implication</h4>
<p>In the physical plan, repeated expressions expand into multiple Java blocks within the same WholeStageCodegen node:</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) Project [sqrt(exp(log(salary + <span class="hljs-number">1</span>))) AS metric_a,

(sqrt(exp(log(salary + <span class="hljs-number">1</span>))) * <span class="hljs-number">2</span>) AS metric_b,

(sqrt(exp(log(salary + <span class="hljs-number">1</span>))) / <span class="hljs-number">10</span>) AS metric_c, ...]
</code></pre>
<p>Spark literally embeds three copies of the same logic.</p>
<p>Each is JIT-compiled separately, leading to:</p>
<ul>
<li><p>Larger generated Java classes</p>
</li>
<li><p>Higher CPU utilization</p>
</li>
<li><p>Longer code-generation time before tasks even start</p>
</li>
</ul>
<p>When reusing a column, Spark generates one expression and references it by name, dramatically shrinking the codegen footprint. If you have complex transformations (nested when, UDFs, regex extractions, and so on), compute them once and reuse them with col("alias"). For even heavier expressions that appear across multiple pipelines, consider persisting the intermediate DataFrame:</p>
<pre><code class="lang-python">df_features = df.withColumn(<span class="hljs-string">"complex_feature"</span>, complex_logic)

df_features.cache()
</code></pre>
<p>That cache can save multiple recomputations across downstream steps.</p>
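<p>Here’s a hedged sketch of how that caching pattern might be used end to end. It assumes complex_feature is numeric; cache the DataFrame, materialize it once, reuse it in downstream steps, and release it when you’re done:</p>
<pre><code class="lang-python">from pyspark.sql.functions import avg, col

df_features.count()                                       # fills the cache
by_dept = df_features.groupBy("department").agg(avg("complex_feature"))
high = df_features.filter(col("complex_feature") &gt; 100)  # reuses cached data
# ... write or further transform both results ...
df_features.unpersist()                                   # free executor storage when done
</code></pre>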
<h3 id="heading-scenario-3-batch-column-ops">Scenario 3: Batch Column Ops</h3>
<p>Most PySpark pipelines don’t die because of one big, obvious mistake. They slow down from a thousand tiny cuts: one extra withColumn() here, another there, until the logical plan turns into a tall stack of projections.</p>
<p>On its own, withColumn() is fine. The problem is how we use it:</p>
<ul>
<li><p>10–30 chained calls in a row</p>
</li>
<li><p>Re-deriving similar expressions</p>
</li>
<li><p>Spreading logic across many tiny steps</p>
</li>
</ul>
<p>This scenario shows how batching column operations into a single select() produces a flatter, cleaner logical plan that scales better and is easier to reason about.</p>
<h4 id="heading-the-problem-chaining-withcolumn-forever">The Problem: Chaining withColumn() Forever</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, concat_ws, when, lit

df_transformed = (
    df.withColumn(<span class="hljs-string">"full_name"</span>, concat_ws(<span class="hljs-string">" "</span>, col(<span class="hljs-string">"firstname"</span>), col(<span class="hljs-string">"lastname"</span>)))
      .withColumn(<span class="hljs-string">"is_senior"</span>, when(col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">35</span>, lit(<span class="hljs-number">1</span>)).otherwise(lit(<span class="hljs-number">0</span>)))
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000.0</span>)
      .withColumn(<span class="hljs-string">"experience_band"</span>,
                  when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
                  .when((col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">30</span>) &amp; (col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>), <span class="hljs-string">"mid"</span>)
                  .otherwise(<span class="hljs-string">"senior"</span>))
      .withColumn(<span class="hljs-string">"country_upper"</span>, col(<span class="hljs-string">"country"</span>).upper())
)
</code></pre>
<p>It reads nicely, it runs, and everyone moves on. But under the hood, Spark builds this as multiple Project nodes, one per withColumn() call.</p>
<p><strong>Simplified Logical Plan (Chained): Conceptually</strong></p>
<pre><code class="lang-python">Project [..., country_upper]

└─ Project [..., experience_band]

   └─ Project [..., salary_k]

      └─ Project [..., is_senior]

         └─ Project [..., full_name]

            └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Each layer re-selects all existing columns, adds one more derived column, and deepens the plan.</p>
<h4 id="heading-the-better-approach-batch-with-select">The Better Approach: Batch with select()</h4>
<p>Instead of incrementally patching the schema, build it once.</p>
<pre><code class="lang-python">df_transformed = df.select(
    col(<span class="hljs-string">"id"</span>),
    col(<span class="hljs-string">"firstname"</span>),
    col(<span class="hljs-string">"lastname"</span>),
    col(<span class="hljs-string">"department"</span>),
    col(<span class="hljs-string">"salary"</span>),
    col(<span class="hljs-string">"age"</span>),
    col(<span class="hljs-string">"hire_date"</span>),
    col(<span class="hljs-string">"country"</span>),
    concat_ws(<span class="hljs-string">" "</span>, col(<span class="hljs-string">"firstname"</span>), col(<span class="hljs-string">"lastname"</span>)).alias(<span class="hljs-string">"full_name"</span>),
    when(col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">35</span>, lit(<span class="hljs-number">1</span>)).otherwise(lit(<span class="hljs-number">0</span>)).alias(<span class="hljs-string">"is_senior"</span>),
    (col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000.0</span>).alias(<span class="hljs-string">"salary_k"</span>),
    when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
        .when((col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">30</span>) &amp; (col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>), <span class="hljs-string">"mid"</span>)
        .otherwise(<span class="hljs-string">"senior"</span>).alias(<span class="hljs-string">"experience_band"</span>),
    col(<span class="hljs-string">"country"</span>).upper().alias(<span class="hljs-string">"country_upper"</span>)
)
</code></pre>
<p><strong>Simplified Logical Plan (Batched):</strong></p>
<pre><code class="lang-python">Project [id, firstname, lastname, department, salary, age, hire_date, country,

         full_name, is_senior, salary_k, experience_band, country_upper]

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>One Project. All derived columns <em>are</em> defined together. Flatter DAG. Cleaner plan.</p>
<h4 id="heading-under-the-hood-why-this-matters">Under the Hood: Why This Matters</h4>
<p>Each withColumn() is syntactic sugar for: “Take the previous plan, and create a new Project on top of it.” So 10 withColumn() calls = 10 projections wrapped on top of each other.</p>
<p>Catalyst can sometimes collapse adjacent Project nodes, but:</p>
<ul>
<li><p>Not always (especially when aliases shadow each other).</p>
</li>
<li><p>Not when expressions become complex or interdependent.</p>
</li>
<li><p>Not when UDFs or analysis barriers appear.</p>
</li>
</ul>
<p>Batching with select():</p>
<ul>
<li><p>Gives Catalyst a single, complete view of all expressions.</p>
</li>
<li><p>Enables more aggressive optimizations (constant folding, expression reuse, pruning).</p>
</li>
<li><p>Keeps expression trees shallower and codegen output smaller.</p>
</li>
</ul>
<p>Think of it as the difference between editing a sentence 10 times in a row and writing the final sentence once, cleanly.</p>
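<p>If you prefer the incremental style, Spark 3.3 and later also offer DataFrame.withColumns(), which takes a dict of expressions and adds them all in a single projection. A brief sketch with the same columns:</p>
<pre><code class="lang-python">from pyspark.sql.functions import col, concat_ws, when, lit

# One Project node for several derived columns (requires Spark 3.3+)
df_transformed = df.withColumns({
    "full_name": concat_ws(" ", col("firstname"), col("lastname")),
    "is_senior": when(col("age") &gt;= 35, lit(1)).otherwise(lit(0)),
    "salary_k": col("salary") / 1000.0,
})
</code></pre>
<p>It doesn’t replace select() when you’re rewriting the whole schema, but it avoids stacking one Project per derived column.</p>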
<h4 id="heading-real-world-example-using-the-employees-df-at-scale">Real-World Example: Using the Employees DF at Scale:</h4>
<p>Chained version (many withColumn()):</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, concat_ws, when, lit, upper
<span class="hljs-keyword">import</span> time

start = time.time()
df_chain = (
    df.withColumn(<span class="hljs-string">"full_name"</span>, concat_ws(<span class="hljs-string">" "</span>, col(<span class="hljs-string">"firstname"</span>), col(<span class="hljs-string">"lastname"</span>)))
      .withColumn(<span class="hljs-string">"is_senior"</span>, when(col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">35</span>, <span class="hljs-number">1</span>).otherwise(<span class="hljs-number">0</span>))
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000.0</span>)
      .withColumn(<span class="hljs-string">"high_earner"</span>, when(col(<span class="hljs-string">"salary"</span>) &gt;= <span class="hljs-number">90000</span>, <span class="hljs-number">1</span>).otherwise(<span class="hljs-number">0</span>))
      .withColumn(<span class="hljs-string">"experience_band"</span>,
                  when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
                  .when((col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">30</span>) &amp; (col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>), <span class="hljs-string">"mid"</span>)
                  .otherwise(<span class="hljs-string">"senior"</span>))
      .withColumn(<span class="hljs-string">"country_upper"</span>, upper(col(<span class="hljs-string">"country"</span>)))
)

df_chain.count()
time_chain = round(time.time() - start, <span class="hljs-number">2</span>)
</code></pre>
<p>Batched version (single select()):</p>
<pre><code class="lang-python">start = time.time()
df_batch = df.select(
    <span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>,
    concat_ws(<span class="hljs-string">" "</span>, col(<span class="hljs-string">"firstname"</span>), col(<span class="hljs-string">"lastname"</span>)).alias(<span class="hljs-string">"full_name"</span>),
    when(col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">35</span>, <span class="hljs-number">1</span>).otherwise(<span class="hljs-number">0</span>).alias(<span class="hljs-string">"is_senior"</span>),
    (col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000.0</span>).alias(<span class="hljs-string">"salary_k"</span>),
    when(col(<span class="hljs-string">"salary"</span>) &gt;= <span class="hljs-number">90000</span>, <span class="hljs-number">1</span>).otherwise(<span class="hljs-number">0</span>).alias(<span class="hljs-string">"high_earner"</span>),
    when(col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">30</span>, <span class="hljs-string">"junior"</span>)
        .when((col(<span class="hljs-string">"age"</span>) &gt;= <span class="hljs-number">30</span>) &amp; (col(<span class="hljs-string">"age"</span>) &lt; <span class="hljs-number">40</span>), <span class="hljs-string">"mid"</span>)
        .otherwise(<span class="hljs-string">"senior"</span>).alias(<span class="hljs-string">"experience_band"</span>),
    upper(col(<span class="hljs-string">"country"</span>)).alias(<span class="hljs-string">"country_upper"</span>)
)

df_batch.count()
time_batch = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">"Chained withColumn time:"</span>, time_chain, <span class="hljs-string">"seconds"</span>)
print(<span class="hljs-string">"Batched select time:"</span>, time_batch, <span class="hljs-string">"seconds"</span>)
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Logical Shape</strong></td><td><strong>Glue Execution Time (1M rows)</strong></td><td><strong>Notes</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Chained withColumn()</td><td>6 nested Projects</td><td>~14 seconds</td><td>Deep plan, more Catalyst work</td></tr>
<tr>
<td>Single select()</td><td>1 Project</td><td>~9 seconds</td><td>Flat planning, cleaner DAG</td></tr>
</tbody>
</table>
</div><p>The distinction is most evident when there are more derived columns, more complex expressions (UDFs, window functions), or when executing on managed runtimes such as AWS Glue.</p>
<p>In the chained cases, there are more Project nodes, code generation is fragmented, and expression evaluation is less amenable to global optimization.</p>
<p>In the batched cases, Spark generates a single Project node, more work is consolidated into a single WholeStageCodegen pipeline, code generation is reduced, the JVM is less stressed, and the plan is flatter and more amenable to optimization. This is not only cleaner, but it’s also faster, more reliable, and friendlier to Spark’s optimizer.</p>
<h3 id="heading-scenario-4-early-filter-vs-late-filter">Scenario 4: Early Filter vs Late Filter</h3>
<p>Many pipelines apply transformations first, adding columns, joining datasets, or calculating derived metrics, before filtering records. That order looks harmless in code but can double or triple the workload at execution.</p>
<h4 id="heading-problem-late-filtering">Problem: Late Filtering</h4>
<pre><code class="lang-python">df_late = (
    df.withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.1</span>)
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000</span>)
      .filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">35</span>)
)
</code></pre>
<p>This means Spark first computes all columns for every employee, then discards most rows.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Filter (age &gt; <span class="hljs-number">35</span>)

└─ Project [id, firstname, lastname, department, salary, age, hire_date, country,

            (salary * <span class="hljs-number">0.1</span>) AS bonus,

            (salary / <span class="hljs-number">1000</span>) AS salary_k]

   └─ LogicalRDD [...]
</code></pre>
<p>Catalyst can sometimes reorder this automatically, but when it can't (due to UDFs or complex logic), you're doing unnecessary work on data that's thrown away.</p>
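<p>As a hypothetical illustration of such a barrier, consider a Python UDF: a filter on the UDF’s output can’t run until the UDF has been evaluated for every row, whereas filtering on the raw column first avoids that work entirely (age_band is an invented helper, not part of the original pipeline):</p>
<pre><code class="lang-python">from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def age_band(age):
    return "senior" if age is not None and age &gt; 35 else "junior"

# Late filter: the UDF runs on every row, then most rows are discarded
df_udf_late = (
    df.withColumn("band", age_band(col("age")))
      .filter(col("band") == "senior")
)

# Early filter on the raw column: the UDF only sees the rows you keep
df_udf_early = (
    df.filter(col("age") &gt; 35)
      .withColumn("band", age_band(col("age")))
)
</code></pre>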
<h4 id="heading-better-approach-early-filtering">Better Approach: Early Filtering</h4>
<pre><code class="lang-python">df_early = (
    df.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">35</span>)
      .withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.1</span>)
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000</span>)
)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Project [id, firstname, lastname, department, salary, age, hire_date, country,

         (salary * <span class="hljs-number">0.1</span>) AS bonus,

         (salary / <span class="hljs-number">1000</span>) AS salary_k]

└─ Filter (age &gt; <span class="hljs-number">35</span>)

   └─ LogicalRDD [...]
</code></pre>
<p>Now Spark prunes the dataset first, then applies transformations. The result: smaller intermediate data, less codegen, shorter logical plan, shorter DAG, and smaller shuffle footprint.</p>
<h4 id="heading-real-world-benchmark-aws-glue-1">Real-World Benchmark: AWS Glue</h4>
<p>Late Filtering:</p>
<pre><code class="lang-python">df = spark.createDataFrame(
    multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]
)

start_late = time.time()

df_late = (
    df.withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.1</span>)
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000</span>)
      .filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">35</span>)   
)

df_late.count()
time_late = round(time.time() - start_late, <span class="hljs-number">2</span>)
</code></pre>
<p>Early Filtering:</p>
<pre><code class="lang-python">start_early = time.time()

df_early = (
    df.filter(col(<span class="hljs-string">"age"</span>) &gt; <span class="hljs-number">35</span>)    
      .withColumn(<span class="hljs-string">"bonus"</span>, col(<span class="hljs-string">"salary"</span>) * <span class="hljs-number">0.1</span>)
      .withColumn(<span class="hljs-string">"salary_k"</span>, col(<span class="hljs-string">"salary"</span>) / <span class="hljs-number">1000</span>)
)

df_early.count()
time_early = round(time.time() - start_early, <span class="hljs-number">2</span>)

print(<span class="hljs-string">"Late Filter Time:"</span>, time_late, <span class="hljs-string">"seconds"</span>)
print(<span class="hljs-string">"Early Filter Time:"</span>, time_early, <span class="hljs-string">"seconds"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Rows Processed Before Filter</strong></td><td><strong>Execution Time (approx)</strong></td><td><strong>Notes</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Late filter</td><td>1,000,000 (all rows)</td><td>~14 seconds</td><td>Computes bonus and salary_k for all rows, then filters</td></tr>
<tr>
<td>Early filter</td><td>300,000 (filtered subset)</td><td>~9 seconds</td><td>Filters first, computes only for age &gt; 35</td></tr>
</tbody>
</table>
</div><p>The early filter approach processes significantly less data before the projection, leading to faster execution and less memory pressure.</p>
<p>Always filter as early as possible, before joins, aggregations, expensive transformations (such as UDFs or window functions), and even during file reads via Parquet/ORC pushdown, since filtering at the source touches fewer partitions and leads to faster jobs.</p>
<h3 id="heading-scenario-5-column-pruning">Scenario 5: Column Pruning</h3>
<p>When working with Spark DataFrames, convenience often wins over efficiency, and nothing feels more convenient than select("*"). It’s quick, flexible, and perfect for exploration.</p>
<p>But in production pipelines, that little star silently costs CPU, memory, network bandwidth, and runtime efficiency. Every time you write select("*"), Spark expands it into <em>every</em> column from your schema, even if you’re using just one or two later.</p>
<p>Those extra attributes flow through every stage of the plan, from filters and joins to aggregations and shuffles. The result: inflated logical plans, bigger shuffle files, and slower queries.</p>
<h4 id="heading-the-problem-the-lazy-star">The Problem: “The Lazy Star”</h4>
<pre><code class="lang-python">df_star = (
    df.select(<span class="hljs-string">"*"</span>)
      .filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .groupBy(<span class="hljs-string">"country"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p>At first glance, this seems harmless. But the problem is: only two columns (country and salary) are needed for the aggregation, but Spark carries all eight (id, firstname, lastname, department, salary, age, hire_date, country) through every transformation.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Aggregate [country], [avg(salary) AS avg_salary]

└─ Filter (department = Engineering)

   └─ Project [id, firstname, lastname, department, salary, age, hire_date, country]

      └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Every node in this tree carries all columns. Catalyst can’t prune them because you explicitly asked for "*". The excess attributes are serialized, shuffled, and deserialized across the cluster, even though they serve no purpose in the final result.</p>
<h4 id="heading-the-fix-select-only-what-you-need">The Fix: Select Only What You Need</h4>
<p>Be deliberate with your projections. Select the minimal schema required for the task.</p>
<pre><code class="lang-python">df_pruned = (
    df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>)
      .filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .groupBy(<span class="hljs-string">"country"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Aggregate [country], [avg(salary) AS avg_salary]

└─ Filter (department = Engineering)

   └─ Project [department, salary, country]

      └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Now Spark reads and processes only the three required columns: department, salary, and country. The plan is narrower, the DAG simpler, and execution faster.</p>
<h4 id="heading-real-world-benchmark-aws-glue-2">Real-World Benchmark: AWS Glue</h4>
<p>Wide Projection:</p>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
                           [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
df_star = (
    df.select(<span class="hljs-string">"*"</span>)
      .filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .groupBy(<span class="hljs-string">"country"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)

df_star.count()
time_star = round(time.time() - start, <span class="hljs-number">2</span>)
</code></pre>
<p>Pruned Projection:</p>
<pre><code class="lang-python">start = time.time()

df_pruned = (
    df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>)
      .filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .groupBy(<span class="hljs-string">"country"</span>)
      .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)

df_pruned.count()
time_pruned = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">f"select('*') time: <span class="hljs-subst">{time_star}</span>s"</span>)
print(<span class="hljs-string">f"pruned columns time: <span class="hljs-subst">{time_pruned}</span>s"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Columns Processed</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>select("*")</td><td>8</td><td>~26.54 s</td><td>Spark carries all columns through the plan.</td></tr>
<tr>
<td>Pruned projection</td><td>3</td><td>~2.21 s</td><td>Only needed columns processed → faster and lighter.</td></tr>
</tbody>
</table>
</div><h4 id="heading-under-the-hood-how-catalyst-handles-columns">Under the Hood: How Catalyst Handles Columns</h4>
<p>When you call select("*"), Catalyst resolves <em>every attribute</em> into the logical plan. Each subsequent transformation inherits that full attribute list, increasing plan depth and overhead.</p>
<p>Catalyst includes a rule called ColumnPruning, which removes unused attributes, but it only works when Spark <em>can see</em> which columns are necessary. If you use "*" or dynamically reference df.columns, Catalyst loses visibility.</p>
<p><strong>Works:</strong></p>
<pre><code class="lang-python">df \
    .select(<span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>) \
    .groupBy(<span class="hljs-string">"country"</span>) \
    .agg(avg(<span class="hljs-string">"salary"</span>))
</code></pre>
<p><strong>Doesn’t Work:</strong></p>
<pre><code class="lang-python">cols = df.columns

df.select(cols) \
  .groupBy(<span class="hljs-string">"country"</span>) \
  .agg(avg(<span class="hljs-string">"salary"</span>))
</code></pre>
<p>In the second case, Catalyst can’t prune anything, because cols expands to every column in the schema.</p>
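<p>If the projection really must be built dynamically, one workable pattern (a sketch, assuming you know which columns the query needs) is to derive the list from those requirements rather than from df.columns:</p>
<pre><code class="lang-python">from pyspark.sql.functions import avg

# An explicit, prunable column list that Catalyst can still reason about
needed_cols = ["country", "salary"]

df.select(*needed_cols).groupBy("country").agg(avg("salary"))
</code></pre>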
<h4 id="heading-physical-plan-differences">Physical Plan Differences</h4>
<pre><code class="lang-python">Wide Projection (select(<span class="hljs-string">"*"</span>)):

*(<span class="hljs-number">1</span>) HashAggregate(keys=[country], functions=[avg(salary)])

+- *(<span class="hljs-number">1</span>) Project [id, firstname, lastname, department, salary, age, hire_date, country]

   +- *(<span class="hljs-number">1</span>) Filter (department = Engineering)

      +- *(<span class="hljs-number">1</span>) Scan parquet ...
</code></pre>
<p>Pruned Projection:</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) HashAggregate(keys=[country], functions=[avg(salary)])

+- *(<span class="hljs-number">1</span>) Project [department, salary, country]

   +- *(<span class="hljs-number">1</span>) Filter (department = Engineering)

      +- *(<span class="hljs-number">1</span>) Scan parquet [department, salary, country]
</code></pre>
<p>Notice the last line: Spark physically scans only the three referenced columns from Parquet. That’s genuine I/O reduction, not just logical simplification. Using select("*") inflates shuffle file sizes, serialization memory, Catalyst planning time, and I/O and network traffic; the fix is as simple as listing only the columns you need.</p>
<p>In managed environments like AWS Glue or Databricks, this simple practice can greatly reduce ETL time, particularly for Parquet or Delta files, because explicit projections let the reader prune columns effectively. It’s one of the easiest, highest-impact Spark optimizations, and it starts with typing fewer asterisks.</p>
<h3 id="heading-scenario-6-filter-pushdown-vs-full-scan">Scenario 6: Filter Pushdown vs Full Scan</h3>
<p>When a Spark job feels slow right from the start, even before joins or aggregations, the culprit is often hidden at the data-read layer. Spark spends seconds (or minutes) scanning every record, even though most rows are useless for the query.</p>
<p>That’s where filter pushdown comes in. It tells Spark to <em>push your filter logic down to the file reader</em> so that Parquet / ORC / Delta formats return only the relevant rows from disk. Done right, this optimization can reduce scan size significantly. Done wrong, Spark performs a full scan, reading everything before filtering in memory.</p>
<h4 id="heading-the-problem-late-filters-and-full-scans">The Problem: Late Filters and Full Scans</h4>
<pre><code class="lang-python">employees_df = spark.read.parquet(<span class="hljs-string">"s3://data/employee_data/"</span>)

df_full = (
    employees_df
        .select(<span class="hljs-string">"*"</span>)  <span class="hljs-comment"># reads all columns</span>
        .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"Canada"</span>)
)
</code></pre>
<p>Looks fine, right? But Spark can’t push this filter to the Parquet reader because it’s applied <em>after</em> the select("*") projection step. Catalyst sees the filter as operating on a projected DataFrame, not the raw scan, so the pushdown boundary is lost.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Filter (country = Canada)

└─ Project [id, firstname, lastname, department, salary, age, hire_date, country]

   └─ Scan parquet employee_data [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Every record from every Parquet file is read into memory before the filter executes. In large tables, this means scanning terabytes when you only need megabytes.</p>
<h4 id="heading-the-fix-filter-early-and-project-light">The Fix: Filter Early and Project Light</h4>
<p>Move filters as close as possible to the data source and limit columns before Spark reads them:</p>
<pre><code class="lang-python">df_pushdown = (
    spark.read.parquet(<span class="hljs-string">"s3://data/employee_data/"</span>)
        .select(<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>)
        .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"Canada"</span>)
)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Project [id, firstname, department, salary, country]

└─ Scan parquet employee_data [id, firstname, department, salary, country]

   PushedFilters: [country = Canada]
</code></pre>
<p>Notice the difference: PushedFilters appears in the plan. That means the Parquet reader handles the predicate, returning only matching blocks and rows.</p>
<h4 id="heading-under-the-hood-what-actually-happens">Under the Hood: What Actually Happens</h4>
<p>When Spark performs filter pushdown, it leverages the Parquet metadata (min/max statistics and row-group indexes) stored in file footers.</p>
<ul>
<li><p>Spark inspects file-level metadata for the predicate column (country).</p>
</li>
<li><p>It skips any row group whose values don’t match (country ≠ Canada).</p>
</li>
<li><p>It reads only the necessary row groups and columns from disk.</p>
</li>
<li><p>Those records enter the DAG directly – no in-memory filtering required.</p>
</li>
</ul>
<p>This optimization happens entirely before Spark begins executing stages, reducing both I/O and network transfer.</p>
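<p>To confirm that a predicate actually reached the reader, check the scan node in the plan. A quick sketch using df_pushdown from above (the exact PushedFilters formatting varies across Spark versions):</p>
<pre><code class="lang-python"># Look for the FileScan node and its PushedFilters entry in the output
df_pushdown.explain(mode="formatted")
# Expect something like:
#   PushedFilters: [IsNotNull(country), EqualTo(country,Canada)]
</code></pre>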
<h4 id="heading-real-world-benchmark-aws-glue-3">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col

spark = SparkSession.builder.appName(<span class="hljs-string">"FilterPushdownBenchmark"</span>).getOrCreate()

start = time.time()
df_full = (
    spark.read.parquet(<span class="hljs-string">"s3://data/employee_data/"</span>)
        .select(<span class="hljs-string">"*"</span>)                         <span class="hljs-comment"># all columns</span>
        .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"Canada"</span>)  
)
df_full.count()
time_full = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()
df_pushdown = (
    spark.read.parquet(<span class="hljs-string">"s3://data/employee_data/"</span>)
        .select(<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"country"</span>)
        .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"Canada"</span>)  
)
df_pushdown.count()
time_push = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">"Full Scan Time:"</span>, time_full, <span class="hljs-string">"sec"</span>)
print(<span class="hljs-string">"Filter Pushdown Time:"</span>, time_push, <span class="hljs-string">"sec"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Execution Time (1 M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Full Scan</td><td>14.2 s</td><td>All files scanned and filtered in memory.</td></tr>
<tr>
<td>Filter Pushdown</td><td>3.8 s</td><td>Only relevant row groups and columns read.</td></tr>
</tbody>
</table>
</div><p><strong>Physical Plan Comparison</strong></p>
<p>Full Scan:</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) Filter (country = Canada)

+- *(<span class="hljs-number">1</span>) ColumnarToRow

   +- *(<span class="hljs-number">1</span>) FileScan parquet [id, firstname, lastname, department, salary, age, hire_date, country]

      Batched: true, DataFilters: [], PushedFilters: []
</code></pre>
<p>Pushdown:</p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) ColumnarToRow

+- *(<span class="hljs-number">1</span>) FileScan parquet [id, firstname, department, salary, country]

   Batched: true, DataFilters: [isnotnull(country)], PushedFilters: [country = Canada]
</code></pre>
<p>The difference is clear: PushedFilters confirms that Spark applied predicate pushdown, skipping unnecessary row groups at the scan stage.</p>
<h4 id="heading-reflection-why-pushdown-matters">Reflection: Why Pushdown Matters</h4>
<p>Pushdown isn’t a micro-optimization. It’s actually often the single biggest performance lever in Spark ETL. In data lakes with hundreds of files, full scans waste hours and inflate AWS S3 I/O costs. By filtering and projecting early, Spark prunes both rows and columns before execution even begins.</p>
<p>Apply filters as early as possible in the read pipeline, combine filter pushdown with column pruning, verify PushedFilters in explain("formatted"), avoid UDFs and select("*") at read time, and let pushdown turn “read everything and discard most” into “read only what you need.”</p>
<h3 id="heading-scenario-7-de-duplicate-right">Scenario 7: De-duplicate Right</h3>
<h4 id="heading-the-problem-all-row-deduplication-and-why-it-hurts">The Problem: “All-Row Deduplication” and Why It Hurts</h4>
<p>When we use this:</p>
<pre><code class="lang-python">df.dropDuplicates()
</code></pre>
<p>Spark removes identical rows across all columns. It sounds simple, but this operation forces Spark to treat every column as part of the deduplication key.</p>
<p>Internally, it means:</p>
<ul>
<li><p>Every attribute is serialized and hashed.</p>
</li>
<li><p>Every unique combination of all columns is shuffled across the cluster to ensure global uniqueness.</p>
</li>
<li><p>Even small changes in a non-essential field (like hire_date) cause new keys and destroy aggregation locality.</p>
</li>
</ul>
<p>In wide tables, df.dropDuplicates() with no arguments is one of the heaviest shuffle operations Spark can perform.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Aggregate [id, firstname, lastname, department, salary, age, hire_date, country], [first(id) AS id, ...]

└─ Exchange hashpartitioning(id, firstname, lastname, department, salary, age, hire_date, country, <span class="hljs-number">200</span>)

   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Notice the Exchange: that’s a full shuffle across all columns. Spark must send every record to the partition responsible for its unique combination of all fields. This is slow, memory-intensive, and scales poorly as columns grow.</p>
<h4 id="heading-the-better-approach-key-based-deduplication">The Better Approach: Key-Based Deduplication</h4>
<p>In most real datasets, duplicates are determined by a primary or business key, not all attributes. For example, if id uniquely identifies an employee, we only need to keep one record per id.</p>
<pre><code class="lang-python">df.dropDuplicates([<span class="hljs-string">"id"</span>])
</code></pre>
<p>Now Spark deduplicates based only on the id column.</p>
<pre><code class="lang-python">Aggregate [id], [first(id) AS id, first(firstname) AS firstname, ...]

└─ Exchange hashpartitioning(id, <span class="hljs-number">200</span>)

   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>The shuffle is dramatically narrower. Instead of hashing across all columns, Spark redistributes data only by id. Fewer bytes, smaller shuffle files, and a faster reduce stage.</p>
<h4 id="heading-real-world-benchmark-aws-glue-4">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> exp, log, sqrt, col, concat_ws, when, upper, avg
<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession

spark = SparkSession.builder.appName(<span class="hljs-string">"MillionRowsRenameTest"</span>).getOrCreate()

employees_data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Doe"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">80000</span>, <span class="hljs-number">28</span>, <span class="hljs-string">"2020-01-15"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Jane"</span>, <span class="hljs-string">"Smith"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>, <span class="hljs-number">32</span>, <span class="hljs-string">"2019-03-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Johnson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">60000</span>, <span class="hljs-number">25</span>, <span class="hljs-string">"2021-06-10"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Brown"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>, <span class="hljs-number">35</span>, <span class="hljs-string">"2018-07-01"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Charlie"</span>, <span class="hljs-string">"Wilson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">65000</span>, <span class="hljs-number">29</span>, <span class="hljs-string">"2020-11-05"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">6</span>, <span class="hljs-string">"David"</span>, <span class="hljs-string">"Lee"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">55000</span>, <span class="hljs-number">27</span>, <span class="hljs-string">"2021-01-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">7</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Davis"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">95000</span>, <span class="hljs-number">40</span>, <span class="hljs-string">"2017-04-12"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">8</span>, <span class="hljs-string">"Frank"</span>, <span class="hljs-string">"Miller"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">70000</span>, <span class="hljs-number">33</span>, <span class="hljs-string">"2019-09-25"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">9</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"Taylor"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">58000</span>, <span class="hljs-number">26</span>, <span class="hljs-string">"2021-08-15"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">10</span>, <span class="hljs-string">"Henry"</span>, <span class="hljs-string">"Anderson"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">88000</span>, <span class="hljs-number">31</span>, <span class="hljs-string">"2020-02-28"</span>, <span class="hljs-string">"USA"</span>)
]

multiplied_data = [(i, <span class="hljs-string">f"firstname_<span class="hljs-subst">{i}</span>"</span>, <span class="hljs-string">f"lastname_<span class="hljs-subst">{i}</span>"</span>,
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">3</span>],   <span class="hljs-comment"># department</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">4</span>],   <span class="hljs-comment"># salary</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">5</span>],   <span class="hljs-comment"># age</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">6</span>],   <span class="hljs-comment"># hire_date</span>
                    employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">7</span>]    <span class="hljs-comment"># country</span>
                    )
                   <span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">1</span>, <span class="hljs-number">1</span>_000_001)]

df = spark.createDataFrame(
    multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]
)

start = time.time()
dedup_full = df.dropDuplicates()
dedup_full.count()
time_full = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()
dedup_key = df.dropDuplicates([<span class="hljs-string">"id"</span>])
dedup_key.count()
time_key = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">f"Full-row dedup time: <span class="hljs-subst">{time_full}</span>s"</span>)
print(<span class="hljs-string">f"Key-based dedup time: <span class="hljs-subst">{time_key}</span>s"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Full-Row Dedup</td><td>27.6 s</td><td>Shuffle across all attributes, large hash table</td></tr>
<tr>
<td>Key-Based Dedup (["id"])</td><td>2.06 s</td><td>~13× faster, minimal shuffle width</td></tr>
</tbody>
</table>
</div><h4 id="heading-under-the-hood-what-catalyst-does">Under the Hood: What Catalyst Does</h4>
<p>When you specify a key list, Catalyst rewrites dropDuplicates(keys) into a partial + final aggregate plan, just like a groupBy:</p>
<p>HashAggregate(keys=[id], functions=[first(...)])</p>
<p>This allows Spark to:</p>
<ul>
<li><p>Perform map-side partial aggregation on each partition (before shuffle).</p>
</li>
<li><p>Exchange only the grouping key (id).</p>
</li>
<li><p>Perform a final aggregation on the reduced data.</p>
</li>
</ul>
<p>The all-column version can’t use that optimization: because every column participates in uniqueness, Spark must ensure <em>complete</em> data redistribution.</p>
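<p>To see the rewrite yourself, compare the two physical plans side by side. A minimal sketch, assuming the same <code>df</code> from the benchmark above:</p>
<pre><code class="lang-python"># Key-based dedup: Catalyst plans a partial + final HashAggregate on "id"
df.dropDuplicates(["id"]).explain("formatted")

# Full-row dedup: every column becomes a grouping key, forcing a wide Exchange
df.dropDuplicates().explain("formatted")
</code></pre>
<p>Look for <code>HashAggregate(keys=[id], ...)</code> in the first plan versus the all-column key list and the wider <code>Exchange</code> in the second.</p>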
<h4 id="heading-best-practices-for-deduplication">Best Practices for Deduplication</h4>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Practice</strong></td><td><strong>Why It Matters</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Always deduplicate by key columns</td><td>Reduces shuffle width and data movement</td></tr>
<tr>
<td>Use deterministic keys (id, email, ssn)</td><td>Ensures predictable grouping</td></tr>
<tr>
<td>Avoid dropDuplicates() without arguments</td><td>Forces global shuffle across all attributes</td></tr>
<tr>
<td>Combine with column pruning</td><td>Keep only necessary fields before deduplication</td></tr>
<tr>
<td>For “latest record” logic, use window functions</td><td>Allows targeted deduplication (row_number() with order)</td></tr>
<tr>
<td>Cache intermediate datasets if reused</td><td>Avoids recomputation of expensive dedup stages</td></tr>
</tbody>
</table>
</div><h4 id="heading-combining-deduplication-amp-aggregation">Combining Deduplication &amp; Aggregation</h4>
<p>You can merge deduplication with aggregation for even better results:</p>
<pre><code class="lang-python">df_dedup_agg = (
    df.dropDuplicates([<span class="hljs-string">"id"</span>])
        .groupBy(<span class="hljs-string">"department"</span>)
        .agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
)
</code></pre>
<p>Spark now reuses the same shuffle partitioning for both operations, one shuffle instead of two. The plan will show:</p>
<pre><code class="lang-python">HashAggregate(keys=[department], functions=[avg(salary)])

└─ HashAggregate(keys=[id], functions=[first(...), first(department)])

   └─ Exchange hashpartitioning(id, <span class="hljs-number">200</span>)
</code></pre>
<p>Prefer dropDuplicates(["key_col"]) over dropDuplicates() to deduplicate by business or surrogate keys rather than the entire schema. Combine deduplication with projection to reduce I/O, and remember that one narrow shuffle is always better than a wide shuffle. Deduplication isn’t just cleanup – it’s an optimization strategy. Choose your keys wisely, and Spark will reward you with faster jobs and lighter DAGs.</p>
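<p>As a concrete example of the “latest record” pattern from the best-practices table, a <code>row_number()</code> window deduplicates by key while keeping the most recent row. A minimal sketch, assuming <code>hire_date</code> is the recency column:</p>
<pre><code class="lang-python">from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Keep only the most recent row per id, instead of dropping arbitrary duplicates
w = Window.partitionBy("id").orderBy(col("hire_date").desc())

latest_per_id = (
    df.withColumn("rn", row_number().over(w))
      .filter(col("rn") == 1)
      .drop("rn")
)
</code></pre>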
<h3 id="heading-scenario-8-count-smarter">Scenario 8: Count Smarter</h3>
<p>In production, one of the most common performance pitfalls is the simplest line of code:</p>
<pre><code class="lang-python"><span class="hljs-keyword">if</span> df.count() &gt; <span class="hljs-number">0</span>:
</code></pre>
<p>At first glance, this seems harmless. You just want to know whether the DataFrame has any data before writing, joining, or aggregating. But in Spark, count() is not a metadata lookup – it’s a full cluster-wide job.</p>
<p><strong>What Really Happens with count()</strong><br>When you call df.count(), Spark executes a complete action:</p>
<ul>
<li><p>It scans every partition.</p>
</li>
<li><p>Deserializes every row.</p>
</li>
<li><p>Counts records locally on each executor.</p>
</li>
<li><p>Reduces the counts to the driver.</p>
</li>
</ul>
<p>That means your “empty check” runs a full distributed computation, even when the dataset has billions of rows or lives in S3.</p>
<pre><code class="lang-python">df.count()
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) HashAggregate(keys=[], functions=[count(<span class="hljs-number">1</span>)])

+- *(<span class="hljs-number">1</span>) ColumnarToRow

   +- *(<span class="hljs-number">1</span>) FileScan parquet [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Every record is read, aggregated, and returned just to produce a single integer.</p>
<p>Now imagine this runs in the middle of your Glue job, before a write, before a filter, or inside a loop. You’ve just added a full-table scan to your DAG for no reason.</p>
<h4 id="heading-the-smarter-way-limit1-or-head1">The Smarter Way: limit(1) or head(1)</h4>
<p>If all you need to know is whether data exists, you don’t need to count every record. You just need to know if there’s <em>at least one</em>.</p>
<p>Here are two efficient alternatives:</p>
<pre><code class="lang-python">df.head(<span class="hljs-number">1</span>)
<span class="hljs-comment">#or</span>
df.limit(<span class="hljs-number">1</span>).collect()
</code></pre>
<p>Both execute a lazy scan that stops as soon as one record is found.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">TakeOrderedAndProject(limit=<span class="hljs-number">1</span>)

└─ *(<span class="hljs-number">1</span>) FileScan parquet [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<ul>
<li><p>No global aggregation.</p>
</li>
<li><p>No shuffle.</p>
</li>
<li><p>No full scan.</p>
</li>
</ul>
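<p>In practice, the existence check becomes a one-liner. A minimal sketch of the pattern (the write path is just a hypothetical placeholder):</p>
<pre><code class="lang-python"># Cheap emptiness check: head(1) returns an empty list when there are no rows
if df.head(1):
    df.write.mode("overwrite").parquet("s3://your-bucket/output/")  # hypothetical target path
else:
    print("No data to write, skipping.")
</code></pre>
<p>On Spark 3.3+ you can express the same intent with <code>df.isEmpty()</code>, which short-circuits in the same way.</p>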
<h4 id="heading-real-world-benchmark-aws-glue-5">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python"><span class="hljs-keyword">import</span> time
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> exp, log, sqrt, col, concat_ws, when, upper, avg
<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession

<span class="hljs-comment"># Initialize Spark session</span>
spark = SparkSession.builder.appName(<span class="hljs-string">"MillionRowsRenameTest"</span>).getOrCreate()

<span class="hljs-comment"># Base dataset (10 sample employees)</span>
employees_data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Doe"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">80000</span>, <span class="hljs-number">28</span>, <span class="hljs-string">"2020-01-15"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Jane"</span>, <span class="hljs-string">"Smith"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>, <span class="hljs-number">32</span>, <span class="hljs-string">"2019-03-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Johnson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">60000</span>, <span class="hljs-number">25</span>, <span class="hljs-string">"2021-06-10"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Brown"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>, <span class="hljs-number">35</span>, <span class="hljs-string">"2018-07-01"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Charlie"</span>, <span class="hljs-string">"Wilson"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">65000</span>, <span class="hljs-number">29</span>, <span class="hljs-string">"2020-11-05"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">6</span>, <span class="hljs-string">"David"</span>, <span class="hljs-string">"Lee"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">55000</span>, <span class="hljs-number">27</span>, <span class="hljs-string">"2021-01-20"</span>, <span class="hljs-string">"USA"</span>),
    (<span class="hljs-number">7</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Davis"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">95000</span>, <span class="hljs-number">40</span>, <span class="hljs-string">"2017-04-12"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">8</span>, <span class="hljs-string">"Frank"</span>, <span class="hljs-string">"Miller"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">70000</span>, <span class="hljs-number">33</span>, <span class="hljs-string">"2019-09-25"</span>, <span class="hljs-string">"UK"</span>),
    (<span class="hljs-number">9</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"Taylor"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">58000</span>, <span class="hljs-number">26</span>, <span class="hljs-string">"2021-08-15"</span>, <span class="hljs-string">"Canada"</span>),
    (<span class="hljs-number">10</span>, <span class="hljs-string">"Henry"</span>, <span class="hljs-string">"Anderson"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">88000</span>, <span class="hljs-number">31</span>, <span class="hljs-string">"2020-02-28"</span>, <span class="hljs-string">"USA"</span>)
]

<span class="hljs-comment"># Create 1 million rows</span>
multiplied_data = [
    (i, <span class="hljs-string">f"firstname_<span class="hljs-subst">{i}</span>"</span>, <span class="hljs-string">f"lastname_<span class="hljs-subst">{i}</span>"</span>,
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">3</span>],
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">4</span>],
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">5</span>],
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">6</span>],
     employees_data[i % <span class="hljs-number">10</span>][<span class="hljs-number">7</span>])
    <span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">1</span>, <span class="hljs-number">1</span>_000_001)
]

df = spark.createDataFrame(
    multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]
)
<span class="hljs-comment"># Create DataFrame</span>
df = spark.createDataFrame(
    multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]
)

start = time.time()
df.count()
count_time = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()
df.limit(<span class="hljs-number">1</span>).collect()
limit_time = round(time.time() - start, <span class="hljs-number">2</span>)

start = time.time()
df.head(<span class="hljs-number">1</span>)
head_time = round(time.time() - start, <span class="hljs-number">2</span>)

print(f"count() time: {count_time}s")
print(f"limit(1) time: {limit_time}s")
print(f"head(1) time: {head_time}s")

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Method</strong></td><td><strong>Plan Type</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Notes</strong></td></tr>
</thead>
<tbody>
<tr>
<td>count()</td><td>HashAggregate + Exchange</td><td>26.33 s</td><td>Full scan + aggregation</td></tr>
<tr>
<td>limit(1)</td><td>TakeOrderedAndProject</td><td>0.62 s</td><td>Stops after first record</td></tr>
<tr>
<td>head(1)</td><td>TakeOrderedAndProject</td><td>0.42 s</td><td>Fastest, single partition</td></tr>
</tbody>
</table>
</div><p>The difference is significant for the same logical check.</p>
<p>So why does this difference exist? Spark’s execution model treats every action as a trigger for computation. count() is an aggregation action, requiring global communication, and limit(1) and head(1) are sampling actions, short-circuiting the job after fetching the first record. Catalyst generates a TakeOrderedAndProject node instead of HashAggregate, and the scheduler terminates once one task finishes.</p>
<p><strong>Plan comparison:</strong></p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Action</strong></td><td><strong>Simplified Plan</strong></td><td><strong>Type</strong></td><td><strong>Behavior</strong></td></tr>
</thead>
<tbody>
<tr>
<td>count()</td><td>HashAggregate → Exchange → FileScan</td><td>Global</td><td>Full scan, wide dependency</td></tr>
<tr>
<td>limit(1)</td><td>TakeOrderedAndProject → FileScan</td><td>Local</td><td>Early stop, narrow dependency</td></tr>
<tr>
<td>head(1)</td><td>TakeOrderedAndProject → FileScan</td><td>Local</td><td>Early stop, single task</td></tr>
</tbody>
</table>
</div><p>Avoid using count() to check emptiness, since it triggers a full scan. Use limit(1) or head(1) for lightweight existence checks, and reserve count() for cases where you actually need the total – Spark will process all the data unless explicitly told to stop. Other alternatives:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Alternative</strong></td><td><strong>Behavior</strong></td></tr>
</thead>
<tbody>
<tr>
<td><code>df.take(1)</code></td><td>Similar to head(1); returns a list of Rows</td></tr>
<tr>
<td><code>df.first()</code></td><td>Returns the first Row, or None if empty</td></tr>
<tr>
<td><code>df.isEmpty()</code></td><td>Returns true if DataFrame has no rows</td></tr>
<tr>
<td><code>df.rdd.isEmpty()</code></td><td>RDD-level check</td></tr>
</tbody>
</table>
</div><h3 id="heading-scenario-9-window-wisely">Scenario 9: Window Wisely</h3>
<p>Window functions (rank(), dense_rank(), lag(), avg() with over(), and so on) are essential in analytics. They let you calculate running totals, rankings, or time-based metrics.</p>
<p>But in Spark, they’re not cheap, because they rely on shuffles and ordering.</p>
<p>Each window operation:</p>
<ul>
<li><p>Requires all rows for the same partition key to be co-located on the same node.</p>
</li>
<li><p>Requires sorting those rows by the orderBy() clause within each partition.</p>
</li>
</ul>
<p>If you omit partitionBy(), Spark treats the entire dataset as a single window partition, triggering a massive shuffle and a global sort; a key so coarse that one value dominates has much the same effect.</p>
<h4 id="heading-global-window-the-wrong-way">Global Window: The Wrong Way</h4>
<p>Let’s compute employee rankings by salary without partitioning:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.window <span class="hljs-keyword">import</span> Window
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> rank, col

window_spec = Window.orderBy(col(<span class="hljs-string">"salary"</span>).desc())

df_ranked = df.withColumn(<span class="hljs-string">"salary_rank"</span>, rank().over(window_spec))
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Window [rank() windowspecdefinition(orderBy=[salary DESC]) AS salary_rank]

└─ Sort [salary DESC], true

   └─ Exchange rangepartitioning(salary DESC, <span class="hljs-number">200</span>)

      └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>Spark must shuffle and sort the entire dataset globally, a full sort across all rows. Every executor gets a slice of this single global range, and all data must move through the network.</p>
<h4 id="heading-partition-by-a-selective-key-the-better-way">Partition by a Selective Key: The Better Way</h4>
<p>Most analytics don’t need a global ranking. You likely want rankings within a department or group, not across the entire company.</p>
<pre><code class="lang-python">window_spec = Window.partitionBy(<span class="hljs-string">"department"</span>).orderBy(col(<span class="hljs-string">"salary"</span>).desc())

df_ranked = df.withColumn(<span class="hljs-string">"salary_rank"</span>, rank().over(window_spec))
</code></pre>
<p>Now Spark builds separate windows per department. Each partition’s data stays local, dramatically reducing shuffle size.</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Window [rank() windowspecdefinition(partitionBy=[department], orderBy=[salary DESC]) AS salary_rank]

└─ Sort [department ASC, salary DESC], false

   └─ Exchange hashpartitioning(department, <span class="hljs-number">200</span>)

      └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>The Exchange now partitions data only by department. The shuffle boundary is narrower, fewer bytes transferred, fewer sort comparisons, and smaller spill risk.</p>
<h4 id="heading-real-world-benchmark-aws-glue-6">Real-World Benchmark: AWS Glue</h4>
<p>We can execute the window functions on the same 1 million-row dataset:</p>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
[<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>,
 <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
window_global = Window.orderBy(col(<span class="hljs-string">"salary"</span>).desc())
df_global = df.withColumn(<span class="hljs-string">"salary_rank"</span>, rank().over(window_global))
df_global.count()
global_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f'global_time:<span class="hljs-subst">{global_time}</span>'</span>)

start = time.time()
window_local = Window.partitionBy(<span class="hljs-string">"department"</span>).orderBy(col(<span class="hljs-string">"salary"</span>).desc())
df_local = df.withColumn(<span class="hljs-string">"salary_rank"</span>, rank().over(window_local))
df_local.count()
local_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f'local_time:<span class="hljs-subst">{local_time}</span>'</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Stage Count</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Global Window (no partition)</td><td>5</td><td>30.21 s</td><td>Full dataset shuffle + global sort</td></tr>
<tr>
<td>Partitioned Window (by department)</td><td>3</td><td>1.74 s</td><td>Localized sort, fewer shuffle files</td></tr>
</tbody>
</table>
</div><p>Partitioning the window significantly reduces both shuffle volume and runtime, and the gap only widens as the data grows.</p>
<h4 id="heading-under-the-hood-what-spark-actually-does-1">Under the Hood: What Spark Actually Does</h4>
<p>Each Window transformation adds a physical plan node like:</p>
<p>WindowExec [rank() windowspecdefinition(...)], frame=RangeFrame</p>
<p>This node is non-pipelined – it materializes its input partitions before computing window metrics. The Catalyst optimizer can’t push filters or projections through WindowExec, which means:</p>
<ul>
<li><p>If you rank before filtering, Spark computes ranks for all rows.</p>
</li>
<li><p>If you order globally, Spark must sort everything before starting.</p>
</li>
</ul>
<p>That’s why window placement in your code matters almost as much as partition keys.</p>
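<p>The placement point deserves a concrete sketch. Assuming the partitioned window from above, and that only Engineering matters downstream, filtering before the window keeps WindowExec from ranking rows you will immediately discard:</p>
<pre><code class="lang-python">from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

# Anti-pattern: rank every department, then throw most of the result away
ranked_then_filtered = (
    df.withColumn("salary_rank", rank().over(window_spec))
      .filter(col("department") == "Engineering")
)

# Better: filter first so WindowExec only processes the rows you need
filtered_then_ranked = (
    df.filter(col("department") == "Engineering")
      .withColumn("salary_rank", rank().over(window_spec))
)
</code></pre>
<p>Because the window is partitioned by department, both versions return the same ranks for Engineering – only the amount of work differs.</p>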
<h4 id="heading-common-anti-patterns">Common Anti-Patterns:</h4>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Anti-Pattern</strong></td><td><strong>Why It Hurts</strong></td><td><strong>Fix</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Missing partitionBy()</td><td>Global sort across dataset</td><td>Partition by key columns</td></tr>
<tr>
<td>Near-unique (too fine-grained) partition key</td><td>Creates too many tiny partitions</td><td>Use selective, not unique, keys</td></tr>
<tr>
<td>Wide, unbounded window frame</td><td>Retains all rows in memory per key</td><td>Use bounded ranges (for example, rowsBetween(-3, 0))</td></tr>
<tr>
<td>Filtering after window</td><td>Computes unnecessary metrics</td><td>Filter first, then window</td></tr>
<tr>
<td>Multiple chained windows</td><td>Each triggers new sort</td><td>Combine window metrics in one spec</td></tr>
</tbody>
</table>
</div><p>Partition on selective keys to reduce shuffle volume, and avoid global windows that force full sorts and shuffles. Prefer bounded frames to keep state in memory and limit disk spill, and filter early while combining metrics to minimize unnecessary data flowing through WindowExec. Windows are powerful, but unbounded ones can silently crush performance. In Spark, partitioning isn’t optional. It’s the line between analytics and overhead.</p>
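<p>Two of those fixes are worth a concrete sketch: bounding the frame and computing several metrics from one window spec. Assuming the same employee dataset, a trailing average and a rank can share a single partitioning and ordering:</p>
<pre><code class="lang-python">from pyspark.sql.window import Window
from pyspark.sql.functions import rank, avg, col

# One partitioning and ordering, shared by both metrics
by_dept = Window.partitionBy("department").orderBy(col("hire_date"))

# Bounded frame: only the current row and the 3 preceding rows are held per key
trailing = by_dept.rowsBetween(-3, 0)

df_metrics = (
    df.withColumn("trailing_avg_salary", avg("salary").over(trailing))
      .withColumn("hire_order", rank().over(by_dept))
)
</code></pre>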
<h3 id="heading-scenario-10-incremental-aggregations-with-cache-and-persist">Scenario 10: Incremental Aggregations with Cache and Persist</h3>
<p>When multiple actions depend on the same expensive base computation, don’t recompute it every time. Materialize it once with cache() or persist(), then reuse it. Most Spark teams get this wrong in two ways:</p>
<ul>
<li><p>They never cache, so Spark recomputes long lineages (filters, joins, window ops) for every action.</p>
</li>
<li><p>They cache everything, blowing executor memory and making things worse.</p>
</li>
</ul>
<p>This scenario shows how to do it intelligently.</p>
<h4 id="heading-the-problem-recomputing-the-same-work-for-every-metric">The Problem: Recomputing the Same Work for Every Metric</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col, avg, max <span class="hljs-keyword">as</span> max_, count

base = (
    df.filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"USA"</span>)
      .filter(col(<span class="hljs-string">"salary"</span>) &gt; <span class="hljs-number">70000</span>)
)

avg_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
max_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(max_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"max_salary"</span>))
cnt_salary = base.groupBy("department").agg(count("*").alias("cnt"))
</code></pre>
<p>This looks totally fine at a glance. But remember: Spark is lazy. Every time you trigger an action:</p>
<pre><code class="lang-python">avg_salary.show()
max_salary.show()
cnt_salary.show()
</code></pre>
<p>Spark walks back to the same base definition and re-runs all filters and shuffles for each metric – unless you persist.</p>
<p>So instead of 1 filtered + shuffled dataset reused 3 times, you effectively get:</p>
<ul>
<li><p>3 jobs</p>
</li>
<li><p>3 scans / filter chains</p>
</li>
<li><p>3 groupBy shuffles</p>
</li>
</ul>
<p>for the same input slice.</p>
<p><strong>Simplified Logical Plan Shape (Without Cache):</strong></p>
<pre><code class="lang-python">HashAggregate [department], [avg/max/count]

└─ Exchange hashpartitioning(department)

   └─ Filter (department = <span class="hljs-string">'Engineering'</span> AND country = <span class="hljs-string">'USA'</span> AND salary &gt; <span class="hljs-number">70000</span>)

      └─ Scan ...
</code></pre>
<p>And Spark builds this three times. Even though the filter logic is identical, each action triggers a new job with:</p>
<ul>
<li><p>new stages,</p>
</li>
<li><p>new shuffles, and</p>
</li>
<li><p>new scans.</p>
</li>
</ul>
<p>On large datasets (hundreds of GBs), this is brutal.</p>
<h4 id="heading-the-better-approach-cache-the-shared-base">The Better Approach: Cache the Shared Base</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> StorageLevel

base = (
    df.filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"USA"</span>)
      .filter(col(<span class="hljs-string">"salary"</span>) &gt; <span class="hljs-number">70000</span>)
)

base = base.persist(StorageLevel.MEMORY_AND_DISK)

base.count()

avg_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
max_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(max_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"max_salary"</span>))
cnt_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(count(<span class="hljs-string">"*"</span>).alias(<span class="hljs-string">"cnt"</span>))

avg_salary.show()
max_salary.show()
cnt_salary.show()

base.unpersist()
</code></pre>
<p>Now, the filters and initial scan run once, the results are cached, and all subsequent aggregates read from cached data instead of recomputing upstream logic.</p>
<p><strong>Logical Plan Shape (With Cache):</strong></p>
<p>Before materialization (base.count()), the plan still shows the lineage. Afterward, subsequent actions operate off the cached node.</p>
<pre><code class="lang-python">InMemoryRelation [department, salary, country, ...]

   └─ * Cached <span class="hljs-keyword">from</span>:

      Filter (department = <span class="hljs-string">'Engineering'</span> AND country = <span class="hljs-string">'USA'</span> AND salary &gt; <span class="hljs-number">70000</span>)

      └─ Scan parquet employees_large ...
</code></pre>
<p>Then:</p>
<pre><code class="lang-python">HashAggregate [department], [avg/max/count]

└─ InMemoryRelation [...]
</code></pre>
<p>One heavy pipeline, many cheap reads. The DAG becomes flatter:</p>
<ul>
<li><p>Expensive scan &amp; filter &amp; shuffle: once.</p>
</li>
<li><p>Cheap aggregations: N times from memory/disk.</p>
</li>
</ul>
<h4 id="heading-real-world-benchmark-aws-glue-7">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
[<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>,
<span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

base = (
    df.filter(col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>)
      .filter(col(<span class="hljs-string">"country"</span>) == <span class="hljs-string">"USA"</span>)
      .filter(col(<span class="hljs-string">"salary"</span>) &gt; <span class="hljs-number">85000</span>)
)


start = time.time()

avg_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
max_salary = base.groupBy(<span class="hljs-string">"department"</span>).agg(max_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"max_salary"</span>))
cnt = base.groupBy(<span class="hljs-string">"department"</span>).agg(count(<span class="hljs-string">"*"</span>).alias(<span class="hljs-string">"emp_count"</span>))

print(<span class="hljs-string">"---- Without Cache ----"</span>)
avg_salary.show()
max_salary.show()
cnt.show()

no_cache_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f"Total time without cache: <span class="hljs-subst">{no_cache_time}</span> seconds"</span>)


<span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> DataFrame

base_cached = base.persist(StorageLevel.MEMORY_AND_DISK)
base_cached.count()  <span class="hljs-comment"># materialize cache</span>

start = time.time()

avg_salary_c = base_cached.groupBy(<span class="hljs-string">"department"</span>).agg(avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
max_salary_c = base_cached.groupBy(<span class="hljs-string">"department"</span>).agg(max_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"max_salary"</span>))
cnt_c = base_cached.groupBy(<span class="hljs-string">"department"</span>).agg(count(<span class="hljs-string">"*"</span>).alias(<span class="hljs-string">"emp_count"</span>))

print(<span class="hljs-string">"---- With Cache ----"</span>)
avg_salary_c.show()
max_salary_c.show()
cnt_c.show()

cache_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f"Total time with cache: <span class="hljs-subst">{cache_time}</span> seconds"</span>)

<span class="hljs-comment"># Cleanup</span>
base_cached.unpersist()

print(<span class="hljs-string">"\n==== Summary ===="</span>)
print(<span class="hljs-string">f"Without cache: <span class="hljs-subst">{no_cache_time}</span>s | With cache: <span class="hljs-subst">{cache_time}</span>s"</span>)
print(<span class="hljs-string">"================="</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Execution Time (1M rows)</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Without Cache</td><td>30.75 s</td></tr>
<tr>
<td>With Cache</td><td>3.34 s</td></tr>
</tbody>
</table>
</div><h4 id="heading-under-the-hood-why-this-works"><strong>Under the Hood: Why This Works</strong></h4>
<p>Using cache() or persist() in Spark inserts an InMemoryRelation / InMemoryTableScanExec node so that expensive intermediate results are stored in executor memory (or memory+disk). This allows future jobs to reuse cached blocks instead of re-scanning sources or re-computing shuffles. This shortens downstream logical plans, reduces repeated shuffles, and lowers load on systems like S3, HDFS, or JDBC.</p>
<p>Without caching, every action replays the full lineage and Spark recomputes the data unless another operator or AQE optimization has already materialized part of it. But caching should not become “cache everything”. Rather, you should avoid caching very large DataFrames used only once, wide raw inputs instead of filtered/aggregated subsets, or long-lived caches that are never unpersisted.</p>
<p>A good rule of thumb is to cache only when the DataFrame is expensive to recompute (joins, filters, windows, UDFs), is used at least twice, and is reasonably sized after filtering so it can fit in memory or work with MEMORY_AND_DISK. Otherwise, allow Spark to recompute.</p>
<p>Conceptually, caching converts a tall, repetitive DAG such as repeated “HashAggregate → Exchange → Filter → Scan” sequences into a hub-and-spoke design where one heavy cached hub feeds multiple lightweight downstream aggregates.</p>
<p>When multiple actions depend on the same expensive computation, cache or persist the shared base to flatten the DAG, eliminate repeated scans and shuffles, and improve end-to-end performance. Be intentional about it, though: cache only when the reuse is real and the data size is manageable, and always call <code>unpersist()</code> when you’re done.</p>
<p>Don’t make Spark re-solve the same puzzle three times. Let it solve it once, remember the answer, and move on.</p>
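<p>A quick way to confirm that a cache is actually in effect is to check the DataFrame’s storage flags and look for <code>InMemoryRelation</code> in downstream plans. A minimal sketch, assuming <code>base</code> is still persisted as in the earlier example:</p>
<pre><code class="lang-python">print(base.is_cached)     # True once persist() has been called
print(base.storageLevel)  # shows the memory/disk flags of the chosen StorageLevel

# Downstream plans should now start from InMemoryRelation instead of the raw scan
base.groupBy("department").count().explain("formatted")

base.unpersist()          # release executor memory when you are done
</code></pre>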
<h3 id="heading-scenario-11-reduce-shuffles">Scenario 11: Reduce Shuffles</h3>
<p>Shuffles are Spark’s invisible tax collectors. Every time your data crosses executors, you pay in CPU, disk I/O, and network bandwidth.</p>
<p>Two of the most common yet misunderstood transformations that trigger or avoid shuffles are coalesce() and repartition(). Both change partition counts, but they do it in fundamentally different ways.</p>
<h4 id="heading-the-problem"><strong>The Problem</strong></h4>
<p>You write <code>df_result = df.repartition(10)</code>, thinking “I’m just changing the partition count, so Spark won’t move data unnecessarily.” That assumption is wrong: <code>repartition()</code> always performs a full shuffle, even when:</p>
<ul>
<li><p>You are reducing partitions (from 200 → 10), or</p>
</li>
<li><p>You are increasing partitions (from 10 → 200).</p>
</li>
</ul>
<p>In both cases, Spark redistributes every row across the cluster according to a new hash partitioning scheme. So even if your data is already partitioned optimally, repartition() will still reshuffle it, adding a stage boundary.</p>
<p><strong>Logical Plan:</strong></p>
<pre><code class="lang-python">Exchange hashpartitioning(...)

└─ LogicalRDD [...]
</code></pre>
<p>That Exchange node signals a wide dependency: Spark spills intermediate data to disk, transfers it over the network, and reloads it before the next stage. In short: repartition() = "new shuffle, no matter what."</p>
<h4 id="heading-the-better-approach-coalesce">The Better Approach: coalesce()</h4>
<p>If your goal is to reduce the number of partitions, for example, before writing results to S3 or Snowflake – use coalesce() instead.</p>
<p><code>df_result = df.coalesce(10)</code></p>
<p>coalesce() merges existing partitions locally within each executor, avoiding the costly reshuffle step. It uses a narrow dependency, meaning each output partition depends on one or more existing partitions <em>from the same node</em>.</p>
<p>Coalesce</p>
<p>└─ LogicalRDD [...]</p>
<ul>
<li><p>No Exchange.</p>
</li>
<li><p>No network shuffle.</p>
</li>
<li><p>Just local merges – fast and cheap.</p>
</li>
</ul>
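<p>The classic use case is shrinking the output file count right before a write. A minimal sketch, with a hypothetical S3 path standing in for your real target:</p>
<pre><code class="lang-python"># coalesce(10) merges partitions locally, so the write produces at most 10 files
# and no Exchange appears in the plan
(
    df.coalesce(10)
      .write
      .mode("overwrite")
      .parquet("s3://your-bucket/employees_out/")  # hypothetical output path
)
</code></pre>
<p>If you later need more parallelism for a heavy transformation, use repartition() at that point instead; coalesce() is for the final narrowing step.</p>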
<h4 id="heading-real-world-benchmark-aws-glue-8">Real-World Benchmark: AWS Glue</h4>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
[<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
df_repart = df.repartition(<span class="hljs-number">10</span>)
df_repart.count()
print(<span class="hljs-string">"Repartition time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"sec"</span>)

start = time.time()
df_coalesced = df.coalesce(<span class="hljs-number">10</span>)
df_coalesced.count()
print(<span class="hljs-string">"Coalesce time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"sec"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Operation</strong></td><td><strong>Plan Node</strong></td><td><strong>Shuffle Triggered</strong></td><td><strong>Glue Runtime</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>repartition(10)</td><td>Exchange</td><td>Yes</td><td>18.2 s</td><td>Full cluster reshuffle</td></tr>
<tr>
<td>coalesce(10)</td><td>Coalesce</td><td>No</td><td>1.99 s</td><td>Local partition merge only</td></tr>
</tbody>
</table>
</div><p>Even though both ended with 10 partitions, repartition() took roughly nine times longer, all because of the unnecessary shuffle.</p>
<h4 id="heading-why-this-matters">Why This Matters</h4>
<p>Each Exchange node in your logical plan creates a new stage in your DAG, meaning:</p>
<ul>
<li><p>Extra disk I/O</p>
</li>
<li><p>Extra serialization</p>
</li>
<li><p>Extra network transfer</p>
</li>
</ul>
<p>That’s why avoiding just one shuffle in a Glue ETL pipeline can save seconds to minutes per run, especially on wide datasets.</p>
<p><strong>When to use which:</strong></p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Goal</strong></td><td><strong>Transformation</strong></td><td><strong>Reasoning</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Increase parallelism for heavy groupBy or join</td><td>repartition()</td><td>Distributes data evenly across executors</td></tr>
<tr>
<td>Reduce file count before writing</td><td>coalesce()</td><td>Avoids shuffle, merges partitions locally</td></tr>
<tr>
<td>Rebalance skewed data before a join</td><td>repartition("key")</td><td>Enables better key distribution</td></tr>
<tr>
<td>Optimize output after aggregation</td><td>coalesce()</td><td>Prevents too many small output files</td></tr>
</tbody>
</table>
</div><h4 id="heading-aqe-and-auto-coalescing">AQE and Auto Coalescing</h4>
<p>You can enable Adaptive Query Execution (AQE) in AWS Glue 3.0+ to let Spark merge small shuffle partitions automatically:</p>
<p><code>spark.conf.set("spark.sql.adaptive.enabled", "true")</code></p>
<p><code>spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")</code></p>
<p>With AQE, Spark dynamically combines small partitions <em>after</em> shuffle to balance performance and I/O.</p>
<p>repartition() always triggers a shuffle, while coalesce() avoids shuffles and is ideal for local merges before writes. You should always inspect Exchange nodes to identify shuffle points. Note that in AWS Glue, avoiding even one shuffle yielded a near order-of-magnitude runtime improvement at the 1M-row scale (18.2 s → 1.99 s in the benchmark above). Finally, use AQE to enable dynamic partition coalescing in larger workflows.</p>
<h3 id="heading-scenario-12-know-your-shuffle-triggers">Scenario 12: Know Your Shuffle Triggers</h3>
<p>Much of Spark's runtime cost hides in invisible data movement. Every shuffle boundary adds a new stage, a new write–read cycle, and sometimes minutes of extra execution time.</p>
<p>In Spark, any operation that requires rearranging data between partitions introduces a wide dependency, represented in the logical plan as an Exchange node.</p>
<p>Common shuffle triggers:</p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Operation</strong></td><td><strong>Why It Shuffles</strong></td><td><strong>Plan Node</strong></td></tr>
</thead>
<tbody>
<tr>
<td>join()</td><td>Records with the same key must be co-located for matching</td><td>Exchange (on join keys)</td></tr>
<tr>
<td>groupBy() / agg()</td><td>Keys must gather to a single partition for aggregation</td><td>Exchange</td></tr>
<tr>
<td>distinct()</td><td>Spark must compare all values across partitions</td><td>Exchange</td></tr>
<tr>
<td>orderBy()</td><td>Requires global ordering of data</td><td>Exchange</td></tr>
<tr>
<td>repartition()</td><td>Explicit reshuffle for partition balancing</td><td>Exchange</td></tr>
</tbody>
</table>
</div><p>Each Exchange means a shuffle stage: Spark writes partition data to disk, transfers it over the network, and reads it back into memory on the next stage. That’s your hidden performance cliff.</p>
<pre><code class="lang-python">df_result = (
    df.groupBy(<span class="hljs-string">"department"</span>)
      .agg(sum(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
      .join(df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
            .distinct(), <span class="hljs-string">"department"</span>)
      .orderBy(<span class="hljs-string">"total_salary"</span>, ascending=<span class="hljs-literal">False</span>)
)

df_result.explain(<span class="hljs-string">"formatted"</span>)
</code></pre>
<p><strong>Logical Plan Simplified:</strong></p>
<pre><code class="lang-python">Sort [total_salary DESC]

└─ Exchange (<span class="hljs-keyword">global</span> sort)

   └─ SortMergeJoin [department]

      ├─ Exchange (groupBy shuffle)

      │   └─ HashAggregate (sum salary)

      └─ Exchange (distinct shuffle)

          └─ Aggregate (department, country)
</code></pre>
<p>We can see three Exchange nodes, one for the aggregation, one for the distinct join, and one for the global sort. That’s three separate shuffles, three full dataset transfers.</p>
<h4 id="heading-better-approach">Better Approach</h4>
<p>Whenever possible, combine wide transformations into a single stage before an action. For instance, you can compute aggregates and join results in one consistent shuffle domain:</p>
<pre><code class="lang-python">agg_df = df.groupBy(<span class="hljs-string">"department"</span>) \
    .agg(sum(<span class="hljs-string">"salary"</span>) \
    .alias(<span class="hljs-string">"total_salary"</span>))

country_df = df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>).distinct()

df_result = (
    agg_df.join(country_df, <span class="hljs-string">"department"</span>)
          .sortWithinPartitions(<span class="hljs-string">"total_salary"</span>, ascending=<span class="hljs-literal">False</span>)
)
</code></pre>
<p><strong>Logical Plan Simplified:</strong></p>
<pre><code class="lang-python">SortWithinPartitions [total_salary DESC]

└─ SortMergeJoin [department]

   ├─ Exchange (shared shuffle <span class="hljs-keyword">for</span> join)

   └─ Exchange (shared shuffle <span class="hljs-keyword">for</span> distinct)
</code></pre>
<p>Now the global-sort Exchange disappears entirely: the remaining shuffles feed the join directly, and the final ordering executes as a narrow, per-partition transformation.</p>
<h4 id="heading-real-world-benchmark-aws-glue-1m">Real-World Benchmark: AWS Glue (1M)</h4>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
[<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>, <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>]).repartition(<span class="hljs-number">20</span>)

<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> sum <span class="hljs-keyword">as</span> sum_

start = time.time()

dept_salary = (
    df.groupBy(<span class="hljs-string">"department"</span>)
      .agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
)

dept_country = (
    df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
      .distinct()
)

naive_result = (
    dept_salary.join(dept_country, <span class="hljs-string">"department"</span>, <span class="hljs-string">"inner"</span>)
               .orderBy(col(<span class="hljs-string">"total_salary"</span>).desc())
)

naive_count = naive_result.count()
naive_time = round(time.time() - start, 2)

print("Naive result count:", naive_count)
print("Naive pipeline time:", naive_time, "sec")


start = time.time()

dept_country_once = (
    df.select(<span class="hljs-string">"department"</span>, <span class="hljs-string">"country"</span>)
      .distinct()
)

optimized = (
    df.groupBy(<span class="hljs-string">"department"</span>)
      .agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
      .join(dept_country_once, <span class="hljs-string">"department"</span>, <span class="hljs-string">"inner"</span>)
      .sortWithinPartitions(col(<span class="hljs-string">"total_salary"</span>).desc())
      <span class="hljs-comment"># local ordering, avoids extra global shuffle</span>
)

opt_count = optimized.count()
opt_time = round(time.time() - start, <span class="hljs-number">2</span>)

print(<span class="hljs-string">"Optimized result count:"</span>, opt_count)
print(<span class="hljs-string">"Optimized pipeline time:"</span>, opt_time, <span class="hljs-string">"sec"</span>)

print(<span class="hljs-string">"\nOptimized plan:"</span>)
optimized.explain(<span class="hljs-string">"formatted"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Pipeline</strong></td><td><strong># of Shuffles</strong></td><td><strong>Glue Runtime (sec)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Naive: groupBy + distinct + orderBy</td><td>3</td><td>28.99 s</td><td>Multiple wide stages</td></tr>
<tr>
<td>Optimized: combined agg + join + sortWithinPartitions</td><td>1</td><td>3.52 s</td><td>Single wide stage</td></tr>
</tbody>
</table>
</div><p>By merging compatible stages and using sortWithinPartitions() instead of global orderBy(), the job ran significantly faster on the same dataset, with fewer Exchange nodes and shorter lineage. Run df.explain() and search for Exchange: each occurrence signals a full shuffle. You can also check Spark UI → SQL tab → Exchange Read/Write Size to see exactly how much data moved.</p>
<p>Every Exchange represents a shuffle, adding serialization, network I/O, and stage overhead, so avoid chaining wide operations back-to-back by combining them under a consistent partition key. Prefer sortWithinPartitions() over global orderBy() when ordering is local, monitor plan depth to catch consecutive wide dependencies, and note that in AWS Glue eliminating even one shuffle in a 1M-row job can significantly reduce runtime.</p>
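<p>If you want a scriptable check rather than eyeballing the output, you can capture what <code>explain()</code> prints and count the Exchange operators. A rough sketch using only the standard library, assuming <code>naive_result</code> and <code>optimized</code> from the benchmark are still in scope (that is, before spark.stop()):</p>
<pre><code class="lang-python">import io
from contextlib import redirect_stdout

def count_exchanges(frame):
    """Rough count of Exchange operators in a DataFrame's physical plan."""
    buf = io.StringIO()
    with redirect_stdout(buf):   # explain() prints the plan instead of returning it
        frame.explain()          # simple mode: each Exchange appears once
    # Note: this also matches BroadcastExchange, so treat it as an upper bound
    return buf.getvalue().count("Exchange")

print("Exchanges in naive plan:    ", count_exchanges(naive_result))
print("Exchanges in optimized plan:", count_exchanges(optimized))
</code></pre>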
<h3 id="heading-scenario-13-tune-parallelism-shuffle-partitions-amp-aqe">Scenario 13: Tune Parallelism: Shuffle Partitions &amp; AQE</h3>
<p>Most Spark jobs are either over-parallelized (thousands of tiny tasks doing almost nothing, flooding the driver and filesystem) or under-parallelized (a handful of huge tasks doing all the work, causing slow stages and skew-like behavior). Both waste resources. We can control this behavior using spark.sql.shuffle.partitions and Adaptive Query Execution (AQE).</p>
<p>By default, <code>spark.conf.get("spark.sql.shuffle.partitions")</code> returns 200, so every shuffle (groupBy, join, distinct, and so on) produces 200 shuffle partitions regardless of data size. Whether that default is reasonable depends entirely on the workload:</p>
<ul>
<li><p>If you’re processing 2 GB, 200 partitions might be great.</p>
</li>
<li><p>If you’re processing 5 MB, 200 partitions is comedy – 200 tiny tasks, overhead &gt; work.</p>
</li>
<li><p>If you’re processing 2 TB, 200 partitions might be too few – tasks become huge and slow.</p>
</li>
</ul>
<h4 id="heading-example-a-the-default-plan-too-many-tiny-tasks">Example A: The Default Plan (Too Many Tiny Tasks)</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession
<span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> sum <span class="hljs-keyword">as</span> sum_

spark = SparkSession.builder.appName(<span class="hljs-string">"ParallelismExample"</span>).getOrCreate()

spark.conf.get(<span class="hljs-string">"spark.sql.shuffle.partitions"</span>)  <span class="hljs-comment"># '200'</span>

data = [
    (<span class="hljs-number">1</span>, <span class="hljs-string">"John"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">90000</span>),
    (<span class="hljs-number">2</span>, <span class="hljs-string">"Alice"</span>, <span class="hljs-string">"Engineering"</span>, <span class="hljs-number">85000</span>),
    (<span class="hljs-number">3</span>, <span class="hljs-string">"Bob"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">75000</span>),
    (<span class="hljs-number">4</span>, <span class="hljs-string">"Eve"</span>, <span class="hljs-string">"Sales"</span>, <span class="hljs-number">72000</span>),
    (<span class="hljs-number">5</span>, <span class="hljs-string">"Grace"</span>, <span class="hljs-string">"HR"</span>, <span class="hljs-number">65000</span>),
]

df = spark.createDataFrame(data, [<span class="hljs-string">"id"</span>, <span class="hljs-string">"name"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>])

agg_df = df.groupBy(<span class="hljs-string">"department"</span>).agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
agg_df.explain(<span class="hljs-string">"formatted"</span>)
</code></pre>
<p>Even though there are only 3 departments, Spark will still create 200 shuffle partitions – meaning 200 tasks for 3 groups of data.</p>
<p><strong>Effect:</strong> Each task has almost nothing to do. Spark spends more time planning and scheduling than actually computing.</p>
<h4 id="heading-example-b-tuned-plan-balanced-parallelism">Example B: Tuned Plan (Balanced Parallelism)</h4>
<pre><code class="lang-python">spark.conf.set(<span class="hljs-string">"spark.sql.shuffle.partitions"</span>, <span class="hljs-string">"8"</span>)
agg_df = df.groupBy(<span class="hljs-string">"department"</span>).agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
agg_df.explain(<span class="hljs-string">"formatted"</span>)
</code></pre>
<p>Now Spark launches only <strong>8 shuffle partitions</strong> – still parallelized, but not wasteful. Even in this small example, you can see the difference: one configuration change, but a much leaner physical plan.</p>
<h4 id="heading-the-real-problem-static-tuning-doesnt-scale">The Real Problem: Static Tuning Doesn’t Scale</h4>
<p>In production, job sizes vary:</p>
<ul>
<li><p>Today: 10 GB</p>
</li>
<li><p>Tomorrow: 500 GB</p>
</li>
<li><p>Next week: 200 MB (sampling run)</p>
</li>
</ul>
<p>Manually changing shuffle partitions for each run is neither practical nor reliable. That’s where Adaptive Query Execution (AQE) steps in.</p>
<h4 id="heading-adaptive-query-execution-aqe-smarter-dynamic-parallelism">Adaptive Query Execution (AQE): Smarter, Dynamic Parallelism</h4>
<p>AQE doesn’t guess. It measures actual shuffle statistics at runtime and rewrites the plan <em>while the job is running.</em></p>
<pre><code class="lang-python">spark.conf.set(<span class="hljs-string">"spark.sql.adaptive.enabled"</span>, <span class="hljs-string">"true"</span>)
spark.conf.set(<span class="hljs-string">"spark.sql.adaptive.coalescePartitions.enabled"</span>, <span class="hljs-string">"true"</span>)
spark.conf.set(<span class="hljs-string">"spark.sql.adaptive.coalescePartitions.minPartitionSize"</span>, <span class="hljs-string">"64m"</span>)
spark.conf.set(<span class="hljs-string">"spark.sql.adaptive.coalescePartitions.maxPartitionSize"</span>, <span class="hljs-string">"256m"</span>)
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Configuration</strong></td><td><strong>Shuffle Partitions</strong></td><td><strong>Task Distribution</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Default</td><td>200</td><td>200 tasks / 3 groups</td><td>Too granular, mostly idle</td></tr>
<tr>
<td>Tuned</td><td>8</td><td>8 tasks / 3 groups</td><td>Balanced execution</td></tr>
</tbody>
</table>
</div><p>AQE merges tiny shuffle partitions, or splits huge ones, based on <strong>real-time data metrics</strong>, not pre-set assumptions.</p>
<pre><code class="lang-python">df = spark.createDataFrame(multiplied_data,
    [<span class="hljs-string">"id"</span>, <span class="hljs-string">"firstname"</span>, <span class="hljs-string">"lastname"</span>, <span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>, <span class="hljs-string">"age"</span>,
     <span class="hljs-string">"hire_date"</span>, <span class="hljs-string">"country"</span>])

start = time.time()
agg_df = df.groupBy(<span class="hljs-string">"department"</span>).agg(sum_(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"total_salary"</span>))
agg_df.count()

print(<span class="hljs-string">f'Num Partitions df: <span class="hljs-subst">{df.rdd.getNumPartitions()}</span>'</span>)
print(<span class="hljs-string">f'Num Partitions aggdf: <span class="hljs-subst">{agg_df.rdd.getNumPartitions()}</span>'</span>)
print(<span class="hljs-string">"Execution time:"</span>, round(time.time() - start, <span class="hljs-number">2</span>), <span class="hljs-string">"sec"</span>)

spark.stop()
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Stage</strong></td><td><strong>Without AQE</strong></td><td><strong>With AQE</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Stage 3 (Aggregation)</td><td>200 shuffle partitions, each reading KBs</td><td>8–12 coalesced partitions</td></tr>
<tr>
<td>Stage 4 (Join Output)</td><td>200 shuffle files</td><td>Merged into balanced partitions</td></tr>
<tr>
<td><strong>Result</strong></td><td>Many small tasks, high overhead</td><td>Fewer, balanced tasks, faster runtime</td></tr>
</tbody>
</table>
</div><h4 id="heading-understanding-the-plan"><strong>Understanding the Plan</strong></h4>
<p>Before AQE (static):</p>
<p><code>Exchange hashpartitioning(department, 200)</code></p>
<p>With AQE: AdaptiveSparkPlan (coalesced)</p>
<p><code>HashAggregate(keys=[department], functions=[sum(salary)])</code></p>
<p><code>Exchange hashpartitioning(department, 200)</code>  <em># runtime coalesced to 12</em></p>
<p>The logical plan remains the same, but the physical execution plan is rewritten during runtime. Spark intelligently reduces or merges shuffle partitions based on data volume.</p>
<p>Spark’s default of 200 shuffle partitions rarely fits real workloads. Static tuning may work for predictable pipelines, but it breaks down when data volumes vary. AQE, on the other hand, uses shuffle statistics to coalesce partitions dynamically at runtime; use it with a sensible ceiling (for example, 400 partitions) and always verify in the Spark UI to catch over-partitioning (many tasks reading KBs) or under-partitioning (few tasks reading GBs).</p>
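<p>Putting that advice together: treat <code>spark.sql.shuffle.partitions</code> as the ceiling, let AQE coalesce below it, and then verify what actually ran. A minimal sketch, assuming the <code>df</code> and SparkSession from the examples above:</p>
<pre><code class="lang-python">from pyspark.sql.functions import sum as sum_

# Ceiling for shuffle parallelism; AQE can only coalesce downward from here
spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

agg_df = df.groupBy("department").agg(sum_("salary").alias("total_salary"))
agg_df.count()  # run an action so AQE has real shuffle statistics

# After the action, the RDD reflects the runtime-coalesced partition count
print("Post-shuffle partitions:", agg_df.rdd.getNumPartitions())
</code></pre>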
<h3 id="heading-scenario-14-handle-skew-smartly">Scenario 14: Handle Skew Smartly</h3>
<p>In an ideal Spark world, all partitions contain roughly equal amounts of data. But real datasets are rarely that kind. If one key (say "USA", "2024", or "customer_123") holds millions of rows while others have only a few, Spark ends up with one or two massive partitions. Those partitions take disproportionately longer to process, leaving other executors idle. That’s data skew: the silent killer of parallelism.</p>
<p>You’ll often spot it in Spark UI:</p>
<ul>
<li><p>198 tasks finish quickly.</p>
</li>
<li><p>2 tasks take 10× longer.</p>
</li>
<li><p>Stage stays stuck at 98% for minutes.</p>
</li>
</ul>
<h4 id="heading-example-a-the-skew-problem">Example A: The Skew Problem</h4>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql <span class="hljs-keyword">import</span> SparkSession, functions <span class="hljs-keyword">as</span> F

spark = SparkSession.builder.appName(<span class="hljs-string">"DataSkewDemo"</span>).getOrCreate()

<span class="hljs-comment"># Create skewed dataset</span>
df = spark.range(<span class="hljs-number">0</span>, <span class="hljs-number">10000</span>).toDF(<span class="hljs-string">"id"</span>) \
    .withColumn(<span class="hljs-string">"department"</span>,
        F.when(F.col(<span class="hljs-string">"id"</span>) &lt; <span class="hljs-number">8000</span>, <span class="hljs-string">"Engineering"</span>)  <span class="hljs-comment"># 80% of data</span>
         .when(F.col(<span class="hljs-string">"id"</span>) &lt; <span class="hljs-number">9000</span>, <span class="hljs-string">"Sales"</span>)
         .otherwise(<span class="hljs-string">"HR"</span>)) \
    .withColumn(<span class="hljs-string">"salary"</span>, (F.rand() * <span class="hljs-number">100000</span>).cast(<span class="hljs-string">"int"</span>))

df.groupBy(<span class="hljs-string">"department"</span>).count().show()
</code></pre>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769464257950/6963171b-92de-4721-9bb3-6951c68a2775.png" alt="6963171b-92de-4721-9bb3-6951c68a2775" class="image--center mx-auto" width="600" height="400" loading="lazy"></p>
<p>Spark will hash “Engineering” into just one reducer partition, making it far heavier than the others. That single task becomes a bottleneck: the shuffle has technically completed, but the stage waits for that one lagging task.</p>
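<p>If you want to confirm the imbalance before fixing it, a small diagnostic (a sketch, not part of the pipeline itself) is to hash-partition by the aggregation key and count rows per partition:</p>
<pre><code class="lang-python">from pyspark.sql import functions as F

# Diagnostic sketch: hash-partition by the key and count rows per partition.
# Expect the partition holding "Engineering" to dwarf all the others.
(
    df.repartition("department")
      .groupBy(F.spark_partition_id().alias("partition_id"))
      .count()
      .orderBy(F.desc("count"))
      .show()
)
</code></pre>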
<h4 id="heading-example-b-the-solution-salting-hot-keys">Example B: The Solution: Salting Hot Keys</h4>
<p>To handle skew, we split the hot key (Engineering) into multiple pseudo-keys using a random salt. This redistributes that large partition across multiple reducers.</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> rand, concat, lit, floor

salt_buckets = <span class="hljs-number">10</span>

df_salted = (
    df.withColumn(
        <span class="hljs-string">"department_salted"</span>,
        F.when(F.col(<span class="hljs-string">"department"</span>) == <span class="hljs-string">"Engineering"</span>,
            F.concat(F.col(<span class="hljs-string">"department"</span>), lit(<span class="hljs-string">"_"</span>),
                     (F.floor(rand() * salt_buckets))))
         .otherwise(F.col(<span class="hljs-string">"department"</span>))
    )
)

df_salted.groupBy(<span class="hljs-string">"department_salted"</span>).agg(F.avg(<span class="hljs-string">"salary"</span>))
</code></pre>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769464242395/c4ec0bc6-67bf-488c-b619-7130ceef878e.png" alt="c4ec0bc6-67bf-488c-b619-7130ceef878e" class="image--center mx-auto" width="600" height="400" loading="lazy"></p>
<p>Now “Engineering” isn’t one hot key – it’s <strong>10 smaller keys</strong> like Engineering_0, Engineering_1, ..., Engineering_9. Each one goes to a separate reducer partition, enabling parallel processing.</p>
<h4 id="heading-example-c-post-aggregation-desalting">Example C: Post-Aggregation Desalting</h4>
<p>After aggregating, recombine salted keys to get the original department names:</p>
<pre><code class="lang-python">df_final = (
    df_salted.groupBy(<span class="hljs-string">"department_salted"</span>)
        .agg(F.avg(<span class="hljs-string">"salary"</span>).alias(<span class="hljs-string">"avg_salary"</span>))
        .withColumn(<span class="hljs-string">"department"</span>, F.split(F.col(<span class="hljs-string">"department_salted"</span>), <span class="hljs-string">"_"</span>)
            .getItem(<span class="hljs-number">0</span>))
        .groupBy(<span class="hljs-string">"department"</span>)
        .agg(F.avg(<span class="hljs-string">"avg_salary"</span>).alias(<span class="hljs-string">"final_avg_salary"</span>))
)
</code></pre>
<p><img src="https://cdn.hashnode.com/res/hashnode/image/upload/v1769464321049/6349c2c3-a0e3-4f9e-be3e-c59639004128.png" alt="6349c2c3-a0e3-4f9e-be3e-c59639004128" class="image--center mx-auto" width="600" height="400" loading="lazy"></p>
<h4 id="heading-when-to-use-salting">When to Use Salting</h4>
<p>Use salting when:</p>
<ul>
<li><p>You observe stage skew (one or few long tasks).</p>
</li>
<li><p>Shuffle read sizes vary drastically between tasks.</p>
</li>
<li><p>The skew originates from a few dominant key values.</p>
</li>
</ul>
<p>Avoid it when:</p>
<ul>
<li><p>The dataset is small (&lt; 1 GB).</p>
</li>
<li><p>You already use partitioning or bucketing keys with uniform distribution.</p>
</li>
</ul>
<p><strong>Alternative approaches:</strong></p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Technique</strong></td><td><strong>Use Case</strong></td><td><strong>Pros</strong></td><td><strong>Cons</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Salting (manual)</td><td>Skewed joins/aggregations</td><td>Full control</td><td>Requires extra logic to merge</td></tr>
<tr>
<td>Skew join hints (/*+ SKEWJOIN */)</td><td>Supported joins in Spark 3+</td><td>No extra columns needed</td><td>Works only on joins</td></tr>
<tr>
<td>Broadcast smaller side</td><td>One table ≪ other</td><td>Avoids shuffle on big side</td><td>Limited by broadcast size</td></tr>
<tr>
<td>AQE skew optimization</td><td>Spark 3.0+</td><td>Automatic handling</td><td>Needs AQE enabled</td></tr>
</tbody>
</table>
</div><h4 id="heading-glue-specific-tip">Glue-Specific Tip</h4>
<p>AWS Glue 3.0+ includes Spark 3.x, meaning you can also enable AQE’s built-in skew optimization:</p>
<p><code>spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")</code></p>
<p><code>spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "128m")</code></p>
<p>Spark will automatically detect large shuffle partitions and split them, effectively auto-salting hot keys at runtime. Data skew causes uneven shuffle sizes across tasks and can be detected in the Spark UI or via shuffle read/write metrics. Mitigate heavy-key skew with manual salting (recombined later) or rely on AQE skew join optimization for mild cases, and always validate improvements in the Spark UI SQL tab by checking “Shuffle Read Size.”</p>
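<p>As a rough sketch of how those settings typically come together when the session is created (the factor and threshold values below are illustrative, not tuned recommendations):</p>
<pre><code class="lang-python">from pyspark.sql import SparkSession

# Illustrative AQE skew-handling configuration for a Spark 3.x / Glue 3.0+ job
spark = (
    SparkSession.builder.appName("SkewAwareJob")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # A partition counts as skewed if it is this many times larger than the median...
    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
    # ...and also larger than this absolute size threshold
    .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "128m")
    .getOrCreate()
)
</code></pre>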
<h3 id="heading-scenario-15-sort-efficiently-orderby-vs-sortwithinpartitions">Scenario 15: Sort Efficiently (orderBy vs sortWithinPartitions)</h3>
<p>Most Spark jobs need sorted data at some point – for window functions, for writing ordered files, or for downstream processing. The instinct is to reach for orderBy(). But that instinct costs you a full shuffle every single time.</p>
<h4 id="heading-the-problem-global-sort-when-you-dont-need-it">The Problem: Global Sort When You Don't Need It</h4>
<p>Let's say you want to write employee data partitioned by department, sorted by salary within each department:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> pyspark.sql.functions <span class="hljs-keyword">import</span> col

<span class="hljs-comment"># Naive approach: global sort</span>
df_sorted = df.orderBy(col(<span class="hljs-string">"department"</span>), col(<span class="hljs-string">"salary"</span>).desc())

df_sorted.write.partitionBy(<span class="hljs-string">"department"</span>).parquet(<span class="hljs-string">"s3://output/employees/"</span>)
</code></pre>
<p>This looks reasonable. You're sorting by department and salary, then writing partitioned files. Clean and simple. But here's what Spark actually does:</p>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Sort [department ASC, salary DESC], true

└─ Exchange rangepartitioning(department ASC, salary DESC, <span class="hljs-number">200</span>)

   └─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>That Exchange <code>rangepartitioning</code> is a full shuffle. So Spark:</p>
<ul>
<li><p>Samples the data to determine range boundaries</p>
</li>
<li><p>Redistributes every row across 200 partitions based on sort keys</p>
</li>
<li><p>Sorts each partition locally</p>
</li>
<li><p>Produces globally ordered output</p>
</li>
</ul>
<p>You just shuffled 1 million rows across the cluster to achieve global ordering – even though you're immediately partitioning by department on write, which destroys that global order anyway.</p>
<h4 id="heading-why-this-hurts">Why This Hurts</h4>
<p>Range partitioning for global sort is one of the most expensive shuffles Spark performs:</p>
<ul>
<li><p>Sampling overhead: Spark must scan data twice (once to sample, once to process)</p>
</li>
<li><p>Network transfer: Every row moves to a new executor based on range boundaries</p>
</li>
<li><p>Disk I/O: Shuffle files written and read from disk</p>
</li>
<li><p>Wasted work: Global ordering across departments is meaningless when you partition by department</p>
</li>
</ul>
<p>For 1M rows, this adds 8-12 seconds of pure shuffle overhead.</p>
<h4 id="heading-the-better-approach-sort-locally-within-partitions">The Better Approach: Sort Locally Within Partitions</h4>
<p>If you only need ordering <em>within</em> each department (or within each output partition), use sortWithinPartitions():</p>
<pre><code class="lang-python"><span class="hljs-comment"># Optimized approach: local sort only</span>
df_sorted = df.sortWithinPartitions(col(<span class="hljs-string">"department"</span>), col(<span class="hljs-string">"salary"</span>).desc())
df_sorted.write.partitionBy(<span class="hljs-string">"department"</span>).parquet(<span class="hljs-string">"s3://output/employees/"</span>)
</code></pre>
<p><strong>Simplified Logical Plan:</strong></p>
<pre><code class="lang-python">Sort [department ASC, salary DESC], false

└─ LogicalRDD [id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<ul>
<li><p>No Exchange.</p>
</li>
<li><p>No shuffle.</p>
</li>
<li><p>Just local sorting within existing partitions.</p>
</li>
</ul>
<p>Spark sorts each partition in-place, without moving data across the network. The false flag in the Sort node indicates this is a local sort, not a global one.</p>
<h4 id="heading-real-world-benchmark-aws-glue-9">Real-World Benchmark: AWS Glue</h4>
<p>Let's measure the difference on 1 million employee records. First, we'll start with the global sort using <code>orderBy()</code>:</p>
<pre><code class="lang-python">print(<span class="hljs-string">"\n--- Testing orderBy() (global sort) ---"</span>)

start = time.time()

df_global = df.orderBy(col(<span class="hljs-string">"department"</span>), col(<span class="hljs-string">"salary"</span>).desc())
df_global.write.mode(<span class="hljs-string">"overwrite"</span>).parquet(<span class="hljs-string">"/tmp/global_sort_output"</span>)

global_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f"orderBy() time: <span class="hljs-subst">{global_time}</span>s"</span>)
</code></pre>
<p>Local Sort:</p>
<pre><code class="lang-python">print(<span class="hljs-string">"\n--- Testing sortWithinPartitions() (local sort) ---"</span>)

start = time.time()

df_local = df.sortWithinPartitions(col(<span class="hljs-string">"department"</span>), col(<span class="hljs-string">"salary"</span>).desc())
df_local.write.mode(<span class="hljs-string">"overwrite"</span>).parquet(<span class="hljs-string">"/tmp/local_sort_output"</span>)

local_time = round(time.time() - start, <span class="hljs-number">2</span>)
print(<span class="hljs-string">f"sortWithinPartitions() time: <span class="hljs-subst">{local_time}</span>s"</span>)
</code></pre>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Approach</strong></td><td><strong>Plan Type</strong></td><td><strong>Execution Time (1M rows)</strong></td><td><strong>Observation</strong></td></tr>
</thead>
<tbody>
<tr>
<td>orderBy()</td><td>Exchange rangepartitioning</td><td>10.34 s</td><td>Full shuffle for global sort</td></tr>
<tr>
<td>sortWithinPartitions()</td><td>Local Sort (no Exchange)</td><td>2.18 s</td><td>In-place sorting, no network transfer</td></tr>
</tbody>
</table>
</div><p><strong>Physical Plan Differences:</strong></p>
<p><strong>orderBy() Physical Plan:</strong></p>
<pre><code class="lang-python">*(<span class="hljs-number">2</span>) Sort [department ASC NULLS FIRST, salary DESC NULLS LAST], true, <span class="hljs-number">0</span>

+- Exchange rangepartitioning(department ASC NULLS FIRST, salary DESC NULLS LAST, <span class="hljs-number">200</span>)

   +- *(<span class="hljs-number">1</span>) Project [id, firstname, lastname, department, salary, age, hire_date, country]

      +- *(<span class="hljs-number">1</span>) Scan ExistingRDD[id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>The Exchange rangepartitioning node marks the shuffle boundary. Spark must:</p>
<ul>
<li><p>Sample data to determine range splits</p>
</li>
<li><p>Redistribute all rows across executors</p>
</li>
<li><p>Sort within each range partition</p>
</li>
</ul>
<p><strong>sortWithinPartitions() Physical Plan:</strong></p>
<pre><code class="lang-python">*(<span class="hljs-number">1</span>) Sort [department ASC NULLS FIRST, salary DESC NULLS LAST], false, <span class="hljs-number">0</span>

+- *(<span class="hljs-number">1</span>) Project [id, firstname, lastname, department, salary, age, hire_date, country]

   +- *(<span class="hljs-number">1</span>) Scan ExistingRDD[id, firstname, lastname, department, salary, age, hire_date, country]
</code></pre>
<p>No Exchange. The false flag in Sort indicates local sorting only. Each partition is sorted independently, in parallel, without any data movement.</p>
<p><strong>When to Use Which:</strong></p>
<div class="hn-table">
<table>
<thead>
<tr>
<td><strong>Use Case</strong></td><td><strong>Method</strong></td><td><strong>Why</strong></td></tr>
</thead>
<tbody>
<tr>
<td>Writing partitioned files (Parquet, Delta)</td><td>sortWithinPartitions()</td><td>Partition-level order is sufficient; global order wasted</td></tr>
<tr>
<td>Window functions with ROWS BETWEEN</td><td>sortWithinPartitions()</td><td>Only need order within each window partition</td></tr>
<tr>
<td>Top-N per group (rank, dense_rank)</td><td>sortWithinPartitions()</td><td>Ranking is local to each partition key</td></tr>
<tr>
<td>Final output must be globally ordered</td><td>orderBy()</td><td>Need total order across all partitions</td></tr>
<tr>
<td>Downstream system requires strict ordering</td><td>orderBy()</td><td>For example, time-series data for sequential processing</td></tr>
<tr>
<td>Sorting before coalesce() for fewer output files</td><td>sortWithinPartitions()</td><td>Maintains order within merged partitions</td></tr>
</tbody>
</table>
</div><h4 id="heading-common-anti-pattern">Common Anti-Pattern</h4>
<pre><code class="lang-python">df.orderBy(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>) \
  .write.partitionBy(<span class="hljs-string">"department"</span>) \
  .parquet(<span class="hljs-string">"output/"</span>)
</code></pre>
<p><strong>Problem:</strong> You're globally sorting by department, then immediately partitioning by department. The global order is destroyed during partitioning.</p>
<p>Here’s the fix:</p>
<pre><code class="lang-python">df.sortWithinPartitions(<span class="hljs-string">"department"</span>, <span class="hljs-string">"salary"</span>) \
  .write.partitionBy(<span class="hljs-string">"department"</span>) \
  .parquet(<span class="hljs-string">"output/"</span>)
</code></pre>
<p>Or, if you're writing to a managed table, let the writer handle the ordering. Note that <code>sortBy()</code> on a DataFrameWriter only works together with <code>bucketBy()</code> and <code>saveAsTable()</code> – the bucket column, bucket count, and table name below are illustrative:</p>
<pre><code class="lang-python"><span class="hljs-comment"># Best for bucketed tables: bucket the data and sort within each bucket on write</span>
df.write.partitionBy(<span class="hljs-string">"department"</span>) \
    .bucketBy(<span class="hljs-number">8</span>, <span class="hljs-string">"id"</span>) \
    .sortBy(<span class="hljs-string">"salary"</span>) \
    .saveAsTable(<span class="hljs-string">"employees_sorted"</span>)
</code></pre>
<p>orderBy() triggers an expensive full shuffle using range partitioning, while sortWithinPartitions() sorts data locally without a shuffle and is often 4–5× faster. Use it when writing partitioned files, computing window functions with partitionBy(), or when order is needed only within groups, and reserve orderBy() strictly for true global ordering, because in most production ETL, the best sort is the one that doesn’t shuffle.</p>
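<p>As one concrete illustration of the “order only within groups” case (a sketch, not part of the benchmark above): a top-N-per-group query sorts inside each department partition through its window specification, so there is no reason to stack a global <code>orderBy()</code> on top of it:</p>
<pre><code class="lang-python">from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Top-3 earners per department: the window spec already sorts within each
# department partition, so a global orderBy() would only add another shuffle.
w = Window.partitionBy("department").orderBy(F.col("salary").desc())

top3 = (
    df.withColumn("rank", F.dense_rank().over(w))
      .filter(F.col("rank") &lt;= 3)
)
top3.show()
</code></pre>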
<h2 id="heading-conclusion">Conclusion</h2>
<p>You likely began this handbook wondering why your Spark application was slow, and now you can see that the answer was hiding in plain sight: the problem was never your Spark code as written, your configuration, or your Spark version. It was your plan all along.</p>
<p>You now understand that Spark runs plans, not code, that transformation order affects logical plans, that shuffles generate stages and are key to runtime performance, and that examining your physical plans allows you to directly link your application performance issues back to your problematic line of code.</p>
<p>And you’ve seen this pattern repeat across many scenarios: problem, plan, solution, improved plan, and so forth, until optimization feels less like a dark art and more like a certainty.</p>
<p>This is the Spark optimization mindset: read plans before you write code, and challenge every single Exchange. Engineers who write high-performance Spark jobs minimize shuffles, filter early, project narrowly, deal with skew carefully, and validate everything via explain() and the Spark UI. Once you learn to read the plan, Spark performance becomes mechanical.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to Write Better Java Code using Pattern Matching and Sealed Classes ]]>
                </title>
                <description>
                    <![CDATA[ This article explores how you can improve your Java code quality using Pattern Matching and Sealed Classes. Java Pattern Matching allows you to write more concise and readable code when working with complex data structures. It simplifies extracting d... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/write-better-java-code-pattern-matching-sealed-classes/</link>
                <guid isPermaLink="false">66d460f851f567b42d9f84b1</guid>
                
                    <category>
                        <![CDATA[ Java ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Sameer Shukla ]]>
                </dc:creator>
                <pubDate>Wed, 11 Oct 2023 15:48:42 +0000</pubDate>
                <media:content url="https://www.freecodecamp.org/news/content/images/2023/10/Pattern-Matching.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>This article explores how you can improve your Java code quality using Pattern Matching and Sealed Classes.</p>
<p>Java Pattern Matching allows you to write more concise and readable code when working with complex data structures. It simplifies extracting data from data structures and performing operations on them.</p>
<p>Let's dive in and learn more about pattern matching and sealed classes.</p>
<h2 id="heading-what-is-pattern-matching-in-java">What is Pattern Matching in Java?</h2>
<p>Pattern Matching matches a value against a pattern that includes variables and conditions. If the value matches the pattern, the corresponding parts of the value are bound to the variables in the pattern. This allows for more readable and intuitive code.</p>
<p>There are two types of pattern matching: traditional and modern. Let's see the differences between the two in the following sections.</p>
<h3 id="heading-traditional-pattern-matching">Traditional Pattern Matching</h3>
<p>In traditional pattern matching, the <code>switch</code> statement is extended to support pattern matching by adding the <code>case</code> keyword with a pattern argument. The <code>switch</code> statement can match against a primitive type, wrappers, enums, and strings.</p>
<p>For example:</p>
<pre><code class="lang-java"><span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">printGreetingBasedOnInput</span><span class="hljs-params">(String input)</span></span>{
        <span class="hljs-keyword">switch</span> (input){
            <span class="hljs-keyword">case</span> <span class="hljs-string">"hello"</span>:
                System.out.println(<span class="hljs-string">"Hi There"</span>);
                <span class="hljs-keyword">break</span>;
            <span class="hljs-keyword">case</span> <span class="hljs-string">"goodbye"</span>:
                System.out.println(<span class="hljs-string">"See you Later!"</span>);
                <span class="hljs-keyword">break</span>;
            <span class="hljs-keyword">case</span> <span class="hljs-string">"thank you"</span>:
                System.out.println(<span class="hljs-string">"You are welcome"</span>);
                <span class="hljs-keyword">break</span>;
            <span class="hljs-keyword">default</span>:
                System.out.println(<span class="hljs-string">"I don't understand"</span>);
                <span class="hljs-keyword">break</span>;
        }
    }
</code></pre>
<p>The Java method <code>printGreetingBasedOnInput</code> takes an <code>input</code> string and prints a corresponding greeting message based on its value using a switch-case statement. It covers cases for "hello," "goodbye," and "thank you," providing appropriate responses, and defaults to "I don't understand" for any other inputs.</p>
<h3 id="heading-modern-pattern-matching">Modern Pattern Matching</h3>
<p>In modern pattern matching, the <code>switch</code> statement can match against a variety of patterns like any type of objects, enums, or primitives. The <code>case</code> keyword is used to specify the pattern to match against.</p>
<pre><code class="lang-java"><span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">printGreetingBasedOnInput</span><span class="hljs-params">(String input)</span></span>{
        <span class="hljs-keyword">switch</span> (input){
            <span class="hljs-keyword">case</span> <span class="hljs-string">"hello"</span> -&gt; System.out.println(<span class="hljs-string">"Hi There"</span>);
            <span class="hljs-keyword">case</span> <span class="hljs-string">"goodbye"</span> -&gt; System.out.println(<span class="hljs-string">"See you Later!"</span>);
            <span class="hljs-keyword">case</span> <span class="hljs-string">"thank you"</span> -&gt; System.out.println(<span class="hljs-string">"You are welcome"</span>);
            <span class="hljs-keyword">default</span> -&gt; System.out.println(<span class="hljs-string">"I don't understand"</span>);
        }
    }
</code></pre>
<p>This code snippet uses a more concise syntax. It simplifies the code by directly specifying the action to perform for each case label.</p>
<p>Before Java 16, we needed to check an object's type and then explicitly cast it to a variable. The enhanced <code>instanceof</code> operator introduced in Java 16 can both verify the type and bind it to a variable in one step, and pattern matching for <code>switch</code> applies the same idea to <code>case</code> labels, like in the example below:</p>
<pre><code class="lang-java"><span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">printType</span><span class="hljs-params">(Object input)</span></span>{
        <span class="hljs-keyword">switch</span> (input) {
            <span class="hljs-keyword">case</span> Integer i -&gt; System.out.println(<span class="hljs-string">"Integer"</span>);
            <span class="hljs-keyword">case</span> String s -&gt; System.out.println(<span class="hljs-string">"String!"</span>);
            <span class="hljs-keyword">default</span> -&gt; System.out.println(<span class="hljs-string">"I don't understand"</span>);
        }
    }
</code></pre>
<p>The enhancement of <code>instanceof</code> becomes particularly valuable when working with pattern guards. Pattern guards make case statements in Java pattern matching more specific by attaching a boolean condition with the <code>when</code> keyword.</p>
<p>This allows for more fine-grained control over how patterns are matched and can make code more readable and expressive.</p>
<pre><code class="lang-java"><span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">printType</span><span class="hljs-params">(Object input)</span></span>{                                                 
    <span class="hljs-keyword">switch</span> (input) {                                                                         
        <span class="hljs-keyword">case</span> Integer i &amp;&amp; i &gt; <span class="hljs-number">10</span> -&gt; System.out.println(<span class="hljs-string">"Integer is greater than 10"</span>);        
        <span class="hljs-keyword">case</span> String s &amp;&amp; !s.isEmpty()-&gt; System.out.println(<span class="hljs-string">"String!"</span>);                       
        <span class="hljs-keyword">default</span> -&gt; System.out.println(<span class="hljs-string">"Invalid Input"</span>);                                      
    }                                                                                        
}
</code></pre>
<p>Based on the examples shown above, you can hopefully see that Java pattern matching offers various benefits:</p>
<ul>
<li><p>It improves code readability by allowing for efficient matching of values against patterns and extracting data.</p>
</li>
<li><p>It reduces code duplication by handling different cases with a single piece of code.</p>
</li>
<li><p>It enhances type safety by enabling the matching of values against specific types.</p>
</li>
<li><p>Pattern guards can be used within cases to further enhance code readability and maintainability.</p>
</li>
</ul>
<h2 id="heading-what-are-sealed-classes-in-java">What Are Sealed Classes in Java?</h2>
<p>Sealed classes allow developers to restrict the set of classes that can extend or implement a given class or interface.</p>
<p>Sealed classes provide a way to create a hierarchy of classes or interfaces that can be extended or implemented by only a specified set of classes.</p>
<p>For example:</p>
<pre><code class="lang-java"><span class="hljs-keyword">public</span> sealed <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Result</span> <span class="hljs-title">permits</span> <span class="hljs-title">Success</span>, <span class="hljs-title">Failure</span> </span>{
    <span class="hljs-keyword">protected</span> String response;

    <span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">message</span><span class="hljs-params">()</span></span>{
        <span class="hljs-keyword">return</span> response;
    }
}

<span class="hljs-keyword">public</span> <span class="hljs-keyword">final</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Success</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Result</span> </span>{

    <span class="hljs-meta">@Override</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">message</span><span class="hljs-params">()</span> </span>{
        <span class="hljs-keyword">return</span> <span class="hljs-string">"Success!"</span>;
    }
}

<span class="hljs-keyword">public</span> <span class="hljs-keyword">final</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Failure</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Result</span> </span>{

    <span class="hljs-meta">@Override</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">message</span><span class="hljs-params">()</span> </span>{
        <span class="hljs-keyword">return</span> <span class="hljs-string">"Failure!"</span>;
    }
}
</code></pre>
<p>In this example, we've defined a sealed class called <code>Result</code> which can be extended by either <code>Success</code> or <code>Failure</code> classes.</p>
<p>Any other class that attempts to extend <code>Result</code> will result in a compilation error.</p>
<p>This provides a way to restrict the set of classes that can be used to extend <code>Result</code>, making the code more maintainable and extensible.</p>
<h3 id="heading-a-few-important-points-to-keep-in-mind">A few important points to keep in mind:</h3>
<ul>
<li><p>If a subclass wants to be a permitted subclass of a sealed class in Java, it must be defined in the same package as the sealed class. If the subclass is not defined in the same package, a compilation error will occur.</p>
</li>
<li><p>If a subclass is to be permitted to extend a sealed class in Java, it must have one of three modifiers: final, sealed, or non-sealed.</p>
</li>
<li><p>A permitted subclass that is itself declared <code>sealed</code> must declare its own set of permitted subclasses, while a <code>non-sealed</code> subclass reopens that branch of the hierarchy for further extension. All permitted subclasses must belong to the same module as the sealed superclass (or to the same package, if the classes are in the unnamed module).</p>
</li>
</ul>
<h2 id="heading-how-to-combine-java-pattern-matching-and-sealed-classes">How to Combine Java Pattern Matching and Sealed Classes</h2>
<p>You can use sealed classes and their permitted subclasses in switch statements with pattern matching.</p>
<p>This can make the code more concise and easier to read. Here's an example:</p>
<pre><code class="lang-java"><span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> String <span class="hljs-title">checkResult</span><span class="hljs-params">(Result result)</span></span>{                                     
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">switch</span> (result) {                                                          
        <span class="hljs-keyword">case</span> Success s -&gt; s.message();                                                
        <span class="hljs-keyword">case</span> Failure f -&gt; f.message();                                                
        <span class="hljs-keyword">default</span> -&gt; <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalArgumentException(<span class="hljs-string">"Unexpected Input: "</span> + result); 
    };                                                                                
}
</code></pre>
<p>In the case of sealed classes, the compiler requires a default branch in pattern matching to ensure that all possible cases are covered.</p>
<p>Since sealed classes have a fixed set of permitted subclasses, it is possible to cover all cases with a finite number of case statements.</p>
<p>If a default branch is not included, it's possible to add a new subclass to the hierarchy in the future, which would not be covered by the existing case statements. This would result in a runtime error, which could be difficult to debug.</p>
<p>By requiring a default branch, the compiler ensures that the code is complete and covers all possible cases, even if new subclasses are added to the sealed class hierarchy in the future.</p>
<p>This helps to prevent runtime errors and makes the code more robust and maintainable.</p>
<p>If we modify the <code>Result</code> class to include a new subclass called <code>Pending</code> and we have not included that in our pattern matching, it will be covered by the default branch.</p>
<h2 id="heading-what-is-a-sealed-interface-in-java">What is a Sealed Interface in Java?</h2>
<p>When working with a sealed interface in Java, the compiler does not require a default branch in pattern matching as long as every permitted implementation has its own case – because a sealed interface cannot be instantiated directly, covering all permitted subtypes already covers every possible value.</p>
<p>If one of those cases is missing, the compiler will insist on a default branch to ensure that all possibilities are handled.</p>
<p>With the sealed class from earlier, by contrast, we were always required to include the default branch. Here's a code example:</p>
<pre><code class="lang-java"><span class="hljs-keyword">public</span> sealed <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">OtherResult</span> <span class="hljs-title">permits</span> <span class="hljs-title">Pending</span>, <span class="hljs-title">Timeout</span> </span>{
    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">message</span><span class="hljs-params">()</span></span>;
}

<span class="hljs-keyword">public</span> <span class="hljs-keyword">final</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Pending</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">OtherResult</span></span>{
    <span class="hljs-meta">@Override</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">message</span><span class="hljs-params">()</span> </span>{
        System.out.println(<span class="hljs-string">"Pending!"</span>);
    }
}

<span class="hljs-keyword">public</span> <span class="hljs-keyword">final</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Timeout</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">OtherResult</span></span>{
    <span class="hljs-meta">@Override</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">message</span><span class="hljs-params">()</span> </span>{
        System.out.println(<span class="hljs-string">"Timeout!"</span>);
    }
}

<span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">checkResult</span><span class="hljs-params">(OtherResult result)</span></span>{
        <span class="hljs-keyword">switch</span> (result) {
            <span class="hljs-keyword">case</span> Pending p -&gt; p.message();
            <span class="hljs-keyword">case</span> Timeout t -&gt; t.message();
        };
    }
</code></pre>
<h2 id="heading-conclusion">Conclusion</h2>
<p>Here are some key takeaways of using Pattern Matching and Sealed Classes in Java code:</p>
<ul>
<li><p><strong>Improved readability</strong>: Pattern Matching and Sealed Classes can make code more expressive and easier to read, since they allow for more concise and intuitive syntax.</p>
</li>
<li><p><strong>Greater control over class hierarchies</strong>: Sealed Classes provide a way to control class hierarchies and ensure that only permitted subclasses can be used. This can improve code safety and maintainability.</p>
</li>
<li><p><strong>Implicit type safety</strong>: Pattern Matching and Sealed Classes provide implicit type safety, which can reduce the risk of runtime errors and make code easier to maintain.</p>
</li>
<li><p><strong>Reduced code duplication</strong>: Pattern Matching and Sealed Classes can reduce code duplication by allowing different cases to be handled in a single piece of code.</p>
</li>
<li><p><strong>Better code organization:</strong> Sealed Classes can help to organize code and reduce the complexity of class hierarchies by grouping related classes together.</p>
</li>
<li><p><strong>Improved maintainability:</strong> Pattern Matching and Sealed Classes can improve the maintainability of code by making it easier to understand and update, which can save time and effort in the long run.</p>
</li>
</ul>
<p>Thank you so much for reading.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ Functional Programming in Java ]]>
                </title>
                <description>
                    <![CDATA[ Functional programming (FP) is a programming paradigm. It emphasizes the use of pure functions that don't have side effects and always return the same output for a given input. This article explores how to implement FP concepts in Java, including vie... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/functional-programming-in-java/</link>
                <guid isPermaLink="false">66d460ee868774922c88500e</guid>
                
                    <category>
                        <![CDATA[ Functional Programming ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Java ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Sameer Shukla ]]>
                </dc:creator>
                <pubDate>Tue, 14 Mar 2023 17:34:55 +0000</pubDate>
                <media:content url="https://www.freecodecamp.org/news/content/images/2023/03/lambda.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>Functional programming (FP) is a programming paradigm. It emphasizes the use of pure functions that don't have side effects and always return the same output for a given input.</p>
<p>This article explores how to implement FP concepts in Java, including viewing functions as first-class citizens, chaining, and composing them to create function pipelines.</p>
<p>We'll also discuss the technique of currying, which allows a function that takes multiple arguments to be transformed into a chain of functions that each take a single argument. This can simplify the use of complex functions and make them more reusable.</p>
<p>In this article, I'll show you examples of how to implement these concepts in Java using modern language features, like “java.util.function.Function”, “java.util.function.BiFunction”, and a user-defined TriFunction. We'll then see how to overcome its limitation using currying.</p>
<h2 id="heading-the-javautilfunctionfunction-interface">The <code>java.util.function.Function</code> interface</h2>
<p>The java.util.function.Function interface is a key component of the Java 8 functional programming API. The java.util.function.Function is a functional interface in Java that takes input of type 'T' and produces an output of type 'R'.</p>
<p>In functional programming, functions are first-class citizens, meaning that they can be passed around as values, stored in variables or data structures, and used as arguments or return values of other functions.</p>
<p>The function interface provides a way to define and manipulate functions in Java code.</p>
<p>The function interface represents a function that takes one input and produces one output. It has a single abstract method, <code>apply()</code>, which takes an argument of a specified type and returns a result of a specified type.</p>
<p>You can use the function interface to define new functions, as well as to manipulate and compose existing functions. For example, you can use it to convert a list of objects of one type to a list of objects of another type, or to apply a series of transformations to a stream of data.</p>
<p>One of the main benefits of the function interface is that it allows you to write code that is more concise and expressive. By defining functions as values and passing them around as arguments or return values, developers can create more modular and reusable code. Also, by using lambdas to define functions, Java code can be more expressive and easier to read.</p>
<p>You can think about <code>java.util.function.Function</code> like this:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/03/image-98.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Function&lt;A, B&gt;</em></p>
<p>The java.util.function.BiFunction is also a functional interface in Java that takes two inputs 'T' and 'U' and produces an output of type 'R'. In short, BiFunction takes 2 inputs and returns an output:</p>
<pre><code class="lang-bash">BiFunction&lt;Integer, Integer, Integer&gt; sum = (a, b) -&gt; a + b;
</code></pre>
<p>You can think about <code>java.util.function.BiFunction</code> like this:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/03/image-99.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>BiFunction&lt;A, B, C&gt;</em></p>
<h2 id="heading-function-composition-and-chaining">Function Composition and Chaining</h2>
<p>Function chaining is a technique in functional programming that involves composing multiple functions into a single pipeline or chain.</p>
<p>In Java FP, function chaining is often used to transform data in a series of steps, where each step applies a specific transformation to the data and passes it on to the next step in the chain.</p>
<p>The function interface in Java provides a powerful tool for function chaining. Each function in the chain is defined as a separate instance of the Function interface. The output of each function becomes the input to the next function in the chain. This allows for a series of transformations to be applied to the data, one after another, until the final result is produced.</p>
<h3 id="heading-how-to-use-the-andthen-method">How to use the <code>andThen</code> method</h3>
<p>The "andThen()" method is a default method provided by the function interface in Java. This method takes a sequence of two functions and applies them in succession, using the output of the first function as the input to the second function. This chaining of the functions results in a new function that combines the behavior of both functions in a single transformation.</p>
<p>The syntax of <code>andThen</code> looks like this:</p>
<pre><code class="lang-java"><span class="hljs-keyword">default</span> &lt;V&gt; <span class="hljs-function">Function&lt;T, V&gt; <span class="hljs-title">andThen</span><span class="hljs-params">(Function&lt;? <span class="hljs-keyword">super</span> R, ? extends V&gt; after)</span></span>
</code></pre>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/03/image-107.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>andThen</em></p>
<p>We can break the example down into a series of steps. First, the "multiply" function is executed and its output is passed as input to the "add" function. Then the "logOutput" function logs the resulting output.</p>
<pre><code class="lang-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">FunctionChainingExample</span> </span>{

    <span class="hljs-keyword">static</span> Logger logger = Logger.getLogger(FunctionChainingExample.class.getName());

    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> Function&lt;Integer, Integer&gt; multiply = x -&gt; x * <span class="hljs-number">2</span>;

    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> Function&lt;Integer, Integer&gt; add = x -&gt; x + <span class="hljs-number">2</span>;

    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> Function&lt;Integer, Unit&gt; logOutput = x -&gt; {
        logger.info(<span class="hljs-string">"Data:"</span> + x);
        <span class="hljs-keyword">return</span> Unit.unit();
    };

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> Unit <span class="hljs-title">execute</span><span class="hljs-params">(Integer input)</span> </span>{
        Function&lt;Integer, Unit&gt; pipeline = multiply
                                               .andThen(add)
                                               .andThen(logOutput);
        <span class="hljs-keyword">return</span> pipeline.apply(input);
    }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> </span>{
        execute(<span class="hljs-number">10</span>);
    }

}
</code></pre>
<p>The above code snippet showcases the creation of three functions, namely multiply, add, and logOutput.</p>
<p>The function <code>multiply</code> accepts an integer input and produces an integer output, just like the <code>add</code> function. But, the <code>logOutput</code> function accepts an integer input and returns nothing (as represented by the Unit object, which implies the absence of a value).</p>
<p>The <code>execute</code> function chains the three functions together, where the output of multiply is utilized as the input for the add function, and the resulting output of add is passed to the logOutput function for logging purposes. The above example showcases function chaining using the <code>andThen</code> default method.</p>
<h3 id="heading-how-to-use-the-compose-method">How to use the <code>compose</code> method</h3>
<p>Like <code>andThen</code>, the <code>compose</code> method is a default method provided by the function interface in Java. Unlike <code>andThen</code>, however, it applies the first function to the output of the second function.</p>
<p>This means that the second function is applied to the input first, and then the first function is applied to the output of the second function. As a result, a chain of functions is created where the output of the second function becomes the input of the first function.</p>
<p>The compose function looks like this:</p>
<pre><code class="lang-java"><span class="hljs-function"><span class="hljs-keyword">default</span>  Function&lt;V, R&gt; <span class="hljs-title">compose</span><span class="hljs-params">(Function&lt;? <span class="hljs-keyword">super</span> V, ? extends T&gt; before)</span></span>
</code></pre>
<p><img src="https://www.freecodecamp.org/news/content/images/2023/03/image-118.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>compose</em></p>
<p>When to use <code>andThen</code> vs <code>compose</code> depends on the order in which you want the functions to be applied.</p>
<p>If you want to apply the functions in the order they are defined, from left to right, then you should use the <code>andThen</code> method. This method applies the first function to the input, and then applies the second function to the output of the first function.</p>
<p>This is useful when you want to chain together functions that process the input data in a specific order.</p>
<p>On the other hand, if you want to apply the functions in the opposite order, from right to left, then you should use the <code>compose</code> method. This method applies the second function to the input, and then applies the first function to the output of the second function.</p>
<p>This is useful when you want to chain together functions that need to be applied in the opposite order of the way they are defined.</p>
<p>In general, you should use <code>andThen</code> when you want to apply the functions in the order they are defined, and use <code>compose</code> when you want to apply the functions in the opposite order. But the choice between the two methods ultimately depends on the specific requirements of your use case.</p>
<pre><code class="lang-bash"> public static Unit compose(Integer input) {
        Function&lt;Integer, Unit&gt; pipeline = logOutput
                                            .compose(add)
                                            .compose(multiply);
        <span class="hljs-built_in">return</span> pipeline.apply(input);
    }
</code></pre>
<p>In the <code>compose</code> example, the functions are executed in a right-to-left order. Firstly, <code>multiply</code> is executed and its output is passed to the <code>add</code> function. Then, the resulting output of the <code>add</code> function is passed to <code>logOutput</code> for logging purposes.</p>
<h3 id="heading-how-to-use-the-apply-function">How to use the <code>apply()</code> function</h3>
<p>The <code>apply()</code> method takes an input and returns a result. It is used to apply a function to an argument and compute a result.</p>
<p>The <code>apply()</code> function is a method of functional interfaces, such as the function interface, that takes an argument of a specified type and returns a result of a specified type. It is the single abstract method of these interfaces, which is required for them to be used as functional interfaces.</p>
<p>The <code>apply()</code> function defines the behavior of the functional interface. When an instance of a functional interface is created, the <code>apply()</code> function is implemented to define what the functional interface does when it is called with an argument.</p>
<pre><code class="lang-bash">Function&lt;Integer, Integer&gt; multiply = x -&gt; x * x;
        int result = multiply.apply(5);
</code></pre>
<p>In this code, we define a function called <code>multiply</code> that takes an integer argument and returns the square of that integer. We then call the <code>apply()</code> function on this function, passing in the integer value of 5. The <code>apply()</code> function executes the lambda expression defined in the <code>multiply</code> Function, and returns the result of the computation, which in this case is 25.</p>
<p>Let's understand by looking at a real-world example, FileReaderPipeline.</p>
<pre><code class="lang-bash">public class FileReaderPipeline {

    static Function&lt;String, String&gt; trim = String::trim;
    static Function&lt;String, String&gt; toUpperCase = String::toUpperCase;
    static Function&lt;String, String&gt; replaceSpecialCharacters = 
        str -&gt; str.replaceAll(<span class="hljs-string">"[^\\p{Alpha}]+"</span>, <span class="hljs-string">""</span>);

    static Function&lt;String, String&gt; pipeline = 
                                trim
                                  .andThen(toUpperCase)
                                  .andThen(replaceSpecialCharacters);

 public static void main(String[] args) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(<span class="hljs-string">"example.txt"</span>))) {
            String line;
            <span class="hljs-keyword">while</span> ((line = reader.readLine()) != null) {
                String result = pipeline.apply(line);
                System.out.println(result);
            }
        }
    }
}
</code></pre>
<p>The above code defines a FileReaderPipeline class, which reads lines from a file, processes them through a pipeline of functions, and prints the resulting output.</p>
<p>First, three functions <code>trim</code>, <code>toUpperCase</code>, and <code>replaceSpecialCharacters</code> are defined using method references and lambda expressions. <code>trim</code> removes leading and trailing whitespace from a String, <code>toUpperCase</code> converts the string to uppercase, and <code>replaceSpecialCharacters</code> removes any non-alphabetic characters from the string.</p>
<p>Next, a function pipeline is created using the <code>andThen</code> method, which chains the three functions together. The input to the pipeline function is a String, and the output is also a String. When a String is passed to the pipeline function, it first removes any leading or trailing whitespace, then converts the string to uppercase, and finally removes any non-alphabetic characters.</p>
<p>Finally, in the <code>main</code> method, the program reads lines from a file (in this example, "example.txt") using a <code>BufferedReader</code>. Each line is processed through the pipeline function using the <code>apply</code> method, which applies each function in the pipeline in sequence, and returns the resulting output. The resulting output is then printed to the console using <code>System.out.println</code>.</p>
<h2 id="heading-function-currying">Function Currying</h2>
<p>The Java util package contains two functional interfaces known as "Function&lt;A, B&gt;" and "BiFunction&lt;A, B, C&gt;". The <code>Function</code> interface takes a single input and produces an output, whereas the <code>BiFunction</code> interface takes two inputs and produces an output. Here is an illustration:</p>
<pre><code class="lang-bash"> Function&lt;String, String&gt; toUpper = str -&gt; str.toUpperCase();
 BiFunction&lt;Integer, Integer, Integer&gt; sum = (a, b) -&gt; a + b;
</code></pre>
<p>This is a clear constraint as there are only two functions, with one accepting a single input and the other accepting two inputs. Consequently, if we need a function that takes three inputs, we must construct our own customized function.</p>
<p>To this end, let's design a function that accepts three inputs and name it <code>TriFunction</code>. Here is an example implementation:</p>
<pre><code class="lang-bash">@FunctionalInterface
public interface TriFunction&lt;A, B, C, O&gt; {
    O apply(A a, B b, C c);

    default &lt;R&gt; TriFunction&lt;A, B, C, O&gt; andThen(TriFunction&lt;A, B, C, O&gt; after) {
        <span class="hljs-built_in">return</span> (A a, B b, C c) -&gt; after.apply(a,b,c);
    }
}
</code></pre>
<pre><code class="lang-bash">TriFunction&lt;Integer, Integer, Integer, Integer&gt; sum = (a, b, c) -&gt; a + b + c;                                                                
int result = sum.apply(1, 2, 3);
</code></pre>
<p>In situations where we require more parameters than what the available functions can accept, we can leverage currying to overcome this limitation.</p>
<p>We can refactor the previous function using currying, which involves decomposing a function that takes multiple arguments into a sequence of functions, each of which accepts only one argument. This is in line with the definition of currying, which is a technique employed in Functional Programming.</p>
<pre><code class="lang-bash">Function&lt;Integer, Function&lt;Integer, Function&lt;Integer, Integer&gt;&gt;&gt;     sumWithThreeParams = (a) -&gt; (b) -&gt; (c) -&gt; a + b + c;
                                                                       Function&lt;Integer, Function&lt;Integer, Function&lt;Integer, Function&lt;Integer, Integer&gt;&gt;&gt;&gt; sumWithFourParams = (a)-&gt;(b)-&gt;(c)-&gt;(d) -&gt; a + b + c + d;
</code></pre>
<p>The above declares a new <code>Function</code> named <code>sumWithThreeParams</code> that takes an <code>Integer</code> input and returns a new <code>Function</code>, which takes an <code>Integer</code> input and returns yet another <code>Function</code>, which takes an <code>Integer</code> input and finally returns an <code>Integer</code> output. Similarly, a new <code>Function</code> named <code>sumWithFourParams</code> is created.</p>
<p>Note that the curried functions <code>sumWithThreeParams</code> and <code>sumWithFourParams</code> allow us to partially apply arguments, which is useful when we have some of the inputs available but not all of them.</p>
<pre><code class="lang-bash">Function&lt;Integer, Function&lt;Integer, Integer&gt;&gt; partialSumWithTwoParams                                     = sumWithThreeParams.apply(5);
                                                                        Function&lt;Integer, Integer&gt; partialSumWithOneParams 
                                 = partialSumWithTwoParams.apply(10);

                                                                        int c = 15;
int result = partialSumWithOneParams.apply(c);
System.out.println(result); //30
</code></pre>
<h2 id="heading-conclusion">Conclusion</h2>
<p>In this article, we covered the implementation of functional programming (FP) concepts in Java, such as treating functions as first-class citizens, composing them into pipelines, and utilizing currying to simplify complex functions.</p>
<p>We have provided examples using modern language features like <code>java.util.function.Function</code> and <code>java.util.function.BiFunction</code>, as well as a user-defined <code>TriFunction</code> with currying to overcome its limitations.</p>
<p>By applying these techniques, developers can write more concise, reusable, and maintainable code in Java.</p>
<p>With the growing popularity of FP in the industry, understanding and implementing these concepts is becoming increasingly important for Java developers.</p>
<p>After reading this article, if you have found it useful and would like to explore more practical examples, <a target="_blank" href="https://github.com/sameershukla/JavaFPLearning">please visit the repository</a> and consider giving it a star.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to Monitor Python APIs using Pyctuator and SpringBootAdmin ]]>
                </title>
                <description>
                    <![CDATA[ Actuator endpoints help us monitor our services. By using actuators, we can gain a lot of information about what’s going on. SpringBoot has a number of in-built actuators, and it also allows us to create our own Actuator Endpoint. For frameworks writ... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-monitor-python-apis-using-pyctuator-and-springbootadmin/</link>
                <guid isPermaLink="false">66d460f03dce891ac3a96810</guid>
                
                    <category>
                        <![CDATA[ api ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Python ]]>
                    </category>
                
                    <category>
                        <![CDATA[ spring-boot ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Sameer Shukla ]]>
                </dc:creator>
                <pubDate>Fri, 02 Sep 2022 15:02:51 +0000</pubDate>
                <media:content url="https://www.freecodecamp.org/news/content/images/2022/09/Screen-Shot-2022-09-01-at-12.18.52-AM.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>Actuator endpoints help us monitor our services. By using actuators, we can gain a lot of information about what’s going on.</p>
<p>SpringBoot has a number of in-built actuators, and it also allows us to create our own Actuator Endpoint.</p>
<p>For frameworks written in Python like Flask or FastAPI, we can incorporate actuators by integrating a library called Pyctuator.</p>
<p>In this article I am going to explain how to monitor applications written in FastAPI using the Pyctuator library. I'll also show you how to manage the actuator endpoints using the SpringBootAdmin server.</p>
<h2 id="heading-what-are-actuators">What are Actuators?</h2>
<p>We use actuators for monitoring and managing application usage in production. This usage information gets exposed to us via REST endpoints.</p>
<p>For example, we can access the application logs in production, environment details, and HTTP traces. And if something has gone wrong within the application, we can even access the applications “threaddump” for debugging purposes.</p>
<p>Here are a few important actuator endpoints:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-16.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Actuators</em></p>
<h2 id="heading-what-is-pyctuator">What is Pyctuator?</h2>
<p>Actuators became popular because of SpringBoot, but you can implement them in frameworks like FastAPI or Flask by integrating a module called Pyctuator.</p>
<p>Pyctuator is a Python module that partially implements SpringBoot actuators. It is maintained by SolarEdge.</p>
<p>Some of the actuators supported by Pyctuator are:</p>
<ul>
<li><p>/health: This endpoint in Pyctuator has built-in monitoring for Redis and MySQL</p>
</li>
<li><p>/env</p>
</li>
<li><p>/metrics</p>
</li>
<li><p>/logfile</p>
</li>
<li><p>/threaddump</p>
</li>
<li><p>/httptrace</p>
</li>
<li><p>/loggers</p>
</li>
</ul>
<h2 id="heading-what-is-springbootadmin">What is SpringBootAdmin?</h2>
<p>Imagine a service having all these actuators for checking metrics, httptrace, threaddump and so on. It would be pretty tedious to invoke each one of them individually to check what’s going on within the service. And if we have many services and each one of them has its own actuator endpoints, this makes monitoring even more difficult.</p>
<p>That’s where you can use SpringBootAdmin to manage and monitor applications.</p>
<p>In a nutshell, SpringBootAdmin provides a nice dashboard for all the actuator endpoints in one place.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-30.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Admin Dashboard</em></p>
<h2 id="heading-use-case-for-pyctuator">Use-Case for Pyctuator</h2>
<p>The use-case is straightforward: we are going to develop a RESTful service using the FastAPI framework and configure the actuators in the service using the Pyctuator module.</p>
<p>The service has 3 endpoints, as shown in the API-Docs (Swagger) below:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-31.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>API-Docs</em></p>
<ul>
<li><p>GET /users: Return all the users that exist in the system.</p>
</li>
<li><p>POST /users: Create user</p>
</li>
<li><p>GET /users/{id}: Return a user with a given id</p>
</li>
</ul>
<p>You can find the code <a target="_blank" href="https://github.com/sameershukla/fastapi-pyctuator">here</a>.</p>
<p>In the User-Service we are going to enable the actuators using Pyctuator and monitor them using SpringBootAdmin dashboard. We are also going to explore how we can enhance the /health actuator for monitoring Redis.</p>
<p>To configure the actuators, we first need to install Pyctuator. You can do that using the command “pip install pyctuator”.</p>
<p>After installation, instantiating the Pyctuator object is all it takes to expose the built-in actuators within a web framework.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-47.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Pyctuator Constructor</em></p>
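<p>To make this concrete, here's a minimal sketch of what that instantiation might look like for a FastAPI app, roughly following the Pyctuator documentation. The app name and URLs here are placeholder values for a local setup, and <code>registration_url</code> points at the SpringBootAdmin server we'll start later:</p>
<pre><code class="lang-python">from fastapi import FastAPI
from pyctuator.pyctuator import Pyctuator

app = FastAPI(title="User-Service")

# Placeholder URLs for a local setup; adjust host and port to your environment
pyctuator = Pyctuator(
    app,
    "User-Service",                                             # name shown in SpringBootAdmin
    app_url="http://localhost:8000",
    pyctuator_endpoint_url="http://localhost:8000/pyctuator",   # lists all actuator endpoints
    registration_url="http://localhost:8080/instances",         # SpringBootAdmin registration
)
</code></pre>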
<p>Before instantiating Pyctuator, if you access the /pyctuator endpoint you will get the “Not Found” message:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-91.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Without Pyctuator Configuration</em></p>
<p>After instantiation, on accessing the /pyctuator endpoint you will see all the actuators enabled by default. This is because we have defined "pyctuator_endpoint_url" within Pyctuator.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-93.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>After Pyctuator Configuration</em></p>
<p>I strongly recommend going through the Pyctuator constructor, as it shows which arguments are mandatory and which are optional.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-114.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Understanding Constructor parameters</em></p>
<p>The mandatory parameters are:</p>
<ul>
<li><p>app – instance of FastAPI or Flask</p>
</li>
<li><p>the application name – displayed in the info section in SpringBootAdmin</p>
</li>
<li><p>“pyctuator_endpoint_url” – the endpoint we saw above, which lists all the actuator endpoints</p>
</li>
<li><p>“registration_url” – you will understand this one shortly.</p>
</li>
</ul>
<h2 id="heading-how-to-enhance-the-health-endpoint">How to Enhance the /health Endpoint</h2>
<p>You can enhance the /health endpoint in Pyctuator to monitor Redis or MySQL databases. Say you are using Redis in your application – then you need to use RedisHealthProvider and pass the Redis instance to it.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-174.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Redis Health</em></p>
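<p>As a rough sketch (assuming Pyctuator's Redis health provider import path and a Redis server running locally), registering the provider might look like this, where <code>pyctuator</code> is the instance created earlier:</p>
<pre><code class="lang-python">import redis

from pyctuator.health.redis_health_provider import RedisHealthProvider

# Assumes a Redis server reachable on localhost; /health will now report its status
r = redis.Redis()
pyctuator.register_health_provider(RedisHealthProvider(r))
</code></pre>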
<h2 id="heading-how-to-start-the-springbootadmin-server">How to Start the SpringBootAdmin Server</h2>
<p>To run the SpringBootAdmin server locally, we have two options. The first is to create a SpringBootAdmin project manually by going to start.spring.io and adding the required libraries:</p>
<p>Spring Web &amp; Spring Boot Admin (Server):</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-175.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Creating SpringBootAdmin</em></p>
<p>The second option is to run a Docker image:</p>
<pre><code class="lang-docker">docker <span class="hljs-keyword">run</span><span class="bash"> --rm --name spring-boot-admin -p 8080:8080 michayaak/spring-boot-admin:2.2.3-1</span>
</code></pre>
<p>Once the admin server is up, we need to provide the “registration_url” to the Pyctuator Constructor as discussed earlier.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-178.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>URL registration in SpringBootAdmin</em></p>
<p>The Admin server is running on localhost:8080, and this should register our application with SpringBootAdmin. We can then access all the actuator endpoints in one place:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-181.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Dashboard</em></p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-182.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>All Configured Actuator Endpoints</em></p>
<p>I executed the /users endpoint a few times, and the HTTP Traces view on the Admin side now shows all the request-response exchange details.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/09/image-191.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>HTTP Traces</em></p>
<h2 id="heading-wrapping-up">Wrapping Up</h2>
<p>Actuators are extremely helpful in monitoring and debugging applications in production. By accessing endpoints we can get details on thread dumps, heap dumps, HTTP Traces and so on.</p>
<p>Pyctuator greatly simplifies adding actuators to Python APIs. By simply importing the library and defining an object, all the actuators are ready for us within our application.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to Perform Integration Testing using JUnit 5 and TestContainers with SpringBoot ]]>
                </title>
                <description>
                    <![CDATA[ TestContainers is a library that helps you run module-specific Docker containers to simplify Integration Testing. These Docker containers are lightweight, and once the tests are finished the containers get destroyed. In the article we are going to un... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/integration-testing-using-junit-5-testcontainers-with-springboot-example/</link>
                <guid isPermaLink="false">66d460f437bd2215d1e245c9</guid>
                
                    <category>
                        <![CDATA[ Docker ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Software Testing ]]>
                    </category>
                
                    <category>
                        <![CDATA[ spring-boot ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Testing ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Sameer Shukla ]]>
                </dc:creator>
                <pubDate>Fri, 26 Aug 2022 15:46:39 +0000</pubDate>
                <media:content url="https://www.freecodecamp.org/news/content/images/2022/08/testcontainers-logo.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>TestContainers is a library that helps you run module-specific Docker containers to simplify Integration Testing.</p>
<p>These Docker containers are lightweight, and once the tests are finished the containers get destroyed.</p>
<p>In this article, we are going to understand what TestContainers is and how it helps you write more reliable tests.</p>
<p>We are also going to cover the important components of the library (annotations and methods) that help you write those tests.</p>
<p>Finally, we will also learn to write a proper Integration Test in SpringBoot using the TestContainers library and its components.</p>
<h2 id="heading-limitations-of-testing-with-an-h2-in-memory-database">Limitations of Testing with an H2 In-Memory Database</h2>
<p>The most common approach to Integration Testing today is to use an H2 in-memory database. But there are certain limitations to this method.</p>
<p>First of all, say we are using version 8.0 of MySQL in production, but our integration tests are using H2. We can never execute our tests for the database version running on Production.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-303.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>SpringBoot app with MySQL DB and H2</em></p>
<p>Secondly, the test cases are less reliable because in production we are using an altogether different database while the tests are pointing to H2. The application may run into issues in production even though the integration tests succeed.</p>
<p>I was trying to access my RESTful service on local and faced this error:</p>
<p>“<strong>Caused by: org.postgresql.util.PSQLException: FATAL: database "example_db" does not exist</strong>”.</p>
<p>It happened because of a permission issue, but the tests on local worked fine.</p>
<p>And finally, as documented <a target="_blank" href="http://h2database.com/html/features.html#compatibility">here</a>, H2 is compatible with other databases only up to a certain point. There are a few areas where H2 is incompatible. If you need to use “nativeQueries” in a SpringBoot application, for example, then using H2 may cause problems.</p>
<h2 id="heading-enter-the-testcontainers-library">Enter the TestContainers Library</h2>
<p>By using TestContainers we can overcome the limitations of H2.</p>
<ul>
<li><p>Integration tests point to the same version of the database that's running in production, since we can pin our TestContainers database image to that version.</p>
</li>
<li><p>Integration tests are a lot more reliable because both the application and the tests use the same database type and version, so there won't be any compatibility issues in the test cases.</p>
</li>
</ul>
<h2 id="heading-what-is-testcontainers">What is TestContainers?</h2>
<p>The TestContainers library is a wrapper API over Docker. When we write code to create a container, behind the scenes it may be translated to a Docker command, for example:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-283.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>MySQLContainer Creation</em></p>
<p>This code may be translated to something like the following:</p>
<pre><code class="lang-docker">docker <span class="hljs-keyword">run</span><span class="bash"> -d --env MYSQL_DATABASE=example_db --env MYSQL_USER=<span class="hljs-built_in">test</span> --env MYSQL_PASSWORD=<span class="hljs-built_in">test</span> ‘mysql:latest’</span>
</code></pre>
<p>TestContainers has a method named “withCommand”. You use it to set the command that should be run inside the Docker container, which confirms that TestContainers is a wrapper API over Docker.</p>
<p>TestContainers downloads images such as MySQL, Postgres, Kafka, or Redis and runs them in containers. The MySQLContainer will run a MySQL database in a container that the test cases can connect to on the local machine. Once execution is over, the database is gone – it is simply deleted from the machine. In our test cases we can start as many container images as we want.</p>
<p>TestContainers supports JUnit 4, JUnit 5 and Spock. If you go to the TestContainers.org website, just visit the QuickStart section that explains how to use it:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-284.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>TestContainers.org how to start with Test Framework</em></p>
<p>TestContainers supports almost every database, from MySQL and Postgres to CockroachDB. You can find more info about this on the TestContainers.org website under the Modules section:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-285.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>TestContainers support for Database Modules</em></p>
<p>TestContainers also supports cloud modules, such as the GCloud and Azure modules. If your application is running on Google Cloud, then TestContainers has support for Cloud Spanner, Firestore, Datastore and so on.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-286.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>TestContainers support for GCloud Module</em></p>
<p>So far in this article we have discussed only databases, but TestContainers supports various other components like Kafka, SOLR, Redis, and more.</p>
<h2 id="heading-how-to-use-the-testcontainers-library">How to Use the TestContainers Library</h2>
<p>In this article we are going to explore TestContainers with JUnit 5. To implement TestContainers we need to understand a few important TestContainers annotations, methods, and the libraries that we need to implement in our project.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-288.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>TestContainers libraries</em></p>
<h3 id="heading-annotations-in-testcontainers">Annotations in TestContainers</h3>
<p>Two important annotations are required in our tests for TestContainers to work: @Testcontainers and @Container.</p>
<p>@Testcontainers is a JUnit Jupiter extension which automatically starts and stops the containers used in the tests. It finds the fields that are marked with @Container and calls the specific container life-cycle methods. Here, the MySQLContainer life-cycle methods will be invoked.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-289.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>MySQLContainer</em></p>
<p>The MySQLContainer is declared as static because a static container is started only once and shared across all the test methods.</p>
<p>If it’s an instance variable, then a new container is created for each test method.</p>
<h2 id="heading-testcontainers-library-methods">TestContainers Library Methods</h2>
<p>There are a few important methods in the TestContainers library that you'll use in your tests. They're good to know before using the library.</p>
<ul>
<li><p><strong>withInitScript</strong>: Using ‘withInitScript’ we can execute a .sql script that defines the schema and tables and adds data to the database. In short, this method runs a .sql script to populate the database.</p>
</li>
<li><p><strong>withReuse</strong> (true): Using “withReuse” method we can enable the reuse of containers. This method works well in conjunction with enabling the “testcontainers.reuse.enable:true” property in the “.testcontainers.properties” file.</p>
</li>
<li><p><strong>start</strong>: we use this to start the container.</p>
</li>
<li><p><strong>withClasspathResourceMapping</strong>: This maps a resource (file or directory) on the classpath to a path inside the container. This will only work if you are running your tests outside a Docker container.</p>
</li>
<li><p><strong>withCommand</strong>: Set the command that should be run inside the Docker container.</p>
</li>
<li><p><strong>withExposedPorts</strong>: Used to set the port that the container listens on.</p>
</li>
<li><p><strong>withFileSystemBind</strong>: Used to map a file / directory from the local filesystem into the container.</p>
</li>
</ul>
<h2 id="heading-testcontainers-use-case">TestContainers Use Case</h2>
<p>In the example we'll look at now, the application communicates only with the database, and we'll write integration tests for it using TestContainers. Then we'll extend the use-case by adding Redis in between.</p>
<p>If the data exists in the Redis cache it will be returned. Otherwise the service falls back to the database for saving and retrieval based on the key.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-308.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Use-Case</em></p>
<p>The service is simple. It has 2 endpoints – the first one creates a user and the second one finds a user by email. If the user is found it is returned, otherwise we get a 404. The service class code looks something like this:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-291.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Service Component</em></p>
<p>We are going to write tests for this class. You can find the entire codebase <a target="_blank" href="https://github.com/sameershukla/testcontainers_demo">here</a>:‌</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-292.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Test Class</em></p>
<p>The test class is marked with @TestContainers annotation which starts/stops the container. We use the @Container annotation to call the specific container's life-cycle methods.</p>
<p>Also, the “MySQLContainer” is declared as static so that a single container is started and then shared across all the test methods (we have already discussed the importance of these annotations).</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-293.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>BeforeAll</em></p>
<p>Next we need to write a setup method marked with @BeforeAll, where we have enabled the “withReuse” method. This helps us reuse existing containers. We are using the “withInitScript” method to execute the “.sql” file, and then we start the container.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-294.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Overwriting Properties</em></p>
<p>‌@DynamicPropertySource helps us override the properties declared in the properties file. We write this method to allow TestContainers to create the URL, username, and password on its own – otherwise we may face errors.</p>
<p>For example on removing username and password we may face an ‘Access denied’ error which may confuse us. So it’s better to allow TestContainer to assign these properties dynamically on its own.</p>
<p>That’s it – we are ready to run the Testcases:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-295.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Test Cases</em></p>
<p>Use a tear-down method marked with @AfterAll to stop the container – otherwise it may keep running on your local machine if you don't explicitly stop it.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-296.png" alt="Image" width="600" height="400" loading="lazy"></p>
<h2 id="heading-how-to-use-genericcontainer">How to Use GenericContainer</h2>
<p>GenericContainer is the most flexible container type – it makes it easy to run any container image.</p>
<p>Now that we have Redis in place, all we need to do in our test case is spin up a GenericContainer with the Redis image.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-297.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>GenericContainer for Redis</em></p>
<p>Then we start the Generic Redis container in @BeforeAll and stop it with the @AfterAll tear down method.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-298.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Starting Containers</em></p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/08/image-299.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Stopping Containers</em></p>
<h2 id="heading-wrapping-up">Wrapping Up</h2>
<p>It's extremely easy to use TestContainers in our application to write better tests. The learning curve is not too steep, and the library supports a wide range of modules, from databases to components like Kafka and Redis.</p>
<p>Writing tests using TestContainers makes our tests a lot more reliable. The only flip side is that the tests are slower compared to H2. This is because H2 runs in memory, while TestContainers takes time to download the image, run the container, and execute the entire setup we have discussed in this article.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to Use Apache Airflow to Schedule and Manage Workflows ]]>
                </title>
                <description>
                    <![CDATA[ Apache Airflow is an open-source workflow management system that makes it easy to write, schedule, and monitor workflows. A workflow as a sequence of operations, from start to finish. The workflows in Airflow are authored as Directed Acyclic Graphs (... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-use-apache-airflow-to-manage-workflows/</link>
                <guid isPermaLink="false">66d460f2a326133d12440a7c</guid>
                
                    <category>
                        <![CDATA[ apache ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Python ]]>
                    </category>
                
                    <category>
                        <![CDATA[ workflow ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Sameer Shukla ]]>
                </dc:creator>
                <pubDate>Fri, 13 May 2022 15:11:17 +0000</pubDate>
                <media:content url="https://www.freecodecamp.org/news/content/images/2022/05/My-project--1-.jpg" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>Apache Airflow is an open-source workflow management system that makes it easy to write, schedule, and monitor workflows.</p>
<p>A workflow is a sequence of operations, from start to finish. The workflows in Airflow are authored as Directed Acyclic Graphs (DAGs) using standard Python programming.</p>
<p>You can configure when a DAG should start execution and when it should finish. You can also set up workflow monitoring through the very intuitive Airflow UI.</p>
<p>You can be up and running on Airflow in no time – it’s easy to use and you only need some basic Python knowledge. It's also completely open source.</p>
<p>Apache Airflow also has a helpful collection of operators that work easily with the Google Cloud, Azure, and AWS platforms.</p>
<p>In this article we are going to cover:</p>
<ul>
<li><p>What are Directed Acyclic Graphs (DAGs)?</p>
</li>
<li><p>What are Operators?</p>
</li>
<li><p>How to Create your First DAG</p>
</li>
<li><p>A Use-Case for DAGs</p>
</li>
<li><p>How to Set Up Cloud Composer</p>
</li>
<li><p>How to Run the Pipeline on Composer</p>
</li>
</ul>
<h2 id="heading-what-are-directed-acyclic-graphs-or-dags">What are Directed Acyclic Graphs, or DAGs?</h2>
<p>DAGs, or Directed Acyclic Graphs, have nodes and edges. DAGs should not contain any loops and their edges should always be directed.</p>
<p>In short, a DAG is a data pipeline and each node in a DAG is a task. Some examples of nodes are downloading a file from GCS (Google Cloud Storage) to Local, applying business logic on a file using Pandas, querying the database, making a rest call, or uploading a file again to a GCS bucket.</p>
<h3 id="heading-visualizing-dags">Visualizing DAGs</h3>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-47.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Correct DAG with no loops</em></p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-48.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Incorrect DAG with Loop</em></p>
<p>You can schedule DAGs in Airflow using the schedule_interval attribute. By default it’s "None" which means that the DAG can be run only using the Airflow UI.</p>
<p>You can schedule the DAG to run once every hour, every day, once a week, monthly, yearly or whatever you wish using the cron preset options (@hourly, @daily, @weekly, @monthly, @yearly).</p>
<p>If you need to run the DAG every 5 minutes, every 10 minutes, every day at 14:00, or at a specific time such as every Thursday at 10:00am, then you should use cron-based expressions:</p>
<p>*/5 * * * * = Every 5 minutes</p>
<p>0 14 * * * = Every day at 14:00</p>
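<p>As a small illustration (the DAG name here is just a placeholder), a cron expression is passed to <code>schedule_interval</code> in exactly the same way as a preset:</p>
<pre><code class="lang-python">from datetime import datetime

from airflow import DAG

# Placeholder DAG that runs every day at 14:00 using a cron expression
with DAG(dag_id="daily_report_dag",
         start_date=datetime(2022, 1, 23),
         schedule_interval="0 14 * * *",
         catchup=False) as dag:
    pass  # tasks would be defined here
</code></pre>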
<h2 id="heading-what-are-operators">What are Operators?</h2>
<p>A DAG consists of multiple tasks. You can create tasks in a DAG using operators which are nodes in the graph.</p>
<p>There are various ready to use operators available in Airflow, such as:</p>
<ul>
<li><p>LocalFilesystemToGCSOperator – use it to upload a file from Local to GCS bucket.</p>
</li>
<li><p>PythonOperator – use it to execute Python callables.</p>
</li>
<li><p>EmailOperator – use it to send email.</p>
</li>
<li><p>SimpleHttpOperator – use it to make an HTTP request.</p>
</li>
</ul>
<h2 id="heading-how-to-create-your-first-dag">How to Create Your First DAG</h2>
<p>The example DAG we are going to create consists of only one operator (the Python operator) which executes a Python function.</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> airflow <span class="hljs-keyword">import</span> DAG
<span class="hljs-keyword">from</span> datetime <span class="hljs-keyword">import</span> datetime
<span class="hljs-keyword">from</span> airflow.operators.python_operator <span class="hljs-keyword">import</span> PythonOperator

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">message</span>():</span>
    print(<span class="hljs-string">"First DAG executed Successfully!!"</span>)

<span class="hljs-keyword">with</span> DAG(dag_id=<span class="hljs-string">"FirstDAG"</span>, start_date=datetime(<span class="hljs-number">2022</span>,<span class="hljs-number">1</span>,<span class="hljs-number">23</span>), schedule_interval=<span class="hljs-string">"@hourly"</span>,
         catchup=<span class="hljs-literal">False</span>) <span class="hljs-keyword">as</span> dag:

    task = PythonOperator(
        task_id=<span class="hljs-string">"task"</span>,
        python_callable=message)

task
</code></pre>
<p>The first step is to import the necessary modules required for DAG development. The <code>with DAG</code> block defines the DAG – a data pipeline with basic parameters like <code>dag_id</code>, <code>start_date</code>, and <code>schedule_interval</code>.</p>
<p>The <code>schedule_interval</code> is configured as @hourly which indicates that the DAG will run every hour.</p>
<p>The task in the DAG is to print a message in the logs. We have used the PythonOperator here. This operator is used to execute any callable Python function.</p>
<p>Once the execution is complete, we should see the message “First DAG executed Successfully” in the logs. We are going to execute all our DAGs on GCP Cloud Composer.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-49.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Airflow UI</em></p>
<p>After successful execution, the message is printed on the logs:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-50.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Logs</em></p>
<h2 id="heading-a-use-case-for-dags">A Use-Case for DAGs</h2>
<p>The use-case we are going to cover in this article involves a three-step process.</p>
<p>In step one, we will upload a .csv file to an input GCS bucket. This file will be processed by a PythonOperator in the DAG. The function executed by the PythonOperator contains Pandas code, which shows how you can use Pandas to transform data within an Airflow data pipeline.</p>
<p>In step two, we'll upload the transformed .csv file to another GCS bucket. This task will be handled by the GCSToGCSOperator.</p>
<p>Step three is to send a status email indicating that the pipeline execution is complete, which will be handled by the EmailOperator.</p>
<p>In this use-case we will also cover how to notify the team via email in case any step of the execution fails.</p>
<h2 id="heading-how-to-install-cloud-composer">How to Install Cloud Composer</h2>
<p>In GCP, Cloud Composer is a managed service built on Apache Airflow. Cloud Composer has default integration with other GCP Services such as GCS, BigQuery, Cloud Dataflow and so on.</p>
<p>First, we need to create the Cloud Composer environment. Search for Cloud Composer in the search bar and click on "Create Environment" as shown below:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-51.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Create Environment</em></p>
<p>In the Environments option, I am selecting the "Composer 1" option as we don’t need auto-scaling.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-54.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>Once we select the type of composer we need, we'll need to do some basic configuration just like in any GCP managed service ("Instance Name", "Location", and so on).</p>
<p>The node count here should always be 3, as GCP will set up the 3 services needed for Airflow.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-56.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>Once we're done with that, it'll set up an Airflow instance for us. To upload a DAG, we need to open the DAGs folder shown in the ‘DAGs folder’ section.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-57.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Airflow Instance</em></p>
<p>If you go to the "Kubernetes Engine" section on GCP, you can see 3 services up and running:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-58.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Kubernetes Engine</em></p>
<p>All DAGs will reside in a bucket created by Airflow.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-59.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Airflow Instance bucket for DAGs</em></p>
<h2 id="heading-how-to-create-and-run-the-pipeline-on-composer">How to Create and Run the Pipeline on Composer</h2>
<p>In the pipeline, we have two buckets. The input_csv bucket contains the .csv file that requires some transformation, and the transformed_csv bucket is where the file will be uploaded once the transformation is done.</p>
<p>The entire pipeline code is the following:</p>
<pre><code class="lang-python"><span class="hljs-keyword">from</span> airflow <span class="hljs-keyword">import</span> DAG
<span class="hljs-keyword">from</span> datetime <span class="hljs-keyword">import</span> datetime
<span class="hljs-keyword">import</span> pandas <span class="hljs-keyword">as</span> pd

<span class="hljs-keyword">from</span> airflow.utils.email <span class="hljs-keyword">import</span> send_email
<span class="hljs-keyword">from</span> airflow.operators.python_operator <span class="hljs-keyword">import</span> PythonOperator
<span class="hljs-keyword">from</span> airflow.operators.email_operator <span class="hljs-keyword">import</span> EmailOperator
<span class="hljs-keyword">from</span> airflow.providers.google.cloud.transfers.gcs_to_gcs <span class="hljs-keyword">import</span> GCSToGCSOperator


<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">transformation</span>():</span>
    trainDetailsDF = pd.read_csv(<span class="hljs-string">'gs://input_csv/Event_File_03_16_2022.csv'</span>)
    print(trainDetailsDF.head())


<span class="hljs-keyword">with</span> DAG(
        dag_id=<span class="hljs-string">"pipeline_demo"</span>,
        schedule_interval=<span class="hljs-string">"@hourly"</span>,
        start_date=datetime(<span class="hljs-number">2022</span>, <span class="hljs-number">1</span>, <span class="hljs-number">23</span>),
        catchup=<span class="hljs-literal">False</span>
) <span class="hljs-keyword">as</span> dag:
    buisness_logic_task = PythonOperator(
        task_id=<span class="hljs-string">'ApplyBusinessLogic'</span>,
        python_callable=transformation,
        dag=dag)

    upload_task = GCSToGCSOperator(
        task_id=<span class="hljs-string">'upload_task'</span>,
        source_bucket=<span class="hljs-string">'input_csv'</span>,
        destination_bucket=<span class="hljs-string">'transformed_csv'</span>,
        source_object=<span class="hljs-string">'Event_File_03_16_2022.csv'</span>,
        move_object=<span class="hljs-literal">True</span>,
        dag=dag
    )

    email_task = EmailOperator(
        task_id=<span class="hljs-string">"SendStatusEmail"</span>,
        depends_on_past=<span class="hljs-literal">True</span>,
        to=<span class="hljs-string">'youremail'</span>,
        subject=<span class="hljs-string">'Pipeline Status!'</span>,
        html_content=<span class="hljs-string">'&lt;p&gt;Hi Everyone, Process completed Successfully! &lt;p&gt;'</span>,
        dag=dag)

    buisness_logic_task &gt;&gt; upload_task &gt;&gt; email_task
</code></pre>
<p>In the first task, all we are doing is creating a DataFrame from the input file and printing the head elements. In the logs it looks like this:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-60.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>DataFrame Head</em></p>
<p>In the second task, GCSToGCSOperator, we have used the attribute move_object=True which will delete the file from the Source bucket.</p>
<p>Once we upload the file to the bucket, we can see that the DAG is being scheduled. The name of the DAG is "pipeline_demo".</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-61.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>DAGs</em></p>
<p>Note that you may encounter "import errors" after uploading or executing a DAG, something like this:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-62.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>You can upload these missing packages through the "PYPI Packages" option in GCP. This will update the environment after a few minutes.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-63.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Updating environment with missing Packages</em></p>
<p>To open the Airflow UI, click on the "Airflow" link under Airflow webserver.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-64.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Airflow Instance, click Airflow link to Open UI</em></p>
<p>The Airflow UI looks like this:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-65.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>Upon successful execution of the pipeline, here's what you should see:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-66.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>In order to send email if a task fails, you can use the on_failure_callback like this:</p>
<pre><code class="lang-python"><span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">notify_email</span>(<span class="hljs-params">contextDict, **kwargs</span>):</span>
    title = <span class="hljs-string">"Airflow alert: {task_name} Failed"</span>.format(**contextDict)
    body = <span class="hljs-string">"""
    Task Name :{task_name} Failed.&lt;br&gt;
    """</span>.format(**contextDict)
    send_email(<span class="hljs-string">'youremail'</span>, title, body)


buisness_logic_task = PythonOperator(
    task_id=<span class="hljs-string">'ApplyBusinessLogic'</span>,
    python_callable=transformation,
    on_failure_callback=notify_email,
    dag=dag)
</code></pre>
<p>We're doing the notification email configuration on Composer through SendGrid. Also, once you are done with Cloud Composer, don't forget to delete the instance, as it cannot be stopped.</p>
<h2 id="heading-conclusion">Conclusion</h2>
<p>Apache Airflow is a fairly easy-to-use tool. There's also a lot of help now available on the internet and the community is growing.</p>
<p>GCP simplified working with Airflow a lot by creating a separate managed service for it.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to Use Google Dataproc – Example with PySpark and Jupyter Notebook ]]>
                </title>
                <description>
                    <![CDATA[ In this article, I'll explain what Dataproc is and how it works. Dataproc is a Google Cloud Platform managed service for Spark and Hadoop which helps you with Big Data Processing, ETL, and Machine Learning. It provides a Hadoop cluster and supports H... ]]>
                </description>
                <link>https://www.freecodecamp.org/news/what-is-google-dataproc/</link>
                <guid isPermaLink="false">66d460f6d1ffc3d3eb89de60</guid>
                
                    <category>
                        <![CDATA[ Google Cloud Platform ]]>
                    </category>
                
                    <category>
                        <![CDATA[ hadoop ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Machine Learning ]]>
                    </category>
                
                    <category>
                        <![CDATA[ spark ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Sameer Shukla ]]>
                </dc:creator>
                <pubDate>Tue, 03 May 2022 15:14:31 +0000</pubDate>
                <media:content url="https://www.freecodecamp.org/news/content/images/2022/05/My-project.jpg" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>In this article, I'll explain what Dataproc is and how it works.</p>
<p>Dataproc is a Google Cloud Platform managed service for Spark and Hadoop which helps you with Big Data Processing, ETL, and Machine Learning. It provides a Hadoop cluster and supports Hadoop ecosystems tools like Flink, Hive, Presto, Pig, and Spark.</p>
<p>Dataproc provides auto-scaling clusters and manages logging, monitoring, cluster creation, and job orchestration. You'll need to provision the cluster manually, but once it's provisioned you can submit jobs to Spark, Flink, Presto, and Hadoop.</p>
<p>Dataproc has implicit integration with other GCP products like Compute Engine, Cloud Storage, Bigtable, BigQuery, Cloud Monitoring, and so on. The jobs supported by Dataproc are MapReduce, Spark, PySpark, SparkSQL, SparkR, Hive and Pig.</p>
<p>Apart from that, Dataproc allows native integration with Jupyter Notebooks as well, which we'll cover later in this article.</p>
<p>In this article, we are going to cover:</p>
<ol>
<li><p>Dataproc cluster types and how to set Dataproc up</p>
</li>
<li><p>How to submit a PySpark job to Dataproc</p>
</li>
<li><p>How to create a Notebook instance and execute PySpark jobs through Jupyter Notebook.</p>
</li>
</ol>
<h2 id="heading-how-to-create-a-dataproc-cluster">How to Create a Dataproc Cluster</h2>
<p>Dataproc has three cluster types:</p>
<ol>
<li><p>Standard</p>
</li>
<li><p>Single Node</p>
</li>
<li><p>High Availability</p>
</li>
</ol>
<p>The Standard cluster can consist of 1 master and N worker nodes. The Single Node cluster has only 1 master and 0 worker nodes. For production purposes, you should use the High Availability cluster, which has 3 master nodes and N worker nodes.</p>
<p>For our learning purposes, a single node cluster, which has only 1 master node, is sufficient.</p>
<p>Creating Dataproc clusters in GCP is straightforward. First, we'll need to enable Dataproc, and then we'll be able to create the cluster.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-185.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Start Dataproc cluster creation</em></p>
<p>When you click "Create Cluster", GCP gives you the option to select Cluster Type, Name of Cluster, Location, Auto-Scaling Options, and more.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-199.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Parameters required for Cluster</em></p>
<p>Since we've selected the Single Node Cluster option, auto-scaling is disabled, as the cluster consists of only 1 master node.</p>
<p>The Configure Nodes option allows us to select the type of machine family like Compute Optimized, GPU and General-Purpose.</p>
<p>In this tutorial, we'll be using the General-Purpose machine option. Through this, you can select Machine Type, Primary Disk Size, and Disk-Type options.</p>
<p>The machine type we're going to select is n1-standard-2, which has 2 CPUs and 7.5 GB of memory. The primary disk size is 100GB, which is sufficient for our demo purposes here.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-200.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Master Node Configuration</em></p>
<p>We've selected the cluster type of Single Node, which is why the configuration consists only of a master node. If you select any other Cluster Type, then you'll also need to configure the master node and worker nodes.</p>
<p>From the Customise Cluster option, select the default network configuration:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-201.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>Use the "Scheduled Deletion" option if the cluster is no longer needed after a specified future time (say after a few hours, days, or minutes).</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/5_ml_resize_x2_colored_toned_light_ai-1.jpg" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Schedule Deleting Setting</em></p>
<p>Here, we've set "Timeout" to be 2 hours, so the cluster will be automatically deleted after 2 hours.</p>
<p>We'll use the default security option which is a Google-managed encryption key. When you click "Create", it'll start creating the cluster.</p>
<p>You can also create the cluster using the ‘gcloud’ command, which you'll find under the ‘EQUIVALENT COMMAND LINE’ option as shown in the image below.</p>
<p>And you can create a cluster using a POST request, which you'll find under the ‘Equivalent REST’ option.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-203.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>gcloud and REST option for Cluster creation</em></p>
<p>After a few minutes, the cluster with 1 master node will be ready for use.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-204.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Cluster Up and Running</em></p>
<p>You can find details about the VM instances if you click on "Cluster Name":</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-205.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-206.png" alt="Image" width="600" height="400" loading="lazy"></p>
<h2 id="heading-how-to-submit-a-pyspark-job">How to Submit a PySpark Job</h2>
<p>Let’s briefly understand how a PySpark job works before submitting one to Dataproc. It’s a simple job that identifies the distinct elements in a list containing duplicates.</p>
<pre><code class="lang-python"><span class="hljs-comment">#! /usr/bin/python</span>

<span class="hljs-keyword">import</span> pyspark

<span class="hljs-comment">#Create List</span>
numbers = [<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">3</span>,<span class="hljs-number">4</span>,<span class="hljs-number">4</span>,<span class="hljs-number">6</span>]

<span class="hljs-comment">#SparkContext</span>
sc = pyspark.SparkContext()

<span class="hljs-comment"># Creating RDD using parallelize method of SparkContext</span>
rdd = sc.parallelize(numbers)

<span class="hljs-comment">#Returning distinct elements from RDD</span>
distinct_numbers = rdd.distinct().collect()

<span class="hljs-comment">#Print</span>
print(<span class="hljs-string">'Distinct Numbers:'</span>, distinct_numbers)
</code></pre>
<p>Upload the .py file to the GCS bucket, and we'll need its reference while configuring the PySpark Job.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-21.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Job GCS Location</em></p>
<p>Submitting jobs in Dataproc is straightforward. You just need to select the “Submit Job” option:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-209.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Job Submission</em></p>
<p>For submitting a job, you'll need to provide the Job ID (the name of the job), the region, the cluster name (in our case, "first-data-proc-cluster"), and the job type, which is going to be PySpark.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-223.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Parameters required for Job Submission</em></p>
<p>You can get the Python file location from the GCS bucket where the Python file is uploaded – you'll find it under the gsutil URI.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-24.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>No other additional parameters are required, and we can now submit the job:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-224.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>After execution, you should be able to find the distinct numbers in the logs:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-213.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Logs</em></p>
<h2 id="heading-how-to-create-a-jupyter-notebook-instance">How to Create a Jupyter Notebook Instance</h2>
<p>You can associate a notebook instance with Dataproc Hub. To do that, GCP provisions a cluster for each Notebook Instance. We can execute PySpark and SparkR types of jobs from the notebook.</p>
<p>To create a notebook, use the "Workbench" option like below:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-26.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>Make sure you go through the usual configurations like Notebook Name, Region, Environment (Dataproc Hub), and Machine Configuration (we're using 2 vCPUs with 7.5 GB RAM). We're using the default Network settings, and in the Permission section, select the "Service account" option.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-225.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Parameters required for Notebook Cluster Creation</em></p>
<p>Click Create:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-216.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Notebook Cluster Up &amp; Running</em></p>
<p>The "OPEN JUPYTERLAB" option allows users to specify the cluster options and zone for their notebook.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-226.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-227.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>Once the provisioning is completed, the Notebook gives you a few kernel options:</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/05/image-27.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p>Click on PySpark which will allow you to execute jobs through the Notebook.</p>
<p>A SparkContext instance will already be available, so you don't need to explicitly create SparkContext. Apart from that, the program remains the same.</p>
<p><img src="https://www.freecodecamp.org/news/content/images/2022/04/image-220.png" alt="Image" width="600" height="400" loading="lazy"></p>
<p><em>Code snapshot on Notebook</em></p>
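<p>For example, a notebook version of the distinct-numbers job from earlier would simply drop the SparkContext creation (assuming the PySpark kernel exposes the context as <code>sc</code>, as in the snapshot above):</p>
<pre><code class="lang-python"># The PySpark notebook kernel already provides a SparkContext, typically as `sc`
numbers = [1, 2, 1, 2, 3, 4, 4, 6]

rdd = sc.parallelize(numbers)
distinct_numbers = rdd.distinct().collect()

print('Distinct Numbers:', distinct_numbers)
</code></pre>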
<h2 id="heading-conclusion">Conclusion</h2>
<p>Working on Spark and Hadoop becomes much easier when you're using GCP Dataproc. The best part is that you can create a notebook cluster which makes development simpler.</p>
 ]]>
                </content:encoded>
            </item>
        
    </channel>
</rss>
