企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
故障转移组逻辑处理器维护了一个发送 Event 失败的 sink 的列表,保证有一个 sink 是可用的来发送 Event。 <br/> 故障转移机制的工作原理是将故障 Sink 降级到一个池中,在池中为它们分配冷却期(超时时间),在重试之前随顺序故障而增加。 Sink 成功发送事件后,它将恢复到实时池。Sink 具有与之相关的优先级,数值越大,优先级越高。 如果在发送 Event 时 Sink 发生故障,会继续尝试下一个具有最高优先级的 Sink。例如,在优先级为 80 的 Sink 之前激活优先级为 100 的 Sink。如果未指定优先级,则根据配置中的顺序来选取。<br/> 要使用故障转移选择器,不仅要设置 Sink 组的选择器为 failover,还有为每一个 Sink 设置一个唯一的优先级数值。可以使用 maxpenalty 属性设置故障转移时间的上限(毫秒)。<br/> 示例: ```conf ######### Agent ############ a1.sources = r1 a1.channels = c1 a1.sinks = s1 s2 # Sink 组逻辑器 a1.sinkgroups = g1 ######### Sources ########## a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop101 a1.sources.r1.port = 6666 ########## SinkGroups ############# # 将上面定义的sink加入到组中 a1.sinkgroups.g1.sinks = s1 s2 # failover为故障转移组件 a1.sinkgroups.g1.processor.type = failover # 组内 sink 的权重值,<sinkName>必须是当前组关联的 sink 之一。数值(绝对值)越高越早被激活 a1.sinkgroups.g1.processor.priority.s1 = 5 a1.sinkgroups.g1.processor.priority.s2 = 10 # 发生异常的 sink 最大故障转移时间(默认 30000 毫秒) a1.sinkgroups.g1.processor.maxpenalty = 10000 ############ Channel ########### a1.channels.c1.type = memory ########## Sinks ################ a1.sinks.s1.type = avro a1.sinks.s1.hostname = hadoop101 a1.sinks.s1.port = 7777 a1.sinks.s2.type = avro a1.sinks.s2.hostname = hadoop101 a1.sinks.s2.port = 8888 ########## 连接 ########### a1.sources.r1.channels = c1 a1.sinks.s1.channel = c1 a1.sinks.s2.channel = c1 ``` 上面配置了`s1`和`s2`两个Sink,如果其中一个Sink发生故障,则另一个Sink则开始工作。