Skip to content

feat: New Reporting streams for Klaviyo#175

Open
mpabraham wants to merge 13 commits intomainfrom
feature/new-klaviyo-streams
Open

feat: New Reporting streams for Klaviyo#175
mpabraham wants to merge 13 commits intomainfrom
feature/new-klaviyo-streams

Conversation

@mpabraham
Copy link
Copy Markdown

No description provided.

@mpabraham mpabraham requested a review from a team as a code owner April 14, 2026 10:26
@mpabraham mpabraham changed the title New Reporting streams for Klaviyo feat: New Reporting streams for Klaviyo Apr 14, 2026
@mpabraham mpabraham force-pushed the feature/new-klaviyo-streams branch from b972928 to 00c1081 Compare April 17, 2026 09:26
Copy link
Copy Markdown
Member

@edgarrmondragon edgarrmondragon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a complete review, but hopefully it leads to a refactor that results in significantly less code to review.

Comment thread tap_klaviyo/streams.py
Comment on lines +29 to +44
def _get_report_config_value(config: Mapping[str, Any], key: str) -> dict[str, Any]:
value = config.get(key, {})
if value is None:
return {}
if isinstance(value, dict):
return value
if isinstance(value, str):
parsed_value = json.loads(value)
if isinstance(parsed_value, dict):
return parsed_value
msg = f"Expected '{key}' JSON to decode to an object."
raise TypeError(msg)
msg = f"Expected '{key}' to be an object or JSON string."
raise TypeError(msg)


Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _get_report_config_value(config: Mapping[str, Any], key: str) -> dict[str, Any]:
value = config.get(key, {})
if value is None:
return {}
if isinstance(value, dict):
return value
if isinstance(value, str):
parsed_value = json.loads(value)
if isinstance(parsed_value, dict):
return parsed_value
msg = f"Expected '{key}' JSON to decode to an object."
raise TypeError(msg)
msg = f"Expected '{key}' to be an object or JSON string."
raise TypeError(msg)

this doesn't seem to be used anywhere

Comment thread tap_klaviyo/streams.py
Comment on lines +314 to +323
def __init__(
self,
tap: Tap,
*,
report_config: dict[str, Any] | None = None,
report_name: str | None = None,
) -> None:
"""Initialize the stream with an optional report config and name."""
self._report_config = report_config
super().__init__(tap=tap, name=report_name or self.name)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def __init__(
self,
tap: Tap,
*,
report_config: dict[str, Any] | None = None,
report_name: str | None = None,
) -> None:
"""Initialize the stream with an optional report config and name."""
self._report_config = report_config
super().__init__(tap=tap, name=report_name or self.name)
def __init__(
self,
tap: Tap,
*,
report_config: dict[str, Any],
report_name: str,
) -> None:
"""Initialize the stream with an optional report config and name."""
self._report_config = report_config
super().__init__(tap=tap, name=report_name)

This applies to all the new streams that follow this style.

Comment thread tap_klaviyo/streams.py
# describing details such as the statistics to compute, the
# interval, and the timeframe.
#
config = self._report_config or {}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above

Suggested change
config = self._report_config or {}
config = self._report_config

Comment thread tap_klaviyo/tap.py
Comment on lines +54 to +66
def _named_interval_report_config_type() -> th.AnyOf:
report_object = th.ObjectType(
th.Property("name", th.StringType),
th.Property("statistics", th.ArrayType(th.StringType)),
th.Property("interval", th.StringType),
th.Property(
"timeframe",
th.ObjectType(
th.Property("key", th.StringType),
),
),
)
return th.AnyOf(th.ArrayType(report_object), th.StringType, th.NullType)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious what the benefits of supporting a JSON string form of the array are. Both Meltano and the SDK will parse the JSON string if it's coming from the environment.

With that said, could we make this a simpler type? That way we can get rid of a lot of the validation code needed for the case when a string is passed. For example:

Suggested change
def _named_interval_report_config_type() -> th.AnyOf:
report_object = th.ObjectType(
th.Property("name", th.StringType),
th.Property("statistics", th.ArrayType(th.StringType)),
th.Property("interval", th.StringType),
th.Property(
"timeframe",
th.ObjectType(
th.Property("key", th.StringType),
),
),
)
return th.AnyOf(th.ArrayType(report_object), th.StringType, th.NullType)
def _named_interval_report_config_type() -> th.ArrayType:
report_object = th.ObjectType(
th.Property("name", th.StringType, required=True),
th.Property("statistics", th.ArrayType(th.StringType)),
th.Property("interval", th.StringType),
th.Property(
"timeframe",
th.ObjectType(
th.Property("key", th.StringType),
),
),
)
return th.ArrayType(report_object, nullable=True)

and ditto for the other config schemas.

Comment thread tap_klaviyo/streams.py
http_method = "POST"
# tell the base class to send the prepared payload as JSON
payload_as_json = True
_schema_path = resources.files(schemas).joinpath("segment_series_report.json")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as the stream name matches the file name without the extension, you don't need this. See the older streams.

Suggested change
_schema_path = resources.files(schemas).joinpath("segment_series_report.json")

Comment thread tap_klaviyo/streams.py
Comment on lines +325 to +329
@property
def schema(self) -> dict[str, Any]: # type: ignore[override]
"""Return the shared schema for all named segment series streams."""
return cast("dict[str, Any]", json.loads(self._schema_path.read_text(encoding="utf-8")))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above

Suggested change
@property
def schema(self) -> dict[str, Any]: # type: ignore[override]
"""Return the shared schema for all named segment series streams."""
return cast("dict[str, Any]", json.loads(self._schema_path.read_text(encoding="utf-8")))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants