HBase
 sql >> Base de données >  >> NoSQL >> HBase

Apache HBase + Apache Hadoop + Xceivers

Présentation

Certaines des propriétés de configuration trouvées dans Apache Hadoop ont un effet direct sur les clients, comme Apache HBase. L'une de ces propriétés s'appelle "dfs.datanode.max.xcievers" et appartient au sous-projet HDFS. Il définit le nombre de threads côté serveur et, dans une certaine mesure, de sockets utilisés pour les connexions de données. Définir ce nombre trop bas peut entraîner des problèmes à mesure que vous développez ou augmentez l'utilisation de votre cluster. Cet article vous aidera à comprendre ce qui se passe entre le client et le serveur, et comment déterminer un nombre raisonnable pour cette propriété.

Le problème

Étant donné que HBase stocke tout ce dont il a besoin dans HDFS, la limite supérieure stricte imposée par la propriété de configuration "dfs.datanode.max.xcievers" peut entraîner trop peu de ressources disponibles pour HBase, se manifestant par des IOExceptions de part et d'autre de la connexion. Voici un exemple de la liste de diffusion HBase [1], où les messages suivants ont été initialement enregistrés côté RegionServer :

2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient : Exception dans createBlockOutputStream java.io.IOException :Impossible de lire à partir du flux
2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient : Abandoning block blk_-5467014108758633036_595771
2008-11- 11 19:55:58,455 WARN org.apache.hadoop.dfs.DFSClient : Exception DataStreamer :java.io.IOException :Impossible de créer un nouveau bloc.
2008-11-11 19:55:58,455 WARN org.apache .hadoop.dfs.DFSClient :Récupération d'erreur pour le bloc blk_-5467014108758633036_595771 bad datanode[0]
2008-11-11 19:55:58,482 FATAL org.apache.hadoop.hbase.regionserver.Flusher :Relecture de hlog requise . Forcer l'arrêt du serveur

La corrélation avec les journaux Hadoop DataNode a révélé l'entrée suivante :

ERREUR org.apache.hadoop.dfs.DataNode : DatanodeRegistration(10.10.10.53 :50010,storageID=DS-1570581820-10.10.10.53-50010-1224117842339,infoPort=50075, ipcPort=50020):DataXceiver :java.io.IOException : xceiverCount 258 dépasse la limite de xcievers simultanés 256

Dans cet exemple, la faible valeur de "dfs.datanode.max.xcievers" pour les DataNodes a provoqué l'arrêt de l'ensemble du RegionServer. C'est une très mauvaise situation. Malheureusement, il n'y a pas de règle absolue qui explique comment calculer la limite requise. Il est généralement conseillé d'augmenter le nombre de la valeur par défaut de 256 à quelque chose comme 4096 (voir [1], [2], [3], [4] et [5] pour référence). Cela se fait en ajoutant cette propriété au fichier hdfs-site.xml de tous les DataNodes (notez qu'elle est mal orthographiée) :

    dfs.datanode.max.xcievers
4096

Remarque :Vous devrez redémarrer vos DataNodes après avoir apporté cette modification au fichier de configuration.

Cela devrait aider à résoudre le problème ci-dessus, mais vous voudrez peut-être en savoir plus sur la façon dont tout cela joue ensemble et sur ce que HBase fait avec ces ressources. Nous en discuterons dans la suite de cet article. Mais avant de le faire, nous devons comprendre pourquoi vous ne pouvez pas simplement définir ce nombre très haut, disons 64 000 et en finir avec cela.

Il y a une raison pour une limite supérieure, et elle est double :premièrement, les threads ont besoin de leur propre pile, ce qui signifie qu'ils occupent de la mémoire. Pour les serveurs actuels, cela signifie 1 Mo par thread[6] par défaut. En d'autres termes, si vous utilisez tous les 4096 threads DataXceiver, vous avez besoin d'environ 4 Go de tas pour les accueillir. Cela réduit l'espace que vous avez attribué aux mémoires et aux caches de blocs, ainsi qu'à toutes les autres parties mobiles de la JVM. Dans le pire des cas, vous pouvez rencontrer une exception OutOfMemoryException et le processus RegionServer est grillé. Vous souhaitez définir cette propriété sur un nombre raisonnablement élevé, mais pas trop élevé non plus.

Deuxièmement, en ayant ces nombreux threads actifs, vous verrez également votre CPU devenir de plus en plus chargé. Il y aura de nombreux changements de contexte pour gérer tout le travail simultané, ce qui enlève des ressources pour le travail réel. Comme pour les préoccupations concernant la mémoire, vous voulez que le nombre de threads n'augmente pas sans limite, mais fournisse une limite supérieure raisonnable - et c'est à cela que sert "dfs.datanode.max.xcievers".

Détails du système de fichiers Hadoop

Du côté client, la bibliothèque HDFS fournit l'abstraction appelée Path. Cette classe représente un fichier dans un système de fichiers pris en charge par Hadoop, représenté par la classe FileSystem. Il existe quelques implémentations concrètes de la classe abstraite FileSystem, dont l'une est DistributedFileSytem, ​​représentant HDFS. Cette classe encapsule à son tour la classe DFSClient réelle qui gère toutes les interactions avec les serveurs distants, c'est-à-dire le NameNode et les nombreux DataNodes.

Lorsqu'un client, tel que HBase, ouvre un fichier, il le fait, par exemple, en appelant les méthodes open() ou create() de la classe FileSystem, ici les incarnations les plus simplistes

  public DFSInputStream open(String src) throws IOException
public FSDataOutputStream create(Path f) throws IOException

L'instance de flux renvoyée est ce qui a besoin d'un socket et d'un thread côté serveur, qui sont utilisés pour lire et écrire des blocs de données. Ils font partie du contrat d'échange de données entre le client et le serveur. Notez qu'il existe d'autres protocoles basés sur RPC utilisés entre les différentes machines, mais pour les besoins de cette discussion, ils peuvent être ignorés.

L'instance de flux renvoyée est une classe DFSOutputStream ou DFSInputStream spécialisée, qui gère toutes les interactions avec le NameNode pour déterminer où résident les copies des blocs et la communication de données par bloc et par DataNode.

Côté serveur, le DataNode encapsule une instance de DataXceiverServer, qui est la classe réelle qui lit la clé de configuration ci-dessus et lève également l'exception ci-dessus lorsque la limite est dépassée.

Lorsque le DataNode démarre, il crée un groupe de threads et démarre l'instance DataXceiverServer mentionnée comme suit :

  this.threadGroup =new ThreadGroup(“dataXceiverServer”);
this.dataXceiverServer =new Daemon( threadGroup,
new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // destruction automatique lorsqu'il est vide

Notez que le thread DataXceiverServer occupe déjà une place dans le groupe de threads. Le DataNode possède également cette classe interne pour récupérer le nombre de threads actuellement actifs dans ce groupe :

  /** Nombre de récepteurs simultanés par nœud. */
int getXceiverCount() {
return threadGroup ==null ? 0 :threadGroup.activeCount();
}

La lecture et l'écriture de blocs, initiées par le client, entraînent l'établissement d'une connexion, qui est encapsulée par le thread DataXceiverServer dans une instance DataXceiver. Au cours de ce transfert, un fil est créé et enregistré dans le groupe de fils ci-dessus. Ainsi, pour chaque opération de lecture et d'écriture active, un nouveau thread est suivi côté serveur. Si le nombre de threads dans le groupe dépasse le maximum configuré, ladite exception est levée et enregistrée dans les journaux du DataNode :

  if (curXceiverCount> dataXceiverServer.maxXceiverCount) {
throw new IOException(“xceiverCount ” + curXceiverCount
+ ” dépasse la limite des xcievers simultanés ”
+ dataXceiverServer.maxXceiverCount);
}

Implications pour les clients

Maintenant, la question est de savoir comment la lecture et l'écriture du client sont liées aux threads côté serveur. Avant d'entrer dans les détails, utilisons les informations de débogage que la classe DataXceiver enregistre lorsqu'elle est créée et fermée

  LOG.debug("Le nombre de connexions actives est :” + datanode.getXceiverCount());

LOG.debug(datanode.dnRegistration + ":Le nombre de connexions actives est :      + datanode.getXceiverCount());

et surveiller lors d'un démarrage de HBase ce qui est enregistré sur le DataNode. Pour des raisons de simplicité, cela se fait sur une configuration pseudo-distribuée avec une seule instance DataNode et RegionServer. Ce qui suit montre le haut de la page d'état de RegionServer.

La partie importante se trouve dans la section "Metrics", où il est écrit "storefiles=22". Donc, en supposant que HBase a au moins autant de fichiers à gérer, plus quelques fichiers supplémentaires pour le journal à écriture anticipée, nous devrions voir le message de journal ci-dessus indiquer que nous avons au moins 22 "connexions actives". Démarrons HBase et vérifions les fichiers journaux DataNode et RegionServer :

Ligne de commande :

$ bin/start-hbase.sh

Journal DataNode :

2012-03-05 13:01:35,309 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode :Le nombre de connexions actives est :1
2012-03-05 13:01:35,315 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS- 1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :le nombre de connexions actives est :2 globalMemStoreLimitLowMark=347.1m, maxHeap=991.7m
12/03/05 13:01:39 INFO http.HttpServer :le port renvoyé par webServer.getConnectors()[0].getLocalPort() avant open() est -1 . Ouverture du listener sur 60030
2012-03-05 13:01:40,003 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :1
12/03/05 13:01:40 INFO regionserver.HRegionServer :Demande reçue pour ouvrir la région :-ROOT-,,0.70236052
2012-03-05 13:01:40,882 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :3 -10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :le nombre de connexions actives est :4
2012-03-05 13:01:40,888 DEBUG org.apache.hadoop.hdfs.server. datanode.DataNode :Le nombre de connexions actives est :3

12/03/05 13:01:40 INFO regionserver.HRegion :Onlined -ROOT-,,0.70236052 ; next sequenceid=63083
2012-03-05 13:01:40,982 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :3
2012-03-05 13 :01:40,983 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :Le nombre de connexions actives est :4

12/03/05 13:01:41 INFO regionserver.HRegionServer :Demande reçue pour ouvrir la région :.META.,,1.1028785192
2012-03 -05 13:01:41,026 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :3
2012-03-05 13:01:41,027 DEBUG org.apache.hadoop. hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) : 4

12/03/05 13:01:41 INFO regionserver.HRegion :Onlined .META.,,1.1028785192 ; next sequenceid=63082
2012-03-05 13:01:41,109 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :3
2012-03-05 13 :01:41,114 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode : le nombre de connexions actives est :4
2012-03-05 13:01:41,117 DEBUG org.apache.hadoop.hdfs.server .datanode.DataNode :Le nombre de connexions actives est :5
12/03/05 13:01:41 INFO regionserver.HRegionServer :Demande reçue pour ouvrir 16 région(s)
12/03/05 13 :01:41 INFO regionserver.HRegionServer :requête reçue pour ouvrir la région :usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.
12/03/05 13:01:41 INFO regionserver.HRegionServer :requête reçue pour ouvrir la région :usertable,user1120311784, 1330944810191.90d287473fe223f0ddc137020efda25d.

2012-03-05 13:01:41,246 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode :Le nombre de connexions actives est :6
2012-03-05 13:01:41,248 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :7

2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772 , infoPort=50075, ipcPort=50020):Le nombre de connexions actives est :10 0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :Le nombre de connexions actives est :9

12/03/05 13 :01:41 INFO regionserver.HRegion :table utilisateur en ligne, utilisateur1120311784,1330944810191.90d287473fe223f0ddc137020efda25d. ; next sequenceid=62917
12/03/05 13:01:41 INFO regionserver.HRegion :Onlined usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.; next sequenceid=62916

12/03/05 13:01:41 INFO regionserver.HRegion :table utilisateur en ligne, utilisateur1361265841,1330944811370.80663fcf291e3ce00080599964f406ba.; next sequenceid=62919
2012-03-05 13:01:41,474 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :6
2012-03-05 13 :01:41,491 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode : le nombre de connexions actives est :7
2012-03-05 13:01:41,495 DEBUG org.apache.hadoop.hdfs.server .datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) : 8
2012-03 -05 13:01:41,508 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :7

12/03/05 13:01:41 INFO regionserver .HRegion :table utilisateur en ligne, utilisateur1964968041,1330944848231.dd89596e9129e1caa7e07f8a491c9734.; next sequenceid=62920
2012-03-05 13:01:41,618 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :6
2012-03-05 13 :01:41,621 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :Le nombre de connexions actives est : 7 =DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Nombre de connexions actives :7
12/03/05 13:01:41 INFO regionserver.HRegion :table utilisateur en ligne ,utilisateur515290649,1330944849739.d23924dc9e9d5891f332c337977af83d.; next sequenceid=62926
2012-03-05 13:01:41,832 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :6
2012-03-05 13 :01:41,838 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :Le nombre de connexions actives est :7 next sequenceid=62929

2012-03-05 14:01:39,711 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :4
2012 -03-05 22:48:41,945 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Le nombre de connexions actives est :4 prochain sequenceid=62929
2012-03-05 22:48:41,963 DEBUG -50010-1321352233772, infoPort=50075, ipcPort=50020) :le nombre de connexions actives est :4

Vous pouvez voir comment les régions sont ouvertes les unes après les autres, mais ce que vous remarquerez peut-être également, c'est que le nombre de connexions actives ne grimpe jamais à 22 – il atteint à peine 10. Pourquoi donc? Pour mieux comprendre cela, nous devons voir comment les fichiers dans HDFS sont mappés à l'instance de DataXceiver côté serveur - et les threads réels qu'ils représentent.

Hadoop Plongée en profondeur

Les DFSInputStream et DFSOutputStream susmentionnés sont vraiment des façades autour des concepts de flux habituels. Ils enveloppent la communication client-serveur dans ces interfaces Java standard, tout en acheminant en interne le trafic vers un DataNode sélectionné - qui est celui qui contient une copie du bloc actuel. Il a la liberté d'ouvrir et de fermer ces connexions au besoin. Lorsqu'un client lit un fichier dans HDFS, les classes de la bibliothèque cliente basculent de manière transparente d'un bloc à l'autre, et donc de DataNode à DataNode, il doit donc ouvrir et fermer les connexions selon les besoins.

Le DFSInputStream a une instance d'une classe DFSClient.BlockReader , qui ouvre la connexion au DataNode. L'instance de flux appelle blockSeekTo() pour chaque appel à read() qui s'occupe d'ouvrir la connexion, s'il n'y en a pas déjà. Une fois qu'un bloc est complètement lu, la connexion est fermée. La fermeture du flux a bien sûr le même effet.

Le DFSOutputStream a une classe d'assistance similaire, le DataStreamer. Il suit la connexion au serveur, qui est initiée par la méthode nextBlockOutputStream(). Il a d'autres classes internes qui aident à écrire les données de bloc, que nous omettons ici par souci de brièveté.

L'écriture et la lecture de blocs nécessitent un thread pour contenir le socket et les données intermédiaires côté serveur, enveloppées dans l'instance DataXceiver. En fonction de ce que fait votre client, vous verrez le nombre de connexions fluctuer autour du nombre de fichiers actuellement consultés dans HDFS.

Revenons à l'énigme HBase ci-dessus :la raison pour laquelle vous ne voyez pas jusqu'à 22 connexions (et plus) au démarrage est que pendant que les régions s'ouvrent, les seules données requises sont le bloc d'informations de HFile. Ce bloc est lu pour obtenir des détails vitaux sur chaque fichier, puis refermé. Cela signifie que la ressource côté serveur est libérée en succession rapide. Les quatre connexions restantes sont plus difficiles à déterminer. Vous pouvez utiliser JStack pour vider tous les threads sur le DataNode, qui dans cet exemple affiche cette entrée :

"DataXceiver pour le client /127.0.0.1:64281 [bloc d'envoi blk_5532741233443227208_4201]" démon prio=5 tid=7fb96481d000 nid=0x1178b4000 exécutable [1178b3000]
java.lang.Thread.State :RUNNABLE

"DataXceiver pour le client /127.0.0.1:64172 [bloc de réception blk_-2005512129579433420_4199 client=DFSClient_hb_rs_10.0.0.29 ,60020,1330984111693_1330984118810]" démon prio=5 tid=7fb966109000 nid=0x1169cb000 exécutable [1169ca000]
java.lang.Thread.State :RUNNABLE

Ce sont les seules entrées DataXceiver (dans cet exemple), donc le nombre dans le groupe de threads est un peu trompeur. Rappelez-vous que le thread du démon DataXceiverServer représente déjà une entrée supplémentaire, qui, combinée aux deux ci-dessus, représente les trois connexions actives - ce qui signifie en fait trois threads actifs. La raison pour laquelle le journal indique quatre à la place est qu'il enregistre le nombre d'un thread actif qui est sur le point de se terminer. Ainsi, peu de temps après que le décompte de quatre est enregistré, c'est en fait un de moins, c'est-à-dire trois et correspond donc à notre nombre de threads actifs.

Notez également que les classes d'assistance internes, telles que PacketResponder, occupent un autre thread dans le groupe tout en étant actives. La sortie de JStack indique ce fait, répertoriant le thread comme tel :

 "PacketResponder 0 for Block blk_-2005512129579433420_4199" démon prio=5 tid=7fb96384d000 nid=0x116ace000 dans Object.wait () [116acd000]
java.lang.Thread.State :TIMED_WAITING (sur le moniteur d'objet)
sur java.lang.Object.wait (méthode native)
sur org.apache.hadoop. hdfs.server.datanode.BlockReceiver$PacketResponder \
.lastDataNodeRun(BlockReceiver.java:779)
– verrouillé (un org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder)
sur org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:870)
sur java.lang.Thread.run(Thread.java:680)

Ce fil de discussion est actuellement à l'état TIMED_WAITING et n'est pas considéré comme actif. C'est pourquoi le décompte émis par les instructions de journal DataXceiver n'inclut pas ce type de threads. S'ils deviennent actifs en raison de l'envoi de données par le client, le nombre de threads actifs augmentera à nouveau. Une autre chose à noter est que ce thread n'a pas besoin d'une connexion séparée, ou socket, entre le client et le serveur. Le PacketResponder n'est qu'un thread côté serveur pour recevoir des données de bloc et les transmettre au DataNode suivant dans le pipeline d'écriture.

La commande Hadoop fsck a également une option pour signaler les fichiers actuellement ouverts en écriture :

$ hadoop fsck /hbase -openforwrite
FSCK démarré par larsgeorge à partir de /10.0.0.29 pour le chemin / hbase au lundi 5 mars 22:59:47 CET 2012
……/hbase/.logs/10.0.0.29,60020,1330984111693/10.0.0.29%3A60020.1330984118842 0 octets, 1 bloc(s), OPENFORWRITE :………………………………..Statut :HEALTHY
Taille totale :     2088783626 B
Nombre total de répertoires :     54
Nombre total de fichiers :   45

Cela ne concerne pas immédiatement un thread côté serveur occupé, car ceux-ci sont alloués par ID de bloc. Mais vous pouvez en déduire qu'il y a un bloc ouvert pour l'écriture. La commande Hadoop dispose d'options supplémentaires pour imprimer les fichiers réels et l'ID de bloc qui les composent :

$ hadoop fsck /hbase -files -blocks
FSCK démarré par larsgeorge à partir de /10.0.0.29 pour chemin /hbase au mar 06 mars 10:39:50 CET 2012


/hbase/.META./1028785192/.tmp


/hbase/.META./1028785192/info
/hbase/.META./1028785192/info/4027596949915293355 36517 octets, 1 bloc(s) :  OK
0. blk_5532741233443227208_4201 len=36517 repl=1


Statut :HEALTHY
Taille totale :     2088788703 B
Total dirs :     54
Total de fichiers :     45 (Fichiers en cours d'écriture :1)
Total de blocs (validés) :     64 (taille de bloc moyenne 32637323 B) (Total de blocs de fichiers ouverts (non validés) :1)
Blocs peu répliqués :     64 (100,0 %)

Cela vous donne deux choses. Tout d'abord, le résumé indique qu'il existe un bloc de fichiers ouvert au moment de l'exécution de la commande - correspondant au nombre indiqué par l'option "-openforwrite" ci-dessus. Deuxièmement, la liste des blocs à côté de chaque fichier vous permet de faire correspondre le nom du thread au fichier contenant le bloc auquel vous accédez. Dans cet exemple, le bloc avec l'ID "blk_5532741233443227208_4201" est envoyé du serveur au client, ici un RegionServer. Ce bloc appartient au HBase .META. table, comme indiqué par la sortie de la commande Hadoop fsck. La combinaison de JStack et fsck peut remplacer lsof (un outil sur la ligne de commande Linux pour "lister les fichiers ouverts").

Le JStack signale également qu'il existe un thread DataXceiver, accompagné d'un PacketResponder, pour l'ID de bloc "blk_-2005512129579433420_4199", mais cet ID est absent de la liste des blocs signalés par fsck. En effet, le bloc n'est pas encore terminé et n'est donc pas disponible pour les lecteurs. En d'autres termes, Hadoop fsck ne signale que les blocs complets (ou synchronisés[7][8], pour la version Hadoop qui prend en charge cette fonctionnalité).

Retour à HBase

L'ouverture de toutes les régions ne nécessite pas autant de ressources sur le serveur que prévu. Si vous parcourez l'intégralité de la table HBase, vous forcez HBase à lire tous les blocs de tous les HFiles :

Shell HBase :

hbase(main):003:0> scan 'usertable'

1000000 ligne(s) en 1460,3120 secondes

Journal DataNode :

2012-03-05 14:42:20,580 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode : le nombre de connexions actives est de : 6 -03-05 14:43:23,299 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Le nombre de connexions actives est :8

2012-03-05 14:49:24,332 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0. 0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :le nombre de connexions actives est :11
2012-03-05 14:49:24,332 DEBUG org .apache.hadoop.hdfs.server.datanode.DataNode :le nombre de connexions actives est :10
2012-03-05 14:49:59,987 DEBUG org.apache.hadoop.hdfs.server.datanod e.DataNode :Le nombre de connexions actives est :11
2012-03-05 14:51:12,603 ​​DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :le nombre de connexions actives est :12
2012-03-05 14:51:12,605 DEBUG org.apache.hadoop.hdfs .server.datanode.DataNode :Le nombre de connexions actives est :11
2012-03-05 14:51:46,473 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :12

2012-03-05 14:56:59,420 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :15
2012-03-05 14:57:31,722 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :le nombre de connexions actives est :16
2012-03-05 14:58:24,909 DEBUG org.apache.hadoop.hdfs. server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Numéro d'acte ive connexions est :17
2012-03-05 14:58:24,910 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode : le nombre de connexions actives est :16

2012-03-05 15:04:17,688 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :21
2012-03-05 15:04:17,689 DEBUG org.apache .hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) : 22
2012-03-05 15:04:54,545 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :21
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020) :le nombre de connexions actives est :22
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Le nombre de connexions actives est :21

Le nombre de connexions actives atteint désormais l'insaisissable 22. Notez que ce nombre inclut déjà le thread du serveur, nous sommes donc encore un peu en deçà de ce que nous pourrions considérer comme le maximum théorique - basé sur le nombre de fichiers que HBase doit gérer.

Qu'est-ce que tout cela signifie ?

Alors, de combien de "xcievers (sic)" avez-vous besoin ? Étant donné que vous n'utilisez que HBase, vous pouvez simplement surveiller la métrique "storefiles" ci-dessus (que vous obtenez également via Ganglia ou JMX) et ajouter quelques pour cent pour les fichiers journaux intermédiaires et à écriture anticipée. Cela devrait fonctionner pour les systèmes en mouvement. Cependant, si vous deviez déterminer ce nombre sur un système inactif et entièrement compacté et supposer qu'il s'agit du maximum, vous pourriez trouver ce nombre trop faible une fois que vous commencez à ajouter plus de fichiers de magasin lors des vidages réguliers du magasin de mémoire, c'est-à-dire dès que vous commencez à ajouter des données aux tables HBase. Ou si vous utilisez également MapReduce sur ce même cluster, l'agrégation de journaux Flume, etc. Vous devrez tenir compte de ces fichiers supplémentaires et, plus important encore, des blocs ouverts pour la lecture et l'écriture.

Notez à nouveau que les exemples de cet article utilisent un seul DataNode, quelque chose que vous n'aurez pas sur un vrai cluster. À cette fin, vous devrez diviser le nombre total de fichiers de magasin (selon la métrique HBase) par le nombre de DataNodes dont vous disposez. Si vous avez, par exemple, un nombre de fichiers de magasin de 1000 et que votre cluster a 10 DataNodes, alors vous devriez être d'accord avec la valeur par défaut de 256 threads xceiver par DataNode.

Le pire des cas serait le nombre de tous les lecteurs et écrivains actifs, c'est-à-dire ceux qui envoient ou reçoivent actuellement des données. Mais comme cela est difficile à déterminer à l'avance, vous voudrez peut-être envisager de construire dans une réserve décente. De plus, étant donné que le processus d'écriture nécessite un thread supplémentaire - bien que de durée plus courte - (pour le PacketResponder), vous devez également en tenir compte. Donc une formule raisonnable, mais plutôt simpliste pourrait être :

Cette formule tient compte du fait que vous avez besoin d'environ deux threads pour un rédacteur actif et un autre pour un lecteur actif. Ceci est ensuite additionné et divisé par le nombre de DataNodes, puisque vous devez spécifier le "dfs.datanode.max.xcievers" par DataNode.

Si vous revenez à la capture d'écran HBase RegionServer ci-dessus, vous avez vu qu'il y avait 22 fichiers de magasin. These are immutable and will only be read, or in other words occupy one thread only. For all memstores that are flushed to disk you need two threads – but only until they are fully written. The files are finalized and closed for good, cleaning up any thread in the process. So these come and go based on your flush frequency. Same goes for compactions, they will read N files and write them into a single new one, then finalize the new file. As for the write-ahead logs, these will occupy a thread once you have started to add data to any table. There is a log file per server, meaning that you can only have twice as many active threads for these files as you have RegionServers.

For a pure HBase setup (HBase plus its own HDFS, with no other user), we can estimate the number of needed DataXceiver’s with the following formula:

Since you will be hard pressed to determine the active number of store files, flushes, and so on, it might be better to estimate the theoretical maximum instead. This maximum value takes into account that you can only have a single flush and compaction active per region at any time. The maximum number of logs you can have active matches the number of RegionServers, leading us to this formula:

Obviously, the number of store files will increase over time, and the number of regions typically as well. Same for the numbers of servers, so keep in mind to adjust this number over time. In practice, you can add a buffer of, for example, 20%, as shown in the formula below – in an attempt to not force you to change the value too often.

On the other hand, if you keep the number of regions fixed per server[9], and rather split them manually, while adding new servers as you grow, you should be able to keep this configuration property stable for each server.

Final Advice &TL;DR

Here is the final formula you want to use:

It computes the maximum number of threads needed, based on your current HBase vitals (no. of store files, regions, and region servers). It also adds a fudge factor of 20% to give you room for growth. Keep an eye on the numbers on a regular basis and adjust the value as needed. You might want to use Nagios with appropriate checks to warn you when any of the vitals goes over a certain percentage of change.

Note:Please make sure you also adjust the number of file handles your process is allowed to use accordingly[10]. This affects the number of sockets you can use, and if that number is too low (default is often 1024), you will get connection issues first.

Finally, the engineering devil on one of your shoulders should already have started to snicker about how horribly non-Erlang-y this is, and how you should use an event driven approach, possibly using Akka with Scala[11] – if you want to stay within the JVM world. Bear in mind though that the clever developers in the community share the same thoughts and have already started to discuss various approaches[12][13].

Links:

  • [1] http://old.nabble.com/Re%3A-xceiverCount-257-exceeds-the-limit-of-concurrent-xcievers-256-p20469958.html
  • [2] http://ccgtech.blogspot.com/2010/02/hadoop-hdfs-deceived-by-xciever.html
  • [3] https://issues.apache.org/jira/browse/HDFS-1861 “Rename dfs.datanode.max.xcievers and bump its default value”
  • [4] https://issues.apache.org/jira/browse/HDFS-1866 “Document dfs.datanode.max.transfer.threads in hdfs-default.xml”
  • [5] http://hbase.apache.org/book.html#dfs.datanode.max.xcievers
  • [6] http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#threads_oom
  • [7] https://issues.apache.org/jira/browse/HDFS-200 “In HDFS, sync() not yet guarantees data available to the new readers”
  • [8] https://issues.apache.org/jira/browse/HDFS-265 “Revisit append”
  • [9] http://search-hadoop.com/m/CBBoV3z24H1 “HBase, mail # user – region size/count per regionserver”
  • [10] http://hbase.apache.org/book.html#ulimit “ulimit and nproc”
  • [11] http://akka.io/ “Akka”
  • [12] https://issues.apache.org/jira/browse/HDFS-223 “Asynchronous IO Handling in Hadoop and HDFS”
  • [13] https://issues.apache.org/jira/browse/HDFS-918 “Use single Selector and small thread pool to replace many instances of BlockSender for reads”