Data Reference

mmm_eval.data

Data loading and processing utilities.

Classes

DataLoader(data_path: str | Path)

Simple data loader for MMM evaluation.

Takes a data path and loads the data.

Initialize data loader with data path.

Parameters:

    data_path (str | Path): Path to the data file (CSV, Parquet, etc.). Required.

Raises:

    FileNotFoundError: If the data file does not exist.

Source code in mmm_eval/data/loaders.py
def __init__(self, data_path: str | Path):
    """Initialize data loader with data path.

    Args:
        data_path: Path to the data file (CSV, Parquet, etc.)

    Raises:
        FileNotFoundError: If the data file does not exist.

    """
    self.data_path = Path(data_path)

    if not self.data_path.exists():
        raise FileNotFoundError(f"Data file not found: {self.data_path}")
Functions
load() -> pd.DataFrame

Load data from the specified path.

Returns:

    Loaded DataFrame.

Raises:

    ValueError: If the file format is not supported.

Source code in mmm_eval/data/loaders.py
def load(self) -> pd.DataFrame:
    """Load data from the specified path.

    Returns:
        Loaded DataFrame

    Raises:
        ValueError: If the file format is not supported.

    """
    ext = self.data_path.suffix.lower().lstrip(".")
    if ext not in DataLoaderConstants.ValidDataExtensions.all():
        raise ValueError(f"Unsupported file format: {self.data_path.suffix}")

    if ext == DataLoaderConstants.ValidDataExtensions.CSV:
        return self._load_csv()
    elif ext == DataLoaderConstants.ValidDataExtensions.PARQUET:
        return self._load_parquet()
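
Example

A minimal usage sketch (the file path here is hypothetical):

from mmm_eval.data.loaders import DataLoader

loader = DataLoader("data/media_spend.csv")  # hypothetical local file
df = loader.load()  # pandas DataFrame parsed from the file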

DataPipeline(data: pd.DataFrame, framework: str, control_columns: list[str] | None, channel_columns: list[str], date_column: str, response_column: str, revenue_column: str, min_number_observations: int = DataPipelineConstants.MIN_NUMBER_OBSERVATIONS)

Data pipeline that orchestrates loading, processing, and validation.

Provides a simple interface to go from raw data file to validated DataFrame.

Initialize data pipeline.

Parameters:

    data (DataFrame): DataFrame containing the data. Required.
    framework (str): Name of a supported framework (pymc_marketing or meridian). Required.
    control_columns (list[str] | None): List of control columns. Required.
    channel_columns (list[str]): List of channel columns. Required.
    date_column (str): Name of the date column. Required.
    response_column (str): Name of the response column. Required.
    revenue_column (str): Name of the revenue column. Required.
    min_number_observations (int): Minimum required number of observations. Defaults to DataPipelineConstants.MIN_NUMBER_OBSERVATIONS.
Source code in mmm_eval/data/pipeline.py
def __init__(
    self,
    data: pd.DataFrame,
    framework: str,
    control_columns: list[str] | None,
    channel_columns: list[str],
    date_column: str,
    response_column: str,
    revenue_column: str,
    min_number_observations: int = DataPipelineConstants.MIN_NUMBER_OBSERVATIONS,
):
    """Initialize data pipeline.

    Args:
        data: DataFrame containing the data
        framework: name of supported framework
        control_columns: List of control columns
        channel_columns: List of channel columns
        date_column: Name of the date column
        response_column: Name of the response column
        revenue_column: Name of the revenue column
        min_number_observations: Minimum required number of observations

    """
    # Initialize components
    self.data = data
    self.processor = DataProcessor(
        date_column=date_column,
        response_column=response_column,
        revenue_column=revenue_column,
        control_columns=control_columns,
        channel_columns=channel_columns,
    )
    self.validator = DataValidator(
        framework=framework,
        date_column=date_column,
        response_column=InputDataframeConstants.RESPONSE_COL,
        revenue_column=InputDataframeConstants.MEDIA_CHANNEL_REVENUE_COL,
        control_columns=control_columns,
        min_number_observations=min_number_observations,
    )
Functions
run() -> pd.DataFrame

Run the complete data pipeline: process → validate.

Returns:

    Validated and processed DataFrame.

Raises:

    Various exceptions from the processing or validation steps.

Source code in mmm_eval/data/pipeline.py
def run(self) -> pd.DataFrame:
    """Run the complete data pipeline: process → validate.

    Returns:
        Validated and processed DataFrame

    Raises:
        Various exceptions from the processing or validation steps.

    """
    processed_df = self.processor.process(self.data)

    self.validator.run_validations(processed_df)

    return processed_df
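
Example

A minimal sketch of going from a raw DataFrame to a validated one, using the synthetic data generator documented below (the choice of control and channel columns here is illustrative):

from mmm_eval.data.pipeline import DataPipeline
from mmm_eval.data.synth_data_generator import generate_pymc_data

df = generate_pymc_data()  # synthetic weekly MMM data
pipeline = DataPipeline(
    data=df,
    framework="pymc_marketing",
    control_columns=["event_1", "event_2"],
    channel_columns=["channel_1", "channel_2"],
    date_column="date_week",
    response_column="quantity",
    revenue_column="revenue",
)
validated_df = pipeline.run()  # processed, validated, sorted by date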

DataProcessor(control_columns: list[str] | None, channel_columns: list[str], date_column: str = InputDataframeConstants.DATE_COL, response_column: str = InputDataframeConstants.RESPONSE_COL, revenue_column: str = InputDataframeConstants.MEDIA_CHANNEL_REVENUE_COL)

Simple data processor for MMM evaluation.

Handles data transformations like datetime casting, column renaming, etc.

Initialize data processor.

Parameters:

    control_columns (list[str] | None): List of control columns. Required.
    channel_columns (list[str]): List of channel columns. Required.
    date_column (str): Name of the date column to parse and rename. Defaults to InputDataframeConstants.DATE_COL.
    response_column (str): Name of the response column to parse and rename. Defaults to InputDataframeConstants.RESPONSE_COL.
    revenue_column (str): Name of the revenue column to parse and rename. Defaults to InputDataframeConstants.MEDIA_CHANNEL_REVENUE_COL.
Source code in mmm_eval/data/processor.py
def __init__(
    self,
    control_columns: list[str] | None,
    channel_columns: list[str],
    date_column: str = InputDataframeConstants.DATE_COL,
    response_column: str = InputDataframeConstants.RESPONSE_COL,
    revenue_column: str = InputDataframeConstants.MEDIA_CHANNEL_REVENUE_COL,
):
    """Initialize data processor.

    Args:
        control_columns: List of control columns
        channel_columns: List of channel columns
        date_column: Name of the date column to parse and rename
        response_column: Name of the response column to parse and rename
        revenue_column: Name of the revenue column to parse and rename

    """
    self.date_column = date_column
    self.response_column = response_column
    self.revenue_column = revenue_column
    self.control_columns = control_columns
    self.channel_columns = channel_columns
Functions
process(df: pd.DataFrame) -> pd.DataFrame

Process the DataFrame with configured transformations.

Parameters:

    df (DataFrame): Input DataFrame. Required.

Returns:

    Processed DataFrame.

Raises:

    MissingRequiredColumnsError: If the required columns are not present.
    InvalidDateFormatError: If the date column cannot be parsed.

Source code in mmm_eval/data/processor.py
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """Process the DataFrame with configured transformations.

    Args:
        df: Input DataFrame

    Returns:
        Processed DataFrame

    Raises:
        MissingRequiredColumnsError: If the required columns are not present.
        InvalidDateFormatError: If the date column cannot be parsed.

    """
    processed_df = df.copy()

    # Validate that all required columns exist
    self._validate_required_columns_present(
        df=processed_df,
        date_column=self.date_column,
        response_column=self.response_column,
        revenue_column=self.revenue_column,
        control_columns=self.control_columns,
        channel_columns=self.channel_columns,
    )

    # Parse date columns
    processed_df = self._parse_date_columns(processed_df, self.date_column)

    # Rename required columns
    processed_df = self._rename_required_columns(
        df=processed_df,
        response_column=self.response_column,
        revenue_column=self.revenue_column,
    )

    return processed_df.sort_values(self.date_column)
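
Example

A standalone sketch of the processor, again fed from the synthetic data generator documented below:

from mmm_eval.data.processor import DataProcessor
from mmm_eval.data.synth_data_generator import generate_pymc_data

processor = DataProcessor(
    control_columns=["event_1", "event_2"],
    channel_columns=["channel_1", "channel_2"],
    date_column="date_week",
    response_column="quantity",
    revenue_column="revenue",
)
processed_df = processor.process(generate_pymc_data())  # dates parsed, columns renamed, rows sorted by date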

DataValidator(framework: str, date_column: str, response_column: str, revenue_column: str, control_columns: list[str] | None, min_number_observations: int = DataPipelineConstants.MIN_NUMBER_OBSERVATIONS)

Validator for MMM data with configurable validation rules.

Initialize validator with validation rules.

Parameters:

    framework (str): A supported framework, one of pymc_marketing or meridian. Required.
    date_column (str): Name of the date column. Required.
    response_column (str): Name of the response column. Required.
    revenue_column (str): Name of the revenue column. Required.
    control_columns (list[str] | None): List of control columns. Required.
    min_number_observations (int): Minimum required number of observations for time series CV. Defaults to DataPipelineConstants.MIN_NUMBER_OBSERVATIONS.
Source code in mmm_eval/data/validation.py
def __init__(
    self,
    framework: str,
    date_column: str,
    response_column: str,
    revenue_column: str,
    control_columns: list[str] | None,
    min_number_observations: int = DataPipelineConstants.MIN_NUMBER_OBSERVATIONS,
):
    """Initialize validator with validation rules.

    Args:
        framework: a supported framework, one of `pymc_marketing` or `meridian`
        date_column: Name of the date column
        response_column: Name of the response column
        revenue_column: Name of the revenue column
        control_columns: List of control columns
        min_number_observations: Minimum required number of observations for time series CV

    """
    self.framework = framework
    self.date_column = date_column
    self.response_column = response_column
    self.revenue_column = revenue_column
    self.min_number_observations = min_number_observations
    self.control_columns = control_columns
Functions
run_validations(df: pd.DataFrame) -> None

Run all validations on the DataFrame.

Parameters:

    df (DataFrame): Input DataFrame. Required.

Returns:

    None. Each validation raises an exception if it fails.

Source code in mmm_eval/data/validation.py
def run_validations(self, df: pd.DataFrame) -> None:
    """Run all validations on the DataFrame.

    Args:
        df: Input DataFrame

    Returns:
        None. Each validation raises an exception if it fails.

    """
    # Run each validation in order
    self._validate_not_empty(df)
    self._validate_schema(df)
    self._validate_data_size(df)
    self._validate_response_and_revenue_columns_xor_zeroes(df)

    # feature scaling is done automatically in Meridian
    if self.control_columns and self.framework == "pymc_marketing":
        self._check_control_variables_between_0_and_1(df=df, cols=self.control_columns)

Functions

generate_meridian_data()

Load and process a Meridian-compatible dataset for E2E testing.

The Excel file should be placed at: mmm_eval/data/sample_data/geo_media.xlsx

Returns:

    DataFrame containing Meridian-compatible data with media channels, controls, and response variables.

Source code in mmm_eval/data/synth_data_generator.py
def generate_meridian_data():
    """Load and process a Meridian-compatible dataset for E2E testing.

    The Excel file should be placed at: mmm_eval/data/sample_data/geo_media.xlsx

    Returns:
        DataFrame containing Meridian-compatible data with media channels, controls, and
        response variables

    """
    # Path to the local Excel file
    excel_path = Path(__file__).parent / "sample_data" / "geo_media.xlsx"

    if not excel_path.exists():
        raise FileNotFoundError(
            f"Meridian sample data file not found at {excel_path}. "
            "Please download the file from "
            "https://github.com/google/meridian/raw/main/meridian/data/simulated_data/xlsx/geo_media.xlsx"
            f" and save it to {excel_path}."
        )

    df = pd.read_excel(excel_path, engine="openpyxl")

    # drop the first column, then restrict to only two geos
    df_mod = df.iloc[:, 1:]
    df_mod = df_mod[df_mod["geo"].isin(["Geo0", "Geo1"])]

    # compute revenue from per-conversion revenue and conversion counts
    df_mod["revenue"] = df_mod["revenue_per_conversion"] * df_mod["conversions"]
    df_mod = df_mod.drop(columns="revenue_per_conversion")

    # restrict to only post-2023
    df_mod = df_mod[pd.to_datetime(df_mod["time"]) > pd.Timestamp("2023-01-01")]
    df_mod = df_mod.rename(columns={"time": "date"})
    return df_mod

generate_pymc_data()

Generate synthetic MMM data for testing purposes.

Returns:

    DataFrame containing synthetic MMM data with media channels, controls, and response variables.

Source code in mmm_eval/data/synth_data_generator.py
def generate_pymc_data():
    """Generate synthetic MMM data for testing purposes.

    Returns:
        DataFrame containing synthetic MMM data with media channels, controls, and response variables

    """
    seed: int = sum(map(ord, "mmm"))
    rng: np.random.Generator = np.random.default_rng(seed=seed)

    # date range
    min_date = pd.to_datetime("2018-04-01")
    max_date = pd.to_datetime("2021-09-01")

    df = pd.DataFrame(data={"date_week": pd.date_range(start=min_date, end=max_date, freq="W-MON")}).assign(
        year=lambda x: x["date_week"].dt.year,
        month=lambda x: x["date_week"].dt.month,
        dayofyear=lambda x: x["date_week"].dt.dayofyear,
    )

    n = df.shape[0]

    # media spend data
    channel_1 = 100 * rng.uniform(low=0.0, high=1, size=n)
    df["channel_1"] = np.where(channel_1 > 90, channel_1, channel_1 / 2)

    channel_2 = 100 * rng.uniform(low=0.0, high=1, size=n)
    df["channel_2"] = np.where(channel_2 > 80, channel_2, 0)

    # apply geometric adstock transformation
    alpha1: float = 0.4
    alpha2: float = 0.2

    df["channel_1_adstock"] = (
        geometric_adstock(x=df["channel_1"].to_numpy(), alpha=alpha1, l_max=8, normalize=True).eval().flatten()
    )

    df["channel_2_adstock"] = (
        geometric_adstock(x=df["channel_2"].to_numpy(), alpha=alpha2, l_max=8, normalize=True).eval().flatten()
    )

    # apply saturation transformation
    lam1: float = 4.0
    lam2: float = 3.0

    df["channel_1_adstock_saturated"] = logistic_saturation(x=df["channel_1_adstock"].to_numpy(), lam=lam1).eval()

    df["channel_2_adstock_saturated"] = logistic_saturation(x=df["channel_2_adstock"].to_numpy(), lam=lam2).eval()

    # trend + seasonal
    df["trend"] = (np.linspace(start=0.0, stop=50, num=n) + 10) ** (1 / 4) - 1

    df["cs"] = -np.sin(2 * 2 * np.pi * df["dayofyear"] / 365.5)
    df["cc"] = np.cos(1 * 2 * np.pi * df["dayofyear"] / 365.5)
    df["seasonality"] = 0.5 * (df["cs"] + df["cc"])

    # controls
    df["event_1"] = (df["date_week"] == "2019-05-13").astype(float)
    df["event_2"] = (df["date_week"] == "2020-09-14").astype(float)

    # generate quantity
    df["intercept"] = 1000.0  # Base quantity
    # noise
    df["epsilon"] = rng.normal(loc=0.0, scale=50.0, size=n)

    # amplitude = 1
    beta_1 = 400
    beta_2 = 150

    # Generate price with seasonal fluctuations
    base_price = 5
    price_seasonality = 0.03 * (df["cs"] + df["cc"])
    price_trend = np.linspace(0, 2, n)  # Gradual price increase
    df["price"] = base_price + price_seasonality + price_trend

    df["quantity"] = (
        df["intercept"]
        + df["trend"] * 100
        + df["seasonality"] * 200
        + df["price"] * -50
        + 150 * df["event_1"]
        + 250 * df["event_2"]
        + beta_1 * df["channel_1_adstock_saturated"]
        + beta_2 * df["channel_2_adstock_saturated"]
        + df["epsilon"]
    )
    # Calculate revenue
    df["revenue"] = df["price"] * df["quantity"]

    columns_to_keep = [
        "date_week",
        "quantity",
        "price",
        "revenue",
        "channel_1",
        "channel_2",
        "event_1",
        "event_2",
        "dayofyear",
    ]

    df = df[columns_to_keep]
    return df
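
Example

A short sketch of both generators (generate_meridian_data requires the local geo_media.xlsx file described above):

from mmm_eval.data.synth_data_generator import (
    generate_meridian_data,
    generate_pymc_data,
)

pymc_df = generate_pymc_data()  # weekly synthetic data with two media channels
meridian_df = generate_meridian_data()  # two geos, post-2023 rows only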

Modules

constants

Defines the constants for the data pipeline.

Classes
DataLoaderConstants

Constants for the data loader.

Classes
ValidDataExtensions

Valid data extensions.

Functions
all() classmethod

Return list of all supported file extensions.

Source code in mmm_eval/data/constants.py
@classmethod
def all(cls):
    """Return list of all supported file extensions."""
    return [cls.CSV, cls.PARQUET]
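
Example

A sketch of the extension check DataLoader.load performs; the exact constant values are inferred from the lstrip(".") comparison above:

from mmm_eval.data.constants import DataLoaderConstants

DataLoaderConstants.ValidDataExtensions.all()  # e.g. ["csv", "parquet"]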
DataPipelineConstants

Constants for the data pipeline.

InputDataframeConstants

Constants for the input dataframe.

exceptions

Custom exceptions for data validation and processing.

Classes
DataValidationError

Bases: Exception

Raised when data validation fails.

EmptyDataFrameError

Bases: Exception

Raised when DataFrame is empty.

InvalidDateFormatError

Bases: Exception

Raised when date parsing fails.

MissingRequiredColumnsError

Bases: Exception

Raised when required columns are missing.

ValidationError

Bases: Exception

Base class for validation errors.
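
Example

A sketch of handling these exceptions around a pipeline run (pipeline is assumed to be a configured DataPipeline):

from mmm_eval.data.exceptions import (
    InvalidDateFormatError,
    MissingRequiredColumnsError,
)

try:
    validated_df = pipeline.run()
except MissingRequiredColumnsError as err:
    print(f"Add the missing columns before re-running: {err}")
except InvalidDateFormatError as err:
    print(f"Fix the date column format before re-running: {err}")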

loaders

Data loading utilities for MMM evaluation.

Classes

DataLoader

Simple data loader for MMM evaluation. Documented in full above under mmm_eval.data.

pipeline

Data pipeline for MMM evaluation.

Classes

DataPipeline

Data pipeline that orchestrates loading, processing, and validation. Documented in full above under mmm_eval.data.

processor

Data processing utilities for MMM evaluation.

Classes

DataProcessor

Simple data processor for MMM evaluation. Documented in full above under mmm_eval.data.

schemas

Pydantic schemas for MMM data validation.

Classes
ValidatedDataSchema

Bases: DataFrameModel

Schema for MMM data validation.

Defines the bare minimum columns for MMM evaluation.

Classes
Config

Config for the schema.
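
Example

A sketch of validating a processed frame against the schema, assuming pandera's DataFrameModel.validate API (the schema subclasses DataFrameModel):

from mmm_eval.data.schemas import ValidatedDataSchema

ValidatedDataSchema.validate(processed_df)  # raises a pandera SchemaError on failure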

synth_data_generator

Generate synthetic data for testing.

Based on: https://www.pymc-marketing.io/en/stable/notebooks/mmm/mmm_example.html

Functions

generate_meridian_data() and generate_pymc_data(): documented in full above under mmm_eval.data.

validation

Data validation for MMM evaluation.

Classes

DataValidator

Validator for MMM data with configurable validation rules. Documented in full above under mmm_eval.data.