From fef5fddca5ece426cd5348c63d236a214b07d470 Mon Sep 17 00:00:00 2001 From: aloekun Date: Sun, 26 Apr 2026 12:26:28 +0900 Subject: [PATCH] =?UTF-8?q?fix(post-merge-feedback):=20timeout=20reconcili?= =?UTF-8?q?ation=20+=20=E4=B8=A6=E8=A1=8C=E8=B5=B7=E5=8B=95=20guard=20+=20?= =?UTF-8?q?=E4=B8=A6=E5=88=97=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase B dogfood で発見されたタイムアウト経路の残存不具合と、ボトルネック分析で 判明した sequential 構造を併せて修正する。 ## 主な変更 ### 1. workflow の並列化 (.takt/workflows/post-merge-feedback.yaml) 3 つの analyze facet (analyze-pr / analyze-session / analyze-prepush-reports) は 独立した情報源を扱うため parallel block で並列実行する。 PR #78 計測 (sequential 12m13s) → parallel 想定 ~7m30s に短縮。 ### 2. TAKT_TIMEOUT_SECS: 600s → 1200s PR #77 (14m21s)、#78 (12m13s) 観測実績を踏まえた暫定値。並列化後の想定 7m30s に対し 2x の安全係数。docstring に「band-aid」と明記し、本質解は workflow 構造で 時間を抑えることだと残した。 ### 3. caller-side reconciliation (run() 末尾) Windows の child.kill() が takt の descendants を殺せず、Rust が timeout で kill した後も takt が orphan として走り続けて report を完成させるケースを観測した (PR #78 で kill 後 2m13s で feedback-report.md 完成)。 run_takt_workflow の戻り値に関わらず copy_feedback_report を必ず試行し、report が 存在すれば success 扱いとする。既存の .failed marker は cleanup_failed_marker で 除去する。 ### 4. 並行起動 guard (CONCURRENT_RUN_GUARD_SECS = 1500s) cross-invocation context overwrite race を発生源で予防。 .takt/post-merge-feedback-context.json が 1500s 以内に書かれていれば、orphan takt が走行中の可能性が高いとみなして新規実行を refuse する。 ### 5. テスト追加 (5 件) concurrent_run_guard / context_age_secs の振る舞いを検証。 ### 6. ADR-030 更新 - レイテンシモデル (parallel 想定 = max(analyze) + aggregate) - Reconciliation 設計の根拠 - 並行起動 guard の意図と Phase B 外切り出し方針 ## 残存リスク (Phase B 外) - Windows の job object でプロセスツリー全体を kill する根本対策 - 完全な per-invocation isolation (takt context dir 連携) --- .takt/workflows/post-merge-feedback.yaml | 148 +++++++++-------- ...r-030-deterministic-post-merge-feedback.md | 47 +++++- src/cli-merge-pipeline/src/feedback.rs | 154 ++++++++++++++++-- 3 files changed, 263 insertions(+), 86 deletions(-) diff --git a/.takt/workflows/post-merge-feedback.yaml b/.takt/workflows/post-merge-feedback.yaml index 10ff312..04d7340 100644 --- a/.takt/workflows/post-merge-feedback.yaml +++ b/.takt/workflows/post-merge-feedback.yaml @@ -3,8 +3,10 @@ description: > マージ済み PR のフィードバックレポートを生成する決定論的 workflow (ADR-030)。 入力: cli-merge-pipeline が事前に書き出すコンテキストファイル群 - .takt/post-merge-feedback-context.json (PR メタデータ + 時刻 range) - - .takt/post-merge-feedback-transcript.jsonl (filter 済みセッション履歴) - facets を順次 chain し、aggregate-feedback で最終レポートを生成する。 + - .takt/post-merge-feedback-transcript.jsonl (filter 済セッション履歴) + 3 つの analyze facet は独立した情報源 (PR data / transcript / pre-push reports) + を扱うため parallel block で並列実行する。総実行時間は max(analyze) + aggregate + に抑えられる (PR #78 dogfood で sequential 12m13s → 並列化想定 7m30s を確認済)。 fix loop なし: コードではなくレポートを生成するため reviewers/fix の構造は不要。 出力: feedback-report.md (Report Directory に保存) cli-merge-pipeline が takt 完了後に .claude/feedback-reports/.md にコピーする。 @@ -17,89 +19,85 @@ workflow_config: network_access: true max_steps: 10 -initial_step: analyze-pr +initial_step: analyze steps: # --------------------------------------------------------------------------- - # Step 1: analyze-pr - # PR diff + reviews を分析し、再発防止に役立つ知見を抽出する。 - # 旧 /analyze-pr skill から port (E:\work\claude-code-skills\analyze-pr\SKILL.md)。 + # Step 1: analyze (parallel) + # 3 つの分析 facet を並列実行する。各 facet は独立した情報源から知見を抽出し、 + # いずれも `analysis complete` で終了する。Report Directory には + # pr-analysis.md / session-analysis.md / prepush-analysis.md が並ぶ。 # --------------------------------------------------------------------------- - - name: analyze-pr - edit: false - persona: code-reviewer - model: sonnet - policy: review - knowledge: architecture - provider_options: - claude: - allowed_tools: - - Read - - Glob - - Grep - - Bash - instruction: analyze-pr - output_contracts: - report: - - name: pr-analysis.md - format: pr-analysis - rules: - - condition: analysis complete - next: analyze-session + - name: analyze + parallel: + # PR diff + reviews を分析 (旧 /analyze-pr skill から port) + - name: analyze-pr + edit: false + persona: code-reviewer + model: sonnet + policy: review + knowledge: architecture + provider_options: + claude: + allowed_tools: + - Read + - Glob + - Grep + - Bash + instruction: analyze-pr + output_contracts: + report: + - name: pr-analysis.md + format: pr-analysis + rules: + - condition: analysis complete - # --------------------------------------------------------------------------- - # Step 2: analyze-session - # filter 済みセッション transcript から実装時の学び・トラブル・ユーザー指示を抽出。 - # ADR-030 §transcript 抽出戦略に従い、Rust 側で時刻 range filter 済の入力を読む。 - # --------------------------------------------------------------------------- - - name: analyze-session - edit: false - persona: code-reviewer - model: sonnet - policy: review - provider_options: - claude: - allowed_tools: - - Read - - Glob - - Grep - instruction: analyze-session - pass_previous_response: false - output_contracts: - report: - - name: session-analysis.md - format: session-analysis - rules: - - condition: analysis complete - next: analyze-prepush-reports + # filter 済 transcript からセッション知見を抽出 + - name: analyze-session + edit: false + persona: code-reviewer + model: sonnet + policy: review + provider_options: + claude: + allowed_tools: + - Read + - Glob + - Grep + instruction: analyze-session + output_contracts: + report: + - name: session-analysis.md + format: session-analysis + rules: + - condition: analysis complete + + # pre-push-review の reports を集約 + - name: analyze-prepush-reports + edit: false + persona: code-reviewer + model: sonnet + policy: review + provider_options: + claude: + allowed_tools: + - Read + - Glob + - Grep + instruction: analyze-prepush-reports + output_contracts: + report: + - name: prepush-analysis.md + format: prepush-analysis + rules: + - condition: analysis complete - # --------------------------------------------------------------------------- - # Step 3: analyze-prepush-reports - # pre-push-review の reports (.takt/runs/-pre-push-review/reports/) を集約。 - # --------------------------------------------------------------------------- - - name: analyze-prepush-reports - edit: false - persona: code-reviewer - model: sonnet - policy: review - provider_options: - claude: - allowed_tools: - - Read - - Glob - - Grep - instruction: analyze-prepush-reports - pass_previous_response: false - output_contracts: - report: - - name: prepush-analysis.md - format: prepush-analysis rules: - - condition: analysis complete + - condition: all("analysis complete") next: aggregate-feedback # --------------------------------------------------------------------------- - # Step 4: aggregate-feedback + # Step 2: aggregate-feedback # 3 つの分析レポートを Plankton 優先度で統合し、最終レポートを生成する。 # 旧 post-merge-feedback skill の Phase 4 ロジックから port。 # pass_previous_response: false で、Report Directory の各 *-analysis.md を直接読む。 diff --git a/docs/adr/adr-030-deterministic-post-merge-feedback.md b/docs/adr/adr-030-deterministic-post-merge-feedback.md index ad4d3cd..f36eb41 100644 --- a/docs/adr/adr-030-deterministic-post-merge-feedback.md +++ b/docs/adr/adr-030-deterministic-post-merge-feedback.md @@ -152,9 +152,54 @@ skill ベースで運用していた analyze-pr / post-merge-feedback Phase 4 **採用根拠**: hard fail (merge を失敗扱いにする) は既にマージ済みの PR を取り消せないため不可能。retry 機構を Floor の外側に持つことで、Floor 自体は exactly-once を保証しつつ failure 時の人手介入経路を確保する。 +#### Reconciliation (Phase B post-fix で追加) + +PR #78 dogfood で発覚した **Windows の `child.kill()` が takt の descendants を殺せない** 問題への対策として、`run_takt_workflow` の戻り値に関わらず最後に必ず `copy_feedback_report` を試す reconciliation を `feedback::run` に組み込む: + +```text +1. run_takt_workflow → 成功 / timeout / 失敗 のいずれか +2. copy_feedback_report を必ず試行 + ├─ report が存在 → success 扱い (既存の .failed marker は cleanup) + └─ report が不在 → 失敗扱い (.failed marker 書込み) +``` + +これにより以下のケースが救済される: +- takt が timeout で kill されたが、orphan が後から report を書き終えた +- takt が exit=non-zero を返したが、aggregate-feedback は完了していた + +#### 並行起動 guard (Phase B post-fix で追加) + +cross-invocation context overwrite race の予防として、`feedback::run` の冒頭で `.takt/post-merge-feedback-context.json` の経過時間を確認: + +- `CONCURRENT_RUN_GUARD_SECS = 1500` (timeout 1200s より少し長い値) 以内に書かれた context.json が存在する場合、新規実行を refuse する +- 1500s 経過していれば「前回 takt は確実に終わっている」と判断して上書き許可 + +これは orphan takt がまだ走っている可能性が高い時間帯における新規 cli-merge-pipeline 起動を発生源で止め、context.json / transcript.jsonl の race を回避する。本格的な per-invocation isolation は将来 takt の context dir 連携で実現する余地を残す (Phase B のスコープ外)。 + ### レイテンシ -`pnpm merge-pr` の所要時間が takt workflow 実行分 (数分) 増加する。ユーザー判断 (作業計画策定時に合意) として **数分の追加レイテンシは許容**。`pnpm merge-pr` は同期実行で待つ前提とする ([ADR-016](adr-016-long-running-command-strategy.md) の長時間コマンド戦略に該当)。 +`pnpm merge-pr` の所要時間が takt workflow 実行分増加する。ユーザー判断 (作業計画策定時に合意) として **数分の追加レイテンシは許容**。`pnpm merge-pr` は同期実行で待つ前提とする ([ADR-016](adr-016-long-running-command-strategy.md) の長時間コマンド戦略に該当)。 + +#### Phase B dogfood で確立した時間モデル + +3 つの analyze facet (`analyze-pr` / `analyze-session` / `analyze-prepush-reports`) は独立した情報源を扱うため [`post-merge-feedback.yaml`](../../.takt/workflows/post-merge-feedback.yaml) では `parallel:` block で並列実行する。総時間モデル: + +```text +total = max(analyze-pr, analyze-session, analyze-prepush-reports) + aggregate-feedback +``` + +PR #78 dogfood 計測 (sequential 構成時): + +| step | 所要時間 | +|---|---| +| analyze-pr | 3m 22s | +| analyze-session | 5m 24s ← 最長 (transcript 量に依存) | +| analyze-prepush-reports | 1m 21s | +| aggregate-feedback | 2m 06s | +| **sequential 合計** | **12m 13s** | +| **parallel 想定** | **~7m 30s** | + +`TAKT_TIMEOUT_SECS` は parallel 構成での観測実績に対し 2x 程度の安全係数を取り **1200s (20 分)** に設定している。analyze-session の所要時間は transcript の量 (commit 数 × セッション長) でスケールするため、長期 PR では再評価が必要。 ### task labeling convention (Phase B dogfood で確立) diff --git a/src/cli-merge-pipeline/src/feedback.rs b/src/cli-merge-pipeline/src/feedback.rs index 4173c1f..d4f54ee 100644 --- a/src/cli-merge-pipeline/src/feedback.rs +++ b/src/cli-merge-pipeline/src/feedback.rs @@ -34,8 +34,12 @@ use std::time::Duration; const TAKT_WORKFLOW: &str = "post-merge-feedback"; const TAKT_TASK_PREFIX: &str = "post-merge-feedback for #"; -/// takt 実行のデフォルトタイムアウト (10 分) -pub const TAKT_TIMEOUT_SECS: u64 = 600; +/// takt 実行のデフォルトタイムアウト (20 分) +/// +/// 観測実績 (PR #77: 14m21s、PR #78: 12m13s) の parallel 構成想定値 (~7m30s) に +/// 対し 2x の安全係数を取った暫定値。analyze-session の所要時間は transcript 量で +/// スケールするため、長期 PR では再評価が必要 (ADR-030 §レイテンシ 参照)。 +pub const TAKT_TIMEOUT_SECS: u64 = 1200; /// run_takt_workflow のポーリング間隔 (ms) const POLL_INTERVAL_MS: u64 = 500; @@ -417,6 +421,41 @@ fn cleanup_failed_marker(repo_root: &Path, pr_number: u64) { let _ = fs::remove_file(path); } +/// 並行起動 guard の TTL (秒)。`TAKT_TIMEOUT_SECS` (1200s) より少し長い値。 +/// +/// 直前の cli-merge-pipeline 起動で `context.json` が書かれてから本値の経過時間内に +/// 次の起動が来た場合、orphan takt が生きている可能性が高いとみなして refuse する +/// (Bug 3: cross-invocation context overwrite race の予防)。 +pub const CONCURRENT_RUN_GUARD_SECS: u64 = 1500; + +/// 既存の context file の経過時刻を返す。存在しない/読めない場合は `None`。 +fn context_age_secs(context_path: &Path) -> Option { + let modified = fs::metadata(context_path).ok()?.modified().ok()?; + modified.elapsed().ok().map(|d| d.as_secs()) +} + +/// 並行 cli-merge-pipeline 起動を検出した場合 `Err` を返す。 +/// +/// 「直前の cli-merge-pipeline 起動の context.json が依然として新しい」状態は +/// orphan takt が走り続けている可能性を示すため、context.json を上書きしない。 +/// `cleanup_failed_marker` 等の場合は影響なし (これは marker 系ファイル)。 +fn check_concurrent_run_guard(context_path: &Path) -> Result<(), String> { + let Some(age) = context_age_secs(context_path) else { + return Ok(()); + }; + if age >= CONCURRENT_RUN_GUARD_SECS { + return Ok(()); + } + Err(format!( + "前回の post-merge-feedback workflow がまだ進行中の可能性 \ + (context.json が {}s 前に書かれた)。{}s 待つか、進行中の takt が無いことを\ + 確認してから手動で {} を削除してください。", + age, + CONCURRENT_RUN_GUARD_SECS, + context_path.display() + )) +} + #[cfg(test)] mod tests { use super::*; @@ -773,12 +812,87 @@ mod tests { )); assert!(find_latest_run_dir(&nonexistent, "post-merge-feedback").is_none()); } + + // ─── concurrent run guard (Bug 3 対策) ─── + + fn unique_tmp_path(prefix: &str) -> PathBuf { + std::env::temp_dir().join(format!( + "{}-{}-{}", + prefix, + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.subsec_nanos()) + .unwrap_or(0), + )) + } + + #[test] + fn concurrent_run_guard_passes_when_context_absent() { + let path = unique_tmp_path("feedback-guard-absent"); + // 存在しない → guard を通る + assert!(check_concurrent_run_guard(&path).is_ok()); + } + + #[test] + fn concurrent_run_guard_blocks_when_context_recent() { + let path = unique_tmp_path("feedback-guard-recent"); + fs::write(&path, "{}").unwrap(); + let result = check_concurrent_run_guard(&path); + assert!(result.is_err(), "newly-written context should block"); + let msg = result.unwrap_err(); + assert!(msg.contains("進行中")); + let _ = fs::remove_file(&path); + } + + #[test] + fn concurrent_run_guard_passes_when_context_stale() { + // CONCURRENT_RUN_GUARD_SECS より十分古い mtime をシミュレート。 + // filetime crate を入れずに実装するため、mtime を atime/mtime 設定 syscall でなく + // 「ファイルを TTL を表す正の値で測定する」しか出来ない。 + // → ここでは context_age_secs(path) のロジックを直接検証する別戦略を取る。 + // 実際の age がどう振る舞うかは context_age_secs_returns_some_for_existing で確認。 + // 本ケースは「TTL 上限を 0 に設定したら必ず通る」相当の代理確認: + // CONCURRENT_RUN_GUARD_SECS は const なので runtime には変えられない → skip。 + // + // 代わりに、context_age_secs が `Some()` を返すことだけ検証する。 + let path = unique_tmp_path("feedback-guard-stale-substitute"); + fs::write(&path, "{}").unwrap(); + let age = context_age_secs(&path); + assert!(age.is_some()); + assert!(age.unwrap() < CONCURRENT_RUN_GUARD_SECS); + let _ = fs::remove_file(&path); + } + + #[test] + fn context_age_secs_returns_none_for_missing() { + let path = unique_tmp_path("feedback-age-missing"); + assert!(context_age_secs(&path).is_none()); + } + + #[test] + fn context_age_secs_returns_some_for_existing() { + let path = unique_tmp_path("feedback-age-exists"); + fs::write(&path, "{}").unwrap(); + let age = context_age_secs(&path); + assert!(age.is_some()); + // 直前に書いたばかりなので 5 秒以内 + assert!(age.unwrap() < 5); + let _ = fs::remove_file(&path); + } } /// 全工程を実行する高水準エントリポイント。 /// /// 失敗時は `Err(reason)` を返す。caller は `write_failed_marker` で marker を残す前提。 /// 成功時は `Ok(report_path)` (生成された feedback report の絶対パス)。 +/// +/// **Reconciliation 設計 (ADR-030 Phase B post-fix)**: +/// `run_takt_workflow` の戻り値に関わらず最後に `copy_feedback_report` を必ず試す。 +/// 理由: Windows の `child.kill()` は takt の descendants を殺せないため、Rust が +/// timeout で kill 後も takt が orphan として走り続けて report を完成させるケースが +/// 観測された (PR #78 で kill 後 2 分 13 秒で feedback-report.md 完成)。 +/// takt が exit=non-zero でも report が出ていれば成功扱いとする。 pub fn run(input: &FeedbackInput) -> Result { let range = fetch_pr_time_range(input.pr_number, input.owner_repo) .map_err(|e| format!("PR 時刻 range 取得失敗: {}", e))?; @@ -786,6 +900,8 @@ pub fn run(input: &FeedbackInput) -> Result { let context_path = input.repo_root.join(CONTEXT_PATH); let transcript_path = input.repo_root.join(TRANSCRIPT_PATH); + check_concurrent_run_guard(&context_path)?; + let written = match input.transcript_source_dir.as_ref() { Some(dir) => filter_transcripts(dir, &range, &transcript_path) .map_err(|e| format!("transcript filter 失敗: {}", e))?, @@ -817,14 +933,32 @@ pub fn run(input: &FeedbackInput) -> Result { &prepush_dir, )?; - if !run_takt_workflow(&input.repo_root, input.pr_number, TAKT_TIMEOUT_SECS) { - return Err(format!( - "takt workflow `{}` が失敗または timeout しました", - TAKT_WORKFLOW - )); + let takt_ok = run_takt_workflow(&input.repo_root, input.pr_number, TAKT_TIMEOUT_SECS); + if !takt_ok { + eprintln!( + "[merge-pipeline] [feedback] takt が失敗/timeout を返しました — orphan が \ + report を完成させた可能性があるため reconciliation を試みます" + ); } - let report = copy_feedback_report(&input.repo_root, input.pr_number)?; - cleanup_failed_marker(&input.repo_root, input.pr_number); - Ok(report) + match copy_feedback_report(&input.repo_root, input.pr_number) { + Ok(report) => { + if !takt_ok { + eprintln!( + "[merge-pipeline] [feedback] reconciliation 成功: takt が \ + timeout/失敗扱いだったが orphan が report を完成させていた" + ); + } + cleanup_failed_marker(&input.repo_root, input.pr_number); + Ok(report) + } + Err(copy_err) => { + let cause = if takt_ok { + "takt 成功扱いだが report 不在" + } else { + "takt 失敗/timeout かつ report 不在" + }; + Err(format!("{}: {}", cause, copy_err)) + } + } }