企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# Storm Kinesis 集成 ## Storm Kinesis Spout 提供的核心storm spout(喷口),用户从Amzon Kinesis Streams 中的流中消费数据。它存储可以在zookeeper中提交的序列号,并在重新启动后默认启动消息记录。下面是创建使用spout的示例拓扑的代码示例。下面说明配置spout(喷口)时使用的每个对象。理想情况下,spout(喷口)任务的数量应等于运动时间碎片的数量。但是,每个任务都可以从多个分片中读取。 ``` public class KinesisSpoutTopology { public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { String topologyName = args[0]; RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper(); KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2, 1000); ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000); KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON, recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L); KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig); TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("spout", kinesisSpout, 3); topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout"); Config topologyConfig = new Config(); topologyConfig.setDebug(true); topologyConfig.setNumWorkers(3); StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology()); } } ``` 就像你可以看到的,在它的构造函数中,引出了一个KinesisConfig对象。KinesisConfig的构造函数需要8个对象,如下所描述。 #### `String` streamName 用于消费数据的kinesis时间流的名称 #### `ShardIteratorType` shardIteratorType 支持3种类型 - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. 默认情况下,如果分片状态为this,则忽略此参数在zookeeper中寻找。所以它们将首次应用从拓扑开始。如果您想在拓扑的后续运行中使用任何这些拓扑,将需要清除用于存储序列号的zookeeper节点的状态。 #### `RecordToTupleMapper` recordToTupleMapper 一个 `RecordToTupleMapper` 接口的实现,该端口将会将kinesis记录转换为storm元组。它有两种方法。getOutputFields 告诉spout 要从getTuple方法发出的元组中存在的数据。如果getTuple返回null,记录将被确认。 `java Fields getOutputFields (); List&lt;Object&gt; getTuple (Record record);` #### `Date` timestamp 与 AT_TIMESTAMP sharedIteratorType参数结合使用。这将使得spout(喷口)从那时开始提取记录。该kinesis时间使用的时间是与kinesis时间记录相关的服务器端时间。 #### `FailedMessageRetryHadnler` failedMessageRetryHandler `FailedMessageRetryHandler` 接口的实现。 默认情况下,该模块提供支持指数退避重试的实现失败消息的机制。该实现有两个构造函数。默认值没有args 构造函数将配置在100毫秒的第一次重试随后在Math.pow(2,i-1)中退出,其中i是范围2中的重试次数到LONG.MAX_LONG。2表示指数函数的基数(秒)。另外一个构造函数以毫秒为单位进行重试间隔,作为第一个参数进行首次重试,以秒为单位的指数函数为第二个参数,重试作为第三个参数。这种接口的方法及其spout(喷口)的工作方式如下描述 `java boolean failed (KinesisMessageId messageId); KinesisMessageId getNextFailedMessageToRetry (); void failedMessageEmitted (KinesisMessageId messageId); void acked (KinesisMessageId messageId);` 将在每个发生故障的元组上调用失败的方法。如果计划重新尝试失败的消息,则返回true,否则返回false。 getNextFailedMessageToRetry 方法将被称为第一件事,每次一个spout(喷口)想要发生一个元组。它应该返回一个应该重试的消息,如果有,否则为空。请注意,在该时间段内没有任何重试信息的情况下,它可以返回null。但是,当该消息被调用失败方法时,它最终将返回它返回true的每个消息 如果spout(喷口)成功的设法从kinesis时获取记录并发送,failedMessageEmitted将被调用。否则,当getNextFailedMessageToRetry方法被再次调用的时候,该实现应该会返回相同的消息 一但失败的消息被重新发送并被spout(喷口)成功地确认,则会被呼叫。如果失败,spout(喷口)失败就会被再次呼叫。 #### `ZkInfo` zkInfo 封装的对象信息用来zookeeper的交互。这个构造函数使用zkUrl作为第一个参数,它是一个逗号分隔的字符串zk host和端口,zkNode作为第二个参数将用作存储提交序列号的根节点,会话超时以毫秒为单位,连接超时作为第四毫秒,提交间隔为提交序列号到zookeeper的毫秒数的五分之一,重试尝试作为zk客户端的第六个连接重试尝试,重试间隔以毫秒为单位的等待时间,然后再重试连接。 #### `KinesisConnectionInfo` kinesisConnectionInfo 使用kinesis客户端捕获连接到kinesis的参数的对象。它有一个构造函数来实现 `AWSCredentialsProvider`作为第一个参数。该模块提供了一个称为 `CredentialsProviderChain` 的实现,它允许spout(喷口)使用以下之一进行kinesis检测,这5个机制顺序按照以下顺序 - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`, `InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`。它需要一个`ClientConfiguration` 对象作为配置。它需要一个 `ClientConfiguration` 对象作为配置运动的第二个参数客户端,`Regions` 作为第三个参数,设置客户端上要连接的区域,recordsLimit作为第四个参数,表示最大数量的记录Kinesis客户端将每个GetReocrds请求检索。这个限制应该根据记录的大小,运动时间来仔细选择吞吐量限制和风暴中每个元组延迟的拓扑。另外如果一个任务将从多个分片读取,那么这将影响选择权限认证。 #### `Long` maxUncommittedRecords 这表示每个任务允许的最大未提交序列号数。一旦达到这个数字,spout就不会从kinesis中获取任何新的记录。未提交的序列号被定义为尚未提交给zookeeper的任务的所有消息的总和。这与拓扑级别最大待处理消息不同。例如,如果此值设置为10,并且spout将序列号从1发送到10。序号1正在等待,2到10次被告知。在这种情况下,未提交的序列的数量为10,因为1到10范围内的序列号可以被提交到zk。但是,storm仍然可以在端口上调用下一个元组,因为只有一个等待消息。 ### Maven dependencies Aws sdk version that this was tested with is 1.10.77 ``` <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk</artifactId> <version>${aws-java-sdk.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>${curator.version}</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> </dependency> </dependencies> ``` ## 将来的工作 处理kinesis中的碎片的合并或分裂,Trident喷口实施和指标