!serializers – Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
serializers..type default Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
sinks – Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority. – Priority value. must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority
processor.maxpenalty 30000 The maximum backoff period for the failed Sink (in millis)
修改log4j.properties文件中的:# Define the root logger to the system property plsql.root.logger.log4j.rootLogger=./sql.log# Logging Threshold to INFO for queryserver. root logger still at WARN for sqlline cl...
一、Selector(选择器)
Selector概述
Selector(选择器)可以工作在复制或多路复用(路由) 模式下 。
复制模式
Selector复制模式-属性说明
selector.type replicating 类型名称,默认是 replicating
selector.optional – 标志通道为可选
Selector复制模式-案例
一到多(一个输入多个输出),即扇出流 。
多路复用(路由)模式
Selector多路复用(路由)模式-属性说明
selector.type 类型,必须是"multiplexing"
selector.header 指定要监测的头的名称
selector.default –
selector.mapping.* –
举例:
Selector多路复用(路由)模式-案例
补充知识:扇入流
m3:
编写配置文件:
m1、m2:
编写配置文件:
启动flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console
m1通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
[root@localhost conf]# curl -XPOST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
m2通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
[root@localhost conf]# curl -XPOST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
发现m3均能正确收到消息。
二、Interceptors(拦截器)
Interceptors概述
Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。
拦截器需要实现org.apache.flume.interceptor.Interceptor接口。
拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。
拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。
一个拦截器返回的事件列表被传递给链中的下一个拦截器。
如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。
如果要删除所有事件,只需返回一个空列表。
1、Timestamp Interceptor
Timestamp Interceptor概述
这个拦截器在事件头中插入以毫秒为单位的当前处理时间。头的名字为timestamp,值为当前处理的时间戳。如果在之前已经有这个时间戳,则保留原有的时间戳。
Timestamp Interceptor属性说明
!interceptors.type – 类型名称,必须是timestamp或自定义类的全路径名
preserveExisting false 如果时间戳已经存在是否保留
案例
2、Host Interceptor
Host Interceptor概述
这个拦截器插入当前处理Agent的主机名或ip,头的名字为host或配置的名称,值是主机名或ip地址,基于配置。
Host Interceptor属性说明
!type – 类型名称,必须是host
preserveExisting false 如果主机名已经存在是否保留
useIP true 如果配置为true则用IP,配置为false则用主机名
hostHeader host 加入头时使用的名称
案例
3、Static Interceptor
Static Interceptor概述
此拦截器允许用户增加静态头信息使用静态的值到所有事件。目前的实现中不允许一次指定多个头。如果需要增加多个静态头可以指定多个Static interceptors。
Static Interceptor属性说明
!type – 类型,必须是static
preserveExisting true 如果配置头已经存在是否应该保留
key key 要增加的头名
value value 要增加的头值
案例
在Event的headers中有多了country=China信息
4、UUID Interceptor
UUID Interceptor概述
这个拦截器在所有事件头中增加一个全局一致性标志,其实就是UUID。
UUID Interceptor属性说明
!type – 类型名称,必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName id 头名称
preserveExisting true 如果头已经存在,是否保留
prefix “” 在UUID前拼接的字符串前缀
案例
作用:标识一条日志
5、Search and Replace Interceptor
Search and Replace Interceptor概述
这个拦截器提供了简单的基于字符串的正则搜索和替换功能。可以修改body部分的内容
Search and Replace Interceptor属性说明
type – 类型名称,必须是"search_replace"
searchPattern – 要搜索和替换的正则表达式
replaceString – 要替换为的字符串
charset UTF-8 字符集编码,默认utf-8
案例
6、Regex Filtering Interceptor
Regex Filtering Interceptor概述
此拦截器通过解析事件体去匹配给定正则表达式来筛选事件,所提供的正则表达式即可以用来包含或刨除事件。
Regex Filtering Interceptor属性说明
!type – 类型,必须设定为regex_filter
regex ”.*” 所要匹配的正则表达式
excludeEvents false 如果是true则排除匹配的事件,false则包含匹配的事件。
案例
7、Regex Extractor Interceptor
Regex Extractor Interceptor概述
使用指定正则表达式匹配事件,并将匹配到的组作为头加入到事件中,它也支持插件化的序列化器用来格式化匹配到的组在加入他们作为头之前。
Regex Extractor Interceptor属性说明
!type – 类型,必须是regex_extractor
!regex – 要匹配的正则表达式
!serializers – Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
serializers..type default Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
serializers..name –
serializers.* – Serializer-specific properties
三、Processor(处理器)
Processor概述
Sink Group允许用户将多个Sink组合成一个实体。
Flume Sink Processor 可以通过切换组内Sink用来实现负载均衡的效果,或在一个Sink故障时切换到另一个Sink。
sinks – 用空格分隔的Sink集合
processor.type default 类型名称,必须是 default、failover 或 load_balance
1、Default Sink Processor
Default Sink Processor 只接受一个 Sink。不要求用户为单一Sink创建processor
2、Failover Sink Processor
Failover Sink Processor 维护一个sink们的优先表。确保只要一个是可用的事件就可以被处理。
失败处理原理是,为失效的sink指定一个冷却时间,在冷却时间到达后再重新使用。
sink们可以被配置一个优先级,数字越大优先级越高。
如果sink发送事件失败,则下一个最高优先级的sink将会尝试接着发送事件。
如果没有指定优先级,则优先级顺序取决于sink们的配置顺序,先配置的默认优先级高于后配置的。
在配置的过程中,设置一个group processor ,并且为每个sink都指定一个优先级。
优先级必须是唯一的。
另外可以设置maxpenalty属性指定限定失败时间。
属性说明
sinks – Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority. – Priority value. must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority
processor.maxpenalty 30000 The maximum backoff period for the failed Sink (in millis)
案例
h1配置文件
h2、h3配置文件
h1发送数据
curl -XPOST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:44444
3、Load balancing Sink Processor
Load balancing Sink processor 提供了在多个sink之间实现负载均衡的能力。
它维护了一个活动sink的索引列表。
它支持轮询或随机方式的负载均衡,默认值是轮询方式,可以通过配置指定。
也可以通过实现AbstractSinkSelector接口实现自定义的选择机制。
属性说明
!processor.sinks – Space-separated list of sinks that are participating in the group
!processor.type default The component type name, needs to be load_balance
processor.backoff false Should failed sinks be backed off exponentially.
processor.selector round_robin Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut 30000 Used by backoff selectors to limit exponential backoff (in milliseconds)
案例
h1配置文件
h2、h3配置文件
h1发送数据
curl -XPOST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:44444
1、复制选择器: 一个 Source 以复制的方式将一个 Event 同时写入到多个 Channel 中,不同的 Sink 可以从不同的 Channel 中获取相同的 Event,比如一份日志数据同时写 Kafka 和 HDFS,一个 Event 同时写入两个 Channel,然后不同类型的 Sink 发送到不同的外部存储。该选择器复制每个事件到通过Source的channels参数所指定的所有的Channels中。复制Channel选择器还有一个可选参数optional,该参数是空格分隔的channel名字列表。此参数指定的所有channel都认为是可选的,所以如果事件写入这些channel时,若有失败发生,会忽略。而写入其他channel失败时会抛出异常。
2、(多路)复用选择器: 需要和拦截器配合使用,根据 Event 的头信息中不同键值数据来判断 Event 应该写入哪个 Channel 中。
还有一种是kafka channel,它是没有sink;
3. 自定义选择器
Sink
数据去向常见的目的地有:HDFS、Kafka、logger(记录INFO级别的日志)、avro(下一层的Flume)、File、Hbase、solr、ipc、thrift自定义等
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。
Sink groups允许组织多个sink到一个实体上。 Sink processors(处理器)能够提供在组内所有Sink之间实现负载均衡的能力,而且在失败的情况下能够进行故障转移从一个Sink到另一个Sink。简单的说就是一个source 对应一个Sinkgroups,即多个sink,这里实际上复用/复制情况差不多,只是这里考虑的是可靠性与性能,即故障转移与负载均衡的设置。
DefaultSink Processor 接收单一的Sink,不强制用户为Sink创建Processor
FailoverSink Processor故障转移处理器会通过配置维护了一个优先级列表。保证每一个有效的事件都会被处理。
工作原理是将连续失败sink分配到一个池中,在那里被分配一个冷冻期,在这个冷冻期里,这个sink不会做任何事。一旦sink成功发送一个event,sink将被还原到live 池中。
Load balancing Processor负载均衡处理器提供在多个Sink之间负载平衡的能力。实现支持通过① round_robin(轮询)或者② random(随机)参数来实现负载分发,默认情况下使用round_robin
相关问题推荐
数据库设计的根本目标是要解决A)数据共享问题 B)数据安全问题C)大量数据存储问题 D)简化数据维护一般书上很少这么提,大家在答这个题的时候,可以从数据库的概念上入手,数据库设计的根本目标是要解决应该是共享问题。四个答案可以做一些比较,最好的答案...
bin/flume-ng agent \ --conf ./conf/ \ -f conf/file_memory_avro.conf \ -Dflume.root.logger=DEBUG,console \ -n agent1
概述本篇文章是根据Flume官网对Flume组件(Source,Channel,Sink)的常用配置参数做一个主要介绍,如有表达意思错误希望不吝指出。SourcesFlume中常用的Source有NetCat,Avro,Exec,Spooling Directory,Taildir,也可以根据业务场景的需要自定义Source,具体...
一、什么是flumeFlume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。二、flume特点flume的数据流...
flume和kafka的侧重点不同,flume追求的是数据和数据源、数据流向的多样性,适合多个生产者的场景;flume有自己内置的多种source和sink组件,具体操作方式是编写source、channel和sink的.conf配置文件,开启flume组件的时候用命令关联读取配置文件实现kafka追...
Flume的运行原理是:Flume的核心角色为Agent,Flume分布式系统常常是由很多的Agent连接而形成的。Agent内部有三个组件,一是Source采集源,用于跟数据源对接,获取数据。二是Channel通道,Agent内部的数据传输通道,用于从source将数据传递到Sink。三是Sink目...
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。...
想考计算机相关证书或者想学技术的同学可以抓紧时间啦!双11更多优惠活动详情咨询18597153017(微信同号)
大数据开发:1、负责公司大数据产品/项目的后台研发;2、负责技术预研,产品设计以及文档编写等工作;3、参与大数据的数据治理和数据处理相关java开发工作;4、参与海量数据处理,业务数据体系的设计、数据统计、分析及数据建模大数据开发要处理大规模的数据...
修改log4j.properties文件中的:# Define the root logger to the system property plsql.root.logger.log4j.rootLogger=./sql.log# Logging Threshold to INFO for queryserver. root logger still at WARN for sqlline cl...
如果没找到解决方法,最好的方法就是重装
杀毒软件关了
1、上传jar包到/opt/software/解压到/opt/module 改名为phoenix2、 复制server和client这俩个包拷贝到各个节点的hbase/lib在phoenix目录下3、向每个节点发送server jar4、向每个节点发送client jar5、在root权限下给/etc/profile 下添加如下内容6、启动Zookee...
安装和配置Phoenix把下载好的安装包放入software目录下解压Phoenix到opt目录下[root@hadoop100 software]# tar -zxvf apache-phoenix-4.14.0-cdh5.14.2-bin.tar.gz -C /opt进入Phoenix的bin目录下把phoenix-4.14.0-cdh5.14.2-server.jar包复制到hbase目...
phoenix是构建的Hbase之上的,使用标准的SQL操作Hbase,可以做联机事务处理,拥有低延迟的特性。phoenix会把SQL编译成一系列的Hbase的scan操作,然后把scan结果生成标准的JDBC结果集,其底层由于使用了Hbase的API,协处理器,过滤器,处理千万级行的数据也只...
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。...