sqoop

教程 薪牛 ⋅ 于 2023-02-04 17:54:36 ⋅ 2633 阅读

1 sqoop原理

1.1 sqoop介绍

Sqoop是Apache旗下的一款“hadoop和关系型数据库服务器之间传送数据”的工具。
  导入数据:MySQL、Oracle导入数据到hadoop的hdfs、hive、hbase等数据存储系统。
  导出数据:从hadoop的文件系统中导出数据到关系型数据库中。

1.2 sqoop架构

file

  • 导入流程
    1. 首先通过jdbc读取关系型数据库元数据信息,获取到表结构。
    2. 根据元数据信息生成Java类。
    3. 启动import程序,通过jdbc读取关系型数据库数据,并通过上一步的Java类进行序列化。
    4. MapReduce并行写数据到Hadoop中,并使用Java类进行反序列化。
      • 导出流程
    5. sqoop通过jdbc读取关系型数据库元数据,获取到表结构信息,生成Java类。
    6. MapReduce并行读取hdfs数据,并且通过Java类进行序列化。
    7. export程序启动,通过Java类反序列化,同时启动多个map,通过jdbc将数据写入到关系型数据库中。

file

2 安装sqoop

选择已经安装好hive镜像,里面以及安装过mysql,hadoop ,zookeeper,hive。根据自己资源情况调整cpu和内存的资源。

镜像地址:http://cloud.hainiubl.com/#/privateImageDetail?id=2953&imageType=private。选择添加到实验配置

file

1 将sqoop安装包解压到/usr/local目录下

tar -zxvf /public/software/bigdata/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /usr/local/

2 创建软链接

ln -s sqoop-1.4.7.bin__hadoop-2.6.0/ sqoop

file

3 修改sqoop安装目录的所有者和属组

 chown -R  hadoop:hadoop sqoop-1.4.7.bin__hadoop-2.6.0/

file

4 修改sqoop环境变量

#添加到/etc/profile文件中
export SQOOP_HOME=/usr/local/sqoop
export PATH=$PATH:$SQOOP_HOME/bin
#让环境变量立即生效
source /etc/profile

5 测试sqoop

# shell 里执行 sqoop 命令
sqoop help

6 导入mysql驱动包到sqooplib目录下

cp /usr/local/hive/lib/mysql-connector-java-5.1.49.jar  /usr/local/sqoop/lib/

3 sqoop常用参数

  • 常用命令
命令名称 对应类 命令说明
import ImportTool 将关系型数据库数据导入到HDFS、HIVE、HBASE
export ExportTool 将HDFS上的数据导出到关系型数据库
codegen CodeGenTool 获取数据库中某张表数据生成Java并打成Jar包
create-hive-table CreateHiveTableTool 创建hive的表
eval EvalSqlTool 查看SQL的执行结果
list-databases ListDatabasesTool 列出所有数据库
list-tables ListTablesTool 列出某个数据库下的所有表
help HelpTool 打印sqoop帮助信息
version VersionTool 打印sqoop版本信息
  • 连接参数列表
Argument Description
--connect <jdbc-uri> Specify JDBC connect string 指定JDBC连接字符串
--connection-manager <class-name> Specify connection manager class to use 指定要使用的连接管理器类
--driver <class-name> Manually specify JDBC driver class to use 指定要使用的JDBC驱动类
--hadoop-mapred-home <dir> Override $HADOOP_MAPRED_HOME 指定$HADOOP_MAPRED_HOME路径
--help Print usage instructions 帮助信息
--password-file Set path for a file containing the authentication password 设置用于存放认证的密码信息文件的路径
-P Read password from console 从控制台读取输入的密码
--password <password> Set authentication password 设置认证密码
--username <username> Set authentication username 设置认证用户名
--verbose Print more information while working 打印运行信息
--connection-param-file <filename> Optional properties file that provides connection parameters 指定存储数据库连接参数的属性文件
  • 连接MySQL示例
# 查询数据库列表 对标show databases
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password 12345678

file

# 查询指定库下面所有表 对标show tables in cm
sqoop list-tables --connect jdbc:mysql://localhost:3306/hive_meta --username root --password 12345678

file

4 sqoop应用

4.1 准备测试数据

应用场景:

​ 使用sqoop上传字典表数据到hive中与我们的数据进行关联查询。

以 商品表 为例:

-- 创建sqoop_db 数据库
create database sqoop_db default charset utf8 collate utf8_general_ci;

-- 导入SQL文件
mysql -uroot  -p sqoop_db < /public/data/goods_table.sql

file

4.2 eval 查看 sql 查询结果

# 没有where条件
sqoop eval \
--connect jdbc:mysql://nn1:3306/sqoop_db \
--username root \
--password 12345678 \
--query "select * from goods_table limit 10"

file

4.3 create-hive-table创建hive表

先启动hadoop集群

# 基于MySQL表创建hive表

sqoop create-hive-table \
--connect jdbc:mysql://nn1:3306/sqoop_db \
--username root \
--password 12345678 \
--table goods_table \
--hive-table hainiu.goods_table

报错:

file

修改sqoop配置文件

mv sqoop-env-template.sh  sqoop-env.sh

添加hadoop,hive,hbase等环境信息

export ZOOKEEPER_HOME=/usr/local/zookeeper
export HADOOP_HOME=/usr/local/hadoop
export HIVE_HOME=/usr/local/hive
export HIVE_CONF_DIR=/usr/local/hive/conf

将hive-common-3.1.3.jar拷贝到sqoop的lib目录下

cp /usr/local/hive/lib/hive-common-3.1.3.jar  /usr/local/sqoop/lib

测试:

file

查看hive表是否创建成功

file

4.4 多map条件查询导入HDFS

语法 :

sqoop import \
--connect 数据库连接字符串 \
--username 数据库用户名 \
--password 数据库密码 \
--target-dir HDFS位置 \
--delete-target-dir \
--fields-terminated-by "\t" \
--num-mappers 3 \
--split-by 切分数据依据 \
--query 'select SQL where 查询条件 and $CONDITIONS'

参数解释 :

--query或--e 将查询结果的数据导入,使用时必须伴随参--target-dir,--hive-table,如果查询中有where条件,则条件后必须加上$CONDITIONS关键字

当sqoop使用--query+sql执行多个maptask并行运行导入数据时,每个maptask将执行一部分数据的导入,原始数据需要使用'--split-by 某个字段'来切分数据,不同的数据交给不同的maptask去处理。maptask执行sql副本时,需要在where条件中添加$CONDITIONS条件,这个是linux系统的变量,可以根据sqoop对边界条件的判断,来替换成不同的值,这就是说若split-by id,则sqoop会判断id的最小值和最大值判断id的整体区间,然后根据maptask的个数来进行区间拆分,每个maptask执行一定id区间范围的数值导入任务,如下为示意图。

file

4.3.1 导入文本文件

#用hainiu认证

sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--target-dir /user/hainiu/sqoop/data/goods_1 \
--delete-target-dir \
--fields-terminated-by "\001" \
--num-mappers 4 \
--split-by id \
--query 'select * from goods_table where id < 10 and $CONDITIONS'

# 注意:
# --split-by: 一般都是数值型。
# -Dorg.apache.sqoop.splitter.allow_text_splitter=true: --split-by的是字符串也可以

查询结果 :

file

4.3.2 导入其他格式文件

# 导入不同格式,支持格式as-avrodatafile、as-parquetfile、as-sequencefile、as-textfile(默认格式)
# 多次导入时会报jar包已存在错误,请忽略,原因为sqoop读取源数据的schema文件创建的jar在前几次任务中已经创建了。

sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--target-dir /user/hainiu/sqoop/data/goods_2_parquet \
--delete-target-dir \
--as-parquetfile \
--num-mappers 4 \
--split-by id \
--query 'select * from goods_table where id < 10 and $CONDITIONS'

结果:

file

4.5 全量导入hive表

4.5.1 导入文本表

# 导入命令
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table hainiu.goods_table

上面过程分为两步:

​ 1)第一步将数据导入到HDFS,默认的临时目录是/user/当前操作用户/mysql表名;

​ 2)第二步将导入到HDFS的数据迁移到Hive表,如果hive表不存在,sqoop会自动创建内部表;(我们的是在/user/hainiu/goods_table,通过查看job的configuration的outputdir属性得知)

file

结果:

file

查询数据:

file

4.6 增量数据导入

现在我们已经实现了 hive的数据导入方式,那么我们怎么实现hive的增量数据导入呢?

1、append方式

2、lastmodified方式,必须要加--append(追加)或者--merge-key(合并,一般填主键)

file

我们先建新表进行增量数据的演示

CREATE TABLE `goods_update_table` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `goods_sn` varchar(50) COLLATE utf8_bin NOT NULL COMMENT '商品的唯一编号、货号',
  `goods_cname` varchar(100) COLLATE utf8_bin NOT NULL COMMENT '商品名称(中文)',
  `goods_ename` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '商品名称(英文)',
  `goods_price` double NOT NULL COMMENT '商品价格',
  `last_update_time` datetime NOT NULL DEFAULT now()COMMENT '最近一次更新商品配置的时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- 添加数据
INSERT INTO `goods_update_table` (goods_sn,goods_cname,goods_ename,goods_price,last_update_time) 
VALUES ('111111', '漂亮的高跟鞋1', '', 888, '2020-10-10 11:00:00');

4.6.1 全量导入

sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_update_table \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table hainiu.goods_update_table

file

由于 --hive import 与 incremental 冲突, 所以增量导入不能直接导入到hive表中,但可以导入到hive表对应的hdfs目录里

4.6.2 按照id增量导入

incremental append 用法

-- MySQL添加一条新的数据
INSERT INTO `goods_update_table` (goods_sn,goods_cname,goods_ename,goods_price,last_update_time) VALUES 
('222222', '漂亮的长筒靴1', '', 999, '2020-10-10 12:00:00');

-- 按照id增量导入
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_update_table \
--num-mappers 1 \
--target-dir /hive/warehouse/hainiu.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental append \
--check-column id \
--last-value 1

-- 参数解释:
-- 1)incremental <mode> : append或lastmodified,使用lastmodified方式导入数据要指定增量数据是要--append(追加)还是要--merge-key(合并) 
-- 2)check-column <字段> : 作为增量导入判断的列名
-- 3)last-value val :  指定某一个值,用于标记增量导入的位置,这个值的数据不会被导入到表中,只用于标记当前表中最后的值。

file

4.6.3 按照时间增量导入

--incremental lastmodified --append 用法

如果按照时间增量进行数据导入可以使用 --incremental lastmodified --append 这种方式进行数据导入,lastmodified 用于更新的日期列

file

INSERT INTO `goods_update_table` (goods_sn,goods_cname,goods_ename,goods_price,last_update_time) VALUES 
('333333', '漂亮的长筒靴2', '', 999, '2020-10-10 13:00:00');

sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_update_table \
--num-mappers 1 \
--target-dir /hive/warehouse/hainiu.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental lastmodified \
--check-column last_update_time \
--last-value '2020-10-10 13:00:00' \
--append

-- 注意:last-value 的设置是把包括 2020-10-10 13:00:00 时间的数据做增量导入。

结果:id=3的数据成功导入

file

4.6.3 按照时间增量并按照主键合并导入

--incremental lastmodified --merge-key 用法

如果之前的数据有修改的话可以使用--incremental lastmodified --merge-key进行数据合并执行修改的SQL

-- 更改商品价格
update goods_update_table set goods_price=666 where id=3;

进行合并导入

sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_update_table \
--num-mappers 1 \
--target-dir /hive/warehouse/hainiu.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental lastmodified \
--check-column last_update_time \
--last-value '2020-10-10 13:00:00' \
--merge-key id

-- --incremental lastmodified  --merge-key的作用:修改过的数据和新增的数据(前提是满足last-value的条件)都会导入进来,并且重复的数据(不需要满足last-value的条件)都会进行合并 

file

4.7 import to hbase

在nn1上安装hbase组件
解压hbase安装包到/usr/local目录下

 tar -zxvf /public/software/bigdata/hbase-2.4.13-bin.tar.gz -C /usr/local/

创建软连接

ln -s hbase-2.4.13/ hbase

修改hbase安装目录的所有者和属组为hadoop用户hadoop用户组

chown -R hadoop:hadoop /usr/local/hbase-2.4.13

file

修改conf目录下的hbase-env.sh配置文件

#添加以下内容
export JAVA_HOME=/usr/java/default
export HBASE_MANAGES_ZK=false

修改conf目录下的hbase-site.xml配置文件

<property>
    <name>hbase.rootdir</name>
    <value>hdfs://ns1/hbase</value>
</property>
<property>
  <name>hbase.tmp.dir</name>
  <value>/usr/local/hbase/tmp</value>
 </property>
<property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
</property>
<!-- zookeeper的端口号 -->
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>nn1</value>
</property>
 <property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
 </property>

将准备好的hbasejar包导入到sqoop的lib目录下

tar -zxvf /public/software/other/hbasejars.tar.gz /usr/local/sqoop/lib

进入hbase客户端并创建hainiu名称空间

#连接hbase客户端
hbase shell
#创建hainiu的名称空间
create_namespace 'hainiu'

# sqoop导入hbase
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--hbase-create-table \
--hbase-table hainiu:goods_table \
--column-family cf \
--hbase-row-key id

# --hbase-row-key: 要求MySQL表必须有主键,将主键作为rowkey,标识一行 

导入后,查看:

scan 'hainiu:goods_table'

file

4.8 export 数据导出

4.8.1 hdfs数据导出到MySQL中

hdfs准备如下数据,放到/data/xinniu目录下

101|bob|manager|50000|yanfa
102|jerry|java |40000|yanfa
103|rose|php|30000|yanfa
104|jim|php|30000|yanfa
105|tom|bigdata|50000|yanfa
hadoop fs -put student /data/xinniu/

sqoop将hdfs数据导入到mysql表中,不会自动创建表,所以需要我们在mysql中,根据hdfs文件中的数据,创建对应的表

CREATE TABLE emp ( 
   id INT NOT NULL PRIMARY KEY, 
   name VARCHAR(20), 
   deg VARCHAR(20),
   salary INT,
   dept VARCHAR(10));

以下命令用于hdfs数据(位于HDFS上的/data/xinniu/的文件)导出到mysql中sqoop_db库下的emp表

sqoop export \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--export-dir /data/xinniu/data.txt \
--table emp \
--num-mappers 1 \
--input-fields-terminated-by '|'

验证

select * from emp;

file

4.8.2 hive表数据导出到mysql中

sqoop的export命令支持 insert、update到关系型数据库,但是不支持merge;

hive表导入mysql数据库insert案例

查看hive中hainiu.student表数据

file

将数据导出到mysql中

CREATE TABLE student ( 
   id INT NOT NULL PRIMARY KEY, 
   name VARCHAR(20), 
   age INT);
sqoop export \
--connect  jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table student \
--export-dir /hive/warehouse/hainiu.db/student \
--num-mappers 1 \
--fields-terminated-by '\t'

结果:

file

hive表导入mysql数据库update案例

更新hive的数据

file

重新将hive表中的数据导入到mysql中并按照id进行更新

sqoop export \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table student \
--export-dir /hive/warehouse/hainiu.db/student \
--update-key id \
--num-mappers 1 \
--fields-terminated-by '\t'

查看结果:

file

6 应用实例

需求:

编写一个脚本

将sqoop_db中的goods_table表每天抽取所有数据并导入到hdfs:/user/hainiu/goods_table目录下。并按照每天的日期生成对应的目录保存表数据。以shell脚本的方式运行每天定时运行。

vim goods_op.sh

batch_date=$1

sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db \
--username root \
--password 12345678 \
--target-dir /user/hainiu/goods_table/${batch_date}/ \
--delete-target-dir \
--fields-terminated-by "\t" \
--split-by Id \
--query 'select * from goods_table where $CONDITIONS'

res=$?
if [ ${res} != 0 ];then
echo 'extract goods_table error! '`date` >> /data/hainiu/extract/goods_table.log
exit 1
else
echo 'extract goods_table successful '`date` >> /data/hainiu/extract/goods_table.log
fi

执行时,需要从外界将日期传递过来

# 给脚本添加执行权
chmod a+x goods_op.sh

# 执行脚本
sh -x goods_op.sh 20230305
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-薪牛,http://hainiubl.com/topics/76182
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter