Cet article de blog a été publié sur Hortonworks.com avant la fusion avec Cloudera. Certains liens, ressources ou références peuvent ne plus être exacts.
Nous sommes fiers d'annoncer l'aperçu technique de Spark-HBase Connector, développé par Hortonworks en collaboration avec Bloomberg.
Le connecteur Spark-HBase exploite l'API de source de données (SPARK-3247) introduite dans Spark-1.2.0. Il comble le fossé entre le simple magasin HBase Key Value et les requêtes SQL relationnelles complexes et permet aux utilisateurs d'effectuer des analyses de données complexes sur HBase à l'aide de Spark. Un DataFrame HBase est un DataFrame Spark standard, et est capable d'interagir avec toutes les autres sources de données telles que Hive, ORC, Parquet, JSON, etc.
Contexte
Il existe plusieurs connecteurs Spark HBase open source disponibles sous forme de packages Spark, de projets indépendants ou dans le tronc HBase.
Spark est passé aux API Dataset/DataFrame, qui fournit une optimisation intégrée du plan de requête. Désormais, les utilisateurs finaux préfèrent utiliser l'interface basée sur les DataFrames/Datasets.
Le connecteur HBase dans le tronc HBase a un support riche au niveau RDD, par ex. BulkPut, etc., mais son support DataFrame n'est pas aussi riche. Le connecteur de jonction HBase s'appuie sur le HadoopRDD standard avec le format de table d'entrée intégré à HBase qui présente certaines limitations de performances. De plus, BulkGet exécuté dans le pilote peut être un point de défaillance unique.
Il existe d'autres implémentations alternatives. Prenez Spark-SQL-on-HBase par exemple. Il applique des techniques d'optimisation personnalisées très avancées en intégrant son propre plan d'optimisation des requêtes dans le moteur Spark Catalyst standard, expédie le RDD à HBase et effectue des tâches complexes, telles que l'agrégation partielle, à l'intérieur du coprocesseur HBase. Cette approche est capable d'atteindre des performances élevées, mais elle est difficile à maintenir en raison de sa complexité et de l'évolution rapide de Spark. Autoriser également l'exécution de code arbitraire à l'intérieur d'un coprocesseur peut poser des risques de sécurité.
Le connecteur Spark-on-HBase (SHC) a été développé pour surmonter ces goulots d'étranglement et faiblesses potentiels. Il implémente l'API Spark Datasource standard et exploite le moteur Spark Catalyst pour l'optimisation des requêtes. En parallèle, le RDD est construit à partir de zéro au lieu d'utiliser TableInputFormat afin d'atteindre des performances élevées. Avec ce RDD personnalisé, toutes les techniques critiques peuvent être appliquées et entièrement mises en œuvre, telles que l'élagage des partitions, l'élagage des colonnes, le refoulement des prédicats et la localité des données. La conception rend la maintenance très facile, tout en réalisant un bon compromis entre performance et simplicité.
Architecture
Nous supposons que Spark et HBase sont déployés dans le même cluster et que les exécuteurs Spark sont colocalisés avec les serveurs de région, comme illustré dans la figure ci-dessous.
Figure 1. Architecture de connecteur Spark-on-HBase
À un niveau élevé, le connecteur traite à la fois Scan et Get de la même manière, et les deux actions sont effectuées dans les exécuteurs. Le pilote traite la requête, agrège les analyses/obtentions en fonction des métadonnées de la région et génère des tâches par région. Les tâches sont envoyées aux exécuteurs préférés colocalisés avec le serveur de région et sont exécutées en parallèle dans les exécuteurs pour obtenir une meilleure localité et simultanéité des données. Si une région ne contient pas les données requises, aucune tâche n'est affectée à ce serveur de région. Une tâche peut consister en plusieurs scans et BulkGets, et les demandes de données par une tâche sont récupérées à partir d'un seul serveur de région, et ce serveur de région sera également la préférence de localité pour la tâche. Notez que le pilote n'est pas impliqué dans l'exécution réelle du travail, sauf pour la planification des tâches. Cela évite que le conducteur ne soit le goulot d'étranglement.
Catalogue des tableaux
Pour amener la table HBase en tant que table relationnelle dans Spark, nous définissons un mappage entre les tables HBase et Spark, appelé Table Catalog. Il y a deux parties essentielles de ce catalogue. L'une est la définition de la clé de ligne et l'autre est le mappage entre la colonne de table dans Spark et la famille de colonnes et le qualificateur de colonne dans HBase. Veuillez vous référer à la section Utilisation pour plus de détails.
Prise en charge native d'Avro
Le connecteur prend en charge le format Avro de manière native, car il est très courant de conserver des données structurées dans HBase sous forme de tableau d'octets. L'utilisateur peut conserver l'enregistrement Avro directement dans HBase. En interne, le schéma Avro est automatiquement converti en un type de données Spark Catalyst natif. Notez que les deux parties clé-valeur d'une table HBase peuvent être définies au format Avro. Veuillez vous référer aux exemples/cas de test dans le référentiel pour une utilisation exacte.
Réduction des prédicats
Le connecteur récupère uniquement les colonnes requises du serveur de région pour réduire la surcharge du réseau et éviter un traitement redondant dans le moteur Spark Catalyst. Les filtres HBase standard existants sont utilisés pour effectuer le push-down de prédicat sans tirer parti de la capacité du coprocesseur. Étant donné que HBase n'est pas conscient du type de données, à l'exception du tableau d'octets, et de l'incohérence d'ordre entre les types primitifs Java et le tableau d'octets, nous devons prétraiter la condition de filtre avant de définir le filtre dans l'opération d'analyse pour éviter toute perte de données. À l'intérieur du serveur de région, les enregistrements ne correspondant pas à la condition de requête sont filtrés.
Élagage des partitions
En extrayant la clé de ligne des prédicats, nous divisons le Scan/BulkGet en plusieurs plages qui ne se chevauchent pas, seuls les serveurs de région qui ont les données demandées effectueront Scan/BulkGet. Actuellement, l'élagage de la partition est effectué sur la première dimension des clés de ligne. Par exemple, si une clé de ligne est "key1:key2:key3", l'élagage de la partition sera basé sur "key1" uniquement. Notez que les conditions WHERE doivent être définies avec soin. Sinon, l'élagage de la partition peut ne pas prendre effet. Par exemple, WHERE rowkey1> "abc" OR column ="xyz" (où rowkey1 est la première dimension de la rowkey et la colonne est une colonne hbase régulière) entraînera une analyse complète, car nous devons couvrir toutes les plages car du OU logique.
Localité des données
Lorsqu'un exécuteur Spark est colocalisé avec des serveurs de région HBase, la localité des données est obtenue en identifiant l'emplacement du serveur de région et s'efforce de colocaliser la tâche avec le serveur de région. Chaque exécuteur effectue Scan/BulkGet sur la partie des données co-localisées sur le même hôte.
Analyser et obtenir en masse
Ces deux opérateurs sont exposés aux utilisateurs en spécifiant WHERE CLAUSE, par exemple, WHERE colonne> x et colonne
Utilisation
Ce qui suit illustre la procédure de base sur la façon d'utiliser le connecteur. Pour plus de détails et des cas d'utilisation avancés, tels qu'Avro et la prise en charge des clés composites, veuillez vous référer aux exemples dans le référentiel.
1) Définissez le catalogue pour le mappage de schéma :
[code language="scala"]def catalog =s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key" , |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1 ", "col":"col1", "type":"booléen"}, |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |"col4":{"cf":"cf4", "col":" col4", "type":"int"}, |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"}, |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |} |}""".stripMargin[/code]
2) Préparez les données et remplissez la table HBase :
classe de cas HBaseRecord(col0 :chaîne, col1 :booléen, col2 :double, col3 :flottant, col4 : entier, col5 :long, col6 :court, col7 :chaîne, col8 :octet)object HBaseRecord {def apply(i :Int, t :String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}””” HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String$i :$t", i.toByte) }}
val data =(0 à 255).map { i => HBaseRecord(i, "extra")}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalogue, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark. sql.execution.datasources.hbase")
.save()
3) Chargez le DataFrame :
def withCatalog(cat :String) :DataFrame ={
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format( "org.apache.spark.sql.execution.datasources.hbase")
.load()
}val df =avecCatalogue(catalogue)
4) Requête intégrée à la langue :
val s =df.filter((($"col0″ <="row050″ &&$"col0"> "row040") ||
$"col0″ ==="row005" ||
$"col0″ ==="ligne020" ||
$"col0″ === "r20" ||
$"col0″ <="ligne005") &&
($"col4″ ===1 ||
$"col4″ ===42))
.select("col0", "col1", "col4")
s .montrer5) Requête SQL :
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").showConfiguration du paquet Spark
Les utilisateurs peuvent utiliser le connecteur Spark-on-HBase en tant que package Spark standard. Pour inclure le package dans votre application Spark, utilisez :
spark-shell, pyspark ou spark-submit
> $SPARK_HOME/bin/spark-shell –packages zhzhan:shc:0.0.11-1.6.1-s_2.10
Les utilisateurs peuvent également inclure le package en tant que dépendance dans votre fichier SBT. Le format est le spark-package-name:version
spDependencies +="zhzhan/shc:0.0.11-1.6.1-s_2.10"
Exécution dans un cluster sécurisé
Pour une exécution dans un cluster activé par Kerberos, l'utilisateur doit inclure les jars liés à HBase dans le chemin de classe car la récupération et le renouvellement du jeton HBase sont effectués par Spark et sont indépendants du connecteur. En d'autres termes, l'utilisateur doit lancer l'environnement de manière normale, soit via kinit, soit en fournissant principal/keytab. Les exemples suivants montrent comment s'exécuter dans un cluster sécurisé avec les modes yarn-client et yarn-cluster. Notez que SPARK_CLASSPATH doit être défini pour les deux modes et que l'exemple de fichier jar n'est qu'un espace réservé pour Spark.
export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar
Supposons que hrt_qa soit un compte sans tête, l'utilisateur peut utiliser la commande suivante pour kinit :
kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar
Tout mettre ensemble
Nous venons de donner un aperçu rapide de la façon dont HBase prend en charge Spark au niveau DataFrame. Avec l'API DataFrame, les applications Spark peuvent travailler avec des données stockées dans la table HBase aussi facilement que n'importe quelles données stockées dans d'autres sources de données. Avec cette nouvelle fonctionnalité, les données des tables HBase peuvent être facilement consommées par les applications Spark et d'autres outils interactifs, par ex. les utilisateurs peuvent exécuter une requête SQL complexe sur une table HBase à l'intérieur de Spark, effectuer une jointure de table avec Dataframe ou s'intégrer à Spark Streaming pour implémenter un système plus compliqué.
Quelle est la prochaine ?
Actuellement, le connecteur est hébergé dans le référentiel Hortonworks et publié en tant que package Spark. Il est en cours de migration vers le tronc Apache HBase. Au cours de la migration, nous avons identifié quelques bogues critiques dans le tronc HBase, et ils seront corrigés avec la fusion. Le travail de la communauté est suivi par le parapluie HBase JIRA HBASE-14789, y compris HBASE-14795 et HBASE-14796 pour optimiser l'architecture informatique sous-jacente pour Scan et BulkGet, HBASE-14801 pour fournir une interface utilisateur JSON pour une utilisation facile, HBASE-15336 pour le chemin d'écriture DataFrame, HBASE-15334 pour la prise en charge d'Avro, HBASE-15333 pour prendre en charge les types primitifs Java, tels que short, int, long, float et double, etc., HBASE-15335 pour prendre en charge la clé de ligne composite et HBASE-15572 pour ajouter une sémantique d'horodatage facultative. Nous sommes impatients de produire une future version du connecteur qui rendra le connecteur encore plus facile à utiliser.
Reconnaissance
Nous tenons à remercier Hamel Kothari, Sudarshan Kadambi et l'équipe de Bloomberg pour nous avoir guidés dans ce travail et nous avoir également aidés à valider ce travail. Nous tenons également à remercier la communauté HBase pour ses commentaires et son amélioration. Enfin, ce travail a tiré les leçons des intégrations précédentes de Spark HBase et nous tenons à remercier leurs développeurs d'avoir ouvert la voie.
Référence :
SHC :https://github.com/hortonworks/shc-release
Paquet Spark :http://spark-packages.org/package/zhzhan/shc
Apache HBase : https://hbase.apache.org/
Apache Spark :http://spark.apache.org/