Modern Data Orchestration with Prefect

If you've spent any time in data engineering, you've probably wrestled with Airflow. It works, but it comes with a lot of ceremony — DAG files that have to live in specific directories, a scheduler that needs babysitting, and a local development experience that can be genuinely painful. Prefect is a serious alternative that takes a more developer-friendly approach, and it's worth understanding why teams are switching.

Why Airflow Feels Dated

Airflow was created at Airbnb in 2014 and open-sourced the following year. For its time, it was a massive step forward. But the architecture shows its age:

  • Your workflow logic lives in DAG files that Airflow re-parses on a schedule, so an import error can keep a DAG from loading at all, and slow top-level imports slow down parsing for the whole scheduler
  • Local testing is awkward because you need a running Airflow environment to properly validate your DAGs
  • The UI is functional but not exactly a joy to use
  • Dynamic workflows (where the number of tasks depends on runtime data) are possible but painful

Prefect was built specifically to address these frustrations. The core philosophy is that your Python code should just be Python code, with orchestration added on top rather than baked in.

How Prefect Actually Works

The mental model is simple: you decorate your Python functions with @flow and @task, and Prefect wraps them with state tracking, logging, and optional retries and caching.

```python
from prefect import flow, task
import httpx

@task
def fetch_data(url: str) -> dict:
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@task
def process_data(data: dict) -> list:
    # Your transformation logic here
    return [item for item in data.get("results", []) if item.get("active")]

@flow(name="data-pipeline")
def run_pipeline(url: str):
    raw = fetch_data(url)
    processed = process_data(raw)
    print(f"Processed {len(processed)} records")
    return processed

if __name__ == "__main__":
    run_pipeline("https://api.example.com/data")
```

That's a fully functional Prefect flow. Save it as pipeline.py and run it with python pipeline.py; it behaves like any other Python script. No scheduler required, no special environment, no DAG directory. This is the biggest practical win: your local development loop is just running a Python script.

Tasks vs Flows

Understanding the difference matters:

Tasks are the individual units of work. Prefect tracks their state (Pending, Running, Completed, Failed, and so on), can retry them when they fail, and can cache their results. They're decorated with @task.

Flows are the orchestrators: they call tasks and other flows, define the overall logic, and are what you actually schedule. They're decorated with @flow.

You can nest flows, which is useful for building modular pipelines:

```python
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def extract(source: str) -> list:
    # Fetch data from source
    return []

@task
def transform(data: list) -> list:
    return [{"processed": True, **row} for row in data]

@task
def load(data: list, destination: str):
    print(f"Loading {len(data)} rows to {destination}")

@flow
def etl_subflow(source: str, destination: str):
    data = extract(source)
    transformed = transform(data)
    load(transformed, destination)

@flow(name="main-pipeline")
def main_pipeline():
    etl_subflow("source_a", "warehouse_a")
    etl_subflow("source_b", "warehouse_b")
```

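Notice the retries=3 on the extract task: that's built-in retry logic in a single decorator argument, with retry_delay_seconds=10 spacing out the attempts. Airflow supports retries too, but you configure them per operator (or through a DAG's default_args) rather than directly on the function that does the work.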

Prefect vs Airflow: The Real Comparison

Here's where they actually differ in practice:

| | Prefect | Airflow |
|---|---|---|
| Local dev | Run as plain Python | Needs Airflow running |
| Dynamic tasks | Native support | Possible but complex |
| Deployment | Prefect Cloud or self-hosted | Self-hosted (typically) |
| Learning curve | Gentle | Steeper |
| Ecosystem | Growing | Mature and large |

The dynamic tasks point is worth expanding. In Prefect, if you want to create tasks based on runtime data, you just write Python:

```python
from prefect import flow, task

@task
def process_file(filename: str) -> str:
    return f"Processed: {filename}"

@flow
def process_all_files(filenames: list[str]) -> list[str]:
    # This just works: one task run is created per filename at runtime
    futures = process_file.map(filenames)
    # .map() returns futures; wait on each one and collect its result
    return [future.result() for future in futures]
```

The .map() method submits one run of process_file for each item in the list, and those runs execute concurrently by default. In Airflow, the equivalent feature (dynamic task mapping) only arrived in Airflow 2.3, and the syntax is considerably more verbose.

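Getting Started Practically

Install Prefect and spin up the local UI:

```bash
pip install prefect

# Start the local Prefect server (UI at http://localhost:4200)
prefect server start
```

In another terminal, point Prefect at that server and run your flow:

```bash
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
python pipeline.py
```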

You'll see the flow run appear in the UI at localhost:4200 with full task state tracking, logs, and timing information. This is genuinely impressive for a local setup: you get observability without any infrastructure overhead.

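For scheduling, you turn your flow into a deployment and attach a schedule. With current Prefect releases the most direct route is flow.serve(), which registers a deployment and then keeps the process running to execute scheduled runs (older Prefect 2 tutorials use Deployment.build_from_flow, which has since been removed). Scheduled runs are created by the Prefect server or Prefect Cloud, so one of them needs to be reachable:

```python
from prefect import flow

@flow
def my_pipeline():
    pass

if __name__ == "__main__":
    # Creates a "production-run" deployment with a cron schedule, then blocks
    # and executes scheduled flow runs from this process
    my_pipeline.serve(
        name="production-run",
        cron="0 9 * * *",  # Daily at 9am (UTC unless a timezone is set)
    )
```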

A Few Practical Tips

Use caching for expensive tasks. If you're fetching data that doesn't change often, cache the result:

```python
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_reference_data(endpoint: str) -> dict:
    # This result will be cached for 1 hour per unique endpoint
    pass
```

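Handle failures gracefully. Prefect lets you attach hooks that run when a flow fails:

```python
from prefect import flow

def send_slack_alert(flow, flow_run, state):
    # Placeholder hook body: swap in your own Slack client or webhook call
    print(f"Flow {flow.name!r} (run {flow_run.name}) failed: {state.message}")

@flow(on_failure=[send_slack_alert])
def critical_pipeline():
    pass
```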

Use Prefect Cloud for production. The self-hosted server is great for development, but Prefect Cloud gives you a managed control plane, team features, and better reliability for production workloads. There's a generous free tier that covers most small-to-medium use cases.

Next Steps

If you're evaluating Prefect for a real project, here's a concrete path forward:

  • Install Prefect and convert one existing script into a flow with a couple of tasks. The migration is usually 30 minutes of work for a simple script.
  • Run prefect server start and explore the UI while running your flow — seeing the observability out of the box is convincing.
  • Check out the Prefect docs on deployments if you need scheduling — the deployment model is the part that takes the most time to understand.
  • Compare your Airflow pain points against what Prefect offers. If you're not on Airflow yet, Prefect is probably the better starting point for a new project.

Prefect won't replace Airflow everywhere overnight. Airflow has a massive ecosystem, and many teams have years of investment in it. But if you're starting fresh or your team is drowning in Airflow operational overhead, Prefect is worth a serious look.