Ana Sayfa / Blog / Batch processing pipeline tasarımı: 3 gerçek proje örneği

Batch processing pipeline tasarımı: 3 gerçek proje örneği

Rapor üretimi, veri göçü, gecelik aggregation pipeline'ları kurduğum 3 projeden ortak pattern'ler. Chunking, checkpoint, retry disipline, idempotency garanti.

Batch processing web mühendisliğinin göz ardı edilen alanlarından. Real-time API’ler cool, batch job’lar backend’in yorgun amcası. Ama pek çok sistemin omurgası batch: gecelik raporlar, aggregate tablo güncelleme, data migration, bulk email, invoice üretimi.

Son 2 yılda 3 farklı projede batch pipeline tasarladım:
1. Bir SaaS için gecelik kullanıcı rapor üretimi (500K user)
2. Bir e-ticaret için yılsonu invoice batch (100K sipariş)
3. Bir fintech için legacy DB’den yeni şemaya migration (12M row)

Her üçünde ortak 6 tasarım prensibi ortaya çıktı.

1. Chunking: atomicity garantisi

İlk ve temel prensip: büyük işi küçük chunk’lara böl. 500K user’ı tek transaction’da işlemek yerine 1000’lik chunk’lar halinde.

Chunk boyutu seçimi:
– Memory’de tutulabilir kadar küçük (100MB’ı geçmeyin)
– Tek transaction’da rollback edilebilir kadar küçük
– Progress tracking anlamlı olacak kadar büyük (1 chunk/ms = takip imkansız)

Sweet spot genelde 500-5000 arası. Ben user-level job için 1000, row-level job için 5000 tercih ediyorum.

def process_in_chunks(query, chunk_size=1000):
    offset = 0
    while True:
        chunk = db.fetch(query, limit=chunk_size, offset=offset)
        if not chunk:
            break
        process_chunk(chunk)
        offset += chunk_size

Bu naif ama başlangıç noktası.

2. Checkpoint: restart-ability

Naif yaklaşımda pipeline 300.000. kayıtta çökerse, baştan başlıyorsunuz. 3 saatlik iş tekrar baştan. Checkpoint bunu çözüyor: her chunk sonrası pipeline “şimdi şu noktadayım” bilgisini kalıcı olarak kaydediyor.

Checkpoint tablosu basit:

CREATE TABLE batch_checkpoint (
    pipeline_name VARCHAR(255),
    last_processed_id BIGINT,
    updated_at TIMESTAMP,
    PRIMARY KEY (pipeline_name)
);

Her chunk sonrası update. Pipeline restart olursa checkpoint’ten devam.

def process_with_checkpoint(pipeline_name):
    last_id = get_checkpoint(pipeline_name)
    while True:
        chunk = db.fetch("SELECT * FROM users WHERE id > %s ORDER BY id LIMIT 1000", last_id)
        if not chunk:
            break
        process_chunk(chunk)
        last_id = chunk[-1].id
        save_checkpoint(pipeline_name, last_id)

Önemli: offset-based pagination yerine cursor-based (id > last_id). Offset büyüdükçe DB slow olur, cursor O(log n).

3. Idempotency: aynı chunk iki kere işlense de

Checkpoint save ile chunk process arasında crash olabilir. Pipeline restart’ta aynı chunk iki kere işlenir. Idempotency şart.

İdempotency pattern’ları:

Upsert kullanın. INSERT ... ON DUPLICATE KEY UPDATE veya MERGE statement. Aynı kayıt iki kere insert edilmez.

Unique key ile deduplication. Processing sırasında generated ID varsa deterministic olsun. UUID.uuid5(NAMESPACE, source_id) deterministic.

Idempotency key tablosu. “Bu chunk bu sürümde işlendi” kaydı tutun. Restart’ta skip.

CREATE TABLE batch_processed (
    pipeline_name VARCHAR(255),
    item_id BIGINT,
    processed_at TIMESTAMP,
    PRIMARY KEY (pipeline_name, item_id)
);

Chunk başında “bu item’lar daha önce işlenmiş mi?” sorgusu, zaten işlenmişleri skip.

4. Retry with dead letter

Bazı chunk’lar fail edecek: DB timeout, external API rate limit, corrupted data. İki strateji birlikte:

Retryable error: exponential backoff ile tekrar dene. Max 5 retry.

Non-retryable error (data corruption, schema mismatch): dead letter table’a at, pipeline devam etsin.

def process_chunk_with_retry(chunk):
    for attempt in range(5):
        try:
            process_chunk(chunk)
            return
        except RetryableError as e:
            time.sleep(2 ** attempt)
        except NonRetryableError as e:
            save_to_dead_letter(chunk, str(e))
            return
    save_to_dead_letter(chunk, "max_retries_exhausted")

Dead letter review haftalık yapın. Birikiyorsa pipeline’da bir problem var.

5. Observability: progress + ETA

Long-running batch’te en çok sorulan soru: “Ne zaman bitecek?”. Progress tracking + ETA prediction olmadan stakeholder ile iletişim kopuyor.

Her chunk sonrası log:

print(f"Processed {processed}/{total} ({processed/total*100:.1f}%) - ETA: {eta}")

ETA hesabı naive: geçen süre / işlenen = saniye/item. Kalan item * saniye/item = ETA.

Production’da Prometheus veya Datadog’a metric push:
batch_items_processed_total (counter)
batch_items_total (gauge)
batch_chunk_duration_seconds (histogram)

Grafana dashboard’unda progress bar görünce stress azalıyor.

6. Graceful shutdown

Pipeline çalışırken Kubernetes pod’u restart olursa ne olmalı? Naif yaklaşım: tam ortada kesiliyor, checkpoint eksik, duplicate risk.

Graceful shutdown:

  1. SIGTERM alındığında chunk’ın ortasında isen bitir, yenisini başlatma
  2. Checkpoint’i kaydet
  3. Clean exit
import signal

shutdown_requested = False

def handle_sigterm(*args):
    global shutdown_requested
    shutdown_requested = True

signal.signal(signal.SIGTERM, handle_sigterm)

while True:
    chunk = fetch_chunk()
    if not chunk or shutdown_requested:
        break
    process_chunk(chunk)
    save_checkpoint()

Kubernetes 30 saniye default grace period veriyor, chunk size’ı 30 saniyede bitecek kadar küçük tutun.

Parallel processing: dikkatli

Batch hızlandırmak için parallel chunk işleme: 4 worker, her biri farklı chunk.

Parallel pattern pitfall’ları:
– Checkpoint yönetimi daha karmaşık (hangi chunk bitmiş?)
– DB connection pool’u tükenme riski
– Downstream rate limit’e daha hızlı ulaşma
– Deadlock ihtimali artar (farklı chunk aynı row’a yazma)

2 projede parallel yaptım, 1’inde serial. Parallel %40 hızlandı ama 3 kat daha fazla debug effort istedi. Benchmark yapmadan parallel gitmeyin.

Gerçek proje sonuçları

SaaS user reports (500K user):
– Chunk: 1000 user, parallel: 4 worker
– Süre: 45 dakika (daha önce 6 saat)
– Crash durumunda 90 saniye içinde checkpoint’ten devam

E-ticaret invoice batch (100K sipariş):
– Chunk: 500 sipariş, serial
– Süre: 22 dakika
– Idempotency ile yılda 3 kez restart yaşandı, manual cleanup hiç olmadı

Fintech DB migration (12M row):
– Chunk: 10.000 row, parallel: 8 worker
– Süre: 14 saat (target 24 saat altı idi)
– Dead letter table: 47 row (corrupted legacy data), manual inceleme

Son ders

Batch processing kod yazmak değil, disiplin yazmak. Chunking + checkpoint + idempotency + retry + observability + graceful shutdown kombine olmadan production’da zararlı olur.

Bu 6 prensibi baştan inşa ederseniz batch pipeline’ınız gece uykunuzu bölmez. Atlasanız altı ay sonra tamir etmeniz gerekir.

Bu konuda bir projeniz mi var?

Kısa bir özet bırakın, 24 saat içinde size dönüş yapayım.

İletişime Geç