Flume Distributed Log Collection

An in-depth look at the basic concepts, architecture, working principles, and usage of the Flume distributed log collection system


1. Introduction to Flume

Flume is a distributed, reliable, highly available log collection system for collecting, aggregating, and moving large volumes of log data from many different sources to a centralized data store. It is a top-level Apache Software Foundation project and is widely used in the big data ecosystem.

1.1 Flume Design Goals

  • Reliability: a transaction mechanism ensures data is delivered reliably
  • High availability: supports automatic failover and load balancing
  • Distributed: supports distributed deployment and horizontal scaling
  • Extensibility: supports many kinds of data sources and destinations
  • Flexibility: supports many data formats and transport methods
  • Ease of use: simple configuration and management

1.2 Flume Use Cases

  • Log collection: gather logs produced by web servers, application servers, etc.
  • Event collection: gather system events, user-behavior events, etc.
  • Data aggregation: consolidate scattered data into centralized storage
  • Data movement: move data from one storage system to another
  • Real-time processing: integrate with real-time processing systems such as Storm and Flink

2. Core Flume Concepts

2.1 Event

An event is the basic unit of data in Flume. It consists of:

  • Body: the event's actual payload, typically a log record
  • Headers: the event's metadata, a set of key-value pairs describing the event's properties

2.2 Agent

An agent is Flume's basic unit of work. It is a single JVM process containing the following components:

  • Source: collects data from a data source and converts it into events
  • Channel: buffers events temporarily until a Sink consumes them
  • Sink: reads events from a Channel and delivers them to a destination

2.3 Source

A Source collects data from a data source, converts it into events, and sends them on to a Channel. Flume supports many Source types, including:

  • Avro Source: receives data in Avro format
  • Thrift Source: receives data in Thrift format
  • Exec Source: runs a command and collects its output
  • Spooling Directory Source: watches a directory and collects newly added files
  • NetCat Source: listens on a network port and collects incoming data
  • HTTP Source: receives HTTP requests and collects their payloads

2.4 Channel

A Channel connects a Source to a Sink and buffers events in between. Flume supports several Channel types, including:

  • Memory Channel: stores events in memory; fast, but less reliable
  • File Channel: stores events on the local file system; reliable, but slower
  • Kafka Channel: stores events in Kafka, combining reliability with high performance

2.5 Sink

A Sink reads events from a Channel and delivers them to a destination. Flume supports many Sink types, including:

  • HDFS Sink: writes events to HDFS
  • HBase Sink: writes events to HBase
  • Kafka Sink: writes events to Kafka
  • Avro Sink: sends events to an Avro Source
  • Thrift Sink: sends events to a Thrift Source
  • Elasticsearch Sink: writes events to Elasticsearch
  • Logger Sink: writes events to the Flume log

2.6 Interceptor

Interceptors sit between a Source and its Channel and can modify or filter events in flight. Flume ships with several interceptors, including:

  • Timestamp Interceptor: adds a timestamp to the event headers
  • Host Interceptor: adds the hostname or IP address to the event headers
  • Static Interceptor: adds a static key-value pair to the event headers
  • Regex Filter Interceptor: filters events against a regular expression
  • Regex Extractor Interceptor: extracts fields into the event headers using a regular expression
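Interceptors are attached to a Source by name and run in the order listed. As a sketch (reusing the hypothetical agent1/source1 names from this tutorial's other examples; the datacenter/dc1 pair is purely illustrative), the fragment below chains a Timestamp Interceptor with a Static Interceptor:

```properties
# Attach two interceptors to source1; they run in the order listed
agent1.sources.source1.interceptors = i1 i2

# i1: add a "timestamp" header (epoch milliseconds) to each event
agent1.sources.source1.interceptors.i1.type = timestamp

# i2: add a fixed key-value pair to each event's headers
agent1.sources.source1.interceptors.i2.type = static
agent1.sources.source1.interceptors.i2.key = datacenter
agent1.sources.source1.interceptors.i2.value = dc1
```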

2.7 Channel Selector

A channel selector determines which Channel(s) receive the events a Source produces. Flume supports two channel selectors:

  • Replicating Channel Selector: copies each event to every associated Channel (the default)
  • Multiplexing Channel Selector: routes each event to a Channel based on a key-value pair in its headers
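As a sketch of the multiplexing case (the header name state and the channel mappings are illustrative), routing on a header value looks like this:

```properties
agent1.sources.source1.channels = channel1 channel2
agent1.sources.source1.selector.type = multiplexing
# Route each event on the value of its "state" header
agent1.sources.source1.selector.header = state
agent1.sources.source1.selector.mapping.OK = channel1
agent1.sources.source1.selector.mapping.ERROR = channel2
# Events whose header matches no mapping go to the default channel
agent1.sources.source1.selector.default = channel1
```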

2.8 Sink Group

A sink group combines several Sinks to provide load balancing and failover. Flume supports two sink processors:

  • Load Balancing Sink Processor: distributes events across the Sinks in the group
  • Failover Sink Processor: fails over to a backup Sink when the active one fails
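A minimal failover sink group, as a sketch (sink names follow this tutorial's conventions; the two sinks themselves must be configured separately):

```properties
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = sink1 sink2
agent1.sinkgroups.g1.processor.type = failover
# Higher priority wins; sink2 takes over only if sink1 fails
agent1.sinkgroups.g1.processor.priority.sink1 = 10
agent1.sinkgroups.g1.processor.priority.sink2 = 5
# Maximum backoff (in ms) before a failed sink is retried
agent1.sinkgroups.g1.processor.maxpenalty = 10000
```

For load balancing instead, set processor.type = load_balance and choose a selector such as round_robin or random.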

3. Flume Architecture

3.1 Single-Agent Architecture

The single-agent architecture is Flume's basic topology: one Source, one or more Channels, and one or more Sinks.

Source → Channel → Sink

3.2 Multi-Hop Architecture

A multi-hop architecture passes data from one agent to another, enabling multi-stage processing and routing.

Source1 → Channel1 → Avro Sink → Avro Source → Channel2 → Sink2

3.3 Fan-In Architecture

A fan-in architecture consolidates data from multiple Sources into a single Channel.

Source1 →
          → Channel → Sink
Source2 →
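The fan-in wiring reduces to two Sources naming the same Channel. A sketch (component types omitted; names are illustrative):

```properties
agent1.sources = source1 source2
agent1.channels = channel1
agent1.sinks = sink1

# Both sources feed the same channel
agent1.sources.source1.channels = channel1
agent1.sources.source2.channels = channel1
agent1.sinks.sink1.channel = channel1
```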

3.4 Fan-Out Architecture

A fan-out architecture sends data from one Source to multiple Channels.

          → Channel1 → Sink1
Source →
          → Channel2 → Sink2
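The fan-out wiring lists several Channels on one Source; with the replicating selector (the default), every event is copied to each of them. A sketch (component types omitted; names are illustrative):

```properties
agent1.sources = source1
agent1.channels = channel1 channel2
agent1.sinks = sink1 sink2

# Replicating is the default selector, shown here for clarity
agent1.sources.source1.channels = channel1 channel2
agent1.sources.source1.selector.type = replicating
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink2.channel = channel2
```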

4. Installing and Configuring Flume

4.1 Prerequisites

Before installing Flume, install Java JDK 1.8 or later.

4.2 Installation Steps

# 1. Download Flume
$ wget https://dlcdn.apache.org/flume/1.10.1/apache-flume-1.10.1-bin.tar.gz

# 2. Extract Flume
$ tar -xzvf apache-flume-1.10.1-bin.tar.gz
$ mv apache-flume-1.10.1-bin /opt/flume

# 3. Configure environment variables
$ export FLUME_HOME=/opt/flume
$ export PATH=$PATH:$FLUME_HOME/bin

# 4. Configure Flume
$ cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
$ echo "export JAVA_HOME=/opt/java" >> $FLUME_HOME/conf/flume-env.sh

# 5. Create a directory for agent configuration files
$ mkdir -p $FLUME_HOME/conf/agents

4.3 Basic Configuration Example

# Create a simple Flume agent configuration file
$ vim $FLUME_HOME/conf/agents/simple-agent.conf

# Name the agent's components
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# Configure the Source
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444

# Configure the Channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Configure the Sink
agent1.sinks.sink1.type = logger

# Wire the components together
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

5. Basic Flume Operations

5.1 Starting a Flume Agent

# Start the Flume agent
$ flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/agents/simple-agent.conf -Dflume.root.logger=INFO,console

# Or start it with the short-form options
$ flume-ng agent -n agent1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/agents/simple-agent.conf -Dflume.root.logger=INFO,console

5.2 Testing the Flume Agent

# In another terminal, use netcat to send data to the Flume agent
$ nc localhost 44444
hello flume
world flume
# Check the Flume agent's output; you should see the received events

5.3 Stopping a Flume Agent

Stop the Flume agent with Ctrl+C, or use the kill command:

# Find the Flume process
$ jps | grep Application
# Stop the process with kill
$ kill -9 <PID>

6. Advanced Flume Configuration

6.1 Configuration File Syntax

Flume configuration files are key-value pairs with the following syntax:

agent_name.component_type.component_name.property_name = value

# Example:
agent1.sources.source1.type = netcat

6.2 Common Source Configurations

6.2.1 NetCat Source
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444
agent1.sources.source1.max-line-length = 512
6.2.2 Exec Source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/nginx/access.log
agent1.sources.source1.restart = true
agent1.sources.source1.restartThrottle = 10000
6.2.3 Spooling Directory Source
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /var/log/flume/spool
agent1.sources.source1.fileSuffix = .COMPLETED
agent1.sources.source1.fileHeader = true
agent1.sources.source1.ignorePattern = ^\.

6.3 Common Channel Configurations

6.3.1 Memory Channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
agent1.channels.channel1.byteCapacityBufferPercentage = 20
agent1.channels.channel1.byteCapacity = 800000
6.3.2 File Channel
agent1.channels.channel1.type = file
agent1.channels.channel1.checkpointDir = /var/log/flume/checkpoint
agent1.channels.channel1.dataDirs = /var/log/flume/data
agent1.channels.channel1.transactionCapacity = 1000
agent1.channels.channel1.checkpointInterval = 30000

6.4 Common Sink Configurations

6.4.1 Logger Sink
agent1.sinks.sink1.type = logger
agent1.sinks.sink1.maxBytesToLog = 1024
6.4.2 HDFS Sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/flume/logs/%y-%m-%d
agent1.sinks.sink1.hdfs.filePrefix = events-
agent1.sinks.sink1.hdfs.rollInterval = 3600
agent1.sinks.sink1.hdfs.rollSize = 104857600
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.batchSize = 100
6.4.3 Avro Sink
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.hostname = localhost
agent1.sinks.sink1.port = 41414

7. Flume Integration Examples

7.1 Integration with HDFS

Use Flume to write log data to HDFS:

# Create the configuration file
$ vim $FLUME_HOME/conf/agents/hdfs-agent.conf

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# Configure the Source (tail a log file)
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/nginx/access.log
agent1.sources.source1.restart = true

# Configure the Channel
agent1.channels.channel1.type = file
agent1.channels.channel1.checkpointDir = /var/log/flume/checkpoint
agent1.channels.channel1.dataDirs = /var/log/flume/data

# Configure the Sink (write to HDFS)
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/flume/nginx/%y-%m-%d
agent1.sinks.sink1.hdfs.filePrefix = access-
agent1.sinks.sink1.hdfs.rollInterval = 3600
agent1.sinks.sink1.hdfs.rollSize = 104857600
agent1.sinks.sink1.hdfs.fileType = DataStream

# Wire the components together
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

# Start the agent
$ flume-ng agent -n agent1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/agents/hdfs-agent.conf

7.2 Multi-Hop Agent Example

Use two Flume agents to pass data from one to the other:

# Create the first agent's configuration (collector side)
$ vim $FLUME_HOME/conf/agents/collector-agent.conf

collector.sources = source1
collector.channels = channel1
collector.sinks = sink1

collector.sources.source1.type = exec
collector.sources.source1.command = tail -F /var/log/nginx/access.log

collector.channels.channel1.type = memory
collector.channels.channel1.capacity = 1000
collector.channels.channel1.transactionCapacity = 100

collector.sinks.sink1.type = avro
collector.sinks.sink1.hostname = localhost
collector.sinks.sink1.port = 41414

collector.sources.source1.channels = channel1
collector.sinks.sink1.channel = channel1

# Create the second agent's configuration (aggregator side)
$ vim $FLUME_HOME/conf/agents/aggregator-agent.conf

aggregator.sources = source1
aggregator.channels = channel1
aggregator.sinks = sink1

aggregator.sources.source1.type = avro
aggregator.sources.source1.bind = localhost
aggregator.sources.source1.port = 41414

aggregator.channels.channel1.type = file
aggregator.channels.channel1.checkpointDir = /var/log/flume/checkpoint
aggregator.channels.channel1.dataDirs = /var/log/flume/data

aggregator.sinks.sink1.type = hdfs
aggregator.sinks.sink1.hdfs.path = hdfs://localhost:9000/flume/nginx/%y-%m-%d
aggregator.sinks.sink1.hdfs.filePrefix = access-
aggregator.sinks.sink1.hdfs.rollInterval = 3600

aggregator.sources.source1.channels = channel1
aggregator.sinks.sink1.channel = channel1

# Start the second agent (the aggregator) first, so its Avro Source is listening
$ flume-ng agent -n aggregator -c $FLUME_HOME/conf -f $FLUME_HOME/conf/agents/aggregator-agent.conf

# Then start the first agent (the collector)
$ flume-ng agent -n collector -c $FLUME_HOME/conf -f $FLUME_HOME/conf/agents/collector-agent.conf

Hands-On Exercises

Exercise 1: Installing and Configuring Flume

Install and configure Flume, then create a simple agent configuration.

Exercise 2: Basic Agent Test

Create a simple Flume agent with a NetCat Source and a Logger Sink, and verify data transfer.

Exercise 3: Collecting Files into HDFS

Configure a Flume agent to collect local log files into HDFS.

Exercise 4: Multi-Hop Agent Configuration

Configure two Flume agents to pass data across multiple hops.

Exercise 5: Sink Group Configuration

Configure a sink group to provide load balancing and failover.

8. Flume Best Practices

8.1 Performance Optimization

  • Choose the right Channel: use the File Channel when reliability matters most and the Memory Channel when throughput matters most
  • Tune Channel capacity: adjust the Channel's capacity and transactionCapacity to match the actual load
  • Batch events: tune the Sink's batchSize to increase batching throughput
  • Set roll parameters sensibly: tune the HDFS Sink's rollInterval, rollSize, and rollCount to the data volume
  • Use compression: enabling compression on the HDFS Sink reduces storage space and network traffic
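For the compression point, a hedged sketch of enabling gzip output on the HDFS Sink used in the earlier examples:

```properties
agent1.sinks.sink1.type = hdfs
# CompressedStream writes the output through the configured codec
agent1.sinks.sink1.hdfs.fileType = CompressedStream
agent1.sinks.sink1.hdfs.codeC = gzip
```

Note that when hdfs.fileType is CompressedStream, hdfs.codeC must be set.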

8.2 Reliability Design

  • Use transactions: make sure Sources and Sinks use Flume's transaction mechanism
  • Configure retries: set retry counts and intervals for Sinks
  • Use sink groups: configure a Failover Sink Processor for failover
  • Watch for full Channels: monitor Channel utilization and add capacity before it fills up
  • Back up checkpoints: regularly back up the File Channel's checkpoint and data directories

8.3 Monitoring and Management

  • Enable JMX monitoring: observe the Flume agent's runtime status via JMX
  • Configure logging: set appropriate Flume log levels for debugging and monitoring
  • Use external monitoring tools: monitor Flume with tools such as Ganglia, Nagios, or Prometheus
  • Clean up regularly: remove expired log and data files on a schedule

8.4 Security Considerations

  • Configure access control: restrict access to the Flume agent
  • Use SSL/TLS: encrypt transport with SSL/TLS for sensitive data
  • Authentication and authorization: integrate authentication mechanisms such as Kerberos
  • Audit logging: record the Flume agent's operational activity

9. Summary

This tutorial has taken an in-depth look at the basic concepts, architecture, and usage of the Flume distributed log collection system. As a reliable, highly available, distributed log collection system, Flume has a broad range of applications, especially within the big data ecosystem.

Flume's core concepts include events, agents, Sources, Channels, Sinks, interceptors, channel selectors, and sink groups. Flume supports several topologies, including single-agent, multi-hop, fan-in, and fan-out architectures, so you can choose the one that fits your requirements.

Flume is simple and flexible to configure, supports many data sources and destinations, and integrates seamlessly with the other components of the Hadoop ecosystem. When using Flume, follow best practices for performance optimization, reliability, monitoring, management, and security to keep the system running stably.