spark 操作 hbase 之写入 hbase

教程 潘牛 ⋅ 于 2021-06-02 16:57:21 ⋅ 1375 阅读

1 概述

在大数据的应用场景中,hbase常用在实时读写。

写入 HBase 的方法大致有以下几种:

​ 1)Java 调用 HBase 原生 API,HTable.add(List(Put))。

​ 2)使用 TableOutputFormat 作为输出。

​ 3)Bulk Load,先将数据按照 HBase 的内部数据格式生成持久化的 HFile 文件,然后复制到合适的位置并通知 RegionServer ,即完成海量数据的入库。其中生成 Hfile 这一步可以选择 MapReduce 或 Spark。

其中:

​ 前两种适合实时写入hbase。

​ 第三种适合将大批量的数据一次性的导入hbase。

spark没有读写hbase的api,如果想用spark操作hbase表,需要参考java和MapReduce操作hbase的api。

2 Java 调用 HBase 原生 API

用表操作对象的put(list) 批量插入数据

package com.hainiu.sparkhbase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkHbaseTablePuts {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkHbaseTablePuts")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(20 until 30, 2)
    // 一个分区创建一个hbase连接,批量写入,效率高
    rdd.foreachPartition(it =>{
      // 把每个Int 转成 Put对象
      val puts: Iterator[Put] = it.map(f => {
        // 创建Put对象
        val put: Put = new Put(Bytes.toBytes(s"spark_puts_${f}"))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(s"${f}"))
        put
      })
      val hbaseConf: Configuration = HBaseConfiguration.create()
      var conn: Connection = null
      var table: HTable = null
      try{
        // 创建hbase连接
        conn = ConnectionFactory.createConnection(hbaseConf)
        // 创建表操作对象
        table = conn.getTable(TableName.valueOf("panniu:spark_user")).asInstanceOf[HTable]
        // 通过隐式转换,将scala的List转成javaList
        import scala.collection.convert.wrapAsJava.seqAsJavaList
        // 一个分区的数据批量写入
        table.put(puts.toList)
      }catch {
        case e:Exception => e.printStackTrace()
      }finally {
        table.close()
        conn.close()
      }
    })
  }
}

建hbase表:

file

写入后查询:

file

3 使用 TableOutputFormat 作为输出

用TableOutputFormat 来实现写入数据

file

package com.hainiu.sparkhbase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, HTable, Put}
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkHbaseTableWrite {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkHbaseTableWrite")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(30 until 40, 2)
    // Int --> (NullWritable, Put)
    val hbaseWriteRdd: RDD[(NullWritable, Put)] = rdd.map(f => {
      // 创建Put对象
      val put: Put = new Put(Bytes.toBytes(s"spark_write_${f}"))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(s"${f}"))
      (NullWritable.get(), put)
    })
    // 创建带有hbase连接的Hadoop Configuration对象
    val hbaseConf: Configuration = HBaseConfiguration.create()
    // 设置写入hbase的表名
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "panniu:spark_user")
    // 借助于mapreduce的Job对象添加参数配置
    val job: Job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[NullWritable]])
    job.setOutputKeyClass(classOf[NullWritable])
    job.setOutputValueClass(classOf[Put])
    // 当输出数据没有输出目录时,用这个api
    hbaseWriteRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

file

4 BulkLoad

先把数据转成 hfile 文件, 导入到 hbase 表。

基于下面代码实现的,通过调用这个方法,给 job 对象里面的配置对象设置了生成 hfile 文件的参数。

HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator()); 

查看 configureIncrementalLoad() 的底层代码

file

file

查看 PutSortReducer 类, 发现输出的key和value都是有序的。

file

对于Spark来说,就比较直接, 直接输出 最终格式

keyout : ImmutableBytesWritable 有序

valueout:KeyValue 有序

创建hbase表:

file

程序:

package com.hainiu.sparkhbase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, HTable}
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
object SparkHbaseBulkload {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkHbaseBulkload")
    // 开启Kryo序列化
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc: SparkContext = new SparkContext(conf)
    val hbaseTableName:String = "panniu:spark_load"
    val rdd: RDD[Int] = sc.parallelize(10 until 50, 2)
    // rdd[Int] --> rdd[(HbaseSortKey, KeyValue)]
    val rdd2: RDD[(HbaseSortKey, KeyValue)] = rdd.mapPartitions(it => {
      val list = new ListBuffer[(HbaseSortKey, KeyValue)]
      it.foreach(f => {
        val rowkey: String = s"spark_load_${f}"
        val w: ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
        val kv1: KeyValue = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(s"${f}"))
        val kv2: KeyValue = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(s"name${f}"))
        list += ((new HbaseSortKey(w, kv1), kv1))
        list += ((new HbaseSortKey(w, kv2), kv2))
      })
      list.iterator
    })
    // 通过 PutSortReducer 分析发现 输出的 (ImmutableBytesWritable, KeyValue)都需要排序,
    // 所以就搞个二次排序key HbaseSortKey 实现二次排序逻辑
    // rdd[(HbaseSortKey, KeyValue)].sortByKey
    val writeHfileRdd: RDD[(ImmutableBytesWritable, KeyValue)] = rdd2.sortByKey().map(f =>(f._1.rowkey, f._2))
    // 写入文件
    val outputPath:String = "/tmp/spark/hbase_bulk_output"
    import com.hainiu.util.MyPredef.string2HdfsUtil
    outputPath.deleteHdfs
    // 创建带有Hbase配置的Configuration对象
    val hbaseConf: Configuration = HBaseConfiguration.create()
    // 用job来设置参数
    val job: Job = Job.getInstance(hbaseConf)
    val conn: Connection = ConnectionFactory.createConnection(hbaseConf)
    val table: HTable = conn.getTable(TableName.valueOf(hbaseTableName)).asInstanceOf[HTable]
    // 通过调用这个方法,给job对象里面的配置对象设置了生成hfile文件的参数
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor, table.getRegionLocator)
    // 写入hfile文件
    writeHfileRdd.saveAsNewAPIHadoopFile(outputPath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)
  }
}
// 要实现二次排序的key
class HbaseSortKey(val rowkey:ImmutableBytesWritable, val kv: KeyValue) extends Ordered[HbaseSortKey]{
  override def compare(that: HbaseSortKey): Int = {
    if(this.rowkey.compareTo(that.rowkey) == 0){
      KeyValue.COMPARATOR.compare(this.kv, that.kv)
    }else{
      this.rowkey.compareTo(that.rowkey)
    }
  }
}

生成hfile文件:

file

将hfile文件上传到hdfs上,并执行hbase导入

file

导入后查看:

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-潘牛,http://hainiubl.com/topics/75561
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter