paint-brush
Tencent Music passe de ClickHouse à Apache Dorispar@junzhang
2,865 lectures
2,865 lectures

Tencent Music passe de ClickHouse à Apache Doris

par Jun Zhang13m2023/03/14
Read on Terminal Reader

Trop long; Pour lire

Tencent Music est un fournisseur de services de streaming musical avec 800 millions d'utilisateurs actifs par mois. La bibliothèque musicale contient des données de toutes formes et de tous types : musique enregistrée, musique live, audios, vidéos, etc. Nous avons stocké et traité la plupart de nos données dans Tencent Data Warehouse (TDW). ClickHouse excellait dans le traitement des tables plates, mais c'était un énorme gaspillage de ressources de stockage pour verser toutes les données dans une table plate et les partitionner par jour. Apache Doris était la solution.
featured image - Tencent Music passe de ClickHouse à Apache Doris
Jun Zhang HackerNoon profile picture
0-item

Cet article est co-écrit par moi et mon collègue Kai Dai. Nous sommes tous les deux ingénieurs de la plate-forme de données chez Tencent Music (NYSE : TME), un fournisseur de services de streaming musical avec 800 millions d'utilisateurs actifs par mois. Laisser tomber le nombre ici n'est pas pour se vanter mais pour donner un aperçu de la mer de données que mes pauvres collègues et moi devons traiter tous les jours.


Pourquoi nous utilisons ClickHouse

La bibliothèque musicale de Tencent Music contient des données de toutes formes et de tous types : musique enregistrée, musique live, audios, vidéos, etc. En tant qu'ingénieurs de la plateforme de données, notre travail consiste à distiller des informations à partir des données, sur la base desquelles nos coéquipiers peuvent prendre de meilleures décisions. pour soutenir nos utilisateurs et partenaires musicaux.


Plus précisément, nous effectuons une analyse complète des chansons, des paroles, des mélodies, des albums et des artistes, transformons toutes ces informations en actifs de données et les transmettons à nos utilisateurs de données internes pour le comptage des stocks, le profilage des utilisateurs, l'analyse des mesures et le ciblage des groupes. .



Nous avons stocké et traité la plupart de nos données dans Tencent Data Warehouse (TDW), une plate-forme de données hors ligne où nous avons placé les données dans divers systèmes de balises et de métriques, puis créé des tableaux plats centrant chaque objet (chansons, artistes, etc.).


Ensuite, nous avons importé les tableaux plats dans ClickHouse pour l'analyse et Elasticsearch pour la recherche de données et le ciblage de groupe.


Après cela, nos analystes de données ont utilisé les données sous les balises et les métriques dont ils avaient besoin pour former des ensembles de données pour différents scénarios d'utilisation, au cours desquels ils pouvaient créer leurs propres balises et métriques.


Le pipeline de traitement des données ressemblait à ceci :



Les problèmes avec ClickHouse


Lors de l'utilisation du pipeline ci-dessus, nous avons rencontré quelques difficultés :


  1. Mise à jour partielle : la mise à jour partielle des colonnes n'était pas prise en charge. Par conséquent, toute latence de l'une des sources de données pourrait retarder la création de tableaux plats et ainsi nuire à l'actualité des données.


  2. Coût de stockage élevé : les données sous différentes balises et métriques ont été mises à jour à différentes fréquences. Autant ClickHouse excellait dans le traitement des tables plates, c'était un énorme gaspillage de ressources de stockage que de simplement verser toutes les données dans une table plate et de les partitionner par jour, sans parler des coûts de maintenance qui en découlaient.


  3. Coût de maintenance élevé : D'un point de vue architectural, ClickHouse se caractérisait par le fort couplage des nœuds de stockage et des nœuds de calcul. Ses composants étaient fortement interdépendants, ce qui augmentait les risques d'instabilité des clusters. De plus, pour les requêtes fédérées sur ClickHouse et Elasticsearch, nous avons dû nous occuper d'un grand nombre de problèmes de connexion. C'était juste fastidieux.


Transition vers Apache Doris

Apache Doris , une base de données analytique en temps réel, possède quelques fonctionnalités qui sont exactement ce dont nous avions besoin pour résoudre nos problèmes :


  1. Mise à jour partielle : Doris prend en charge une grande variété de modèles de données, parmi lesquels le modèle agrégé prend en charge la mise à jour partielle en temps réel des colonnes. Sur cette base, nous pouvons ingérer directement des données brutes dans Doris et y créer des tableaux plats. L'ingestion se déroule comme suit : premièrement, nous utilisons Spark pour charger des données dans Kafka ; ensuite, toutes les données incrémentielles seront mises à jour vers Doris et Elasticsearch via Flink. Pendant ce temps, Flink pré-agrégera les données afin de soulager Doris et Elasticsearch.


  2. Coût de stockage : Doris prend en charge les requêtes de jointure multi-tables et les requêtes fédérées sur Hive, Iceberg, Hudi, MySQL et Elasticsearch. Cela nous permet de diviser les grandes tables plates en plus petites et de les partitionner par fréquence de mise à jour. Les avantages de cette opération incluent un allègement de la charge de stockage et une augmentation du débit des requêtes.


  3. Coût de maintenance : Doris est d'architecture simple et est compatible avec le protocole MySQL. Le déploiement de Doris n'implique que deux processus (FE et BE) sans dépendance vis-à-vis d'autres systèmes, ce qui facilite son exploitation et sa maintenance. De plus, Doris prend en charge l'interrogation des tables de données ES externes. Il peut facilement s'interfacer avec les métadonnées dans ES et mapper automatiquement le schéma de table à partir d'ES afin que nous puissions effectuer des requêtes sur les données Elasticsearch via Doris sans nous débattre avec des connexions complexes.


De plus, Doris prend en charge plusieurs méthodes d'ingestion de données, y compris l'importation par lots à partir d'un stockage distant tel que HDFS et S3, les lectures de données à partir de MySQL binlog et Kafka, et la synchronisation de données en temps réel ou l'importation par lots à partir de MySQL, Oracle et PostgreSQL. Il assure la disponibilité du service et la fiabilité des données grâce à un protocole de cohérence et est capable de débogage automatique. C'est une excellente nouvelle pour nos opérateurs et nos mainteneurs.


Statistiquement parlant, ces fonctionnalités ont réduit nos coûts de stockage de 42 % et nos coûts de développement de 40 %.


Au cours de notre utilisation de Doris, nous avons reçu beaucoup de soutien de la communauté open source Apache Doris et une aide opportune de l'équipe SelectDB, qui exécute maintenant une version commerciale d'Apache Doris.



Améliorations supplémentaires pour répondre à nos besoins

Introduire une couche sémantique

En parlant d'ensembles de données, du bon côté, nos analystes de données ont la liberté de redéfinir et de combiner les balises et les mesures à leur convenance. Mais du côté obscur, la grande hétérogénéité des systèmes d'étiquettes et de métriques entraîne plus de difficultés dans leur utilisation et leur gestion.


Notre solution consiste à introduire une couche sémantique dans notre pipeline de traitement de données. La couche sémantique est l'endroit où tous les termes techniques sont traduits en concepts plus compréhensibles pour nos utilisateurs de données internes. En d'autres termes, nous transformons les balises et les métriques en citoyens de première classe pour la définition et la gestion des données.



Pourquoi cela aiderait-il ?


Pour les analystes de données, toutes les balises et mesures seront créées et partagées au niveau sémantique afin qu'il y ait moins de confusion et une plus grande efficacité.


Pour les utilisateurs de données, ils n'ont plus besoin de créer leurs propres jeux de données ou de déterminer lequel est applicable pour chaque scénario, mais peuvent simplement effectuer des requêtes sur leur jeu de balises et leur jeu de métriques spécifiés.

Mettre à niveau la couche sémantique

Définir explicitement les balises et les métriques au niveau de la couche sémantique ne suffisait pas. Afin de construire un système de traitement de données standardisé, notre objectif suivant était d'assurer une définition cohérente des balises et des métriques tout au long du pipeline de traitement des données.


Pour cela, nous avons fait de la couche sémantique le cœur de notre système de gestion de données :



Comment ça marche?


Toutes les logiques de calcul dans TDW seront définies au niveau de la couche sémantique sous la forme d'une seule balise ou métrique.


La couche sémantique reçoit les requêtes logiques du côté application, sélectionne un moteur en conséquence et génère du SQL. Ensuite, il envoie la commande SQL à TDW pour exécution. Pendant ce temps, il peut également envoyer des tâches de configuration et d'ingestion de données à Doris et décider quelles métriques et balises doivent être accélérées.


De cette façon, nous avons rendu les balises et les métriques plus gérables. Une mouche dans la pommade est que puisque chaque balise et métrique est définie individuellement, nous avons du mal à automatiser la génération d'une instruction SQL valide pour les requêtes. Si vous avez une idée à ce sujet, vous êtes plus que bienvenu pour nous en parler.


Donnez le plein jeu à Apache Doris


Comme vous pouvez le voir, Apache Doris a joué un rôle central dans notre solution. L'optimisation de l'utilisation de Doris peut grandement améliorer l'efficacité globale de notre traitement des données. Dans cette partie, nous allons donc partager avec vous ce que nous faisons avec Doris pour accélérer l'ingestion et les requêtes de données et réduire les coûts.


Ce que nous voulons?


Actuellement, nous avons plus de 800 balises et plus de 1300 métriques dérivées des plus de 80 tables sources dans TDW. Lors de l'importation de données de TDW vers Doris, nous espérons obtenir :


  • Disponibilité en temps réel : en plus de l'ingestion traditionnelle de données hors ligne T+1, nous avons besoin d'un marquage en temps réel.


  • Mise à jour partielle : chaque table source génère des données via sa propre tâche ETL à différents rythmes et n'implique qu'une partie des balises et des métriques, nous avons donc besoin de la prise en charge de la mise à jour partielle des colonnes.


  • Haute performance : Nous avons besoin d'un temps de réponse de quelques secondes seulement dans les scénarios de ciblage de groupe, d'analyse et de reporting.


  • Faibles coûts : Nous espérons réduire au maximum les coûts.


Que faisons-nous ?


  1. Générer des tables plates dans Flink au lieu de TDW



La génération de tableaux plats dans TDW présente quelques inconvénients :


  • Coût de stockage élevé : TDW doit maintenir une table plate supplémentaire en dehors des tables sources discrètes 80+. C'est une énorme redondance.


  • Temps réel faible : tout retard dans les tables source sera augmenté et retardera l'ensemble de la liaison de données.


  • Coût de développement élevé : Pour atteindre l'actualité, il faudrait des efforts et des ressources de développement supplémentaires.

Au contraire, générer des tableaux plats dans Doris est beaucoup plus facile et moins coûteux. Le processus est le suivant :


  • Utilisez Spark pour importer de nouvelles données dans Kafka de manière hors ligne.
  • Utilisez Flink pour consommer des données Kafka.
  • Créez une table plate via l'ID de clé primaire.
  • Importez la table plate dans Doris. Comme indiqué ci-dessous, Flink a regroupé les cinq lignes de données, dont "ID" = 1, en une seule ligne dans Doris, réduisant ainsi la pression d'écriture des données sur Doris.


Cela peut réduire considérablement les coûts de stockage puisque TDW n'a plus à conserver deux copies de données et KafKa n'a besoin que de stocker les nouvelles données en attente d'ingestion. De plus, nous pouvons ajouter la logique ETL de notre choix dans Flink et réutiliser de nombreuses logiques de développement pour l'ingestion de données hors ligne et en temps réel.


  1. Nommez les colonnes intelligemment


Comme nous l'avons mentionné, le modèle agrégé de Doris permet une mise à jour partielle des colonnes. Nous fournissons ici une introduction simple à d'autres modèles de données dans Doris pour votre référence :


Modèle unique : cela s'applique aux scénarios nécessitant l'unicité de la clé primaire. Il ne conserve que les dernières données du même ID de clé primaire. (Pour autant que nous le sachions, la communauté Apache Doris prévoit également d'inclure une mise à jour partielle des colonnes dans le modèle unique.)


Modèle dupliqué : ce modèle stocke toutes les données d'origine exactement telles qu'elles sont, sans pré-agrégation ni déduplication.


Après avoir déterminé le modèle de données, nous avons dû réfléchir à la façon de nommer les colonnes. L'utilisation des balises ou des métriques comme noms de colonne n'était pas un choix car :


Ⅰ. Nos utilisateurs de données internes peuvent avoir besoin de renommer les métriques ou les balises, mais Doris 1.1.3 ne prend pas en charge la modification des noms de colonne.


Ⅱ. Les balises peuvent être prises en ligne et hors ligne fréquemment. Si cela implique l'ajout et la suppression de colonnes, non seulement cela prendra du temps, mais cela nuira également aux performances des requêtes. Au lieu de cela, nous procédons comme suit :


  • Pour renommer de manière flexible les balises et les métriques, nous utilisons des tables MySQL pour stocker les métadonnées (nom, ID unique au monde, statut, etc.). Toute modification des noms ne se produira que dans les métadonnées mais n'affectera pas le schéma de table dans Doris. Par exemple, si un song_name reçoit un ID de 4, il sera stocké avec le nom de colonne a4 dans Doris. Ensuite, si le song_name est impliqué dans une requête, il sera converti en a4 en SQL.


  • Pour la mise en ligne et hors ligne des balises, nous trions les balises en fonction de la fréquence à laquelle elles sont utilisées. Les moins utilisés recevront une marque hors ligne dans leurs métadonnées. Aucune nouvelle donnée ne sera placée sous les balises hors ligne, mais les données existantes sous ces balises seront toujours disponibles.


  • Pour la disponibilité en temps réel des balises et des mesures nouvellement ajoutées, nous précréons quelques colonnes d'ID dans les tables Doris en fonction du mappage des ID de nom. Ces colonnes d'ID réservées seront attribuées aux balises et métriques nouvellement ajoutées. Ainsi, nous pouvons éviter le changement de schéma de table et les frais généraux qui en découlent. Notre expérience montre que seulement 10 minutes après l'ajout des balises et des mesures, les données sous-jacentes peuvent être disponibles.


Il convient de noter que Doris 1.2.0, récemment publié, prend en charge Light Schema Change, ce qui signifie que pour ajouter ou supprimer des colonnes, il vous suffit de modifier les métadonnées dans FE. En outre, vous pouvez renommer les colonnes dans les tables de données tant que vous avez activé le changement de schéma léger pour les tables. C'est un gros gain de temps pour nous.


  1. Optimiser l'écriture de la date


Voici quelques pratiques qui ont réduit notre temps quotidien d'ingestion de données hors ligne de 75 % et notre score de compactage CUMU de 600+ à 100.


  • Pré-agrégation Flink : comme mentionné ci-dessus.


  • Dimensionnement automatique du lot d'écriture : pour réduire l'utilisation des ressources Flink, nous permettons aux données d'un sujet Kafka d'être écrites dans différentes tables Doris et réalisons la modification automatique de la taille du lot en fonction de la quantité de données.


  • Optimisation de l'écriture des données Doris : affiner les tailles de tablettes et buckets ainsi que les paramètres de compactage pour chaque scénario :


     max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas


  • Optimisation de la logique de validation BE : effectuez une mise en cache régulière des listes BE, validez-les dans les nœuds BE lot par lot et utilisez une granularité d'équilibrage de charge plus fine.



  1. Utiliser Dori-on-ES dans les requêtes

    Environ 60 % de nos requêtes de données impliquent un ciblage de groupe. Le ciblage de groupe consiste à trouver nos données cibles en utilisant un ensemble de balises comme filtres. Cela pose quelques exigences pour notre architecture de traitement de données :


  • Le ciblage de groupe lié aux utilisateurs de l'APP peut impliquer une logique très compliquée. Cela signifie que le système doit prendre en charge simultanément des centaines de balises en tant que filtres.


  • La plupart des scénarios de ciblage de groupe ne nécessitent que les dernières données de tag. Cependant, les requêtes de métrique doivent prendre en charge les données historiques.


  • Les utilisateurs de données peuvent avoir besoin d'effectuer une analyse agrégée supplémentaire des données de métrique après le ciblage du groupe.


  • Les utilisateurs de données peuvent également avoir besoin d'effectuer des requêtes détaillées sur les balises et les mesures après le ciblage du groupe.


Après réflexion, nous avons décidé d'adopter Doris-on-ES. Doris est l'endroit où nous stockons les données de métriques pour chaque scénario sous forme de table de partition, tandis qu'Elasticsearch stocke toutes les données de balises. La solution Doris-on-ES combine la capacité de planification de requêtes distribuées de Doris et la capacité de recherche en texte intégral d'Elasticsearch. Le modèle de requête est le suivant :


 SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag


Comme indiqué, les données d'identification situées dans Elasticsearch seront utilisées dans la sous-requête de Doris pour l'analyse des métriques. En pratique, nous constatons que le temps de réponse à la requête est lié à la taille du groupe cible. Si le groupe cible contient plus d'un million d'objets, la requête prendra jusqu'à 60 secondes. S'il est encore plus grand, une erreur de délai d'attente peut se produire. Après enquête, nous avons identifié nos deux plus grosses pertes de temps :


I. Lorsque Doris BE extrait des données d'Elasticsearch (1024 lignes à la fois par défaut), pour un groupe cible de plus d'un million d'objets, la surcharge d'E/S réseau peut être énorme.


II. Après l'extraction des données, Doris BE doit effectuer des opérations de jointure avec des tables de métriques locales via SHUFFLE/BROADCAST, ce qui peut coûter cher.



Ainsi, nous effectuons les optimisations suivantes :


  • Ajoutez une variable de session de requête es_optimize qui spécifie s'il faut activer l'optimisation.


  • Lors de l'écriture de données dans ES, ajoutez une colonne BK pour stocker le numéro de compartiment après le hachage de l'ID de clé primaire. L'algorithme est le même que l'algorithme de compartimentage dans Doris (CRC32).


  • Utilisez Doris BE pour générer un plan d'exécution Bucket Join, envoyez le numéro de compartiment à BE ScanNode et poussez-le vers ES.


  • Utilisez ES pour compresser les données interrogées ; transformez plusieurs récupérations de données en une seule et réduisez la surcharge d'E/S du réseau.


  • Assurez-vous que Doris BE extrait uniquement les données des buckets liés aux tables de métriques locales et effectue directement les opérations de jointure locales pour éviter le brassage des données entre les Doris BE.



    En conséquence, nous réduisons le temps de réponse aux requêtes pour le ciblage de grands groupes de 60 secondes à un surprenant 3,7 secondes. Les informations de la communauté montrent que Doris va prendre en charge l'indexation inversée depuis la version 2.0.0, qui sera bientôt publiée. Avec cette nouvelle version, nous serons en mesure d'effectuer une recherche en texte intégral sur les types de texte, le filtrage par équivalence ou par plage de textes, de nombres et de date/heure, et de combiner commodément la logique AND, OR, NOT dans le filtrage puisque l'indexation inversée prend en charge les types de tableaux. Cette nouvelle fonctionnalité de Doris devrait offrir des performances 3 à 5 fois supérieures à celles d'Elasticsearch sur la même tâche.


  1. Affiner la gestion des données


La capacité de séparation des données froides et chaudes de Doris constitue la base de nos stratégies de réduction des coûts dans le traitement des données.


  • Sur la base du mécanisme TTL de Doris, nous stockons uniquement les données de l'année en cours dans Doris et plaçons les données historiques avant cela dans TDW pour un coût de stockage inférieur.


  • Nous varions le nombre de copies pour différentes partitions de données. Par exemple, nous définissons trois copies pour les données des trois derniers mois, qui sont fréquemment utilisées, une copie pour les données de plus de six mois et deux copies pour les données intermédiaires.


Doris prend en charge la transformation des données chaudes en données froides. Nous ne stockons donc que les données des sept derniers jours sur SSD et transférons les données plus anciennes sur le disque dur pour un stockage moins coûteux.

Conclusion

Merci d'avoir fait défiler jusqu'ici et d'avoir terminé cette longue lecture. Nous avons partagé nos acclamations et nos larmes, les leçons apprises et quelques pratiques qui pourraient vous être utiles lors de notre transition de ClickHouse à Doris. Nous apprécions vraiment l'aide de la communauté Apache Doris et de l'équipe SelectDB, mais nous pourrions encore les poursuivre pendant un certain temps puisque nous essayons de réaliser l'auto-identification des données froides et chaudes, le pré-calcul des balises/métriques fréquemment utilisées, simplification de la logique du code à l'aide des vues matérialisées, etc.