4.Hbase 的计算和 bulkload

教程 DER ⋅ 于 2023-01-19 20:20:21 ⋅ 1557 阅读

Hbase的计算和bulkload

15.mr操作Hbase

15.1 mr读取hbase数据

首先在hbase中准备数据

# 创建分数表
create 'score','info'
# 增加学生数据
put 'score','001','info:name','zhangsan'
put 'score','001','info:score','100'
put 'score','001','info:class','1'
put 'score','002','info:name','lisi'
put 'score','002','info:score','95'
put 'score','002','info:class','1'
put 'score','003','info:name','wangwu'
put 'score','003','info:score','98'
put 'score','003','info:class','2'
put 'score','004','info:name','zhaosi'
put 'score','004','info:score','92'
put 'score','004','info:class','2'

下面读取hbase的案例,使用mr读取数据并且求出每个班级的平均分

读取HBASE中的数据需要继承TableMapper类

class HMapper extends TableMapper<outkey,outvalue>

获取score表的数据然后输出到reducer端进行分组,将班级作为key,然后这班级的数据都会按照key分在一起,我们就可以计算出来这个班级的平均分了

首先我们引入maven依赖

在hbase中去除hadoop的所有依赖,这样就不会出现冲突问题

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hainiu.hbase</groupId>
    <artifactId>TestHBase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
</project>

整体代码如下:

package com.hainiu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class MapreduceRead {
    public static class HMapper extends TableMapper<Text, IntWritable>{
        Text k = new Text();
        IntWritable v = new IntWritable();
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            byte[] classBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("class"));
            byte[] scoreBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("score"));
            k.set(Bytes.toString(classBytes));
            String score = Bytes.toString(scoreBytes);
            v.set(Integer.valueOf(score));
            System.out.println(k);
            System.out.println(v);
            context.write(k,v);
        }
    }

    public static class HReducer extends Reducer<Text,IntWritable,Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            for (IntWritable value : values) {
                sum += value.get();
                count ++;
            }
            double avg = sum * 1.0 / count;
            context.write(key,new DoubleWritable(avg));
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();
//        conf.set("HADOOP_HOME","/hadoop");
//        conf.set("HBASE_HOME","/hbase");
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapreduceRead.class);

        TableMapReduceUtil.initTableMapperJob(
                "score",new Scan(),HMapper.class,Text.class,IntWritable.class,job
        );

        job.setReducerClass(HReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        TextOutputFormat.setOutputPath(job,new Path("res"));

        job.waitForCompletion(true);
    }
}

结果如下

file

15.2 mr写出数据到hbase中

首先在本地data文件下面准备数据a.txt

hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack

统计每个单词的出现次数并且将结果存储到hbase的表中

#在hbase中创建存储单词出现次数的表
create 'wordcount','info'
# 存储数据的时候rowkey设定为单词,info:count 记录单词出现次数

这个时候要存储数据到hbase中那么我们需要在reducer中增加TableReducer的类用于插入hbase中数据

class HReducer extends TableReducer

整体代码如下:

package com.hainiu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class MapreduceWrite {
    public static class HMapper extends Mapper<LongWritable,Text, Text, IntWritable>{
        Text k = new Text();
        IntWritable v = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(" ");
            for (String str : strs) {
                k.set(str);
                context.write(k,v);
            }
        }
    }

    public static class HReducer extends TableReducer<Text,IntWritable, NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += 1;
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"),Bytes.toBytes(sum));
            context.write(NullWritable.get(),put);
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapreduceWrite.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapperClass(HMapper.class);

        TextInputFormat.addInputPath(job,new Path("data/a.txt"));

        TableMapReduceUtil.initTableReducerJob("wordcount",HReducer.class,job);

        job.waitForCompletion(true);
    }
}

查看hbase中的数据

file

16.hive操作hbase

首先我们的需要安装hive但是这个hive已经在我们准备的分布式环境中存在了,我们可以直接使用,想学习hive的同学可以去海牛的hive教程中具体学习,我们这边不做概述

一般在查询hbase的数据的时候我们可以直接使用hbase的命令行或者是api进行查询就行了,但是在日常的计算过程中我们一般都不是为了查询,都是在查询的基础上进行二次计算,所以使用hbase的命令是没有办法进行数据计算的,并且对于hbase的压力也会增加很多,hbase的本身并没有提供任何的计算逻辑,所以我们要依赖于mapreducer进行计算,这个代码上面我们已经实现过了,但是后续开发过程中很少有人会直接开发mr程序,这个代码的复杂程度比较高,并且会非常大的拖慢我们的开发速度,所以一般我们都会使用hive以外表的形式操作hbase中的数据,进行多表的管理查询计算或者是进行数据的导入和导出

首先在hive中增加hbase的链接信息

修改hive-site.xml中的值

<property>
 <name>hive.zookeeper.quorum</name>
 <value>nn1,nn2,s1</value>
</property>
<property>
 <name>hive.zookeeper.client.port</name>
 <value>2181</value>
</property>

/usr/local/hadoop/etc/hadoop/mapred-site.xml

<property>
    <name>hbase.zookeeper.quorum</name>
    <value>nn1,nn2,s1</value>
</property>

在hive/conf目录中增加log4j.properties文件输入日志级别设置

log4j.rootLogger=error,console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n

然后启动hive就可以直接连接hbase了

16.1 创建hive的内部表

create table student_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = 
":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "student_hbase");

file

file

# 删除hive中的表
drop table student_hive;
# 内部表在删除的时候hbase的表也会被删除

file

16.2创建外部表

有的时候在hbase中已经存在一个表并且其中存在数据,我们需要使用hive进行分析,那么我们就需要创建一个外部表进行映射

# 首先在hbase中创建表
create 'student_hbase','info'
# 增加数据
put 'student_hbase','1','info:name','zhangsan'
put 'student_hbase','1','info:age','20'
put 'student_hbase','2','info:name','lisi'
put 'student_hbase','2','info:age','30'
# 这个时候就需要创建外部表进行映射
create external table student_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = 
":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "student_hbase");

file

删除表,因为hive对应的是外部表所以hbase的表不会被删除掉

drop table student_hive;

file

16.3 关联计算表的值

hbase中创建工资表

#创建salary工资表
create 'salary','info'
put 'salary','001','info:id','1'
put 'salary','002','info:id','1'
put 'salary','003','info:id','1'
put 'salary','004','info:id','2'
put 'salary','005','info:id','2'
put 'salary','006','info:id','2'

put 'salary','001','info:salary','1000'
put 'salary','002','info:salary','2000'
put 'salary','003','info:salary','3000'
put 'salary','004','info:salary','4000'
put 'salary','005','info:salary','5000'
put 'salary','006','info:salary','6000'

#创建hive的表映射
create external table salary_hive(salary_id string,id int,salary int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = 
":key,info:id,info:salary")
TBLPROPERTIES ("hbase.table.name" = "salary");

file

现在实现关联查询,每个用户的平均工资是多少,以及人名

select a.name,avg(b.salary) as avg
from student_hive a join salary_hive b
on a.id = b.id 
group by a.name

可以根据计算得出最终结果

file

16.4 hbase的数据导入导出

hbase的数据导出

# 使用hive的导出命令可以直接导出数据
insert overwrite local directory '/home/hadoop/salary.txt' select * from salary_hive;

可以通过外表的形式直接将数据导出到文件夹中

file

结果数据查看

file

默认使用^A进行文件分割

导入数据

不能会用hive的load方式直接将数据导入到hbase中,但是可以通过中间表的形式导入进行

# 首先在本地创建teacher.txt 输入以下内容
1,yeniu,20
2,xinniu,30
3,qingniu,35
# 在hive中创建临时表
create table teacher_tmp(id int,name string,age int)
row format delimited fields terminated by ',';
# 将数据加载到临时表中
load data local inpath '/home/hadoop/teacher.txt' into table teacher_tmp;
# 创建和hbase的外部映射表
create table teacher_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = 
":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "teacher_hbase");

#从临时表使用mr将数据导入到hbase中
 insert into teacher_hive select * from teacher_tmp;

file

数据导入成功

17.hbase的bulkload

在大数据的场景计算中,有时候我们会遇见将大量数据一次性导入到hbase的情况,但是这个时候hbase是不能够容纳的,因为插入的数据首先会进入到memstore中如果大量插入数据会造成memstore的内存压力急剧增大,这个时候机器的其他进程是没有办法执行的,并且还会出现非常严重的问题,比如hbase在大量插入数据的时候首先这个region会急剧增加,后续region会按照拆分策略进行region拆分,当前region下线,插入程序会直接卡死造成hbase宕机等严重问题,为了解决这个问题,hbase给用户提供了一种新的插入数据的方式bulkload方式,这个方式中会跳过hbase本身的过程,首先在使用hbase的提供的mapreduce程序按照插入数据的格式和hbase的表格式生成hfile文件,然后我们将hfile文件一次性插入到hbase对应的hdfs的文件夹中,这种方式是最快捷并且对于hbase的压力最小的方式

过程如下:

# 首先在本地创建文件a.txt 输入以下内容
1,zhangsan,20
2,lisi,30
3,wangwu,40
5 zhaosi,50
# 然后将数据上传到hdfs中
hdfs dfs -put a.txt /
# 在hbase中创建表
create 't','info'
# 然后将id当成是rowkey,info:name存放名称 info:age存放年龄

执行importTSV方法,产生hfile文件

-Dimporttsv.separator :指定分隔符

-Dimporttsv.columns  :指定列映射 

​                                                   HBASE_ROW_KEY强制要求写

​                                                   cf:pk指定rowkey字段

​                                                   其他字段与hive表中对应

-Dimporttsv.skip.bad.lines:是否跳过无效行

-Dimporttsv.bulk.output:hfile输出路径

hbase表名

hdfs://worker-1:8020/data/hainiu/t2 :用于生成hfile文件的输入目录

具体执行命令如下:

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=',' \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age  \
-Dimporttsv.skip.bad.lines=false \
-Dimporttsv.bulk.output=/t \
default:t hdfs://ns1/a.txt

查看hdfs文件

file

发现hfile文件已经生成,然后我们将数据导入到hdfs对应的目录中

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /t default:t

file

扫描结果发现数据已经存在

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-DER,http://hainiubl.com/topics/76168
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter