Quelques ajustements mineurs sont nécessaires pour attaquer les tables HBase à partir d'un contexte Spark.
Commençons par créer rapidement un exemple de table "t1" HBase avec 40 lignes
[root@sandbox ~]# cat hbase_load.txtcreate 't1', 'f1'for i in '1'..'10' do \for j in '1'..'2' do \ for k in '1'..'2' do \rnd=(0...64).map { (65 + rand(26)).chr }.joinput 't1', "#{i}-#{ j}-#{k}", "f1:#{j}#{k}", "#{rnd}"end \end \end[root@sandbox ~]# cat hbase_load.txt |hbase shell
Vous devez ajuster votre chemin de classe Spark (goyave 14 nécessaire, donc inclus le premier que j'ai trouvé):
[root@sandbox ~]# export SPARK_CLASSPATH=/usr/hdp/current/spark-client/lib/hbase-common.jar:/usr/hdp/current/spark-client/lib/hbase- client.jar:/usr/hdp/current/spark-client/lib/hbase-protocol.jar:/usr/hdp/current/spark-client/lib/hbase-server.jar:/etc/hbase/conf:/ usr/hdp/2.3.2.0-2950/oozie/share/lib/spark/guava-14.0.1.jar[root@sandbox ~]# spark-shell --master yarn-client
En remarque, le SPARK_CLASSPATH est obsolète dans Spark 1.5.x+, vous devez donc utiliser à la place
[root@sandbox ~]# spark-shell --master yarn-client --driver-class-path=/ usr/hdp/current/spark-client/lib/hbase-common.jar:/usr/hdp/current/spark-client/lib/hbase-client.jar:/usr/hdp/current/spark-client/lib/ hbase-protocol.jar:/usr/hdp/current/spark-client/lib/hbase-hadoop2-compat.jar:/usr/hdp/current/spark-client/lib/hbase-server.jar:/etc/hbase /conf:/usr/hdp/2.3.2.0-2950/oozie/share/lib/spark/guava-14.0.1.jar
J'ai rencontré des bogues en utilisant le précédent :[…] Causé par :java.lang.IllegalStateException :données de bloc non lues, j'ai donc utilisé la première version (en utilisant SPARK_CLASSPATH). apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableInputFormatval tableName ="t1"val hconf =HBaseConfiguration.create()hconf.set(TableInputFormat.INPUT_TABLE, "t1")val hBaseRDD =sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])println ("records found :" + hBaseRDD.count())[...]2016-04-07 18:44:40,553 INFO [main] scheduler.DAGScheduler :Job 0 terminé :compte à
scala> val admin =new HBaseAdmin(hconf)admin.listTables
Et si vous voulez créer une table :val tableDesc =new HTableDescriptor(tableName)admin.createTable(tableDesc)