-
Notifications
You must be signed in to change notification settings - Fork 1
perf: cache metadata directory scans for large workflows #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
aab0451
ef2d51b
b63d0e5
7e4b8a9
3c3e015
f8e48e2
b1a9831
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -199,19 +199,115 @@ def safe_file_size(path: Path) -> int: | |
| return 0 | ||
|
|
||
|
|
||
| def _scandir_files(directory: Path) -> list[_MetadataFileInfo]: | ||
| """Recursively scan directory for files using os.scandir (faster than rglob). | ||
|
|
||
| Args: | ||
| directory: Directory to scan. | ||
class _ScanCache:
    """Cache for directory scan results to avoid re-stat-ing thousands of files.

    Tracks directory mtimes to detect when new files are added. Since metadata
    files are append-only (Snakemake adds them, never modifies or deletes),
    we only need to rescan directories whose mtime has changed.

    Thread-safe: all public methods serialize on an internal lock.

    NOTE(review): the cache is implicitly keyed by the first ``directory``
    passed to :meth:`get_files`; calling it later with a different root
    returns results for the original root — confirm all callers use one root.
    """

    __slots__ = ("_files", "_dir_mtimes", "_lock")

    def __init__(self) -> None:
        # Flat list of every file found by the last (incremental) scan.
        self._files: list[_MetadataFileInfo] = []
        # str(directory) -> mtime observed at last scan; drives change detection.
        self._dir_mtimes: dict[str, float] = {}
        self._lock = threading.Lock()

    def get_files(self, directory: Path) -> list[_MetadataFileInfo]:
        """Return cached file list, rescanning only changed directories.

        Args:
            directory: Root directory to scan.

        Returns:
            List of _MetadataFileInfo for all files under ``directory``.
        """
        with self._lock:
            if not self._dir_mtimes:
                # First call — full scan
                self._files, self._dir_mtimes = _full_scandir(directory)
                return list(self._files)

            # Check which directories have changed (new dirs count as changed:
            # their old mtime is absent, so the comparison below fails).
            changed_dirs: list[str] = []
            current_mtimes: dict[str, float] = {}
            _collect_dir_mtimes(str(directory), current_mtimes)

            for dir_path, mtime in current_mtimes.items():
                if self._dir_mtimes.get(dir_path) != mtime:
                    changed_dirs.append(dir_path)

            # Also detect removed directories
            for dir_path in self._dir_mtimes:
                if dir_path not in current_mtimes:
                    changed_dirs.append(dir_path)

            if not changed_dirs:
                return list(self._files)

            # Rescan only changed directories; remove stale entries from them.
            # BUGFIX: match on the file's *exact parent directory*, not a string
            # prefix. startswith("/a/b") also matched files in a sibling
            # "/a/bc" and in unchanged subdirectories of a changed directory —
            # entries the non-recursive _scan_single_dir rescan below would
            # never re-add, silently dropping them from the cache. Exact-parent
            # matching also handles removed trees: every removed subdirectory
            # appears in changed_set itself, so its files are evicted.
            changed_set = set(changed_dirs)
            kept = [f for f in self._files if str(f.path.parent) not in changed_set]
            # Scan changed directories for new/updated files
            new_files: list[_MetadataFileInfo] = []
            for dir_path in changed_dirs:
                if dir_path in current_mtimes:  # skip dirs that no longer exist
                    _scan_single_dir(Path(dir_path), new_files)

            self._files = kept + new_files
            self._dir_mtimes = current_mtimes
            return list(self._files)

    def clear(self) -> None:
        """Clear the scan cache.

        Must be called between unit tests (e.g. in a fixture) so cached
        state from one test cannot leak into the next.
        """
        with self._lock:
            self._files.clear()
            self._dir_mtimes.clear()
|
|
||
|
|
||
| def _collect_dir_mtimes(dir_path: str, result: dict[str, float]) -> None: | ||
| """Collect mtimes for a directory tree (dirs only, no file stats).""" | ||
| try: | ||
| stat_result = os.stat(dir_path) | ||
| result[dir_path] = stat_result.st_mtime | ||
| with os.scandir(dir_path) as entries: | ||
| for entry in entries: | ||
| try: | ||
| if entry.is_dir(follow_symlinks=False): | ||
| _collect_dir_mtimes(entry.path, result) | ||
| except OSError: | ||
| continue | ||
| except OSError: | ||
| pass | ||
|
|
||
|
|
||
def _scan_single_dir(dir_path: Path, files: list[_MetadataFileInfo]) -> None:
    """Scan a single directory (non-recursive) for files.

    Appends one _MetadataFileInfo per regular file found directly in
    ``dir_path``. Entries that disappear or cannot be stat-ed are skipped;
    an unreadable directory contributes nothing.
    """
    try:
        with os.scandir(dir_path) as entries:
            for entry in entries:
                try:
                    # Guard clause: skip everything that is not a plain file.
                    if not entry.is_file(follow_symlinks=False):
                        continue
                    info = entry.stat(follow_symlinks=False)
                except OSError:
                    continue
                files.append(
                    _MetadataFileInfo(
                        path=Path(entry.path),
                        mtime=info.st_mtime,
                        size=info.st_size,
                        inode=info.st_ino,
                    )
                )
    except OSError:
        pass
|
|
||
|
|
||
| def _full_scandir(directory: Path) -> tuple[list[_MetadataFileInfo], dict[str, float]]: | ||
| """Full recursive scan, returning both file list and directory mtimes.""" | ||
| files: list[_MetadataFileInfo] = [] | ||
| dir_mtimes: dict[str, float] = {} | ||
|
|
||
| def _scan_recursive(dir_path: Path) -> None: | ||
| try: | ||
| dir_str = str(dir_path) | ||
| dir_mtimes[dir_str] = dir_path.stat().st_mtime | ||
| with os.scandir(dir_path) as entries: | ||
| for entry in entries: | ||
| try: | ||
|
|
@@ -233,7 +329,32 @@ def _scan_recursive(dir_path: Path) -> None: | |
| pass | ||
|
|
||
| _scan_recursive(directory) | ||
| return files | ||
| return files, dir_mtimes | ||
|
|
||
|
|
||
# Global scan cache instance.
# Module-level singleton: all callers of _scandir_files share one cache.
_scan_cache = _ScanCache()


def get_scan_cache() -> _ScanCache:
    """Get the global scan cache instance.

    Exposed so external code (e.g. test fixtures) can reach the singleton,
    for instance to call ``clear()`` between tests.
    """
    return _scan_cache
|
|
||
|
|
||
def _scandir_files(directory: Path) -> list[_MetadataFileInfo]:
    """Recursively scan directory for files, using cached results when possible.

    The first call performs a full recursive scan; subsequent calls rescan
    only directories whose mtime has changed, so thousands of unchanged
    files are not re-stat-ed.

    Args:
        directory: Directory to scan.

    Returns:
        List of _MetadataFileInfo for all files found.
    """
    # Delegate to the shared singleton via its public accessor.
    return get_scan_cache().get_files(directory)
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
|
|
||
| def _read_metadata_file( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue: can you add unit tests?