diff --git a/.gitignore b/.gitignore index 4d73a83..ec1b72f 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,10 @@ dist *.swp *.swo +# Python +__pycache__ +*.pyc + # OS .DS_Store Thumbs.db diff --git a/scripts/analyze-style/README.md b/scripts/analyze-style/README.md new file mode 100644 index 0000000..4cd8e19 --- /dev/null +++ b/scripts/analyze-style/README.md @@ -0,0 +1,108 @@ +# Video Style Analyzer + +Extracts visual style from launch videos — color palettes, motion intensity, audio pacing, scene structure — and optionally uses Gemini Vision to synthesize a detailed style guide that a motion designer could use to recreate the look and feel. + +## Setup + +```bash +cd scripts/analyze-style +pip install -r requirements.txt +``` + +For Gemini Vision synthesis, set your API key: + +```bash +export GEMINI_API_KEY="your-key-here" +``` + +## Usage + +### Basic analysis (metrics only) + +```bash +python3 analyze.py /path/to/video.mp4 --output-dir ./output --output-json ./output/results.json +``` + +This runs the full metrics pipeline: +- **Scene detection** — PySceneDetect identifies scene boundaries and extracts keyframes (start/mid/end per scene) +- **Color analysis** — dominant colors, brightness, saturation, warmth per scene +- **Audio analysis** — tempo (BPM), beat count, energy curve, spectral features +- **Motion analysis** — frame-difference scoring per scene, editing pace classification + +### Full analysis with style synthesis + +```bash +GEMINI_API_KEY="your-key" python3 analyze.py /path/to/video.mp4 \ + --output-dir ./output \ + --output-json ./output/results.json \ + --synthesize +``` + +The `--synthesize` flag sends extracted keyframes to Gemini Vision and produces: +- **Per-scene descriptions** — layout composition, typography treatment, animation states, iconography style +- **Transition descriptions** — how scenes connect (hard cuts, dissolves, shared visual anchors) +- **Style guide** — a comprehensive breakdown covering visual identity, 
typography, motion language, layout, narrative structure, iconography, audio-visual sync, and overall feel + +### Flags + +| Flag | Description | +|------|-------------| +| `--output-dir DIR` | Where to save extracted keyframes (default: temp directory) | +| `--output-json PATH` | Write JSON results to file (default: prints to stdout) | +| `--skip-audio` | Skip librosa audio analysis | +| `--synthesize` | Run Gemini Vision synthesis (requires `GEMINI_API_KEY`) | + +### Run individual modules + +Each module can also be run standalone for debugging: + +```bash +python3 scene_detector.py /path/to/video.mp4 # Scene detection + keyframes +python3 color_analyzer.py /path/to/image.jpg # Color palette from an image +python3 audio_analyzer.py /path/to/video.mp4 # Audio features +python3 motion_analyzer.py /path/to/video.mp4 # Motion scores +python3 synthesizer.py /path/to/results.json # Run synthesis on existing results +``` + +## Output format + +The JSON output follows this structure: + +```jsonc +{ + "video_path": "...", + "video_info": { "fps", "total_frames", "duration", "resolution" }, + "scenes": [ + { + "scene_number": 1, + "start_time": 0.0, + "end_time": 10.5, + "duration": 10.5, + "keyframe_path": "output/scene_0001_mid.jpg", + "keyframe_paths": ["..._start.jpg", "..._mid.jpg", "..._end.jpg"], + "colors": { "dominant_color", "avg_brightness", "avg_saturation", "warmth", "palette_hex" }, + "motion_score": 2.03 + } + ], + "audio": { "tempo_bpm", "beat_count", "duration", "avg_energy", "energy_description", "pacing_description" }, + "motion": { "avg_motion_score", "motion_variance", "motion_description", "editing_pace" }, + "style_summary": { "color_mood", "avg_brightness", "pacing", "motion_level", "scene_count", "avg_scene_duration" }, + // Only present with --synthesize: + "synthesis": { + "scene_descriptions": ["..."], + "transition_descriptions": ["..."], + "style_guide": "..." 
+ } +} +``` + +## Architecture + +``` +analyze.py — CLI entry point, orchestrates all modules +scene_detector.py — PySceneDetect keyframe extraction (start/mid/end per scene) +color_analyzer.py — colorgram.py color palette extraction + warmth/brightness analysis +audio_analyzer.py — librosa tempo, energy, spectral feature extraction +motion_analyzer.py — OpenCV frame-difference motion scoring +synthesizer.py — Gemini Vision scene captioning + style guide synthesis +``` diff --git a/scripts/analyze-style/analyze.py b/scripts/analyze-style/analyze.py new file mode 100644 index 0000000..3c60c50 --- /dev/null +++ b/scripts/analyze-style/analyze.py @@ -0,0 +1,264 @@ +"""Main CLI entry point for video style analysis. + +Orchestrates scene detection, color analysis, audio analysis, and motion +analysis to produce a comprehensive style profile of a video file. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import tempfile +from collections import Counter +from typing import Optional + +from scene_detector import detect_scenes +from color_analyzer import extract_palette, analyze_palette +from audio_analyzer import analyze_audio +from motion_analyzer import analyze_motion, get_video_info +from synthesizer import run_synthesis + + +def _log(msg: str) -> None: + """Print a progress message to stderr so stdout stays clean for JSON.""" + print(msg, file=sys.stderr) + + +def analyze_video(video_path: str, output_dir: str, skip_audio: bool = False) -> dict: + """Run the full analysis pipeline on a video file. + + Args: + video_path: Path to the input video file. + output_dir: Directory to save keyframes and intermediate results. + skip_audio: If True, skip audio analysis entirely. + + Returns: + A dictionary containing the complete style analysis results. 
+ """ + video_path = os.path.abspath(video_path) + + # ── Video metadata ────────────────────────────────────────────────── + _log("Extracting video metadata...") + video_info: dict = get_video_info(video_path) + + # ── Scene detection ───────────────────────────────────────────────── + _log("Detecting scenes and extracting keyframes...") + scenes: list[dict] = detect_scenes(video_path, output_dir) + _log(f" Found {len(scenes)} scene(s).") + + # ── Color analysis per keyframe ───────────────────────────────────── + _log("Analyzing color palettes...") + for scene in scenes: + keyframe_path: str = scene.get("keyframe_path", "") + if keyframe_path and os.path.isfile(keyframe_path): + palette = extract_palette(keyframe_path) + scene["colors"] = analyze_palette(palette) + else: + scene["colors"] = { + "dominant_color": None, + "avg_brightness": 0.0, + "avg_saturation": 0.0, + "warmth": "neutral", + "palette_hex": [], + } + + # ── Audio analysis ────────────────────────────────────────────────── + audio_result: Optional[dict] = None + if skip_audio: + _log("Skipping audio analysis (--skip-audio).") + else: + _log("Analyzing audio track...") + try: + audio_result = analyze_audio(video_path) + _log(" Audio analysis complete.") + except (FileNotFoundError, RuntimeError) as exc: + _log(f" Audio analysis failed: {exc}") + audio_result = None + + # ── Motion analysis ───────────────────────────────────────────────── + _log("Analyzing motion...") + scene_boundaries: list[tuple[float, float]] = [ + (s["start_time"], s["end_time"]) for s in scenes + ] + motion_result: dict = analyze_motion(video_path, scene_boundaries) + + # Attach per-scene motion scores when available. 
+ per_scene_scores: list[float] = motion_result.get("scene_motion_scores", []) + for i, scene in enumerate(scenes): + if i < len(per_scene_scores): + scene["motion_score"] = per_scene_scores[i] + + # ── Style summary ─────────────────────────────────────────────────── + _log("Computing style summary...") + style_summary = _compute_style_summary(scenes, audio_result, motion_result) + + # ── Assemble final output ─────────────────────────────────────────── + # Build a clean audio dict for the output (drop energy_curve for brevity). + audio_output: Optional[dict] = None + if audio_result is not None: + audio_output = { + "tempo_bpm": audio_result["tempo_bpm"], + "beat_count": audio_result["beat_count"], + "duration": audio_result["duration_seconds"], + "avg_energy": audio_result["avg_energy"], + "energy_description": audio_result["energy_description"], + "pacing_description": audio_result["pacing_description"], + "spectral_centroid_mean": audio_result["spectral_centroid_mean"], + } + + result: dict = { + "video_path": video_path, + "video_info": video_info, + "scenes": [ + { + "scene_number": s["scene_number"], + "start_time": s["start_time"], + "end_time": s["end_time"], + "duration": s["duration"], + "keyframe_path": s.get("keyframe_path", ""), + "keyframe_paths": s.get("keyframe_paths", []), + "colors": s.get("colors", {}), + **({"motion_score": s["motion_score"]} if "motion_score" in s else {}), + } + for s in scenes + ], + "audio": audio_output, + "motion": { + "avg_motion_score": motion_result.get("avg_motion_score", 0.0), + "motion_variance": motion_result.get("motion_variance", 0.0), + "motion_description": motion_result.get("motion_description", "unknown"), + "editing_pace": motion_result.get("editing_pace", "unknown"), + }, + "style_summary": style_summary, + } + + return result + + +def _compute_style_summary( + scenes: list[dict], + audio_result: Optional[dict], + motion_result: dict, +) -> dict: + """Derive high-level style descriptors from the raw 
analysis results.""" + # Color mood: majority vote of per-scene warmth values. + warmth_counts: Counter[str] = Counter() + brightness_total = 0.0 + brightness_count = 0 + + for scene in scenes: + colors = scene.get("colors", {}) + warmth_counts[colors.get("warmth", "neutral")] += 1 + bri = colors.get("avg_brightness", 0.0) + if bri > 0: + brightness_total += bri + brightness_count += 1 + + color_mood: str = warmth_counts.most_common(1)[0][0] if warmth_counts else "neutral" + avg_brightness: float = round(brightness_total / brightness_count, 2) if brightness_count else 0.0 + + # Pacing: prefer audio-derived pacing, fall back to editing pace. + if audio_result is not None: + pacing: str = audio_result.get("pacing_description", "unknown") + else: + pacing = motion_result.get("editing_pace", "unknown") + + motion_level: str = motion_result.get("motion_description", "unknown") + + scene_count: int = len(scenes) + total_duration = sum(s.get("duration", 0.0) for s in scenes) + avg_scene_duration: float = round(total_duration / scene_count, 3) if scene_count else 0.0 + + return { + "color_mood": color_mood, + "avg_brightness": avg_brightness, + "pacing": pacing, + "motion_level": motion_level, + "scene_count": scene_count, + "avg_scene_duration": avg_scene_duration, + } + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Analyze the visual and auditory style of a video file.", + ) + parser.add_argument( + "video_path", + help="Path to the video file to analyze.", + ) + parser.add_argument( + "--output-dir", + default=None, + help="Directory to save keyframes and results. A temp dir is created if omitted.", + ) + parser.add_argument( + "--output-json", + default=None, + help="Path to write the JSON results file. 
def main() -> None:
    """CLI entry point: parse arguments, run the pipeline, emit JSON."""
    parser = argparse.ArgumentParser(
        description="Analyze the visual and auditory style of a video file.",
    )
    parser.add_argument(
        "video_path",
        help="Path to the video file to analyze.",
    )
    parser.add_argument(
        "--output-dir",
        default=None,
        help="Directory to save keyframes and results. A temp dir is created if omitted.",
    )
    parser.add_argument(
        "--output-json",
        default=None,
        help="Path to write the JSON results file. Prints to stdout if omitted.",
    )
    parser.add_argument(
        "--skip-audio",
        action="store_true",
        default=False,
        help="Skip audio analysis.",
    )
    parser.add_argument(
        "--synthesize",
        action="store_true",
        default=False,
        help="Run Gemini Vision synthesis for rich style descriptions (requires GEMINI_API_KEY).",
    )
    args = parser.parse_args()

    # Fall back to a fresh temp directory when the caller did not pick one.
    if args.output_dir:
        keyframe_dir: str = os.path.abspath(args.output_dir)
    else:
        keyframe_dir = tempfile.mkdtemp(prefix="video_style_")
        _log(f"Using temporary output directory: {keyframe_dir}")

    try:
        result = analyze_video(args.video_path, keyframe_dir, skip_audio=args.skip_audio)
    except Exception as exc:  # CLI boundary: report and exit non-zero.
        _log(f"Error: {exc}")
        sys.exit(1)

    # Optional Gemini Vision pass; a failure degrades to synthesis == None
    # rather than discarding the metrics already computed.
    if args.synthesize:
        _log("\n── Running Gemini Vision synthesis ──")
        try:
            result["synthesis"] = run_synthesis(result)
        except RuntimeError as exc:
            _log(f"Synthesis failed: {exc}")
            result["synthesis"] = None

    payload = json.dumps(result, indent=2, ensure_ascii=False)

    if args.output_json:
        target = os.path.abspath(args.output_json)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(payload + "\n")
        _log(f"Results written to {target}")
    else:
        print(payload)

    # Final one-line summary goes to stderr so stdout stays clean for JSON.
    summary = result.get("style_summary", {})
    _log(
        f"Analysis complete. {summary.get('scene_count', 0)} scenes detected. "
        f"Style: {summary.get('color_mood', 'unknown')}, "
        f"{summary.get('pacing', 'unknown')} pacing, "
        f"{summary.get('motion_level', 'unknown')} motion."
    )
def analyze_audio(video_path: str) -> dict:
    """Extract audio features from a video file.

    Args:
        video_path: Path to the input video file.

    Returns:
        Dictionary containing extracted audio features.

    Raises:
        FileNotFoundError: If the video file or ffmpeg is not found.
        RuntimeError: If audio extraction or analysis fails (including an
            ffmpeg timeout or a video with no audio track).
    """
    video = Path(video_path)
    if not video.is_file():
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Create the temp wav with delete=False and close it immediately.
    # The previous pattern (delete=True, grabbing .name inside the `with`)
    # deleted the file on exiting the context and relied on ffmpeg racily
    # recreating the same path; it also breaks on Windows, where an open
    # NamedTemporaryFile cannot be reopened by another process.
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp_path = tmp.name
    tmp.close()

    try:
        _extract_audio(video_path, tmp_path)
        features = _compute_features(tmp_path)
    finally:
        # Always clean up the temporary wav, even on failure.
        Path(tmp_path).unlink(missing_ok=True)

    return features


def _extract_audio(video_path: str, output_path: str) -> None:
    """Use ffmpeg to extract audio from a video file as mono 22050 Hz wav.

    Raises:
        FileNotFoundError: If the ffmpeg binary is not on PATH.
        RuntimeError: If ffmpeg fails, times out, or the video has no audio.
    """
    cmd = [
        "ffmpeg",
        "-i", video_path,
        "-vn",  # no video
        "-acodec", "pcm_s16le",
        "-ar", "22050",  # sample rate that librosa expects by default
        "-ac", "1",  # mono
        "-y",  # overwrite
        output_path,
    ]
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=120,
        )
    except FileNotFoundError:
        raise FileNotFoundError(
            "ffmpeg not found. Please install ffmpeg and ensure it is on your PATH."
        ) from None
    except subprocess.TimeoutExpired as exc:
        # Surface timeouts as RuntimeError so callers that already handle
        # (FileNotFoundError, RuntimeError) do not crash on TimeoutExpired.
        raise RuntimeError(
            f"ffmpeg timed out while extracting audio from {video_path}"
        ) from exc

    if result.returncode != 0:
        stderr = result.stderr.decode(errors="replace")
        if "does not contain any stream" in stderr or "no audio" in stderr.lower():
            raise RuntimeError(f"Video has no audio track: {video_path}")
        raise RuntimeError(f"ffmpeg failed (exit {result.returncode}): {stderr[:500]}")
+ ) + + if result.returncode != 0: + stderr = result.stderr.decode(errors="replace") + if "does not contain any stream" in stderr or "no audio" in stderr.lower(): + raise RuntimeError(f"Video has no audio track: {video_path}") + raise RuntimeError(f"ffmpeg failed (exit {result.returncode}): {stderr[:500]}") + + +def _compute_features(wav_path: str) -> dict: + """Compute audio features from a wav file using librosa.""" + y, sr = librosa.load(wav_path, sr=22050, mono=True) + + if len(y) == 0: + raise RuntimeError("Extracted audio is empty.") + + duration_seconds: float = float(librosa.get_duration(y=y, sr=sr)) + + # Tempo and beats + tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr) + tempo_bpm: float = float(np.atleast_1d(tempo)[0]) + beat_count: int = int(len(beat_frames)) + + # RMS energy (per frame) + rms = librosa.feature.rms(y=y)[0] + avg_energy: float = float(np.mean(rms)) + + # Energy curve sampled at ~1-second intervals + frames_per_second = sr / 512 # default hop_length for rms is 512 + step = max(1, int(round(frames_per_second))) + energy_curve: list[float] = [float(v) for v in rms[::step]] + + # Spectral centroid + centroid = librosa.feature.spectral_centroid(y=y, sr=sr)[0] + spectral_centroid_mean: float = float(np.mean(centroid)) + + # Descriptive labels + if avg_energy > 0.05: + energy_description = "high" + elif avg_energy > 0.01: + energy_description = "medium" + else: + energy_description = "low" + + if tempo_bpm > 140: + pacing_description = "fast" + elif tempo_bpm >= 90: + pacing_description = "moderate" + else: + pacing_description = "slow" + + return { + "tempo_bpm": tempo_bpm, + "beat_count": beat_count, + "duration_seconds": duration_seconds, + "avg_energy": avg_energy, + "energy_curve": energy_curve, + "spectral_centroid_mean": spectral_centroid_mean, + "energy_description": energy_description, + "pacing_description": pacing_description, + } + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print(f"Usage: python {sys.argv[0]} 
") + sys.exit(1) + + try: + result = analyze_audio(sys.argv[1]) + except (FileNotFoundError, RuntimeError) as exc: + print(f"Error: {exc}", file=sys.stderr) + sys.exit(1) + + for key, value in result.items(): + if isinstance(value, list): + print(f" {key}: [{len(value)} samples]") + elif isinstance(value, float): + print(f" {key}: {value:.4f}") + else: + print(f" {key}: {value}") diff --git a/scripts/analyze-style/color_analyzer.py b/scripts/analyze-style/color_analyzer.py new file mode 100644 index 0000000..9342384 --- /dev/null +++ b/scripts/analyze-style/color_analyzer.py @@ -0,0 +1,149 @@ +"""Extract and analyze dominant color palettes from keyframe images.""" + +import colorsys +import sys +from pathlib import Path + +import colorgram + + +def extract_palette(image_path: str, num_colors: int = 5) -> list[dict]: + """Extract the dominant color palette from an image. + + Args: + image_path: Path to the image file. + num_colors: Number of dominant colors to extract. + + Returns: + List of dicts sorted by proportion descending, each containing: + rgb: (r, g, b) tuple + hex: hex color string + proportion: float between 0 and 1 + """ + path = Path(image_path) + if not path.exists(): + raise FileNotFoundError(f"Image not found: {image_path}") + + try: + colors = colorgram.extract(str(path), num_colors) + except Exception as exc: + raise ValueError(f"Could not process image: {image_path}") from exc + + total = sum(c.proportion for c in colors) + palette: list[dict] = [] + for c in colors: + r, g, b = c.rgb.r, c.rgb.g, c.rgb.b + palette.append( + { + "rgb": (r, g, b), + "hex": f"#{r:02x}{g:02x}{b:02x}", + "proportion": c.proportion / total if total > 0 else 0.0, + } + ) + + palette.sort(key=lambda x: x["proportion"], reverse=True) + return palette + + +def analyze_palette(colors: list[dict]) -> dict: + """Analyze a color palette produced by extract_palette. + + Args: + colors: List of color dicts from extract_palette. 
def analyze_palette(colors: list[dict]) -> dict:
    """Analyze a color palette produced by extract_palette.

    Args:
        colors: List of color dicts from extract_palette.

    Returns:
        Dict with:
            dominant_color: hex string of the highest-proportion color
            avg_brightness: weighted average brightness (0-255)
            avg_saturation: weighted average saturation (0-1)
            warmth: "warm", "cool", or "neutral"
            palette_hex: list of hex strings
    """
    if not colors:
        return {
            "dominant_color": None,
            "avg_brightness": 0.0,
            "avg_saturation": 0.0,
            "warmth": "neutral",
            "palette_hex": [],
        }

    # Proportion-weighted accumulators over every color in the palette.
    weight_sum = 0.0
    brightness_sum = 0.0
    saturation_sum = 0.0
    red_sum = 0.0
    blue_sum = 0.0

    for entry in colors:
        r, g, b = entry["rgb"]
        weight = entry["proportion"]
        weight_sum += weight

        # Brightness: simple mean of the RGB channels.
        brightness_sum += (r + g + b) / 3.0 * weight

        # HSV saturation captures how "colorful" (vs. gray) the tone is.
        saturation = colorsys.rgb_to_hsv(r / 255.0, g / 255.0, b / 255.0)[1]
        saturation_sum += saturation * weight

        red_sum += r * weight
        blue_sum += b * weight

    avg_brightness = brightness_sum / weight_sum if weight_sum > 0 else 0.0
    avg_saturation = saturation_sum / weight_sum if weight_sum > 0 else 0.0

    # Warmth: a clearly red-leaning palette is "warm", blue-leaning "cool".
    if weight_sum > 0:
        avg_red = red_sum / weight_sum
        avg_blue = blue_sum / weight_sum
        gap = avg_red - avg_blue
        if gap > 10:
            warmth = "warm"
        elif gap < -10:
            warmth = "cool"
        else:
            warmth = "neutral"
    else:
        warmth = "neutral"

    return {
        "dominant_color": colors[0]["hex"],
        "avg_brightness": round(avg_brightness, 2),
        "avg_saturation": round(avg_saturation, 4),
        "warmth": warmth,
        "palette_hex": [entry["hex"] for entry in colors],
    }
print("Extracted palette:") + for i, color in enumerate(palette, 1): + print( + f" {i}. {color['hex']} " + f"RGB{color['rgb']} " + f"{color['proportion']:.1%}" + ) + + analysis = analyze_palette(palette) + print("\nAnalysis:") + print(f" Dominant color : {analysis['dominant_color']}") + print(f" Avg brightness : {analysis['avg_brightness']}") + print(f" Avg saturation : {analysis['avg_saturation']}") + print(f" Warmth : {analysis['warmth']}") + print(f" Palette : {', '.join(analysis['palette_hex'])}") diff --git a/scripts/analyze-style/motion_analyzer.py b/scripts/analyze-style/motion_analyzer.py new file mode 100644 index 0000000..8074dab --- /dev/null +++ b/scripts/analyze-style/motion_analyzer.py @@ -0,0 +1,196 @@ +"""Analyze motion intensity and editing pace in a video using frame differencing.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import cv2 +import numpy as np + + +def get_video_info(video_path: str) -> dict: + """Return basic metadata for a video file. + + Args: + video_path: Path to the video file. + + Returns: + Dict with fps, total_frames, duration, and resolution. 
+ """ + path = Path(video_path) + if not path.exists(): + raise FileNotFoundError(f"Video not found: {video_path}") + + cap = cv2.VideoCapture(str(path)) + if not cap.isOpened(): + raise RuntimeError(f"Failed to open video: {video_path}") + + fps = cap.get(cv2.CAP_PROP_FPS) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + cap.release() + + duration = total_frames / fps if fps > 0 else 0.0 + + return { + "fps": fps, + "total_frames": total_frames, + "duration": round(duration, 3), + "resolution": {"width": width, "height": height}, + } + + +def _motion_description(avg_score: float) -> str: + """Map an average motion score (0-255) to a human-readable label.""" + if avg_score < 2: + return "static" + if avg_score < 8: + return "gentle" + if avg_score < 20: + return "moderate" + if avg_score < 40: + return "dynamic" + return "frenetic" + + +def _editing_pace(scene_boundaries: list[tuple[float, float]]) -> str: + """Classify editing pace based on average scene duration.""" + if not scene_boundaries: + return "unknown" + durations = [end - start for start, end in scene_boundaries] + avg_duration = sum(durations) / len(durations) + if avg_duration > 5.0: + return "slow" + if avg_duration >= 2.0: + return "moderate" + return "fast" + + +def analyze_motion( + video_path: str, + scene_boundaries: list[tuple[float, float]] | None = None, + sample_every: int = 5, +) -> dict: + """Analyze motion intensity across a video by sampling frame differences. + + Args: + video_path: Path to the video file. + scene_boundaries: Optional list of (start_sec, end_sec) tuples + defining scene boundaries. When provided, per-scene motion + scores and editing pace are included in the result. + sample_every: Process every Nth frame for performance (default 5). 
def analyze_motion(
    video_path: str,
    scene_boundaries: list[tuple[float, float]] | None = None,
    sample_every: int = 5,
) -> dict:
    """Analyze motion intensity across a video by sampling frame differences.

    Args:
        video_path: Path to the video file.
        scene_boundaries: Optional list of (start_sec, end_sec) tuples
            defining scene boundaries. When provided, per-scene motion
            scores and editing pace are included in the result.
        sample_every: Process every Nth frame for performance (default 5).

    Returns:
        Dict with avg_motion_score, motion_variance, scene_motion_scores,
        total_frames, fps, resolution, motion_description, and
        editing_pace (when scene_boundaries is provided).

    Raises:
        FileNotFoundError: If the path does not exist.
        RuntimeError: If OpenCV cannot open the file.
    """
    source = Path(video_path)
    if not source.exists():
        raise FileNotFoundError(f"Video not found: {video_path}")

    capture = cv2.VideoCapture(str(source))
    if not capture.isOpened():
        raise RuntimeError(f"Failed to open video: {video_path}")

    fps = capture.get(cv2.CAP_PROP_FPS)
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Mean absolute gray-level difference between consecutive *sampled*
    # frames; sampling every Nth frame keeps this pass cheap.
    samples: list[float] = []
    previous = None
    index = 0
    while True:
        ok, frame = capture.read()
        if not ok:
            break
        if index % sample_every == 0:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if previous is not None:
                samples.append(float(np.mean(cv2.absdiff(previous, gray))))
            previous = gray
        index += 1
    capture.release()

    avg_motion = float(np.mean(samples)) if samples else 0.0
    motion_var = float(np.var(samples)) if samples else 0.0

    result: dict = {
        "avg_motion_score": round(avg_motion, 4),
        "motion_variance": round(motion_var, 4),
        "scene_motion_scores": None,
        "total_frames": total_frames,
        "fps": fps,
        "resolution": {"width": width, "height": height},
        "motion_description": _motion_description(avg_motion),
    }

    if scene_boundaries is not None:
        per_scene: list[float] = []
        for start_sec, end_sec in scene_boundaries:
            first = int(start_sec * fps)
            last = int(end_sec * fps)
            # Sampled diff i compares sampled frames i and i+1, so it is
            # attributed to source frame (i + 1) * sample_every.
            values = [
                score
                for i, score in enumerate(samples)
                if first <= (i + 1) * sample_every < last
            ]
            scene_avg = float(np.mean(values)) if values else 0.0
            per_scene.append(round(scene_avg, 4))
        result["scene_motion_scores"] = per_scene
        result["editing_pace"] = _editing_pace(scene_boundaries)

    return result
def detect_scenes(video_path: str, output_dir: str) -> list[dict]:
    """Detect scene boundaries and extract representative keyframes.

    Args:
        video_path: Path to the video file.
        output_dir: Directory where keyframe JPEGs are written (created if
            missing).

    Returns:
        One dict per scene with scene_number, start_time, end_time,
        duration, keyframe_paths (start/mid/end sample slots), and
        keyframe_path (the midpoint frame, or "" if it could not be read).

    Raises:
        FileNotFoundError: If the video file does not exist.
        RuntimeError: If OpenCV cannot open the video.
    """
    if not os.path.isfile(video_path):
        raise FileNotFoundError(f"Video not found: {video_path}")

    os.makedirs(output_dir, exist_ok=True)

    video = open_video(video_path)
    scene_manager = SceneManager()
    scene_manager.add_detector(AdaptiveDetector())
    scene_manager.detect_scenes(video)

    scene_list = scene_manager.get_scene_list()

    # If no scenes were detected (e.g. a very short video), treat the entire
    # video as one scene. (The previous version also computed total_frames,
    # fps, start_time, and end_time here but never used them — removed.)
    if not scene_list:
        video.reset()
        scene_list = [
            (video.base_timecode, video.base_timecode + video.duration),
        ]

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Failed to open video with OpenCV: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    results: list[dict] = []

    for idx, (start, end) in enumerate(scene_list, start=1):
        start_sec = start.get_seconds()
        end_sec = end.get_seconds()
        duration = end_sec - start_sec

        # For very short scenes (< 0.5s), extract a single midpoint frame.
        # Otherwise extract 3 keyframes: start (10%), middle (50%), end (90%).
        if duration < 0.5:
            sample_points = [("mid", 0.5)]
        else:
            sample_points = [("start", 0.1), ("mid", 0.5), ("end", 0.9)]

        keyframe_paths: list[str] = []
        mid_keyframe_path = ""

        for label, pct in sample_points:
            sec = start_sec + duration * pct
            frame_num = int(sec * fps)

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()

            keyframe_filename = f"scene_{idx:04d}_{label}.jpg"
            keyframe_path = os.path.join(output_dir, keyframe_filename)

            if ret:
                cv2.imwrite(keyframe_path, frame)
                keyframe_paths.append(keyframe_path)
            else:
                # Record an empty slot so positions stay aligned with
                # sample_points even when a frame cannot be decoded.
                keyframe_paths.append("")

            if label == "mid":
                mid_keyframe_path = keyframe_path if ret else ""

        results.append(
            {
                "scene_number": idx,
                "start_time": round(start_sec, 3),
                "end_time": round(end_sec, 3),
                "duration": round(duration, 3),
                "keyframe_paths": keyframe_paths,
                "keyframe_path": mid_keyframe_path,
            }
        )

    cap.release()
    return results
+ +Uses the Gemini 2.0 Flash model to generate rich natural-language +descriptions of individual scenes, transitions between scenes, and an +overall style guide that a motion designer could use to recreate the +video's look and feel. +""" + +from __future__ import annotations + +import json +import os +import sys +import time +from pathlib import Path +from typing import Any + +import google.generativeai as genai + +# ── Constants ──────────────────────────────────────────────────────────────── + +_MODEL_NAME = "gemini-2.5-flash" +_MAX_RETRIES = 2 # initial attempt + 1 retry +_RETRY_DELAY_S = 2 + +_SCENE_SYSTEM_PROMPT = ( + "You are a motion design analyst. Describe this video scene's visual style " + "in detail.\n" + "Focus on: layout composition, typography style and treatment, color palette " + "and mood, animation states visible (blur, fade, scale, position), " + "iconography/illustration style, spatial relationships between elements, and " + "the overall design language.\n" + "Be specific about what you see — name fonts if recognizable, describe exact " + "positions, note any parallax or depth effects. Keep it to 3-5 sentences." +) + +_TRANSITION_SYSTEM_PROMPT = ( + "You are a motion design analyst. You are given two keyframes: the last " + "frame of one scene and the first frame of the next scene.\n" + "Describe the transition style between them. Is it a hard cut, a cross " + "dissolve, a fade to/from black, a morphing animation, a wipe, a zoom, " + "or something else? Note any shared visual elements that carry across the " + "cut, color shifts, and how the viewer's eye is guided between the two " + "compositions.\n" + "Keep it to 2-3 sentences." +) + +_SYNTHESIS_PROMPT_TEMPLATE = """\ +You are a creative director analyzing a launch video's style to create a reusable style guide. 
+ +Based on the following video analysis data, scene descriptions, and transition descriptions, +write a comprehensive style synthesis that could be used to recreate a video in this exact style. + +Cover these dimensions: +1. VISUAL IDENTITY: Color palette, brightness, contrast approach +2. TYPOGRAPHY: Font choices, text animation style (reveals, fades, blurs), hierarchy +3. MOTION LANGUAGE: How elements enter/exit, easing style, speed, fluidity +4. LAYOUT & COMPOSITION: Spatial patterns, use of whitespace, element placement +5. NARRATIVE STRUCTURE: How the video flows between sections, pacing rhythm +6. ICONOGRAPHY & ILLUSTRATION: Style of visual elements, how they relate to content +7. AUDIO-VISUAL SYNC: How motion relates to the soundtrack tempo/energy +8. OVERALL FEEL: 2-3 adjectives that capture the video's personality + +Be extremely specific and actionable — this should be detailed enough that a motion designer +could recreate the style without seeing the original video. + +--- VIDEO ANALYSIS DATA --- +{analysis_json} + +--- SCENE DESCRIPTIONS --- +{scene_descriptions} + +--- TRANSITION DESCRIPTIONS --- +{transition_descriptions} +""" + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +def _log(msg: str) -> None: + """Print a progress message to stderr so stdout stays clean.""" + print(msg, file=sys.stderr) + + +def _get_api_key() -> str: + """Return the Gemini API key from the environment. + + Checks ``GEMINI_API_KEY`` first, then falls back to ``GOOGLE_API_KEY``. + Raises :class:`RuntimeError` if neither is set. + """ + key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY") + if not key: + raise RuntimeError( + "Gemini API key not found. Set the GEMINI_API_KEY or GOOGLE_API_KEY " + "environment variable before running this module." 
+ ) + return key + + +def _configure_client() -> genai.GenerativeModel: + """Configure the genai SDK and return a model instance.""" + genai.configure(api_key=_get_api_key()) + return genai.GenerativeModel(_MODEL_NAME) + + +def _upload_image(path: str) -> Any: + """Upload a local image file to Gemini and return the File handle.""" + return genai.upload_file(path) + + +def _call_with_retry(generate_fn, *args, **kwargs) -> str: + """Call *generate_fn* with retry logic for transient API failures. + + Retries once after a 2-second delay. Returns the response text on + success, or an error note string on final failure. + """ + last_exc: Exception | None = None + for attempt in range(_MAX_RETRIES): + try: + response = generate_fn(*args, **kwargs) + return response.text + except Exception as exc: + last_exc = exc + if attempt < _MAX_RETRIES - 1: + _log(f" API call failed ({exc}), retrying in {_RETRY_DELAY_S}s...") + time.sleep(_RETRY_DELAY_S) + return f"[ERROR: API call failed after {_MAX_RETRIES} attempts — {last_exc}]" + + +# ── Public API ─────────────────────────────────────────────────────────────── + +def describe_scene(keyframe_paths: list[str], scene_data: dict) -> str: + """Describe a single scene's visual style using Gemini Vision. + + Args: + keyframe_paths: 1-3 image file paths for this scene's keyframes. + scene_data: Metadata dict for the scene (colors, motion_score, duration). + + Returns: + A natural-language description of the scene's style. + """ + model = _configure_client() + + # Upload images (skip missing / empty paths). + uploaded_files: list[Any] = [] + for p in keyframe_paths[:3]: + if p and os.path.isfile(p): + uploaded_files.append(_upload_image(p)) + + # Build the prompt parts: images first, then textual context. 
+ parts: list[Any] = list(uploaded_files) + + metadata_summary = ( + f"Scene metadata — duration: {scene_data.get('duration', 'N/A')}s, " + f"motion score: {scene_data.get('motion_score', 'N/A')}, " + f"colors: {json.dumps(scene_data.get('colors', {}), indent=2)}" + ) + parts.append(f"{_SCENE_SYSTEM_PROMPT}\n\n{metadata_summary}") + + return _call_with_retry(model.generate_content, parts) + + +def describe_transitions(scene_keyframes: list[list[str]]) -> list[str]: + """Describe the transition style between each pair of adjacent scenes. + + Args: + scene_keyframes: A list of keyframe-path lists, one per scene. + Each inner list contains the keyframe paths for that scene + (typically start, mid, end). + + Returns: + A list of transition descriptions. The list has length + ``len(scene_keyframes) - 1``. + """ + if len(scene_keyframes) < 2: + return [] + + model = _configure_client() + descriptions: list[str] = [] + + for i in range(len(scene_keyframes) - 1): + # End frame of scene i, start frame of scene i+1. + end_frame = _last_valid_path(scene_keyframes[i]) + start_frame = _first_valid_path(scene_keyframes[i + 1]) + + parts: list[Any] = [] + if end_frame: + parts.append(_upload_image(end_frame)) + if start_frame: + parts.append(_upload_image(start_frame)) + + if not parts: + descriptions.append("[No keyframes available for this transition]") + continue + + parts.append(_TRANSITION_SYSTEM_PROMPT) + desc = _call_with_retry(model.generate_content, parts) + descriptions.append(desc) + + return descriptions + + +def synthesize_style( + analysis_json: dict, + scene_descriptions: list[str], + transition_descriptions: list[str], +) -> str: + """Produce an overall style guide from analysis data and descriptions. + + This is a text-only call (no images). + + Args: + analysis_json: The full analysis JSON dict. + scene_descriptions: Per-scene style descriptions. + transition_descriptions: Per-transition descriptions. + + Returns: + A comprehensive style guide string. 
+ """ + model = _configure_client() + + scenes_text = "\n\n".join( + f"Scene {i + 1}: {desc}" for i, desc in enumerate(scene_descriptions) + ) + transitions_text = "\n\n".join( + f"Transition {i + 1} -> {i + 2}: {desc}" + for i, desc in enumerate(transition_descriptions) + ) + + prompt = _SYNTHESIS_PROMPT_TEMPLATE.format( + analysis_json=json.dumps(analysis_json, indent=2, default=str), + scene_descriptions=scenes_text, + transition_descriptions=transitions_text or "(no transitions detected)", + ) + + return _call_with_retry(model.generate_content, prompt) + + +def run_synthesis(analysis_json: dict) -> dict: + """Orchestrate the full synthesis pipeline. + + Args: + analysis_json: The complete analysis JSON (as produced by + ``analyze.py``). Must contain a ``scenes`` key whose entries + have ``keyframe_paths``. + + Returns: + A dict with keys ``scene_descriptions``, ``transition_descriptions``, + and ``style_guide``. + """ + scenes = analysis_json.get("scenes", []) + total = len(scenes) + + # ── Scene descriptions ─────────────────────────────────────────────── + _log(f"Describing {total} scene(s) with Gemini Vision...") + scene_descriptions: list[str] = [] + for i, scene in enumerate(scenes): + _log(f" Scene {i + 1}/{total}...") + keyframe_paths = scene.get("keyframe_paths", []) + # Fall back to the single mid-frame if keyframe_paths is absent. 
+ if not keyframe_paths: + kf = scene.get("keyframe_path", "") + keyframe_paths = [kf] if kf else [] + desc = describe_scene(keyframe_paths, scene) + scene_descriptions.append(desc) + + # ── Transition descriptions ────────────────────────────────────────── + _log("Describing transitions between scenes...") + all_keyframe_lists: list[list[str]] = [] + for scene in scenes: + kfp = scene.get("keyframe_paths", []) + if not kfp: + kf = scene.get("keyframe_path", "") + kfp = [kf] if kf else [] + all_keyframe_lists.append(kfp) + + transition_descriptions = describe_transitions(all_keyframe_lists) + _log(f" Described {len(transition_descriptions)} transition(s).") + + # ── Style synthesis ────────────────────────────────────────────────── + _log("Synthesizing overall style guide...") + style_guide = synthesize_style( + analysis_json, scene_descriptions, transition_descriptions + ) + _log("Synthesis complete.") + + return { + "scene_descriptions": scene_descriptions, + "transition_descriptions": transition_descriptions, + "style_guide": style_guide, + } + + +# ── Internal helpers ───────────────────────────────────────────────────────── + +def _last_valid_path(paths: list[str]) -> str | None: + """Return the last non-empty path that exists on disk, or None.""" + for p in reversed(paths): + if p and os.path.isfile(p): + return p + return None + + +def _first_valid_path(paths: list[str]) -> str | None: + """Return the first non-empty path that exists on disk, or None.""" + for p in paths: + if p and os.path.isfile(p): + return p + return None + + +# ── CLI entry point ────────────────────────────────────────────────────────── + +if __name__ == "__main__": + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} ", file=sys.stderr) + sys.exit(1) + + results_path = sys.argv[1] + + if not os.path.isfile(results_path): + print(f"Error: file not found: {results_path}", file=sys.stderr) + sys.exit(1) + + with open(results_path, "r", encoding="utf-8") as f: + analysis = json.load(f) 
+ + try: + output = run_synthesis(analysis) + except RuntimeError as exc: + print(f"Error: {exc}", file=sys.stderr) + sys.exit(1) + + print(output["style_guide"])