Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 170 additions & 12 deletions snakesee/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import threading
from collections.abc import Iterator
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from dataclasses import dataclass
Expand Down Expand Up @@ -199,19 +200,147 @@ def safe_file_size(path: Path) -> int:
return 0


def _scandir_files(directory: Path) -> list[_MetadataFileInfo]:
"""Recursively scan directory for files using os.scandir (faster than rglob).
class _ScanCache:
    """Cache of recursive directory-scan results for a single root directory.

    Directory mtimes are tracked to detect *structural* changes — files or
    subdirectories added, removed, or renamed — because a directory's mtime
    updates whenever its direct entries change. In-place content edits do
    NOT touch the parent directory's mtime, so when no structural change is
    detected the cached files are re-stat-ed individually to pick those up
    (still cheaper than a full scan: no directory traversal or re-discovery).

    The cache is designed for one root: calling get_files with a different
    directory than the previous call triggers a full rescan.

    Thread-safe: all state is read and mutated under an internal lock.
    """

    __slots__ = ("_dir_mtimes", "_files", "_lock", "_root")

    def __init__(self) -> None:
        # Cached results as an immutable tuple so callers can hold a reference.
        self._files: tuple[_MetadataFileInfo, ...] = ()
        # str(directory path) -> directory st_mtime at the last scan.
        self._dir_mtimes: dict[str, float] = {}
        self._lock = threading.Lock()
        # str(root) the cache currently reflects; None before the first scan.
        self._root: str | None = None

    def get_files(self, directory: Path) -> tuple[_MetadataFileInfo, ...]:
        """Return the cached file tuple, rescanning only changed directories.

        Args:
            directory: Root directory to scan.

        Returns:
            Tuple of _MetadataFileInfo for all files under ``directory``.
        """
        dir_str = str(directory)
        with self._lock:
            # First call, or a different root than last time: full scan.
            # (None != dir_str is always true, so this also covers the
            # uninitialized case.)
            if self._root != dir_str:
                self._root = dir_str
                files_list, self._dir_mtimes = _full_scandir(directory)
                self._files = tuple(files_list)
                return self._files

            # Snapshot current directory mtimes (dirs only — no file stats).
            current_mtimes: dict[str, float] = {}
            _collect_dir_mtimes(dir_str, current_mtimes)

            # New directories (no previous mtime) or mtime mismatches both
            # need a rescan; .get() returns None for new dirs, which can
            # never equal a float mtime.
            changed_dirs = [
                d
                for d, mtime in current_mtimes.items()
                if self._dir_mtimes.get(d) != mtime
            ]
            # Directories that existed at the last scan but vanished since.
            removed_dirs = [d for d in self._dir_mtimes if d not in current_mtimes]

            if not changed_dirs and not removed_dirs:
                self._refresh_unchanged_locked()
            else:
                self._apply_changes_locked(changed_dirs, removed_dirs, current_mtimes)
            return self._files

    def _refresh_unchanged_locked(self) -> None:
        """Re-stat cached files to catch in-place content modifications.

        Editing a file without adding/removing/renaming it leaves its parent
        directory's mtime untouched, so directory diffing alone cannot see
        it. Caller must hold self._lock.
        """
        refreshed = list(self._files)
        dirty = False
        for i, info in enumerate(refreshed):
            try:
                st = info.path.stat()
            except OSError:
                # File vanished mid-check; keep the stale entry — the parent
                # directory's mtime change will evict it on a later call.
                continue
            if st.st_mtime != info.mtime or st.st_ino != info.inode:
                refreshed[i] = _MetadataFileInfo(
                    path=info.path,
                    mtime=st.st_mtime,
                    size=st.st_size,
                    inode=st.st_ino,
                )
                dirty = True
        if dirty:
            self._files = tuple(refreshed)

    def _apply_changes_locked(
        self,
        changed_dirs: list[str],
        removed_dirs: list[str],
        current_mtimes: dict[str, float],
    ) -> None:
        """Evict entries under changed/removed dirs, rescan the changed ones.

        Caller must hold self._lock.
        """
        # Evict direct children of changed dirs and everything under removed
        # dirs (prefix match with a trailing separator so "/a/bc" does not
        # match removed dir "/a/b").
        changed_parents = {Path(d) for d in changed_dirs}
        removed_prefixes = [d + os.sep for d in removed_dirs]
        kept = [
            f
            for f in self._files
            if f.path.parent not in changed_parents
            and not any(str(f.path).startswith(p) for p in removed_prefixes)
        ]
        # Non-recursive rescan of each changed directory: a changed
        # subdirectory appears in changed_dirs in its own right, so every
        # directory is scanned at most once. (changed_dirs is built from
        # current_mtimes, so each entry is guaranteed to still exist there.)
        new_files: list[_MetadataFileInfo] = []
        for dir_path in changed_dirs:
            _scan_single_dir(Path(dir_path), new_files)

        self._files = tuple(kept + new_files)
        self._dir_mtimes = current_mtimes

    def clear(self) -> None:
        """Reset the cache to its initial empty state."""
        with self._lock:
            self._files = ()
            self._dir_mtimes.clear()
            self._root = None


def _collect_dir_mtimes(dir_path: str, result: dict[str, float]) -> None:
"""Collect mtimes for a directory tree (dirs only, no file stats)."""
try:
stat_result = os.stat(dir_path)
result[dir_path] = stat_result.st_mtime
with os.scandir(dir_path) as entries:
for entry in entries:
try:
if entry.is_dir(follow_symlinks=False):
_collect_dir_mtimes(entry.path, result)
except OSError:
continue
except OSError:
pass


def _scan_single_dir(dir_path: Path, files: list[_MetadataFileInfo]) -> None:
    """Append a _MetadataFileInfo for every regular file directly in dir_path.

    Non-recursive; symlinks are not followed. Entries that disappear or fail
    to stat mid-scan are skipped, and an unreadable dir_path appends nothing.
    """
    try:
        with os.scandir(dir_path) as it:
            for entry in it:
                try:
                    if not entry.is_file(follow_symlinks=False):
                        continue
                    st = entry.stat(follow_symlinks=False)
                except OSError:
                    continue
                files.append(
                    _MetadataFileInfo(
                        path=Path(entry.path),
                        mtime=st.st_mtime,
                        size=st.st_size,
                        inode=st.st_ino,
                    )
                )
    except OSError:
        pass


def _full_scandir(directory: Path) -> tuple[list[_MetadataFileInfo], dict[str, float]]:
"""Full recursive scan, returning both file list and directory mtimes."""
files: list[_MetadataFileInfo] = []
dir_mtimes: dict[str, float] = {}

def _scan_recursive(dir_path: Path) -> None:
try:
dir_str = str(dir_path)
dir_mtimes[dir_str] = dir_path.stat().st_mtime
with os.scandir(dir_path) as entries:
for entry in entries:
try:
Expand All @@ -233,7 +362,36 @@ def _scan_recursive(dir_path: Path) -> None:
pass

_scan_recursive(directory)
return files
return files, dir_mtimes


# Module-level singleton: one shared scan cache for the whole process.
_scan_cache = _ScanCache()


def get_scan_cache() -> _ScanCache:
    """Get the global scan cache instance.

    Exposed so callers (e.g. test infrastructure) can invoke clear() to
    reset cached state between runs.
    """
    return _scan_cache


def _scandir_files(directory: Path, *, use_scan_cache: bool = True) -> Sequence[_MetadataFileInfo]:
    """Recursively scan directory for files, using cached results when possible.

    The first cached call performs a full recursive scan; later calls only
    rescan directories whose mtime changed, avoiding re-stat-ing thousands
    of unchanged files.

    Args:
        directory: Directory to scan.
        use_scan_cache: If False, bypass the scan cache and do a full scan.

    Returns:
        Sequence of _MetadataFileInfo for all files found.
    """
    if use_scan_cache:
        return _scan_cache.get_files(directory)
    # Cache bypass: one-off full recursive scan; directory mtimes discarded.
    all_files, _dir_mtimes = _full_scandir(directory)
    return all_files


def _read_metadata_file(
Expand Down Expand Up @@ -315,14 +473,14 @@ def iterate_metadata_files(
if not metadata_dir.exists():
return

# Use fast scandir-based recursive scan
files = _scandir_files(metadata_dir)
# Use fast scandir-based recursive scan (bypass scan cache when use_cache=False)
files = _scandir_files(metadata_dir, use_scan_cache=use_cache)
if not files:
return

# Sort by mtime (newest first by default)
if sort_by_mtime:
files.sort(key=lambda f: f.mtime, reverse=newest_first)
files = sorted(files, key=lambda f: f.mtime, reverse=newest_first)

total = len(files)
cache = get_metadata_cache() if use_cache else None
Expand All @@ -335,7 +493,7 @@ def iterate_metadata_files(


def _iterate_metadata_sequential(
files: list[_MetadataFileInfo],
files: Sequence[_MetadataFileInfo],
cache: MetadataCache | None,
progress_callback: ProgressCallback | None,
total: int,
Expand All @@ -351,7 +509,7 @@ def _iterate_metadata_sequential(


def _iterate_metadata_parallel(
files: list[_MetadataFileInfo],
files: Sequence[_MetadataFileInfo],
cache: MetadataCache | None,
progress_callback: ProgressCallback | None,
total: int,
Expand Down
Loading