Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 49 additions & 27 deletions src/tfmindi/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,22 @@ def load_motif_annotations(

def load_motif_to_dbd(motif_annotations: pd.DataFrame) -> dict[str, str]:
"""
Create motif-to-DNA-binding-domain mapping for human TFs.
Create motif-to-DNA-binding-domain mapping via evidence-hierarchy voting.

Takes motif annotations and maps motifs to their DNA-binding domains
based on TF annotations and human TF database information.
For each motif, walks evidence tiers in order::

Direct_annot → Orthology_annot → Motif_similarity_annot
→ Motif_similarity_and_Orthology_annot

At each tier, parses the comma-separated TF list, looks up each TF's DBD in
the human TF database, and collapses to a set (family-collapsed count). If
the set has size 1, that DBD wins. If the set has size > 1 AND we are at
the Direct_annot tier, the motif is labelled ``"Composite"`` (genuine
multi-TF dimer). If a lower tier is ambiguous, we fall through to the next
tier. If no tier yields a decision, the motif is absent from the returned
dict.

TFs missing from the human TF database are silently skipped.

Parameters
----------
Expand All @@ -374,7 +386,8 @@ def load_motif_to_dbd(motif_annotations: pd.DataFrame) -> dict[str, str]:
Returns
-------
dict[str, str]
Dictionary mapping motif IDs to DNA-binding domain names
Dictionary mapping motif IDs to DNA-binding domain names (or
``"Composite"`` for multi-family dimer motifs).

Examples
--------
Expand All @@ -384,33 +397,42 @@ def load_motif_to_dbd(motif_annotations: pd.DataFrame) -> dict[str, str]:
>>> print(motif_to_dbd["hocomoco__FOXO1_HUMAN.H11MO.0.A"])
'Forkhead'
"""
motif_to_tf = motif_annotations.copy()

# Flatten all TF annotations into individual TF names
motif_to_tf = (
motif_to_tf.apply(lambda row: ", ".join(row.dropna()), axis=1)
.str.split(", ")
.explode()
.reset_index()
.rename({0: "TF"}, axis=1)
)

# Download human TF annotations with DNA-binding domains
human_tf_annot = pd.read_csv(
"https://humantfs.ccbr.utoronto.ca/download/v_1.01/DatabaseExtract_v_1.01.csv",
index_col=0,
)[["HGNC symbol", "DBD"]]

motif_to_tf = motif_to_tf.merge(right=human_tf_annot, how="left", left_on="TF", right_on="HGNC symbol")

# For each motif, take the most common (mode) DBD annotation
motif_to_dbd = (
motif_to_tf.dropna()
.groupby("MotifID")["DBD"]
.agg(lambda x: x.mode().iat[0]) # take the first mode if there's a tie
.reset_index()
)
)[["HGNC symbol", "DBD"]].dropna()
tf_to_dbd: dict[str, str] = dict(zip(human_tf_annot["HGNC symbol"], human_tf_annot["DBD"], strict=True))

tiers = [
"Direct_annot",
"Orthology_annot",
"Motif_similarity_annot",
"Motif_similarity_and_Orthology_annot",
]

motif_to_dbd = motif_to_dbd.set_index("MotifID")["DBD"].to_dict()
motif_to_dbd: dict[str, str] = {}

for motif_id, row in motif_annotations.iterrows():
label: str | None = None
for tier in tiers:
cell = row.get(tier)
if cell is None or (isinstance(cell, float) and pd.isna(cell)):
continue
tfs = [t.strip() for t in str(cell).split(",") if t.strip()]
dbds = {tf_to_dbd[tf] for tf in tfs if tf in tf_to_dbd}
if not dbds:
continue
if len(dbds) == 1:
label = next(iter(dbds))
break
# len(dbds) > 1
if tier == "Direct_annot":
label = "Composite"
break
# lower tier ambiguous → fall through
continue
if label is not None:
motif_to_dbd[motif_id] = label

return motif_to_dbd
15 changes: 15 additions & 0 deletions src/tfmindi/pp/seqlets.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,21 @@ def create_seqlet_adata(
motif_ppms_typed = [ppm.astype(dtype) for ppm in motif_ppms]
var_df["motif_ppm"] = motif_ppms_typed

# var_df is indexed by motif header name; annotations and DBD are keyed on
# the .cb file name stem and broadcast to every motif sharing that file.
# The header name must be globally unique so that .loc[name, col] returns
# a scalar — otherwise seqlet_dbd gets silently corrupted with Series
# values. Enforce that invariant loudly before any writes.
if not var_df.index.is_unique:
dupes = var_df.index[var_df.index.duplicated()].unique().tolist()
raise ValueError(
f"create_seqlet_adata: var_df index (motif header name) is not unique. "
f"Duplicates: {dupes[:10]}{'...' if len(dupes) > 10 else ''}. "
f"Two motifs from different .cb files share the same header — "
f"downstream per-seqlet DBD lookup would return a Series instead "
f"of a scalar."
)

# Store motif annotations in .var if provided
if motif_annotations is not None and motif_names is not None:
# Add annotations for motifs that are present in the similarity matrix
Expand Down
86 changes: 74 additions & 12 deletions src/tfmindi/tl/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,53 @@
from tfmindi.backends import get_backend, is_gpu_available


def _vote_dbd(
data: np.ndarray,
cols: np.ndarray,
dbd_values: np.ndarray,
top_k: int,
min_share: float,
) -> object:
"""Top-K weighted DBD vote for one seqlet.

Returns NaN if the row is empty, all top-K matches are NaN/Composite, or
the winner's share of the top-K vote falls below ``min_share``.
"""
if data.size == 0 or data.max() <= 0:
return np.nan
k = min(top_k, data.size)
top_local = np.argpartition(-data, k - 1)[:k]
top_cols = cols[top_local]
top_scores = data[top_local]
top_dbds = dbd_values[top_cols]

totals: dict[str, float] = {}
for dbd, score in zip(top_dbds, top_scores, strict=False):
if dbd is None or (isinstance(dbd, float) and np.isnan(dbd)):
continue
if dbd == "Composite":
continue
totals[dbd] = totals.get(dbd, 0.0) + float(score)

if not totals:
return np.nan
total_weight = sum(totals.values())
if total_weight <= 0:
return np.nan
winner, winner_weight = max(totals.items(), key=lambda kv: kv[1])
if winner_weight / total_weight < min_share:
return np.nan
return winner


def cluster_seqlets(
adata: AnnData, resolution: float = 3.0, pca_svd_solver: str | None = None, *, recompute: bool = False
adata: AnnData,
resolution: float = 3.0,
pca_svd_solver: str | None = None,
*,
recompute: bool = False,
top_k_motifs: int = 5,
dbd_vote_min_share: float = 0.4,
) -> None:
"""
Perform complete clustering workflow including dimensionality reduction, clustering, and functional annotation.
Expand Down Expand Up @@ -49,6 +94,11 @@ def cluster_seqlets(
recompute
If False (default), reuse existing PCA and neighborhood graph computations if available.
If True, always recompute PCA, neighbors, and t-SNE from scratch.
top_k_motifs
Number of top TomTom matches used in the per-seqlet DBD vote (default: 5).
dbd_vote_min_share
Minimum fraction of the top-K weighted vote the winner must hold to be
accepted; otherwise the seqlet is labelled NaN (default: 0.4).

Returns
-------
Expand Down Expand Up @@ -149,19 +199,26 @@ def cluster_seqlets(
adata.obs["mean_contrib"] = np.nan

if "dbd" in adata.var.columns:
# find top motif for all seqlets at once
# For sparse matrices, argmax along axis=1 gives the column index of max value in each row
from scipy import sparse

dbd_values = adata.var["dbd"].to_numpy(dtype=object)
seqlet_dbds: list[object] = []

if sparse.issparse(adata.X):
# argmax on sparse matrix can return 2D array, ensure 1D
top_motif_indices = np.asarray(adata.X.argmax(axis=1)).flatten()
X = adata.X.tocsr()
for i in range(X.shape[0]):
start, end = X.indptr[i], X.indptr[i + 1]
data = X.data[start:end]
cols = X.indices[start:end]
seqlet_dbds.append(_vote_dbd(data, cols, dbd_values, top_k_motifs, dbd_vote_min_share))
else:
top_motif_indices = adata.X.argmax(axis=1)
X_dense = np.asarray(adata.X)
for i in range(X_dense.shape[0]):
row = X_dense[i]
nz = np.flatnonzero(row > 0)
seqlet_dbds.append(_vote_dbd(row[nz], nz, dbd_values, top_k_motifs, dbd_vote_min_share))

top_motif_names = adata.var.index[top_motif_indices]
seqlet_dbds = [adata.var.loc[motif_name, "dbd"] for motif_name in top_motif_names]
adata.obs["seqlet_dbd"] = seqlet_dbds
adata.obs["seqlet_dbd"] = pd.Series(seqlet_dbds, index=adata.obs.index, dtype=object)
else:
print("Warning: No DBD annotations found in adata.var['dbd']")
adata.obs["seqlet_dbd"] = np.nan
Expand Down Expand Up @@ -189,8 +246,11 @@ def cluster_seqlets(
# Test: One-tailed binomial test asking "Is k significantly greater than expected?"
# This gives us a p-value for enrichment: P(X >= k | n, p)

# background probability.
dbd_to_probability = adata.var["dbd"].value_counts(normalize=True, dropna=False).to_dict()
# background probability. NaN is kept as legitimate background mass
# (unknown-DBD motifs still consume library slots); "Composite" is
# folded into NaN so it can never be selected as a cluster label.
_dbd_series = adata.var["dbd"].replace({"Composite": np.nan})
dbd_to_probability = _dbd_series.value_counts(normalize=True, dropna=False).to_dict()

def get_dbd_min_pval(df: pd.Series) -> str:
"""
Expand All @@ -207,7 +267,9 @@ def get_dbd_min_pval(df: pd.Series) -> str:
# k = n_success
# N = number of draws
# dbd_to_p = prob of success
p_value = binom.sf(k - 1, N, dbd_to_probability[dbd])
# Fall back to p=1.0 (no enrichment) for labels absent from the
# background — can only happen if a stray label slips through.
p_value = binom.sf(k - 1, N, dbd_to_probability.get(dbd, 1.0))
if p_value < min_pval:
min_pval = p_value
best_dbd = dbd
Expand Down
Loading
Loading