Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 200 additions & 1 deletion haferml/transforms/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Callable, List, Optional, Union

import pandas as pd
from loguru import logger
Comment thread
emptymalei marked this conversation as resolved.
Outdated


class TransformBase(ABC):
Expand Down Expand Up @@ -50,3 +51,201 @@ def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
dataframe = t(dataframe)

return dataframe


class Identity(TransformBase):
    """Pass the dataframe through untouched.

    This is useful as the starting element when summing up a lot of
    transformations.

    For example, given a list of `TransformBase` transformations

    ```python
    my_transformations = [transform_1, transform_2, transform_3]
    ```

    the chained transformation can be built with

    ```python
    transform = sum(my_transformations, Identity())
    ```
    """

    def __init__(self) -> None:
        # Warn at construction time: this transform is a deliberate no-op.
        logger.warning("This transformation does nothing")

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Log the call and hand back the very same dataframe object.

        :param dataframe: the dataframe to pass through
        :return: `dataframe` itself, not a copy
        """
        logger.info("Returning the original dataframe")
        return dataframe


class Shuffle(TransformBase):
    """Returns a shuffled dataframe

    All rows are kept and returned in random order. The returned object is a
    new dataframe; the input dataframe is left as it was.
    """

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Shuffle the rows of `dataframe`.

        :param dataframe: the dataframe whose rows should be shuffled
        :return: a new dataframe with the same rows in random order
        """
        logger.info("Shuffling the dataframe")
        # Sampling the full fraction without replacement is a random
        # permutation of the rows.
        return dataframe.sample(frac=1)
Comment thread
emptymalei marked this conversation as resolved.


class ConvertCategoricalType(TransformBase):
    """Convert a column to integer category codes

    The column is cast to the pandas `category` dtype and the resulting
    integer codes are written to the target column. The categories found
    during the most recent call are stored in `self.categories`.

    :param column_name: the original column to convert
    :param target_column: the column to write to.
        Default is to overwrite the original `column_name`
    """

    def __init__(self, column_name: str, target_column: Optional[str] = None):
        self.column_name = column_name
        if target_column is None:
            target_column = column_name

        self.target_column = target_column
        # Filled in by __call__; stays None until the transform has been applied.
        self.categories = None

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Write the category codes of `column_name` into `target_column`.

        The dataframe is modified in place and also returned.

        :param dataframe: the dataframe holding `column_name`
        :return: the same dataframe with `target_column` set to the codes
        """
        logger.info(f"converting {self.column_name} to categorical ...")
        # Keep the categorical in a local so the target column is written once.
        categorical = dataframe[self.column_name].astype("category")
        self.categories = categorical.cat.categories
        dataframe[self.target_column] = categorical.cat.codes
        logger.info(f"converted {self.column_name} to categorical!")

        return dataframe


class ReplaceValues(TransformBase):
    """Overwrite the values of one column on the rows picked by a filter

    ```python
    lambda_filter = lambda x: x["indicator_column"] == "bad_value"

    replace_val = ReplaceValues(
        lambda_filter = lambda_filter,
        column_to_replace = "value_a_column",
        replacement_value = np.nan
    )
    ```

    :param lambda_filter: a callable that receives the dataframe and
        returns the row selection to overwrite
    :param column_to_replace: the column whose values are replaced
    :param replacement_value: the value to write into the selected rows
    """

    def __init__(
        self,
        lambda_filter: Callable,
        column_to_replace: str,
        replacement_value: Optional[Any] = None,
    ):
        self.replacement_value = replacement_value
        self.column_to_replace = column_to_replace
        self.lambda_filter = lambda_filter

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Apply the replacement in place and return the dataframe.

        :param dataframe: the dataframe to modify
        :return: the same dataframe object after the assignment
        """
        logger.info(f"replace values in {self.column_to_replace}")
        # Evaluate the filter against the dataframe first, then assign
        # through .loc with the resulting row selector.
        row_selector = self.lambda_filter(dataframe)
        dataframe.loc[row_selector, self.column_to_replace] = self.replacement_value
        return dataframe


class AddColumnWithCondition(TransformBase):
    """Add a calculated column based on a lambda function

    The callable is evaluated once per row and its return value becomes the
    value of the target column for that row.

    ```python
    lambda_compute = lambda x: x["indicator_column"] == "bad_value"

    add_column = AddColumnWithCondition(
        lambda_compute = lambda_compute,
        target_column = "is_bad"
    )
    ```

    :param lambda_compute: a callable that receives one row of the
        dataframe and returns the value for that row
    :param target_column: the column to add or overwrite
    """

    def __init__(
        self,
        lambda_compute: Callable,
        target_column: str,
    ):
        self.lambda_compute = lambda_compute
        self.target_column = target_column

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Compute the new column in place and return the dataframe.

        :param dataframe: the dataframe to extend
        :return: the same dataframe object with `target_column` set
        """
        logger.info(f"adding column {self.target_column}")
        # axis=1 hands each row (not each column) to the callable.
        computed = dataframe.apply(self.lambda_compute, axis=1)
        dataframe[self.target_column] = computed
        return dataframe


class ExpandJSONValues(TransformBase):
    """Expand values for columns containing JSON objects

    For every listed column, each value that is a `dict` is replaced by the
    entry stored under `json_key` (or `None` when the key is absent);
    values that are not a `dict` are kept as they are.

    :param column_names: the column, or list of columns, to expand
    :param json_key: the key to extract from the JSON objects
    """

    def __init__(self, column_names: str | list[str], json_key: str):
        # A single column name is accepted and wrapped into a list.
        if isinstance(column_names, str):
            column_names = [column_names]
        self.column_names = column_names

        self.json_key = json_key

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Return a new dataframe with the listed columns expanded.

        :param dataframe: the dataframe holding the JSON columns
        :return: a new dataframe; the input dataframe is not modified
        """
        logger.info("Extracting from JSON values")
        json_key = self.json_key

        def _extract(value):
            # Only dict values carry the key; everything else passes through.
            return value.get(json_key) if isinstance(value, dict) else value

        # Work on each column directly instead of applying a function to
        # every full row: only the listed column is touched, values are not
        # routed through a per-row Series, and an empty dataframe yields an
        # empty column.
        return dataframe.assign(
            **{name: dataframe[name].apply(_extract) for name in self.column_names}
        )


class Convert2Timestamp(TransformBase):
    """Parse one column into pandas datetimes

    :param column_name: the column to convert; it is overwritten
    """

    def __init__(self, column_name: str):
        self.column_name = column_name

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Convert the column in place and return the dataframe.

        :param dataframe: the dataframe holding `column_name`
        :return: the same dataframe object with the column converted
        """
        column = self.column_name
        logger.info(f"Converting {column} to datetime...")
        parsed = pd.to_datetime(dataframe[column])
        dataframe[column] = parsed
        return dataframe


class SortbyColumn(TransformBase):
    """Order the rows of a dataframe by the values of one column

    :param column_name: the column to sort by
    :param ascending: sort in ascending order if True (the default),
        descending otherwise
    """

    def __init__(self, column_name: str, ascending: bool = True):
        self.ascending = ascending
        self.column_name = column_name

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Return the sorted dataframe.

        :param dataframe: the dataframe to sort
        :return: the result of `sort_values` on `column_name`
        """
        logger.info(f"Sorting column by {self.column_name} ...")

        sort_options = {"by": self.column_name, "ascending": self.ascending}
        return dataframe.sort_values(**sort_options)


class RollingMedian(TransformBase):
    """Rolling median based on columns

    The listed columns are replaced by their trailing (not centered)
    rolling median; all other columns are carried over unchanged. Rows and
    column order of the input are preserved.

    :param column_names: the column, or list of columns, to transform
    :param window_size: the size of the rolling window
    :param min_periods: the minimum number of observations in the window
        required to produce a value
    """

    def __init__(
        self,
        column_names: str | list[str],
        window_size: int,
        min_periods: int = 1,
    ):
        # A bare string would otherwise be split into single characters
        # when the columns are iterated.
        if isinstance(column_names, str):
            column_names = [column_names]
        self.column_names = column_names
        self.window_size = window_size
        self.min_periods = min_periods

    def __call__(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Return a new dataframe with the rolling median applied.

        :param dataframe: the dataframe holding `column_names`
        :return: a new dataframe; the input dataframe is not modified
        """
        logger.info(f"Rolling median column by {self.column_names} ...")
        rolled = (
            dataframe[self.column_names]
            .rolling(self.window_size, center=False, min_periods=self.min_periods)
            .median()
        )
        # Write the result back column by column instead of merging on the
        # index: a merge multiplies rows when index labels repeat and loses
        # the original column order.
        result = dataframe.copy()
        for name in self.column_names:
            result[name] = rolled[name]
        return result
Loading