Az adatmérnökök gyakran szembesülnek azzal a kihívással, hogy az adatok nem megfelelő formátumban vannak, különösen a kéretlen karakterek és adatok, a nulla vagy üres értékek, és ami a legfontosabb, az ismétlődő adatokkal kell foglalkozniuk, amelyek hatással vannak az összes későbbi alkalmazásra, beleértve a jelentéskészítési és adattudományi modelleket. Ez nehéz napi feladattá válik a mérnökök és a támogató csapatok számára, gyorsan lemerítik erőforrásaikat anélkül, hogy produktívak lennének. A rosszul megtervezett keretrendszereknél gyakran nehéz idők várják a fejlesztőket később, hogy enyhítsék ezeket az adatjavításokat. Sok szervezetnél redundáns adatok vannak a nem hatékony adatfolyam-architektúrák miatt, ami több millió dolláros tárolási költséget jelent, az adatok többszöri újrafeldolgozása és az erőforrások rossz kihasználása miatt.
Térjünk arra a lényegre, hogy jelenlegi szerepkörében szembesült-e valaha kihívással a streaming vagy a kötegelt adatfolyamokban található duplikátumok kezelése során? Az adatmérnökök, adattudósok és adatelemzők többsége „IGEN”-t mondana. A jelenlegi világban számos eszköz létezik a duplikált adatok kijavításához egy Data Lake-ben, de milyen áron? Tudja ezeket kezelni magában az építészeti tervezési szakaszban? Sok kérdés motoszkálhat a fejedben.
Beszéljük meg részletesen, melyek azok az eszközök, amelyek segíthetnek a streaming adatok duplikációjának megszüntetésében, azok előnyei és hátrányai, valamint a beállítás és a karbantartás. Ezután mélyrehatóan foglalkozunk a streamelési folyamatban található ismétlődések kezelésének legjobb gyakorlataival és szabványaival.
Tekintsünk meg három elsődleges megközelítést a duplikáció megszüntetésére az adatfolyam-folyamatokban:
Az összes adatfolyam-folyamat különböző alkalmazásokból, például IoT-eszközökből, érzékelőkből, játékstatisztikákból, forgalmi kamerákból és sebességérzékelő eszközökből, valamint intelligens rendszerekből nyeri ki az adatokat, amelyek az autonóm járművek járműhasználati adatait közvetítik. A legtöbb ilyen rendszer általában egy mintát követ az események streameléséhez, és minden eseménynek általában egyedi azonosítója van, például egy kiskereskedelmi üzlet tranzakcióazonosítója az értékesítési tranzakcióhoz az esemény időbélyegzőjével. Egyes rendszerek általában nem rendelkeznek egyedi azonosítóval, például a sebességérzékelő eszközök általában rendelkeznek az érzékelő azonosítójával, de az esemény időbélyegén kívül az összes adatfolyam-esemény nem rendelkezik egyedi azonosítóval. Ezekben az esetekben nagy a valószínűsége, hogy ugyanazon érzékelőeszközön megkettőződnek a streamelési események.
Képzeljen el egy olyan használati esetet, amikor a járművek gyorshajtási adatainak streamelése egy államközi eszközről általában nagy mennyiségben fordul elő percenként egy mozgalmas napon. Egy másik példa az ünnepi akciók során, amikor a kiskereskedelmi vállalkozásoknak naponta több milliárd tranzakciót kell lebonyolítaniuk. Az események ilyen mennyiségének valós idejű kezelése és az adatok eltávolítása nagyon fontos a pontos jelentéskészítési és adattudományi modellek hatékony működéséhez a kiugró értékek és ismétlődések eltávolításával.
Beszéljünk technikailag, a google felhő Pub/Sub szolgáltatást nyújt, amely egy aszinkron és méretezhető üzenetküldő szolgáltatás, amely leválasztja az üzeneteket előállító szolgáltatásokat az ezeket az üzeneteket feldolgozó szolgáltatásoktól. Erősen használják az analitikai és adatintegrációs folyamatok streamelésére az adatok betöltésére és terjesztésére. Általánosan használatos felhasználói interakciós események, szerveresemények, valós idejű események feldolgozására, adatok adatbázisok közötti replikálására, vállalati eseménybuszként működik az üzleti események megosztására a szervezeten belül, valamint adatfolyamként történő adatfolyamként történő alkalmazásból, beleértve az érzékelőket és az együtt használt alkalmazáseseményeket. más Google felhőtermékekkel egy adatfolyamon keresztül.
A Pub/Sub egyszerű, de hatékony módszert kínál a duplikált adatok attribútumainak használatával történő kezelésére. A Pub/Sub témakörben minden üzenet tartalmazhat kulcs-érték párokat a metaadatokban. Ezek az adatok felhasználhatók a duplikált események azonosítására és az adatfolyamban a duplikáció megszüntetésére anélkül, hogy az adatfeldolgozási szolgáltatásokra hárulnának a terhelés, amelyek általában magasabb erőforrás-költséggel járnak, és nagymértékben lelassítják az adatfolyamot.
Az olyan üzeneteknél, amelyek egyedi mezőt tartalmaznak, például a tranzakcióazonosítót , ez az érték beállítható attribútumként az üzenetek közzétételekor. Amikor az adatfolyamban Pub/Sub üzeneteit olvassa, beállíthatja a folyamatot úgy, hogy ezzel az attribútummal szüntesse meg a duplikációt.
Ez a megoldás akkor hatékony, ha a duplikátumok a forrásalkalmazásból vagy -eszközből streamelnek a Pub/Sub témakörben található egyedi azonosító használatával. Ennek a megoldásnak az a korlátja, hogy csak az egymáshoz képest 10 percen belül közzétett duplikált üzeneteknél működik jól. Bár egyszerűen megvalósítható, de nincs méretezhetősége a Pub/Sub időablak korlátozása miatt. Ez nagyon hasznos olyan esetekben, mint a sebességtúllépő kamerák vagy érzékelők, amelyek ismétlődő eseményeket generálnak az egyes üzenetek 10 perces ablakán belül. Ez nagyszerűen működik.
Előfordulhatnak olyan esetek, amikor a kiadón belül generált ismétlődések, például a Pub/Sub az üzenetek későbbi feldolgozása miatt, vagy a Pub/Sub soha nem kapott visszaigazolást a kézbesített üzenetekről, a Pub/Sub újra megpróbálja elküldeni ugyanazt az üzenetet ugyanazzal az üzenettel. Message_id, ezáltal ismétlődő események jönnek létre a kiadóban. Ennek megoldására a Pub/Sub használatával meghatározhatjuk a hasznos adat üzenet_azonosítóját, és ezt használhatjuk azonosítóként. A Cloud DataFlow egy teljesen felügyelt szolgáltatás az adatok adatfolyam-feldolgozására a Google Cloud platformon (GCP), amely minden rekordnak pontosan egyszeri feldolgozását biztosítja. Mit jelent számunkra? - A message_id alapján azonosítja a duplikált eseményeket, és kiküszöböli azokat az adatfolyamokban történő feldolgozás során, de ritka esetekben ezek a duplikált események, amikor az adatfolyamon belüli különböző worker csomópontokon dolgozták fel őket, hatástalanul jutnak el a downstreamhez. A végén továbbra is lesznek ismétlődések az adattóban.
Az ilyen esetek kezeléséről ebben a cikkben a vége felé tovább fogunk beszélni. Koncentráljunk a többi lehetőségre a streaming adatok duplikációjának megszüntetésére.
Most már tudjuk, hogy a Pub/Sub hogyan kezeli a duplikált eseményeket. Következik ezeknek az üzeneteknek a feldolgozása a Cloud DataFlow segítségével egy Pub/Sub előfizetővel, amely a forrásalkalmazásból olvassa be a streaming üzeneteket. A Dataflow egy teljesen felügyelt szolgáltatás, amely nyílt forráskódú Apache Beam SDK-t használ a fejlett streamelési képességek lehetővé tételére. Az adatfolyam feladatonként 4000 dolgozó csomópontra skálázódik, és petabájtnyi adatot dolgozhat fel automatikus skálázási funkciókkal a jobb erőforrás-kihasználás érdekében mind a kötegelt, mind az adatfolyam-folyamatokban.
Az Apache Beam beépített Deduplicate PTransformot kínál, amely konfigurálhatóbb és robusztusabb módszert biztosít a duplikációk eltávolítására. Ez a módszer a Beam Stateful API-ját használja minden megfigyelt kulcs állapotának fenntartásához, és eltávolítja a duplikációkat egy felhasználó által meghatározott időablakon belül. Ez a megközelítés lehetővé teszi, hogy meghatározza a deduplikációs logikát az adatok vagy a teljes üzenettartalom meghatározott mezői alapján, és az eseményidő vagy a feldolgozási idő alapján konfigurálja a deduplikációt.
A funkció kipróbálásához tekintse meg a GitHubból származó minta adatfolyam-kódomat.
Itt meg kell jegyezni, hogy a kötegelt feldolgozási folyamatok mindig pontosan egyszer használják a feldolgozást, míg a streaming folyamatok alapértelmezés szerint pontosan egyszeri feldolgozást használnak, de beállíthatók legalább egyszeri feldolgozásra is. A fogás itt az, hogy megjegyezzük, hogy az adatfolyam jelenleg feldolgozott ablaka, amikor az átlép az ablakon, amikor egy ismétlődő üzenetet dolgoz fel, nem hasonlítja össze a már feldolgozottal, mivel az adatfolyam nem tárolja a rekordazonosítókat a memóriában. A Dataflow elveheti ezt az üzenetet a későn érkező adatok alapján, vagy ha az adatfolyamnak van egy másik szakasza a feldolgozatlan üzenetek rögzítésére, és a Cloud Bigquery táblájába írhat – egy teljesen felügyelt, felhőalapú adattárház a GCP-n, vagy írhat egy felhőtárhelyet – egy menedzselt szolgáltatás strukturálatlan adatok tárolására, fájlként további újrafeldolgozási és hibaelhárítási célokra.
Ez a megoldás rugalmas lehetőséget biztosít az összetett deduplikációs bejelentkezés feldolgozására, és alkalmassá teszi olyan helyzetekre, ahol a deduplikációs ablak nagyobb vagy összetettebb, mint amit a Pub/Sub kínál. A kompromisszumok közé tartozik a magasabb erőforrás-felhasználás az egyes állapotok fenntartásához a rekord egyediségének meghatározásához
Eddig azt láttuk, hogy a Publisher-hez hasonló Pub/Sub és a Cloud DataFlow integrációs szolgáltatások hogyan kezelik valós időben a duplikációkat. Úgy gondolom, hogy ezek a megoldások nem 100%-ban hatékonyak a feldolgozási többlet- és volumenproblémák miatti ablakkezelésben, ilyen forgatókönyvekben a szélsőséges esetek kezeléséhez, beleértve azokat az eseteket is, amikor a duplikált üzenet későn érkezik, és az adatfolyam egyedi rekordnak gondolja, mert nem nem tartja meg a rekordazonosítókat az üzenetek egyediségének keresztellenőrzéséhez, és egy másik forgatókönyv szerint az adatfolyam különböző munkavégző csomópontokon kezeli ezeket az üzeneteket hálózati hibák és/vagy munkacsomópont-hibák miatt, ezért azt gondolja, hogy ez egy egyedi rögzíti az adatfolyam feldolgozása közben, és bekerül a downstream rendszerekbe, például a Google felhő bigquery táblájába.
Az ilyen esetek mérséklése és a duplikáció megszüntetésének végső ellenőrzése a nyelő szintjén történhet, például a BigQuery-ben vagy más adattárházakban. Ez a megközelítés gyakran hasznos, ha a valós idejű deduplikáció nem kritikus, és az időszakos deduplikáció elegendő. Ez hatékonyan kiszűri és megszünteti az összes ismétlődő üzenetet a fejlett SQL-lekérdezések segítségével.
A használati eset alapján kétféle megoldás létezik a duplikátumok rögzítésére.
Először is használjon ütemezett lekérdezéseket a szerző DAG-n keresztül vagy a BigQuery konzolon, hogy rendszeres időközönként (naponta vagy óránkénti) hozzon létre dedup táblát partíciók segítségével, így bárki számára egyszerű választás lehet a folyamat létrehozása, a dedup adatok állomásozó táblában való tárolása és betöltése. a különböző adatokat a végső táblázatba.
Másodszor, egy materializált nézetet is használhatunk a valós idejű adatok megszerzéséhez, így ideális megoldást jelent az üzleti betekintések gyors megszerzéséhez.
A Bigquery SQL-lekérdezés a Github dedup_sql hivatkozáson jelenik meg.
Az alábbiakban a bigquery sql kód két olyan lehetőséget magyaráz meg, amelyeket megbeszéltünk:
-- Use below SQL queries to periodically deduplicate data in BigQuery tables. CREATE OR REPLACE TABLE Transactions AS SELECT DISTINCT * FROM raw_transactions; --OR use below incremental steps to drop the necessary partitions and re-insert the deduped data into the original table -- Step 1: Insert distinct records from the original table based on the max timestamp available CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS SELECT DISTINCT * FROM raw_transactions WHERE event_timestamp > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 2: Drop the partition after deduplication DELETE FROM raw_transactions WHERE event_timestamp = > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 3: Insert deduplicated data into the main table INSERT INTO raw_transactions SELECT DISTINCT * FROM STAGE_TRANSACTIONS; --OR Use below SQL query to Merge new data without duplicates the table MERGE INTO raw_transactions AS target USING ( SELECT * FROM STAGE_TRANSACTIONS ) AS source ON target.transaction_id = source.transaction_id AND target.event_timestamp <= source.event_timestamp WHEN MATCHED THEN UPDATE SET target.product = source.product, target.price = source.price, target.location = source.location, target.store = source.store, target.zipcode = source.zipcode, target.city = source.city, target.promotion = source.promotion, target.event_timestamp = source.event_timestamp WHEN NOT MATCHED THEN INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp) VALUES (source.transaction_id, source.product, source.price, source.location, source.store, source.zipcode, source.city, source.promotion, source.event_timestamp); --OR to get the real-time data without duplicates, use following materialized view and a finest solution to retrieve dedup records quickly CREATE MATERIALIZED VIEW raw_transactions_mv AS SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp FROM ( SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp, ROW_NUMBER() OVER ( PARTITION BY transaction_id ORDER BY event_timestamp DESC ) AS row_num FROM raw_transactions ) WHERE row_num = 1;
Minden deduplikációs stratégiának megvannak a maga kompromisszumai. Íme egy összefoglaló, amely segít a megfelelő megközelítés kiválasztásában:
Módszer | Előnyök | Hátrányok |
---|---|---|
Pub/Sub Message Attribútumok | Alacsony késleltetésű, natív Pub/Sub | 10 perces deduplikációs ablakra korlátozva |
Apache Beam Duplicate | Rendkívül rugalmas, támogatja az összetett deduplikációs logikát | Az állami gazdálkodás miatt magasabb erőforrás-felhasználás |
Sink-Based Deduplikáció | Alkalmas kötegelt vagy időszakos frissítésre, minimális logika | Bevezetheti a késleltetést; hangszerelési eszközökre lehet szükség |
A deduplikáció a hatékony adatfeldolgozás sarokköve az adatfolyam-folyamatokban. A stratégia megválasztása a folyamat valós idejű igényeitől, összetettségétől és az erőforrás-korlátoktól függ. A Pub/Sub attribútumok, az Apache Beam Deduplicate PTransform vagy a fogadó alapú deduplikáció erősségeit kihasználva tiszta, megbízható adatokat biztosíthat a downstream rendszerek számára. Fedezze fel ezeket a megközelítéseket, hajtsa végre a bemutatott példákat, és alkalmazza azokat az Ön használati esetéhez az optimális eredmény érdekében.
Érdekelnek részletesebb útmutatók az adatelemzésről és a gépi tanulásról? Kövess engem