v0.8.2

Examples & Tutorials

This section provides end-to-end examples of using Luminara for various data processing tasks.

1. Basic ETL: CSV to JSON

A simple batch pipeline that reads a CSV file, parses each row into a dictionary, and writes the records to a JSON Lines file.

import asyncio
import csv
import json
from luminara import Pipeline, stages

class CSVSource(stages.Source):
    """Source stage that yields each row of a CSV file as a dict.

    The file's header row supplies the dictionary keys (csv.DictReader).
    """

    def __init__(self, filepath):
        # Path to the CSV file to read.
        self.filepath = filepath

    async def read(self):
        # newline='' is the documented way to open a file for the csv
        # module, so embedded newlines inside quoted fields round-trip
        # correctly (the original 'r' mode mangles them).
        # NOTE(review): open()/iteration here block the event loop;
        # fine for a batch example — confirm acceptable for large files.
        with open(self.filepath, newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                yield row

class JSONSink(stages.Sink):
    """Sink stage that appends each record to a JSON Lines file."""

    def __init__(self, filepath):
        # Destination path; the file is reopened in append mode on
        # every write, so records from earlier writes are preserved.
        self.filepath = filepath

    async def write(self, record):
        line = json.dumps(record)
        with open(self.filepath, 'a') as out:
            out.write(line + '\n')

async def main():
    """Wire the CSV source to the JSON sink and run the batch once."""
    src = CSVSource('data.csv')
    dst = JSONSink('output.jsonl')
    pipeline = Pipeline(source=src, sink=dst)
    await pipeline.run()

if __name__ == "__main__":
    asyncio.run(main())

2. Streaming Data Processing

This example demonstrates how to consume a continuous stream of data from a WebSocket API and process it in real time.

import aiohttp
from luminara import Pipeline, stages

class WebSocketSource(stages.Source):
    """Source stage that yields decoded JSON messages from a WebSocket feed."""

    async def read(self):
        # BUG FIX: this example's header imports only aiohttp, but the
        # loop below calls json.loads — import json locally so the
        # snippet is self-contained.
        import json

        async with aiohttp.ClientSession() as session:
            async with session.ws_connect('wss://api.example.com/feed') as ws:
                async for msg in ws:
                    # Only decode text frames; binary/ping/close frames
                    # are skipped.
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        yield json.loads(msg.data)

class FilterTrades(stages.Transform):
    """Pass through trades whose amount exceeds $1000; drop the rest."""

    async def process(self, record):
        # Returning None tells the pipeline to discard the record.
        if record['amount'] <= 1000:
            return None
        return record

async def main():
    """Stream trades from the WebSocket feed and print the large ones."""
    pipeline = Pipeline(
        source=WebSocketSource(),
        stages=[FilterTrades()],
        sink=stages.ConsoleSink(),  # Built-in sink
    )
    # A streaming source never ends; run until interrupted.
    await pipeline.run_forever()

3. Creating a Custom Plugin

Luminara is designed to be extensible. You can easily create reusable plugins by subclassing the core stage types.

from luminara.stages import Transform
import hashlib

class HashField(Transform):
    """
    A reusable transform that adds a SHA256 hash of one field.

    The hash is stored under a new "<field>_hash" key; records missing
    the field pass through unchanged.
    """

    def __init__(self, field_name: str, salt: str = ""):
        self.field = field_name  # name of the field to hash
        self.salt = salt         # optional salt appended before hashing

    async def process(self, record: dict) -> dict:
        if self.field not in record:
            return record
        salted = f"{record[self.field]}{self.salt}"
        record[f"{self.field}_hash"] = hashlib.sha256(salted.encode()).hexdigest()
        return record

# Usage
# pipeline = Pipeline(..., stages=[HashField("email", salt="s3cr3t")], ...)

4. Error Handling Patterns

Handling errors gracefully is critical in data pipelines. Luminara provides a mechanism to redirect failed records to a "Dead Letter Queue" (DLQ).

from luminara import Pipeline
from luminara.stages import DropRecord, FileSink, Transform

class RiskyTransform(Transform):
    """Transform that may fail; failed records are tagged and routed to the DLQ."""

    async def process(self, record):
        try:
            # Simulate a risky operation (ZeroDivisionError when
            # divisor is 0, KeyError when the key is absent).
            record['value'] = 100 / record['divisor']
            return record
        except Exception as e:
            # Tag the record with the error
            record['error'] = str(e)
            # Route to DLQ (handled by pipeline config).
            # BUG FIX: the original raised stages.DropRecord, but
            # `stages` is never imported in this example; DropRecord is
            # imported explicitly instead. Chaining with `from e` keeps
            # the original traceback for debugging.
            raise DropRecord(reason="calculation_error", record=record) from e

async def main():
    """Run a pipeline whose failed records are written to a DLQ file."""
    dlq = FileSink("errors.jsonl")

    pipeline = Pipeline(
        source=...,   # plug in any source
        stages=[RiskyTransform()],
        sink=...,     # plug in any sink
        dlq_sink=dlq  # Failures go here
    )
    await pipeline.run()