# aliyun-odps-fluentd-plugin **Repository Path**: mirrors_aliyun/aliyun-odps-fluentd-plugin ## Basic Information - **Project Name**: aliyun-odps-fluentd-plugin - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-08-08 - **Last Updated**: 2026-05-02 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Aliyun ODPS Plugin for Fluentd ## 开始使用 --- ### 介绍 - 开放数据处理服务(Open Data Processing Service,简称ODPS)是阿里巴巴自主研发的海量数据处理平台。主要服务于批量结构化数据的存储和计算,可以提供海量数据仓库的解决方案以及针对大数据的分析建模服务。 - ODPS DataHub Service(DHS)是一个ODPS的内建服务,向用户提供实时数据的发布(Publish)和订阅(Subscribe)的功能。发布的数据会自动被写入ODPS表中。所以DHS也可以做为ODPS导入数据的一个入口。 - 本插件提供向odps表通过DataHub服务写入数据的能力,并具备按用户要求的格式自动创建分区的功能。 ### 环境要求 使用此插件,需要具备如下环境: 1. Ruby 2.1.0 或更新 2. Gem 2.4.5 或更新 3. Fluentd-0.10.49 或更新 (*[Home Page](http://www.fluentd.org/)*) 4. Protobuf-3.5.1 或更新(Ruby protobuf) 5. Ruby-devel ### 安装部署 安装部署Fluentd可以选择以下两种方式之一。 1. 一键安装包适用于第一次安装Ruby&Fluentd环境的用户或局域网用户,一键安装包包含了所需的Ruby环境以及Fluentd。目前一键安装包仅支持Linux环境。 2. 通过网络安装适用于对Ruby有了解的用户,需要提前确认Ruby版本,若低于2.1.0则需要升级或安装更高级的Ruby环境,然后通过RubyGem安装Fluentd。 注: * RubyGem源建议更改为https://ruby.taobao.org/ * 局域网环境安装可以通过本地安装Gem文件 ``` gem install --local fluent-plugin-aliyun-odps-0.1.2.gem ``` #### 安装方式一:一键安装包安装 1. 下载解压 [fluentd_package.tar.gz](http://gitlab.alibaba-inc.com/aliopensource/aliyun-odps-fluentd-plugin/blob/master/package/fluentd_package.tar.gz) 2. 可以修改install_agent.sh中$DIR为你想安装ruby的路径,默认会安装在当前路径下面 3. 执行如下命令,提示“Success”表示安装成功 ``` bash install_agent.sh ``` 4. fluentd程序会被安装在当前目录的bin目录下面 #### 安装方式二:通过网络安装 1. Ruby安装(已经存在Ruby 2.1.0以上环境可忽略此步骤): ``` wget https://cache.ruby-lang.org/pub/ruby/2.3/ruby-2.3.0.tar.gz tar xzvf ruby-2.3.0.tar.gz cd ruby-2.3.0 ./configure --prefix=DIR make make install ``` 2 Fluentd以及插件安装 ``` $ gem install fluent-plugin-aliyun-odps ``` ### 插件使用示例 #### 示例一 上传csv文件中的数据 1. 首先需要在odps准备一张表,在这里假设表名为 students, 包含三个字段 id, name, score, 类型分别为string, string, bigint 2. 准备csv数据文件, 假设数据文件内容如下 ``` 1, jack ma, 90 2, pony zhang, 85 3, lucy wang, 88 ``` 3. 准备fluentd配置文件, 保存以下内容为文件fluentd.conf。 ``` type tail path /path/to/students.csv tag input.csv format csv type aliyun_odps aliyun_access_id ************ aliyun_access_key ********* aliyun_odps_endpoint http://service.odps.aliyun.com/api aliyun_odps_hub_endpoint http://dh.odps.aliyun.com buffer_chunk_limit 2m buffer_queue_limit 128 flush_interval 5s project your_projectName #填写需要导入数据的project名称 enable_fast_crc true table students fields id,name,score shard_number 1 retry_time 3 retry_interval 1 abandon_mode true
``` 4. 执行fluentd命令,并用-c指定配置文件 ``` fluentd -c fluentd.conf ``` 5. 完成后可用如下sql命令查询数据 ``` select * from students; ``` #### 示例二 抓取上传实时nginx日志文件 1. 对于nginx日志文件,fluentd可用采用正则表达式的方式来解析数据。 2. 参考使用如下配置文件,执行命令同示例一。 ``` type tail path /home/admin/nginx/logs/access.log #nginx log 地址 pos_file /tmp/nginx.access.pos refresh_interval 5s tag nginx.access format /^(?[^ ]*) - \[(?
[^\]]*)\] "(?\S+) ((?[^\"]*) +\S*)?" (?[^ ]*) (?[^ ]*) "(?[^\"]*)" "(?[^\"]*)"? $/ #解析日志的正则表达式 time_format %d/%b/%Y:%H:%M:%S %z type aliyun_odps aliyun_access_id ************ aliyun_access_key ********* aliyun_odps_endpoint http://service.odps.aliyun.com/api aliyun_odps_hub_endpoint http://dh.odps.aliyun.com buffer_chunk_limit 2m buffer_queue_limit 128 flush_interval 5s project your_projectName enable_fast_crc true data_encoding UTF-8 table nginx_logs #对应日志写入的odps表 fields remote,method,path,code,size,agent,requesttime shard_number 5 partition ctime=${datetime.strftime('%Y%m%d')} time_format %d/%b/%Y:%H:%M:%S %z shard_number 1 retry_time 3 retry_interval 1 abandon_mode true
``` #### 示例三导入MySQL中的数据 1. mysql ### 参数说明 - type(Fixed): 固定值 aliyun_odps. - aliyun_access_id(Required):阿里云access_id. - aliyun_access_key(Required):阿里云access key. - aliyun_odps_hub_endpoint(Required):如果你的服务部署在ECS上,请把本值设定为 http://dh-ext.odps.aliyun-inc.com, 否则设置为 http://dh.odps.aliyun.com. - aliyunodps_endpoint(Required):如果你的服务部署在ECS上,请把本值设定为 http://odps-ext.aiyun-inc.com/api, 否则设置为 http://service.odps.aliyun.com/api . - buffer_chunk_limit(Optional): 块大小,支持“k”(KB),“m”(MB)单位,默认 8MB,建议值2MB, 目前最大支持20MB. - buffer_queue_limit(Optional): 块队列大小,此值与buffer_chunk_limit共同决定整个缓冲区大小。 - flush_interval(Optional): 强制发送间隔,达到时间后块数据未满则强制发送, 默认 60s. - abandon_mode(Optional):内置重试三次后抛弃该pack数据。 - project(Required): project名称. - table(Required): table名称. - fields(Required): 与source对应,字段名必须存在于source之中. - partition(Optional):若为分区表,则设置此项. - 分区名支持的设置模式: - 固定值: partition ctime=20150804 - 关键字: partition ctime=${remote} (其中remote为source中某字段) - 时间格式关键字: partition ctime=${datetime.strftime('%Y%m%d')} (其中datetime为source中某时间格式字段,输出为%Y%m%d格式作为分区名称) - time_format(Optional): - 如果使用时间格式关键字为, 请设置本参数. 例如: source[datetime]="29/Aug/2015:11:10:16 +0800",则设置为"%d/%b/%Y:%H:%M:%S %z" - shard_number(Optional):指定shard数量,将会随机向shard[0,shard_number-1]范围内的shard中写入数据,必须为大于0且小于table对应shard数量上限的整数. - enable_fast_crc(Optional): 使用快速crc计算,这将极大提升性能,但是由于使用了外部加载的动态链接库,目前仅支持64位linux、windows系统. - retry_time(Optional): 发送每个pack数据时内置重试次数,默认3次. - retry_interval(Optional): 重试间隔,默认1s. - abandon_mode(Optional): 默认为false,设置成true会在重试retry_time后抛弃该数据包,否则会将异常抛送给fluentd,利用fluentd的重试机制重试,这种情况可能会导致数据重复. - data_encoding(Optional): 默认使用源数据标示的encode方式,根据string.encoding获取,如果需要指定源数据编码方式,请设置该值,支持的类型:"US-ASCII", "ASCII-8BIT", "UTF-8", "ISO-8859-1", "Shift_JIS", "EUC-JP", "Windows-31J", "BINARY", "CP932", "eucJP" ## 常见使用问题以及异常描述 --- * 程序抛出异常InvalidShardId/ShardNotReady是什么原因导致? - 可能系统正在升级,会短暂出现这个问题,会在短时间内恢复; - fluentd如果存在多个进程请查看配置项shard_num是否都配置成了一样的值(或都不配置),如果配置不一样是会导致这个问题的,shard_number少的进程会把多余shard Unload掉; - 可能存在另外的使用sdk等方式进行了loadshard/unloadshard等操作。 * enable_fast_crc如何检查是否兼容? - 开启此配置后再启动fluentd进程,启动时会验证,如果失败会抛出错误原因(reload不会进行验证),或进入插件目录后利用ldd查看aliyun-odps-fluentd-plugin/lib/fluent/plugin/crc/lib/linux/crc32c.so。 * retry_time/retry_interval与fluentd自带的retry有何区别? - fluentd自带retry默认持续36小时,会将整个buffer_chunk重发,配置动态partition情况下重发全部数据可能造成数据重复。配置这两项就会使用插件内部重试,如果重试失败,会再根据abandon_mode的值判定放弃该pack的数据还是交给fluentd重发整个buffer。 * Warning:ErrorCode: NoSuchPartition, Message: write failed because The specified partition does not exist.是什么意思? - 本插件会再catch到Odps的NoSuchPartition时会主动创建分区,如果遇到这个warn表示Odps表中不存在数据对应分区,会自动创建,如果创建成功会有信息提示。 * Fluent::BufferQueueLimitError error="queue size exceeds limit"是什么原因? - fluentd在读取数据-发送数据过程中,会先读取到一个buffer中,具体大小根据配置中buffer_chunk_limit与buffer_queue_limit共同决定,如果遇到这个错误,很可能是因为堆积数据导致buffer不足,可以尝试增大buffer_queue_limit解决这个问题。 * 多个config文件如何分别启动一个fluentd进程? - 如果存在多个config文件,可以使用in_multiprocess这个插件同时启动不同的进程来服务。 * partition has no corresponding source key or the partition expression is wrong.这个异常是什么原因? - 这个异常表示在source data中找不到配置在partition字段中的值,例如partition ctime=${remote},而remote没有出现在source中,请检查配置。 * Failed to format the data.这个异常是什么原因? - 这个错误信息抛出代表解析partition过程出现问题,请检查partition配置,如果数据中存在脏数据也可能遇到这个问题。 * 如何更改为淘宝源RubyGem? - RubyGems 镜像[https://ruby.taobao.org/] * 导入数据抛异常"\xE8" from ASCII-8BIT to UTF-8 (Encoding::UndefinedConversionError) - 该错误往往由于source插件在读取数据时,数据真实编码为utf-8,但是transport给fluend的string.encoding却为ASCII-8BIT导致类似错误,这种情况需要设置data_encoding来进行转码。 ## 官方网站 - [Fluentd User Guide](http://docs.fluentd.org/) ## 作者 - [Sun Zongtao]() - [Cai Ying]() - [Dong Xiao](https://github.com/dongxiao1198) - [Yang Hongbo](https://github.com/hongbosoftware) ## License licensed under the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0.html)