数据湖 HUDI 基本概念 01

分享 123456789987654321 ⋅ 于 2022-07-13 17:10:17 ⋅ 799 阅读

Hudi

1.数据湖分类

1.Delta Lake

1.Delta Lake:DataBricks公司推出的一种数据湖方案,网址:https://delta.io/
    1.流批一体的Data Lake存储层,支持 update/delete/merge
    2.由于出自Databricks,Spark的所有数据写入方式,包括基于dataframe的批式、流式,以及SQL的Insert、 Insert Overwrite等都是支持的(开源的SQL写暂不支持,EMR做了支持)。
    3.在数据写入方面,Delta 与 Spark 是强绑定的;在查询方面,开源 Delta 目前支持 Spark 与 Presto,但是, Spark 是不可或缺的,因为 delta log 的处理需要用到 Spark

2.Iceberg

2.Apache Iceberg:以类似于SQL的形式高性能的处理大型的开放式表,网址:https://iceberg.apache.org/

file

3.Hudi

3.Apache Hudi:管理大型分析数据集在HDFS上的存储,网址:https://hudi.apache.org/
    主要支持Upserts、Deletes和Incrementa数据处理,支持三种数据写入方式:UPSERT,INSERT 和 BULK_INSERT。

file

2.Hudi 功能

1. Hudi是在大数据存储上的一个数据集,可以将Change Logs通过upsert的方式合并进Hudi; 
2. Hudi 对上可以暴露成一个普通Hive或Spark表,通过API或命令行可以获取到增量修改的信息,继续供下游消费; 
3. Hudi 保管修改历史,可以做时间旅行或回退; 
4. Hudi 内部有主键到文件级的索引,默认是记录到文件的布隆过滤器;

3.Hudi 特性

1. Apache Hudi使得用户能在Hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,不仅可以批处理,还可 以在数据湖上进行流处理。

    (1) Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询 会处理最后一个提交的快照,并基于此输出结果。 
    (2) 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录 的增量流,并解锁新的查询姿势(类别)。

4.Hudi 基础架构

1、通过DeltaStreammer、Flink、Spark等工具,将数据摄取到数据湖存储,可使用HDFS作为数据湖的数据存储; 
2、基于HDFS可以构建Hudi的数据湖; 
3、Hudi提供统一的访问Spark数据源和Flink数据源; 
4、外部通过不同引擎,如:Spark、Flink、Presto、Hive、Impala、Aliyun DLA、AWS Redshit访问接口;

file

5.编译源码

5.1下载源码

https://archive.apache.org/dist/hudi/0.9.0/hudi-0.10.0.src.tgz

5.2安装Maven

[root@localhost conf]#  vi /etc/profile
export MAVEN_HOME=/root/hudi/apache-maven-3.5.4
export PATH=$PATH:$MAVEN_HOME/bin
[root@localhost conf]#  source /etc/profile

settings

<localRepository>/root/hudi/m2</localRepository>

<mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>aliyunmaven</id>
    <mirrorOf>*</mirrorOf>
    <name>阿里云spring沛件仓库</name>
    <url>https://maven.aliyun.com/repository/spring-plugin</url>
</mirror>
<mirror>
    <id>repo2</id>
    <name>Mirror from Maven Repo2</name>
    <url>https://repo.spring.io/plugins-release/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>UK</id>
    <name>UK Central</name>
    <url>http://uk.maven.org/maven2</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>jboss-public-repository-group</id>
    <name>3Boss Public Repository Group</name>
    <url>http://repository.jboss.org/nexus/content/groups/public</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>CN</id>
    <name>OSChina Central</name>
    <url>http://maven.oschina.net/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>google-maven-central</id>
    <name>GCS Maven Central mirror Asia Pacific</name>
    <url>https://maven-central-asia.storage-download.googleapis.com/maven2/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>confluent</id>
    <name>confluent maven</name>
    <url>http://packages.confluent.io/maven/</url>
    <mirrorOf>confluent</mirrorOf>
</mirror>

5.3安装jdk

#卸载openjdk
[root@localhost hudi]# rpm -qa | grep jdk
[root@localhost hudi]# [root@localhost hudi]# rpm -e --nodeps xxx

#安装jdk
export JAVA_HOME=/usr/java/jdk1.8.0_144
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

[root@localhost hudi]# source /etc/profile

5.4编译

[root@localhost hudi-0.9.0]# mvn clean install -DskipTests -DskipITs -Dscala-2.12 -Dspark3

6.Hudi CLI测试

[root@localhost hudi]# vim /etc/profile
export HUDI_HOME=/root/hudi/
export PATH=$PATH:$HUDI_HOME/bin

[root@localhost hudi]# source /etc/profile
[root@localhost hudi-cli]# sh ./hudi-cli.sh

7.大数据环境准备

7.1hdfs

tar -zxvf hadoop-2.7.3.tar.gz
#配置环境变量

export HADOOP_HOME=/root/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

#配置hadoop-env.sh
[root@localhost hadoop]# vim /root/hadoop/etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_144
export HADOOP_HOME=/root/hadoop

#安装 HDFS
[root@localhost hadoop]# vim core-site.xml
<property>
         <name>fs.defaultFS</name>
         <value>hdfs://192.168.92.161:8020</value>
</property>
<property>
         <name>hadoop.tmp.dir</name>
         <value>/export/server/hadoop/datas/tmp</value>
</property>
<property>
         <name>hadoop.http.staticuser.user</name>
         <value>root</value>
</property>

#创建临时目录
[root@localhost hadoop]# mkdir -p .export.server/hadoop/datas/tmp

#配置hdfs-site.xml

<configuration>
<property>
        <name>dfs.namenode.name.dir</name>
         <value>/export/server/hadoop/datas/dfs/nn</value>
</property>
<property>
         <name>dfs.datanode.data.dir</name>
         <value>/export/server/hadoop/datas/dfs/dn</value>
</property>
<property>
         <name>dfs.replication</name>
         <value>1</value>
</property>
<property>
         <name>dfs.permissions.enabled</name>
         <value>false</value>
</property>
<property>
         <name>dfs.datanode.data.dir.perm</name>
         <value>750</value>
</property>
</configuration>

[root@localhost hadoop]# mkdir -p /export/server/hadoop/datas/dfs/nn
[root@localhost hadoop]# mkdir -p /export/server/hadoop/datas/dfs/dn

#配置slaves
[root@localhost hadoop]# vim slaves

#格式化HDFS
[root@localhost hadoop]# hdfs namenode -format

#启动HDFS集群
[root@localhost hadoop]# hadoop-daemon.sh start namenode
[root@localhost hadoop]# hadoop-daemon.sh start datanode

#关闭防火墙
[root@localhost hadoop]# systemctl stop firewalld
[root@localhost hadoop]# systemctl disable firewalld

#http://192.168.92.161:50070

7.2安装spark3

#1.安装Scala-2.12.10
[root@localhost ~]# tar -zxvf ./scala-2.12.10.tgz -C /export/server/
[root@localhost ~]# ln -s /export/server/scala-2.12.10/ /export/server/scala
[root@localhost ~]# vim /etc/profile
export SCALA_HOME=/export/server/scala
export PATH=$PATH:$SCALA_HOME/bin
[root@localhost ~]# source /etc/profile
[root@localhost ~]# scala -version

#2.安装 Spark 3.x
[root@localhost ~]# cd /export/server/spark/conf/
[root@localhost conf]# mv spark-env.sh.template spark-env.sh
JAVA_HOME=/usr/java/jdk1.8.0_144
SCALA_HOME=/export/server/scala
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

#3.本地模式启动spark-shell
[root@localhost spark]# bin/spark-shell --master local[2]

#4.测试
[root@localhost spark]# hdfs dfs -mkdir -p /datas/
[root@localhost spark]# hdfs dfs -put /export/server/spark/README.md /datas

8.spark-shell

#通过制定jar的方式启动
/export/server/spark/bin/spark-shell \
--master local[2] \
--jars /root/hudi-spark3-bundle_2.12-0.9.0.jar,\
/root/spark-avro_2.12-3.0.1.jar,/root/spark_unused-1.0.0.jar \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

测试

//1.
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
//将数据保存到hudi表
val tableName = "hudi₋trips_cow"
val basePath = "hdfs://192.168.92.161:8020/datas/hudi-warehouse/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts,2))
df.printSchema()
df.select("rider","begin_lat","begin_lon", "driver", "fare", "uuid", "ts").show(10,truncate=false)
//数据查询  将模拟产生Trip数据,保存到Hudi表中,由于Hudi诞生时基于Spark框架,所以SparkSQL支持Hudi数据源,直接通 过format指定数据源Source,设置相关属性保存数据即可  可以通过:paste 粘贴模式执行  然后通过ctrl + d 触发
df.write
.mode(Overwrite)
.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY,"partitionpath")
.option(TABLE_NAME, tableName)
.save(basePath)
//参数说明
    //1.参数:getQuickstartWriteConfigs,设置写入/更新数据至Hudi时,Shuffle时分区数目
    //2.参数:PRECOMBINE_FIELD_OPT_KEY,数据合并时,依据主键字段
    //3.参数:RECORDKEY_FIELD_OPT_KEY,每条记录的唯一id,支持多个字段
    //4.参数:PARTITIONPATH_FIELD_OPT_KEY,用于存放数据的分区字段
//Hudi表数据存储在HDFS上,以PARQUET列式方式存储的

//读取hudi数据  "/*/*/*/*" 从0.9.0版本之后可以不用写,代表分区
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
tripsSnapshotDF.printSchema()

|-- _hoodie_commit_time: string (nullable = true) //数据提交时间
 |-- _hoodie_commit_seqno: string (nullable = true) //数据提交序列号
 |-- _hoodie_record_key: string (nullable = true) //数据的rowkey
 |-- _hoodie_partition_path: string (nullable = true) //数据存储路径
 |-- _hoodie_file_name: string (nullable = true) //数据的文件名称
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)

//sql查询
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare from hudi_trips_snapshot").show()

9. Hudi 数据管理

9.1.hudi表结构

1..hoodie 文件:由于CRUD的零散性,每一次的操作都会生成一个文件,这些小文件越来越多后,会严重影响HDFS的 性能,Hudi设计了一套文件合并机制。 .hoodie文件夹中存放了对应的`文件合并操作相关的日志文件`。
2.amricas和asia相关的路径是`实际的数据文件`,按分区存储,分区的路径key是可以指定的。

9.1.1.hoodie文件

Hudi把随着时间流逝,对表的一系列CRUD操作叫做Timeline,Timeline中某一次的操作,叫做Instant
    Instant Action,记录本次操作是一次数据提交COMMITS,还是文件合并COMPACTION,或者是文件清理(CLEANS); 
    Instant Time,本次操作发生的时间;
    State,操作的状态,发起(REQUESTED),进行中(INFLIGHT),还是已完成(COMPLETED);

9.1.2.数据文件

Hudi真实的数据文件使用Parquet文件格式存储
其中包含一个metadata元数据文件和数据文件parquet列式存储。
Hudi为了实现数据的CRUD,需要能够唯一标识一条记录,Hudi将把数据集中的`唯一字段(record key ) + 数据所在分区 (partitionPath)` 联合起来当做`数据的唯一键`。

1.在根目录下,每个分区都有唯一的分区路径,每个分区数据存储在多个文件中。
2.每个文件都有惟一的fileId和生成文件的commit所标识。如果发生更新操作时,多个文件共享相同的fileId,但会 有不同的commit

数据存储概述

每个文件都有惟一的fileId和生成文件的commit所标识。如果发生更新操作时,多个文件共享相同的fileId,但会 有不同的commit

Metadata 元数据

以时间轴(timeline)的形式将数据集上的各项操作元数据维护起来,以支持数据集的瞬态视图,这部分元数据存 储于根目录下的元数据目录。一共有三种类型的元数据: 
1.Commits:一个单独的commit包含对数据集之上一批数据的一次原子写入操作的相关信息。我们用单调递增的时间戳来标识 commits,标定的是一次写入操作的开始。 
2.Cleans:用于清除数据集中不再被查询所用到的旧版本文件的后台活动。 
3.Compactions:用于协调Hudi内部的数据结构差异的后台活动。例如,将更新操作由基于行存的日志文件归集到列存数据上

Index 索引

Hudi维护着一个索引,以支持在记录key存在情况下,将新记录的key快速映射到对应的fileId。 
    Bloom filter:存储于数据文件页脚。默认选项,不依赖外部系统实现。数据和索引始终保持一致。  
    Apache HBase :可高效查找一小批key。在索引标记期间,此选项可能快几秒钟

Data 数据

Hudi以两种不同的存储格式存储所有摄取的数据,用户可选择满足下列条件的任意数据格式: 
    `读`优化的列存格式(ROFormat): 缺省值为Apache `Parquet`; 
    `写`优化的行存格式(WOFormat): 缺省值为Apache `Avro`;

10.IDEA 编程开发

10.1插入Hudi表,采用COW模式

main方法

    def main(args: Array[String]): Unit = {
        // 创建SparkSession实例对象,设置属性
        val spark: SparkSession = {
            SparkSession.builder()
                .appName(this.getClass.getSimpleName.stripSuffix("$"))
                .master("local[2]")
                // 设置序列化方式:Kryo
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate()
        }

        // 定义变量:表名称、保存路径
        val tableName: String = "tbl_trips_cow"
        val tablePath: String = "/hudi-warehouse/tbl_trips_cow"

        // 构建数据生成器,模拟产生业务数据
        import org.apache.hudi.QuickstartUtils._

        // 模拟数据,插入Hudi表,采用COW模式
        insertData(spark, tableName, tablePath)
        // 应用结束,关闭资源
        spark.stop()
    }

保存数据

    def insertData(spark: SparkSession, table: String, path: String): Unit = {
        import spark.implicits._

        // 第1步、模拟乘车数据
        import org.apache.hudi.QuickstartUtils._

        val dataGen: DataGenerator = new DataGenerator()
        val inserts = convertToStringList(dataGen.generateInserts(100))

        import scala.collection.JavaConverters._
        val insertDF: DataFrame = spark.read.json(
            spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
        )
        insertDF.printSchema()
        insertDF.show(10, truncate = false)

        //TODO 第2步、插入数据到Hudi表
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._
        insertDF.write
            .mode(SaveMode.Append)
            .format("hudi")
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            // Hudi 表的属性值设置
            .option(PRECOMBINE_FIELD.key(), "ts")
            .option(RECORDKEY_FIELD.key(), "uuid") //主键
            .option(PARTITIONPATH_FIELD.key(), "partitionpath") //分区
            .option(TBL_NAME.key(), table) //表名称
            .save(path)
    }

10.2.1快照方式查询数据

//采用Snapshot Query快照方式查询表的数据
    def queryData(spark: SparkSession, path: String): Unit = {
        import spark.implicits._

        val tripsDF: DataFrame = spark.read.format("hudi").load(path)
        tripsDF.printSchema()
        tripsDF.show(10, truncate = false)

        // 查询费用大于20,小于50的乘车数据
        tripsDF
            .filter($"fare" >= 20 && $"fare" <= 50)
            .select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
            .orderBy($"fare".desc, $"_hoodie_commit_time".desc)
            .show(20, truncate = false)
    }

10.2.2指定字符串,按照日期时间过滤获取数据

def queryDataByTime(spark: SparkSession, path: String): Unit = {
        import org.apache.spark.sql.functions._

        // 方式一:指定字符串,按照日期时间过滤获取数据
        val df1 = spark.read
            .format("hudi")
            .option("as.of.instant", "20211225152016")
            .load(path)
            .sort(col("_hoodie_commit_time").desc)
        df1.printSchema()
        df1.show(numRows = 5, truncate = false)

        // 方式二:指定字符串,按照日期时间过滤获取数据
        val df2 = spark.read
            .format("hudi")
            .option("as.of.instant", "2021-12-25 15:20:16")
            .load(path)
            .sort(col("_hoodie_commit_time").desc)
        df2.printSchema()
        df2.show(numRows = 5, truncate = false)
    }

10.3更新数据

//更新(Update)数据,第1步、模拟产生数据,第2步、模拟产生数据,针对第1步数据字段值更新,第3步、将数据更新到Hudi表中
def updateData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
        import spark.implicits._

        // 第1步、模拟乘车数据
        import org.apache.hudi.QuickstartUtils._
        val updates = convertToStringList(dataGen.generateUpdates(100))

        import scala.collection.JavaConverters._
        val updateDF: DataFrame = spark.read.json(
            spark.sparkContext.parallelize(updates.asScala, 2).toDS()
        )

        //TODO: 第2步、插入数据到Hudi表
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._
        updateDF.write
            .mode(SaveMode.Append)
            .format("hudi")
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            // Hudi 表的属性值设置
            .option(PRECOMBINE_FIELD.key(), "ts")
            .option(RECORDKEY_FIELD.key(), "uuid")
            .option(PARTITIONPATH_FIELD.key(), "partitionpath")
            .option(TBL_NAME.key(), table)
            .save(path)
    }

10.4增量查询

//增量查询Incremental query
//当Hudi中表的类型为:COW时,支持2种方式查询:Snapshot Queries、Incremental Queries;
//默认情况下查询属于:Snapshot Queries快照查询,通过参数:hoodie.datasource.query.type 可以进行设置。

// 1.设置查询数据模式为:incremental,增量读取
// 2.设置增量读取数据时开始时间

def incrementalQueryData(spark: SparkSession, path: String): Unit = {
        import spark.implicits._

        // 第1步、加载Hudi表数据,获取commit time时间,作为增量查询数据阈值
        import org.apache.hudi.DataSourceReadOptions._
        spark.read
            .format("hudi")
            .load(path)
            .createOrReplaceTempView("view_temp_hudi_trips")
        val commits: Array[String] = spark
            .sql(
                """
                  |select
                  |  distinct(_hoodie_commit_time) as commitTime
                  |from
                  |  view_temp_hudi_trips
                  |order by
                  |  commitTime DESC
                  |""".stripMargin
            )
            .map(row => row.getString(0))
            .take(50)
        val beginTime = commits(commits.length - 1) // commit time we are interested in
        println(s"beginTime = ${beginTime}")

        // 第2步、设置Hudi数据CommitTime时间阈值,进行增量数据查询
        val tripsIncrementalDF = spark.read
            .format("hudi")
            // 设置查询数据模式为:incremental,增量读取
            .option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
            // 设置增量读取数据时开始时间
            .option(BEGIN_INSTANTTIME.key(), beginTime)
            .load(path)

        // 第3步、将增量查询数据注册为临时视图,查询费用大于20数据
        tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
        spark
            .sql(
                """
                  |select
                  |  `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
                  |from
                  |  hudi_trips_incremental
                  |where
                  |  fare > 20.0
                  |""".stripMargin
            )
            .show(10, truncate = false)
    }

10.5删除数据

使用DataGenerator数据生成器,基于已有数据构建要删除的数据,最终保存到Hudi表中,需要设置属性参数: hoodie.datasource.write.operation 值为:delete。
/**
 * 删除Hudi表数据,依据主键UUID进行删除,如果是分区表,指定分区路径
 */
def deleteData(spark: SparkSession, table: String, path: String): Unit = {
    import spark.implicits._

    // 第1步、加载Hudi表数据,获取条目数
    val tripsDF: DataFrame = spark.read.format("hudi").load(path)
    println(s"Raw Count = ${tripsDF.count()}")

    // 第2步、模拟要删除的数据,从Hudi中加载数据,获取几条数据,转换为要删除数据集合
    val dataframe = tripsDF.limit(2).select($"uuid", $"partitionpath")
    import org.apache.hudi.QuickstartUtils._

    val dataGenerator = new DataGenerator()
    val deletes = dataGenerator.generateDeletes(dataframe.collectAsList())

    import scala.collection.JavaConverters._
    val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes.asScala, 2))

    // 第3步、保存数据到Hudi表中,设置操作类型:DELETE
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    deleteDF.write
        .mode(SaveMode.Append)
        .format("hudi")
        .option("hoodie.insert.shuffle.parallelism", "2")
        .option("hoodie.upsert.shuffle.parallelism", "2")
        // 设置数据操作类型为delete,默认值为upsert
        .option(OPERATION.key(), "delete")
        .option(PRECOMBINE_FIELD.key(), "ts")
        .option(RECORDKEY_FIELD.key(), "uuid")
        .option(PARTITIONPATH_FIELD.key(), "partitionpath")
        .option(TBL_NAME.key(), table)
        .save(path)

    // 第4步、再次加载Hudi表数据,统计条目数,查看是否减少2条数据
    val hudiDF: DataFrame = spark.read.format("hudi").load(path)
    println(s"Delete After Count = ${hudiDF.count()}")
}

11.基本概念

11.1时间轴Timeline

Hudi 核心:在所有的表中维护了一个包含`在不同的即时(Instant)时间对数据集操作`(比如新增、修改或删除) 的时间轴(Timeline)。

在每一次对Hudi表的数据集操作时都会在`该表的Timeline上生成一个Instant`,从而可以实现在仅查询某个时间点 之后成功提交的数据,或是仅查询某个时间点之前的数据,有效避免了扫描更大时间范围的数据。

同时,可以高效地只查询更改前的文件(如在某个Instant提交了更改操作后,仅query某个时间点之前的数据,则 仍可以query修改前的数据)。

Timeline 是 Hudi 用来管理提交(commit)的抽象,每个 commit 都绑定一个固定时间戳,分散到时间线上。
在 Timeline 上,每个 commit 被抽象为一个 HoodieInstant,一个 instant 记录了一次提交 (commit) 的行为 、时间戳、和状态。

上图中采用时间(小时)作为分区字段,从 10:00 开始陆续产生各种 commits,10:20 来了一条 9:00 的数据, 该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到。

时间轴(Timeline)的实现类(位于hudi-common-xx.jar中),时间轴相关的实现类位于org.apache.hudi.common.table.timeline包下

11.2文件管理

在每个分区内,文件被组织为文件组,由文件id充当唯一标识。每个文件组包含多个文件切片,其中每个切片包含 在某个即时时间的提交/压缩生成的基本列文件(.parquet)以及一组日志文件(.log),该文件包含自生成基本 文件以来对基本文件的插入/更新

Hudi 的 base file (parquet 文件) 在 footer 的 meta 去记录了 record key 组成的 BloomFilter,用于在 file based index 的实现中实现高效率的 key contains 检测。 

Hudi 的 log (avro 文件)是自己编码的,通过积攒数据 buffer 以 LogBlock 为单位写出,每个 LogBlock 包 含 magic number、size、content、footer 等信息,用于数据读、校验和过滤

11.3索引Index

Hudi通过索引机制提供高效的Upsert操作,该机制会将一个RecordKey+PartitionPath组合的方式作为唯一标识映 射到一个文件ID,而且这个唯一标识和文件组/文件ID之间的映射自记录被写入文件组开始就不会再改变。 
    全局索引:在全表的所有分区范围下强制要求键保持唯一,即确保对给定的键有且只有一个对应的记录。 
    非全局索引:仅在表的某一个分区内强制要求键保持唯一,它依靠写入器为同一个记录的更删提供一致的分区路径。

11.4hudi数据管理

12.表的存储类型

12.1数据计算模型

12.1.1批式模型( Batch)

批式模型就是使用 MapReduce、Hive、Spark 等典型的批计算引擎,以小时任务或者天任务的形式来做数据计算。 
    1、延迟:小时级延迟或者天级别延迟。这里的延迟不单单指的是定时任务的时间,在数据架构里,这里的延迟时间通常是定 时任务间隔时间 + 一系列依赖任务的计算时间 + 数据平台最终可以展示结果的时间。数据量大、逻辑复杂的情况下,小时任 务计算的数据通常真正延迟的时间是 2-3 小时。 
    2、数据完整度:数据较完整。以处理时间为例,小时级别的任务,通常计算的原始数据已经包含了小时内的所有数据,所以 得到的数据相对较完整。但如果业务需求是事件时间,这里涉及到终端的一些延迟上报机制,在这里,批式计算任务就很难派 上用场。 
    3、成本:成本很低。只有在做任务计算时,才会占用资源,如果不做任务计算,可以将这部分批式计算资源出让给在线业务 使用。从另一个角度来说成本是挺高的,如原始数据做了一些增删改查,数据晚到的情况,那么批式任务是要全量重新计算。

12.1.2流式模型(Stream)

流式模型,典型的就是使用 Flink 来进行实时的数据计算。 
    1、延迟:很短,甚至是实时。
    2、数据完整度:较差。因为流式引擎不会等到所有数据到齐之后再开始计算,所以有一个 watermark 的概念,当数据的时间 小于 watermark 时,就会被丢弃,这样是无法对数据完整度有一个绝对的报障。在互联网场景中,流式模型主要用于活动时 的数据大盘展示,对数据的完整度要求并不算很高。在大部分场景中,用户需要开发两个程序,一是流式数据生产流式结果, 二是批式计算任务,用于次日修复实时结果。 
    3、成本:很高。因为流式任务是常驻的,并且对于多流 Join 的场景,通常要借助内存或者数据库来做 state 的存储,不管 是序列化开销,还是和外部组件交互产生的额外 IO,在大数据量下都是不容忽视的。

12.3增量模型(Incremental)

针对批式和流式的优缺点,Uber 提出了增量模型(Incremental Mode),相对批式来讲,更加实时;相对流式而 言,更加经济。
增量模型,简单来讲,是以 mini batch 的形式来跑准实时任务。Hudi 在增量模型中支持了两个最重要的特性:
    1、Upsert:这个主要是解决批式模型中,数据不能插入、更新的问题,有了这个特性,可以往 Hive 中写入增量数据,而不 是每次进行完全的覆盖。(Hudi 自身维护了 key->file 的映射,所以当 upsert 时很容易找到 key 对应的文件) 
    2、Incremental Query:增量查询,减少计算的原始数据量。以 Uber 中司机和乘客的数据流 Join 为例,每次抓取两条数据 流中的增量数据进行批式的 Join 即可,相比流式数据而言,成本要降低几个数量级

12.2增量模型查询方式

12.2.1.Snapshot Queries(快照查询)

    查询某个增量提交操作中数据集的最新快照,先进行动态合并最新的基本文件(Parquet)和增量文件(Avro)来提供近实时数据 集(通常会存在几分钟的延迟)。 
    读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件

12.2.2.Incremental Queries(增量查询)

    仅查询新写入数据集的文件,需要`指定一个Commit/Compaction的即时时间`(位于Timeline上的某个Instant)作为条件,来`查询此条件之后的新数据`。
    可查看自给定commit/delta commit即时操作以来新写入的数据,有效的提供变更流来启用增量数据管道。

12.2.3.Read Optimized Queries(读优化查询)

    1.直接查询基本文件(数据集的最新快照),其实就是列式文件(Parquet)。并保证与非Hudi列式数据集相比,具有相同的列式查询性能。 
    2.可查看给定的commit/compact即时操作的表的最新快照。 
    3.读优化查询和快照查询相同仅访问基本文件,提供给定文件片自上次执行压缩操作以来的数据。通常查询数据的最新程度的保证取决于压缩策略 

12.2.4.总结

表类型 支持的查询类型
copy onwrite 快照,增量查询
merge on read 快照,增量查询,读优化查询

13.hudi表类型

13.1 cow(适合读)

`Copy On Write 表`
`在写入数据的时候,复制一份原来的拷贝,在其基础上添加新数据`
优点:读取时,只读取对应分区的一个数据文件即可,较为高效; 
缺点:数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。
COW表主要使用列式文件格式(Parquet)存储数据,`在写入数据过程中,执行同步合并,更新数据版本并重写数据文件`,类似RDBMS中的B-Tree更新。 
    1)、更新update:在更新记录时,Hudi会先找到包含更新数据的文件,然后再使用更新值(最新的数据)重写该文件,包含 其他记录的文件保持不变。当突然有大量写操作时会导致重写大量文件,从而导致极大的I/O开销。 
    2)、读取read:在读取数据时,通过读取最新的数据文件来获取最新的更新,此存储类型适用于少量写入和大量读取的场景

13.2MOR

`Merge On Read`
简称MOR,新插入的数据存储在delta log 中,定期再将delta log合并进行parquet数据文件。 
读取数据时,会将delta log跟老的数据文件做merge,得到完整的数据返回。下图演示了MOR的两种数据读写方式。 

13.3对比

对于写时复制(COW)和读时合并(MOR)writer来说,Hudi的WriteClient是相同的。
COW表,用户在 snapshot 读取的时候会扫描所有最新的 FileSlice 下的 base file。
MOR表,在 READ OPTIMIZED 模式下,只会读最近的经过 compaction 的 commit。

14.数据写操作流程

在Hudi数据湖框架中支持三种方式写入数据:`UPSERT(插入更新)`、`INSERT(插入)`和`BULK INSERT(写排序)`。 
UPSERT:默认行为,数据先通过 index 打标(INSERT/UPDATE),有一些启发式算法决定消息的组织以优化文件的大小 
INSERT:跳过 index,写入效率更高 
BULK_INSERT:写排序,对大数据量的 Hudi 表初始化友好,对文件大小的限制 best effort(写 HFile)

14.1UPSERT 写流程

`Copy On Write类型表,UPSERT 写入流程`
1.第一步、先对 records 按照 record key 去重; 
2.第二步、首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是update,哪些 records 是 insert(key 第一次写入); 
3.第三步、对于 update 消息,会直接找到对应 key 所在的最新 FileSlice 的 base 文件,并做 merge 后写新的 base file (新的 FileSlice); 
4.第四步、对于 insert 消息,会扫描当前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 写新的 FileSlice;如果没有 SmallFile,直接写新的 FileGroup + FileSlice;

`Merge On Read类型表,UPSERT 写入流程`
1.第一步、先对 records 按照 record key 去重(可选) 
2.第二步、首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入) 
3.第三步、如果是 insert 消息,如果 log file 不可建索引(默认),会尝试 merge 分区内最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果没有 base file 就新写一个 FileGroup + FileSlice + base file;如果 log file 可建索引,尝试 append 小的 log file,如果没有就新写一个 FileGroup + FileSlice + base file 
4.第四步、如果是 update 消息,写对应的 file group + file slice,直接 append 最新的 log file(如果碰巧是当前最小 的小文件,会 merge base file,生成新的 file slice)log file 大小达到阈值会 roll over 一个新的

14.2INSERT 写流程

`Copy On Write类型表,INSERT 写入流程` 
1.第一步、先对 records 按照 record key 去重(可选);
2.第二步、不会创建 Index; 
3.第三步、如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file; 

`Merge On Read类型表,INSERT 写入流程`
1.第一步、先对 records 按照 record key 去重(可选); 
2.第二步、不会创建 Index; 
3.第三步、如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一 个新的 FileSlice + base file;
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-123456789987654321,http://hainiubl.com/topics/75866
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter