私はスクリーン分析ツールを一つの理由で構築しました:私はスクリーン録画を見ることができ、ワークフローとして起こったことを推測し、その理解を自動化のアーティファクト(n8n フロー、ステップ リスト、構造化された概要、パイプライン全体)に変えることができます。 コストの高い部分はJSONを生成したりレポートを表示することではありません。それは多様な理解のステップです - モデルに送信する追加のフレームはすべて現実のお金です。スクリーンレコーディングは、固定レートのサンプリングのための最悪の入力配布です:静的なUIの長いストレッチ、ユーザーがクリックする突然のマイクロブレスト、2つの文字を入力、ダウンロードが開く、モダルフラッシュ、またはタブ交換。 だから私はフレームを「データポイント」として扱うのをやめ、予算のように扱い始めました。 まず何が間違ったのか(変化を余儀なくされた失敗) 私の最初のカットは明らかなものでありました: 各 Nth フレームのサンプル。 そのバージョンは2つの方法で失敗し、両方とも製品に顕著でした。 私のダッシュボードの典型的なアップロードは6〜12分の録音です。これらの録音の多くでは、ユーザーは考えるために休憩をとり、ページを読み、またはコースターがそこに座っているだけです。 Failure #1: it wasted frames on dead air. 最悪の出来事(サンプラーを書き換えさせた出来事)は、重要な許可のダウンロードが迅速に開いて閉じられた短い管理者ワークフローのレコーディングでした。 統一サンプラーはダウンロードの直前と直後にフレームを捕らえました - したがって、モデルは許可の選択を決して見たことがありません。 Failure #2: it missed the “blink-and-you-miss-it” UI moments. 実際のアクションを欠いている間に静的な時間を支払うという組み合わせは容認できないものであり、ビデオが変化している場所でフレームを費やし、静的なUIでレンタルを支払うのをやめるサンプラーが必要でした。 The key idea: spending frames where the information is ナイフなアプローチは「すべてのNthフレームをサンプル」で、それは公平に感じます。 スクリーン レコーディングは映画ではありません。 レギュラーに近い:長い安定した状態は短い移行によってポイント化されます。 あなたは安定したUIのために過払いします。 あなたは、ワークフローが実際に起こる場所の短い爆発を支援します。 一つの類似性(私はそれを一度使用し、継続します):均一なサンプルは、影響に関係なくすべてのチームに同じボーナスを支払うようなものです。 解決策は、変更を安く測定し、時間をセグメントにグループ化し、これらのセグメントに固定キーフレーム予算を割り当てることです。 Runtime architecture: where the sampler sits. ランタイムアーキテクチャ 運用的には、システムは2つの協力部品に分かれています: アップロードされたビデオを処理し、キーフレームを選択し、マルチモダル分析を実行し、構造化された結果のパイロードを生成するPython分析サービス(Cloud Run Service)です。 Next.js アプリは、Webhook 経由で結果を受け取り、それを持続します(そしてダッシュボードの UI を駆動します)。 サンプラーは、高価なモデル呼び出しの前に、分析器の内部に住んでいます。 非明確な点:サンプラーは「良い最適化」ではありません。それは制御表面です. それは「このビデオはどのくらい長いですか?」を「どのくらいの分析を支払いたいですか?」に変換します。 解析器は、サインされた Webhook — HMAC-SHA256 を介して、正確な JSON バイトで Next.js アプリに結果を送信します。 受信エンドで重要なレッスン:実際に送信したバイトをサインする( ), HTTP ライブラリによって再連続化される Python ディクトではありません. That mismatch produces intermittent verification failures that feel like ghosts. But the webhook seam is a different post—what matters here is what happens. この不一致は、幽霊のように感じる間隔的な検証の失敗を生成します。 payload が analyzer を離れます。 timingSafeEqual data=body_bytes 前 Adaptive keyframe sampling: score → segment → allocate → pick indexes サンプラーは4段階のパイプラインです。 スコアはフレームごとに安く変わります(またはフレームごとに)。 タイムラインを「ほとんど安定」と「高変化」のランクに分割します。 ガードレイルを使用してセグメントに固定キーフレーム予算を割り当てます。 各セグメント内の具体的なフレームインデックスを選択します。 この構造はそれを実用的に保つものです。スコアは安いです。セグメントは線形です。割り当ては予測可能です。抽出ステップは機械的です。 ステージ1 - スコアリング:安価な視覚変化 私は、スコアを安く保つことに攻撃的です. スコアがあまりにも費用がかかる場合、私はパイプラインで早めに口座を移動します。 画面レコーディングのための最も信頼性の高いベースライン信号は、 : frame difference energy グレイスケールに変換する 連続フレーム間の絶対差を計算します。 ディフ画像の平均値(オプションで正常化)を取る。 捕まえたのは: コースター運動 タップ (blinking caret and text updates) Dropdowns と Modals Page 移行 Hover Stateの変化 それは完璧ではありませんが、それは速く、そして「何かが起こった」とよく関連しています。 Stage 2 — Segment formation: turn a noisy score stream into runs. 段階2 — セグメント形成:ノイズスコアストリームをランに変える Per-frame scores are spiky. I don't want the allocator to chase noise. フレームごとのスコアはピッタリです。 したがって、私はヒステリックでシンプルな状態のマシンを用いて分割します: ローリング平均スコアを維持する 「ホット」セグメントへの移行は、ローリングスコアが限界を超えるときに起こります。 低い値の下に落ちたときに「冷たい」に戻る。 最小のセグメントの長さを強化して、何百ものマイクロセグメントを作成しないようにします。 これは学術的な変更点検出ではありません。それはエンジニアリングです:安定した行動、低い調節オーバーヘッド、予測可能な出力。 ステップ3 - ガードレイルによる予算配分 pure proportional allocation is not enough. It fails on rounding and it can starve short segments.純比例配分は不十分です。 だから、私の分配者は3つのルールを持っています: 各セグメントには、フロア(min_frames_per_segment)が付属します。 どのセグメントも 1 つのマップ (max_frames_per_segment) を超えることはできません。 残りの予算は、セグメントユーティリティに比例して配分されます。 これは分布を安定させ、病理を防ぐ。 ステップ4 - セグメント内のインデックス選択 セグメントに K フレームが割り当てられると、セグメント全体に広がる K インデックスを選択します。 常にセグメントの開始を含む(移行の問題)。 常にセグメントの端を含む(最終状態の問題)。 残りの部分を均等に割り当てた指数で満たす。 後でより多くの忠誠心が必要な場合は、得点のローカルマクシマに向かって偏見を示すことができますが、均等に割り当てられた選択は強力なベースラインであり、コードをまっすぐに保つことができます。 完全な実行可能な実装(スコア + セグメント + 割り当て + 抽出) この投稿をコピーできるようにするには、どのMP4に対しても実行できる単一のPythonスクリプトがあります。 依存性: オープンPython ナンパ 走る: python adaptive_keyframes.py --video input.mp4 --budget 60 --out ./keyframes こちらは : adaptive_keyframes.py import argparse import os from dataclasses import dataclass from typing import List, Tuple import cv2 import numpy as np @dataclass class Segment: start: int # inclusive frame index end: int # exclusive frame index score: float @property def length(self) -> int: return max(0, self.end - self.start) def frame_diff_score(prev_bgr: np.ndarray, curr_bgr: np.ndarray) -> float: """Cheap per-frame change score in [0, 1] (roughly). Uses grayscale mean absolute difference normalized by 255. """ prev_gray = cv2.cvtColor(prev_bgr, cv2.COLOR_BGR2GRAY) curr_gray = cv2.cvtColor(curr_bgr, cv2.COLOR_BGR2GRAY) diff = cv2.absdiff(prev_gray, curr_gray) return float(diff.mean() / 255.0) def compute_scores( cap: cv2.VideoCapture, stride: int = 1, max_frames: int | None = None, ) -> Tuple[List[float], int]: """Return (scores, total_frames_read). scores[i] is the change score between frame i and i+stride (based on sampled reads). """ scores: List[float] = [] ok, prev = cap.read() if not ok: return scores, 0 frame_idx = 1 frames_read = 1 while True: # Skip stride-1 frames between comparisons. for _ in range(stride - 1): ok = cap.grab() if not ok: return scores, frames_read frame_idx += 1 frames_read += 1 if max_frames is not None and frames_read >= max_frames: return scores, frames_read ok, curr = cap.read() if not ok: return scores, frames_read frames_read += 1 s = frame_diff_score(prev, curr) scores.append(s) prev = curr frame_idx += 1 if max_frames is not None and frames_read >= max_frames: return scores, frames_read def segment_scores( scores: List[float], window: int = 8, hot_thresh: float = 0.030, cold_thresh: float = 0.020, min_len: int = 12, ) -> List[Segment]: """Convert per-step scores into segments with a utility score. Uses a rolling mean with hysteresis to avoid segment flicker. """ if not scores: return [] # Rolling mean via cumulative sum. x = np.array(scores, dtype=np.float32) c = np.cumsum(np.insert(x, 0, 0.0)) def roll_mean(i: int) -> float: j0 = max(0, i - window + 1) n = i - j0 + 1 return float((c[i + 1] - c[j0]) / n) segments: List[Segment] = [] state_hot = False seg_start = 0 seg_scores: List[float] = [] for i in range(len(scores)): rm = roll_mean(i) if state_hot: seg_scores.append(scores[i]) if rm < cold_thresh: # Close hot segment at i+1 seg_end = i + 1 if seg_end - seg_start < min_len: # Too short: merge into previous if possible, else keep. pass segments.append(Segment(seg_start, seg_end, float(np.mean(seg_scores) if seg_scores else 0.0))) # Start cold state_hot = False seg_start = seg_end seg_scores = [] else: if rm > hot_thresh: # Close cold segment seg_end = i + 1 cold_score = float(np.mean(scores[seg_start:seg_end]) if seg_end > seg_start else 0.0) segments.append(Segment(seg_start, seg_end, cold_score)) # Start hot state_hot = True seg_start = seg_end seg_scores = [] # Close tail tail_end = len(scores) if tail_end > seg_start: tail_score = float(np.mean(scores[seg_start:tail_end])) segments.append(Segment(seg_start, tail_end, tail_score)) # Merge very short segments to keep output stable. merged: List[Segment] = [] for seg in segments: if not merged: merged.append(seg) continue if seg.length < min_len: prev = merged[-1] combined = Segment(prev.start, seg.end, (prev.score * prev.length + seg.score * seg.length) / max(1, (prev.length + seg.length))) merged[-1] = combined else: merged.append(seg) # One more pass: ensure non-empty and strictly increasing. cleaned: List[Segment] = [] for seg in merged: if seg.length <= 0: continue if cleaned and seg.start < cleaned[-1].end: seg = Segment(cleaned[-1].end, seg.end, seg.score) if seg.length > 0: cleaned.append(seg) return cleaned def allocate_frames( segments: List[Segment], budget: int, min_frames_per_segment: int = 1, max_frames_per_segment: int = 30, ) -> List[int]: """Allocate keyframes to segments using floor + proportional + cap.""" if budget <= 0 or not segments: return [] n = len(segments) min_total = min_frames_per_segment * n # If budget is smaller than the floor, distribute 1-by-1. if min_total >= budget: alloc = [0] * n for i in range(budget): alloc[i % n] += 1 return alloc utilities = np.array([max(0.0, s.score) for s in segments], dtype=np.float64) total_u = float(utilities.sum()) alloc = [min_frames_per_segment] * n remaining = budget - min_total if total_u == 0.0: raw = np.full(n, remaining / n, dtype=np.float64) else: raw = utilities * (remaining / total_u) # Add integer parts. for i in range(n): add = int(raw[i]) alloc[i] = min(max_frames_per_segment, alloc[i] + add) allocated = sum(alloc) # Distribute leftover by fractional parts, respecting caps. if allocated < budget: frac = raw - np.floor(raw) order = np.argsort(-frac) # descending fractional idx = 0 safety = 0 while allocated < budget and safety < 10_000: i = int(order[idx % n]) if alloc[i] < max_frames_per_segment: alloc[i] += 1 allocated += 1 idx += 1 safety += 1 # If we somehow exceeded budget due to caps/floor interplay, trim from lowest utility. if allocated > budget: order = np.argsort(utilities) # ascending utility idx = 0 safety = 0 while allocated > budget and safety < 10_000: i = int(order[idx % n]) if alloc[i] > 0 and alloc[i] > min_frames_per_segment: alloc[i] -= 1 allocated -= 1 idx += 1 safety += 1 return alloc def pick_indices_for_segment(seg: Segment, k: int) -> List[int]: """Pick k indices in [seg.start, seg.end] over the score-step domain. Note: scores are defined between frames; we later map these to actual frames. """ if k <= 0 or seg.length <= 0: return [] if k == 1: return [seg.start] # Evenly spaced across [start, end-1] xs = np.linspace(seg.start, seg.end - 1, num=k) idxs = sorted({int(round(x)) for x in xs}) # Ensure exactly k by filling gaps if rounding collapsed points. while len(idxs) < k: # Insert midpoints between existing points. candidates = [] for a, b in zip(idxs, idxs[1:]): if b - a >= 2: candidates.append((a + b) // 2) if not candidates: # Fall back: walk forward. x = idxs[-1] if x + 1 < seg.end: idxs.append(x + 1) else: break else: for c in candidates: if c not in idxs and seg.start <= c < seg.end: idxs.append(c) if len(idxs) >= k: break idxs = sorted(idxs) # Trim if we overshot. return idxs[:k] def select_keyframe_indices(segments: List[Segment], alloc: List[int], stride: int = 1) -> List[int]: """Return concrete frame indices (0-based) to extract from the video.""" chosen: List[int] = [] for seg, k in zip(segments, alloc): step_idxs = pick_indices_for_segment(seg, k) # Map score-step domain to frame indices. # score i corresponds to diff between frame i and i+stride; # picking frame i is a reasonable representative. for si in step_idxs: chosen.append(si * stride) chosen = sorted(set(chosen)) return chosen def extract_frames(video_path: str, frame_indices: List[int], out_dir: str) -> None: os.makedirs(out_dir, exist_ok=True) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise RuntimeError(f"Failed to open video: {video_path}") frame_set = set(frame_indices) max_idx = max(frame_set) if frame_set else -1 idx = 0 saved = 0 while idx <= max_idx: ok, frame = cap.read() if not ok: break if idx in frame_set: path = os.path.join(out_dir, f"frame_{idx:06d}.jpg") ok2 = cv2.imwrite(path, frame) if not ok2: raise RuntimeError(f"Failed to write: {path}") saved += 1 idx += 1 cap.release() if saved == 0 and frame_indices: raise RuntimeError("No frames were saved; check indices and video decoding") def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--video", required=True, help="Path to input video") ap.add_argument("--out", required=True, help="Output directory for keyframes") ap.add_argument("--budget", type=int, default=60, help="Total keyframes to extract") ap.add_argument("--stride", type=int, default=2, help="Compare every Nth frame for scoring") ap.add_argument("--window", type=int, default=8, help="Rolling window for segmentation") ap.add_argument("--hot", type=float, default=0.030, help="Enter hot segment threshold") ap.add_argument("--cold", type=float, default=0.020, help="Exit hot segment threshold") args = ap.parse_args() cap = cv2.VideoCapture(args.video) if not cap.isOpened(): raise RuntimeError(f"Failed to open video: {args.video}") scores, frames_read = compute_scores(cap, stride=args.stride) cap.release() segments = segment_scores(scores, window=args.window, hot_thresh=args.hot, cold_thresh=args.cold) alloc = allocate_frames(segments, budget=args.budget, min_frames_per_segment=1, max_frames_per_segment=max(2, args.budget)) keyframes = select_keyframe_indices(segments, alloc, stride=args.stride) # Keep within a hard limit (rounding/uniqueness can change count). if len(keyframes) > args.budget: keyframes = keyframes[: args.budget] extract_frames(args.video, keyframes, args.out) print(f"frames_read={frames_read}") print(f"scores={len(scores)} segments={len(segments)}") print(f"budget={args.budget} selected={len(keyframes)}") if segments: hot_share = sum(1 for s in segments if s.score > args.hot) / len(segments) print(f"segment_hot_share={hot_share:.2f}") if __name__ == "__main__": main() このシナリオは意図的に直接: 安定したセグメントリストを作成します。 それはあなたがあなたのフレーム予算を決して超えることを保証します。 それは、ダウンストリーム分析のための決定的なフレームファイルを書きます。 私の生産サービスでは、抽出されたフレームは、マルチモダル呼び出し(Gemini via 私の依存症)で、結果として得られた分析パイロットは、サインしたWebhookを介してNext.jsアプリに送信されます。 google-generativeai 実践的な調節ノート(最初のデモ後に重要なもの) 形が働いたら、勝利は醜い現実世界のレコーディングの下の調節行動から来る。 1)あなたのコンテンツに合うステップを選択する あなたが30FPSのレコーディングで各フレームをスコアする場合、あなたはどこにでも小さなコースターの動きを見つけるでしょう。 2〜5のステップはスクリーンレコーディングの良い出発点です. あなたはまだUIの変更に敏感ですが、サブフレームの騒音を抑制します。 2) ヒステレスはセグメントのフリッカーを防ぐ 利用 そして (二つの値下げ)は重要です. 単一の値下げで、あなたは絶え間なくカットの周りでホット / 冷たい間をボンクします。 hot_thresh cold_thresh ヒステレスはあなたに安定したセグメントを与え、予算を予測可能に振る舞う。 3)床とキャップはオプションではありません。 床がなければ、短いセグメントはゼロまで丸められ、あなたが気にかけた正確なマイクロブレストを逃すことができます。 キャップがなければ、長い「忙しい」セグメントがあなたの予算全体を消費し、残りのワークフローからコンテキストを失うことができます。 4)常に送信するバイトをサインする サインしたら 送る , HTTP ライブラリは、あなたが署名したものとは異なるキーの順序 / スペースで連続化する可能性があります. That produces intermittent verification failures that feel like ghosts. json.dumps(payload) json=payload (実際のバイトを署名する) ) バグのすべてのクラスを排除します。ここで重要なのは、分析パイロット - 慎重に選択したキーフレームを運ぶもの - パイプラインで最も高価なアーティファクトです。 data=body_bytes なぜこのデザインは生産にスケール サンプラーは2つの厳しい制約を尊重しているため機能します。 選択されたフレームではなくビデオの長さを持つコストスケール 一旦予算が固定されると、高価なマルチモダルステップには上限があります。 Runtime remains predictable. Scoring is a linear pass; segmentation is a linear pass; allocation is linear with tiny constant factors. ランタイムは予測可能である。 最も重要なことは、サンプラーが私のシステムを操作できるようなものとして振る舞うことです。 12分間のビデオが「長すぎる」かどうかを論じる代わりに、私はどれだけのフレームを購入したいかを決めます。 それが全体のポイントです:規律的な割り当ては公正なサンプリングを打ち負います。