2. feladat: Adatbázis betöltés és Extract¶
A batch pipeline első lépése az Extract fázis: az adatok kinyerése a forrásrendszerből. Ebben a feladatban az 1. feladatban generált adatokat PostgreSQL-be töltjük (ezzel szimulálva egy valódi forrásrendszert), majd full és incremental kinyerést végzünk, és összehasonlítjuk a kettőt. Megvizsgáljuk az indexelés hatását az extract teljesítményre, és szimulálunk egy CDC (Change Data Capture) mintájú deltakinyerést is.
Notebook létrehozása¶
-
Hozzunk létre egy új notebookot
feladat2_extract.ipynbnéven. -
Az első cellában telepítsük és importáljuk a szükséges csomagokat:
!pip install -q psycopg2-binary sqlalchemy pyarrow pandas import pandas as pd import pyarrow.parquet as pq import os import time from datetime import datetime, timedelta from sqlalchemy import create_engine, text engine = create_engine( "postgresql://dataeng:dataeng@postgres:5432/labor" ) print("Kapcsolódva a PostgreSQL adatbázishoz.")Hostnév Docker-ben
Docker Compose hálózaton belül a szolgáltatásokat a nevükkel lehet elérni. A
postgreshostnév adocker-compose.yaml-ban definiált service nevéből származik. Ha a saját gépünkről csatlakoznánk (pl. DBeaver-ből), akkorlocalhost:5432-t kellene megadni.
Adatok betöltése PostgreSQL-be¶
-
Töltsük be az 1. feladatban generált Parquet fájlokat az adatbázisba:
tables = ["customers", "products", "orders", "order_items"] print("=== Betöltés PostgreSQL-be ===\n") for table in tables: df = pd.read_parquet(f"/home/jovyan/data/raw/{table}.parquet") df.to_sql(table, engine, if_exists="replace", index=False) print(f" {table:<15} → {len(df):>6,} sor betöltve")
Index létrehozása és EXPLAIN ANALYZE¶
-
Hozzunk létre indexet az
order_dateoszlopra, és nézzük meg, hogyan változik a lekérdezési terv:print("=== EXPLAIN ANALYZE – index nélkül ===\n") with engine.connect() as conn: plan = conn.execute(text(""" EXPLAIN ANALYZE SELECT * FROM orders WHERE order_date >= NOW() - INTERVAL '30 days' """)).fetchall() for row in plan: print(" ", row[0]) # Index létrehozása with engine.connect() as conn: conn.execute(text( "CREATE INDEX IF NOT EXISTS idx_orders_date ON orders(order_date)" )) conn.commit() print("\nIndex létrehozva: idx_orders_date\n") print("=== EXPLAIN ANALYZE – index után ===\n") with engine.connect() as conn: plan = conn.execute(text(""" EXPLAIN ANALYZE SELECT * FROM orders WHERE order_date >= NOW() - INTERVAL '30 days' """)).fetchall() for row in plan: print(" ", row[0])Seq Scan vs Index Scan
Index nélkül a PostgreSQL Seq Scan-t (soros olvasás) végez – minden sort megvizsgál. Index után Index Scan vagy Bitmap Index Scan látható, ami csak a releváns sorokat olvassa be. A
rows=ésactual time=értékek megmutatják a valós különbséget.
Full Extract¶
-
Kérdezzük le az összes táblát, és mentsük Parquet-ba:
os.makedirs("/home/jovyan/data/extract/full", exist_ok=True) print("=== Full Extract ===\n") full_stats = [] for table in tables: start = time.time() df = pd.read_sql(f"SELECT * FROM {table}", engine) elapsed = time.time() - start path = f"/home/jovyan/data/extract/full/{table}.parquet" df.to_parquet(path, index=False) full_stats.append({ "tábla": table, "sorok": len(df), "méret (KB)": round(os.path.getsize(path) / 1024, 1), "idő (ms)": round(elapsed * 1000, 1), }) full_df = pd.DataFrame(full_stats) print(full_df.to_string(index=False))
Incremental Extract¶
-
Kérdezzük le csak az utolsó 30 napban keletkezett rendeléseket:
os.makedirs("/home/jovyan/data/extract/incremental", exist_ok=True) print("=== Incremental Extract (utolsó 30 nap) ===\n") start = time.time() incr_df = pd.read_sql( "SELECT * FROM orders WHERE order_date >= NOW() - INTERVAL '30 days'", engine, ) incr_elapsed = time.time() - start incr_path = "/home/jovyan/data/extract/incremental/orders.parquet" incr_df.to_parquet(incr_path, index=False) full_path = "/home/jovyan/data/extract/full/orders.parquet" print(f" Full sorok: {len(pd.read_parquet(full_path)):>6,}") print(f" Incremental sorok: {len(incr_df):>6,}") print(f" Full méret: {os.path.getsize(full_path)/1024:>6.1f} KB") print(f" Incremental méret: {os.path.getsize(incr_path)/1024:>6.1f} KB")
CDC szimulálása¶
-
Szimulálunk egy Change Data Capture mintájú deltakinyerést: frissítünk 50 rendelést, és csak a változott sorokat kérjük le:
print("=== CDC szimulálás ===\n") # updated_at oszlop hozzáadása, ha még nincs with engine.connect() as conn: conn.execute(text(""" ALTER TABLE orders ADD COLUMN IF NOT EXISTS updated_at TIMESTAMP DEFAULT NOW() """)) conn.commit() # 50 rendelés frissítése 'delivered' státuszra cutoff = datetime.now() with engine.connect() as conn: conn.execute(text(""" UPDATE orders SET status = 'delivered', updated_at = NOW() WHERE order_id IN ( SELECT order_id FROM orders WHERE status = 'shipped' LIMIT 50 ) """)) conn.commit() print("50 rendelés státusza frissítve: shipped → delivered\n") # Delta extract: csak a cutoff után módosított sorok delta_df = pd.read_sql( f"SELECT * FROM orders WHERE updated_at >= '{cutoff.isoformat()}'", engine, ) delta_path = "/home/jovyan/data/extract/incremental/orders_delta.parquet" delta_df.to_parquet(delta_path, index=False) print(f"CDC delta sorok: {len(delta_df)} (csak a frissített rendelések)") print(f"Delta méret: {os.path.getsize(delta_path)/1024:.1f} KB")Mikor melyiket?
- Full extract: egyszerű, megbízható, kis és ritkán változó táblákhoz ideális. Hátránya: nagy tábláknál lassú és drága.
- Incremental extract: csak a változott adatokat kéri le — sokkal hatékonyabb. Korlátja: szükséges egy
order_datevagyupdated_atmező, és a DELETE műveletek láthatatlanok maradnak. - CDC (Change Data Capture): az
updated_attimestamp alapján csak az utolsó futás óta megváltozott sorokat kérjük le — ez a leghatékonyabb módszer valódi rendszereknél.
Full vs Incremental összehasonlítás¶
-
Készítsünk összehasonlítást:
full_orders_size = os.path.getsize("/home/jovyan/data/extract/full/orders.parquet") incr_orders_size = os.path.getsize(incr_path) full_count = len(pd.read_parquet("/home/jovyan/data/extract/full/orders.parquet")) comparison = pd.DataFrame([ { "Módszer": "Full extract", "Sorok": full_count, "Méret (KB)": round(full_orders_size / 1024, 1), "Idő (ms)": round(full_stats[2]["idő (ms)"], 1), }, { "Módszer": "Incremental (30 nap)", "Sorok": len(incr_df), "Méret (KB)": round(incr_orders_size / 1024, 1), "Idő (ms)": round(incr_elapsed * 1000, 1), }, ]) print("=== Összehasonlítás ===\n") print(comparison.to_string(index=False)) ratio = full_orders_size / incr_orders_size print(f"\nAdat-csökkentés: {ratio:.1f}×")
Parquet séma és metaadat vizsgálat¶
-
Vizsgáljuk meg az egyik Parquet fájl sémáját és metaadatait, és hasonlítsuk össze a CSV formátummal:
schema = pq.read_schema("/home/jovyan/data/extract/full/orders.parquet") meta = pq.read_metadata("/home/jovyan/data/extract/full/orders.parquet") print("=== orders.parquet séma ===\n") for i, field in enumerate(schema): print(f" {field.name:<20} {str(field.type):<15}") print(f"\nRow group-ok száma: {meta.num_row_groups}") print(f"Összes sor: {meta.num_rows:,}") print(f"Serialized méret: {meta.serialized_size / 1024:.1f} KB") # CSV vs Parquet méretarány csv_path = "/home/jovyan/data/extract/full/orders.csv" pd.read_parquet("/home/jovyan/data/extract/full/orders.parquet").to_csv(csv_path, index=False) parquet_kb = os.path.getsize("/home/jovyan/data/extract/full/orders.parquet") / 1024 csv_kb = os.path.getsize(csv_path) / 1024 print(f"\nCSV méret: {csv_kb:>8.1f} KB") print(f"Parquet méret: {parquet_kb:>8.1f} KB") print(f"Tömörítési arány: {csv_kb / parquet_kb:.1f}× (Parquet kisebb)")Row group
A Parquet fájl row group-okra van felosztva. Minden row group az adatok egy részét tárolja. Az analitikai motorok (DuckDB, Spark) a row group min/max statisztikái alapján dönthetik el, hogy egy adott row group tartalmaz-e a WHERE feltételnek megfelelő sorokat — ha nem, kihagyják az olvasását. Ez a predicate pushdown alapja.
-
Commitoljuk a notebookot.
Beadandó¶
2. feladat beadandó
- Commitoljuk a notebookot.
- Készítsünk egy képernyőképet a 8. cella kimenetéről (full vs incremental összehasonlítás), és mentsük el a repository gyökerébe
f2.pngnéven.