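# flume-Elasticsearch-sink

**Repository Path**: eluup/flume-Elasticsearch-sink

## Basic Information

- **Project Name**: flume-Elasticsearch-sink
- **Description**: flume-ng outputs the data stream to Elasticsearch
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 2
- **Forks**: 0
- **Created**: 2016-12-07
- **Last Updated**: 2024-06-18

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

### flume-Elasticsearch-sink

Main features:

1. Writes to newer Elasticsearch versions. Currently supported versions: flume-ng 1.6.0 and Elasticsearch 5.0.1.
2. Runs Elasticsearch scripts to modify stored data.

### Support for executing Elasticsearch scripts per event

### Flume sink configuration

Flume loads this file as a Java properties file. In that format `#` starts a comment only at the beginning of a line, so each comment goes on its own line.

```
s.sinks.s1.channel = c1
# Main sink class
s.sinks.s1.type = com.eluup.flume.sink.elasticsearch.ElasticSearchSink
# ES address (host:port)
s.sinks.s1.hostNames = 127.0.0.1:9300
# ES index name
s.sinks.s1.indexName = log
# ES mapping type
s.sinks.s1.indexType = access
# ES cluster name
s.sinks.s1.clusterName = club
s.sinks.s1.batchSize = 500
s.sinks.s1.ttl = 5d
s.sinks.s1.timeZone = Asia/Shanghai
# Key of the client Flume uses to connect to ES; maps to the class ElasticSearchTransportScriptClient
s.sinks.s1.client = script
# Event serializer that matches this client (must be set to exactly this value)
s.sinks.s1.serializer = com.eluup.flume.sink.elasticsearch.ElasticSearchScriptSerializer
```

#### Consumed event format

```
{
  "Update": "list of fields to overwrite, separated by ^",
  "Append": "list of fields to append to, separated by ^; these fields have type Array",
  "Incr": "list of fields to increment, separated by ^; these fields have type Long",
  "field1": "value1",
  "field2": "value2",
  "field3": "value3"
}
```

Notes:

- `Update`, `Append` and `Incr` are reserved keywords and cannot be used as field names.
- Fields listed under a keyword are stored at the top level of the ES document. All other fields are stored under the `@message` object.
- The top level also holds a special field, `FAT` (Flume add time). It records when Flume processed the event and can be used to measure processing latency.

#### ES document structure

```
{
  "field_listed_under_a_keyword": "value",
  "@timestamp": "2010-12-21T21:48:33.309258Z",
  "@tags": ["array", "of", "tags"],
  "@type": "string",
  "@source": "source of the event, usually a URL.",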
  "@source_host": "",
  "@source_path": "",
  "@fields": {
    "user": "jordan",
    "command": "shutdown -r"
  },
  "other_field": "other_value"
}
```

#### Example: how data is transformed when Flume consumes events and writes them to ES

##### First event sent to ES through Flume

Event data:

```
{
  "Update": "lastTime",
  "Append": "houseId^agentId",
  "Incr": "view^hit",
  "view": 2,
  "hit": 1,
  "houseId": "390878",
  "agentId": "12351",
  "lastTime": "2017-12-13T10:31:49",
  "city": "bj",
  "ip": "127.0.0.1",
  "ctime": "2017-12-13T10:31:49"
}
```

Data stored in ES:

```
{
  "lastTime": "2017-12-13T10:31:49",
  "houseId": ["390878"],
  "agentId": ["12351"],
  "FAT": "2017-12-13T10:31:54",
  "view": 2,
  "hit": 1,
  "@message": {
    "city": "bj",
    "ip": "127.0.0.1",
    "ctime": "2017-12-13T10:31:49"
  },
  "@timestamp": "2017-12-13T10:31:55",
  "@fields": {
    "topic": "log",
    "key": "13747aba1ba7264c77f273f056809d7f",
    "timestamp": "1513132303959"
  }
}
```

##### Second event sent to ES through Flume

Event data:

```
{
  "Update": "lastTime",
  "Append": "houseId^agentId",
  "Incr": "view^hit",
  "view": 2,
  "hit": 1,
  "houseId": "64563",
  "agentId": "2345676",
  "lastTime": "2017-12-13T10:32:21",
  "city": "bj",
  "ip": "127.0.0.1",
  "ctime": "2017-12-13T10:32:30"
}
```

Data stored in ES:

```
{
  "lastTime": "2017-12-13T10:32:21",         # listed in Update: overwritten
  "houseId": ["390878", "64563"],            # listed in Append: value appended
  "agentId": ["12351", "2345676"],           # listed in Append: value appended
  "FAT": "2017-12-13T10:32:25",              # time Flume processed the event
  "view": 4,                                 # listed in Incr: incremented by the event's value
  "hit": 2,                                  # listed in Incr: incremented by the event's value
  "@message": {                              # unchanged
    "city": "bj",
    "ip": "127.0.0.1",
    "ctime": "2017-12-13T10:31:49"
  },
  "@timestamp": "2017-12-13T10:31:55",       # unchanged
  "@fields": {                               # unchanged
    "topic": "log",
    "key": "13747aba1ba7264c77f273f056809d7f",
    "timestamp": "1513132303959"
  }
}
```
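To make the `Update`/`Append`/`Incr` rules concrete, here is a minimal in-memory Java sketch that replays the two example events. It is an illustration under stated assumptions, not the sink's implementation. The real sink performs the merge with an Elasticsearch script through `ElasticSearchTransportScriptClient`. The class `ScriptMergeSketch` and its methods are hypothetical. The rule that `@message` is written only on the first event is an assumption inferred from the "unchanged" markers in the example. `@timestamp` and `@fields` are not modeled.

```java
import java.util.*;

// Hypothetical in-memory model of the Update/Append/Incr merge rules in this README.
// The real sink delegates the merge to an Elasticsearch script; this only shows the expected result.
public class ScriptMergeSketch {

    // Reserved keywords: never stored as data fields.
    static final Set<String> KEYWORDS = new HashSet<>(Arrays.asList("Update", "Append", "Incr"));

    // Splits a keyword value such as "view^hit" into field names.
    static List<String> fieldsOf(Map<String, Object> event, String keyword) {
        Object value = event.get(keyword);
        if (value == null || value.toString().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.toString().split("\\^"));
    }

    // Applies one event to the stored document (null if it does not exist yet) and returns the result.
    // 'fat' is the Flume processing time written to the FAT field.
    static Map<String, Object> merge(Map<String, Object> stored, Map<String, Object> event, String fat) {
        boolean firstWrite = stored == null;
        Map<String, Object> doc = new LinkedHashMap<>();
        if (!firstWrite) {
            doc.putAll(stored);
        }
        Set<String> keywordFields = new HashSet<>();
        // Update: overwrite the top-level value with the event's value.
        for (String field : fieldsOf(event, "Update")) {
            keywordFields.add(field);
            if (event.containsKey(field)) {
                doc.put(field, event.get(field));
            }
        }
        // Append: add the event's value to a top-level array.
        for (String field : fieldsOf(event, "Append")) {
            keywordFields.add(field);
            if (event.containsKey(field)) {
                List<Object> values = new ArrayList<>();
                Object existing = doc.get(field);
                if (existing instanceof List) {
                    values.addAll((List<?>) existing);
                }
                values.add(event.get(field));
                doc.put(field, values);
            }
        }
        // Incr: add the event's number to the stored Long (starting from 0).
        for (String field : fieldsOf(event, "Incr")) {
            keywordFields.add(field);
            Object delta = event.get(field);
            if (delta instanceof Number) {
                Object current = doc.get(field);
                long base = current instanceof Number ? ((Number) current).longValue() : 0L;
                doc.put(field, base + ((Number) delta).longValue());
            }
        }
        // FAT (Flume add time) is refreshed on every write.
        doc.put("FAT", fat);
        // Non-keyword fields go under @message; assumed to be written only on the first event.
        if (firstWrite) {
            Map<String, Object> message = new LinkedHashMap<>();
            for (Map.Entry<String, Object> entry : event.entrySet()) {
                String key = entry.getKey();
                if (!KEYWORDS.contains(key) && !keywordFields.contains(key)) {
                    message.put(key, entry.getValue());
                }
            }
            doc.put("@message", message);
        }
        return doc;
    }

    // Builds an event map from alternating key/value arguments (illustrative helper).
    static Map<String, Object> event(Object... kv) {
        Map<String, Object> map = new LinkedHashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            map.put((String) kv[i], kv[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> first = event("Update", "lastTime", "Append", "houseId^agentId",
                "Incr", "view^hit", "view", 2, "hit", 1, "houseId", "390878", "agentId", "12351",
                "lastTime", "2017-12-13T10:31:49", "city", "bj", "ip", "127.0.0.1",
                "ctime", "2017-12-13T10:31:49");
        Map<String, Object> second = event("Update", "lastTime", "Append", "houseId^agentId",
                "Incr", "view^hit", "view", 2, "hit", 1, "houseId", "64563", "agentId", "2345676",
                "lastTime", "2017-12-13T10:32:21", "city", "bj", "ip", "127.0.0.1",
                "ctime", "2017-12-13T10:32:30");
        Map<String, Object> doc = merge(merge(null, first, "2017-12-13T10:31:54"), second, "2017-12-13T10:32:25");
        // Prints {lastTime=2017-12-13T10:32:21, houseId=[390878, 64563], agentId=[12351, 2345676],
        //         view=4, hit=2, FAT=2017-12-13T10:32:25, @message={city=bj, ip=127.0.0.1, ctime=2017-12-13T10:31:49}}
        System.out.println(doc);
    }
}
```

Running `main` reproduces the keyword fields and `@message` of the second stored document. A server-side script is what makes this atomic per document in the real sink. The in-memory version only checks that the rules themselves are consistent with the example.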