配置文件

#代理名称 tier1
# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

# For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type     = netcat
tier1.sources.source1.bind     = 127.0.0.1
tier1.sources.source1.port     = 9999
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory
tier1.sinks.sink1.type         = logger
tier1.sinks.sink1.channel      = channel1

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 100

pro.sources = s1
pro.channels = c1
pro.sinks = k1

pro.sources.s1.type = exec
pro.sources.s1.command = tail -F /data/cdh/test/log/wordcount.log

pro.channels.c1.type = memory
pro.channels.c1.capacity = 1000
pro.channels.c1.transactionCapacity = 100

pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
pro.sinks.k1.kafka.topic = wordcount
pro.sinks.k1.kafka.bootstrap.servers = xxxdata002:9092
pro.sinks.k1.kafka.flumeBatchSize = 20
pro.sinks.k1.kafka.producer.acks = 1
pro.sinks.k1.kafka.producer.linger.ms = 1
pro.sinks.k1.kafka.producer.compression.type = snappy

pro.sources.s1.channels = c1
pro.sinks.k1.channel = c1

kafka topic

wordcount

log dir

/data/cdh/test/log
nohup flume-ng agent -n pro  -c ./ -f wordcount-log-to-kafka.conf &
flume-ng agent -n pro -c ./ -f wordcount-log-to-kafka.conf
nohup flume-ng agent --name pro --conf flume/ --conf-file flume/wordcount-log-to-kafka.conf &
Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐