<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/"
    xmlns:atom="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" version="2.0">
    <channel>
        
        <title>
            <![CDATA[ Real Time - freeCodeCamp.org ]]>
        </title>
        <description>
            <![CDATA[ Browse thousands of programming tutorials written by experts. Learn Web Development, Data Science, DevOps, Security, and get developer career advice. ]]>
        </description>
        <link>https://www.freecodecamp.org/news/</link>
        <image>
            <url>https://cdn.freecodecamp.org/universal/favicons/favicon.png</url>
            <title>
                <![CDATA[ Real Time - freeCodeCamp.org ]]>
            </title>
            <link>https://www.freecodecamp.org/news/</link>
        </image>
        <generator>Eleventy</generator>
        <lastBuildDate>Mon, 15 Jun 2026 12:59:44 +0000</lastBuildDate>
        <atom:link href="https://www.freecodecamp.org/news/tag/real-time/rss.xml" rel="self" type="application/rss+xml" />
        <ttl>60</ttl>
        
            <item>
                <title>
                    <![CDATA[ How to Build a Market Pulse App in Python: Real-Time & Multi-Asset ]]>
                </title>
                <description>
                    <![CDATA[ A “market pulse” screen is basically the tab you keep open when you don’t want to stare at charts all day. It tells you what’s moving right now, what’s unusually volatile, and which names are starting ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-build-a-market-pulse-app-in-python-real-time-multi-asset/</link>
                <guid isPermaLink="false">69d3c38540c9cabf4435ed16</guid>
                
                    <category>
                        <![CDATA[ Python ]]>
                    </category>
                
                    <category>
                        <![CDATA[ stockmarket ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Real Time ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Nikhil Adithyan ]]>
                </dc:creator>
                <pubDate>Mon, 06 Apr 2026 14:30:29 +0000</pubDate>
                <media:content url="https://cdn.hashnode.com/uploads/covers/5e1e335a7a1d3fcc59028c64/8fd6bb83-0418-41e4-9b93-a3c81325033a.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>A “market pulse” screen is basically the tab you keep open when you don’t want to stare at charts all day. It tells you what’s moving right now, what’s unusually volatile, and which names are starting to move together.</p>
<p>Not in a research-paper way. In a product way. The kind of feed you could drop into a media platform or investing app and have it feel instantly useful.</p>
<p>In this tutorial, we’ll build a minimal version of that in Python using Streamlit. The dashboard has three parts:</p>
<ul>
<li><p>a Pulse table that ranks the biggest movers across your watchlist</p>
</li>
<li><p>a Stress feed that emits event-style alerts instead of raw tick spam</p>
</li>
<li><p>a small Correlation card that updates based on the current volatility regime</p>
</li>
</ul>
<p>The data for the dashboard will be powered by EODHD’s real-time WebSocket feeds.</p>
<p>Quick expectation setting: this isn’t TradingView, and it’s not a backtester. It’s a lightweight real-time system that streams prices, maintains rolling buffers, computes a few live metrics, and turns them into UI-ready widgets.</p>
<p>The goal here is to build something you can actually ship as a “market pulse” feature, not a one-off notebook demo.</p>
<h2 id="heading-table-of-contents">Table of Contents</h2>
<ol>
<li><p><a href="#heading-prerequisites">Prerequisites</a></p>
</li>
<li><p><a href="#heading-the-app-were-building">The App We’re Building</a></p>
<ul>
<li><p><a href="#heading-pulse-table">Pulse Table</a></p>
</li>
<li><p><a href="#heading-stress-feed">Stress Feed</a></p>
</li>
<li><p><a href="#heading-correlation-card">Correlation Card</a></p>
</li>
<li><p><a href="#heading-control-panel">Control Panel</a></p>
</li>
</ul>
</li>
<li><p><a href="#heading-the-app-architecture">The App Architecture</a></p>
<ul>
<li><a href="#heading-code-file-structure">Code File Structure</a></li>
</ul>
</li>
<li><p><a href="#heading-streaming-layer-one-queue-many-feeds">Streaming Layer: One Queue, Many Feeds</a></p>
<ul>
<li><p><a href="#heading-feedspy"><code>feeds.py</code></a></p>
</li>
<li><p><a href="#heading-why-the-watchlist-is-curated">Why the Watchlist is Curated</a></p>
</li>
</ul>
</li>
<li><p><a href="#heading-rolling-state-buffers-returns-volatility-trend">Rolling State: Buffers, Returns, Volatility, Trend</a></p>
<ul>
<li><a href="#heading-pulsestorepy"><code>pulse_store.py</code></a></li>
</ul>
</li>
<li><p><a href="#heading-turning-live-stats-into-events-stress-feed">Turning Live Stats Into Events (Stress Feed)</a></p>
<ul>
<li><a href="#heading-eventspy"><code>events.py</code></a></li>
</ul>
</li>
<li><p><a href="#heading-regime-tagging-small-but-important">Regime Tagging (Small but Important)</a></p>
<ul>
<li><p><a href="#heading-add-this-to-pulsestorepy">Add This to <code>pulse_store.py</code></a></p>
</li>
<li><p><a href="#heading-attach-regime-inside-snapshot-in-pulsestorepy">Attach Regime Inside <code>snapshot()</code> in <code>pulse_store.py</code></a></p>
</li>
</ul>
</li>
<li><p><a href="#heading-correlation-card-stocks-only-regime-aware-window">Correlation Card (Stocks Only, Regime-aware Window)</a></p>
<ul>
<li><a href="#heading-correlationpy"><code>correlation.py</code></a></li>
</ul>
</li>
<li><p><a href="#heading-building-the-streamlit-app">Building the Streamlit App</a></p>
</li>
<li><p><a href="#heading-final-output">Final Output</a></p>
</li>
<li><p><a href="#heading-what-id-improve-next">What I’d Improve Next</a></p>
</li>
<li><p><a href="#heading-conclusion">Conclusion</a></p>
</li>
</ol>
<h2 id="heading-prerequisites">Prerequisites</h2>
<p>Before we get into the build, make sure you have a few basics ready.</p>
<p>You should be comfortable running Python scripts, installing packages with <code>pip</code>, and working with a small multi-file project.</p>
<p>This tutorial isn't notebook-based. We’ll be building a lightweight real-time app with separate files for streaming, state, events, correlation logic, and the Streamlit UI.</p>
<p>You’ll need Python 3.10+ and these packages installed:</p>
<pre><code class="language-shell">pip install streamlit pandas websockets
</code></pre>
<p>You’ll also need an <a href="https://eodhd.com/">EODHD API key</a> with access to their real-time WebSocket feeds, since the dashboard depends on live stock, forex, and crypto data.</p>
<p>To follow along smoothly, create these files in your project folder before starting:</p>
<pre><code class="language-plaintext">feeds.py
pulse_store.py
events.py
correlation.py
app.py
</code></pre>
<p>One quick note before we begin: Since this app runs on live market data, what you see will depend on when you open it. During weekends or off-market hours, crypto will usually dominate the dashboard while stocks and most forex pairs stay relatively quiet. That is expected.</p>
<h2 id="heading-the-app-were-building">The App We’re&nbsp;Building</h2>
<p>Before we touch any code, here’s what the finished dashboard looks like:</p>
<p><a href="https://gumlet.tv/watch/69b99df9554f0fb510c28ce6/">https://gumlet.tv/watch/69b99df9554f0fb510c28ce6/</a></p>
<p>Let's go over its main features:</p>
<h3 id="heading-pulse-table">Pulse Table</h3>
<p>This is the main screen. It’s your ranked list of movers across the watchlist. Each row is one symbol, and the columns are the small set of signals we compute live: last price, 1-minute return, 5-minute return when available, 15-minute volatility, and a simple regime label.</p>
<p>If you open the app and only want one thing, it’s this table. You can glance at it and immediately know what deserves attention.</p>
<h3 id="heading-stress-feed">Stress Feed</h3>
<p>This is where the app stops feeling like a live ticker and starts feeling like a product feature. Instead of printing every update, we only emit events when something crosses a threshold, like a sharp 1-minute move or a volatility spike. Those events become “cards” in a feed. The point is to reduce noise, not create more of it.</p>
<h3 id="heading-correlation-card">Correlation Card</h3>
<p>This is intentionally small and conservative. Correlation in real time gets messy fast because different symbols tick at different frequencies and you need alignment. For this build, we keep it to stocks only and compute correlation off time buckets.</p>
<p>It’s not meant to be a full correlation matrix. It’s just a quick “what’s moving with my base symbol right now” view, and it adapts its lookback window depending on whether the base symbol is in a normal or high-vol regime.</p>
<h3 id="heading-control-panel">Control Panel</h3>
<p>At the top, you have a few controls that make the demo feel interactive without turning it into a settings page. Top movers lets you pick how many rows you want in the Pulse table. Correlation base switches which stock you’re anchoring correlation around. Correlation bucket changes the time bucket size used for alignment, which is useful when the feed is sparse and you want correlation to stabilize.</p>
<h2 id="heading-the-app-architecture">The App Architecture</h2>
<p>If you’ve ever tried to build a live Streamlit app, you’ve probably hit the same wall. Streamlit reruns your script constantly. Any time a widget changes, any time you call <code>st.rerun()</code>, the whole file executes again from the top.</p>
<p>That’s great for normal dashboards, but it’s a terrible place to run an infinite WebSocket loop. If you do that in the main thread, the UI either freezes or you end up reconnecting to feeds on every rerun.</p>
<img src="https://cdn.hashnode.com/uploads/covers/5f362fe21017f7317167b14c/f6431fe7-fa92-448a-8116-132af071c490.png" alt="Multi-Asset Market Pulse App Architecture" style="display:block;margin:0 auto" width="600" height="400" loading="lazy">

<p>So the architecture here is intentionally split into two roles.</p>
<p>One background worker owns the real-time work. It connects to the WebSocket feeds, ingests ticks, updates rolling buffers, computes metrics, and emits stress events. That worker runs continuously, and it keeps the latest state in memory. That’s the engine of the app.</p>
<p>Streamlit itself stays dumb on purpose. On every rerun, it only reads whatever state the worker has produced and renders tables and a small correlation card. There's no data fetching in the UI loop. No heavy computation. Just display. That separation is the reason the app stays stable even when you keep refreshing the page or tweaking controls.</p>
<p>In practice, the simplest way to do this in Python is a background thread that runs an async loop. Streamlit starts that thread once using <code>st.session_state</code> as a guard, and then the UI code just keeps rerendering from the shared state.</p>
<p>It’s not fancy. But it’s the difference between a “works for 30 seconds” demo and something that can sit open like a real market pulse screen.</p>
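<p>As a rough sketch of that "start once" guard, the snippet below uses a plain dict as a stand-in for <code>st.session_state</code> so it runs without Streamlit. The names <code>start_worker_once</code>, <code>_worker_main</code>, and the <code>heartbeats</code> counter are hypothetical and only illustrate the pattern; the real worker would connect to the feeds instead of counting.</p>

```python
import asyncio
import threading
import time

async def _worker_main(shared):
    # Stand-in for the real work: connect to feeds, update buffers, emit events.
    while not shared["stop"].is_set():
        shared["heartbeats"] += 1
        await asyncio.sleep(0.01)

def _run_loop(shared):
    # The worker thread gets its own event loop; asyncio.run creates and closes it.
    asyncio.run(_worker_main(shared))

def start_worker_once(state):
    # `state` plays the role of st.session_state: it survives script reruns.
    if "worker" in state:
        return state["worker"]
    shared = {"stop": threading.Event(), "heartbeats": 0}
    t = threading.Thread(target=_run_loop, args=(shared,), daemon=True)
    t.start()
    state["worker"] = t
    state["shared"] = shared
    return t

# Simulate two script reruns against the same session state.
session_state = {}
first = start_worker_once(session_state)
second = start_worker_once(session_state)

time.sleep(0.2)
session_state["shared"]["stop"].set()
first.join(timeout=2)
```

<p>Both calls return the same thread object, which is the whole point: a rerun finds the existing worker instead of opening new connections.</p>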
<h3 id="heading-code-file-structure">Code File Structure</h3>
<p>To keep this build readable, I split the app into five small files. Each file has one job, and the Streamlit UI doesn’t touch the WebSocket logic directly.</p>
<ul>
<li><p><code>feeds.py</code> handles WebSocket connections and normalizes every incoming message into the same tick format.</p>
</li>
<li><p><code>pulse_store.py</code> keeps rolling buffers per symbol and computes pulse metrics (returns, vol, trend, regime). This is the core state.</p>
</li>
<li><p><code>events.py</code> turns the live metrics into a stress feed with cooldowns and asset-aware thresholds.</p>
</li>
<li><p><code>correlation.py</code> builds the correlation card by bucketing and aligning returns, then changing the lookback window based on regime.</p>
</li>
<li><p><code>app.py</code> is the Streamlit dashboard. It starts the background worker once, then keeps rerendering from shared state.</p>
</li>
</ul>
<p>That split is what makes the app stable. The background worker can run forever. Streamlit can rerun as often as it wants without reconnecting to feeds or recomputing everything from scratch.</p>
<h2 id="heading-streaming-layer-one-queue-many-feeds">Streaming Layer: One Queue, Many&nbsp;Feeds</h2>
<p>The first step is getting real-time ticks into the system. We connect to EODHD’s WebSocket feeds for stocks, forex, and crypto, subscribe to a small watchlist, then normalize every message into one tick schema: <code>{symbol, asset, ts, price}</code>.</p>
<p>Once we have that, everything downstream becomes predictable.</p>
<h3 id="heading-feedspy"><code>feeds.py</code></h3>
<pre><code class="language-python">import asyncio
import json
import time
import websockets

API_KEY = "YOUR EODHD API KEY"

WS = {
    "stocks": "wss://ws.eodhistoricaldata.com/ws/us?api_token=",
    "forex":  "wss://ws.eodhistoricaldata.com/ws/forex?api_token=",
    "crypto": "wss://ws.eodhistoricaldata.com/ws/crypto?api_token=",
}

def _tick(symbol, asset, price):
    return {"symbol": symbol, "asset": asset, "ts": time.time(), "price": float(price)}

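def _parse(asset, msg):
    # Ignore payloads that aren't JSON objects so a stray message can't
    # raise inside the stream loop and force a reconnect.
    if not isinstance(msg, dict):
        return None
    s = msg.get("s")
    p = msg.get("p")
    if s is None or p is None:
        return None
    return _tick(s, asset, p)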

async def _stream(asset, symbols, q):
    url = WS[asset] + API_KEY

    while True:
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
                sub = {"action": "subscribe", "symbols": ",".join(symbols)}
                await ws.send(json.dumps(sub))

                async for raw in ws:
                    try:
                        msg = json.loads(raw)
                    except Exception:
                        continue

                    t = _parse(asset, msg)
                    if t:
                        await q.put(t)

        except Exception:
            await asyncio.sleep(1.0)

async def start_streams(q):
    tasks = []
    tasks.append(asyncio.create_task(_stream("stocks", ["AAPL","TSLA","NVDA","AMZN","MSFT","META","GOOGL"], q)))
    tasks.append(asyncio.create_task(_stream("forex", ["EURUSD","USDINR","USDJPY","GBPUSD","AUDUSD"], q)))
    tasks.append(asyncio.create_task(_stream("crypto", ["BTC-USD","ETH-USD","BTC-USDT","ETH-USDT","SOL-USDT"], q)))
    return tasks
</code></pre>
<p><strong>Note:</strong> Replace <code>YOUR EODHD API KEY</code> with your actual EODHD API key. If you don’t have one, you can get it by opening an EODHD developer account.</p>
<p>What this code is doing is simple. Each feed runs in its own async task, pushes normalized ticks into a single shared queue, and reconnects if the socket drops. We don’t try to do anything smart here. This layer is just plumbing.</p>
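<p>To make the "one queue, many feeds" shape concrete, here's a small self-contained sketch with no network involved. <code>fake_feed</code> and <code>consume</code> are hypothetical helpers: the first stands in for <code>_stream</code>, and the second shows what a single consumer on the other end of the queue could look like.</p>

```python
import asyncio

async def fake_feed(asset, symbols, q, n=3):
    # Hypothetical producer standing in for feeds._stream (no WebSocket here).
    for i in range(n):
        for s in symbols:
            await q.put({"symbol": s, "asset": asset, "ts": float(i), "price": 100.0 + i})
        await asyncio.sleep(0)  # yield so the other feed gets a turn

async def consume(q, sink, expected):
    # Single consumer: every feed funnels into the same queue.
    for _ in range(expected):
        tick = await q.get()
        sink.append(tick)
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=1000)
    sink = []
    producers = [
        asyncio.create_task(fake_feed("stocks", ["AAPL", "MSFT"], q)),
        asyncio.create_task(fake_feed("crypto", ["BTC-USD"], q)),
    ]
    await consume(q, sink, expected=9)  # 2 symbols x 3 rounds + 1 symbol x 3 rounds
    await asyncio.gather(*producers)
    return sink

ticks = asyncio.run(main())
```

<p>Because every tick already has the same four keys, the consumer never needs to know which feed it came from.</p>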
<h3 id="heading-why-the-watchlist-is-curated">Why the Watchlist is&nbsp;Curated</h3>
<p>A bigger watchlist makes the demo look impressive, but it also makes debugging and alignment harder. For the article, you want a list that’s small enough to reason about, but diverse enough to show multi-asset behavior.</p>
<p>One thing that will skew what you see is weekends. Stocks and most forex won’t meaningfully tick when markets are closed, while crypto runs 24/7. So if you run the app on a Sunday, crypto will naturally dominate the pulse table. That’s not a bug. It’s just what happens when only one asset class is actually moving.</p>
<p>In a real product, you’d solve this by ranking movers per asset class or rendering separate sections. For this build, we'll keep it simple and accept that the output depends on when you run it.</p>
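<p>If you did want the per-asset ranking, a minimal sketch could look like the following. It assumes each row is a dict with <code>symbol</code>, <code>asset</code>, and an optional <code>ret_1m</code> key, which is the snapshot shape built in the next section. The function name <code>top_movers_by_asset</code> and the sample numbers are made up for illustration.</p>

```python
from collections import defaultdict

def top_movers_by_asset(snaps, n=3, key="ret_1m"):
    # Group snapshots by asset class, then rank each group by absolute move.
    groups = defaultdict(list)
    for s in snaps:
        if s.get(key) is not None:
            groups[s.get("asset")].append(s)
    return {
        asset: sorted(rows, key=lambda r: abs(r[key]), reverse=True)[:n]
        for asset, rows in groups.items()
    }

# Made-up snapshots for illustration only.
snaps = [
    {"symbol": "AAPL", "asset": "stocks", "ret_1m": 0.0004},
    {"symbol": "TSLA", "asset": "stocks", "ret_1m": -0.0021},
    {"symbol": "BTC-USD", "asset": "crypto", "ret_1m": 0.0090},
    {"symbol": "ETH-USD", "asset": "crypto", "ret_1m": -0.0120},
    {"symbol": "EURUSD", "asset": "forex"},  # no 1-minute return yet
]
ranked = top_movers_by_asset(snaps, n=1)
```

<p>With this grouping, a quiet stock session still gets its own top mover instead of being pushed off the table by crypto.</p>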
<h2 id="heading-rolling-state-buffers-returns-volatility-trend">Rolling State: Buffers, Returns, Volatility, Trend</h2>
<p>This is the core of the app. We keep a rolling buffer per symbol, compute a few live signals from it, and expose everything as a compact snapshot that the UI and the event system can consume.</p>
<h3 id="heading-pulsestorepy"><code>pulse_store.py</code></h3>
<pre><code class="language-python">import time
import math
import threading
from collections import deque

class PulseStore:
    def __init__(self, window_sec=3600):
        self.window_sec = window_sec
        self.buffers = {}
        self.latest = {}
        self.asset = {}
        self.vol_hist = {}
        self.lock = threading.Lock()

    def _buf(self, symbol):
        if symbol not in self.buffers:
            self.buffers[symbol] = deque()
        return self.buffers[symbol]

    def update(self, tick):
        symbol = tick["symbol"]
        ts = tick["ts"]
        px = tick["price"]

        with self.lock:
            b = self._buf(symbol)
            b.append((ts, px))
            self.latest[symbol] = px
            self.asset[symbol] = tick.get("asset")

            cutoff = ts - self.window_sec
            while b and b[0][0] &lt; cutoff:
                b.popleft()

        return len(b)

    def _price_at_or_before(self, b, target_ts):
        with self.lock:
            data = list(b)

        for i in range(len(data) - 1, -1, -1):
            if data[i][0] &lt;= target_ts:
                return data[i][1]
        return None

    def ret(self, symbol, window_sec):
        b = self.buffers.get(symbol)
        if not b:
            return None

        with self.lock:
            if len(b) &lt; 2:
                return None
            now_ts, now_px = b[-1]

        px0 = self._price_at_or_before(b, now_ts - window_sec)
        if px0 is None:
            return None

        return (now_px / px0) - 1.0

    def ret_1m(self, symbol):
        return self.ret(symbol, 60)

    def ret_5m(self, symbol):
        return self.ret(symbol, 300)

    def ret_15m(self, symbol):
        return self.ret(symbol, 900)

    def _recent_prices(self, b, window_sec):
        with self.lock:
            data = list(b)

        if not data:
            return []

        cutoff = data[-1][0] - window_sec
        out = []
        for ts, px in data:
            if ts &gt;= cutoff:
                out.append(px)
        return out

    def vol_15m(self, symbol):
        b = self.buffers.get(symbol)
        if not b:
            return None

        prices = self._recent_prices(b, 900)
        if len(prices) &lt; 6:
            return None

        rets = []
        for i in range(1, len(prices)):
            rets.append(prices[i] / prices[i-1] - 1.0)

        if len(rets) &lt; 3:
            return None

        m = sum(rets) / len(rets)
        var = sum((x - m) ** 2 for x in rets) / len(rets)
        return var ** 0.5

    def trend_15m(self, symbol):
        b = self.buffers.get(symbol)
        if not b:
            return None

        prices = self._recent_prices(b, 900)
        if len(prices) &lt; 8:
            return None

        lp = []
        for p in prices:
            if p &gt; 0:
                lp.append(math.log(p))

        if len(lp) &lt; 8:
            return None

        n = len(lp)
        xs = list(range(n))

        xbar = sum(xs) / n
        ybar = sum(lp) / n

        num = 0.0
        den = 0.0
        for i in range(n):
            dx = xs[i] - xbar
            dy = lp[i] - ybar
            num += dx * dy
            den += dx * dx

        if den == 0:
            return None

        return num / den

    def _vh(self, symbol):
        if symbol not in self.vol_hist:
            self.vol_hist[symbol] = deque(maxlen=200)
        return self.vol_hist[symbol]

    def update_vol_history(self, symbol):
        v = self.vol_15m(symbol)
        if v is None:
            return None
        self._vh(symbol).append(v)
        return v

    def regime(self, symbol):
        h = self.vol_hist.get(symbol)
        if not h or len(h) &lt; 30:
            return "unknown"

        cur = h[-1]
        hs = sorted(h)
        p80 = hs[int(0.8 * (len(hs) - 1))]

        if cur &gt;= p80:
            return "high_vol"
        return "normal"

    def snapshot(self, symbol):
        last = self.latest.get(symbol)
        if last is None:
            return None

        out = {"symbol": symbol, "asset": self.asset.get(symbol), "last": last}

        r1 = self.ret_1m(symbol)
        r5 = self.ret_5m(symbol)
        r15 = self.ret_15m(symbol)
        v15 = self.vol_15m(symbol)
        tr = self.trend_15m(symbol)

        if r1 is not None:
            out["ret_1m"] = r1
        if r5 is not None:
            out["ret_5m"] = r5
        if r15 is not None:
            out["ret_15m"] = r15
        if v15 is not None:
            out["vol_15m"] = v15
        if tr is not None:
            out["trend_15m"] = tr

        v = self.update_vol_history(symbol)
        if v is not None:
            out["regime"] = self.regime(symbol)

        return out

    def snapshots(self):
        with self.lock:
            syms = list(self.buffers.keys())

        out = []
        for s in syms:
            snap = self.snapshot(s)
            if snap:
                out.append(snap)
        return out
</code></pre>
<p><code>update()</code> is the entry point. Every incoming tick gets appended to that symbol’s deque, and old points get pruned so the buffer never grows unbounded.</p>
<p>Returns are computed using a small trick: we don’t assume we have a price exactly 60 seconds ago or 300 seconds ago. We scan backwards and grab the most recent price at or before the target timestamp. That keeps returns stable even when ticks come in unevenly.</p>
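<p>Here's the same lookup on a tiny hand-made buffer, as a standalone sketch. The function names and the four sample ticks are illustrative; the logic follows the backward scan described above.</p>

```python
def price_at_or_before(points, target_ts):
    # points: list of (ts, price) tuples, oldest first.
    for ts, px in reversed(points):
        if target_ts >= ts:
            return px
    return None

def window_return(points, window_sec):
    if not points:
        return None
    now_ts, now_px = points[-1]
    px0 = price_at_or_before(points, now_ts - window_sec)
    if px0 is None:
        return None
    return now_px / px0 - 1.0

# Unevenly spaced ticks: (timestamp in seconds, price).
points = [(0.0, 100.0), (37.0, 100.5), (95.0, 101.0), (100.0, 102.0)]

r1 = window_return(points, 60)   # target ts is 40, so the tick at ts=37 is used
r5 = window_return(points, 300)  # target ts is -200, nothing that old exists
```

<p>The 1-minute return is measured against the tick at 37 seconds, not an interpolated price, and the 5-minute return is <code>None</code> until the buffer is old enough.</p>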
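<p>Volatility is the standard deviation of tick-to-tick returns over the last 15 minutes of prices. It’s not annualized, and because it’s measured per tick rather than per unit of time, it depends on how often a symbol ticks. It’s just a live noise meter. Trend is the least-squares slope of log price against tick index over that same window, which gives a directional hint without doing anything heavy.</p>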
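<p>To see both numbers on a concrete series, here's a short standalone calculation. The eight prices are made up. <code>statistics.pstdev</code> is used because <code>vol_15m</code> divides by <code>n</code> (a population standard deviation), and the slope is computed the same way as in <code>trend_15m</code>.</p>

```python
import math
import statistics

# Made-up prices, oldest first.
prices = [100.0, 100.2, 100.1, 100.4, 100.3, 100.6, 100.5, 100.8]

# Tick-to-tick simple returns.
rets = [prices[i] / prices[i - 1] - 1.0 for i in range(1, len(prices))]

# Population standard deviation (divide by n, not n - 1).
vol = statistics.pstdev(rets)

# Least-squares slope of log price against tick index.
lp = [math.log(p) for p in prices]
n = len(lp)
xbar = (n - 1) / 2
ybar = sum(lp) / n
num = sum((i - xbar) * (y - ybar) for i, y in enumerate(lp))
den = sum((i - xbar) ** 2 for i in range(n))
trend = num / den
```

<p>The series zigzags upward, so <code>trend</code> comes out positive while <code>vol</code> reflects the size of the zigzags rather than the direction.</p>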
<p>The <code>vol_hist</code> deque is used to label regimes. We store a rolling history of recent volatility values per symbol, then call the current state <code>high_vol</code> if it’s above the 80th percentile of that recent history. It’s intentionally simple, but it’s good enough to drive the correlation window logic later.</p>
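<p>A quick standalone check of that percentile rule, using the same index arithmetic as <code>regime()</code>. The helper name <code>regime_label</code> and the synthetic history are assumptions for the example.</p>

```python
def regime_label(vol_hist, min_points=30, pct=0.8):
    # Not enough history yet: refuse to label.
    if min_points > len(vol_hist):
        return "unknown"
    hs = sorted(vol_hist)
    cutoff = hs[int(pct * (len(hs) - 1))]
    return "high_vol" if vol_hist[-1] >= cutoff else "normal"

# Synthetic history: 40 readings rising from 0.0001 to 0.0040.
hist = [i / 10000 for i in range(1, 41)]

a = regime_label(hist)             # latest reading is the largest in the history
b = regime_label(hist + [0.0005])  # latest reading sits near the bottom
c = regime_label(hist[:5])         # only 5 readings
```

<p>Note that the current reading is part of the history it's compared against, so the label is relative to the symbol's own recent behavior, not to any absolute volatility level.</p>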
<p>The lock exists because of a concurrency issue. The background thread writes to the deques while Streamlit reads them, and if a deque changes while you iterate it, Python raises <code>RuntimeError: deque mutated during iteration</code>. So everywhere <code>PulseStore</code> iterates, it first takes a snapshot copy of the deque under the lock and iterates that list instead. That keeps reads safe without making the writer slow.</p>
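<p>You can reproduce the failure and the fix in a few lines. This sketch stays in a single thread and simulates the writer by appending inside the loop, which triggers the same error a concurrent append would.</p>

```python
import threading
from collections import deque

buf = deque([(0.0, 100.0), (1.0, 100.1), (2.0, 100.2)])
lock = threading.Lock()

# Unsafe: the deque changes while we iterate it.
error = None
try:
    for ts, px in buf:
        buf.append((ts + 10.0, px))
except RuntimeError as exc:
    error = str(exc)

# Safe: copy under the lock, then iterate the copy.
with lock:
    data = list(buf)

seen = 0
for ts, px in data:
    with lock:
        buf.append((ts + 20.0, px))  # the "writer" keeps going without breaking the read
    seen += 1
```

<p>The lock is only held for the copy and for each append, so neither side waits long on the other.</p>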
<h2 id="heading-turning-live-stats-into-events-stress-feed">Turning Live Stats Into Events (Stress&nbsp;Feed)</h2>
<p>Once you have live metrics, the next question is what you do with them. If you stream raw ticks into a UI, you’ll drown the user in noise. What we want instead is an event feed. Small cards that only show up when something crosses a threshold.</p>
<p>That’s what the stress feed does. It watches the snapshot coming out of <code>PulseStore</code> and emits one of three event types.</p>
<ul>
<li><p><code>move_1m</code> when the 1-minute move is large enough</p>
</li>
<li><p><code>move_5m</code> when the 5-minute move is large enough</p>
</li>
<li><p><code>vol_spike</code> when 15-minute volatility crosses a threshold</p>
</li>
</ul>
<p>Two practical features make this usable in a real dashboard. First, cooldowns. If TSLA crosses the 1-minute threshold, we don’t want 50 duplicate events on every tick. Second, asset-aware thresholds. Crypto naturally moves more than equities, so if you use one global threshold, BTC will dominate your stress feed all day.</p>
<h3 id="heading-eventspy"><code>events.py</code></h3>
<pre><code class="language-python">import time
from collections import deque

class EventStore:
    def __init__(self, max_events=25):
        self.max_events = max_events
        self.events = deque(maxlen=max_events)
        
    def add(self, e):
        self.events.appendleft(e)

    def latest(self):
        return list(self.events)


class StressDetector:
    def __init__(self, move_thr_1m=0.0015, move_thr_5m=0.004, vol_thr=0.00025):
        self.move_thr_1m = move_thr_1m
        self.move_thr_5m = move_thr_5m
        self.vol_thr = vol_thr
        self.cooldown_sec = 30
        self.last_emit = {}
        self.thr = {
            "stocks": {"move_1m": 0.0012, "move_5m": 0.0040, "vol": 0.00006},
            "crypto": {"move_1m": 0.0025, "move_5m": 0.0080, "vol": 0.00045},
            "forex":  {"move_1m": 0.0006, "move_5m": 0.0018, "vol": 0.00015},
        }

    def _can_emit(self, symbol, etype, now):
        k = (symbol, etype)
        prev = self.last_emit.get(k)
        if prev is None:
            self.last_emit[k] = now
            return True
        if now - prev &gt;= self.cooldown_sec:
            self.last_emit[k] = now
            return True
        return False

    def check(self, snap):
        if not snap:
            return None

        sym = snap.get("symbol")
        asset = snap.get("asset", None)
        thr = self.thr.get(asset, {"move_1m": self.move_thr_1m, "move_5m": self.move_thr_5m, "vol": self.vol_thr})
        move_thr_1m = thr["move_1m"]
        move_thr_5m = thr["move_5m"]
        vol_thr = thr["vol"]
        now = time.time()

        r5 = snap.get("ret_5m")
        r1 = snap.get("ret_1m")
        v15 = snap.get("vol_15m")

        if r5 is not None and abs(r5) &gt;= move_thr_5m:
            if self._can_emit(sym, "move_5m", now):
                return {"ts": now, "type": "move_5m", "symbol": sym, "asset": asset, "value": float(r5)}
            return None

        if r1 is not None and abs(r1) &gt;= move_thr_1m:
            if self._can_emit(sym, "move_1m", now):
                return {"ts": now, "type": "move_1m", "symbol": sym, "asset": asset, "value": float(r1)}
            return None

        if v15 is not None and v15 &gt;= vol_thr:
            if self._can_emit(sym, "vol_spike", now):
                return {"ts": now, "type": "vol_spike", "symbol": sym, "asset": asset, "value": float(v15)}
            return None

        return None
</code></pre>
<p><code>EventStore</code> is just a rolling feed. It keeps the last N events so Streamlit can render them as a table.</p>
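<p><code>StressDetector.check()</code> is the filter. It looks at the latest snapshot and decides whether it’s worth creating an event. The cooldown logic is what stops spam. Once a symbol emits a <code>move_1m</code> event, it won’t emit another <code>move_1m</code> for 30 seconds. The checks run in a fixed order (5-minute move, then 1-minute move, then volatility) and stop at the first threshold that trips, so each call returns at most one event. That also means a symbol whose 5-minute move is over its threshold but still in cooldown returns nothing for that snapshot, even if its 1-minute move or volatility would qualify.</p>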
<p>The thresholds are intentionally different per asset class. Crypto needs wider bands for both moves and volatility. Otherwise, even a quiet BTC session will look like constant stress relative to equities. This one change makes the feed feel balanced and product-like.</p>
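<p>Here's a stripped-down sketch of those two ideas working together, with timestamps passed in by hand so the behavior is easy to follow. The <code>Cooldown</code> class and the sample stream are hypothetical; the two threshold values are the 1-minute thresholds from <code>events.py</code>.</p>

```python
class Cooldown:
    def __init__(self, cooldown_sec=30):
        self.cooldown_sec = cooldown_sec
        self.last_emit = {}

    def allow(self, symbol, etype, now):
        # One timer per (symbol, event type) pair.
        key = (symbol, etype)
        prev = self.last_emit.get(key)
        if prev is None or now - prev >= self.cooldown_sec:
            self.last_emit[key] = now
            return True
        return False

# 1-minute move thresholds per asset class.
THRESHOLDS = {"stocks": 0.0012, "crypto": 0.0025}

gate = Cooldown(30)
stream = [
    (0, "TSLA", "stocks", 0.0015),      # over threshold, first event
    (5, "TSLA", "stocks", 0.0016),      # over threshold, but inside the cooldown
    (10, "BTC-USD", "crypto", 0.0015),  # same move size, below the crypto threshold
    (31, "TSLA", "stocks", -0.0020),    # cooldown has expired
]

emitted = []
for now, sym, asset, r1 in stream:
    if abs(r1) >= THRESHOLDS[asset] and gate.allow(sym, "move_1m", now):
        emitted.append((now, sym))
```

<p>Four inputs produce two events. The same 0.15% move that triggers a TSLA card is ignored for BTC, which is exactly the balance the per-asset thresholds are there to create.</p>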
<h2 id="heading-regime-tagging-small-but-important">Regime Tagging (Small but Important)</h2>
<p>Regime is just a lightweight context label. We keep a short history of <code>vol_15m</code> per symbol and classify the current state as <code>high_vol</code> if it’s at or above the recent 80th percentile, otherwise <code>normal</code>. Until 30 readings have accumulated, the label is <code>unknown</code>. This gives us a stable switch we can use later. Most importantly, we use it to change the correlation lookback window depending on conditions.</p>
<h3 id="heading-add-this-to-pulsestorepy">Add this to <code>pulse_store.py</code></h3>
<p>You already have <code>PulseStore</code> in <code>pulse_store.py</code>, and the full listing above already contains these methods. If you’re building the file step by step, they go inside the <code>PulseStore</code> class, right after <code>vol_15m()</code> and <code>trend_15m()</code> (placement isn’t critical; it just keeps the file readable).</p>
<pre><code class="language-python">    def _vh(self, symbol):
        if symbol not in self.vol_hist:
            self.vol_hist[symbol] = deque(maxlen=200)
        return self.vol_hist[symbol]

    def update_vol_history(self, symbol):
        v = self.vol_15m(symbol)
        if v is None:
            return None
        self._vh(symbol).append(v)
        return v

    def regime(self, symbol):
        h = self.vol_hist.get(symbol)
        if not h or len(h) &lt; 30:
            return "unknown"

        cur = h[-1]
        hs = sorted(h)
        p80 = hs[int(0.8 * (len(hs) - 1))]

        if cur &gt;= p80:
            return "high_vol"
        return "normal"
</code></pre>
<h3 id="heading-attach-regime-inside-snapshot-in-pulsestorepy">Attach regime inside <code>snapshot()</code> in <code>pulse_store.py</code></h3>
<p>In the same file, inside <code>snapshot(self, symbol)</code>, this block sits near the end of the function, right before <code>return out</code> (the full listing above already has it):</p>
<pre><code class="language-python">        v = self.update_vol_history(symbol)
        if v is not None:
            out["regime"] = self.regime(symbol)
</code></pre>
<p>That’s it for regime tagging.</p>
<p><strong>Why this matters later:</strong></p>
<p>Once <code>snapshot()</code> includes regime, the rest of the app can use it without recomputing anything. In the next section, the correlation card reads <code>store.regime(base_symbol)</code> and uses that to decide whether it should look back 60 minutes (normal) or just 15 minutes (high volatility). This is what stops correlation from feeling stale during spikes and overly jumpy during calm periods.</p>
<h2 id="heading-correlation-card-stocks-only-regime-aware-window">Correlation Card (Stocks Only, Regime-aware Window)</h2>
<p>Correlation sounds simple until you try to do it live. In real-time feeds, different symbols tick at different moments. If you just correlate raw tick-to-tick returns, you’re basically correlating noise and timing gaps.</p>
<p>So we do two things to make it usable.</p>
<p>First, we align prices by time. We bucket ticks into fixed time bins (like 10s, 20s, 30s) and treat the last price inside each bin as the price for that bin. That gives every symbol a comparable timeline.</p>
<p>Second, we make the correlation window regime-aware. If the base symbol is in <code>high_vol</code>, we compute correlation on a shorter recent slice so the card reacts faster. If the regime is normal, we use a longer lookback so it doesn’t flip wildly every refresh.</p>
<p>We also keep this card stocks-only in the app. Multi-asset correlation is doable, but alignment becomes much harder when tick frequency differs massively across assets. This article is about building something shippable. A stable stocks card beats a flaky multi-asset one.</p>
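<p>The bucketing step is easier to picture with numbers. In this sketch, two symbols tick at different moments, and each is reduced to one price per 10-second bin. The helper name <code>last_price_per_bucket</code> and the sample ticks are made up for the example.</p>

```python
def bucket(ts, bin_sec):
    # Floor the timestamp to the start of its bin.
    return int(ts // bin_sec) * bin_sec

def last_price_per_bucket(points, bin_sec):
    out = {}
    for ts, px in points:  # oldest first, so later ticks overwrite earlier ones
        out[bucket(ts, bin_sec)] = px
    return out

# (timestamp in seconds, price), oldest first.
aapl = [(100.2, 190.00), (104.9, 190.10), (111.0, 190.30), (128.5, 190.20)]
msft = [(101.7, 410.00), (121.3, 410.40), (127.0, 410.60)]

a = last_price_per_bucket(aapl, 10)
m = last_price_per_bucket(msft, 10)

# Only bins where both symbols have a price are usable for correlation.
shared = sorted(set(a).intersection(m))
```

<p>AAPL has a price in the 110 bin but MSFT doesn't, so that bin drops out. A larger bucket size makes gaps like this rarer, which is why the bucket control helps when the feed is sparse.</p>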
<h3 id="heading-correlationpy"><code>correlation.py</code></h3>
<pre><code class="language-python">import math

def _bucket(ts, bin_sec):
    return int(ts // bin_sec) * bin_sec

def build_price_table(store, symbols, window_sec=1800, bin_sec=10):
    table = {}
    now = None

    for s in symbols:
        b = store.buffers.get(s)
        if not b:
            continue
        if now is None:
            now = b[-1][0]
        else:
            now = max(now, b[-1][0])

    if now is None:
        return {}

    cutoff = now - window_sec

    for s in symbols:
        b = store.buffers.get(s)
        if not b:
            continue

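        # Copy under the lock: the worker thread may append while we iterate.
        with store.lock:
            data = list(b)

        for ts, px in data: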
            if ts &lt; cutoff:
                continue
            k = _bucket(ts, bin_sec)
            row = table.get(k)
            if row is None:
                row = {}
                table[k] = row
            row[s] = px

    return table

def to_return_matrix(price_table, symbols):
    buckets = sorted(price_table.keys())
    if len(buckets) &lt; 3:
        return []

    last_prices = None
    rows = []

    for bt in buckets:
        rowp = price_table[bt]
        if any(s not in rowp for s in symbols):
            continue

        prices = [float(rowp[s]) for s in symbols]

        if last_prices is None:
            last_prices = prices
            continue

        rets = []
        ok = True
        for i in range(len(symbols)):
            p0 = last_prices[i]
            p1 = prices[i]
            if p0 &lt;= 0 or p1 &lt;= 0:
                ok = False
                break
            rets.append(p1 / p0 - 1.0)

        last_prices = prices
        if ok:
            rows.append(rets)

    return rows

def corr(a, b):
    n = len(a)
    if n &lt; 5:
        return None
    am = sum(a) / n
    bm = sum(b) / n
    num = 0.0
    da = 0.0
    db = 0.0
    for i in range(n):
        x = a[i] - am
        y = b[i] - bm
        num += x * y
        da += x * x
        db += y * y
    if da == 0 or db == 0:
        return None
    return num / math.sqrt(da * db)

def corr_card(store, symbols, base_symbol, bin_sec=10):
    reg = store.regime(base_symbol)
    win = 900 if reg == "high_vol" else 3600

    pt = build_price_table(store, symbols, window_sec=win, bin_sec=bin_sec)
    mat = to_return_matrix(pt, symbols)
    if not mat:
        return {"base": base_symbol, "regime": reg, "window_sec": win, "top": []}

    cols = list(zip(*mat))
    if base_symbol not in symbols:
        return {"base": base_symbol, "regime": reg, "window_sec": win, "top": []}

    bi = symbols.index(base_symbol)
    base = list(cols[bi])

    scores = []
    for i, s in enumerate(symbols):
        if s == base_symbol:
            continue
        c = corr(base, list(cols[i]))
        if c is None:
            continue
        scores.append((s, c))

    scores.sort(key=lambda x: abs(x[1]), reverse=True)
    top = [{"symbol": s, "corr": float(v)} for s, v in scores[:3]]

    return {"base": base_symbol, "regime": reg, "window_sec": win, "top": top}
</code></pre>
<p><code>build_price_table()</code> creates the aligned timeline. It scans each symbol’s rolling buffer, buckets timestamps into fixed bins, and stores the last price per bucket.</p>
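<p>To see the bucketing rule on its own, here is a minimal sketch with a handful of made-up ticks. The helper name <code>bucket_last_prices</code> and the sample data are assumptions for illustration. The rounding rule is the same one <code>_bucket()</code> uses, and "last price wins" only holds if ticks arrive in time order.</p>

```python
def bucket_last_prices(ticks, bin_sec=10):
    """Map each bin start time to the last price seen inside that bin."""
    bins = {}
    for ts, px in ticks:  # ticks are assumed to be in time order
        start = int(ts // bin_sec) * bin_sec  # same rounding rule as _bucket()
        bins[start] = px  # a later tick in the same bin overwrites the earlier one
    return bins


# Hypothetical ticks: (epoch seconds, price)
ticks = [(1000.2, 50.0), (1004.9, 50.3), (1011.0, 50.1), (1027.5, 49.8)]
result = bucket_last_prices(ticks, bin_sec=10)
print(result)
```

<p>The first two ticks land in the same 10-second bin, so only the later price survives. That is the whole trick behind the aligned timeline.</p>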
<p><code>to_return_matrix()</code> converts those bucketed prices into returns, but only when every symbol has a price in the same bucket. That’s the alignment step that keeps correlation meaningful.</p>
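<p>The alignment rule is easier to trust once you watch it skip a bucket. The sketch below is a simplified stand-in for <code>to_return_matrix()</code>: it skips the positive-price check, and the helper name and sample table are made up for illustration.</p>

```python
def aligned_returns(price_table, symbols):
    """Return one row of simple returns per pair of consecutive complete buckets."""
    rows = []
    prev = None
    for bucket in sorted(price_table):
        row = price_table[bucket]
        if not all(s in row for s in symbols):
            continue  # incomplete bucket: drop it for every symbol
        prices = [row[s] for s in symbols]
        if prev is not None:
            rows.append([p1 / p0 - 1.0 for p0, p1 in zip(prev, prices)])
        prev = prices
    return rows


# Hypothetical table: TSLA has no tick in the bucket starting at 10
table = {
    0: {"AAPL": 100.0, "TSLA": 200.0},
    10: {"AAPL": 101.0},
    20: {"AAPL": 102.0, "TSLA": 210.0},
}
rows = aligned_returns(table, ["AAPL", "TSLA"])
print(rows)
```

<p>Only one return row comes out, and it spans bucket 0 to bucket 20. So when a bucket is skipped, the next return covers a longer interval than <code>bin_sec</code>. That's fine for a dashboard card, but worth knowing before you read too much into the numbers.</p>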
<p><code>corr_card()</code> is the actual widget output. It checks the base symbol’s regime, chooses a lookback window (15m for high-vol, 60m for normal), then computes correlations against the base symbol and returns the top matches.</p>
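<p>One detail that is easy to miss: the card ranks by the absolute value of the correlation, so a strongly negative match can come out on top. Here is a small sketch of just that ranking step, with invented symbols and scores (the helper name <code>top_matches</code> is hypothetical).</p>

```python
def top_matches(scores, k=3):
    """Rank (symbol, corr) pairs by absolute correlation, strongest first."""
    ranked = sorted(scores, key=lambda item: abs(item[1]), reverse=True)
    return [{"symbol": s, "corr": float(c)} for s, c in ranked[:k]]


# Hypothetical correlations against some base symbol
scores = [("MSFT", 0.41), ("NVDA", -0.87), ("AMZN", 0.12), ("GOOG", 0.65)]
top = top_matches(scores)
print(top)
```

<p>The sign is kept in the output, so the UI can still show whether a name moves with the base symbol or against it.</p>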
<p>Next, we’ll wire all of this into Streamlit and render the dashboard. That’s where the build starts to feel like a real app.</p>
<h2 id="heading-building-the-streamlit-app">Building the Streamlit App</h2>
<p>At this point, we have all the moving parts: a streaming layer that produces ticks, a state engine that produces snapshots, a stress detector that emits events, and a correlation function that can generate a small card. Now we just need to wrap it in a Streamlit app without breaking everything.</p>
<p>The key trick is to start the real-time worker once and keep it running in the background. Streamlit reruns the whole script on every widget interaction and every <code>st.rerun()</code> call, so the UI code should never reconnect to WebSockets or spin up new loops. It should only read shared state and render tables.</p>
<pre><code class="language-python">import asyncio
import threading
import time

import pandas as pd
import streamlit as st

from feeds import start_streams
from pulse_store import PulseStore
from events import StressDetector, EventStore
from correlation import corr_card

st.set_page_config(page_title="Market Pulse", layout="wide")

st.markdown("""
&lt;style&gt;
html, body, [class*="css"]  { background-color: #0b0f14; color: #e6edf3; }
.stApp { background-color: #0b0f14; }
div[data-testid="stMetricValue"] { color: #e6edf3; }
div[data-testid="stMetricLabel"] { color: #9aa4af; }
[data-testid="stDataFrame"] { background-color: #0b0f14; }
&lt;/style&gt;
""", unsafe_allow_html=True)

def _runner(state):
    async def _main():
        q = asyncio.Queue()
        await start_streams(q)

        store = PulseStore(window_sec=3600)
        detector = StressDetector()
        ev = EventStore(max_events=50)

        state["store"] = store
        state["events"] = ev
        state["detector"] = detector
        state["started_at"] = time.time()

        while True:
            t = await q.get()
            store.update(t)
            snap = store.snapshot(t["symbol"])
            e = detector.check(snap)
            if e:
                ev.add(e)

    asyncio.run(_main())

if "bg_started" not in st.session_state:
    st.session_state.bg_started = True
    st.session_state.state = {}
    th = threading.Thread(target=_runner, args=(st.session_state.state,), daemon=True)
    th.start()

state = st.session_state.state

st.title("Market Pulse")

col1, col2, col3 = st.columns([2, 2, 1])
with col1:
    st.caption("Real-time multi-asset pulse. Moves, stress events, and a simple correlation card.")
with col3:
    up = 0
    if "started_at" in state:
        up = int(time.time() - state["started_at"])
    st.metric("Uptime (s)", up)

if "store" not in state:
    st.info("Connecting to feeds and warming up buffers...")
    st.stop()

store = state["store"]
ev = state["events"]

c1, c2, c3 = st.columns(3)
with c1:
    top_k = st.slider("Top movers", 3, 10, 5)
with c2:
    base = st.selectbox("Correlation base (stocks)", ["TSLA", "AAPL"], index=0)
with c3:
    bin_sec = st.selectbox("Correlation bucket (sec)", [10, 20, 30], index=2)

snaps = store.snapshots()

def score(x):
    r1 = x.get("ret_1m")
    r5 = x.get("ret_5m")
    if r1 is not None:
        return abs(r1)
    if r5 is not None:
        return abs(r5)
    return 0.0

snaps.sort(key=score, reverse=True)
top = snaps[:top_k]

pulse_df = pd.DataFrame(top)
keep_cols = ["symbol", "asset", "last", "ret_1m", "ret_5m", "vol_15m", "regime"]
pulse_df = pulse_df[[c for c in keep_cols if c in pulse_df.columns]]

st.subheader("Pulse")
st.dataframe(pulse_df, use_container_width=True, height=260)

st.subheader("Stress feed")
events = ev.latest()[:15]
if events:
    ev_df = pd.DataFrame(events)
    ev_df["time"] = pd.to_datetime(ev_df["ts"], unit="s").dt.strftime("%H:%M:%S")
    ev_df = ev_df[["time", "type", "symbol", "asset", "value"]]
    st.dataframe(ev_df, use_container_width=True, height=260)
else:
    st.caption("No events yet.")

st.subheader("Correlation card (stocks)")
corr_symbols = ["AAPL", "TSLA"]
card = corr_card(store, corr_symbols, base_symbol=base, bin_sec=bin_sec)

st.write(card)

time.sleep(2.0)
st.rerun()
</code></pre>
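<p>The background worker starts once per browser session, inside a daemon thread. The <code>bg_started</code> flag lives in <code>st.session_state</code>, which is scoped to a session, so a second tab or a second user would start its own worker. It owns the async WebSocket loop and keeps updating store and events in memory. Streamlit never touches the sockets.</p>
<p>The Pulse table comes straight from <code>store.snapshots()</code>. We sort by absolute 1-minute return when it’s available, fall back to the absolute 5-minute return when it isn’t, and score a symbol as 0.0 when neither exists yet.</p>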
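<p>You can try the start-once guard without Streamlit at all. The sketch below uses only the standard library: a plain dict stands in for <code>st.session_state</code>, and <code>demo_worker</code> is a made-up stand-in for the real feed loop. The names <code>start_worker_once</code> and <code>demo_worker</code> are assumptions, not part of the app.</p>

```python
import asyncio
import threading

_start_lock = threading.Lock()


def start_worker_once(state, coro_factory):
    """Start a daemon thread running coro_factory(state), unless one was already started."""
    with _start_lock:
        if state.get("bg_started"):
            return False  # a worker already exists for this state dict
        state["bg_started"] = True
    th = threading.Thread(
        target=lambda: asyncio.run(coro_factory(state)),  # each thread gets its own loop
        daemon=True,
    )
    state["thread"] = th
    th.start()
    return True


async def demo_worker(state):
    # Stand-in for the real loop: queue three fake ticks, drain them, then exit.
    q = asyncio.Queue()
    for px in (100.0, 100.5, 101.0):
        await q.put({"symbol": "AAPL", "price": px})
    state["ticks"] = []
    while not q.empty():
        state["ticks"].append(await q.get())


shared = {}  # plays the role of st.session_state.state
first = start_worker_once(shared, demo_worker)
second = start_worker_once(shared, demo_worker)  # simulates a script rerun
shared["thread"].join(timeout=5)
print(first, second, len(shared["ticks"]))
```

<p>The second call returns immediately without starting anything, which is exactly what a rerun should do. The lock is a small extra over the app’s version, there to make the check-and-set atomic if two callers race.</p>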
<p>The stress feed is rendered as a simple table, but we convert the raw epoch timestamp into a readable time string so it looks like a real UI.</p>
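<p>A note on those time strings: as far as I know, <code>pd.to_datetime(..., unit="s")</code> treats epoch seconds as UTC, so the feed shows UTC clock time rather than your local time. The standard-library sketch below does the same conversion for a single timestamp (the helper name <code>fmt_event_time</code> is hypothetical).</p>

```python
from datetime import datetime, timezone


def fmt_event_time(ts):
    """Format epoch seconds as HH:MM:SS in UTC."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%H:%M:%S")


label = fmt_event_time(1700000000)  # 2023-11-14 22:13:20 UTC
print(label)
```

<p>If you’d rather show exchange-local or user-local time, convert explicitly before formatting instead of relying on the default.</p>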
<p>The correlation card is a small JSON-ish object. It includes the base symbol, current regime, the window used, and the top correlations.</p>
<p>Finally, the refresh loop is intentionally basic. Sleep for two seconds, rerun, render the latest state. The heavy work continues in the worker thread.</p>
<h2 id="heading-final-output">Final Output</h2>
<p>The final app: <a href="https://gumlet.tv/watch/69b99df9554f0fb510c28ce6/">https://gumlet.tv/watch/69b99df9554f0fb510c28ce6/</a></p>
<h2 id="heading-what-id-improve-next">What I’d Improve Next</h2>
<p>If you want to take this beyond a demo, I’d start with a few practical upgrades.</p>
<p>First, split the Pulse table by asset class. A single global ranking is fine, but crypto will often dominate simply because it trades all the time and moves more. Separate tables for stocks, forex, and crypto make the dashboard feel more balanced and closer to how a real product would present it.</p>
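<p>A rough sketch of that split, assuming each snapshot is a dict with an <code>asset</code> field and a <code>ret_1m</code> field (the sample values and the helper name <code>split_by_asset</code> are made up):</p>

```python
from collections import defaultdict


def split_by_asset(snaps, top_k=5):
    """Group snapshots by asset class and keep the top movers in each group."""
    groups = defaultdict(list)
    for snap in snaps:
        groups[snap.get("asset", "unknown")].append(snap)
    ranked = {}
    for asset, rows in groups.items():
        # Missing or None returns count as 0.0 so new symbols sink to the bottom.
        rows.sort(key=lambda r: abs(r.get("ret_1m") or 0.0), reverse=True)
        ranked[asset] = rows[:top_k]
    return ranked


# Hypothetical snapshots
snaps = [
    {"symbol": "AAPL", "asset": "stock", "ret_1m": 0.001},
    {"symbol": "BTC-USD", "asset": "crypto", "ret_1m": -0.012},
    {"symbol": "TSLA", "asset": "stock", "ret_1m": -0.004},
    {"symbol": "ETH-USD", "asset": "crypto", "ret_1m": 0.009},
]
tables = split_by_asset(snaps, top_k=1)
print(tables)
```

<p>Each value in the result can go straight into its own <code>pd.DataFrame</code> and its own table on the page.</p>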
<p>Second, add light persistence. Even a tiny SQLite file or parquet dump every few minutes is enough to replay the last hour and debug issues without leaving the app running all day.</p>
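<p>Here is roughly what the SQLite version could look like with nothing but the standard library. The table layout, the tick dict keys (<code>symbol</code>, <code>ts</code>, <code>price</code>), and the function names are assumptions for illustration. The demo uses an in-memory database so it leaves no file behind.</p>

```python
import sqlite3


def open_tick_db(path=":memory:"):
    """Open (or create) a tiny tick database."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS ticks (symbol TEXT, ts REAL, price REAL)"
    )
    return conn


def save_ticks(conn, ticks):
    """Insert a batch of ticks in one transaction."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO ticks (symbol, ts, price) VALUES (?, ?, ?)",
            [(t["symbol"], t["ts"], t["price"]) for t in ticks],
        )


def load_since(conn, symbol, cutoff_ts):
    """Return (ts, price) rows for one symbol at or after cutoff_ts, oldest first."""
    cur = conn.execute(
        "SELECT ts, price FROM ticks WHERE symbol = ? AND ts >= ? ORDER BY ts",
        (symbol, cutoff_ts),
    )
    return cur.fetchall()


conn = open_tick_db()
save_ticks(conn, [
    {"symbol": "AAPL", "ts": 1000.0, "price": 190.1},
    {"symbol": "AAPL", "ts": 1010.0, "price": 190.4},
    {"symbol": "TSLA", "ts": 1005.0, "price": 250.0},
])
recent = load_since(conn, "AAPL", 1005.0)
print(recent)
```

<p>Flushing a batch every few minutes from the worker thread is enough for replay. Pass a file path instead of <code>":memory:"</code> to keep the data between runs.</p>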
<p>Third, route stress events somewhere useful. A webhook, a queue, or a small database table. Once events leave the UI and become part of a system, you can power alerts, newsletters, and internal monitoring.</p>
<p>Finally, if you want correlation to truly be multi-asset, you’ll need a stronger alignment approach. Bucketing works well for liquid equities, but for mixed tick rates you’ll want resampling logic, missing-data handling, and probably different bucket sizes per asset class.</p>
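<p>One simple form of missing-data handling is a forward fill: carry the last known price into empty bins. The sketch below shows the idea on a sparse, made-up forex series. It is one option among several, not a recommendation, and the helper name is hypothetical.</p>

```python
def forward_fill(bins, start, end, step):
    """Fill every bin from start to end (inclusive) with the most recent known price."""
    filled = {}
    last = None
    for t in range(start, end + step, step):
        if t in bins:
            last = bins[t]
        if last is not None:  # bins before the first real price stay empty
            filled[t] = last
    return filled


# Hypothetical sparse series: only two of five 10-second bins have a tick
sparse = {0: 1.10, 30: 1.12}
dense = forward_fill(sparse, start=0, end=40, step=10)
print(dense)
```

<p>The catch is that filled bins produce zero returns, which tends to drag measured correlation toward zero. That is one reason larger bucket sizes for slow-ticking assets can be the better fix.</p>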
<h2 id="heading-conclusion">Conclusion</h2>
<p>That’s the full build: a live market pulse screen that streams multi-asset prices, maintains rolling state in memory, converts noisy ticks into usable signals, and surfaces everything through a simple Streamlit dashboard.</p>
<p>The main takeaway is the pattern. Keep streaming, state, and UI separated. Compute a small set of metrics that update smoothly. Then turn those metrics into event cards and widgets that a product team can actually use.</p>
<p>If you already use a multi-asset feed like EODHD for pricing and coverage, this kind of dashboard becomes a straightforward extension. Not a giant engineering project, just a clean way to ship real-time market context.</p>
 ]]>
                </content:encoded>
            </item>
        
            <item>
                <title>
                    <![CDATA[ How to Use WebSockets: From Python to FastAPI ]]>
                </title>
                <description>
                    <![CDATA[ Real-time data powers much of modern software: live stock prices, chat applications, sports scores, collaborative tools. And to build these systems, you'll need to understand how real-time communicati ]]>
                </description>
                <link>https://www.freecodecamp.org/news/how-to-use-websockets-from-python-to-fastapi/</link>
                <guid isPermaLink="false">69b206806c896b0519d2a308</guid>
                
                    <category>
                        <![CDATA[ websockets ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Python ]]>
                    </category>
                
                    <category>
                        <![CDATA[ FastAPI ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Backend Development ]]>
                    </category>
                
                    <category>
                        <![CDATA[ Real Time ]]>
                    </category>
                
                <dc:creator>
                    <![CDATA[ Nneoma Uche ]]>
                </dc:creator>
                <pubDate>Thu, 12 Mar 2026 00:19:12 +0000</pubDate>
                <media:content url="https://cdn.hashnode.com/uploads/covers/5e1e335a7a1d3fcc59028c64/8acb0854-7289-4794-8f97-242ec5d8ca61.png" medium="image" />
                <content:encoded>
                    <![CDATA[ <p>Real-time data powers much of modern software: live stock prices, chat applications, sports scores, collaborative tools. And to build these systems, you'll need to understand how real-time communication actually works—which isn’t always straightforward.</p>
<p>I ran into this firsthand while trying to build a live options dashboard. HTTP requests weren't going to cut it, and everything I was reading seemed overly complex until I went back to the basics. This article is the result of that process.</p>
<p>We'll cover Python's <code>websockets</code> library from scratch, then move into FastAPI, where many Python backends live. It's worth noting that WebSockets aren't the only solution for real-time communication. WebRTC may be a better fit depending on your use case, but understanding WebSockets is the right starting point before exploring further.</p>
<h3 id="heading-table-of-contents">Table of Contents</h3>
<ol>
<li><p><a href="#heading-websocket-connections-and-methods">WebSocket Connections and Methods</a></p>
</li>
<li><p><a href="#heading-how-to-build-your-first-websocket-in-python">How to Build Your First WebSocket in Python</a></p>
</li>
<li><p><a href="#heading-file-transfer-over-websockets">File Transfer Over WebSockets</a></p>
</li>
<li><p><a href="#heading-how-to-connect-to-an-external-websocket">How to Connect to an External WebSocket</a></p>
</li>
<li><p><a href="#heading-websockets-in-fastapi">WebSockets in FastAPI</a></p>
</li>
<li><p><a href="#heading-how-to-handle-websocket-disconnections-in-fastapi">How to Handle WebSocket Disconnections in FastAPI</a></p>
</li>
<li><p><a href="#heading-conclusion">Conclusion</a></p>
</li>
</ol>
<h2 id="heading-websocket-connections-and-methods">WebSocket Connections and Methods</h2>
<p>A WebSocket connection enables bi-directional communication between a client and a server. Once a connection is established, both sides can communicate freely without either having to ask first. This is different from a regular HTTP request, where the client always has to ask before the server can respond.</p>
<p>It looks something like this:</p>
<pre><code class="language-plaintext">        CLIENT&nbsp; &lt;===== open connection =====&gt;&nbsp; SERVER
</code></pre>
<p>Note that a WebSocket URL is not a regular web page, so you can't "visit it" like a website. You need a client to talk to it.</p>
<p>Different frameworks provide different methods for handling WebSocket connections. With Python’s <code>websockets</code> library, for instance, a connection is automatically accepted the moment a client connects. With frameworks like FastAPI, you have to explicitly call <code>await websocket.accept()</code>, otherwise the connection gets rejected.</p>
<p>Let’s look at the core methods provided by Python’s <code>websockets</code> library:</p>
<ol>
<li><p><code>websockets.serve(...)</code>: starts a WebSocket server.</p>
</li>
<li><p><code>websockets.connect(...)</code>: connects to a WebSocket server.</p>
</li>
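<li><p><code>connection.send(...)</code>: sends a message from either side. It is called on the connection object, not on the <code>websockets</code> module.</p>
</li>
<li><p><code>connection.recv()</code>: receives the next message, on either the client or the server. It is also called on the connection object.</p>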
</li>
</ol>
<p><code>recv()</code> takes no arguments because it's purely a waiting operation. It waits for the next message and returns it:</p>
<pre><code class="language-python">message = await websocket.recv()
</code></pre>
<h2 id="heading-how-to-build-your-first-websocket-in-python">How to Build Your First WebSocket in Python</h2>
<p>Before we dive into frameworks, let’s explore Python’s <code>websockets</code> library. You’ll set up a simple server and client, and exchange messages over a WebSocket connection, giving you a solid foundation for understanding WebSockets under the hood.</p>
<h3 id="heading-environment-setup">Environment Setup</h3>
<p>Run the following in your virtual environment to install or verify the WebSockets package:</p>
<pre><code class="language-python">pip install websockets
# or, to check if it's already installed:
pip show websockets
</code></pre>
<h3 id="heading-create-the-websocket-server">Create the WebSocket Server</h3>
<p>Create <code>server.py</code> in your project folder, and paste this:</p>
<pre><code class="language-python">import asyncio
import websockets

async def handler(connection):
    print("Client connected")

    message = await connection.recv()
    print("Received from client:", message)
    await connection.send("Hello client!")


async def main():
    async with websockets.serve(handler, "localhost", 8000):
        print("Server running at ws://localhost:8000")
        #await asyncio.Future()  # runs forever
        await asyncio.sleep(30)

asyncio.run(main())
</code></pre>
<p>When this line executes:</p>
<pre><code class="language-python">async with websockets.serve(handler, "localhost", 8000):
</code></pre>
<p>The library opens a TCP socket on the specified host and port and waits for incoming clients. When one connects, it creates a connection object and passes it into your handler function.</p>
<p>The handler is required because it defines what the server does with each connection. The <code>host</code> and <code>port</code> arguments are also important. Both default to <code>None</code> – passing neither raises an error because the OS cannot bind a network server without a port.</p>
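<p>You could pass <code>port=0</code> to let the OS assign a free port automatically, but then you'd need an extra step to find out which port was chosen so the client can connect. If you keep a reference to the server object (for example, <code>async with websockets.serve(...) as server:</code>), you can read the bound address from it:</p>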
<pre><code class="language-python">server.sockets[0].getsockname()
</code></pre>
<p>It’s simpler to specify both host and port explicitly, so the client knows exactly where the server is running.</p>
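<p>If you're curious what the <code>port=0</code> route looks like in practice, here's a minimal sketch. It uses a plain <code>asyncio</code> TCP server as a stand-in so it runs with the standard library alone. The <code>sockets[0].getsockname()</code> lookup is the same call shown above, but the handler and function names are made up for illustration.</p>

```python
import asyncio


async def handle(reader, writer):
    # Placeholder handler: close any connection right away.
    writer.close()
    await writer.wait_closed()


async def pick_free_port():
    # Port 0 asks the OS to choose any free port.
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    async with server:
        host, port = server.sockets[0].getsockname()[:2]
    return port


port = asyncio.run(pick_free_port())
print("OS assigned port:", port)
```

<p>The port changes from run to run, so the client has to learn it somehow (a log line, a file, an environment variable). That extra plumbing is why a fixed port is easier for a tutorial.</p>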
<h3 id="heading-set-up-the-client">Set Up the Client</h3>
<p>Create <code>client.py</code> in the same folder and add this:</p>
<pre><code class="language-python">import asyncio
import websockets

async def client():
    async with websockets.connect("ws://localhost:8000") as websocket:
        await websocket.send("Hello server!")
        response = await websocket.recv()
        print("Server replied:", response)

asyncio.run(client())
</code></pre>
<h3 id="heading-test-the-connection">Test the Connection</h3>
<p>First, open a terminal and run <code>server.py</code>. You should see:</p>
<pre><code class="language-plaintext">Server running at ws://localhost:8000
</code></pre>
<p>In a second terminal, run <code>client.py</code>. Messages should appear in both terminals confirming that the connection is active and both sides are communicating.</p>
<p>Note that the server must be running before you start the client – otherwise the client has nothing to connect to, and the connection will fail.</p>
<h4 id="heading-keeping-the-server-alive-a-note-on-asynciofuture">Keeping the server alive: a note on asyncio.Future()</h4>
<p>In <code>server.py</code>, there’s a line currently commented out:</p>
<pre><code class="language-python">await asyncio.Future()
</code></pre>
<p>This keeps the server running indefinitely. For local development and testing, however, <code>await asyncio.sleep(30)</code> is a convenient alternative: the server shuts itself down after 30 seconds, so you don't have to stop it by hand.</p>
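<p>Why does awaiting a bare future run forever? Because nothing ever gives it a result. The small experiment below shows this by putting a timeout around the wait. The 0.2-second timeout is only there so the demo finishes.</p>

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    forever = loop.create_future()  # nothing will ever set a result on this
    try:
        await asyncio.wait_for(forever, timeout=0.2)
    except asyncio.TimeoutError:
        return "still waiting after 0.2s"
    return "finished"


result = asyncio.run(main())
print(result)
```

<p>Without the timeout, the <code>await</code> would simply park the coroutine until the program is interrupted, which is exactly what you want from a long-running server.</p>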
<h2 id="heading-file-transfer-over-websockets">File Transfer Over WebSockets</h2>
<p>WebSockets aren't limited to text. They support raw bytes too, which means you can send files directly over the connection. Here’s how a client can send a file to a server over a WebSocket connection:</p>
<h3 id="heading-update-serverpy">Update <code>server.py</code></h3>
<pre><code class="language-python">import asyncio
import websockets

async def file_handler(ws):
    print("Client connected, waiting for file...")
    file_bytes = await ws.recv()  # receive bytes
    with open("received_file.png", "wb") as f:
        f.write(file_bytes)
    print("File received and saved!")
    await ws.send("File received successfully!")

async def main():
    async with websockets.serve(file_handler, "localhost", 8000):
        print("Server running on ws://localhost:8000")
        await asyncio.sleep(50)  # keep server alive

asyncio.run(main())
</code></pre>
<p>The handler waits for incoming bytes with <code>await ws.recv()</code>; the <code>websockets</code> library automatically detects whether the incoming message is text or bytes, so no extra configuration is needed. Once received, the file is written to disk in binary mode (<code>"wb"</code>) and the server sends a confirmation message back to the client.</p>
<h3 id="heading-update-clientpy">Update <code>client.py</code></h3>
<pre><code class="language-python">import asyncio
import websockets

async def send_file():
    uri = "ws://localhost:8000"
    async with websockets.connect(uri) as ws:
        with open("portfolio-image.png", "rb") as f:  #open file in binary mode
            file_bytes = f.read()
        await ws.send(file_bytes)  # send bytes
        response = await ws.recv()
        print("Server response:", response)

asyncio.run(send_file())
</code></pre>
<p>The client opens the image in binary mode (<code>"rb"</code>), reads the entire file into memory as bytes, and sends it in a single <code>ws.send()</code> call. It then waits for the server's confirmation before closing the connection.</p>
<h3 id="heading-test-it">Test it</h3>
<p>Add an image to your project folder and make sure the filename in <code>client.py</code> matches. Run <code>server.py</code> first, then <code>client.py</code> in a second terminal.</p>
<p>Once the transfer completes, the server saves the file as <code>received_file.png</code> in the same directory. You should see it appear in your workspace immediately.</p>
<p>This approach loads the entire file into memory before sending. For large files, it’s better to read and send them in chunks. But this is the easiest way to understand WebSocket byte transfer.</p>
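<p>For reference, the reading half of a chunked transfer might look like the sketch below. It only covers splitting a file into pieces. How the chunks are sent and how the server knows the file is complete is a protocol choice this article doesn't cover, so treat the comment about sending as an assumption. The helper name and the 64 KiB chunk size are arbitrary.</p>

```python
import os
import tempfile


def iter_chunks(path, chunk_size=64 * 1024):
    """Yield a file's bytes in pieces of at most chunk_size."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk
            # In a client, each chunk could be sent with: await ws.send(chunk)
            # (assumes the server appends chunks until an agreed end marker).


# Demo on a throwaway 150,000-byte file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"x" * 150_000)
    demo_path = tmp.name

sizes = [len(c) for c in iter_chunks(demo_path)]
os.remove(demo_path)
print(sizes)
```

<p>Only one chunk is held in memory at a time, so the same code works whether the file is 150 KB or several gigabytes.</p>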
<h2 id="heading-how-to-connect-to-an-external-websocket">How to Connect to an External WebSocket</h2>
<p>So far you've been connecting to servers you built yourself. But WebSocket clients can also connect to public servers. For example, a client can connect to Postman’s echo server:</p>
<pre><code class="language-python">import asyncio
import websockets

async def connect_external():
    uri = "wss://ws.postman-echo.com/raw"  # public WebSocket server
    async with websockets.connect(uri) as ws:
        print("Connected to external server!")

        # Send a message
        await ws.send("Hello external server!")
        print("Message sent")

        # Receive response
        response = await ws.recv()
        print("Received from server:", response)
asyncio.run(connect_external())
</code></pre>
<p>Notice the client connects to Postman’s echo server using the <code>wss://</code> URI scheme instead of <code>ws://</code>. This indicates the connection is encrypted using TLS, similar to how <code>https://</code> secures regular web requests.</p>
<p>An echo server returns exactly what you send it. So "Hello external server!" comes straight back as the response. It's a useful sandbox for testing your client-side WebSocket code without needing your own server.</p>
<h2 id="heading-websockets-in-fastapi">WebSockets in FastAPI</h2>
<p>FastAPI provides a WebSocket object (via Starlette under the hood) to manage real-time connections. You can define WebSocket endpoints just like HTTP routes, while Uvicorn handles the event loop – no manual asyncio server management needed. This makes FastAPI a natural fit for real-time projects, from chat apps to live dashboards and data feeds.</p>
<p>Before jumping into code, here's a quick reference of the core methods you'll be working with.</p>
<p><strong>Accepting:</strong></p>
<ul>
<li><code>await websocket.accept()</code>: the <code>accept()</code> method must be called first, before anything else. Skip it and the connection gets rejected.</li>
</ul>
<p><strong>Sending:</strong></p>
<ul>
<li><p><code>await websocket.send_text(data)</code>: sends a string.</p>
</li>
<li><p><code>await websocket.send_bytes(data)</code>: sends binary data.</p>
</li>
<li><p><code>await websocket.send_json(data)</code>: serializes and sends JSON.</p>
</li>
</ul>
<p><strong>Receiving:</strong></p>
<ul>
<li><p><code>await websocket.receive_text()</code>: waits for a text message.</p>
</li>
<li><p><code>await websocket.receive_bytes()</code>: waits for binary data.</p>
</li>
<li><p><code>await websocket.receive_json()</code>: receives and deserializes JSON.</p>
</li>
<li><p><code>async for msg in websocket.iter_text()</code>: iterates over incoming messages, exits cleanly on disconnect.</p>
</li>
</ul>
<p><strong>Closing:</strong></p>
<ul>
<li><code>await websocket.close(code=1000)</code>: standard code for a normal closure. It accepts an optional “reason” argument.</li>
</ul>
<p>Here's what the WebSocket lifecycle looks like in FastAPI:</p>
<img src="https://cdn.hashnode.com/uploads/covers/6426acfa5bc738c37852b5bd/b61b6b25-e027-47a1-8efc-b921e93b8521.png" alt="WebSocket in FastAPI" style="display:block;margin:0 auto" width="689" height="583" loading="lazy">

<h3 id="heading-building-a-simple-echo-server-with-fastapi">Building a Simple Echo Server with FastAPI</h3>
<p>As you saw with the Postman example, an echo server sends back the message a client provides. Let's build one with FastAPI.</p>
<h4 id="heading-1-install-fastapi">1. Install FastAPI:</h4>
<pre><code class="language-python">pip install "fastapi[standard]"
</code></pre>
<h4 id="heading-2-update-serverpy">2. Update <code>server.py</code>:</h4>
<pre><code class="language-python">from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    data = await websocket.receive_text()
   
    await websocket.send_text(f"You said: {data}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)
</code></pre>
<p>A few things to note here compared to the plain <code>websockets</code> library:</p>
<ul>
<li><p>WebSocket endpoints are defined with <code>@app.websocket("/ws")</code> just like an HTTP route.</p>
</li>
<li><p><code>await websocket.accept()</code> is required before anything else. FastAPI won't accept connections without it.</p>
</li>
<li><p>Uvicorn handles the event loop and server startup for you via the <code>if __name__ == "__main__"</code> block. No <code>asyncio.run()</code> or <code>asyncio.Future()</code> needed.</p>
</li>
</ul>
<h4 id="heading-3-update-clientpy">3. Update client.py:</h4>
<pre><code class="language-python">import asyncio
import websockets

async def test_client():
    uri = "ws://127.0.0.1:8000/ws"
    async with websockets.connect(uri) as ws:
        await ws.send("Hello FastAPI server!")
        response = await ws.recv()
        print("Server replied:", response)

asyncio.run(test_client())
</code></pre>
<p>Since the FastAPI server isn't secured with TLS, the client URI uses <code>ws://</code> instead of <code>wss://</code>. Make sure to match the host and port from your server code.</p>
<h4 id="heading-4-interact-with-the-echo-server">4. Interact with the echo server:</h4>
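<p>Start <code>server.py</code>, then run <code>client.py</code> in another terminal. The client terminal should print the server's reply, <code>Server replied: You said: Hello FastAPI server!</code>, while the server terminal shows Uvicorn's connection logs.</p>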
<img src="https://cdn.hashnode.com/uploads/covers/6426acfa5bc738c37852b5bd/88629d79-eb91-4af5-9752-f9596ff5e5a4.png" alt="88629d79-eb91-4af5-9752-f9596ff5e5a4" style="display:block;margin:0 auto" width="495" height="101" loading="lazy">

<h2 id="heading-how-to-handle-websocket-disconnections-in-fastapi">How to Handle WebSocket Disconnections in FastAPI</h2>
<p>Clients will inevitably disconnect in real-time applications, sometimes intentionally, sometimes unexpectedly. If not handled properly, this can crash your server or leave it in a broken state.</p>
<p>FastAPI raises the <code>WebSocketDisconnect</code> exception when a receive call finds that the client has closed the connection, whether the close was intentional or not. Catching it lets the server handle disconnects gracefully, log the event, and clean up resources without crashing.</p>
<p>Here’s an example:</p>
<pre><code class="language-python">from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
   
            if "bye" in data or "quit" in data:
                await ws.send_text("Closing connection")
                await ws.close(code=1000, reason="Server requested close")  
                break
            await ws.send_text(f"I got your request: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")  # connection already closed
</code></pre>
<p>The server runs a continuous loop waiting for messages. If the client message contains "bye" or "quit", the server responds, calls <code>await ws.close(code=1000)</code>, and breaks out of the loop cleanly.</p>
<p>But if the client disconnects unexpectedly, <code>WebSocketDisconnect</code> is caught by the except block and the server moves on without crashing. At this point the connection is already closed on the client side, so calling <code>ws.close()</code> inside the except block is unnecessary.</p>
<h2 id="heading-conclusion">Conclusion</h2>
<p>WebSockets make real-time communication possible by keeping a persistent connection open between client and server. Starting with Python’s <code>websockets</code> library helps clarify how the protocol works under the hood, while frameworks like FastAPI provide the structure needed for production applications.</p>
<p>The parts that trip most people up early on are <code>asyncio</code> and FastAPI's explicit <code>websocket.accept()</code>. With <code>asyncio</code>, the question is usually why it's needed and why the server dies instantly without something keeping it alive. And it's easy to ignore <code>websocket.accept()</code> if you're coming from the plain <code>websockets</code> library where that happens automatically. Once those click, everything else follows naturally.</p>
 ]]>
                </content:encoded>
            </item>
        
    </channel>
</rss>
