Flume 监控kafka主题写HDFS小结
Flume 监控kafka主题写HDFS小结需求采集Flume采集kafka主题内容,往Hadoop集群上写HDFS,该机器没有安装Hadoop这里的Flume版本是1.7.0,Hadoop版本是2.7.2,kafka版本是0.11.0.2把Hadoop集群的hdfs-site.xml、core-site.xml两个配置文件复制到 flume安装目录的conf目录去,把hadoop-hdfs...
Flume 监控kafka主题写HDFS小结
需求采集Flume采集kafka主题内容,往Hadoop集群上写HDFS,该机器没有安装Hadoop
这里的Flume版本是1.7.0,Hadoop版本是2.7.2,kafka版本是0.11.0.2
把Hadoop集群的hdfs-site.xml、core-site.xml两个配置文件复制到 flume安装目录的conf目录去,把hadoop-hdfs-2.7.2.jar复制到 Flume lib目录。
一、Flume配置文件:
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.110.114:9092,192.168.110.114:9092,192.168.110.114:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = 192.168.110.114:9092,192.168.110.114:9092,192.168.110.114:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 30
a1.sinks.k2.hdfs.roundUnit = second
## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
启动: bin/flume-ng agent --conf conf --conf-file conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=info,console
二、错误集:
1、 找不到主机名
(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed
java.lang.IllegalArgumentException: java.net.UnknownHostException: cluster
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:355)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.net.UnknownHostException: cluster
cluster是公司Hadoop集群NameService的名字,这个错误是由于找不到Hadoop集群NameService造成的,所以需要把hdfs-site.xml复制到flume/conf目录。
2. 创建文件失败
java.io.IOException: Mkdirs failed to create /test/flume/02-28 (exists=false, cwd=file:/data/apache-flume-1.6.0-bin)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:450)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:776)
at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:96)
at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:78)
at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:69)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:246)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
把 core-site.xml复制到flume/conf目录
3.无文件系统
java.io.IOException: No FileSystem for scheme: hdfs
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:355)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
把hadoop-hdfs-2.7.2.jar复制到flume/lib目录下
4、HDFS权限不足,这里往HDFS写文件的用户是登录Flume采集机器的用户。
org.apache.hadoop.security.AccessControlException: Permission denied: user=kafka, access=WRITE, inode="/test/flume/02-28/events-.1474268726127.tmp":hadoop:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2517)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2452)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2335)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:623)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:397)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
HDFS 权限不足,要授权。hadoop fs -chmod -R 777 /test/
5.时间戳
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:228)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:432)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:380)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:744)
原因是Event对象headers没有设置timestamp造成的,解决办法:设置a1.sinks.k1.hdfs.useLocalTimeStamp=true,使用本地时间戳。
版权声明:本博客为记录本人自学感悟,转载需注明出处!
https://me.csdn.net/qq_39657909
更多推荐
所有评论(0)