HBase
 sql >> Base de données >  >> NoSQL >> HBase

Création d'une application d'apprentissage automatique avec Cloudera Data Science Workbench et base de données opérationnelle, partie 1 :la configuration et les bases

Présentation

Python est largement utilisé par les ingénieurs de données et les scientifiques de données pour résoudre toutes sortes de problèmes, des pipelines ETL/ELT à la construction de modèles d'apprentissage automatique. Apache HBase est un système de stockage de données efficace pour de nombreux flux de travail, mais l'accès à ces données spécifiquement via Python peut être difficile. Pour les professionnels des données qui souhaitent utiliser les données stockées dans HBase, le récent projet en amont "hbase-connectors" peut être utilisé avec PySpark pour les opérations de base.

Dans cette série de blogs, nous expliquerons comment configurer PySpark et HBase ensemble pour une utilisation de base de Spark ainsi que pour les tâches maintenues dans CDSW. Pour ceux qui ne connaissent pas CDSW, il s'agit d'une plate-forme de science des données d'entreprise sécurisée et en libre-service permettant aux scientifiques des données de gérer leurs propres pipelines d'analyse, accélérant ainsi les projets d'apprentissage automatique de l'exploration à la production. Pour plus d'informations sur CDSW, visitez la page produit de Cloudera Data Science Workbench.

Dans cet article, plusieurs opérations seront expliquées et démontrées avec des exemples de sortie. Pour le contexte, tous les exemples d'opérations dans cet article de blog spécifique sont exécutés avec un déploiement CDSW.

Prérequis :

  1. Avoir un cluster CDP avec HBase et Spark
  2. Si vous allez suivre des exemples via CDSW, vous en aurez besoin installé - Installation de Cloudera Data Science Workbench
  3. Python 3 est installé sur chaque nœud sur le même chemin

Configuration :

Tout d'abord, HBase et Spark doivent être configurés ensemble pour que les requêtes Spark SQL fonctionnent correctement. Pour ce faire, il y a deux parties :premièrement, configurez les serveurs de région HBase via Cloudera Manager; et deuxièmement, assurez-vous que le runtime Spark a des liaisons HBase. Une remarque à garder à l'esprit, cependant, est que Cloudera Manager configure déjà certaines variables de configuration et d'environnement pour pointer automatiquement Spark vers HBase pour vous. Néanmoins, la première étape de configuration des requêtes Spark SQL est commune à tous les types de déploiement sur les clusters CDP, mais la seconde est légèrement différente selon le type de déploiement.

Configuration des serveurs de région HBase

  1. Accédez à Cloudera Manager et sélectionnez le service HBase.
  2. Rechercher « environnement de serveur régional »

  1. Ajouter une nouvelle variable d'environnement à l'aide de l'extrait de configuration avancée de l'environnement RegionServer (soupape de sécurité) :
    • Clé :HBASE_CLASSPATH
    • Valeur :/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
      Assurez-vous d'utiliser les numéros de version appropriés.
  2. Redémarrez les serveurs de région.

Une fois que vous avez suivi les étapes ci-dessus, suivez les étapes ci-dessous selon que vous souhaitez un déploiement CDSW ou non CDSW.

Ajout de liaisons HBase à Spark Runtime dans des déploiements non-CDSW

Pour déployer le shell ou utiliser correctement spark-submit, utilisez les commandes suivantes pour vous assurer que spark dispose des bonnes liaisons HBase.

pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. pot

spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- shaded.jar

Ajout de liaisons HBase à Spark Runtime dans les déploiements CDSW

Pour configurer CDSW avec HBase et PySpark, vous devez suivre quelques étapes.

1) Assurez-vous que Python 3 est installé sur chaque nœud de cluster et notez le chemin d'accès

2) Créez un nouveau projet dans CDSW et utilisez un modèle PySpark

3) Ouvrez le projet, allez dans Paramètres -> Moteur -> Variables d'environnement.

4) Définir PYSPARK3_DRIVER_PYTHON et PYSPARK3_PYTHON au chemin où Python est installé sur vos nœuds de cluster (chemin indiqué à l'étape 1).

Vous trouverez ci-dessous un exemple de ce à quoi cela devrait ressembler.

5) Dans votre projet, allez dans Fichiers -> spark-defaults.conf et ouvrez-le dans le Workbench

6) Copiez et collez la ligne ci-dessous dans ce fichier et assurez-vous qu'elle est enregistrée avant de démarrer une nouvelle session.

spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar

À ce stade, CDSW est maintenant configuré pour exécuter des tâches PySpark sur HBase ! Le reste de cet article de blog fait référence à des exemples d'opérations sur un déploiement CDSW.

Exemples d'opérations 

Opérations Put

Il existe deux façons d'insérer et de mettre à jour des lignes dans HBase. La première méthode, et la plus recommandée, consiste à créer un catalogue, qui est un schéma qui mappera les colonnes d'une table HBase à une trame de données PySpark tout en spécifiant le nom de la table et l'espace de noms. La construction de ce format JSON défini par l'utilisateur est la méthode préférée car elle peut également être utilisée avec d'autres opérations. Pour plus d'informations sur les catalogues, reportez-vous à cette documentation http://hbase.apache.org/book.html#_define_catalog. La deuxième méthode utilise un paramètre de mappage spécifique appelé "hbase.columns.mapping" qui prend juste une chaîne de paires clé-valeur.

  • Utilisation des catalogues
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("SampleApplication")\
  .getOrCreate()

tableCatalog = ''.join("""{
               "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
               "rowkey":"key",
               "columns":{
                 "key":{"cf":"rowkey", "col":"key", "type":"int"},
                 "empId":{"cf":"personal","col":"empId","type":"string"},
                 "empName":{"cf":"personal", "col":"empName", "type":"string"},
                 "empState":{"cf":"personal", "col":"empWeight", "type":"string"}
               }
             }""".split())

employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
  .options(catalog=tableCatalog, newTable=5) \
  .option("hbase.spark.use.hbasecontext", False) \
  .save()
# newTable refers to the NumberOfRegions which has to be > 3

Vérifiez qu'une nouvelle table appelée "tblEmployee" est créée dans HBase en ouvrant simplement le shell HBase et en exécutant la commande suivante :

scan ‘tblEmployee’, {‘LIMIT’ => 2}

L'utilisation de catalogues peut également vous permettre de charger facilement des tables HBase. Cela sera discuté dans un prochain article.

  • Utilisation de hbase.columns.mapping

Lors de l'écriture de la trame de données PySpark, une option appelée "hbase.columns.mapping" peut être ajoutée pour inclure une chaîne qui mappe correctement les colonnes. Cette option vous permet uniquement d'insérer des lignes dans des tableaux existants.

Dans le shell HBase, créons d'abord une table create 'tblEmployee2', 'personal'

Maintenant, dans PySpark, insérons 2 lignes en utilisant "hbase.columns.mapping"

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("SampleApplication")\
  .getOrCreate()

employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3])))
employeeDF = spark.createDataFrame(employeeMap)


employeeDF.write.format("org.apache.hadoop.hbase.spark") \
       .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \
       .option("hbase.table", "tblEmployee2") \
       .option("hbase.spark.use.hbasecontext", False) \
       .save()

Encore une fois, vérifiez simplement qu'une nouvelle table appelée "tblEmployee2" contient ces nouvelles lignes.

scanner ‘tblEmployee2’, {‘LIMIT’ => 2}

Cela complète nos exemples sur la façon d'insérer des lignes via PySpark dans les tables HBase. Dans le prochain article, je discuterai des opérations d'obtention et d'analyse, de PySpark SQL et de certains dépannages. Jusque-là, vous devriez obtenir un cluster CDP et parcourir ces exemples.