Batch processing web mühendisliğinin göz ardı edilen alanlarından. Real-time API’ler cool, batch job’lar backend’in yorgun amcası. Ama pek çok sistemin omurgası batch: gecelik raporlar, aggregate tablo güncelleme, data migration, bulk email, invoice üretimi.
Son 2 yılda 3 farklı projede batch pipeline tasarladım:
1. Bir SaaS için gecelik kullanıcı rapor üretimi (500K user)
2. Bir e-ticaret için yılsonu invoice batch (100K sipariş)
3. Bir fintech için legacy DB’den yeni şemaya migration (12M row)
Her üçünde ortak 6 tasarım prensibi ortaya çıktı.
1. Chunking: atomicity garantisi
İlk ve temel prensip: büyük işi küçük chunk’lara böl. 500K user’ı tek transaction’da işlemek yerine 1000’lik chunk’lar halinde.
Chunk boyutu seçimi:
– Memory’de tutulabilir kadar küçük (100MB’ı geçmeyin)
– Tek transaction’da rollback edilebilir kadar küçük
– Progress tracking anlamlı olacak kadar büyük (1 chunk/ms = takip imkansız)
Sweet spot genelde 500-5000 arası. Ben user-level job için 1000, row-level job için 5000 tercih ediyorum.
def process_in_chunks(query, chunk_size=1000):
offset = 0
while True:
chunk = db.fetch(query, limit=chunk_size, offset=offset)
if not chunk:
break
process_chunk(chunk)
offset += chunk_sizeBu naif ama başlangıç noktası.
2. Checkpoint: restart-ability
Naif yaklaşımda pipeline 300.000. kayıtta çökerse, baştan başlıyorsunuz. 3 saatlik iş tekrar baştan. Checkpoint bunu çözüyor: her chunk sonrası pipeline “şimdi şu noktadayım” bilgisini kalıcı olarak kaydediyor.
Checkpoint tablosu basit:
CREATE TABLE batch_checkpoint (
pipeline_name VARCHAR(255),
last_processed_id BIGINT,
updated_at TIMESTAMP,
PRIMARY KEY (pipeline_name)
);Her chunk sonrası update. Pipeline restart olursa checkpoint’ten devam.
def process_with_checkpoint(pipeline_name):
last_id = get_checkpoint(pipeline_name)
while True:
chunk = db.fetch("SELECT * FROM users WHERE id > %s ORDER BY id LIMIT 1000", last_id)
if not chunk:
break
process_chunk(chunk)
last_id = chunk[-1].id
save_checkpoint(pipeline_name, last_id)Önemli: offset-based pagination yerine cursor-based (id > last_id). Offset büyüdükçe DB slow olur, cursor O(log n).
3. Idempotency: aynı chunk iki kere işlense de
Checkpoint save ile chunk process arasında crash olabilir. Pipeline restart’ta aynı chunk iki kere işlenir. Idempotency şart.
İdempotency pattern’ları:
Upsert kullanın. INSERT ... ON DUPLICATE KEY UPDATE veya MERGE statement. Aynı kayıt iki kere insert edilmez.
Unique key ile deduplication. Processing sırasında generated ID varsa deterministic olsun. UUID.uuid5(NAMESPACE, source_id) deterministic.
Idempotency key tablosu. “Bu chunk bu sürümde işlendi” kaydı tutun. Restart’ta skip.
CREATE TABLE batch_processed (
pipeline_name VARCHAR(255),
item_id BIGINT,
processed_at TIMESTAMP,
PRIMARY KEY (pipeline_name, item_id)
);Chunk başında “bu item’lar daha önce işlenmiş mi?” sorgusu, zaten işlenmişleri skip.
4. Retry with dead letter
Bazı chunk’lar fail edecek: DB timeout, external API rate limit, corrupted data. İki strateji birlikte:
Retryable error: exponential backoff ile tekrar dene. Max 5 retry.
Non-retryable error (data corruption, schema mismatch): dead letter table’a at, pipeline devam etsin.
def process_chunk_with_retry(chunk):
for attempt in range(5):
try:
process_chunk(chunk)
return
except RetryableError as e:
time.sleep(2 ** attempt)
except NonRetryableError as e:
save_to_dead_letter(chunk, str(e))
return
save_to_dead_letter(chunk, "max_retries_exhausted")Dead letter review haftalık yapın. Birikiyorsa pipeline’da bir problem var.
5. Observability: progress + ETA
Long-running batch’te en çok sorulan soru: “Ne zaman bitecek?”. Progress tracking + ETA prediction olmadan stakeholder ile iletişim kopuyor.
Her chunk sonrası log:
print(f"Processed {processed}/{total} ({processed/total*100:.1f}%) - ETA: {eta}")ETA hesabı naive: geçen süre / işlenen = saniye/item. Kalan item * saniye/item = ETA.
Production’da Prometheus veya Datadog’a metric push:
– batch_items_processed_total (counter)
– batch_items_total (gauge)
– batch_chunk_duration_seconds (histogram)
Grafana dashboard’unda progress bar görünce stress azalıyor.
6. Graceful shutdown
Pipeline çalışırken Kubernetes pod’u restart olursa ne olmalı? Naif yaklaşım: tam ortada kesiliyor, checkpoint eksik, duplicate risk.
Graceful shutdown:
- SIGTERM alındığında chunk’ın ortasında isen bitir, yenisini başlatma
- Checkpoint’i kaydet
- Clean exit
import signal
shutdown_requested = False
def handle_sigterm(*args):
global shutdown_requested
shutdown_requested = True
signal.signal(signal.SIGTERM, handle_sigterm)
while True:
chunk = fetch_chunk()
if not chunk or shutdown_requested:
break
process_chunk(chunk)
save_checkpoint()Kubernetes 30 saniye default grace period veriyor, chunk size’ı 30 saniyede bitecek kadar küçük tutun.
Parallel processing: dikkatli
Batch hızlandırmak için parallel chunk işleme: 4 worker, her biri farklı chunk.
Parallel pattern pitfall’ları:
– Checkpoint yönetimi daha karmaşık (hangi chunk bitmiş?)
– DB connection pool’u tükenme riski
– Downstream rate limit’e daha hızlı ulaşma
– Deadlock ihtimali artar (farklı chunk aynı row’a yazma)
2 projede parallel yaptım, 1’inde serial. Parallel %40 hızlandı ama 3 kat daha fazla debug effort istedi. Benchmark yapmadan parallel gitmeyin.
Gerçek proje sonuçları
SaaS user reports (500K user):
– Chunk: 1000 user, parallel: 4 worker
– Süre: 45 dakika (daha önce 6 saat)
– Crash durumunda 90 saniye içinde checkpoint’ten devam
E-ticaret invoice batch (100K sipariş):
– Chunk: 500 sipariş, serial
– Süre: 22 dakika
– Idempotency ile yılda 3 kez restart yaşandı, manual cleanup hiç olmadı
Fintech DB migration (12M row):
– Chunk: 10.000 row, parallel: 8 worker
– Süre: 14 saat (target 24 saat altı idi)
– Dead letter table: 47 row (corrupted legacy data), manual inceleme
Son ders
Batch processing kod yazmak değil, disiplin yazmak. Chunking + checkpoint + idempotency + retry + observability + graceful shutdown kombine olmadan production’da zararlı olur.
Bu 6 prensibi baştan inşa ederseniz batch pipeline’ınız gece uykunuzu bölmez. Atlasanız altı ay sonra tamir etmeniz gerekir.