Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ def __init__(
"Content-Type": "application/json",
}
self.api_timeout = self.config.connectionTimeout or 120
self._job_table_lineage_executed: bool = False
self.job_table_lineage: dict[str, list[dict[str, str]]] = defaultdict(list)
self._job_column_lineage_executed: bool = False
self.job_column_lineage: dict[
self._entity_table_lineage_executed: bool = False
self.entity_table_lineage: dict[str, list[dict[str, str]]] = defaultdict(list)
self._entity_column_lineage_executed: bool = False
self.entity_column_lineage: dict[
str, dict[Tuple[str, str], list[Tuple[str, str]]]
] = defaultdict(lambda: defaultdict(list))
self.engine = engine
Expand Down Expand Up @@ -122,8 +122,8 @@ def test_lineage_query(self) -> None:
except Exception as exc:
logger.debug(f"Error testing lineage queries: {traceback.format_exc()}")
raise DatabricksClientException(
f"Failed to test lineage queries Make sure you have access"
"to the tables table_lineage and column_lineage: {exc}"
f"Failed to test lineage queries. Make sure you have access "
f"to the tables table_lineage and column_lineage: {exc}"
)

def _run_query_paginator(self, data, result, end_time, response):
Expand Down Expand Up @@ -280,40 +280,41 @@ def get_job_runs(self, job_id) -> List[dict]:
logger.debug(traceback.format_exc())
logger.error(exc)

def get_table_lineage(self, job_id: str) -> List[dict[str, str]]:
def get_table_lineage(self, entity_id: str) -> List[dict[str, str]]:
"""
Method returns table lineage for a job by the specified job_id.
On first call, eagerly fetches ALL job lineage in bulk for optimal performance.
Method returns table lineage for a job or pipeline by the specified entity_id.
On first call, eagerly fetches ALL lineage in bulk for optimal performance.
"""
try:
if not self._job_table_lineage_executed:
if not self._entity_table_lineage_executed:
logger.info(
"First lineage request detected - performing bulk lineage fetch for all jobs"
)
self.cache_lineage()

# Return cached lineage for this specific job
return self.job_table_lineage.get(str(job_id), [])
return self.entity_table_lineage.get(str(entity_id), [])

except Exception as exc:
logger.debug(
f"Error getting table lineage for job {job_id} due to {traceback.format_exc()}"
f"Error getting table lineage for {entity_id} due to {traceback.format_exc()}"
)
logger.error(exc)
return []

def get_column_lineage(
self, job_id: str, TableKey: Tuple[str, str]
self, entity_id: str, TableKey: Tuple[str, str]
) -> List[Tuple[str, str]]:
"""
Method returns column lineage for a job by the specified job_id and table key
Method returns column lineage for a job or pipeline by the specified entity_id and table key
"""
try:
if not self._job_column_lineage_executed:
logger.debug("Job column lineage not found. Executing cache_lineage...")
if not self._entity_column_lineage_executed:
logger.debug(
"Entity column lineage not found. Executing cache_lineage..."
)
self.cache_lineage()

return self.job_column_lineage.get(str(job_id), {}).get(TableKey, [])
return self.entity_column_lineage.get(str(entity_id), {}).get(TableKey, [])

except Exception as exc:
logger.debug(
Expand All @@ -338,7 +339,7 @@ def run_lineage_query(self, query: str) -> List[dict]:

def cache_lineage(self):
"""
Method caches table and column lineage for ALL jobs.
Method caches table and column lineage for ALL jobs and pipelines.
"""
lookback_days = getattr(self.config, "lineageLookBackDays", 90)
logger.info(f"Caching table lineage (lookback: {lookback_days} days)")
Expand All @@ -347,7 +348,7 @@ def cache_lineage(self):
)
for row in table_lineage or []:
try:
self.job_table_lineage[row.job_id].append(
self.entity_table_lineage[row.job_id].append(
{
"source_table_full_name": row.source_table_full_name,
"target_table_full_name": row.target_table_full_name,
Expand All @@ -358,10 +359,8 @@ def cache_lineage(self):
f"Error parsing row: {row} due to {traceback.format_exc()}"
)
continue
self._job_table_lineage_executed = True
self._entity_table_lineage_executed = True

# Not every job has column lineage, so we need to check if the job exists in the column_lineage table
# we will cache the column lineage for jobs that have column lineage
logger.info(f"Caching column lineage (lookback: {lookback_days} days)")
column_lineage = self.run_lineage_query(
DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB.format(lookback_days=lookback_days)
Expand All @@ -377,14 +376,14 @@ def cache_lineage(self):
row.target_column_name,
)

self.job_column_lineage[row.job_id][table_key].append(column_pair)
self.entity_column_lineage[row.job_id][table_key].append(column_pair)

except Exception as exc:
logger.debug(
f"Error parsing row: {row} due to {traceback.format_exc()}"
)
continue
self._job_column_lineage_executed = True
self._entity_column_lineage_executed = True
logger.debug("Table and column lineage caching completed.")

def get_pipeline_details(self, pipeline_id: str) -> Optional[dict]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
source_table_full_name,
target_table_full_name
FROM system.access.table_lineage
WHERE entity_type = 'JOB'
WHERE entity_type IN ('JOB', 'PIPELINE')
AND event_time >= current_date() - INTERVAL {lookback_days} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
Expand All @@ -108,7 +108,7 @@
target_table_full_name,
target_column_name
FROM system.access.column_lineage
WHERE entity_type = 'JOB'
WHERE entity_type IN ('JOB', 'PIPELINE')
AND event_time >= current_date() - INTERVAL {lookback_days} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1225,15 +1225,12 @@ def yield_pipeline_lineage_details(
# Works automatically - no configuration required!
yield from self._yield_kafka_lineage(pipeline_details, pipeline_entity)

if not pipeline_details.job_id:
entity_id = pipeline_details.job_id or pipeline_details.pipeline_id
if not entity_id:
return

table_lineage_list = self.client.get_table_lineage(
job_id=pipeline_details.job_id
)
logger.debug(
f"Processing pipeline lineage for job {pipeline_details.job_id}"
)
table_lineage_list = self.client.get_table_lineage(entity_id=entity_id)
logger.debug(f"Processing pipeline lineage for {entity_id}")
if table_lineage_list:
for table_lineage in table_lineage_list:
source_table_full_name = table_lineage.get("source_table_full_name")
Expand Down Expand Up @@ -1296,7 +1293,7 @@ def yield_pipeline_lineage_details(
processed_column_lineage = (
self._process_and_validate_column_lineage(
column_lineage=self.client.get_column_lineage(
job_id=pipeline_details.job_id,
entity_id=entity_id,
TableKey=(
source_table_full_name,
target_table_full_name,
Expand Down Expand Up @@ -1333,12 +1330,10 @@ def yield_pipeline_lineage_details(

else:
logger.debug(
f"No source or target table full name found for job {pipeline_details.job_id}"
f"No source or target table full name found for {entity_id}"
)
else:
logger.debug(
f"No table lineage found for job {pipeline_details.job_id}"
)
logger.debug(f"No table lineage found for {entity_id}")
except Exception as exc:
yield Either(
left=StackTraceError(
Expand Down
Loading