数据导入 hive
ftp .csv 文件导入
可以先将文件弄到 HDFS,然后创建/更新 hive 表来关联到 HDFS 文件。
将文件弄到 HDFS有以下一些方法:
- ftp -> local -> hdfs: 将文件先下载到本地,再通过 hdfs 命令拷贝到 hdfs 中
- ftp -> hdfs: 直接连接 FTP,将文件拷到 hdfs 中,省却本地拷贝
- 已有的数据采集工具:使用实时数据流处理系统,来实现不同系统之间的流通
一、ftp -> local ->hdfs
几种方案:
hadoop fs -get ftp://uid:password@server_url/file_path temp_file | hadoop fs -moveFromLocal tmp_file hadoop_path/dest_file参照这个实现用 python 包从 ftp 中读,然后用 hdfs 命令写到 hdfs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17from urllib.request import urlopen
from hdfs import InsecureClient
# You can also use KerberosClient or custom client
namenode_address = 'your namenode address'
webhdfs_port = 'your webhdfs port' # default for Hadoop 2: 50070, Hadoop 3: 9870
user = 'your user name'
client = InsecureClient('http://' + namenode_address + ':' + webhdfs_port, user=user)
ftp_address = 'your ftp address'
hdfs_path = 'where you want to write'
with urlopen(ftp_address) as response:
content = response.read()
# You can also use append=True
# Further reference: https://hdfscli.readthedocs.io/en/latest/api.html#hdfs.client.Client.write
with client.write(hdfs_path) as writer:
writer.write(content
二、ftp -> hdfs
几种方案:(参考 ftp 提取文件到 hdfs)
用 FTP To HDFS 连接 ftp,把文件直接放到 hdfs
HDFS dfs -cp: 简单快速,但不显示进度,适用于小文件
1
$ hdfs dfs –cp [ftp://username:password@hostname/ftp_path] [hdfs:///hdfs_path]
Hadoop distcp: 分布式提取,快,能显示拷贝进度,不支持流式写入(即拷贝的文件不能有其他程序在写入),适合大量文件或大文件的拷贝
1
$ hadoop distcp [ftp://username:password@hostname/ftp_path] [hdfs:///hdfs_path]
三、已有的数据采集工具
文件导入
apache NiFi 来实现不同系统之间的流通,似乎拷贝完,会直接删除 ftp 上的文件
Apache Flume是一个分布式、可靠、高可用的日志收集系统,支持各种各样的数据来源。基于流式数据,适用于日志和事件类型的数据收集,重构后的Flume-NG版本中一个agent(数据传输流程)中的source(源)和sink(目标)之间通过channel进行链接,同一个源可以配置多个channel。多个agent还可以进行链接组合共同完成数据收集任务,使用起来非常灵活。
flume 采集 ftp 文件 上传到 hadoop 使用 spooldir source(不确定是不是能用), 也可以使用第三方 source 组件 flume-ftp-source
Flume 也支持 sql source 的流式导入(使用 flume-ng-sql-source 插件),并提供对数据进行简单处理,并写到各数据接收方的能力。因此它的实时性更好。
DataX:阿里的开源框架,本身社区不太活跃,但有很多 fork 再改的,似乎架构不错
Gobllin: Gobblin是用来整合各种数据源的通用型ETL框架,Gobblin的接口封装和概念抽象做的很好,作为一个ETL框架使用者,我们只需要实现我们自己的Source,Extractor,Conventer类,再加上一些数据源和目的地址之类的配置文件提交给Gobblin就行了。Gobblin相对于其他解决方案具有普遍性、高度可扩展性、可操作性。
kettle:一款开源的ETL工具
其他数据源(非 FTP 文件)
- Apache Sqoop:RDBMS <–> HDFS
- Aegisthus:针对 Cassandra 数据源
- mongo-hadoop:针对 mongodb 数据源
数据导入需要关注的问题
- 数据源都有哪些?
- 结构化(sql)、半结构化(json, xml…)、非结构化(video、image、file…)
- 日志数据(csv)、业务数据
- 是否可以直接连接数据库?
- 针对关系型数据,如果可连接数据库,可以通过 sqoop 导入数据到 hive
- 增量式导入??
- 针对关系型数据,如果不能连接数据库:
- 是否可以默认周期性导出符合特定标准的 .csv 文件?
- 如果数据库导出 dump 文件,再将 dump 文件导入到 hadoop,则比较麻烦,以 oracle 为例,可能需要使用 COPYToBDA 来创建 hive table Query Oracle Dump File on BDA Using Copy2BDA ,或者将 dump 文件先导入到一个 temp oracle 数据库中,再用 sqoop 导入到 hive
- 如果数据库周期性导出 .csv 文件,将这些 .csv 文件使用上述工具(flume 等)导入到 hive,需要关注增量式导出和导入
- 增量式导出:文件的组织结构、命名规范 ,.csv 内 record 要求包含 modified date, delete date(在增量式导入时,需要基于这些时间来合并表)
- 增量式导入:将新增的 .csv 文件作为 hive external table,然后通过中间 view 来合并基表和incremental 表,并更新基表、清空 incremental 表。Incrementally Updating a Table
- 是否可以默认周期性导出符合特定标准的 .csv 文件?
- 针对关系型数据,如果可连接数据库,可以通过 sqoop 导入数据到 hive
- 导入周期和实时性需求
- 哪些需要每天批量导入、哪些需要流式实时导入
- 哪些需要全量导入、哪些需要增量式导入?
- 如何实现增量式导入?删除的数据是否有删除标识(软删除)?
- 如果用 sqoop,参考 sqoop 增量导入,不支持对删除数据的处理
- 如果用 flume
- 如果是 sql source,使用 flume-ng-sql-source, 对于 mysql 可以通过 query
agent.sources.sqlSource.custom.query来获取增量 source - 如果是文件导入,则需要通过 Incrementally Updating a Table 来合并表
- 如果是 sql source,使用 flume-ng-sql-source, 对于 mysql 可以通过 query
- Spark SQL