
Building Robust Data Pipelines with Metadata: A Python MCP Pattern Demo

Data pipelines are the backbone of modern data systems, but they often become complex and brittle. A common pain point is managing state and passing context between processing stages. Relying on cryptic filename conventions or implicit directory structures leads to errors, difficult debugging, and maintenance headaches.

What if there was a better way? What if we could explicitly pass instructions and track the state of our data directly alongside it? This is where metadata-driven pipelines come in.

This post explores a pattern for building more robust pipelines using explicit metadata files for context passing. It showcases a simple Python project demonstrating this technique, inspired by the Model Context Protocol (MCP) concept, to manage a file validation and loading workflow.

Find the complete project code on GitHub: https://github.com/scriptstar/metadata-driven-mcp-pipeline

The Problem: Fragile Pipeline Choreography

Imagine a typical scenario:

  • Files arrive from different departments (Sales, Marketing).
  • They need different validation rules applied.
  • Validated files need to be loaded into different target locations (e.g., database tables).

Traditional approaches might involve:

  • Filename Parsing: SALES_Leads_US_20231101_validated.csv - prone to typos and hard to extend.
  • Directory Watching: Moving files between /pending/, /validated/, and /failed/ - state management becomes complex and race conditions are possible.
  • Implicit Knowledge: Scripts "just know" that files starting with "SALES_" go to the sales table. This breaks as soon as requirements change.

These methods lack clear traceability and make debugging a nightmare when something inevitably goes wrong. Where did this file come from? What rules should have been applied? Why did it end up here?

The Solution: Metadata-Driven Context Passing

The core idea is simple: separate the context about the data from the data itself, and pass this context explicitly between pipeline stages.

We achieve this using a metadata sidecar file. For every data file (e.g., sales_data_xyz.csv), we create a corresponding JSON file (sales_data_xyz.csv.mcp.json) that acts as its passport or manifest.

Our "MCP" Implementation: The JSON Context File

Inspired by the concept of Model Context Protocols (providing context around an ML model), we use a structured JSON file. Note that this is a project-specific convention, not a formal, standardised I/O protocol. It's a pattern for information exchange.

Key fields in our .mcp.json file include:

  • job_id: A unique ID for this specific file's journey through the pipeline.
  • source_context: Information about the origin (e.g., department: "Sales").
  • processing_directives: Instructions for downstream tasks (e.g., validation_ruleset_id: "SALES_LEADS_V1", load_target_destination: "sales_leads_table").
  • status_info: Tracks the current_status ("Uploaded", "Validated", "Loaded", "Failed", etc.) and maintains a status_history log.
  • current_*_filepath: Explicitly stores the current path of the data and MCP files as they move.
// Example MCP Structure (abbreviated)
{
	"mcp_version": "1.0",
	"job_id": "abc-123",
	"current_data_filepath": "/path/to/incoming/sales_data_xyz.csv",
	"current_mcp_filepath": "/path/to/incoming/sales_data_xyz.csv.mcp.json",
	"source_context": { "department": "Sales", "file_type": "leads" },
	"processing_directives": {
		"validation_ruleset_id": "SALES_LEADS_V1",
		"load_target_destination": "sales_leads_table"
	},
	"status_info": {
		"current_status": "Uploaded",
		"status_history": [
			/* ... */
		],
		"error_message": null
	}
}
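
To make the pattern concrete, here is a minimal sketch of the kind of helper functions a pipeline stage could use to read this sidecar, update its status, and move the data/MCP pair together. The function names and details below are illustrative assumptions, not the project's exact API.

# Sketch: hypothetical MCP sidecar helpers (illustrative, not the project's exact code)
import json
import shutil
from datetime import datetime, timezone
from pathlib import Path


def load_mcp(mcp_path: Path) -> dict:
    """Read an MCP sidecar file into a dictionary."""
    with open(mcp_path, "r", encoding="utf-8") as f:
        return json.load(f)


def save_mcp(mcp: dict, mcp_path: Path) -> None:
    """Write the MCP dictionary back to disk."""
    with open(mcp_path, "w", encoding="utf-8") as f:
        json.dump(mcp, f, indent=2)


def update_status(mcp: dict, new_status: str, message=None) -> None:
    """Set the current status and append an entry to the history log."""
    mcp["status_info"]["current_status"] = new_status
    mcp["status_info"]["error_message"] = message
    mcp["status_info"]["status_history"].append({
        "status": new_status,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "message": message,
    })


def move_pair(mcp: dict, target_dir: Path) -> None:
    """Move the data file and its sidecar together, then record the new paths inside the MCP."""
    target_dir.mkdir(parents=True, exist_ok=True)
    old_data = Path(mcp["current_data_filepath"])
    old_mcp = Path(mcp["current_mcp_filepath"])
    new_data = target_dir / old_data.name
    new_mcp = target_dir / old_mcp.name
    shutil.move(str(old_data), str(new_data))
    mcp["current_data_filepath"] = str(new_data)
    mcp["current_mcp_filepath"] = str(new_mcp)
    save_mcp(mcp, new_mcp)  # write the updated sidecar in its new location
    if old_mcp != new_mcp:
        old_mcp.unlink(missing_ok=True)  # drop the stale copy left behind

Moving the data file and its sidecar in one step, and rewriting the current_*_filepath fields before saving, keeps the metadata consistent with where the file actually lives.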

Showcasing the Project

To see this pattern in action, I built a simple Python project simulating a file processing workflow:

Link to Repository: https://github.com/scriptstar/metadata-driven-mcp-pipeline

Project Structure Overview

metadata-driven-mcp-pipeline/
├── archive/                    # Final resting place for processed files
│   ├── failed/                 # Files that failed validation or loading
│   └── success/                # Files that were successfully processed and loaded
├── config/                     # Configuration files
│   └── validation_rules.json   # Defines required CSV columns for different rulesets
├── incoming/                   # Landing zone for newly "uploaded" files (data + MCP)
├── processing_loading/         # Staging area for files that passed validation, awaiting loading
└── scripts/                    # Python scripts driving the workflow
    ├── __init__.py             # Makes 'scripts' a Python package
    ├── common_utils.py         # Helper functions for MCP I/O, status updates, file moves
    ├── create_upload.py        # Simulates file upload, creates initial data + MCP
    ├── data_generator.py       # Generates dummy CSV data files
    ├── load_files.py           # Performs the (simulated) loading stage
    └── validate_files.py       # Performs the validation stage

Workflow Stages & MCP Interaction

  1. Upload (create_upload.py):

    • Simulates an upload (e.g., from "Sales").
    • Creates a dummy data CSV (data_generator.py).
    • Generates the initial .mcp.json file in incoming/.
    • Crucially, it populates the source_context and determines the correct processing_directives (like validation_ruleset_id) based on the source department.
    • Sets the initial current_status to "Uploaded" and logs this in status_history.

  2. Validation (validate_files.py):

    • Scans incoming/ for MCP files with current_status == "Uploaded".
    • Reads the validation_ruleset_id from the MCP file.
    • Loads corresponding rules from config/validation_rules.json.
    • Performs validation (here, just checking CSV headers); a minimal sketch of this stage appears after this list.
    • Updates the MCP: Sets current_status to "Validated" or "ValidationFailed" (with error details).
    • Moves Files: Relocates both data and MCP files to processing_loading/ (on success) or archive/failed/ (on failure).
    • Updates Paths: Critically, updates the current_*_filepath fields within the MCP before saving it in the new location.

  3. Loading (load_files.py):

    • Scans processing_loading/ for MCP files with current_status == "Validated".
    • Reads the load_target_destination from the MCP file.
    • Performs a simulated load operation based on the target.
    • Updates the MCP: Sets current_status to "Loaded" or "LoadFailed".
    • Moves Files: Relocates both files to archive/success/ or archive/failed/.
    • Updates Paths & Saves: Saves the final MCP state in the archive directory.
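
To make the validation stage concrete, here is a minimal sketch of the scan/validate/update/move loop described above. It assumes the helper functions sketched earlier (load_mcp, update_status, move_pair) and a validation_rules.json keyed by ruleset ID with a required_columns list; that rules-file shape is an assumption for illustration. The loading stage follows the same pattern with a different status check and target directories.

# Sketch: validation stage loop (illustrative; assumes the helpers sketched earlier)
import csv
import json
from pathlib import Path

INCOMING = Path("incoming")
PROCESSING = Path("processing_loading")
FAILED = Path("archive/failed")

# Assumed shape: {"SALES_LEADS_V1": {"required_columns": ["lead_id", ...]}, ...}
RULES = json.loads(Path("config/validation_rules.json").read_text(encoding="utf-8"))


def headers_ok(data_path: Path, required_columns) -> bool:
    """Check that the CSV header row contains every required column."""
    with open(data_path, newline="", encoding="utf-8") as f:
        header = next(csv.reader(f), [])
    return all(col in header for col in required_columns)


for mcp_path in INCOMING.glob("*.mcp.json"):
    mcp = load_mcp(mcp_path)
    if mcp["status_info"]["current_status"] != "Uploaded":
        continue  # only pick up files that are waiting for validation

    ruleset_id = mcp["processing_directives"]["validation_ruleset_id"]
    required = RULES[ruleset_id]["required_columns"]

    if headers_ok(Path(mcp["current_data_filepath"]), required):
        update_status(mcp, "Validated")
        move_pair(mcp, PROCESSING)
    else:
        update_status(mcp, "ValidationFailed", f"Missing required columns for {ruleset_id}")
        move_pair(mcp, FAILED)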

Benefits Observed

  • Clarity: Each stage knows exactly what to do based on explicit instructions in the MCP.
  • Configuration: Validation rules are externalised, not hardcoded.
  • Traceability: The status_history and error_message in the final MCP provide a clear audit trail.
  • State Management: Combining MCP status and dedicated directories provides robust state tracking.
  • Decoupling: Stages operate independently based on the MCP contract.

Why Adopt This Pattern?

Using metadata files for context passing offers significant advantages:

  • Robustness: Less prone to errors caused by implicit conventions.
  • Maintainability: Easier to modify or extend individual stages or add new rules.
  • Testability: Stages can be tested more easily by providing mock MCP files.
  • Observability: Explicit state and history simplify monitoring and debugging.
  • Scalability: While our example is sequential, this pattern lends itself well to parallel processing managed by orchestrators, where each file's context travels with it independently.

Limitations & Next Steps

This project is a simplified demonstration. Real-world implementations could be enhanced by:

  • Schema Validation: Validating the structure of the MCP JSON itself (e.g., using JSON Schema); see the sketch after this list.
  • Richer Validation: Implementing more complex data validation (data types, formats, cross-field checks) using libraries like Pandera or Great Expectations.
  • Real Loading: Interacting with actual databases, APIs, or storage systems.
  • Parallelism/Orchestration: Integrating these scripts into tools like Airflow, Prefect, or Dagster to manage execution, dependencies, retries, and enable parallel processing.
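
For the first point, a natural starting place is checking the MCP file itself against a JSON Schema before any stage trusts its contents. The sketch below uses the jsonschema library; the schema covers only a few fields and is an illustration, not a schema shipped with the project.

# Sketch: validating the MCP sidecar itself with JSON Schema (illustrative schema)
import json
from jsonschema import ValidationError, validate  # pip install jsonschema

MCP_SCHEMA = {
    "type": "object",
    "required": ["mcp_version", "job_id", "processing_directives", "status_info"],
    "properties": {
        "mcp_version": {"type": "string"},
        "job_id": {"type": "string"},
        "processing_directives": {
            "type": "object",
            "required": ["validation_ruleset_id", "load_target_destination"],
        },
        "status_info": {
            "type": "object",
            "required": ["current_status", "status_history"],
        },
    },
}

with open("incoming/sales_data_xyz.csv.mcp.json", encoding="utf-8") as f:
    mcp = json.load(f)

try:
    validate(instance=mcp, schema=MCP_SCHEMA)
except ValidationError as exc:
    print(f"Malformed MCP file: {exc.message}")

Rejecting a malformed sidecar up front means later stages can rely on the fields they read actually being present.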

Conclusion

Moving away from brittle, implicit pipeline conventions towards explicit, metadata-driven context passing can dramatically improve the robustness, traceability, and maintainability of data workflows. While the "MCP" concept often relates to ML models, the underlying pattern of using structured metadata sidecar files is a valuable technique for many data engineering challenges. Our simple Python example demonstrates the core principles and benefits of this approach.

Consider giving this pattern a try in your next pipeline project! Check out the code and experiment.

Link to Repository: https://github.com/scriptstar/metadata-driven-mcp-pipeline

