Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions .github/workflows/build_and_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,47 @@ jobs:

- name: Run tests with coverage
run: |
pytest test --cov=src/datarepo --cov-report=term-missing
pytest test/functional --cov=src/datarepo --cov-report=term-missing

code-quality:
# Integration tests against a real ClickHouse server (docker-compose).
integration-tests:
  # Run only after the unit/functional test job succeeds, and only for
  # pull requests or manual dispatch (containers are too slow for every push).
  needs: run-tests
  if: github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch'
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v3

    - name: Set up Python 3.10
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'

    # Start ClickHouse first; --wait blocks until the compose healthcheck
    # passes, so the server is ready before tests run.
    - name: Start Docker Compose
      run: |
        docker compose -f test/integration/resources/docker-compose.yml up -d --wait

    - name: Install dependencies
      run: |
        python3.10 -m pip install --upgrade pip
        python3.10 -m pip install --no-cache-dir -e ".[dev]"
      env:
        SKIP_STATIC_SITE_BUILD: "true"

    # -rx adds a short summary for xfailed tests to the pytest report.
    - name: Run integration tests
      run: |
        pytest -rx test/integration
      env:
        CLICKHOUSE_HOST: localhost
        CLICKHOUSE_PORT: 9000

    # Always tear down (and remove volumes with -v) even when tests fail,
    # so the runner is left clean.
    - name: Stop Docker Compose
      if: always()
      run: docker compose -f test/integration/resources/docker-compose.yml down -v

code-quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ dependencies = [
"deltalake>=0.25.4, <1", # need to upgrade with https://delta-io.github.io/delta-rs/upgrade-guides/guide-1.0.0/
"polars>=1.9.0, <1.14.0", # 1.15 fails due to hive_partitioning error, https://github.com/pola-rs/polars/pull/19902, 1.14 has a CI failure that needs investigation
"pyarrow>=17.0.0",
"ipython>=8.5.0"
"ipython>=8.5.0",
"ibis-framework[clickhouse]>=10.8.0",
]

[project.optional-dependencies]
Expand All @@ -39,6 +40,7 @@ dev = [
"mypy>=1.0.0",
"boto3-stubs>=1.39.0",
"pyarrow-stubs>=20.0.0",
"clickhouse-driver>=0.2.9",
]

[tool.hatch.build]
Expand Down
10 changes: 2 additions & 8 deletions src/datarepo/core/tables/clickhouse_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import polars as pl
import pyarrow as pa
import ibis # type: ignore[import-untyped]

from datarepo.core.dataframe import NlkDataFrame
from datarepo.core.tables.filters import Filter, InputFilters, normalize_filters
Expand Down Expand Up @@ -263,12 +264,5 @@ def __call__( # type: ignore[override]
NlkDataFrame: A lazy Polars DataFrame with the requested data.
"""
query = self._build_query(filters, columns)

# use polars read database_uri to read the result of the query
df = pl.read_database_uri(
query=query,
uri=self.uri,
engine="connectorx",
)

df = ibis.connect(self.uri).sql(query).to_polars()
return df.lazy()
Empty file added test/functional/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)
from datarepo.core.catalog import Catalog
from datarepo.export.roapi import export_to_roapi_tables
from test.roapi.data import database
from test.functional.roapi.data import database


@table
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from unittest.mock import patch, MagicMock
from unittest.mock import patch
import polars as pl
import pyarrow as pa

Expand Down Expand Up @@ -114,9 +114,9 @@ def test_filter_operators(
)
assert query == expected_query

@patch("polars.read_database_uri")
@patch("ibis.connect")
def test_call_with_no_filters(
self, mock_read_database_uri, clickhouse_table: ClickHouseTable
self, mock_connection, clickhouse_table: ClickHouseTable
):
"""Test calling the table with no filters."""
mock_df = pl.DataFrame(
Expand All @@ -126,24 +126,21 @@ def test_call_with_no_filters(
"value": [10, 20, 30],
}
)
mock_read_database_uri.return_value = mock_df
mock_connect_obj = mock_connection.return_value
mock_sql_obj = mock_connect_obj.sql.return_value
mock_sql_obj.to_polars.return_value.lazy.return_value.collect.return_value = mock_df

result = clickhouse_table().collect()

mock_read_database_uri.assert_called_once()
call_args = mock_read_database_uri.call_args[1]
assert call_args["query"] == "SELECT * FROM `test_db`.`test_table` "
assert (
call_args["uri"]
== "clickhouse://test_user:test_password@localhost:8443/test_db"
mock_connection.assert_called_once_with("clickhouse://test_user:test_password@localhost:8443/test_db")
mock_connect_obj.sql.assert_called_once_with(
"SELECT * FROM `test_db`.`test_table` "
)
assert call_args["engine"] == "connectorx"

assert result.equals(mock_df)

@patch("polars.read_database_uri")
@patch("ibis.connect")
def test_call_with_filters_and_columns(
self, mock_read_database_uri: str, clickhouse_table: ClickHouseTable
self, mock_connection, clickhouse_table: ClickHouseTable
):
"""Test calling the table with filters and columns."""
mock_df = pl.DataFrame(
Expand All @@ -152,29 +149,23 @@ def test_call_with_filters_and_columns(
"value": [10],
}
)
mock_read_database_uri.return_value = mock_df
mock_connect_obj = mock_connection.return_value
mock_sql_obj = mock_connect_obj.sql.return_value
mock_sql_obj.to_polars.return_value.lazy.return_value.collect.return_value = mock_df

filters = [Filter("implant_id", "=", 1)]
columns = ["implant_id", "value"]
result = clickhouse_table(filters=filters, columns=columns).collect()

mock_read_database_uri.assert_called_once()
call_args = mock_read_database_uri.call_args[1]
assert (
call_args["query"]
== "SELECT `implant_id`, `value` FROM `test_db`.`test_table` WHERE (`implant_id` = 1)"
)
assert (
call_args["uri"]
== "clickhouse://test_user:test_password@localhost:8443/test_db"
mock_connection.assert_called_once_with("clickhouse://test_user:test_password@localhost:8443/test_db")
mock_connect_obj.sql.assert_called_once_with(
"SELECT `implant_id`, `value` FROM `test_db`.`test_table` WHERE (`implant_id` = 1)"
)
assert call_args["engine"] == "connectorx"

assert result.equals(mock_df)

@patch("polars.read_database_uri")
@patch("ibis.connect")
def test_call_handles_empty_results(
self, mock_read_database_uri: str, clickhouse_table: ClickHouseTable
self, mock_connection, clickhouse_table: ClickHouseTable
):
"""Test that the table handles empty results correctly."""
mock_df = pl.DataFrame(
Expand All @@ -184,7 +175,7 @@ def test_call_handles_empty_results(
"value": pl.Int64,
}
)
mock_read_database_uri.return_value = mock_df
mock_connection.return_value.sql.return_value.to_polars.return_value = mock_df

filters = [Filter("implant_id", "=", 999)]
result = clickhouse_table(filters=filters).collect()
Expand Down
File renamed without changes.
File renamed without changes.
6 changes: 3 additions & 3 deletions test/test_core.py → test/functional/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from datarepo.core import ModuleDatabase
from datarepo.core.catalog import Catalog
from test.data import database, database2
from test.data.database2 import frame3
from test.data.database import frame1, frame2
from test.functional.data import database, database2
from test.functional.data.database2 import frame3
from test.functional.data.database import frame1, frame2


class TestCore(unittest.TestCase):
Expand Down
18 changes: 18 additions & 0 deletions test/integration/resources/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# ClickHouse server used by the integration tests (see
# .github/workflows/build_and_publish.yml and test/integration/).
# The top-level `version` key is obsolete in the Compose Specification and
# only triggers a warning under `docker compose`, so it is omitted.

services:
  clickhouse:
    # NOTE(review): `latest` is not reproducible — consider pinning a tag.
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"   # HTTP interface (used by ClickHouseTableConfig port)
      - "9000:9000"   # native TCP interface (clickhouse-driver seeding)
      - "9004:9004"   # MySQL-compatibility port — presumably unused by tests; confirm
    environment:
      CLICKHOUSE_USER: chuser
      CLICKHOUSE_PASSWORD: chpass
      CLICKHOUSE_DB: testdb
    healthcheck:
      # `docker compose up --wait` in CI blocks until this check passes.
      test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8123/ping"]
      interval: 3s
      timeout: 5s
      retries: 5
121 changes: 121 additions & 0 deletions test/integration/test_clickhouse_table_int.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import os
import pytest
import pyarrow as pa

import clickhouse_driver
from datarepo.core.tables.clickhouse_table import ClickHouseTable, ClickHouseTableConfig
from datarepo.core.tables.filters import Filter


@pytest.fixture(scope="module")
def clickhouse_config() -> ClickHouseTableConfig:
    """Seed the ClickHouse container with test data and return a table config.

    Connection details come from the CLICKHOUSE_HOST / CLICKHOUSE_PORT
    environment variables (set by the CI workflow), falling back to the
    local docker-compose defaults.
    """
    host = os.environ.get("CLICKHOUSE_HOST", "localhost")
    native_port = os.environ.get("CLICKHOUSE_PORT", "9000")

    # Native-protocol client, used only to create and populate the fixture data.
    client = clickhouse_driver.Client.from_url(
        f"clickhouse://chuser:chpass@{host}:{native_port}/testdb"
    )

    # Drop and recreate so reruns against a long-lived container do not
    # accumulate duplicate rows (CREATE IF NOT EXISTS + INSERT is not idempotent).
    client.execute("DROP TABLE IF EXISTS testdb.test_table")
    client.execute("""
        CREATE TABLE testdb.test_table (
            implant_id Int64,
            date String,
            value Int64,
            str_value String
        ) ENGINE = MergeTree()
        ORDER BY (implant_id, date)
    """)

    # Insert test data
    client.execute("""
        INSERT INTO testdb.test_table
        (implant_id, date, value, str_value)
        VALUES
        (1, '2023-01-01', 100, 'alpha'),
        (1, '2023-01-02', 110, 'beta'),
        (2, '2023-01-01', 200, 'gamma'),
        (2, '2023-01-02', 210, 'delta'),
        (3, '2023-01-03', 300, 'epsilon')
    """)

    # Port 8123 is the HTTP port exposed by docker-compose.
    # NOTE(review): confirm ClickHouseTable is expected to connect over HTTP
    # rather than the native port used above for seeding.
    return ClickHouseTableConfig(
        host=host,
        port="8123",
        username="chuser",
        password="chpass",
        database="testdb",
    )


@pytest.fixture(scope="module")
def clickhouse_table(clickhouse_config: ClickHouseTableConfig) -> ClickHouseTable:
    """Build the ClickHouseTable under test, backed by the seeded container."""
    table_schema = pa.schema(
        [
            pa.field("implant_id", pa.int64()),
            pa.field("date", pa.string()),
            pa.field("value", pa.int64()),
            pa.field("str_value", pa.string()),
        ]
    )
    return ClickHouseTable(
        name="test_table",
        schema=table_schema,
        config=clickhouse_config,
        description="Test ClickHouse table",
    )


class TestClickHouseTableIntegration:
    """End-to-end tests running ClickHouseTable queries against the live
    dockerised ClickHouse instance seeded by the clickhouse_config fixture."""

    def test_query_all_data(self, clickhouse_table: ClickHouseTable):
        """Querying with no arguments returns every seeded row and column."""
        df = clickhouse_table().collect()

        assert df.height == 5
        assert df.width == 4

        assert set(df.columns) == {"implant_id", "date", "value", "str_value"}

        # A bare SELECT has no ORDER BY, so result order is not guaranteed by
        # the server; compare sorted values instead of relying on storage order.
        assert sorted(df["implant_id"].to_list()) == [1, 1, 2, 2, 3]

    def test_filter_by_equality(self, clickhouse_table: ClickHouseTable):
        """An `=` filter restricts rows to the matching implant."""
        df = clickhouse_table(filters=[Filter("implant_id", "=", 1)]).collect()

        assert df.height == 2
        # `implant` rather than `id` — avoid shadowing the builtin.
        assert all(implant == 1 for implant in df["implant_id"])

    def test_filter_by_comparison(self, clickhouse_table: ClickHouseTable):
        """A `>` filter keeps only rows above the threshold."""
        df = clickhouse_table(filters=[Filter("value", ">", 200)]).collect()

        assert df.height == 2
        assert all(val > 200 for val in df["value"])

    def test_filter_by_in(self, clickhouse_table: ClickHouseTable):
        """An `in` filter selects rows whose implant_id is in the given list."""
        df = clickhouse_table(filters=[Filter("implant_id", "in", [1, 3])]).collect()

        assert df.height == 3
        assert set(df["implant_id"].unique().to_list()) == {1, 3}

    def test_select_columns(self, clickhouse_table: ClickHouseTable):
        """Explicit column projection returns only the requested columns."""
        df = clickhouse_table(columns=["implant_id", "value"]).collect()

        assert df.width == 2
        assert set(df.columns) == {"implant_id", "value"}

    def test_combined_filters(self, clickhouse_table: ClickHouseTable):
        """Nested filter lists: outer list ORs groups, inner list ANDs filters."""
        df = clickhouse_table(filters=[[Filter("implant_id", "=", 1)],
                                       [Filter("date", "=", "2023-01-03")]]).collect()

        # implant 1 contributes two rows, the 2023-01-03 date one more.
        assert df.height == 3

        df = clickhouse_table(filters=[[Filter("implant_id", "=", 1),
                                        Filter("date", "=", "2023-01-01")]]).collect()

        assert df.height == 1
        assert df["implant_id"][0] == 1
        assert df["date"][0] == "2023-01-01"

    def test_string_like_filter(self, clickhouse_table: ClickHouseTable):
        """`contains` with a %-wildcard pattern matches a single row."""
        df = clickhouse_table(filters=[Filter("str_value", "contains", "%lta%")]).collect()

        assert df.height == 1
        assert df["str_value"][0] == "delta"