Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/hermes/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ def init_common_parser(self, parser: argparse.ArgumentParser) -> None:
type=pathlib.Path,
help="Configuration file in TOML format",
)
# Add a new argument to accept a URL for harvesting (in harvest command)
parser.add_argument(
    "--url",
    type=str,
    # Fixed: help text previously ended with a stray second closing parenthesis.
    help="URL from which to extract metadata (GitHub or GitLab)"
)
# Add a new argument to accept a token (from GitHub or GitLab) for harvesting (in harvest command)
parser.add_argument(
    "--token",
    type=str,
    required=False,
    help="Access token for GitHub/GitLab (optional, only needed for private repos or GitHub/GitLab API plugin)"
)

plugin_args = parser.add_argument_group("Extra options")
plugin_args.add_argument(
Expand Down
2 changes: 2 additions & 0 deletions src/hermes/commands/clean/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import argparse
import shutil
import logging
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aidajafarbigloo Is it necessary for your changes to include logging? If not, please remove it everywhere.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sferenz Thank you for the comments.
When using hermes clean command on Windows, I face this error:
Run subcommand clean
Removing HERMES caches...
An error occurred during execution of clean (Find details in './hermes.log')

Error in the "hermes.log":
Original exception was: [WinError 32] The process cannot access the file because it is being used by another process: '.hermes\\audit.log'.

This happens because Windows does not allow deletion of a file that is still open by the current process. The audit.log file inside .hermes is held open by a logging file handler. When shutil.rmtree() attempts to remove the directory, it fails due to the open file handle. I'm using logging.shutdown() to ensure that all logging handlers are closed before the directory is deleted; it does not introduce new logging behavior.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was fixed before but somehow the fix got lost... Weird 🤔

#226

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, not only once. But it seems more important to not have a log file in the working directory than having a properly cleaned .hermes cache.

You should be able to configure the path to the logfile, though.


from pydantic import BaseModel

Expand All @@ -25,6 +26,7 @@ class HermesCleanCommand(HermesCommand):

def __call__(self, args: argparse.Namespace) -> None:
    """Remove the HERMES cache directory (``.hermes``) under ``args.path``.

    :param args: Parsed command-line arguments; ``args.path`` is the
        working directory that contains the ``.hermes`` cache.
    """
    self.log.info("Removing HERMES caches...")
    # Close all logging handlers before deleting the cache: on Windows an
    # open handle on .hermes/audit.log makes the rmtree below fail with
    # WinError 32 ("file is being used by another process").
    logging.shutdown()

    # Naive implementation for now... check errors, validate directory, don't construct the path ourselves, etc.
    shutil.rmtree(args.path / '.hermes')
Expand Down
30 changes: 29 additions & 1 deletion src/hermes/commands/harvest/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
import argparse
import typing as t
from datetime import datetime
import tempfile
import pathlib
from hermes import logger

from pydantic import BaseModel

from hermes.commands.base import HermesCommand, HermesPlugin
from hermes.model.context import HermesContext, HermesHarvestContext
from hermes.model.errors import HermesValidationError, MergeError

from hermes.commands.harvest.util.token import update_token_to_toml, remove_token_from_toml
from hermes.commands.harvest.util.clone import clone_repository

class HermesHarvestPlugin(HermesPlugin):
"""Base plugin that does harvesting.
Expand Down Expand Up @@ -44,6 +48,30 @@ def __call__(self, args: argparse.Namespace) -> None:
# Initialize the harvest cache directory here to indicate the step ran
ctx.init_cache("harvest")

logger.init_logging()
log = logger.getLogger("hermes.cli")

if args.url:
with tempfile.TemporaryDirectory(dir=".") as temp_dir:
temp_path = pathlib.Path(temp_dir)
log.info(f"Cloning repository {args.url} into {temp_path}")

try:
clone_repository(args.url, temp_path, recursive=True, depth=1, filter_blobs=True, sparse=False, verbose=True)
except Exception as exc:
print("ERROR:", exc)
args.path = temp_path # Overwrite args.path to temp directory

if args.token:
update_token_to_toml(args.token)
self._harvest(ctx)
if args.token:
remove_token_from_toml('hermes.toml')
else:
self._harvest(ctx)

def _harvest(self, ctx: HermesContext) -> None:
"""Harvest metadata from configured sources using plugins."""
for plugin_name in self.settings.sources:
try:
plugin_func = self.plugins[plugin_name]()
Expand Down
249 changes: 249 additions & 0 deletions src/hermes/commands/harvest/util/clone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
# SPDX-FileCopyrightText: 2026 OFFIS e.V.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put UOL here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

#
# SPDX-License-Identifier: Apache-2.0

# SPDX-FileContributor: Stephan Ferenz
# SPDX-FileContributor: Aida Jafarbigloo

import os
import re
import shutil
import subprocess
import tempfile
import time
import stat
from pathlib import Path
from urllib.parse import urlparse
from typing import Sequence

# ---------------- utilities ----------------

def _normalize_clone_url(url: str) -> str:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide a general comment for each function.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstrings were added.

s = str(url).strip()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More expressive variable names would be nice. stripped_url? Or just overwrite the url parameter with the new value so the unstripped value isn't accidentally used again.

if re.match(r'^[\w.-]+@[\w.-]+:.*', s):
return s if s.endswith('.git') else s + '.git'
if s.startswith('file://'):
return s
if os.path.exists(s):
return s
p = urlparse(s)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More expressive variable name, e.g. parsed_url

if p.scheme in ('http', 'https'):
path = p.path if p.path.endswith('.git') else (p.path.rstrip('/') + '.git')
return f"{p.scheme}://{p.netloc}{path}"
if s.endswith('.git'):
return s
raise ValueError(f"Unsupported repository URL format: {url!r}")

def _clear_readonly(func, path, excinfo):
"""
onerror handler for shutil.rmtree: try to remove read-only flag and retry.
"""
try:
os.chmod(path, stat.S_IWRITE)
except Exception:
pass
try:
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.remove(path)
except Exception:
pass

def rmtree_with_retries(path: Path, retries: int = 6, initial_wait: float = 0.1):
    """
    Best-effort removal of path with retries and read-only handling.

    - retries: number of attempts
    - initial_wait: initial sleep in seconds (doubles after each retry)

    Each attempt first strips the read-only bit from every file and
    directory under ``path`` (Windows refuses to delete read-only entries),
    then runs ``shutil.rmtree``. If all attempts fail, the directory is
    renamed aside and removed as a last resort. Logs failures to stdout but
    never raises.
    """
    if not path.exists():
        return

    wait = initial_wait
    for attempt in range(1, retries + 1):
        try:
            # Make every entry writable where possible; bottom-up walk so
            # directories are handled after their contents.
            for root, dirs, files in os.walk(path, topdown=False):
                for name in files + dirs:
                    entry = os.path.join(root, name)
                    try:
                        os.chmod(entry, stat.S_IWRITE)
                    except Exception:
                        pass

            shutil.rmtree(path, onerror=_clear_readonly)

            if not path.exists():
                return
        except Exception as e:
            print(f"warn: rmtree attempt {attempt} failed for {path!s}: {e!r}")
        # Back off exponentially; transient locks (antivirus, indexers) are
        # the usual cause on Windows.
        time.sleep(wait)
        wait *= 2

    # Last resort: rename the stubborn directory aside and delete the copy.
    try:
        alt = path.with_name(path.name + "_TO_DELETE")
        os.replace(str(path), str(alt))
        shutil.rmtree(alt, onerror=_clear_readonly)
        return
    except Exception:
        pass

    if path.exists():
        print(f"error: failed to remove temp dir {path!s} after {retries} attempts. "
              f"Please remove it manually. (Often caused by antivirus or open handles.)")

def _move_or_copy(src: Path, dst: Path):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make things this complicated? We don't need atomic moves here.

try:
os.replace(str(src), str(dst))
except Exception:
shutil.copytree(str(src), str(dst))
rmtree_with_retries(src)

# ---------------- clone logic ----------------

def clone_repository(
    url: str,
    dest_dir: str,
    recursive: bool = True,
    depth: int | None = 1,
    filter_blobs: bool = True,
    sparse: bool = False,
    branch: str | None = None,
    insecure_ssl: bool = False,
    *,
    root_only: bool = False,
    include_files: Sequence[str] | None = None,
    verbose: bool = False,
) -> None:
    """Clone a git repository into ``dest_dir`` with best-effort temp cleanup.

    An optimized clone (shallow depth, blob filter, sparse mode, submodules)
    is attempted first; on failure a plain ``git clone`` is retried. Each
    attempt clones into a fresh temporary directory created next to
    ``dest_dir`` and the result is moved into place on success. All
    temporary directories are removed in the ``finally`` block, even when
    cloning fails.

    :param url: Repository URL (HTTPS, SSH shorthand, ``file://`` or local path).
    :param dest_dir: Target directory; must not exist or must be empty.
    :param recursive: Also clone submodules (optimized attempt only).
    :param depth: Shallow-clone depth, or ``None`` for full history.
    :param filter_blobs: Use ``--filter=blob:none`` (partial clone).
    :param sparse: Initialize the clone in sparse mode.
    :param branch: Branch to check out, or ``None`` for the default branch.
    :param insecure_ssl: Disable git SSL certificate verification (opt-in).
    :param root_only: Sparse-checkout only the repository's top-level files.
    :param include_files: Extra paths to include in the sparse checkout.
    :param verbose: Print progress information to stdout.
    :raises RuntimeError: If both clone attempts fail, or ``dest_dir``
        exists and is not empty.
    :raises ValueError: If the URL format is unsupported.
    """
    dest_path = Path(dest_dir)
    parent = dest_path.parent
    parent.mkdir(parents=True, exist_ok=True)

    clone_url = _normalize_clone_url(url)

    # GitLab.com rejects some shallow/partial-clone combinations, so force a
    # full clone there.  NOTE(review): substring match only covers
    # gitlab.com, not self-hosted GitLab instances — confirm intent.
    is_gitlab = "gitlab.com" in url.lower()
    if is_gitlab:
        if verbose:
            print("⚠️ GitLab detected: disabling --depth and --filter=blob:none for safety.")
        depth = None
        filter_blobs = False

    env = os.environ.copy()
    if insecure_ssl:
        # Deliberate opt-in escape hatch, e.g. for self-signed certificates.
        env["GIT_SSL_NO_VERIFY"] = "1"

    created_temp_dirs: list[Path] = []

    def build_cmd_for(temp_path: Path, optimized: bool):
        # Assemble the git command line; the optimized variant adds the
        # shallow/partial/sparse/submodule flags, the fallback stays plain.
        cmd = ["git", "clone"]
        if optimized:
            if branch:
                cmd += ["--branch", branch]
            if depth is not None:
                cmd += ["--depth", str(depth)]
            if filter_blobs:
                cmd += ["--filter=blob:none"]
            if sparse or root_only or (include_files and len(include_files) > 0):
                cmd += ["--sparse"]
            if recursive:
                cmd += ["--recurse-submodules"]
        else:
            if branch:
                cmd += ["--branch", branch]
        cmd += [clone_url, str(temp_path)]
        return cmd

    def attempt_clone(optimized: bool):
        # Every attempt gets its own fresh temp dir so a failed attempt can
        # never pollute a later one; all of them are cleaned up in `finally`.
        tmp = Path(tempfile.mkdtemp(prefix="clone_tmp_", dir=str(parent)))
        created_temp_dirs.append(tmp)
        cmd = build_cmd_for(tmp, optimized)
        if verbose:
            print("running:", " ".join(cmd))
        proc = subprocess.run(cmd, capture_output=True, text=True, env=env)
        return proc.returncode, proc, tmp

    def ensure_dest_free():
        # Refuse to overwrite existing content; remove an empty leftover dir.
        if dest_path.exists():
            if any(dest_path.iterdir()):
                raise RuntimeError(f"Destination '{dest_path}' already exists and is not empty. Won't overwrite.")
            rmtree_with_retries(dest_path)

    try:
        # Try optimized clone first.
        rc1, p1, tmp1 = attempt_clone(optimized=True)
        if rc1 != 0:
            if verbose:
                print("warn: optimized clone failed. stderr:")
                print(p1.stderr.strip() or "(no stderr)")
            # Try fallback plain clone.
            rc2, p2, tmp2 = attempt_clone(optimized=False)
            if rc2 != 0:
                # Both failed -> surface both stderr outputs to the caller.
                raise RuntimeError(
                    "Both optimized clone AND fallback clone failed.\n\n"
                    f"Optimized STDERR:\n{p1.stderr}\n\n"
                    f"Fallback STDERR:\n{p2.stderr}\n"
                )

            # Fallback succeeded: move into place.
            ensure_dest_free()
            _move_or_copy(tmp2, dest_path)
            if verbose:
                print("✅ Repository cloned successfully (fallback/full clone).")
            return

        # Optimized clone succeeded (tmp1): move into place.
        ensure_dest_free()
        _move_or_copy(tmp1, dest_path)
        if verbose:
            print("✅ Repository cloned successfully (optimized clone).")

        # If sparse/root_only/include_files were requested, apply sparse-checkout.
        if sparse or root_only or (include_files and len(include_files) > 0):
            try:
                subprocess.run(
                    ["git", "-C", str(dest_path), "sparse-checkout", "init", "--no-cone"],
                    check=True
                )
                patterns: list[str] = []
                if root_only:
                    # Everything at top level, but no subdirectories.
                    patterns += ["/*", "!/*/"]
                if include_files:
                    for p in include_files:
                        p = p.strip()
                        if p:
                            patterns.append(p if p.startswith("/") else f"/{p}")
                if patterns:
                    subprocess.run(
                        ["git", "-C", str(dest_path), "sparse-checkout", "set", "--no-cone", *patterns],
                        check=True
                    )
                if verbose:
                    print("📁 Sparse checkout applied:", patterns)
            except subprocess.CalledProcessError as e:
                # Sparse checkout is best-effort; the clone itself is in place.
                print("warn: sparse-checkout setup failed:", e)

    finally:
        # Best-effort cleanup of every temp dir we created, success or not.
        for t in created_temp_dirs:
            try:
                rmtree_with_retries(t)
            except Exception as e:
                print(f"warn: final cleanup failed for {t}: {e!r}")
Loading