Ho costruito il mio strumento di analisi dello schermo per un motivo: volevo qualcosa di brutalmente pratico che possa guardare a una registrazione dello schermo, dedurre ciò che è successo come un flusso di lavoro e quindi trasformare quella comprensione in artefatti di automazione (flussi n8n, elenchi di passaggi, riassunti strutturati, l'intero pipeline). La parte costosa non è la generazione di JSON o il rendering di un rapporto. È il passo di comprensione multimodale - ogni fotogramma aggiuntivo inviato a un modello è denaro reale. Le registrazioni dello schermo sono la distribuzione di input peggiore per il campionamento a tasso fisso: lunghe distanze di un UI statico, poi micro-burst improvvisi in cui l'utente fa clic, digitare due caratteri, un dropdown si apre, un flash modale o uno scambio di schede. Così ho smesso di trattare i quadri come "punti di dati" e ho iniziato a trattarli come un budget. Cosa è andato storto prima (il fallimento che ha costretto il cambiamento) Il mio primo taglio è stato l'ovvio: campione ogni Nth frame. Questa versione ha fallito in due modi, e entrambi erano visibili nel prodotto. Un caricamento tipico nel mio dashboard è una registrazione di 6-12 minuti. In molti di questi record, l'utente si ferma a pensare, legge una pagina, o il cursore si siede lì. il campionamento uniforme brucia fortunatamente frames su quei minuti di nulla. Failure #1: it wasted frames on dead air. L'incidente peggiore (quello che mi ha fatto riscrivere il campione) è stata una breve registrazione del flusso di lavoro dell'amministratore in cui un dropdown delle autorizzazioni critiche si è aperto e chiuso rapidamente. Il campione uniforme ha catturato i fotogrammi poco prima e poco dopo il dropdown - quindi il modello non ha mai visto la scelta delle autorizzazioni. Failure #2: it missed the “blink-and-you-miss-it” UI moments. Quella combinazione - pagare per il tempo statico mentre manca ancora l'azione effettiva - era intollerabile. avevo bisogno di un campione che spende i fotogrammi dove il video sta cambiando e smette di pagare il noleggio sull'interfaccia utente statica. L'idea chiave: spendere frames dove le informazioni sono L'approccio ingenuo è "esempio ogni Nth frame." Si sente giusto. Una registrazione dello schermo non è un film. È più vicino a un registro: stati stabili lunghi punteggiati da brevi transizioni. Si paga troppo per un UI stabile. Si sottofondano brevi esplosioni dove il flusso di lavoro si verifica effettivamente. Una analogia (lo userò una volta e continuerò): campionare in modo uniforme è come pagare a ogni squadra lo stesso bonus indipendentemente dall'impatto. La soluzione è quella di misurare i cambiamenti a buon mercato, raggruppare il tempo in segmenti e assegnare un budget chiave fisso a questi segmenti. Architettura del tempo di esecuzione: dove si trova il sampler Operativamente, il sistema è diviso in due parti cooperative: Un servizio di analisi Python (distribuito come servizio Cloud Run) che elabora il video caricato, seleziona i quadri chiave, esegue l'analisi multimodale e produce un carico utile dei risultati strutturato. Un'app Next.js che riceve il risultato tramite webhook e lo persiste (e guida l'interfaccia utente del dashboard). Il campione vive all'interno dell'analizzatore, prima della chiamata del modello costoso. Il punto non ovvio: il campione non è una “buona ottimizzazione”. è una superficie di controllo. Converte “quanto è lungo questo video?” in “quanta analisi voglio pagare?” L'analista invia i risultati indietro all'applicazione Next.js tramite un webhook firmato - HMAC-SHA256 sopra gli esatti byte JSON, verificati con L'importante lezione qui: firmare i byte che effettivamente inviate ( ), non una dicitura di Python che viene ri-serializzata dalla libreria HTTP. Che mismatch produce fallimenti di verifica intermittenti che si sentono come fantasmi. Il payload lascia l’analista. timingSafeEqual data=body_bytes prima Adaptive Keyframe Sampling: punteggio → segmento → assegnazione → selezione degli indici Il sampler è un tubo a quattro fasi: Il punteggio cambia a basso costo per frame (o per passo di frame). Segmentare la timeline in “principalmente stabile” e “alto cambiamento” corsi. Assegnare un budget di keyframe fisso su segmenti con guardrails. Scegliere indici di frame concreti all'interno di ogni segmento. Questa struttura è ciò che lo rende pratico. Il punteggio è economico. La segmentazione è lineare. L'allocazione è prevedibile. Il passo di estrazione è meccanico. Fase 1 – Scoring: cambiamento visivo a basso costo Sono aggressivo nel mantenere il punteggio a buon mercato.Se il punteggio costa troppo, sto semplicemente spostando il conto prima nel pipeline. Il segnale di base più affidabile per le registrazioni dello schermo è : frame difference energy Convertire in grigio. Calcola la differenza assoluta tra i quadri consecutivi. Prendi la media dell'immagine diff (opzionale normalizzare). Questo cattura: Movimento Cursore Tipping (blinking caret e aggiornamenti di testo) Dropdowns e modali Pagine di transizione Hover cambiamenti di stato Non è perfetto, ma è veloce e correlato bene con “qualcosa è successo”. Fase 2 – Formazione di segmenti: trasformare un flusso di punteggio rumoroso in corse I punteggi per frame sono spicy. non voglio che l'allocatore inseguisca il rumore. Quindi segmento utilizzando una semplice macchina di stato con isteresi: Mantenere un punteggio medio rotante. Transizione in un segmento “caldo” quando il punteggio di rotolamento sale al di sopra di una soglia. Ritorno al “frio” quando scende al di sotto di una soglia inferiore. Imporre una lunghezza minima di segmento in modo da non creare centinaia di micro-segmenti. Questa non è la rilevazione accademica del punto di cambiamento. è l'ingegneria: comportamento stabile, sovrapposizione bassa e uscita prevedibile. Fase 3 – Assegnazione del bilancio con guardaroba L'allocazione proporzionale pura non è sufficiente. fallisce nell'arrotondamento e può affamare brevi segmenti. Quindi il mio allocatore ha tre regole: Ogni segmento riceve un piano (min_frames_per_segment). Nessun segmento può superare un limite (max_frames_per_segment). Il bilancio rimanente viene distribuito proporzionalmente al segmento di utilità. Questo rende l'allocazione stabile e previene le patologie. Fase 4 – Selezione degli indici all’interno dei segmenti Una volta che a un segmento sono stati assegnati frames K, scelgo gli indici K distribuiti in tutto il segmento: Inserisci sempre l’inizio del segmento (importanza delle transizioni). Include sempre la fine del segmento (questioni di stato finale). Riempire il resto con indici uniformemente spaziati. Se ho bisogno di più fedeltà più tardi, posso pregiudicare i massimi locali del punteggio, ma la selezione uniformemente spaziata è una base forte e mantiene il codice diretto. Implementazione eseguibile completa (scoring + segmentazione + allocazione + estrazione) Per rendere questo post copiabile, ecco un singolo script Python che puoi eseguire contro qualsiasi MP4. Seleziona i frame keyframes in modo adattivo e li scrive in un directory di uscita. Le dipendenze : Apertura Python Numerosi Correre di: python adaptive_keyframes.py --video input.mp4 --budget 60 --out ./keyframes Ecco il : adaptive_keyframes.py import argparse import os from dataclasses import dataclass from typing import List, Tuple import cv2 import numpy as np @dataclass class Segment: start: int # inclusive frame index end: int # exclusive frame index score: float @property def length(self) -> int: return max(0, self.end - self.start) def frame_diff_score(prev_bgr: np.ndarray, curr_bgr: np.ndarray) -> float: """Cheap per-frame change score in [0, 1] (roughly). Uses grayscale mean absolute difference normalized by 255. """ prev_gray = cv2.cvtColor(prev_bgr, cv2.COLOR_BGR2GRAY) curr_gray = cv2.cvtColor(curr_bgr, cv2.COLOR_BGR2GRAY) diff = cv2.absdiff(prev_gray, curr_gray) return float(diff.mean() / 255.0) def compute_scores( cap: cv2.VideoCapture, stride: int = 1, max_frames: int | None = None, ) -> Tuple[List[float], int]: """Return (scores, total_frames_read). scores[i] is the change score between frame i and i+stride (based on sampled reads). """ scores: List[float] = [] ok, prev = cap.read() if not ok: return scores, 0 frame_idx = 1 frames_read = 1 while True: # Skip stride-1 frames between comparisons. for _ in range(stride - 1): ok = cap.grab() if not ok: return scores, frames_read frame_idx += 1 frames_read += 1 if max_frames is not None and frames_read >= max_frames: return scores, frames_read ok, curr = cap.read() if not ok: return scores, frames_read frames_read += 1 s = frame_diff_score(prev, curr) scores.append(s) prev = curr frame_idx += 1 if max_frames is not None and frames_read >= max_frames: return scores, frames_read def segment_scores( scores: List[float], window: int = 8, hot_thresh: float = 0.030, cold_thresh: float = 0.020, min_len: int = 12, ) -> List[Segment]: """Convert per-step scores into segments with a utility score. Uses a rolling mean with hysteresis to avoid segment flicker. """ if not scores: return [] # Rolling mean via cumulative sum. x = np.array(scores, dtype=np.float32) c = np.cumsum(np.insert(x, 0, 0.0)) def roll_mean(i: int) -> float: j0 = max(0, i - window + 1) n = i - j0 + 1 return float((c[i + 1] - c[j0]) / n) segments: List[Segment] = [] state_hot = False seg_start = 0 seg_scores: List[float] = [] for i in range(len(scores)): rm = roll_mean(i) if state_hot: seg_scores.append(scores[i]) if rm < cold_thresh: # Close hot segment at i+1 seg_end = i + 1 if seg_end - seg_start < min_len: # Too short: merge into previous if possible, else keep. pass segments.append(Segment(seg_start, seg_end, float(np.mean(seg_scores) if seg_scores else 0.0))) # Start cold state_hot = False seg_start = seg_end seg_scores = [] else: if rm > hot_thresh: # Close cold segment seg_end = i + 1 cold_score = float(np.mean(scores[seg_start:seg_end]) if seg_end > seg_start else 0.0) segments.append(Segment(seg_start, seg_end, cold_score)) # Start hot state_hot = True seg_start = seg_end seg_scores = [] # Close tail tail_end = len(scores) if tail_end > seg_start: tail_score = float(np.mean(scores[seg_start:tail_end])) segments.append(Segment(seg_start, tail_end, tail_score)) # Merge very short segments to keep output stable. merged: List[Segment] = [] for seg in segments: if not merged: merged.append(seg) continue if seg.length < min_len: prev = merged[-1] combined = Segment(prev.start, seg.end, (prev.score * prev.length + seg.score * seg.length) / max(1, (prev.length + seg.length))) merged[-1] = combined else: merged.append(seg) # One more pass: ensure non-empty and strictly increasing. cleaned: List[Segment] = [] for seg in merged: if seg.length <= 0: continue if cleaned and seg.start < cleaned[-1].end: seg = Segment(cleaned[-1].end, seg.end, seg.score) if seg.length > 0: cleaned.append(seg) return cleaned def allocate_frames( segments: List[Segment], budget: int, min_frames_per_segment: int = 1, max_frames_per_segment: int = 30, ) -> List[int]: """Allocate keyframes to segments using floor + proportional + cap.""" if budget <= 0 or not segments: return [] n = len(segments) min_total = min_frames_per_segment * n # If budget is smaller than the floor, distribute 1-by-1. if min_total >= budget: alloc = [0] * n for i in range(budget): alloc[i % n] += 1 return alloc utilities = np.array([max(0.0, s.score) for s in segments], dtype=np.float64) total_u = float(utilities.sum()) alloc = [min_frames_per_segment] * n remaining = budget - min_total if total_u == 0.0: raw = np.full(n, remaining / n, dtype=np.float64) else: raw = utilities * (remaining / total_u) # Add integer parts. for i in range(n): add = int(raw[i]) alloc[i] = min(max_frames_per_segment, alloc[i] + add) allocated = sum(alloc) # Distribute leftover by fractional parts, respecting caps. if allocated < budget: frac = raw - np.floor(raw) order = np.argsort(-frac) # descending fractional idx = 0 safety = 0 while allocated < budget and safety < 10_000: i = int(order[idx % n]) if alloc[i] < max_frames_per_segment: alloc[i] += 1 allocated += 1 idx += 1 safety += 1 # If we somehow exceeded budget due to caps/floor interplay, trim from lowest utility. if allocated > budget: order = np.argsort(utilities) # ascending utility idx = 0 safety = 0 while allocated > budget and safety < 10_000: i = int(order[idx % n]) if alloc[i] > 0 and alloc[i] > min_frames_per_segment: alloc[i] -= 1 allocated -= 1 idx += 1 safety += 1 return alloc def pick_indices_for_segment(seg: Segment, k: int) -> List[int]: """Pick k indices in [seg.start, seg.end] over the score-step domain. Note: scores are defined between frames; we later map these to actual frames. """ if k <= 0 or seg.length <= 0: return [] if k == 1: return [seg.start] # Evenly spaced across [start, end-1] xs = np.linspace(seg.start, seg.end - 1, num=k) idxs = sorted({int(round(x)) for x in xs}) # Ensure exactly k by filling gaps if rounding collapsed points. while len(idxs) < k: # Insert midpoints between existing points. candidates = [] for a, b in zip(idxs, idxs[1:]): if b - a >= 2: candidates.append((a + b) // 2) if not candidates: # Fall back: walk forward. x = idxs[-1] if x + 1 < seg.end: idxs.append(x + 1) else: break else: for c in candidates: if c not in idxs and seg.start <= c < seg.end: idxs.append(c) if len(idxs) >= k: break idxs = sorted(idxs) # Trim if we overshot. return idxs[:k] def select_keyframe_indices(segments: List[Segment], alloc: List[int], stride: int = 1) -> List[int]: """Return concrete frame indices (0-based) to extract from the video.""" chosen: List[int] = [] for seg, k in zip(segments, alloc): step_idxs = pick_indices_for_segment(seg, k) # Map score-step domain to frame indices. # score i corresponds to diff between frame i and i+stride; # picking frame i is a reasonable representative. for si in step_idxs: chosen.append(si * stride) chosen = sorted(set(chosen)) return chosen def extract_frames(video_path: str, frame_indices: List[int], out_dir: str) -> None: os.makedirs(out_dir, exist_ok=True) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise RuntimeError(f"Failed to open video: {video_path}") frame_set = set(frame_indices) max_idx = max(frame_set) if frame_set else -1 idx = 0 saved = 0 while idx <= max_idx: ok, frame = cap.read() if not ok: break if idx in frame_set: path = os.path.join(out_dir, f"frame_{idx:06d}.jpg") ok2 = cv2.imwrite(path, frame) if not ok2: raise RuntimeError(f"Failed to write: {path}") saved += 1 idx += 1 cap.release() if saved == 0 and frame_indices: raise RuntimeError("No frames were saved; check indices and video decoding") def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--video", required=True, help="Path to input video") ap.add_argument("--out", required=True, help="Output directory for keyframes") ap.add_argument("--budget", type=int, default=60, help="Total keyframes to extract") ap.add_argument("--stride", type=int, default=2, help="Compare every Nth frame for scoring") ap.add_argument("--window", type=int, default=8, help="Rolling window for segmentation") ap.add_argument("--hot", type=float, default=0.030, help="Enter hot segment threshold") ap.add_argument("--cold", type=float, default=0.020, help="Exit hot segment threshold") args = ap.parse_args() cap = cv2.VideoCapture(args.video) if not cap.isOpened(): raise RuntimeError(f"Failed to open video: {args.video}") scores, frames_read = compute_scores(cap, stride=args.stride) cap.release() segments = segment_scores(scores, window=args.window, hot_thresh=args.hot, cold_thresh=args.cold) alloc = allocate_frames(segments, budget=args.budget, min_frames_per_segment=1, max_frames_per_segment=max(2, args.budget)) keyframes = select_keyframe_indices(segments, alloc, stride=args.stride) # Keep within a hard limit (rounding/uniqueness can change count). if len(keyframes) > args.budget: keyframes = keyframes[: args.budget] extract_frames(args.video, keyframes, args.out) print(f"frames_read={frames_read}") print(f"scores={len(scores)} segments={len(segments)}") print(f"budget={args.budget} selected={len(keyframes)}") if segments: hot_share = sum(1 for s in segments if s.score > args.hot) / len(segments) print(f"segment_hot_share={hot_share:.2f}") if __name__ == "__main__": main() Questo scenario è intenzionalmente diretto: Crea una lista di segmenti stabili. Questo significa che non supererai mai il tuo budget. Scrive file frame deterministici per l'analisi in basso. Nel mio servizio di produzione, i telai estratti alimentano la chiamata multimodale (Gemini via nelle mie dipendenze), e il carico utile dell'analisi risultante viene inviato indietro all'app Next.js tramite webhook firmato. google-generativeai Note di tuning pratiche (le cose che contano dopo la prima demo) Una volta che la forma funziona, i guadagni provengono dal comportamento di tuning sotto brutte registrazioni del mondo reale. 1) Scegli un passo che corrisponda al tuo contenuto Se segni ogni fotogramma su una registrazione a 30 FPS, troverai piccoli movimenti del cursore ovunque. Un passo di 2–5 è un buon punto di partenza per le registrazioni dello schermo. Sei ancora sensibile ai cambiamenti dell'interfaccia utente, ma sopprime il rumore del subframe. 2) L'isteresi previene il segmento di scintilla usare e (due soglie) importano.Con una sola soglia, si ripresenterà tra caldo / freddo costantemente intorno al taglio. hot_thresh cold_thresh L'isteresi ti dà segmenti stabili e rende i budget comportarsi in modo prevedibile. 3) I pavimenti e le coperte non sono facoltativi Senza un pavimento, brevi segmenti possono arrotondare a zero e ti mancherà l'esatto micro-burst che ti importava. Senza una copertura, un lungo segmento "occupato" può consumare l'intero budget e perderai il contesto dal resto del flusso di lavoro. 4) Significa sempre i byte che invia Se si firma Ma inviare , la libreria HTTP può serializzarsi con un ordine chiave/spazzatura diverso da quello che hai firmato. che produce fallimenti di verifica intermittenti che sembrano fantasmi. json.dumps(payload) json=payload Sottoscrivere i byte effettivi ( Questo è importante qui perché il carico utile dell'analisi - quello che trasporta i tuoi frame scelti con cura - è l'artefatto più costoso nel pipeline. data=body_bytes Perché questo design scala nella produzione Il campione funziona perché rispetta due restrizioni difficili: Scale di costo con fotogrammi selezionati, non lunghezza video. Una volta fissato il budget, il passo multimodale costoso ha un limite superiore. Il punteggio è un passaggio lineare; la segmentazione è un passaggio lineare; l'allocazione è lineare con piccoli fattori costanti. Il più importante: il campione fa comportarsi il mio sistema come qualcosa che posso gestire. Invece di discutere se un video di 12 minuti è “troppo lungo”, decido quanti fotogrammi sono disposto a comprare.L’analista spende quel budget sulle parti della registrazione che realmente cambiano, e smette di pagare per l’interfaccia utente statica. Questo è l'intero punto: l'assegnazione disciplinata batte il campionamento equo.