1.sqoop

教程 阿布都的都 ⋅ 于 2023-01-07 14:44:10 ⋅ 437 阅读

1 sqoop原理

1.1 sqoop介绍

Sqoop是Apache旗下的一款“hadoop和关系型数据库服务器之间传送数据”的工具。
  导入数据:MySQL、Oracle导入数据到hadoop的hdfs、hive、hbase等数据存储系统。
  导出数据:从hadoop的文件系统中导出数据到关系型数据库中。

1.2 sqoop架构

file

  • 导入流程
    1. 首先通过jdbc读取关系型数据库元数据信息,获取到表结构。
    2. 根据元数据信息生成Java类。
    3. 启动import程序,通过jdbc读取关系型数据库数据,并通过上一步的Java类进行序列化。
    4. MapReduce并行写数据到Hadoop中,并使用Java类进行反序列化。
      • 导出流程
    5. sqoop通过jdbc读取关系型数据库元数据,获取到表结构信息,生成Java类,用于序列化。
    6. MapReduce并行读取hdfs数据,并且通过Java类进行序列化。
    7. export程序启动,通过Java类反序列化,同时启动多个map,通过jdbc将数据写入到关系型数据库中。

2 cdh部署sqoop

1)添加服务

file

file

2)添加gateway节点

file

3)完成效果

file

4)测试sqoop

# hdfs认证
kinit hdfs

# shell 里执行 sqoop 命令
sqoop help

查看,说明sqoop安装完成

file

3 sqoop常用参数

安全环境下操作需要做安全认证

  • 常用命令
命令名称 对应类 命令说明
import ImportTool 将关系型数据库数据导入到HDFS、HIVE、HBASE
export ExportTool 将HDFS上的数据导出到关系型数据库
codegen CodeGenTool 获取数据库中某张表数据生成Java并打成Jar包
create-hive-table CreateHiveTableTool 创建hive的表
eval EvalSqlTool 查看SQL的执行结果
list-databases ListDatabasesTool 列出所有数据库
list-tables ListTablesTool 列出某个数据库下的所有表
help HelpTool 打印sqoop帮助信息
version VersionTool 打印sqoop版本信息
  • 连接参数列表
Argument Description
--connect <jdbc-uri> Specify JDBC connect string 指定JDBC连接字符串
--connection-manager <class-name> Specify connection manager class to use 指定要使用的连接管理器类
--driver <class-name> Manually specify JDBC driver class to use 指定要使用的JDBC驱动类
--hadoop-mapred-home <dir> Override $HADOOP_MAPRED_HOME 指定$HADOOP_MAPRED_HOME路径
--help Print usage instructions 帮助信息
--password-file Set path for a file containing the authentication password 设置用于存放认证的密码信息文件的路径
-P Read password from console 从控制台读取输入的密码
--password <password> Set authentication password 设置认证密码
--username <username> Set authentication username 设置认证用户名
--verbose Print more information while working 打印运行信息
--connection-param-file <filename> Optional properties file that provides connection parameters 指定存储数据库连接参数的属性文件
  • 连接MySQL示例
# 查询数据库列表 对标show databases
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password 12345678

mysql -uroot -p12345678
# 如果想连接 jdbc:mysql://worker-1:3306, 需要创建远程root访问权限
# 创建远程root用户
CREATE USER 'root'@'%' IDENTIFIED BY '12345678';   

# 给远程root用户增加数据库权限
grant all privileges on *.* to 'root'@'%' identified by '12345678';
# 更新
flush privileges; 

sqoop list-databases --connect jdbc:mysql://worker-1:3306/ --username root --password 12345678

file

# 查询指定库下面所有表 对标show tables in cm
sqoop list-tables --connect jdbc:mysql://worker-1:3306/cm --username root --password 12345678

4 sqoop应用

4.1 准备测试数据

应用场景:

​ 使用sqoop上传字典表数据到hive中与我们的数据进行关联查询。

以 商品表 为例:

-- 创建sqoop_db 数据库
create database sqoop_db default charset utf8 collate utf8_general_ci;

-- 导入SQL文件
mysql -uroot -P3306 -p12345678 sqoop_db < /tmp/goods_table.sql

file

4.2 eval 查看 sql 查询结果

# 没有where条件
sqoop eval \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--query "select * from goods_table limit 10"

file

4.3 create-hive-table创建hive表

# 基于MySQL表创建hive表

# 需要认证以及拥有hive建表权限
kinit hive

sqoop create-hive-table \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--table goods_table \
--hive-table xinniu.goods_table

file

4.4 多map条件查询导入HDFS

语法 :

sqoop import \
--connect 数据库连接字符串 \
--username 数据库用户名 \
--password 数据库密码 \
--target-dir HDFS位置 \
--delete-target-dir \
--fields-terminated-by "\t" \
--num-mappers 3 \
--split-by 切分数据依据 \
--query 'select SQL where 查询条件 and $CONDITIONS'

参数解释 :

--query或--e 将查询结果的数据导入,使用时必须伴随参--target-dir,--hive-table,如果查询中有where条件,则条件后必须加上$CONDITIONS关键字

当sqoop使用--query+sql执行多个maptask并行运行导入数据时,每个maptask将执行一部分数据的导入,原始数据需要使用'--split-by 某个字段'来切分数据,不同的数据交给不同的maptask去处理。maptask执行sql副本时,需要在where条件中添加$CONDITIONS条件,这个是linux系统的变量,可以根据sqoop对边界条件的判断,来替换成不同的值,这就是说若split-by id,则sqoop会判断id的最小值和最大值判断id的整体区间,然后根据maptask的个数来进行区间拆分,每个maptask执行一定id区间范围的数值导入任务,如下为示意图。

file

4.3.1 导入文本文件

#用xinniu认证

sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--target-dir /data/xinniu/sqoop/data/goods_1 \
--delete-target-dir \
--fields-terminated-by "\001" \
--num-mappers 4 \
--split-by id \
--query 'select * from goods_table where id < 10 and $CONDITIONS'

查询结果 :

file

4.3.2 导入其他格式文件

# 导入不同格式,支持格式as-avrodatafile、as-binaryfile、as-parquetfile、as-sequencefile、as-textfile(默认格式)
# 多次导入时会报jar包已存在错误,请忽略,原因为sqoop读取源数据的schema文件创建的jar在前几次任务中已经创建了。

sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--target-dir /user/xinniu/sqoop/data/goods_2_parquet \
--delete-target-dir \
--as-parquetfile \
--num-mappers 4 \
--split-by id \
--query 'select * from goods_table where id < 10 and $CONDITIONS'

结果:

file

4.5 导入hive表

4.5.1 导入文本表

# hive认证
kinit -kt /data/hive.keytab hive

# 导入命令
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table xinniu.goods_table1

上面过程分为两步:

​ 1)第一步将数据导入到HDFS,默认的临时目录是/user/当前操作用户/mysql表名;

​ 2)第二步将导入到HDFS的数据迁移到Hive表,如果hive表不存在,sqoop会自动创建内部表;(我们的是在/user/xinniu/goods_table,通过查看job的configuration的outputdir属性得知)

file

结果:

file

查询数据:

file

4.5.2 导入其他格式表

# hive认证
kinit -kt /data/hive.keytab hive

# 导入命令
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--num-mappers 1 \
--delete-target-dir \
--as-parquetfile \
--hive-import \
--hive-overwrite \
--hive-table xinniu.goods_table_parquet

结果:

file

4.6 import to hbase

# 用hdfs认证
kinit -kt /data/hdfs.keytab hdfs

hadoop fs -mkdir /user/hbase
hadoop fs -chown hbase /user/hbase

# 用hbase认证
kinit -kt /data/hbase.keytab hbase
# sqoop导入hbase
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--hbase-create-table \
--hbase-table xinniu:goods_table \
--column-family cf \
--hbase-row-key id

# --hbase-row-key: 要求MySQL表必须有主键,将主键作为rowkey,标识一行 

导入后,查看:

file

5 应用实例一

sqoop_db 库导入 comm_area 表和数据

导入后查看:

file

前提:

# 给 impala 赋予hive的所有权限
grant role admin_role to group impala;

# sqoop_db库导入comm_area表

# 创建linux导出目录
/data/xinniu/extract

1) mysql --> hdfs目录 (sqoop)

2)创建hive外表指向 hdfs目录(临时表作用,先删除,后创建) (impala)

3)创建分区表(只创建一次),分区:batch_date (impala)

4) 先删除分区,再创建分区并查询导入(目的:可以使得脚本能重复执行) (impala)

file

vim area_op.sh

# 获取batch_date,比如:今天20211010, 那batch_date是20211009
batch_date=`date -d 1' day ago' +%Y%m%d`

# xinniu认证
kinit -kt /data/xinniu.keytab xinniu
# 用sqoop,查询表数据导入到hdfs上
# -Dorg.apache.sqoop.splitter.allow_text_splitter=true: --split-by的是字符串也可以
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--target-dir /user/xinniu/comm_area/${batch_date}/ \
--delete-target-dir \
--fields-terminated-by "\t" \
--split-by area_code \
--query 'select comm_area.* from comm_area where $CONDITIONS'
# $?: 返回上个命令的结果, 0:成功, 非0:失败
res=$?
if [ ${res} != 0 ];then
echo 'extract comm_area error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'extract comm_area successful '`date` >> /data/xinniu/extract/comm_area.log
fi

# impala认证
kinit -kt /data/impala.keytab impala
# 用impala-shell, 创建hive表tmp.comm_area(临时表作用),并指向导入的hdfs目录
impala-shell -k -q "set sync_ddl = true;drop table if exists tmp.comm_area;create external table tmp.comm_area (
area_code string,
area_cname string,
area_ename string
)
row format delimited fields terminated by '\t'
location '/user/xinniu/comm_area/${batch_date}';
set sync_ddl = false;"

res=$?
if [ ${res} != 0 ];then
echo 'create comm_area tmp table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'create comm_area tmp table successful '`date` >> /data/xinniu/extract/comm_area.log
fi

# impala认证
kinit -kt /data/impala.keytab impala
# 用impala-shell,创建hive分区表 itl.comm_area
impala-shell -k -q"set sync_ddl = true;create table if not exists itl.comm_area (
area_code string,
area_cname string,
area_ename string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
set sync_ddl = false;"

res=$?
if [ ${res} != 0 ];then
echo 'create comm_area itl table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'create comm_area itl table successful '`date` >> /data/xinniu/extract/comm_area.log
fi

# 将临时表中的数据导入到 itl.comm_area 表中
kinit -kt /data/impala.keytab impala
impala-shell -k -q "set sync_ddl = true;alter table itl.comm_area drop if exists partition (pt = '${batch_date}');set sync_ddl = false;"
impala-shell -k -q "set sync_ddl = true;insert into table itl.comm_area partition (pt) select a.*,'${batch_date}' from tmp.comm_area a;set sync_ddl = false;"

res=$?
if [ ${res} != 0 ];then
echo 'load comm_area data to itl table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'load comm_area data to itl table successful '`date` >> /data/xinniu/extract/comm_area.log
fi

6 应用实例二

vim goods_op.sh

kinit -kt /data/xinniu.keytab xinniu

batch_date=$1

# --hive-drop-import-delims: 在导入数据到hive时,去掉数据中的\r\n\013\010这样的字符
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--target-dir /user/xinniu/goods_table/2022-08-24/ \
--hive-drop-import-delims \
--delete-target-dir \
--fields-terminated-by "\t" \
--split-by Id \
--query 'select * from goods_table where $CONDITIONS'

res=$?
if [ ${res} != 0 ];then
echo 'extract goods_table error! '`date` >> /data/xinniu/extract/goods_table.log
exit 1
else
echo 'extract goods_table successful '`date` >> /data/xinniu/extract/goods_table.log
fi

kinit -kt /data/impala.keytab impala
impala-shell -k -q "set sync_ddl = true;drop table if exists tmp.goods_table;create external table tmp.goods_table (
Id string,
goods_sn string,
goods_cname string,
goods_ename string,
goods_price string
)
row format delimited fields terminated by '\t'
location '/user/xinniu/goods_table/2022-08-24';
set sync_ddl = false;"

res=$?
if [ ${res} != 0 ];then
echo 'create goods_table tmp table error! '`date` >> /data/xinniu/extract/goods_table.log
exit 1
else
echo 'create goods_table tmp table successful '`date` >> /data/xinniu/extract/goods_table.log
fi

kinit -kt /data/impala.keytab impala
impala-shell -k -q"set sync_ddl = true;create table if not exists itl.goods_table (
Id string,
goods_sn string,
goods_cname string,
goods_ename string,
goods_price string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
set sync_ddl = false;
"

res=$?
if [ ${res} != 0 ];then
echo 'create goods_table itl table error! '`date` >> /data/xinniu/extract/goods_table.log
exit 1
else
echo 'create goods_table itl table successful '`date` >> /data/xinniu/extract/goods_table.log
fi

# 加载数据到itl层 跑批日期使用参数传递
kinit -kt /data/impala.keytab impala
impala-shell -k -q "set sync_ddl = true;alter table itl.goods_table drop if exists partition (pt = '2022-08-24');set sync_ddl = false;"
impala-shell -q "set sync_ddl = true;insert into table itl.goods_table partition (pt) select a.*,'2022-08-24' from tmp.goods_table a;set sync_ddl = false;"
res=$?
if [ ${res} != 0 ];then
echo 'load goods_table data to itl table error! '`date` >> /data/xinniu/extract/goods_table.log
exit 1
else
echo 'load goods_table data to itl table successful '`date` >> /data/xinniu/extract/goods_table.log
fi

执行时,需要从外界将日期传递过来

# 给脚本添加执行权
chmod a+x goods_op.sh

# 执行脚本
sh -x goods_op.sh 20211010
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-阿布都的都,http://hainiubl.com/topics/76086
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter