Kihagyás

2. feladat: Adatbázis betöltés és Extract

A batch pipeline első lépése az Extract fázis: az adatok kinyerése a forrásrendszerből. Ebben a feladatban az 1. feladatban generált adatokat PostgreSQL-be töltjük (ezzel szimulálva egy valódi forrásrendszert), majd full és incremental kinyerést végzünk, és összehasonlítjuk a kettőt. Megvizsgáljuk az indexelés hatását az extract teljesítményre, és szimulálunk egy CDC (Change Data Capture) mintájú deltakinyerést is.

Notebook létrehozása

  1. Hozzunk létre egy új notebookot feladat2_extract.ipynb néven.

  2. Az első cellában telepítsük és importáljuk a szükséges csomagokat:

    !pip install -q psycopg2-binary sqlalchemy pyarrow pandas
    
    import pandas as pd
    import pyarrow.parquet as pq
    import os
    import time
    from datetime import datetime, timedelta
    from sqlalchemy import create_engine, text
    
    engine = create_engine(
        "postgresql://dataeng:dataeng@postgres:5432/labor"
    )
    print("Kapcsolódva a PostgreSQL adatbázishoz.")
    

    Hostnév Docker-ben

    Docker Compose hálózaton belül a szolgáltatásokat a nevükkel lehet elérni. A postgres hostnév a docker-compose.yaml-ban definiált service nevéből származik. Ha a saját gépünkről csatlakoznánk (pl. DBeaver-ből), akkor localhost:5432-t kellene megadni.

Adatok betöltése PostgreSQL-be

  1. Töltsük be az 1. feladatban generált Parquet fájlokat az adatbázisba:

    tables = ["customers", "products", "orders", "order_items"]
    
    print("=== Betöltés PostgreSQL-be ===\n")
    for table in tables:
        df = pd.read_parquet(f"/home/jovyan/data/raw/{table}.parquet")
        df.to_sql(table, engine, if_exists="replace", index=False)
        print(f"  {table:<15}{len(df):>6,} sor betöltve")
    

Index létrehozása és EXPLAIN ANALYZE

  1. Hozzunk létre indexet az order_date oszlopra, és nézzük meg, hogyan változik a lekérdezési terv:

    print("=== EXPLAIN ANALYZE – index nélkül ===\n")
    with engine.connect() as conn:
        plan = conn.execute(text("""
            EXPLAIN ANALYZE
            SELECT * FROM orders
            WHERE order_date >= NOW() - INTERVAL '30 days'
        """)).fetchall()
    for row in plan:
        print(" ", row[0])
    
    # Index létrehozása
    with engine.connect() as conn:
        conn.execute(text(
            "CREATE INDEX IF NOT EXISTS idx_orders_date ON orders(order_date)"
        ))
        conn.commit()
    print("\nIndex létrehozva: idx_orders_date\n")
    
    print("=== EXPLAIN ANALYZE – index után ===\n")
    with engine.connect() as conn:
        plan = conn.execute(text("""
            EXPLAIN ANALYZE
            SELECT * FROM orders
            WHERE order_date >= NOW() - INTERVAL '30 days'
        """)).fetchall()
    for row in plan:
        print(" ", row[0])
    

    Seq Scan vs Index Scan

    Index nélkül a PostgreSQL Seq Scan-t (soros olvasás) végez – minden sort megvizsgál. Index után Index Scan vagy Bitmap Index Scan látható, ami csak a releváns sorokat olvassa be. A rows= és actual time= értékek megmutatják a valós különbséget.

Full Extract

  1. Kérdezzük le az összes táblát, és mentsük Parquet-ba:

    os.makedirs("/home/jovyan/data/extract/full", exist_ok=True)
    
    print("=== Full Extract ===\n")
    full_stats = []
    for table in tables:
        start = time.time()
        df = pd.read_sql(f"SELECT * FROM {table}", engine)
        elapsed = time.time() - start
        path = f"/home/jovyan/data/extract/full/{table}.parquet"
        df.to_parquet(path, index=False)
        full_stats.append({
            "tábla": table,
            "sorok": len(df),
            "méret (KB)": round(os.path.getsize(path) / 1024, 1),
            "idő (ms)": round(elapsed * 1000, 1),
        })
    
    full_df = pd.DataFrame(full_stats)
    print(full_df.to_string(index=False))
    

Incremental Extract

  1. Kérdezzük le csak az utolsó 30 napban keletkezett rendeléseket:

    os.makedirs("/home/jovyan/data/extract/incremental", exist_ok=True)
    
    print("=== Incremental Extract (utolsó 30 nap) ===\n")
    start = time.time()
    incr_df = pd.read_sql(
        "SELECT * FROM orders WHERE order_date >= NOW() - INTERVAL '30 days'",
        engine,
    )
    incr_elapsed = time.time() - start
    incr_path = "/home/jovyan/data/extract/incremental/orders.parquet"
    incr_df.to_parquet(incr_path, index=False)
    
    full_path = "/home/jovyan/data/extract/full/orders.parquet"
    print(f"  Full sorok:        {len(pd.read_parquet(full_path)):>6,}")
    print(f"  Incremental sorok: {len(incr_df):>6,}")
    print(f"  Full méret:        {os.path.getsize(full_path)/1024:>6.1f} KB")
    print(f"  Incremental méret: {os.path.getsize(incr_path)/1024:>6.1f} KB")
    

CDC szimulálása

  1. Szimulálunk egy Change Data Capture mintájú deltakinyerést: frissítünk 50 rendelést, és csak a változott sorokat kérjük le:

    print("=== CDC szimulálás ===\n")
    
    # updated_at oszlop hozzáadása, ha még nincs
    with engine.connect() as conn:
        conn.execute(text("""
            ALTER TABLE orders
            ADD COLUMN IF NOT EXISTS updated_at TIMESTAMP DEFAULT NOW()
        """))
        conn.commit()
    
    # 50 rendelés frissítése 'delivered' státuszra
    cutoff = datetime.now()
    with engine.connect() as conn:
        conn.execute(text("""
            UPDATE orders
            SET status = 'delivered',
                updated_at = NOW()
            WHERE order_id IN (
                SELECT order_id FROM orders
                WHERE status = 'shipped'
                LIMIT 50
            )
        """))
        conn.commit()
    print("50 rendelés státusza frissítve: shipped → delivered\n")
    
    # Delta extract: csak a cutoff után módosított sorok
    delta_df = pd.read_sql(
        f"SELECT * FROM orders WHERE updated_at >= '{cutoff.isoformat()}'",
        engine,
    )
    delta_path = "/home/jovyan/data/extract/incremental/orders_delta.parquet"
    delta_df.to_parquet(delta_path, index=False)
    print(f"CDC delta sorok: {len(delta_df)} (csak a frissített rendelések)")
    print(f"Delta méret: {os.path.getsize(delta_path)/1024:.1f} KB")
    

    Mikor melyiket?

    • Full extract: egyszerű, megbízható, kis és ritkán változó táblákhoz ideális. Hátránya: nagy tábláknál lassú és drága.
    • Incremental extract: csak a változott adatokat kéri le — sokkal hatékonyabb. Korlátja: szükséges egy order_date vagy updated_at mező, és a DELETE műveletek láthatatlanok maradnak.
    • CDC (Change Data Capture): az updated_at timestamp alapján csak az utolsó futás óta megváltozott sorokat kérjük le — ez a leghatékonyabb módszer valódi rendszereknél.

Full vs Incremental összehasonlítás

  1. Készítsünk összehasonlítást:

    full_orders_size = os.path.getsize("/home/jovyan/data/extract/full/orders.parquet")
    incr_orders_size = os.path.getsize(incr_path)
    full_count = len(pd.read_parquet("/home/jovyan/data/extract/full/orders.parquet"))
    
    comparison = pd.DataFrame([
        {
            "Módszer": "Full extract",
            "Sorok": full_count,
            "Méret (KB)": round(full_orders_size / 1024, 1),
            "Idő (ms)": round(full_stats[2]["idő (ms)"], 1),
        },
        {
            "Módszer": "Incremental (30 nap)",
            "Sorok": len(incr_df),
            "Méret (KB)": round(incr_orders_size / 1024, 1),
            "Idő (ms)": round(incr_elapsed * 1000, 1),
        },
    ])
    print("=== Összehasonlítás ===\n")
    print(comparison.to_string(index=False))
    ratio = full_orders_size / incr_orders_size
    print(f"\nAdat-csökkentés: {ratio:.1f}×")
    

Parquet séma és metaadat vizsgálat

  1. Vizsgáljuk meg az egyik Parquet fájl sémáját és metaadatait, és hasonlítsuk össze a CSV formátummal:

    schema = pq.read_schema("/home/jovyan/data/extract/full/orders.parquet")
    meta = pq.read_metadata("/home/jovyan/data/extract/full/orders.parquet")
    
    print("=== orders.parquet séma ===\n")
    for i, field in enumerate(schema):
        print(f"  {field.name:<20} {str(field.type):<15}")
    
    print(f"\nRow group-ok száma: {meta.num_row_groups}")
    print(f"Összes sor:         {meta.num_rows:,}")
    print(f"Serialized méret:   {meta.serialized_size / 1024:.1f} KB")
    
    # CSV vs Parquet méretarány
    csv_path = "/home/jovyan/data/extract/full/orders.csv"
    pd.read_parquet("/home/jovyan/data/extract/full/orders.parquet").to_csv(csv_path, index=False)
    parquet_kb = os.path.getsize("/home/jovyan/data/extract/full/orders.parquet") / 1024
    csv_kb = os.path.getsize(csv_path) / 1024
    print(f"\nCSV méret:     {csv_kb:>8.1f} KB")
    print(f"Parquet méret: {parquet_kb:>8.1f} KB")
    print(f"Tömörítési arány: {csv_kb / parquet_kb:.1f}× (Parquet kisebb)")
    

    Row group

    A Parquet fájl row group-okra van felosztva. Minden row group az adatok egy részét tárolja. Az analitikai motorok (DuckDB, Spark) a row group min/max statisztikái alapján dönthetik el, hogy egy adott row group tartalmaz-e a WHERE feltételnek megfelelő sorokat — ha nem, kihagyják az olvasását. Ez a predicate pushdown alapja.

  2. Commitoljuk a notebookot.

Beadandó

2. feladat beadandó

  • Commitoljuk a notebookot.
  • Készítsünk egy képernyőképet a 8. cella kimenetéről (full vs incremental összehasonlítás), és mentsük el a repository gyökerébe f2.png néven.


2026-03-02 Szerzők