测试数据准备
1 2 -- 用于在 Mysql 中生成测试数据 CREATE DATABASE sqoop;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 CREATE TABLE sqoop.goodtbl( gname varchar(50), serialNumber int, price int, stock_number int, create_time date ); DROP FUNCTION IF EXISTS `rand_string`; DROP PROCEDURE IF EXISTS `batchInsertTestData`; -- 替换语句默认的执行符号,将;替换成 // DELIMITER // CREATE FUNCTION `rand_string` (n INT) RETURNS VARCHAR(255) CHARSET 'utf8' BEGIN DECLARE char_str varchar(200) DEFAULT '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'; DECLARE return_str varchar(255) DEFAULT ''; DECLARE i INT DEFAULT 0; WHILE i < n DO SET return_str = concat(return_str, substring(char_str, FLOOR(1 + RAND()*36), 1)); SET i = i+1; END WHILE; RETURN return_str; END // -- 第一个参数表示:序号从几开始;第二个参数表示:插入多少条记录 CREATE PROCEDURE `batchInsertTestData` (m INT, n INT) BEGIN DECLARE i INT DEFAULT 0; WHILE i < n DO insert into goodtbl (gname, serialNumber, price, stock_number, create_time) values (rand_string(6), i+m, ROUND(RAND()*100), FLOOR(RAND()*100), now()); SET i = i+1; END WHILE; END // DELIMITER ;
1 call batchInsertTestData(1, 100);
以下案例需要启动:HDFS、YARN、MySQL 对应的服务;
导入是指:从关系型数据库向大数据集群(HDFS、HIVE、HBASE)传输数据;使用import关键字;
导出是指:从 大数据集群 向 关系型数据库 传输数据;使用export关键字;
导入数据
MySQL 到 HDFS
1. 导入全部数据
1 2 3 4 5 6 7 8 9 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --table goodtbl \ --target-dir /root/lagou \ --delete-target-dir \ --num-mappers 1 \ --fields-terminated-by "\t"
target-dir:将数据导入 HDFS 的路径;
delete-target-dir:如果目标文件夹在 HDFS 上已经存在,那么再次运行就会报错。可以使用–delete-target-dir来先删除目录。也可以使用 append 参数,表
示追加数据;
num-mappers:启动多少个Map Task;默认启动4个Map Task;也可以写成 -m 1
fields-terminated-by:HDFS文件中数据的分隔符;
2. 导入查询数据
1 2 3 4 5 6 7 8 9 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --target-dir /root/lagou \ --append \ -m 1 \ --fields-terminated-by "\t" \ --query 'select gname, serialNumber, price, stock_number, create_time from goodtbl where price>88 and $CONDITIONS;'
3. 导入指定的列
1 2 3 4 5 6 7 8 9 10 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --target-dir /root/lagou \ --delete-target-dir \ --num-mappers 1 \ --fields-terminated-by "\t" \ --columns gname,serialNumber,price \ --table goodtbl
columns中如果涉及到多列,用逗号分隔,不能添加空格
4. 导入查询数据(使用关键字)
1 2 3 4 5 6 7 8 9 10 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --target-dir /root/lagou \ --delete-target-dir \ -m 1 \ --fields-terminated-by "\t" \ --table goodtbl \ --where "price>=68"
5. 启动多个Map Task导入数据
1 2 3 4 5 # 给 goodtbl 表增加主键 alter table goodtbl add primary key(serialNumber); # 在 goodtbl 中增加数据 call batchInsertTestData(100,1000000);
1 2 3 4 5 6 7 8 9 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --target-dir /root/lagou \ --delete-target-dir \ --fields-terminated-by "\t" \ --table goodtbl \ --split-by gname
使用多个 Map Task 进行数据导入时,sqoop 要对每个Task的数据进行分区
如果 MySQL 中的表有主键,指定 Map Task 的个数就行
如果 MySQL 中的表没有主键,要使用 split-by 指定分区字段
如果分区字段是字符类型,使用 sqoop 命令的时候要添加:-Dorg.apache.sqoop.splitter.allow_text_splitter=true。即
1 2 3 4 sqoop import - Dorg.apache.sqoop.splitter.allow_text_splitter=true \ --connect jdbc:mysql://Liunx123:3306/sqoop \ ... ...
查询语句的where子句中的 ‘$CONDITIONS’ ,也是为了做数据分区使用的,即使只有1个Map Task
1 2 3 4 5 6 7 8 9 10 sqoop import \ -Dorg.apache.sqoop.splitter.allow_text_splitter=true \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --target-dir /root/lagou \ --delete-target-dir \ --fields-terminated-by "\t" \ --table goodtbl \ --split-by gname
MySQL 到 Hive
1 2 3 4 5 6 7 CREATE TABLE mydb.goodtbl( gname string, serialNumber int, price int, stock_number int, create_time date );
1 2 3 4 5 6 7 8 9 10 11 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --table goodtbl \ --hive-import \ --create-hive-table \ --fields-terminated-by "\t" \ --hive-overwrite \ --hive-table mydb.goodtbl \ -m 1
hive-import。必须参数,指定导入hive
hive-database。Hive库名(缺省值default)
hive-table。Hive表名
fields-terminated-by。Hive字段分隔符
hive-overwrite。覆盖中已经存在的数据
create-hive-table。创建好 hive 表,但是表可能存在错误。不建议使用这个参数,建议提前建好表
1 2 3 4 5 6 7 8 9 10 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --table goodtbl \ --hive-import \ --fields-terminated-by "\t" \ --hive-overwrite \ --hive-table mydb.goodtbl \ -m 1
导出数据
Hive/HDFS到RDBMS
MySQL表需要提前创建
1 2 3 4 5 6 7 8 # 提前创建表 CREATE TABLE sqoop.goodtbl2( gname varchar(50), serialNumber int, price int, stock_number int, create_time date );
1 2 3 4 5 6 7 8 9 # 执行导出 sqoop export \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --table goodtbl2 \ --num-mappers 1 \ --export-dir /user/hive/warehouse/mydb.db/goodtbl \ --input-fields-terminated-by "\t"
增量数据导入
变化数据捕获(CDC)
前面都是执行的全量数据导入。如果数据量很小,则采取完全源数据抽取;如果源数据量很大,则需要抽取发生变化的数据,这种数据抽取模式叫做变化数据捕获,简称CDC(Change Data Capture)。
CDC大体分为两种:侵入式和非侵入式。侵入式指CDC操作会给源系统带来性能影响,只要CDC操作以任何一种方式对源数据库执行了SQL操作,就认为是侵入式的。
常用的4种CDC方法是(前三种是侵入式的):
基于时间戳的CDC 。抽取过程可以根据某些属性列来判断哪些数据是增量的,最常见的属性列有以下两种:
时间戳:最好有两个列,一个插入时间戳,表示何时创建,一个更新时间戳,表示最后一次更新的时间;时间戳的CDC是最简单且常用的,但是有如下缺点:
不能记录删除记录的操作
无法识别多次更新
不具有实时能力
序列:大多数数据库都提供自增功能,表中的列定义成自增的,很容易地根据该列识别新插入的数据;
基于触发器的CDC 。当执行INSERT、UPDATE、DELETE这些SQL语句时,激活数据库里的触发器,使用触发器可捕获变更的数据,并把数据保存在中间临时表里。然后这些变更数据再从临时表取出。大多数场合下,不允许向操作型数据库里添加触发器,且这种方法会降低系统性能,基本不会被采用;
基于快照的CDC 。可以通过比较源表和快照表来获得数据变化。基于快照的CDC可以检测到插入、更新和删除的数据,这是相对于基于时间戳的CDC方案的优点。其缺点是需要大量存储空间来保存快照;
基于日志的CDC 。最复杂的和没有侵入性的CDC方法是基于日志的方式。数据库会把每个插入、更新、删除操作记录到日志里。解析日志文件,就可以获取相关信息。每个关系型数据库日志格式不一致,没有通用的产品。阿里巴巴的canal可以完成MySQL日志文件解析。
基于时间戳的CDC
增量导入数据分为两种方式:
Append方式
1. 准备初始数据
1 2 3 4 5 6 7 8 -- 删除 MySQL 表中的全部数据 truncate table sqoop.goodtbl; -- 删除 Hive 表中的全部数据 truncate table mydb.goodtbl; -- 向MySQL的表中插入100条数据 call batchInsertTestData(1, 100);
2. 将数据导入Hive
1 2 3 4 5 6 7 8 9 10 11 12 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --table goodtbl \ --incremental append \ --hive-import \ --fields-terminated-by "\t" \ --hive-table mydb.goodtbl \ --check-column serialNumber \ --last-value 50 \ -m 1
3. 检查hive表中是否有数据,有多少条数据
1 2 3 4 select count(1) from mydb.goodtbl; OK _c0 50
4. 再向MySQL中加入1000条数据,编号从200开始
1 call batchInsertTestData(200, 1000);
5. 再次执行增量导入,将数据从 MySQL 导入 Hive 中;此时要将 last-value 改为100
1 2 3 4 5 6 7 8 9 10 11 12 sqoop import \ --connect jdbc:mysql://Linux123:3306/sqoop \ --username hive \ --password 12345678 \ --table goodtbl \ --incremental append \ --hive-import \ --fields-terminated-by "\t" \ --hive-table mydb.goodtbl \ --check-column serialNumber \ --last-value 100 \ -m 1
6. 再检查hive表中是否有数据,有多少条数据
1 2 3 4 select count(1) from mydb.goodtbl; OK _c0 1050
执行 job
执行数据增量导入有两种实现方式:
很明显方式2更简便
1. 创建口令文件
1 2 3 4 5 6 7 echo -n "12345678" > sqoopPWD.pwd hdfs dfs -mkdir -p /sqoop/pwd hdfs dfs -put sqoopPWD.pwd /sqoop/pwd hdfs dfs -chmod 400 /sqoop/pwd/sqoopPWD.pwd # 可以在 sqoop 的 job 中增加: --password-file /sqoop/pwd/sqoopPWD.pwd
2. 创建 sqoop job
1 2 3 4 5 6 7 8 9 10 11 12 # 创建 sqoop job sqoop job --create myjob1 -- import \ --connect jdbc:mysql://Linux123:3306/sqoop?useSSL=false \ --username hive \ --password-file /sqoop/pwd/sqoopPWD.pwd \ --table goodtbl \ --incremental append \ --hive-import \ --hive-table mydb.goodtbl \ --check-column serialNumber \ --last-value 0 \ -m 1
3. 查看、执行job
1 2 3 4 5 6 7 8 # 查看已创建的job sqoop job --list # 查看job详细运行是参数 sqoop job --show myjob1 # 执行job sqoop job --exec myjob1
4. 删除job
1 2 # 删除job sqoop job --delete myjob1
实现原理
因为job执行完成后,会把当前check-column的最大值记录到meta中,下次再调起时把此值赋给last-value。
缺省情况下元数据保存在 ~/.sqoop/ 其中,metastore.db.script 文件记录了对last-value的更新操作
1 2 3 cd ~/.sqoop cat metastore.db.script |grep incremental.last.value