Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 148 additions & 9 deletions snakesee/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,129 @@ def safe_file_size(path: Path) -> int:
return 0


def _scandir_files(directory: Path) -> list[_MetadataFileInfo]:
"""Recursively scan directory for files using os.scandir (faster than rglob).
class _ScanCache:
    """Cache of directory-scan results to avoid re-stat-ing thousands of files.

    Directory mtimes are recorded on every scan and compared on the next call.
    A directory's mtime changes when its *direct entries* are added, removed,
    or renamed, so those mutations are detected and only the affected
    directories are rescanned.

    Limitation: modifying an existing file's *contents* in place does NOT
    update the parent directory's mtime, so such changes are invisible to
    this cache and the cached mtime/size for that file may be stale.

    The cache tracks a single root directory: calling get_files with a
    different directory than the previous call triggers a full rescan.
    Access to all state is serialized by an internal lock.
    """

    __slots__ = ("_dir_mtimes", "_files", "_lock", "_root")

    def __init__(self) -> None:
        # Flat tuple of every file found under the root on the last scan.
        self._files: tuple[_MetadataFileInfo, ...] = ()
        # Directory path -> mtime observed on the last scan.
        self._dir_mtimes: dict[str, float] = {}
        self._lock = threading.Lock()
        # Root directory of the cached scan (None until the first call).
        self._root: str | None = None

    def get_files(self, directory: Path) -> tuple[_MetadataFileInfo, ...]:
        """Return the cached file tuple, rescanning only changed directories.

        Args:
            directory: Root directory to scan.

        Returns:
            Tuple of _MetadataFileInfo for all files currently under the root.
        """
        dir_str = str(directory)
        with self._lock:
            if self._root != dir_str:
                # First call (root is None) or a different root: full scan.
                self._root = dir_str
                files_list, self._dir_mtimes = _full_scandir(directory)
                self._files = tuple(files_list)
                return self._files

            # Stat only directories (cheap) to find what changed.
            current_mtimes: dict[str, float] = {}
            _collect_dir_mtimes(dir_str, current_mtimes)

            # New or touched directories: dict.get returns None for new
            # ones, which never compares equal to a float mtime.
            changed_dirs = [
                d
                for d, mtime in current_mtimes.items()
                if self._dir_mtimes.get(d) != mtime
            ]
            # Directories that existed last time but are now gone.
            removed_dirs = [d for d in self._dir_mtimes if d not in current_mtimes]

            if not changed_dirs and not removed_dirs:
                return self._files

            # Evict direct children of changed dirs (they get rescanned below)
            # and everything beneath removed dirs.  The os.sep suffix prevents
            # "/a/foo" from matching files under a sibling "/a/foobar".
            changed_parents = {Path(d) for d in changed_dirs}
            removed_prefixes = tuple(d + os.sep for d in removed_dirs)
            kept = [
                f
                for f in self._files
                if f.path.parent not in changed_parents
                and not str(f.path).startswith(removed_prefixes)
            ]

            # Rescan changed directories for their current files.  Every entry
            # in changed_dirs came from current_mtimes, so no existence check
            # is needed; _scan_single_dir tolerates disappearing dirs anyway.
            new_files: list[_MetadataFileInfo] = []
            for d in changed_dirs:
                _scan_single_dir(Path(d), new_files)

            self._files = tuple(kept + new_files)
            self._dir_mtimes = current_mtimes
            return self._files

    def clear(self) -> None:
        """Reset the cache to its initial empty state."""
        with self._lock:
            self._files = ()
            self._dir_mtimes.clear()
            self._root = None


def _collect_dir_mtimes(dir_path: str, result: dict[str, float]) -> None:
"""Collect mtimes for a directory tree (dirs only, no file stats)."""
try:
stat_result = os.stat(dir_path)
result[dir_path] = stat_result.st_mtime
with os.scandir(dir_path) as entries:
for entry in entries:
try:
if entry.is_dir(follow_symlinks=False):
_collect_dir_mtimes(entry.path, result)
except OSError:
continue
except OSError:
pass


def _scan_single_dir(dir_path: Path, files: list[_MetadataFileInfo]) -> None:
    """Append a _MetadataFileInfo for each regular file directly in *dir_path*.

    Non-recursive.  Entries that vanish or error mid-scan are skipped, and an
    unreadable or missing directory contributes nothing.
    """
    append = files.append
    try:
        with os.scandir(dir_path) as scanner:
            for entry in scanner:
                try:
                    if not entry.is_file(follow_symlinks=False):
                        continue
                    st = entry.stat(follow_symlinks=False)
                except OSError:
                    continue
                append(
                    _MetadataFileInfo(
                        path=Path(entry.path),
                        mtime=st.st_mtime,
                        size=st.st_size,
                        inode=st.st_ino,
                    )
                )
    except OSError:
        # Directory disappeared or is unreadable: leave *files* unchanged.
        pass


def _full_scandir(directory: Path) -> tuple[list[_MetadataFileInfo], dict[str, float]]:
"""Full recursive scan, returning both file list and directory mtimes."""
files: list[_MetadataFileInfo] = []
dir_mtimes: dict[str, float] = {}

def _scan_recursive(dir_path: Path) -> None:
try:
dir_str = str(dir_path)
dir_mtimes[dir_str] = dir_path.stat().st_mtime
with os.scandir(dir_path) as entries:
for entry in entries:
try:
Expand All @@ -233,7 +343,36 @@ def _scan_recursive(dir_path: Path) -> None:
pass

_scan_recursive(directory)
return files
return files, dir_mtimes


# Process-wide scan-cache singleton shared by every caller of _scandir_files.
_scan_cache = _ScanCache()


def get_scan_cache() -> _ScanCache:
    """Return the shared module-level :class:`_ScanCache` instance."""
    return _scan_cache


def _scandir_files(directory: Path, *, use_scan_cache: bool = True) -> list[_MetadataFileInfo]:
    """Recursively scan *directory* for files, using cached results when possible.

    The first cached call performs a full recursive scan; later calls rescan
    only directories whose mtime changed, avoiding re-stat-ing thousands of
    unchanged files.

    Args:
        directory: Directory to scan.
        use_scan_cache: If False, bypass the scan cache and do a full scan.

    Returns:
        List of _MetadataFileInfo for all files found.
    """
    if use_scan_cache:
        return list(_scan_cache.get_files(directory))
    files, _dir_mtimes = _full_scandir(directory)
    return files


def _read_metadata_file(
Expand Down Expand Up @@ -315,8 +454,8 @@ def iterate_metadata_files(
if not metadata_dir.exists():
return

# Use fast scandir-based recursive scan
files = _scandir_files(metadata_dir)
# Use fast scandir-based recursive scan (bypass scan cache when use_cache=False)
files = _scandir_files(metadata_dir, use_scan_cache=use_cache)
if not files:
return

Expand Down
162 changes: 162 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
"""Tests for utility functions."""

import json
import time
from pathlib import Path

import pytest

from snakesee.utils import MetadataCache
from snakesee.utils import _ScanCache
from snakesee.utils import get_metadata_cache
from snakesee.utils import get_scan_cache
from snakesee.utils import iterate_metadata_files
from snakesee.utils import json_loads
from snakesee.utils import safe_file_size
Expand Down Expand Up @@ -232,6 +237,12 @@ def test_instance_is_metadata_cache(self) -> None:
class TestIterateMetadataFiles:
"""Tests for iterate_metadata_files function."""

@pytest.fixture(autouse=True)
def _clear_global_caches(self) -> None:
    """Reset the process-wide metadata and scan caches before every test.

    Both caches are module-level singletons, so leftover state from one test
    would otherwise leak into the next and make results order-dependent.
    """
    get_metadata_cache().clear()
    get_scan_cache().clear()

def test_empty_directory(self, tmp_path: Path) -> None:
"""Test iterating empty metadata directory."""
metadata_dir = tmp_path / "metadata"
Expand Down Expand Up @@ -354,3 +365,154 @@ def test_uses_cache_when_enabled(self, tmp_path: Path) -> None:
# With use_cache=False, should not populate cache
list(iterate_metadata_files(metadata_dir, use_cache=False))
assert len(get_metadata_cache()) == 0


class TestScanCache:
    """Tests for the _ScanCache incremental directory-scan cache."""

    @pytest.fixture(autouse=True)
    def _clear_global_scan_cache(self) -> None:
        """Clear the global scan cache before each test to avoid flaky tests."""
        get_scan_cache().clear()

    @pytest.fixture
    def cache(self) -> _ScanCache:
        """Provide a fresh, isolated _ScanCache instance."""
        return _ScanCache()

    @staticmethod
    def _bump_mtime(path: Path) -> None:
        """Deterministically push *path*'s mtime forward.

        The cache detects changes by comparing directory mtimes.  Relying on
        time.sleep() to make the OS produce a distinct mtime is flaky on
        filesystems with coarse (e.g. 1 s) timestamp resolution and slows the
        suite, so we bump the mtime explicitly instead.
        """
        import os

        st = path.stat()
        os.utime(path, (st.st_atime, st.st_mtime + 10))

    def test_first_call_performs_full_scan(self, tmp_path: Path, cache: _ScanCache) -> None:
        """First call scans the directory and returns all files."""
        d = tmp_path / "meta"
        d.mkdir()
        (d / "a.json").write_text("{}")
        (d / "b.json").write_text("{}")

        result = cache.get_files(d)
        assert len(result) == 2
        names = {f.path.name for f in result}
        assert names == {"a.json", "b.json"}

    def test_no_change_returns_cached(self, tmp_path: Path, cache: _ScanCache) -> None:
        """Subsequent call with no changes returns the same tuple object."""
        d = tmp_path / "meta"
        d.mkdir()
        (d / "a.json").write_text("{}")

        first = cache.get_files(d)
        second = cache.get_files(d)
        # Same object — no re-allocation.
        assert first is second

    def test_adding_file_triggers_rescan(self, tmp_path: Path, cache: _ScanCache) -> None:
        """Adding a file to a directory triggers rescan of that directory."""
        d = tmp_path / "meta"
        d.mkdir()
        (d / "a.json").write_text("{}")

        result1 = cache.get_files(d)
        assert len(result1) == 1

        # Add a file, then force the directory mtime to visibly change.
        (d / "b.json").write_text("{}")
        self._bump_mtime(d)

        result2 = cache.get_files(d)
        assert len(result2) == 2
        names = {f.path.name for f in result2}
        assert names == {"a.json", "b.json"}

    def test_removing_file_triggers_rescan(self, tmp_path: Path, cache: _ScanCache) -> None:
        """Removing a file from a directory triggers rescan of that directory."""
        d = tmp_path / "meta"
        d.mkdir()
        (d / "a.json").write_text("{}")
        (d / "b.json").write_text("{}")

        result1 = cache.get_files(d)
        assert len(result1) == 2

        # Remove a file, then force the directory mtime to visibly change.
        (d / "a.json").unlink()
        self._bump_mtime(d)

        result2 = cache.get_files(d)
        assert len(result2) == 1
        assert result2[0].path.name == "b.json"

    def test_removing_directory_removes_its_files(
        self, tmp_path: Path, cache: _ScanCache
    ) -> None:
        """Removing an entire subdirectory removes its files from the cache."""
        import shutil

        d = tmp_path / "meta"
        sub = d / "sub"
        sub.mkdir(parents=True)
        (d / "top.json").write_text("{}")
        (sub / "nested.json").write_text("{}")

        result1 = cache.get_files(d)
        assert len(result1) == 2

        # Remove the subdirectory, then bump the parent's mtime.
        shutil.rmtree(sub)
        self._bump_mtime(d)

        result2 = cache.get_files(d)
        assert len(result2) == 1
        assert result2[0].path.name == "top.json"

    def test_different_root_triggers_full_rescan(
        self, tmp_path: Path, cache: _ScanCache
    ) -> None:
        """Calling get_files with a different directory triggers a full rescan."""
        d1 = tmp_path / "dir1"
        d2 = tmp_path / "dir2"
        d1.mkdir()
        d2.mkdir()
        (d1 / "a.json").write_text("{}")
        (d2 / "b.json").write_text("{}")

        result1 = cache.get_files(d1)
        assert len(result1) == 1
        assert result1[0].path.name == "a.json"

        result2 = cache.get_files(d2)
        assert len(result2) == 1
        assert result2[0].path.name == "b.json"

    def test_clear_resets_state(self, tmp_path: Path, cache: _ScanCache) -> None:
        """clear() resets all cached state."""
        d = tmp_path / "meta"
        d.mkdir()
        (d / "a.json").write_text("{}")

        cache.get_files(d)
        cache.clear()

        # After clear, internal state is empty.
        assert cache._files == ()
        assert cache._dir_mtimes == {}
        assert cache._root is None

    def test_subdirectory_files_preserved_on_parent_change(
        self, tmp_path: Path, cache: _ScanCache
    ) -> None:
        """Files in unchanged subdirectories are preserved when parent changes."""
        d = tmp_path / "meta"
        sub = d / "sub"
        sub.mkdir(parents=True)
        (d / "top.json").write_text("{}")
        (sub / "nested.json").write_text("{}")

        result1 = cache.get_files(d)
        assert len(result1) == 2

        # Change only the parent directory (add a file), then bump its mtime.
        (d / "new.json").write_text("{}")
        self._bump_mtime(d)

        result2 = cache.get_files(d)
        assert len(result2) == 3
        names = {f.path.name for f in result2}
        assert "nested.json" in names  # subdirectory file preserved