# Custom Flume Interceptor and Sink

**Repository Path**: oneal/flumeZiDingYiinterceptor

## Basic Information

- **Project Name**: flume自定义interceptor和sink
- **Description**: Uses a custom Interceptor to extract content from events with regular expressions, parsing log data before it is stored in a database
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 1
- **Forks**: 0
- **Created**: 2017-12-19
- **Last Updated**: 2020-12-19

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# How to use this project

### Regex interceptor: RegexpExtractInterceptor

---

- Packaging: build the jar and deploy it to ${FLUME_HOME}/lib
- Configuration file

```properties
agent.sources = r1
agent.channels = c1
agent.sinks = s1

agent.sources.r1.type = avro
agent.sources.r1.channels = c1
agent.sources.r1.bind = 127.0.0.1
agent.sources.r1.port = 4141

agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = com.wen.RegexpExtractInterceptor$Builder
# Regular expression used for extraction.
# Note: [\\w+] is a character class that matches a single character
# (a word character or '+'); use (\\w+) to capture a whole word.
agent.sources.r1.interceptors.i1.regexp = ([0-9]+):([0-9]+):([\\w+])
# Output order: 1-based indexes of the capture groups (output fields are separated by \t)
agent.sources.r1.interceptors.i1.regexpOrder = 1 3 2
# Whether to URL-decode the extracted values (URLDecoder)
agent.sources.r1.interceptors.i1.isDecoder = true

## To match by group name instead, configure as follows:
# Regular expression used for extraction
# agent.sources.r1.interceptors.i1.regexp = (?<a>[0-9]+):(?<b>[0-9]+):(?<c>[\\w+])
# Group names, in output order
# agent.sources.r1.interceptors.i1.regexpOrder = a b c
# Enable named-group matching mode
# agent.sources.r1.interceptors.i1.mode = name

agent.sinks.s1.channel = c1
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory = /var/log/flume
agent.sinks.s1.sink.batchSize = 1
agent.sinks.s1.sink.rollInterval = 0
```

- For testing, see the end of this document.

---

### Writing events to a database: JDBCSink

---

> ***Note:*** Each record must use a fixed delimiter between fields, and the number of columns must equal the number of bind parameters in the SQL statement.

- Packaging
  - [x] Copy the JDBC driver jar for your database into ${FLUME_HOME}/lib
  - [x] Build this project as a jar and copy it into ${FLUME_HOME}/lib
- Configuration

```properties
agent.sinks.s1.type = com.wen.qfsink.JDBCSink
agent.sinks.s1.driverName = com.mysql.jdbc.Driver
agent.sinks.s1.jdbcUrl = jdbc:mysql://localhost:3306/database_name
agent.sinks.s1.user = db_username
agent.sinks.s1.password = db_password
agent.sinks.s1.sql = replace into table_name (id,user_name) values (?,?)
# Batch size for database inserts. Defaults to 100; do not set it higher than the channel's transactionCapacity.
agent.sinks.s1.batchSize = 10
# Field delimiter for the event body. Optional; defaults to a tab character.
agent.sinks.s1.splitStr = \t
```

----

# Custom Flume interceptor

A custom Interceptor extracts content from events with regular expressions, so that log data is parsed before it is stored in the database.

***Note:*** The walkthrough below does not use this project's code. It only illustrates how to write a custom Interceptor (the example adds the local machine's host name to each event's headers).

## Adding the dependency

```xml
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>${flume.core.version}</version>
</dependency>
```

## Methods to implement

```java
public interface Interceptor {
    void initialize();

    Event intercept(Event event);

    List<Event> intercept(List<Event> list);

    void close();

    static interface Builder extends Configurable {
        Interceptor build();
    }
}
```

## Implementing the Interceptor

```java
package com.wen;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.interceptor.Interceptor;

public class CustomHostInterceptor implements Interceptor {

    private String hostValue;
    private String hostHeader;

    public CustomHostInterceptor(String hostHeader) {
        this.hostHeader = hostHeader;
    }

    @Override
    public void initialize() {
        // At interceptor start up
        try {
            hostValue = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new FlumeException("Cannot get Hostname", e);
        }
    }

    @Override
    public Event intercept(Event event) {
        // This is the event's body (unused here; shown for reference)
        String body = new String(event.getBody());

        // These are the event's headers
        Map<String, String> headers = event.getHeaders();

        // Enrich header with hostname
        headers.put(hostHeader, hostValue);

        // Let the enriched event go
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<Event>(events.size());
        for (Event event : events) {
            // Intercept any event
            Event interceptedEvent = intercept(event);
            interceptedEvents.add(interceptedEvent);
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
        // At interceptor shutdown
    }

    public static class Builder implements Interceptor.Builder {

        private String hostHeader;

        @Override
        public void configure(Context context) {
            // Retrieve property from flume conf
            hostHeader = context.getString("hostHeader");
        }

        @Override
        public Interceptor build() {
            return new CustomHostInterceptor(hostHeader);
        }
    }
}
```

## Packaging and deploying to Flume

- Option 1: package with Maven

  > This approach can also bundle Flume's own jars into the artifact. Configure pom.xml yourself to exclude those dependencies, for example by giving the Flume dependency `provided` scope.

```bash
mvn clean package
```

- Option 2: package with IntelliJ IDEA. In Project Structure -> Artifacts, remove the dependency jars and include only the compile output. Then run Build -> Build Artifacts.

Copy the resulting jar into the ${FLUME_HOME}/lib directory.

## Configuration file

```properties
agent.sources = r1
agent.channels = c1
agent.sinks = s1

agent.sources.r1.type = avro
agent.sources.r1.channels = c1
agent.sources.r1.bind = 127.0.0.1
agent.sources.r1.port = 4141

agent.sources.r1.interceptors = i1
# Fully qualified class name of the interceptor followed by "$Builder"
# (the class that implements Interceptor.Builder)
agent.sources.r1.interceptors.i1.type = com.wen.CustomHostInterceptor$Builder
# Interceptor parameters. Each name must match the key the class reads from the Context.
agent.sources.r1.interceptors.i1.hostHeader = mykey
# agent.sources.r1.interceptors.i1.regexp = ([0-9.,\\s]+) - - \\[(.*)\\] \"([A-Z]+) \\/(.*?)\\?(.*)\\s*HTTP.*\" ([0-9]+) .* \"(.*)\" \"(.*)\" \"([0-9.,\\s]+)\"
# agent.sources.r1.interceptors.i1.regexpOrder = 1 2 4 5 6 9
# agent.sources.r1.interceptors.i1.isDecoder = true

agent.sinks.s1.channel = c1
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory = /var/log/flume
agent.sinks.s1.sink.batchSize = 1
agent.sinks.s1.sink.rollInterval = 0
```

## Running

- Start the agent: `flume-ng agent -n agent -c conf -f conf/test_interceptor.conf -Dflume.root.logger=INFO,console`
- Start the test client: `flume-ng avro-client -H localhost -p 4141`, then type some data to test.
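
The `regexp`, `regexpOrder`, and `isDecoder` settings of RegexpExtractInterceptor describe a small transformation: match the event body, pick capture groups in the configured order, optionally URL-decode each value, and join the results with tabs. The following is a minimal sketch of that idea in plain Java, not the project's actual implementation. The class name `RegexExtractSketch`, the method `extract`, and the choice to return `null` for a non-matching body are all assumptions made for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; not the project's RegexpExtractInterceptor.
class RegexExtractSketch {

    /**
     * Returns the selected capture groups joined by tabs,
     * or null when the pattern does not match the body (an assumed policy).
     */
    static String extract(String body, Pattern pattern, int[] order, boolean decode) {
        Matcher matcher = pattern.matcher(body);
        if (!matcher.find()) {
            return null;
        }
        List<String> fields = new ArrayList<>();
        for (int index : order) {
            String value = matcher.group(index); // group indexes are 1-based
            if (value == null) {
                value = ""; // an optional group that did not participate in the match
            }
            if (decode) {
                try {
                    value = URLDecoder.decode(value, "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException("UTF-8 is always supported", e);
                }
            }
            fields.add(value);
        }
        return String.join("\t", fields);
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("([0-9]+):([0-9]+):(\\w+)");
        String out = extract("12:34:abc", pattern, new int[] {1, 3, 2}, true);
        System.out.println(out.replace("\t", "|")); // prints 12|abc|34
    }
}
```

In a real interceptor this logic would sit inside `intercept(Event event)`, reading the body with `event.getBody()` and writing the joined string back with `event.setBody(...)`. What to do with events that do not match (drop them or pass them through unchanged) is a design decision this sketch leaves open.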