1 Star 0 Fork 0

shampoo6 / cjsf-20dsj1

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
使用flume监听目录.md 1.83 KB
一键复制 编辑 原始数据 按行查看 历史
shampoo6 提交于 2023-05-25 14:43 . update

使用flume监听目录

接下来我们要做到如下的效果:

假设有一个目录 listendDir,我们可以使用 flume 监听这个目录中的所有文件,例如新增一个文件 listendDir/test.txt,那么 flume 就能自动将文件转移到 hadoop hdfs 中。

创建listendDir

创建目录 mkdir ~/listendDir

创建 flume 配置文件

job 目录下创建文件 listen-dir.conf

内容如下:

# listen-dir.conf
a1.sources = r1
a1.sources.r1.type = spooldir
# 这个是要监听的目录路径
a1.sources.r1.spoolDir = /home/master/listendDir
a1.sources.r1.fileHeader = true
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.basenameHeader = true
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.ignorePattern = ^\.
a1.sources.r1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder

a1.sinks = k1
a1.sinks.k1.type = hdfs
# 这个是 hdfs 中对应的目录路径
a1.sinks.k1.hdfs.path = hdfs://master:9000/listendDir
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 10000
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

hdfs 上创建对应目录

hdfs 上创建对应目录 /listendDir

hdfs dfs -mkdir /listendDir

启动 flume

bin/flume-ng agent -n a1 -c conf -f job/listen-dir.conf -Dflume.log.file -Dflume.root.logger=INFO,console

测试

~/listendDir 中添加 test.txt 文件,随便输入点内容那个,查看是否被同步到 hdfs

1
https://gitee.com/shampoo6/cjsf-20dsj1.git
git@gitee.com:shampoo6/cjsf-20dsj1.git
shampoo6
cjsf-20dsj1
cjsf-20dsj1
master

搜索帮助