ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、视频、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] ## Flume Apache Flume 是一个 **分布式、高可靠、高可用** 的 **日志数据采集系统**,主要用于 **高效地收集、聚合并传输大规模日志数据**,通常把日志数据从各种来源(Web 服务器、应用程序、消息系统)采集到 **HDFS、HBase、Kafka** 等大数据存储系统中。 ## 核心特点 1. **专注日志采集** * 典型场景:Web 服务器的访问日志、应用日志、事件日志。 2. **高吞吐量** * 设计目标就是能应对 **大规模数据流**,支持 **批量传输**、**可扩展**。 3. **可扩展架构** * Flume 的架构基于 **Source → Channel → Sink** 三段式流水线: * **Source**:数据采集入口(比如监听日志文件、接收HTTP、Kafka、Avro)。 * **Channel**:数据缓冲区(内存 / 文件 / Kafka 等)。 * **Sink**:数据输出目标(HDFS、HBase、Kafka、自定义存储)。 4. **可靠性** * 提供事务机制,保证数据不会丢失(at-least-once)。 5. **易扩展** * 插件化设计,可以通过自定义 Source / Sink 来扩展。 ## 示例 go ``` package main import ( "bytes" "fmt" "net/http" ) func main() { // Flume HTTP Source 地址 url := "http://localhost:41414" // 模拟一条日志 logData := `{"headers":{"timestamp":"%d"},"body":"Hello Flume from Go!"}` message := fmt.Sprintf(logData, 1670000000000) // Flume HTTP Source 接收的是 JSON 数组 data := []byte("[" + message + "]") // 发送 POST 请求 resp, err := http.Post(url, "application/json", bytes.NewBuffer(data)) if err != nil { panic(err) } defer resp.Body.Close() fmt.Println("发送日志到 Flume,返回状态:", resp.Status) } ``` 架构 ``` 日志源 (Web/App) ↓ Source [Channel] —— 缓冲区 ↓ Sink HDFS / HBase / Kafka / ElasticSearch ... ``` flume.conf ``` # 定义 agent agent1.sources = r1 agent1.sinks = k1 agent1.channels = c1 # Source: HTTP Source,监听 41414 端口 agent1.sources.r1.type = http agent1.sources.r1.port = 41414 # Channel: Memory Channel agent1.channels.c1.type = memory agent1.channels.c1.capacity = 10000 agent1.channels.c1.transactionCapacity = 1000 # Sink: 输出到本地文件 agent1.sinks.k1.type = file_roll agent1.sinks.k1.channel = c1 agent1.sinks.k1.sink.directory = /tmp/flume-logs # 绑定关系 agent1.sources.r1.channels = c1 agent1.sinks.k1.channel = c1 ```