Luminara
A lightweight, asynchronous data pipeline framework for Python 3.9+ designed for high-throughput streaming and batch processing.
Overview
Luminara provides a minimal yet powerful abstraction for defining data workflows. Unlike heavy orchestration platforms like Airflow or Prefect, Luminara focuses on the execution layer — specifically, efficient in-memory processing of data streams with backpressure handling.
It is built on top of Python's asyncio library, making it ideal for I/O-bound workloads such as web scraping, API ingestion, and log processing. The core philosophy is "configuration over boilerplate," allowing you to define complex ETL graphs in simple YAML or Python.
Key Features
- Asynchronous Core: Built from the ground up with
async/awaitfor non-blocking I/O operations. - Backpressure Management: Automatic buffer sizing and flow control prevents memory overflows during consumer slow-downs.
- Plugin System: Easily extensible architecture. Write custom Sources, Transforms, and Sinks with minimal code.
- Schema Validation: Optional Pydantic integration for runtime data validation at any pipeline stage.
- Zero External Dependencies: The core library has no hard dependencies other than Python standard library (optional integrations available).
Quick Start
Get up and running with a simple pipeline that reads from a generic generator, transforms data, and writes to stdout.
1. Install Luminara
$ pip install luminara
2. Define a Pipeline
Create a file named pipeline.py:
import asyncio
from luminara import Pipeline
from luminara.stages import Source, Transform, Sink
# 1. Define a Source
class NumberGenerator(Source):
async def read(self):
for i in range(10):
yield {"value": i}
# 2. Define a Transform
class DoubleValue(Transform):
async def process(self, item):
item["doubled"] = item["value"] * 2
return item
# 3. Define a Sink
class ConsoleOutput(Sink):
async def write(self, item):
print(f"Processed: {item}")
# 4. Run the Pipeline
async def main():
pipeline = Pipeline(
source=NumberGenerator(),
stages=[DoubleValue()],
sink=ConsoleOutput()
)
await pipeline.run()
if __name__ == "__main__":
asyncio.run(main())
3. Run It
$ python pipeline.py
Processed: {'value': 0, 'doubled': 0}
Processed: {'value': 1, 'doubled': 2}
...
Processed: {'value': 9, 'doubled': 18}
Use Cases
Luminara is optimized for:
- Log Aggregation: Tail files, parse structured logs, and ship to Elasticsearch or S3.
- CDC (Change Data Capture): Stream database changes to message queues like Kafka or NATS.
- Real-time Analytics: Windowed aggregation of event streams for dashboarding.