A képernyőanalízis eszközt egy okból építettem fel: valami brutálisan praktikus dolgot akartam, amely megnézheti a képernyőfelvételt, következtetni, hogy mi történt munkafolyamatként, majd ezt a megértést automatizálási tárgyakká (n8n áramlások, lépéslisták, strukturált összefoglalók, az egész csővezeték). A drága része nem a JSON generálása vagy a jelentés megjelenítése. Ez a multimodális megértési lépés - minden extra keret, amelyet egy modellnek küldenek, valódi pénz. A képernyőfelvételek a legrosszabb helyzetű bemeneti eloszlás a rögzített arányú mintavételhez: egy statikus felhasználói felület hosszú szakaszai, majd hirtelen mikro-robbanások, ahol a felhasználó kattint, két karaktert ír be, egy dropdown megnyílik, egy modal flashes, vagy egy tab swaps. Tehát abbahagytam a kereteket, mint „adatpontokat”, és elkezdtem őket költségvetésként kezelni. Mi ment rosszul először (a kudarc, amely kényszerítette a változást) Az első vágásom a nyilvánvaló volt: mintavétel minden Nth keret. Ez a verzió kétféleképpen kudarcot vallott, és mindkettő látható volt a termékben. Egy tipikus feltöltése az én műszerfal egy 6-12 perces felvétel. Sok ilyen felvételek, a felhasználó szünetelt gondolkodni, olvassa el egy oldalt, vagy a kurzor csak ott ül. egységes mintavétel szerencsére égő kereteket azokban a percekben a semmi. Failure #1: it wasted frames on dead air. A legrosszabb eset (ami miatt újraírtam a mintavételt) egy rövid admin munkafolyamat rögzítése volt, ahol egy kritikus engedélycsökkentés gyorsan megnyílt és bezáródott.Az egységes mintavételt közvetlenül a csökkentés előtt és után rögzítették – így a modell soha nem látta az engedély kiválasztását. Failure #2: it missed the “blink-and-you-miss-it” UI moments. Ez a kombináció – statikus időért fizetve, miközben még mindig hiányzik a tényleges akció – elviselhetetlen volt. Szükségem volt egy mintavételezőre, amely képeket tölt, ahol a videó változik, és nem fizet bérleti díjat a statikus felhasználói felületre. A kulcs ötlet: töltsön kereteket, ahol az információ található A naiv megközelítés az, hogy „mintát minden Nth keret.” Ez úgy érzi, tisztességes. A képernyőfelvétel nem film. Közelebb van a könyvtárhoz: hosszú, stabil állapotok rövid átmenetekkel. Túlfizeted a stabil UI-t. Rövid kitörések, ahol a munkafolyamat valóban megtörténik. Egy analógia (egy alkalommal fogom használni, és továbblépni): a mintavétel egyenletesen olyan, mintha minden csapatnak ugyanazt a bónuszt fizetné, függetlenül a hatástól. A megoldás az, hogy olcsón mérjük a változásokat, csoportosítsuk az időt szegmensekbe, és egy rögzített billentyűzet költségvetést osztunk ki ezekre a szegmensekre. Runtime architektúra: ahol a mintázó ül Működési szempontból a rendszer két együttműködő részre oszlik: A Python elemzési szolgáltatás (felhő futtatási szolgáltatásként telepítve), amely feldolgozza a feltöltött videót, kiválasztja a billentyűzeteket, futtatja a multimodális elemzést, és strukturált eredményt termel. Egy Next.js alkalmazás, amely a webhook segítségével kapja meg az eredményt, és továbbra is tartja (és meghajtja a műszerfal felhasználói felületét). A mintavételező az elemző belsejében él, a drága modellhívás előtt. A nem nyilvánvaló pont: a mintavételi nem egy „szép optimalizálás”. Ez egy vezérlőfelület. Ez átalakítja „mennyi ideig tart ez a videó?” „mennyi elemzést szeretnék fizetni?” Az elemző eredményeket küld vissza a Next.js alkalmazásba egy aláírt webhook-on keresztül – a HMAC-SHA256 a pontos JSON-bajtokon keresztül, ellenőrizve a a fogadó végén. a fontos lecke ott: írja alá a valóban elküldött bájtokat ( ), nem egy Python-dikt, amelyet a HTTP-könyvtár újraszerializál. Ez a kiegyensúlyozatlanság időszakos ellenőrzési hibákat eredményez, amelyek szellemeknek tűnnek. De a webhook varrás egy másik poszt - ami itt számít, az történik A payload elhagyja az elemzőt. timingSafeEqual data=body_bytes Előtte Adaptive keyframe sampling: score → segment → allocate → pick indexek A minta egy négylépcsős csővezeték: A pontszám olcsón változik keretenként (vagy keretenkénti lépésenként). Segmentálja az idővonalat a „többnyire stabil” és a „magas változás” futásokra. Helyezzen ki egy rögzített billentyűzet költségvetést a szegmensek között. Válasszon konkrét keretindexeket minden szegmensben. Ez a szerkezet az, ami praktikus. A pontszám olcsó. A szegmentáció lineáris. Az elosztás kiszámítható. Az extrakciós lépés mechanikus. 1. szakasz – A pontszám: olcsó vizuális változás Agresszív vagyok a pontszámok olcsó megtartásában. Ha a pontszámok túl sokba kerülnek, csak a csővezetékben korábban mozgatom a számlát. A legmegbízhatóbb bázisjel a képernyőfelvételhez : frame difference energy Átalakítsa szürke színre. Az egymást követő képek közötti abszolút különbség kiszámítása. Vegyük a diff kép átlagát (opcionálisan normalizálni). Ezek a fogások: Cursor mozgás Írás (blinking caret és szöveges frissítések) Dropdowns és modals Oldal átmenetek Hover állami változások Nem tökéletes, de gyors és jól korrelál a „valami történt”. 2. szakasz – szegmensképződés: egy zajos pontszám áramlása futássá alakul A per-frame pontszámok csúszósak. Nem akarom, hogy az allocator a zajt üldözze. Tehát egy egyszerű állami gépet használok a hisztériával: Tartson fenn egy gördülő átlagot. Átmenet egy „forró” szegmensbe, amikor a gördülő pontszám egy küszöb felett emelkedik. Visszatérés a „hidegre”, amikor az alacsonyabb küszöb alatt van. Alkalmazzon minimális szegmens hosszát, hogy ne hozzon létre több száz mikro-segmentet. Ez nem egy akadémiai változási pont észlelése. Ez a mérnöki: stabil viselkedés, alacsony hangolás és kiszámítható kimenet. 3. szakasz – Költségvetés elosztása őrjáratokkal A tiszta arányos elosztás nem elég. A lekerekítésben nem sikerül, és rövid szegmenseket éhezhet. Tehát az én allokátoromnak három szabálya van: Minden szegmensnek van egy szintje (min_frames_per_segment). Semmilyen szegmens nem haladhatja meg a határértéket (max_frames_per_segment). A fennmaradó költségvetés a segmentekhez arányosan oszlik el. Ez stabilizálja az elosztást és megakadályozza a patológiákat. 4. lépés - Index kiválasztása a szegmenseken belül Miután egy szegmenshez K-kereteket rendeltek, a szegmensben elosztott K-indexeket választom: Mindig tartalmazza a szegmens kezdetét (átmeneti kérdés). Mindig tartalmazza a szegmens végét (végső állapot kérdései). Töltse ki a többi egyenletesen elosztott indexekkel. Ha később több hűségre van szükségem, előítéletet tehetek a pontszám helyi maximuma felé, de az egyenletesen elosztott kiválasztás erős kiindulópont, és a kódot egyértelműen tartja. Teljes körű végrehajtás (pontszám + szegmentáció + allokáció + kivonás) Ahhoz, hogy ezt a bejegyzést másolhatóvá tegye, itt van egy egyetlen Python-szkript, amelyet bármely MP4 ellen futtathat. adaptív módon kiválasztja a billentyűzeteket, és írja őket egy kimeneti könyvtárba. A függőségek : Nyitólap Python Numbál A futás: python adaptive_keyframes.py --video input.mp4 --budget 60 --out ./keyframes Itt az : adaptive_keyframes.py import argparse import os from dataclasses import dataclass from typing import List, Tuple import cv2 import numpy as np @dataclass class Segment: start: int # inclusive frame index end: int # exclusive frame index score: float @property def length(self) -> int: return max(0, self.end - self.start) def frame_diff_score(prev_bgr: np.ndarray, curr_bgr: np.ndarray) -> float: """Cheap per-frame change score in [0, 1] (roughly). Uses grayscale mean absolute difference normalized by 255. """ prev_gray = cv2.cvtColor(prev_bgr, cv2.COLOR_BGR2GRAY) curr_gray = cv2.cvtColor(curr_bgr, cv2.COLOR_BGR2GRAY) diff = cv2.absdiff(prev_gray, curr_gray) return float(diff.mean() / 255.0) def compute_scores( cap: cv2.VideoCapture, stride: int = 1, max_frames: int | None = None, ) -> Tuple[List[float], int]: """Return (scores, total_frames_read). scores[i] is the change score between frame i and i+stride (based on sampled reads). """ scores: List[float] = [] ok, prev = cap.read() if not ok: return scores, 0 frame_idx = 1 frames_read = 1 while True: # Skip stride-1 frames between comparisons. for _ in range(stride - 1): ok = cap.grab() if not ok: return scores, frames_read frame_idx += 1 frames_read += 1 if max_frames is not None and frames_read >= max_frames: return scores, frames_read ok, curr = cap.read() if not ok: return scores, frames_read frames_read += 1 s = frame_diff_score(prev, curr) scores.append(s) prev = curr frame_idx += 1 if max_frames is not None and frames_read >= max_frames: return scores, frames_read def segment_scores( scores: List[float], window: int = 8, hot_thresh: float = 0.030, cold_thresh: float = 0.020, min_len: int = 12, ) -> List[Segment]: """Convert per-step scores into segments with a utility score. Uses a rolling mean with hysteresis to avoid segment flicker. """ if not scores: return [] # Rolling mean via cumulative sum. x = np.array(scores, dtype=np.float32) c = np.cumsum(np.insert(x, 0, 0.0)) def roll_mean(i: int) -> float: j0 = max(0, i - window + 1) n = i - j0 + 1 return float((c[i + 1] - c[j0]) / n) segments: List[Segment] = [] state_hot = False seg_start = 0 seg_scores: List[float] = [] for i in range(len(scores)): rm = roll_mean(i) if state_hot: seg_scores.append(scores[i]) if rm < cold_thresh: # Close hot segment at i+1 seg_end = i + 1 if seg_end - seg_start < min_len: # Too short: merge into previous if possible, else keep. pass segments.append(Segment(seg_start, seg_end, float(np.mean(seg_scores) if seg_scores else 0.0))) # Start cold state_hot = False seg_start = seg_end seg_scores = [] else: if rm > hot_thresh: # Close cold segment seg_end = i + 1 cold_score = float(np.mean(scores[seg_start:seg_end]) if seg_end > seg_start else 0.0) segments.append(Segment(seg_start, seg_end, cold_score)) # Start hot state_hot = True seg_start = seg_end seg_scores = [] # Close tail tail_end = len(scores) if tail_end > seg_start: tail_score = float(np.mean(scores[seg_start:tail_end])) segments.append(Segment(seg_start, tail_end, tail_score)) # Merge very short segments to keep output stable. merged: List[Segment] = [] for seg in segments: if not merged: merged.append(seg) continue if seg.length < min_len: prev = merged[-1] combined = Segment(prev.start, seg.end, (prev.score * prev.length + seg.score * seg.length) / max(1, (prev.length + seg.length))) merged[-1] = combined else: merged.append(seg) # One more pass: ensure non-empty and strictly increasing. cleaned: List[Segment] = [] for seg in merged: if seg.length <= 0: continue if cleaned and seg.start < cleaned[-1].end: seg = Segment(cleaned[-1].end, seg.end, seg.score) if seg.length > 0: cleaned.append(seg) return cleaned def allocate_frames( segments: List[Segment], budget: int, min_frames_per_segment: int = 1, max_frames_per_segment: int = 30, ) -> List[int]: """Allocate keyframes to segments using floor + proportional + cap.""" if budget <= 0 or not segments: return [] n = len(segments) min_total = min_frames_per_segment * n # If budget is smaller than the floor, distribute 1-by-1. if min_total >= budget: alloc = [0] * n for i in range(budget): alloc[i % n] += 1 return alloc utilities = np.array([max(0.0, s.score) for s in segments], dtype=np.float64) total_u = float(utilities.sum()) alloc = [min_frames_per_segment] * n remaining = budget - min_total if total_u == 0.0: raw = np.full(n, remaining / n, dtype=np.float64) else: raw = utilities * (remaining / total_u) # Add integer parts. for i in range(n): add = int(raw[i]) alloc[i] = min(max_frames_per_segment, alloc[i] + add) allocated = sum(alloc) # Distribute leftover by fractional parts, respecting caps. if allocated < budget: frac = raw - np.floor(raw) order = np.argsort(-frac) # descending fractional idx = 0 safety = 0 while allocated < budget and safety < 10_000: i = int(order[idx % n]) if alloc[i] < max_frames_per_segment: alloc[i] += 1 allocated += 1 idx += 1 safety += 1 # If we somehow exceeded budget due to caps/floor interplay, trim from lowest utility. if allocated > budget: order = np.argsort(utilities) # ascending utility idx = 0 safety = 0 while allocated > budget and safety < 10_000: i = int(order[idx % n]) if alloc[i] > 0 and alloc[i] > min_frames_per_segment: alloc[i] -= 1 allocated -= 1 idx += 1 safety += 1 return alloc def pick_indices_for_segment(seg: Segment, k: int) -> List[int]: """Pick k indices in [seg.start, seg.end] over the score-step domain. Note: scores are defined between frames; we later map these to actual frames. """ if k <= 0 or seg.length <= 0: return [] if k == 1: return [seg.start] # Evenly spaced across [start, end-1] xs = np.linspace(seg.start, seg.end - 1, num=k) idxs = sorted({int(round(x)) for x in xs}) # Ensure exactly k by filling gaps if rounding collapsed points. while len(idxs) < k: # Insert midpoints between existing points. candidates = [] for a, b in zip(idxs, idxs[1:]): if b - a >= 2: candidates.append((a + b) // 2) if not candidates: # Fall back: walk forward. x = idxs[-1] if x + 1 < seg.end: idxs.append(x + 1) else: break else: for c in candidates: if c not in idxs and seg.start <= c < seg.end: idxs.append(c) if len(idxs) >= k: break idxs = sorted(idxs) # Trim if we overshot. return idxs[:k] def select_keyframe_indices(segments: List[Segment], alloc: List[int], stride: int = 1) -> List[int]: """Return concrete frame indices (0-based) to extract from the video.""" chosen: List[int] = [] for seg, k in zip(segments, alloc): step_idxs = pick_indices_for_segment(seg, k) # Map score-step domain to frame indices. # score i corresponds to diff between frame i and i+stride; # picking frame i is a reasonable representative. for si in step_idxs: chosen.append(si * stride) chosen = sorted(set(chosen)) return chosen def extract_frames(video_path: str, frame_indices: List[int], out_dir: str) -> None: os.makedirs(out_dir, exist_ok=True) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise RuntimeError(f"Failed to open video: {video_path}") frame_set = set(frame_indices) max_idx = max(frame_set) if frame_set else -1 idx = 0 saved = 0 while idx <= max_idx: ok, frame = cap.read() if not ok: break if idx in frame_set: path = os.path.join(out_dir, f"frame_{idx:06d}.jpg") ok2 = cv2.imwrite(path, frame) if not ok2: raise RuntimeError(f"Failed to write: {path}") saved += 1 idx += 1 cap.release() if saved == 0 and frame_indices: raise RuntimeError("No frames were saved; check indices and video decoding") def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--video", required=True, help="Path to input video") ap.add_argument("--out", required=True, help="Output directory for keyframes") ap.add_argument("--budget", type=int, default=60, help="Total keyframes to extract") ap.add_argument("--stride", type=int, default=2, help="Compare every Nth frame for scoring") ap.add_argument("--window", type=int, default=8, help="Rolling window for segmentation") ap.add_argument("--hot", type=float, default=0.030, help="Enter hot segment threshold") ap.add_argument("--cold", type=float, default=0.020, help="Exit hot segment threshold") args = ap.parse_args() cap = cv2.VideoCapture(args.video) if not cap.isOpened(): raise RuntimeError(f"Failed to open video: {args.video}") scores, frames_read = compute_scores(cap, stride=args.stride) cap.release() segments = segment_scores(scores, window=args.window, hot_thresh=args.hot, cold_thresh=args.cold) alloc = allocate_frames(segments, budget=args.budget, min_frames_per_segment=1, max_frames_per_segment=max(2, args.budget)) keyframes = select_keyframe_indices(segments, alloc, stride=args.stride) # Keep within a hard limit (rounding/uniqueness can change count). if len(keyframes) > args.budget: keyframes = keyframes[: args.budget] extract_frames(args.video, keyframes, args.out) print(f"frames_read={frames_read}") print(f"scores={len(scores)} segments={len(segments)}") print(f"budget={args.budget} selected={len(keyframes)}") if segments: hot_share = sum(1 for s in segments if s.score > args.hot) / len(segments) print(f"segment_hot_share={hot_share:.2f}") if __name__ == "__main__": main() Ez a forgatókönyv szándékosan közvetlen: Ez egy stabil szegmens listát eredményez. Ez garantálja, hogy soha nem haladja meg a keret költségvetését. Determinisztikus keretfájlokat ír a downstream elemzéshez. A termelési szolgáltatásomban az extrahált kereteket a multimodális hívás (Gemini a függőségeimben), és a kapott elemzési haszonterhelést a Next.js alkalmazásba küldik vissza aláírt webhook segítségével. google-generativeai Gyakorlati hangolási jegyzetek (az első demo után fontos dolgok) Miután a forma működik, a nyeremények a csúnya valós felvételek alatt történő hangolásból származnak. 1) Válasszon egy lépést, amely megfelel a tartalmának Ha minden képet 30 fps felvételen kapsz, mindenhol apró mutatómozgásokat fogsz találni. Ez nem mindig rossz, de felfújhatja a „változás” jelét. A 2–5 lépés jó kiindulópont a képernyőfelvételekhez. Még mindig érzékeny a felhasználói felület változásaira, de elnyomja az alkeret zaját. 2) A hisztéria megakadályozza a szegmensek villogását Használat és (Két küszöbérték) számít. Egyetlen küszöbértékkel a forró / hideg között állandóan visszalép a vágás körül. hot_thresh cold_thresh A hisztéria stabil szegmenseket biztosít, és a költségvetések kiszámíthatóan viselkednek. 3) A padló és a fedélzet nem opcionális A padló nélkül a rövid szegmensek nullára kerekedhetnek, és hiányozni fog a pontos mikro-robbanás, amire gondolt. Kapcsolat nélkül egy hosszú „üzemelő” szegmens elfogyaszthatja az egész költségvetést, és elveszítheti a többi munkafolyamat kontextusát. 4) Mindig írja alá az elküldött bajtokat Ha aláírja Küldje el A HTTP-könyvtár más kulcsszervezéssel/távolsággal szérializálhat, mint amit aláírtál, ami időszakos ellenőrzési hibákat okoz, amelyek szellemeknek tűnnek. json.dumps(payload) json=payload A tényleges bájtok aláírása ( Ez azért fontos itt, mert az elemzési hasznos teher – az, amely a gondosan kiválasztott billentyűzeteket hordozza – a legdrágább tárgy a csővezetékben. data=body_bytes Miért ez a tervezési skála a termelésben A mintavételező azért működik, mert tiszteletben tartja a két kemény korlátozást: Költségi skálák kiválasztott képekkel, nem videó hosszúsággal.Ha a költségvetés rögzítve van, a drága multimodális lépés felső határa van. A pontszám lineáris áthaladás; a szegmentáció lineáris áthaladás; az elosztás lineáris, apró állandó tényezőkkel. A legfontosabb: a mintavételi rendszer úgy viselkedik, mint valami, amit működtethetek. Ahelyett, hogy arról vitatkoznék, hogy egy 12 perces videó „túl hosszú”, eldöntöm, hogy hány képet akarok vásárolni.Az elemző a felvétel azon részeire költi a költségvetést, amelyek valójában megváltoznak, és nem fizet a statikus felhasználói felületért. Ez a lényeg: a fegyelmezett elosztás legyőzi a tisztességes mintavételt.