HBase
 sql >> Base de données >  >> NoSQL >> HBase

Comment:analyser les tables Apache HBase salées avec des plages de clés spécifiques à la région dans MapReduce

Merci à Pengyu Wang, développeur de logiciels à la FINRA, pour l'autorisation de republier ce message.

Les tables Salted Apache HBase avec pré-split sont une solution HBase efficace et éprouvée pour fournir une répartition uniforme de la charge de travail sur les serveurs de région et éviter les points chauds lors des écritures en masse. Dans cette conception, une clé de ligne est créée avec une clé logique plus du sel au début. Une façon de générer du sel consiste à calculer n (nombre de régions) modulo sur le code de hachage de la clé de ligne logique (date, etc.).

Clés de la ligne de salage

Par exemple, une table acceptant le chargement de données quotidiennement peut utiliser des clés de ligne logiques commençant par une date, et nous souhaitons pré-diviser cette table en 1 000 régions. Dans ce cas, nous nous attendons à générer 1 000 sels différents. Le sel peut être généré, par exemple, comme :

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey 

logicalKey = 2015-04-26|abc
rowKey = 893|2015-04-26|abc

La sortie de hashCode() avec modulo fournit un caractère aléatoire pour la valeur du sel de « 000 » à « 999 ». Avec cette transformation de clé, la table est pré-divisée sur les limites de sel lors de sa création. Cela rendra les volumes de ligne uniformément répartis lors du chargement des HFiles avec MapReduce bulkload. Il garantit que les clés de ligne avec le même sel tombent dans la même région.

Dans de nombreux cas d'utilisation, comme l'archivage de données, vous devez numériser ou copier les données sur une plage de clés logiques particulière (plage de dates) à l'aide de la tâche MapReduce. Les travaux MapReduce de table standard sont configurés en fournissant le Scan instance avec des attributs de plage de clés.

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
TableInputFormat.class
);
…

Cependant, la configuration d'un tel travail devient difficile pour les tables pré-divisées salée. Les clés de ligne de démarrage et d'arrêt seront différentes pour chaque région car chacune a un sel unique. Et nous ne pouvons pas spécifier plusieurs plages à un Scan exemple.

Pour résoudre ce problème, nous devons examiner le fonctionnement de la table MapReduce. Généralement, le framework MapReduce crée une tâche de carte pour lire et traiter chaque fractionnement d'entrée. Chaque fractionnement est généré en InputFormat base de classe, par la méthode getSplits() .

Dans la tâche MapReduce de la table HBase, TableInputFormat est utilisé comme InputFormat . Dans l'implémentation, le getSplits() est remplacée pour récupérer les clés de ligne de démarrage et d'arrêt à partir du Scan exemple. Comme les clés de ligne de début et de fin s'étendent sur plusieurs régions, la plage est divisée par les limites de région et renvoie la liste de TableSplit objets qui couvre la gamme de clés de balayage. Au lieu d'être basé sur le bloc HDFS, TableSplit s sont basés sur la région. En écrasant le getSplits() méthode, nous sommes en mesure de contrôler le TableSplit .

Création d'un format d'entrée de table personnalisé

Pour changer le comportement de getSplits() method, une classe personnalisée étendant TableInputFormat est requis. Le but de getSplits() ici pour couvrir la plage de clés logiques dans chaque région, construisez leur plage de clés de ligne avec leur sel unique. La classe HTable fournit la méthode getStartEndKeys() qui renvoie les clés de ligne de début et de fin pour chaque région. À partir de chaque clé de démarrage, analysez le sel correspondant à la région.

Pair keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {

// The first 3 bytes is the salt, for the first region, start key is empty, so apply “000”
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
…
}

La configuration de la tâche passe la plage de clés logiques

TableInputFormat récupère la clé de démarrage et d'arrêt de Scan exemple. Comme nous ne pouvons pas utiliser Scan dans notre travail MapReduce, nous pourrions utiliser Configuration au lieu de passer ces deux variables et seule la clé de démarrage et d'arrêt logique est suffisante (une variable peut être une date ou d'autres informations commerciales). Le getSplits() la méthode a JobContext argument, L'instance de configuration peut être lue comme context.getConfiguration() .

Dans le pilote MapReduce :

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Dans Custom TableInputFormat :

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
String scanStart = conf.get("logical.scan.start");
String scanStop = conf.get("logical.scan.stop");
…
}

Reconstruire la plage de clés salée par région

Maintenant que nous avons le sel et la clé de démarrage/arrêt logique pour chaque région, nous pouvons reconstruire la plage de clés de ligne réelle.

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

Création d'un TableSplit pour chaque région

Avec la plage de clés de ligne, nous pouvons maintenant initialiser TableSplit instance pour la région.

List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
…
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}

Une autre chose à regarder est la localité des données. Le framework utilise les informations d'emplacement dans chaque fractionnement d'entrée pour attribuer une tâche de carte dans son hôte local. Pour notre TableInputFormat , nous utilisons la méthode getTableRegionLocation() pour récupérer l'emplacement de la région servant la clé de ligne.

Cet emplacement est ensuite transmis au TableSplit constructeur. Cela garantira que le mappeur traitant la division de la table se trouve sur le même serveur de région. Une méthode, appelée DNS.reverseDns() , requiert l'adresse du serveur de noms HBase. Cet attribut est stocké dans la configuration "hbase.nameserver.address ".

this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…

public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
regionLocation = regionServerAddress.getHostname();
}
return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
}

Un code complet de getSplits ressemblera à ceci :

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
table = getHTable(conf);
if (table == null) {
throw new IOException("No table was provided.");
}

// Get the name server address and the default value is null.
this.nameServer = conf.get("hbase.nameserver.address", null);
String scanStart = conf.get("region.scan.start");
String scanStop = conf.get("region.scan.stop");

Pair keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new RuntimeException("At least one region is expected");
}
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {

String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);

String regionSalt = null;
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
log.info("Total table splits: " + splits.size());
return splits;
}

Utilisez le TableInoutFormat personnalisé dans le pilote MapReduce

Maintenant, nous devons remplacer le TableInputFormat classe avec la construction personnalisée que nous avons utilisée pour la configuration de la tâche MapReduce de table.

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
MultiRangeTableInputFormat.class
);

L'approche de TableInputFormat personnalisé fournit une capacité d'analyse efficace et évolutive pour les tables HBase conçues pour utiliser du sel pour un chargement de données équilibré. Étant donné que l'analyse peut ignorer toutes les clés de ligne non liées, quelle que soit la taille de la table, la complexité de l'analyse est limitée uniquement à la taille des données cibles. Dans la plupart des cas d'utilisation, cela peut garantir un temps de traitement relativement constant à mesure que la table s'agrandit.