Jednou ze zásadních dovedností zkušeného datového profesionála je efektivní manipulace s velkými datovými sadami, zajištění kvality a spolehlivosti dat. Data jsou ústřední a základní součástí každého datového systému a ať už máte jakékoli dobré dovednosti v jiných aspektech našeho obchodu, toto si nemůžete dovolit přehlédnout.
V tomto článku zkoumám robustní techniky pro provádění kontrol QA na velkých souborech dat pomocí knihovny Deequ a statistických metod. Kombinací přístupů, které vysvětluji níže, budete schopni zachovat integritu dat, zlepšit své postupy správy dat a předejít potenciálním problémům v navazujících aplikacích.
Zajištění kvality dat ve velkém je skličující úkol, zejména při práci s miliardami řádků uložených v distribuovaných souborových systémech nebo datových skladech. Knihovna Deequ je open-source datový profilování a rámec QA postavený na Sparku, který je moderním a všestranným nástrojem určeným k řešení tohoto problému. To, co jej odlišuje od podobných nástrojů, je jeho schopnost hladce se integrovat se Sparkem a využít výkon distribuovaného zpracování pro efektivní manipulaci s rozsáhlými datovými sadami.
Když to vyzkoušíte, uvidíte, jak vám jeho flexibilita umožňuje definovat složitá ověřovací pravidla přizpůsobená vašim konkrétním požadavkům a zajistit tak komplexní pokrytí. Deequ navíc nabízí rozsáhlé metriky a možnosti detekce anomálií, které vám pomohou identifikovat a proaktivně řešit problémy s kvalitou dat. Pro datové profesionály pracující s velkými a dynamickými datovými sadami je Deequ řešením švýcarského nože. Pojďme se podívat, jak to můžeme využít.
Další podrobnosti o nastavení knihovny Deequ a případech použití v oblasti profilování dat jsou dostupné zde . Pro zjednodušení jsme v tomto příkladu právě vygenerovali několik záznamů o hračkách:
val rdd = spark.sparkContext.parallelize(Seq( Item(1, "Thingy A", "awesome thing.", "high", 0), Item(2, "Thingy B", "available at http://thingb.com", null, 0), Item(3, null, null, "low", 5), Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10), Item(5, "Thingy E", null, "high", 12))) val data = spark.createDataFrame(rdd)
Většina datových aplikací přichází s implicitními předpoklady o atributech dat, jako jsou hodnoty jiné než NULL a jedinečnost. S Deequ se tyto předpoklady stanou explicitními prostřednictvím jednotkových testů. Zde jsou některé běžné kontroly:
Počet řádků: Ujistěte se, že datová sada obsahuje určitý počet řádků.
Úplnost atributu: Zkontrolujte, zda atributy jako id a productName nikdy nemají hodnotu NULL.
Jedinečnost atributu: Ujistěte se, že určité atributy, jako je id, jsou jedinečné.
Rozsah hodnot: Ověřte, zda atributy jako priorita a numViews spadají do očekávaných rozsahů.
Shoda vzorů: Ověřte, zda popisy obsahují adresy URL, když se očekává.
Statistické vlastnosti: Ujistěte se, že medián číselných atributů splňuje specifická kritéria.
Zde je návod, jak můžete tyto kontroly implementovat pomocí Deequ:
import com.amazon.deequ.VerificationSuite import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus} val verificationResult = VerificationSuite() .onData(data) .addCheck( Check(CheckLevel.Error, "unit testing my data") .hasSize(_ == 5) // we expect 5 rows .isComplete("id") // should never be NULL .isUnique("id") // should not contain duplicates .isComplete("productName") // should never be NULL // should only contain the values "high" and "low" .isContainedIn("priority", Array("high", "low")) .isNonNegative("numViews") // should not contain negative values // at least half of the descriptions should contain a url .containsURL("description", _ >= 0.5) // half of the items should have less than 10 views .hasApproxQuantile("numViews", 0.5, _ <= 10)) .run()
Po provedení těchto kontrol je Deequ převede do řady úloh Spark, které provede za účelem výpočtu metrik na datech. Poté vyvolá vaše funkce tvrzení (např. _ == 5 pro kontrolu velikosti) pro tyto metriky, aby zjistil, zda omezení platí pro data. Můžeme zkontrolovat objekt „verificationResult“, abychom zjistili, zda test nenašel chyby:
import com.amazon.deequ.constraints.ConstraintStatus if (verificationResult.status == CheckStatus.Success) { println("The data passed the test, everything is fine!") } else { println("We found errors in the data:\n") val resultsForAllConstraints = verificationResult.checkResults .flatMap { case (_, checkResult) => checkResult.constraintResults } resultsForAllConstraints .filter { _.status != ConstraintStatus.Success } .foreach { result => println(s"${result.constraint}: ${result.message.get}") } }
Pokud spustíme příklad, dostaneme následující výstup:
We found errors in the data: CompletenessConstraint(Completeness(productName)): Value: 0.8 does not meet the requirement! PatternConstraint(containsURL(description)): Value: 0.4 does not meet the requirement!
Test zjistil, že naše předpoklady byly porušeny! Pouze 4 z 5 (80 %) hodnot atributu productName jsou nenulové a pouze 2 z 5 (tj. 40 %) hodnot atributu description obsahovaly adresu URL. Naštěstí jsme provedli test a našli chyby; někdo by měl data okamžitě opravit!
Zatímco Deequ nabízí robustní rámec pro ověřování dat, integrace statistických metod může dále zlepšit vaše kontroly kvality, zejména pokud se zabýváte agregovanými metrikami datové sady. Podívejme se, jak můžete využít statistické metody ke sledování a zajištění kvality dat.
Zvažte obchodní scénář, kde proces ETL (Extract, Transform, Load) produkuje N záznamů v denní naplánované úloze. Týmy podpory mohou chtít nastavit kontroly kvality, aby upozornily na významnou odchylku v počtu záznamů. Pokud například proces obvykle generuje 9 500 až 10 500 záznamů denně po dobu dvou měsíců, jakýkoli významný nárůst nebo pokles může naznačovat problém s podkladovými daty.
Můžeme použít statistickou metodu k definování této prahové hodnoty, na kterou by měl proces upozornit tým podpory. Níže je ukázka sledování počtu záznamů za dva měsíce:
Abychom to analyzovali, můžeme transformovat údaje o počtu záznamů, abychom mohli sledovat každodenní změny. Tyto změny obecně oscilují kolem nuly, jak ukazuje následující graf:
Když tuto rychlost změny znázorníme normálním rozdělením, vytvoří se zvonovitá křivka, což naznačuje, že data jsou distribuována normálně. Očekávaná změna se pohybuje kolem 0 %, se směrodatnou odchylkou 2,63 %.
Tato analýza naznačuje, že počet záznamů obvykle spadá do rozsahu -5,26 % až +5,25 % s 90% spolehlivostí. Na základě toho můžete vytvořit pravidlo pro upozornění, pokud se počet záznamů odchýlí mimo tento rozsah, což zajistí včasný zásah.
Atribut coverag e odkazuje na poměr hodnot, které nejsou NULL, k celkovému počtu záznamů pro snímek datové sady. Pokud má například 8 ze 100 záznamů pro určitý atribut hodnotu NULL, pokrytí tohoto atributu je 92 %.
Podívejme se na další obchodní případ s procesem ETL, který denně generuje snímek tabulky produktů. Chceme sledovat pokrytí atributů popisu produktu. Pokud pokrytí klesne pod určitou hranici, měl by tým podpory upozornit. Níže je vizuální znázornění pokrytí atributů pro popisy produktů za dva měsíce:
Analýzou absolutních každodenních rozdílů v pokrytí pozorujeme, že změny oscilují kolem nuly:
Znázornění těchto dat jako normální rozdělení ukazuje, že je normálně rozděleno s očekávanou změnou kolem 0 % a standardní odchylkou 2,45 %.
Jak vidíme, pro tuto datovou sadu se pokrytí atributu popisu produktu obvykle pohybuje od -4,9 % do +4,9 % s 90% spolehlivostí. Na základě tohoto indikátoru můžeme nastavit pravidlo pro upozornění, pokud se pokrytí odchyluje mimo tento rozsah.
Pokud pracujete se soubory dat, které vykazují významné odchylky v důsledku faktorů, jako je sezónnost nebo trendy, tradiční statistické metody mohou spustit falešná upozornění. Algoritmy časových řad nabízejí propracovanější přístup, zlepšující přesnost a spolehlivost vašich kontrol QA.
Chcete-li vytvořit rozumnější upozornění, můžete použít buď
Pojďme si pomocí Holt-Winters modelovat denní prodeje, které vykazují trendové i sezónní vzorce:
import pandas as pd from statsmodels.tsa.holtwinters import ExponentialSmoothing # Load and preprocess the dataset data = pd.read_csv('sales_data.csv', index_col='date', parse_dates=True) data = data.asfreq('D').fillna(method='ffill') # Fit the Holt-Winters model model = ExponentialSmoothing(data, trend='add', seasonal='add', seasonal_periods=365) fit = model.fit() # Forecast and detect anomalies forecast = fit.fittedvalues residuals = data - forecast threshold = 3 * residuals.std() anomalies = residuals[abs(residuals) > threshold] print("Anomalies detected:") print(anomalies)
Pomocí této metody můžete odhalit významné odchylky, které by mohly naznačovat problémy s kvalitou dat, a poskytnout tak jemnější přístup ke kontrolám QA.
Doufám, že vám tento článek pomůže efektivně implementovat kontroly QA pro vaše velké datové sady. Použitím knihovny Deequ a integrací statistických metod a algoritmů časových řad můžete zajistit integritu a spolehlivost dat, což v konečném důsledku zlepší vaše postupy správy dat.
Implementace výše popsaných technik vám pomůže předejít potenciálním problémům v navazujících aplikacích a zlepšit celkovou kvalitu vašich datových pracovních toků.