Quick wins with Pydantic

programming, data_engineering, data_quality, data_validation

As the complexity of a data project grows, getting serious about data validation and data quality becomes increasingly important.

Pydantic can make this an effective, pain-free process.

I’ll not spend much time going through what Pydantic is, and will instead focus on demonstrating how data teams can use basic Pydantic features to improve data validation and data quality in their data pipelines.

If you’re new to Pydantic, and would like to spend some time getting familiar with what Pydantic is and the problems it tries to solve, I would recommend reading through the documentation here.

For this simple overview, we’ll focus on 4 simple steps:

1. Identify and understand the source data.
2. Define a Pydantic model based on this understanding.
3. Validate incoming data against this model.
4. Make data available for downstream dependencies. 

Identify and understand the source data

We’ll be working with the extremely simple Carbon Intensity API.

We’ll use the UK regional breakdown endpoint and set the regionid path parameter to 1, which is the ID for North Scotland:
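https://api.carbonintensity.org.uk/regional/regionid/1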

A simple GET request to this endpoint returns the following result:

{
  "data": [
    {
      "regionid": 1,
      "dnoregion": "Scottish Hydro Electric Power Distribution",
      "shortname": "North Scotland",
      "data": [
        {
          "from": "2024-08-06T12:30Z",
          "to": "2024-08-06T13:00Z",
          "intensity": {
            "forecast": 0,
            "index": "very low"
          },
          "generationmix": [
            {
              "fuel": "biomass",
              "perc": 0
            },
            {
              "fuel": "coal",
              "perc": 0
            },
            {
              "fuel": "imports",
              "perc": 0
            },
            {
              "fuel": "gas",
              "perc": 0
            },
            {
              "fuel": "nuclear",
              "perc": 0
            },
            {
              "fuel": "other",
              "perc": 0
            },
            {
              "fuel": "hydro",
              "perc": 0
            },
            {
              "fuel": "solar",
              "perc": 0
            },
            {
              "fuel": "wind",
              "perc": 100
            }
          ]
        }
      ]
    }
  ]
}

The structure of the JSON data includes:

  • A top-level object named data containing an array of objects.

  • Each object in this array represents a region with an ID number, name of the regional distribution network operator (dno), and the name of the region itself - as well as further nested data.

  • The nested data includes time information, intensity forecast, and energy generation data.

Define the Pydantic model

Now that we understand the data structure, we can define a Pydantic model to represent it:

from datetime import datetime

from pydantic import (
    BaseModel,
    Field,
    ValidationError,
    field_validator,
    model_validator,
    ConfigDict,
)
from typing import List, Union

class GenerationMix(BaseModel):
    fuel: str
    perc: Union[float, int]

class Intensity(BaseModel):
    forecast: int
    index: str

class RegionData(BaseModel):
    from_: str = Field(..., alias="from")
    to: str
    intensity: Intensity
    generationmix: List[GenerationMix]

    @model_validator(mode="after")
    def parse_datetime(self):
        for field in ["from_", "to"]:
            value = getattr(self, field)
            if isinstance(value, str):
                setattr(self, field, datetime.strptime(value, "%Y-%m-%dT%H:%MZ"))
        return self

class Region(BaseModel):
    regionid: int
    dnoregion: str
    shortname: str
    data: List[RegionData]

    @field_validator("dnoregion", "shortname")
    @classmethod
    def lowercase_shortname(cls, value):
        if isinstance(value, str):
            return value.lower()
        return value

class CarbonIntensityResponse(BaseModel):
    model_config = ConfigDict(strict=True)

    data: List[Region]

Let’s break down the key components of this Pydantic model:

  • BaseModel: All our model components inherit from pydantic.BaseModel, which provides the core functionality.

  • Python type hints: We use type hints (e.g., str, int, List[]) to specify the expected data types of each field.

  • Custom field validators: The @field_validator decorator allows us to define custom validation or transformation logic for specific data fields. In this case, we’re converting the region names to lowercase (for demonstrative purposes).

  • Custom model validators: The @model_validator decorator lets us define validation logic that acts on the entire model instance itself. Here, we’re parsing datetime strings into datetime objects (for demonstrative purposes).

  • Model config: We use ConfigDict(strict=True) to enforce strict type checking, ensuring that the input data matches our model exactly.

The Pydantic model allows us to clearly define what we expect to be returned, and layer on top some simple data transformations in a really clear and simple way.
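To see what strict mode buys us in practice, here’s a minimal sketch - the bad_payload value below is a hypothetical, hand-written example rather than real API output - showing how a type mismatch is rejected instead of being silently coerced:

# Hypothetical payload: regionid arrives as the string "1" rather than the integer 1.
bad_payload = '{"data": [{"regionid": "1", "dnoregion": "Scottish Hydro Electric Power Distribution", "shortname": "North Scotland", "data": []}]}'

try:
    CarbonIntensityResponse.model_validate_json(bad_payload, strict=True)
except ValidationError as e:
    # In strict mode, "1" is not coerced to an int, so validation fails here.
    print(f"Data validation error: {e}")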

Validate incoming data

First, we make a simple request to the API endpoint:

import requests

def get_data() -> bytes:
    """
    Make API call.

    Returns:
        response content in bytes.
    """
    try:
        url = "https://api.carbonintensity.org.uk/regional/regionid/1"
        response = requests.get(url)
        response.raise_for_status()
        result = response.content
        return result

    except Exception as e:
        raise e

Validating the response against our Pydantic model is very straightforward:

def validate_data(json_data: bytes) -> CarbonIntensityResponse:
    """
    Validate data against Pydantic model and perform some minor transformations.

    Args:
        json_data: the raw JSON response content as bytes.

    Returns:
        CarbonIntensityResponse: the validated model instance.
    """
    try:
        validated = CarbonIntensityResponse.model_validate_json(json_data, strict=True)
        return validated

    except ValidationError as e:
        print(f"Data validation error: {e}")
        raise e

We can use the model_validate_json() method to validate our json data against the Pydantic model we have created - see the documentation for more information on strict mode.
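Chaining the two functions together is then a one-liner (using the get_data and validate_data functions defined above):

validated = validate_data(get_data())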

If validation is successful, model_validate_json() returns an instance of the Pydantic model as shown below.

Pydantic model instance:

data = [
    Region(
        regionid=1,
        dnoregion='scottish hydro electric power distribution',
        shortname='north scotland',
        data=[
            RegionData(
                from_=datetime(2024, 8, 11, 17, 0),
                to=datetime(2024, 8, 11, 17, 30),
                intensity=Intensity(forecast=0, index='very low'),
                generationmix=[
                    GenerationMix(fuel='biomass', perc=0),
                    GenerationMix(fuel='coal', perc=0),
                    GenerationMix(fuel='imports', perc=0),
                    GenerationMix(fuel='gas', perc=0),
                    GenerationMix(fuel='nuclear', perc=0),
                    GenerationMix(fuel='other', perc=0),
                    GenerationMix(fuel='hydro', perc=24.2),
                    GenerationMix(fuel='solar', perc=0),
                    GenerationMix(fuel='wind', perc=75.8)
                ]
            )
        ]
    )
]

As we can see, this is not too dissimilar to the original JSON data we started off with, and the custom validators we implemented were applied successfully.

Make data available for downstream dependencies

Pydantic models make it super simple to flatten data structures, using dot notation to access the fields and values you want - this is because Pydantic models behave much like Python data classes.
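For example, with the validated model from the previous step, every nested value is just an attribute lookup:

# Intensity forecast for the first region's first half-hour window
validated.data[0].data[0].intensity.forecast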

This technique works particularly well when your end goal is to create a simple DataFrame.

import pandas as pd

def create_df(validated_model: CarbonIntensityResponse) -> pd.DataFrame:
    """
    Create dataframe from validated model.

    Args:
        validated_model (CarbonIntensityResponse)

    Returns:
        pd.DataFrame if successful.
    """

    try:
        flattened_data = []
        for region in validated_model.data:
            for region_data in region.data:
                row = {
                    "regionid": region.regionid,
                    "dnoregion": region.dnoregion,
                    "shortname": region.shortname,
                    "from": region_data.from_,
                    "to": region_data.to,
                    "intensity_forecast": region_data.intensity.forecast,
                    "intensity_index": region_data.intensity.index,
                }
                # Add generation mix data
                for gen_mix in region_data.generationmix:
                    row[f"gen_{gen_mix.fuel}"] = gen_mix.perc
                flattened_data.append(row)

        # Create the DataFrame
        df = pd.DataFrame(flattened_data)

        return df

    except Exception as e:
        raise e

This approach allows us to easily access nested data (like region_data.intensity.forecast) and iterate over lists (like region_data.generationmix) to create a flat structure.

Example dataframe:

   regionid                                   dnoregion  ... gen_solar gen_wind
0         1  scottish hydro electric power distribution  ...         0     80.1

[1 rows x 16 columns]
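Putting it all together, the three functions chain into a small end-to-end flow (a minimal sketch reusing the functions defined above):

if __name__ == "__main__":
    df = create_df(validate_data(get_data()))
    print(df)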

Conclusion

By following these four steps:

1. Identify and understand the source data.
2. Define a Pydantic model based on this understanding.
3. Validate incoming data against this model.
4. Make data available for downstream dependencies. 

We’ve created an incredibly simple workflow to ensure that downstream data dependencies, such as analytical dashboards, are provided with the correct data.

Data issues should be flagged as early as possible in the pipeline, so that analysts don’t waste time figuring out why a chart is broken because of missing data fields or incorrect data types.

Although the example that I have used is very simple, it is easy to see how these principles could be applied to more complex datasets and projects.

Code snippets can be found here.