Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB,
DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB,
DATABRICKS_GET_COLUMN_LINEAGE,
DATABRICKS_GET_TABLE_LINEAGE,
)
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
from metadata.utils.helpers import datetime_to_ts
Expand Down Expand Up @@ -73,10 +73,10 @@
"Content-Type": "application/json",
}
self.api_timeout = self.config.connectionTimeout or 120
self._job_table_lineage_executed: bool = False
self.job_table_lineage: dict[str, list[dict[str, str]]] = defaultdict(list)
self._job_column_lineage_executed: bool = False
self.job_column_lineage: dict[
self._entity_table_lineage_executed: bool = False
self.entity_table_lineage: dict[str, list[dict[str, str]]] = defaultdict(list)
self._entity_column_lineage_executed: bool = False
self.entity_column_lineage: dict[
str, dict[Tuple[str, str], list[Tuple[str, str]]]
] = defaultdict(lambda: defaultdict(list))
self.engine = engine
Expand All @@ -101,15 +101,13 @@
with self.engine.connect() as connection:
test_table_lineage = connection.execute(
text(
DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB.format(
lookback_days=lookback_days
)
DATABRICKS_GET_TABLE_LINEAGE.format(lookback_days=lookback_days)
+ " LIMIT 1"
)
)
test_column_lineage = connection.execute(
text(
DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB.format(
DATABRICKS_GET_COLUMN_LINEAGE.format(
lookback_days=lookback_days
)
+ " LIMIT 1"
Expand All @@ -122,8 +120,8 @@
except Exception as exc:
logger.debug(f"Error testing lineage queries: {traceback.format_exc()}")
raise DatabricksClientException(
f"Failed to test lineage queries Make sure you have access"
"to the tables table_lineage and column_lineage: {exc}"
f"Failed to test lineage queries. Make sure you have access "
f"to the tables table_lineage and column_lineage: {exc}"
)

def _run_query_paginator(self, data, result, end_time, response):
Expand Down Expand Up @@ -280,40 +278,41 @@
logger.debug(traceback.format_exc())
logger.error(exc)

def get_table_lineage(self, job_id: str) -> List[dict[str, str]]:
def get_table_lineage(self, entity_id: str) -> List[dict[str, str]]:
"""
Method returns table lineage for a job by the specified job_id.
On first call, eagerly fetches ALL job lineage in bulk for optimal performance.
Method returns table lineage for a job or pipeline by the specified entity_id.
On first call, eagerly fetches ALL lineage in bulk for optimal performance.
"""
try:
if not self._job_table_lineage_executed:
if not self._entity_table_lineage_executed:
logger.info(
"First lineage request detected - performing bulk lineage fetch for all jobs"
"First lineage request detected - performing bulk lineage fetch for all entities"
)
self.cache_lineage()

# Return cached lineage for this specific job
return self.job_table_lineage.get(str(job_id), [])
return self.entity_table_lineage.get(str(entity_id), [])

except Exception as exc:
logger.debug(
f"Error getting table lineage for job {job_id} due to {traceback.format_exc()}"
f"Error getting table lineage for {entity_id} due to {traceback.format_exc()}"
)
logger.error(exc)
return []

def get_column_lineage(
self, job_id: str, TableKey: Tuple[str, str]
self, entity_id: str, TableKey: Tuple[str, str]

Check warning on line 303 in ingestion/src/metadata/ingestion/source/database/databricks/client.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Rename this parameter "TableKey" to match the regular expression ^[_a-z][a-z0-9_]*$.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ0oqT3XNWet5u0P6ONX&open=AZ0oqT3XNWet5u0P6ONX&pullRequest=26786
) -> List[Tuple[str, str]]:
"""
Method returns column lineage for a job by the specified job_id and table key
Method returns column lineage for a job or pipeline by the specified entity_id and table key
"""
try:
if not self._job_column_lineage_executed:
logger.debug("Job column lineage not found. Executing cache_lineage...")
if not self._entity_column_lineage_executed:
logger.debug(
"Entity column lineage not found. Executing cache_lineage..."
)
self.cache_lineage()

return self.job_column_lineage.get(str(job_id), {}).get(TableKey, [])
return self.entity_column_lineage.get(str(entity_id), {}).get(TableKey, [])

except Exception as exc:
logger.debug(
Expand All @@ -338,16 +337,16 @@

def cache_lineage(self):
"""
Method caches table and column lineage for ALL jobs.
Method caches table and column lineage for ALL jobs and pipelines.
"""
lookback_days = getattr(self.config, "lineageLookBackDays", 90)
logger.info(f"Caching table lineage (lookback: {lookback_days} days)")
table_lineage = self.run_lineage_query(
DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB.format(lookback_days=lookback_days)
DATABRICKS_GET_TABLE_LINEAGE.format(lookback_days=lookback_days)
)
for row in table_lineage or []:
try:
self.job_table_lineage[row.job_id].append(
self.entity_table_lineage[row.entity_id].append(
{
"source_table_full_name": row.source_table_full_name,
"target_table_full_name": row.target_table_full_name,
Expand All @@ -358,13 +357,11 @@
f"Error parsing row: {row} due to {traceback.format_exc()}"
)
continue
self._job_table_lineage_executed = True
self._entity_table_lineage_executed = True

# Not every job has column lineage, so we need to check if the job exists in the column_lineage table
# we will cache the column lineage for jobs that have column lineage
logger.info(f"Caching column lineage (lookback: {lookback_days} days)")
column_lineage = self.run_lineage_query(
DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB.format(lookback_days=lookback_days)
DATABRICKS_GET_COLUMN_LINEAGE.format(lookback_days=lookback_days)
)
for row in column_lineage or []:
try:
Expand All @@ -377,14 +374,14 @@
row.target_column_name,
)

self.job_column_lineage[row.job_id][table_key].append(column_pair)
self.entity_column_lineage[row.entity_id][table_key].append(column_pair)

except Exception as exc:
logger.debug(
f"Error parsing row: {row} due to {traceback.format_exc()}"
)
continue
self._job_column_lineage_executed = True
self._entity_column_lineage_executed = True
logger.debug("Table and column lineage caching completed.")

def get_pipeline_details(self, pipeline_id: str) -> Optional[dict]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,28 @@

DATABRICKS_DDL = "SHOW CREATE TABLE `{table_name}`"

DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB = """
SELECT
entity_id AS job_id,
DATABRICKS_GET_TABLE_LINEAGE = """
SELECT
entity_id,
source_table_full_name,
target_table_full_name
FROM system.access.table_lineage
WHERE entity_type = 'JOB'
WHERE entity_type IN ('JOB', 'PIPELINE')
AND event_time >= current_date() - INTERVAL {lookback_days} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
GROUP BY entity_id, source_table_full_name, target_table_full_name
"""

DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB = """
DATABRICKS_GET_COLUMN_LINEAGE = """
SELECT
entity_id as job_id,
entity_id,
source_table_full_name,
source_column_name,
target_table_full_name,
target_column_name
FROM system.access.column_lineage
WHERE entity_type = 'JOB'
WHERE entity_type IN ('JOB', 'PIPELINE')
AND event_time >= current_date() - INTERVAL {lookback_days} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1225,15 +1225,12 @@ def yield_pipeline_lineage_details(
# Works automatically - no configuration required!
yield from self._yield_kafka_lineage(pipeline_details, pipeline_entity)

if not pipeline_details.job_id:
entity_id = pipeline_details.job_id or pipeline_details.pipeline_id
if not entity_id:
return

table_lineage_list = self.client.get_table_lineage(
job_id=pipeline_details.job_id
)
logger.debug(
f"Processing pipeline lineage for job {pipeline_details.job_id}"
)
table_lineage_list = self.client.get_table_lineage(entity_id=entity_id)
logger.debug(f"Processing pipeline lineage for {entity_id}")
if table_lineage_list:
for table_lineage in table_lineage_list:
source_table_full_name = table_lineage.get("source_table_full_name")
Expand Down Expand Up @@ -1296,7 +1293,7 @@ def yield_pipeline_lineage_details(
processed_column_lineage = (
self._process_and_validate_column_lineage(
column_lineage=self.client.get_column_lineage(
job_id=pipeline_details.job_id,
entity_id=entity_id,
TableKey=(
source_table_full_name,
target_table_full_name,
Expand Down Expand Up @@ -1330,15 +1327,8 @@ def yield_pipeline_lineage_details(
)
)
)

else:
logger.debug(
f"No source or target table full name found for job {pipeline_details.job_id}"
)
else:
logger.debug(
f"No table lineage found for job {pipeline_details.job_id}"
)
logger.debug(f"No table lineage found for {entity_id}")
except Exception as exc:
yield Either(
left=StackTraceError(
Expand Down
Loading
Loading