HBase
 sql >> Base de données >  >> NoSQL >> HBase

Mode d'emploi :utiliser le chargement en bloc HBase et pourquoi

Apache HBase consiste à vous donner un accès aléatoire, en temps réel, en lecture/écriture à votre Big Data, mais comment intégrez-vous efficacement ces données dans HBase en premier lieu ? Intuitivement, un nouvel utilisateur essaiera de le faire via les API clientes ou en utilisant un travail MapReduce avec TableOutputFormat, mais ces approches sont problématiques, comme vous l'apprendrez ci-dessous. Au lieu de cela, la fonctionnalité de chargement en masse HBase est beaucoup plus facile à utiliser et peut insérer la même quantité de données plus rapidement.

Cet article de blog présentera les concepts de base de la fonctionnalité de chargement en masse, présentera deux cas d'utilisation et proposera deux exemples.

Présentation du chargement groupé

Si vous présentez l'un de ces symptômes, le chargement groupé est probablement le bon choix pour vous :

  • Vous deviez modifier vos MemStores pour utiliser la majeure partie de la mémoire.
  • Vous deviez soit utiliser des WAL plus grands, soit les contourner complètement.
  • Vos files d'attente de compactage et de vidage se comptent par centaines.
  • Votre GC est hors de contrôle car vos insertions se situent dans les Mo.
  • Votre latence sort de votre SLA lorsque vous importez des données.

La plupart de ces symptômes sont communément appelés «douleurs de croissance». L'utilisation du chargement groupé peut vous aider à les éviter.

En langage HBase, le chargement en bloc est le processus de préparation et de chargement des HFiles (le propre format de fichier de HBase) directement dans les RegionServers, contournant ainsi le chemin d'écriture et évitant complètement ces problèmes. Ce processus est similaire à ETL et ressemble à ceci :

1. Extraire les données d'une source, généralement des fichiers texte ou une autre base de données. HBase ne gère pas cette partie du processus. En d'autres termes, vous ne pouvez pas dire à HBase de préparer les HFiles en les lisant directement à partir de MySQL - vous devez plutôt le faire par vos propres moyens. Par exemple, vous pouvez exécuter mysqldump sur une table et télécharger les fichiers résultants sur HDFS ou simplement récupérer vos fichiers journaux Apache HTTP. Dans tous les cas, vos données doivent être dans HDFS avant l'étape suivante.

2. Transformez les données en HFiles. Cette étape nécessite une tâche MapReduce et pour la plupart des types d'entrée, vous devrez écrire vous-même le mappeur. Le travail devra émettre la clé de ligne en tant que clé, et soit une KeyValue, un Put ou un Delete en tant que valeur. Le réducteur est géré par HBase ; vous le configurez en utilisant HFileOutputFormat.configureIncrementalLoad() et il fait ce qui suit :

  • Inspecte la table pour configurer un partitionneur de commandes totales
  • Télécharge le fichier de partitions sur le cluster et l'ajoute au DistributedCache
  • Définit le nombre de tâches de réduction pour correspondre au nombre actuel de régions
  • Définit la classe clé/valeur de sortie pour qu'elle corresponde aux exigences de HFileOutputFormat
  • Configure le réducteur pour effectuer le tri approprié (soit KeyValueSortReducer ou PutSortReducer)

A ce stade, un HFile sera créé par région dans le dossier de sortie. Gardez à l'esprit que les données d'entrée sont presque entièrement réécrites, vous aurez donc besoin d'au moins deux fois plus d'espace disque disponible que la taille de l'ensemble de données d'origine. Par exemple, pour un mysqldump de 100 Go, vous devez disposer d'au moins 200 Go d'espace disque disponible dans HDFS. Vous pouvez supprimer le fichier de vidage à la fin du processus.

3. Chargez les fichiers dans HBase en indiquant aux RegionServers où les trouver. C'est l'étape la plus facile. Il nécessite l'utilisation de LoadIncrementalHFiles (plus communément appelé l'outil completebulkload), et en lui transmettant une URL qui localise les fichiers dans HDFS, il chargera chaque fichier dans la région concernée via le RegionServer qui le dessert. Dans le cas où une région a été divisée après la création des fichiers, l'outil divisera automatiquement le HFile en fonction des nouvelles limites. Ce processus n'est pas très efficace, donc si votre table est en cours d'écriture par d'autres processus, il est préférable de charger les fichiers dès que l'étape de transformation est terminée.

Voici une illustration de ce processus. Le flux de données va de la source d'origine à HDFS, où les RegionServers déplaceront simplement les fichiers vers les répertoires de leurs régions.

Cas d'utilisation

Chargement de l'ensemble de données d'origine : Tous les utilisateurs migrant depuis un autre magasin de données doivent envisager ce cas d'utilisation. Tout d'abord, vous devez passer par l'exercice de conception du schéma de table, puis créer la table elle-même, pré-divisée. Les points de partage doivent prendre en considération la distribution des clés de ligne et le nombre de RegionServers. Je recommande de lire la présentation de mon collègue Lars George sur la conception de schémas avancés pour tout cas d'utilisation sérieux.

L'avantage ici est qu'il est beaucoup plus rapide d'écrire les fichiers directement que de passer par le chemin d'écriture du RegionServer (écriture à la fois sur le MemStore et le WAL), puis éventuellement de vider, de compacter, etc. Cela signifie également que vous n'avez pas besoin de régler votre cluster pour une charge de travail intensive en écriture, puis de le régler à nouveau pour votre charge de travail normale.

Chargement incrémentiel : Disons que vous avez un ensemble de données actuellement servi par HBase, mais maintenant vous devez importer plus de données par lots à partir d'un tiers ou vous avez un travail nocturne qui génère quelques gigaoctets que vous devez insérer. Ce n'est probablement pas aussi volumineux que l'ensemble de données que HBase sert déjà, mais cela pourrait affecter le 95e centile de votre latence. Passer par le chemin d'écriture normal aura pour effet négatif de déclencher plus de vidages et de compactages lors de l'importation que la normale. Cette contrainte d'E/S supplémentaire entrera en concurrence avec vos requêtes sensibles à la latence.

Exemples

Vous pouvez utiliser les exemples suivants dans votre propre cluster Hadoop, mais les instructions sont fournies pour la machine virtuelle Cloudera QuickStart, qui est un cluster à nœud unique, un système d'exploitation invité, ainsi que des exemples de données et d'exemples intégrés dans une machine virtuelle pour votre poste de travail.

Une fois que vous avez démarré la VM, dites-lui, via l'interface Web qui s'ouvrira automatiquement, de déployer CDH puis assurez-vous que le service HBase est également démarré.

Chargeur de masse TSV intégré

HBase est livré avec une tâche MR qui peut lire un fichier de valeurs séparées par des délimiteurs et sortir directement dans une table HBase ou créer des HFiles pour le chargement en masse. Ici, nous allons :

  1. Obtenez les exemples de données et importez-les sur HDFS.
  2. Exécutez la tâche ImportTsv pour transformer le fichier en plusieurs HFiles selon une table préconfigurée.
  3. Préparez et chargez les fichiers dans HBase.

La première étape consiste à ouvrir une console et à utiliser la commande suivante pour obtenir des exemples de données :

curl -O
https://people.apache.org/~jdcryans/word_count.csv

J'ai créé ce fichier en exécutant un comptage de mots sur le manuscrit original de cet article de blog, puis en affichant le résultat au format csv, sans aucun titre de colonne. Maintenant, chargez le fichier sur HDFS :

hdfs dfs -put word_count.csv

La partie extraction du chargement en bloc étant maintenant terminée, vous devez transformer le fichier. Vous devez d'abord concevoir la table. Pour garder les choses simples, appelez-le "wordcount" - les clés de ligne seront les mots eux-mêmes et la seule colonne contiendra le nombre, dans une famille que nous appellerons "f". La meilleure pratique lors de la création d'un tableau consiste à le diviser en fonction de la distribution des clés de ligne, mais pour cet exemple, nous allons simplement créer cinq régions avec des points de partage répartis uniformément sur l'espace clé. Ouvrez le shell hbase :

hbase shell

Et exécutez la commande suivante pour créer la table :

create 'wordcount', {NAME => 'f'},   {SPLITS => ['g', 'm', 'r', 'w']}

Les quatre points de partage généreront cinq régions, la première région commençant par une clé de ligne vide. Pour obtenir de meilleurs points partagés, vous pouvez également effectuer une analyse rapide pour voir comment les mots sont réellement distribués, mais je vous laisse le soin de le faire.

Si vous pointez le navigateur de votre machine virtuelle vers http://localhost:60010/, vous verrez notre table nouvellement créée et ses cinq régions toutes affectées au RegionServer.

Il est maintenant temps de faire le gros du travail. L'appel du jar HBase sur la ligne de commande avec le script "hadoop" affichera une liste des outils disponibles. Celui que nous voulons s'appelle importtsv et a l'usage suivant :

hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv
 ERROR: Wrong number of arguments: 0
 Usage: importtsv -Dimporttsv.columns=a,b,c  

La ligne de commande que nous allons utiliser est la suivante :

hadoop jar   /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-
security.jar importtsv
-Dimporttsv.separator=,
-Dimporttsv.bulk.output=output
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv

Voici un aperçu des différents éléments de configuration :

  • -Dimporttsv.separator=, spécifie que le séparateur est une virgule.
  • -Dimporttsv.bulk.output=sortie est un chemin relatif vers l'endroit où les HFiles seront écrits. Puisque votre utilisateur sur la machine virtuelle est "cloudera" par défaut, cela signifie que les fichiers seront dans /user/cloudera/output. Si vous ignorez cette option, la tâche écrira directement dans HBase.
  • -Dimporttsv.columns=HBASE_ROW_KEY,f:count est une liste de toutes les colonnes contenues dans ce fichier. La clé de ligne doit être identifiée à l'aide de la chaîne HBASE_ROW_KEY entièrement en majuscules ; sinon, il ne démarrera pas le travail. (J'ai décidé d'utiliser le qualificatif "count", mais cela pourrait être n'importe quoi d'autre.)

Le travail devrait se terminer en une minute, compte tenu de la petite taille de l'entrée. Notez que cinq réducteurs sont en cours d'exécution, un par région. Voici le résultat sur HDFS :

-rw-r--r--   3 cloudera cloudera         4265   2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b
-rw-r--r--   3 cloudera cloudera         3163   2013-09-12 13:14 output/f/786198ca47ae406f9be05c9eb09beb36
-rw-r--r--   3 cloudera cloudera         2487   2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2
-rw-r--r--   3 cloudera cloudera         2961   2013-09-12 13:13 output/f/bb341f04c6d845e8bb95830e9946a914
-rw-r--r--   3 cloudera cloudera         1336   2013-09-12 13:14 output/f/c656d893bd704260a613be62bddb4d5f

Comme vous pouvez le voir, les fichiers appartiennent actuellement à l'utilisateur "cloudera". Pour les charger, nous devons changer le propriétaire en "hbase" ou HBase n'aura pas l'autorisation de déplacer les fichiers. Exécutez la commande suivante :

sudo -u hdfs hdfs dfs -chown -R   hbase:hbase/user/cloudera/output

Pour la dernière étape, nous devons utiliser l'outil completebulkload pour indiquer où se trouvent les fichiers et dans quelles tables nous chargeons :

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount

En revenant dans le shell HBase, vous pouvez exécuter la commande count qui vous montrera combien de lignes ont été chargées. Si vous avez oublié de chown, la commande se bloquera.

Tâche MR personnalisée

Le chargeur en bloc TSV est bon pour le prototypage, mais comme il interprète tout comme des chaînes et ne prend pas en charge la manipulation des champs au moment de la transformation, vous finirez par devoir écrire votre propre travail MR. Mon collègue James Kinley, qui travaille comme architecte de solutions en Europe, a écrit un tel travail que nous allons utiliser pour notre prochain exemple. Les données de l'emploi contiennent des messages publics Facebook et Twitter liés à la finale NBA 2010 (match 1) entre les Lakers et les Celtics. Vous pouvez trouver le code ici. (La machine virtuelle Quick Start est livrée avec git et maven installés afin que vous puissiez cloner le référentiel dessus.)

En regardant la classe Driver, les bits les plus importants sont les suivants :

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
…
	// Auto configure partitioner and reducer
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

Tout d'abord, votre mappeur doit générer un ImmutableBytesWritable qui contient la clé de ligne, et la valeur de sortie peut être une KeyValue, un Put ou un Delete. Le deuxième extrait montre comment configurer le réducteur ; il est en fait entièrement géré par HFileOutputFormat. configureIncrementalLoad() comme décrit dans la section "Transformer" précédemment.

La classe HBaseKVMapper contient uniquement le Mapper qui respecte la clé et les valeurs de sortie configurées :

public class HBaseKVMapper extends
   Mapper<LongWritable,   Text, ImmutableBytesWritable,
KeyValue> {

Pour l'exécuter, vous devrez compiler le projet à l'aide de maven et récupérer les fichiers de données en suivant les liens du README. (Il contient également le script shell pour créer la table.) Avant de commencer le travail, n'oubliez pas de télécharger les fichiers sur HDFS et de définir votre chemin de classe pour qu'il soit conscient de HBase car vous n'allez pas utiliser son jar cette fois :

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*

Vous pourrez démarrer la tâche à l'aide d'une ligne de commande similaire à celle-ci :

hadoop jar hbase-examples-0.0.1-SNAPSHOT.jar
com.cloudera.examples.hbase.bulkimport.Driver -libjars
/home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar,
/home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar
RowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010

Comme vous pouvez le voir, les dépendances du travail doivent être ajoutées séparément. Enfin, vous pouvez charger les fichiers en changeant d'abord leur propriétaire, puis en exécutant l'outil de chargement complet :

sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010

Problèmes potentiels

Les données récemment supprimées réapparaissent. Ce problème se produit lorsqu'une suppression est insérée via un chargement en bloc et qu'elle est fortement compactée alors que la mise correspondante est toujours dans un MemStore. Les données seront considérées comme supprimées lorsque le Delete est dans un HFile mais, une fois supprimé lors du compactage, le Put redeviendra visible. Si vous avez un tel cas d'utilisation, envisagez de configurer vos familles de colonnes pour conserver les cellules supprimées avec KEEP_DELETED_CELLS dans le shell ou HColumnDescriptor.setKeepDeletedCells().

Les données chargées en bloc ne peuvent pas être écrasées par un autre chargement en bloc. Ce problème se produit lorsque deux HFiles chargés en masse chargés à des moments différents tentent d'écrire une valeur différente dans la même cellule, ce qui signifie qu'ils ont la même clé de ligne, la même famille, le même qualificatif et l'horodatage. Le résultat est que la première valeur insérée sera renvoyée à la place de la seconde. Ce bogue sera corrigé dans HBase 0.96.0 et CDH 5 (la prochaine version majeure de CDH) et des travaux sont en cours dans HBASE-8521 pour la branche 0.94 et CDH 4.

Le chargement en vrac déclenche des compactages majeurs. Ce problème survient lorsque vous effectuez des chargements en bloc incrémentiels et qu'il y a suffisamment de fichiers chargés en bloc pour déclencher un compactage mineur (le seuil par défaut étant de 3). Les HFiles sont chargés avec un numéro de séquence défini sur 0 afin qu'ils soient récupérés en premier lorsque le RegionServer sélectionne des fichiers pour un compactage, et en raison d'un bogue, il sélectionnera également tous les fichiers restants. Ce problème affectera sérieusement ceux qui ont déjà de grandes régions (plusieurs Go) ou qui chargent souvent en masse (toutes les quelques heures et moins) car beaucoup de données seront compactées. HBase 0.96.0 a le correctif approprié, tout comme CDH 5 ; HBASE-8521 corrige le problème dans la version 0.94, car les HFiles chargés en masse se voient désormais attribuer un numéro de séquence approprié. HBASE-8283 peut être activé avec hbase.hstore.useExploringCompation après 0.94.9 et CDH 4.4.0 pour atténuer ce problème en étant simplement un algorithme de sélection de compactage plus intelligent.

Les données chargées en masse ne sont pas répliquées . Comme le chargement en bloc contourne le chemin d'écriture, le WAL n'est pas écrit dans le cadre du processus. La réplication fonctionne en lisant les fichiers WAL afin de ne pas voir les données chargées en bloc - et il en va de même pour les modifications qui utilisent Put.setWriteToWAL(true). Une façon de gérer cela est d'expédier les fichiers bruts ou les HFiles à l'autre cluster et d'y effectuer l'autre traitement.

Conclusion

L'objectif de cet article de blog était de vous présenter les concepts de base du chargement en masse Apache HBase. Nous avons expliqué en quoi le processus ressemble à un ETL et qu'il est bien meilleur pour les grands ensembles de données que d'utiliser l'API normale car il contourne le chemin d'écriture. Les deux exemples ont été inclus pour montrer comment des fichiers TSV simples peuvent être chargés en bloc dans HBase et comment écrire votre propre mappeur pour d'autres formats de données.

Vous pouvez maintenant essayer de faire la même chose en utilisant une interface utilisateur graphique via Hue.