用distcp实现ftp到hdfs、hdfs到hdfs、hdfs到ftp的文件传输
文件本身存在ftp上,为了文件内的数据和数仓hive中的数据做分析,需要将ftp的数据上传 到hdfs,方法有很多,如果不是需要完全实时监控的话,可以采取hdfs自带的指令distcp抽取; 题外话:完全实时上传可以采取flume监控ftp文件件,再读取存入kafka,后续消费者消费kafka获取文件数据的方式,后续再讲,这节只讲distcp;1. 指令用法 distcp不仅可以实现hdf
0.背景
文件本身存在ftp上,为了文件内的数据和数仓hive中的数据做分析,需要将ftp的数据上传 到hdfs,方法有很多,如果不是需要完全实时监控的话,可以采取hdfs自带的指令distcp
抽取;
题外话:完全实时上传可以采取flume监控ftp文件件,再读取存入kafka,后续消费者消费kafka获取文件数据的方式,后续再讲,这节只讲distcp;
1. 指令用法
distcp
不仅可以实现hdfs集群1到hdfs集群2的文件传输,也可以实现ftp和hdfs之间的文件传输,只需要切换相应的协议头即可;
# 将hdfs1的文件传入hdfs2的集群内
hadoop distcp hdfs://node1:port1/hdfs/path/ hdfs://node2:port2/hdfs/path/
# 将ftp的文件传入hdfs内
hadoop distcp ftp://fptuser:ftppassword@host/ftp/path/ hdfs://node:port1/hdfs/path/
# 将hdfs的文件传入ftp内
你猜?博主觉得你肯定会
这里挑一个将ftp的文件传入hdfs内
细讲;
2. 封装成shell文件
先准备一个权限文件my_config.ini
,权限600,内容如下;
ftp_host_name=10.245.192.13
ftp_port=21
ftp_user_name=test123
ftp_password=test123
mysql_hostname=10.245.192.13
mysql_port=3306
mysql_user_name=test_user
mysql_password=test123
mysql_database=testdb
编写shell脚本ftp_load_to_hdfs.sh
,内容如下;
#! /usr/bin/sh
# 判断shell脚本传参的个数
if [[ $# -ne 2 ]];then
echo "parameters are incorrect,parameters 1 shoild be ftp_file_path,parameters 2 should be hdfs_file_path!"
exit 5
fi
#获取ftp信心,hdfs信息因为最后在集群跑脚本,可以省略,否则一样需要获取
ftp_host_name=`cat ./../src/my_config.ini | grep 'ftp_host_name' | cut -d = -f 2`
ftp_port=`cat ./../src/my_config.ini | grep 'ftp_port' | cut -d = -f 2`
ftp_user_name=`cat ./../src/my_config.ini | grep 'ftp_user_name' | cut -d = -f 2`
ftp_password=`cat ./../src/my_config.ini | grep 'ftp_password' | cut -d = -f 2`
# 参数1为ftp路径,参数2为hdfs路径;
ftp_file_path=${1}
hdfs_file_path=${2}
# 调用distcp 语句执行
hadoop distcp ftp://${ftp_user_name}:${ftp_password}@${ftp_host_name}:${ftp_port}${ftp_file_path} hdfs://${hdfs_file_path}
最后直接调用就行,代码如下,就能吧/data/myfolder/test.json
文件上传到/data/myfolder/test.json
;
bash ./ftp_load_to_hdfs.sh /data/myfolder/test.json /data/myfolder/test.json
题外话:distcp
的本质是运行MapReduce,因为单独传文件,不要根据另外的key做Reduce,所以只有Map操作,还是很快的;
3.进阶—循环将文件从ftp到hdfs
任何文件的路径也并非完全无规律,这时候可以将文件,路径传入mysql存储起来;新建表task_child
;
字段 | 类型 | 含义 | 样例数据 |
---|---|---|---|
childId | bigint | 主键 | 1 |
exportParams | varchar(200) | 文件路径 | /data/myfolder/test.json |
updatetime | datetime | 数据更新时间 | 2021-04-10 10:47:00.256 |
mysql的连接信息一样写入my_config.ini
文件;
#! /bin/bash
# 判断参数,每天T+1的获取昨天的文件,所以参数为event_day,一个参数
if [[ $# -ne 1 ]];then
echo "parameters are incorrect,parameters 1 shoild be event_day,format us yyyyMMdd !"
exit 5
fi
event_day=${1}
# 获取mysql的连接信息 ,参数化
hostname=`cat ./../src/my_config.ini | grep 'mysql_hostname' | cut -d = -f 2`
port=`cat ./../src/my_config.ini | grep 'mysql_port' | cut -d = -f 2`
username=`cat ./../src/my_config.ini | grep 'mysql_user_name' | cut -d = -f 2`
password=`cat ./../src/my_config.ini | grep 'mysql_password' | cut -d = -f 2`
database=`cat ./../src/my_config.ini | grep 'mysql_database' | cut -d = -f 2`
# 获取文件路径的sql,参数化
select_sql="SELECT childId,exportParams FROM task_child WHERE DATE(updatetime)=DATE(DATE_ADD(${event_day},INTERVAL -1 DAY)) AND IFNULL(exportParams,'')<>''and upper(exportParams)<>upper('null');"
#echo $hostname
#echo $port
#echo $username
#echo $password
#echo $database
echo $select_sql
# 利用循环遍历将ftp的文件传入hdfs
i=0
while read line
do
childid=`echo $line | awk '{print $1}'`
ftp_file_path=`echo $line | awk '{print $2}'`
hdfs_file_path=`echo $line | awk '{print $2}'`
echo "ftp_file_path:${ftp_file_path},hdfs_file_path:${hdfs_file_path}"
bash ./ftp_load_to_hdfs.sh ${ftp_file_path} ${hdfs_file_path}
status=$?
if [ $status -eq 0 ]; then
i=$[$i + 1]
echo "Successed: load ${event_day} NO ${i} ftp file of ${childid} to hdfs ${hdfs_file_path} successed!!!!"
else
echo "Error: load ftp file of ${childid} to hdfs ${hdfs_file_path} failed!!!!"
exit 7
fi
done< <(mysql -h${hostname} -P${port} -u${username} -p${password} ${database} -e "${select_sql}" -s)
4.优化hdfs的小文件(节选)
如果文件再ftp上都是小文件(小于hdfs的块大小,hdfs2.x默认为128M),存入hdfs会造成hdfs的压力大,具体原理这里不细讲,可以参考hdfs的原理介绍;这个时候就需要将这些小文件合并;
4.1 合并思路1
将一天或者某个文件夹的小文件读取出来,再合并存入一个新的文件,如用spark等,读取文件内容,再存一个新的文件;
// 将/data/myfolder/全部读进df1
val df1=spark.read.json("/data/myfolder/")
df1.repartition(1).write.save("/data/myfolder/union_data")
4.1 合并思路2
利用hdfs自带的archive
归档;
将hdfs的/data
目录下的myfolder
文件夹归档到/data
下的test.har文件;
hadoop archive -archiveName test.har -p /data myfolder /data
hadoop存档是特殊格式的存档。Hadoop归档文件映射到文件系统目录。Hadoop归档文件始终具有* .har扩展名。Hadoop存档目录包含元数据(以_index和_masterindex的形式)和数据(part- *)文件。_index文件包含作为归档文件一部分的文件名以及这些文件内的位置,更多的hadoop archive可以参考官网:Hadoop Archives Guide
总结:这两种文件合并都不会删除源hdfs的文件,所以需要等分析任务确定能完整的分析道合并的文件后,把hdfs的源小文件删除掉;
更多推荐
所有评论(0)