# Storm Kinesis 集成
## Storm Kinesis Spout
提供的核心storm spout(喷口),用户从Amzon Kinesis Streams 中的流中消费数据。它存储可以在zookeeper中提交的序列号,并在重新启动后默认启动消息记录。下面是创建使用spout的示例拓扑的代码示例。下面说明配置spout(喷口)时使用的每个对象。理想情况下,spout(喷口)任务的数量应等于运动时间碎片的数量。但是,每个任务都可以从多个分片中读取。
```
public class KinesisSpoutTopology {
public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
String topologyName = args[0];
RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
1000);
ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", kinesisSpout, 3);
topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
Config topologyConfig = new Config();
topologyConfig.setDebug(true);
topologyConfig.setNumWorkers(3);
StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
}
}
```
就像你可以看到的,在它的构造函数中,引出了一个KinesisConfig对象。KinesisConfig的构造函数需要8个对象,如下所描述。
#### `String` streamName
用于消费数据的kinesis时间流的名称
#### `ShardIteratorType` shardIteratorType
支持3种类型 - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. 默认情况下,如果分片状态为this,则忽略此参数在zookeeper中寻找。所以它们将首次应用从拓扑开始。如果您想在拓扑的后续运行中使用任何这些拓扑,将需要清除用于存储序列号的zookeeper节点的状态。
#### `RecordToTupleMapper` recordToTupleMapper
一个 `RecordToTupleMapper` 接口的实现,该端口将会将kinesis记录转换为storm元组。它有两种方法。getOutputFields 告诉spout 要从getTuple方法发出的元组中存在的数据。如果getTuple返回null,记录将被确认。 `java Fields getOutputFields (); List<Object> getTuple (Record record);`
#### `Date` timestamp
与 AT_TIMESTAMP sharedIteratorType参数结合使用。这将使得spout(喷口)从那时开始提取记录。该kinesis时间使用的时间是与kinesis时间记录相关的服务器端时间。
#### `FailedMessageRetryHadnler` failedMessageRetryHandler
`FailedMessageRetryHandler` 接口的实现。 默认情况下,该模块提供支持指数退避重试的实现失败消息的机制。该实现有两个构造函数。默认值没有args 构造函数将配置在100毫秒的第一次重试随后在Math.pow(2,i-1)中退出,其中i是范围2中的重试次数到LONG.MAX_LONG。2表示指数函数的基数(秒)。另外一个构造函数以毫秒为单位进行重试间隔,作为第一个参数进行首次重试,以秒为单位的指数函数为第二个参数,重试作为第三个参数。这种接口的方法及其spout(喷口)的工作方式如下描述 `java boolean failed (KinesisMessageId messageId); KinesisMessageId getNextFailedMessageToRetry (); void failedMessageEmitted (KinesisMessageId messageId); void acked (KinesisMessageId messageId);` 将在每个发生故障的元组上调用失败的方法。如果计划重新尝试失败的消息,则返回true,否则返回false。
getNextFailedMessageToRetry 方法将被称为第一件事,每次一个spout(喷口)想要发生一个元组。它应该返回一个应该重试的消息,如果有,否则为空。请注意,在该时间段内没有任何重试信息的情况下,它可以返回null。但是,当该消息被调用失败方法时,它最终将返回它返回true的每个消息
如果spout(喷口)成功的设法从kinesis时获取记录并发送,failedMessageEmitted将被调用。否则,当getNextFailedMessageToRetry方法被再次调用的时候,该实现应该会返回相同的消息
一但失败的消息被重新发送并被spout(喷口)成功地确认,则会被呼叫。如果失败,spout(喷口)失败就会被再次呼叫。
#### `ZkInfo` zkInfo
封装的对象信息用来zookeeper的交互。这个构造函数使用zkUrl作为第一个参数,它是一个逗号分隔的字符串zk host和端口,zkNode作为第二个参数将用作存储提交序列号的根节点,会话超时以毫秒为单位,连接超时作为第四毫秒,提交间隔为提交序列号到zookeeper的毫秒数的五分之一,重试尝试作为zk客户端的第六个连接重试尝试,重试间隔以毫秒为单位的等待时间,然后再重试连接。
#### `KinesisConnectionInfo` kinesisConnectionInfo
使用kinesis客户端捕获连接到kinesis的参数的对象。它有一个构造函数来实现 `AWSCredentialsProvider`作为第一个参数。该模块提供了一个称为 `CredentialsProviderChain` 的实现,它允许spout(喷口)使用以下之一进行kinesis检测,这5个机制顺序按照以下顺序 - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`, `InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`。它需要一个`ClientConfiguration` 对象作为配置。它需要一个 `ClientConfiguration` 对象作为配置运动的第二个参数客户端,`Regions` 作为第三个参数,设置客户端上要连接的区域,recordsLimit作为第四个参数,表示最大数量的记录Kinesis客户端将每个GetReocrds请求检索。这个限制应该根据记录的大小,运动时间来仔细选择吞吐量限制和风暴中每个元组延迟的拓扑。另外如果一个任务将从多个分片读取,那么这将影响选择权限认证。
#### `Long` maxUncommittedRecords
这表示每个任务允许的最大未提交序列号数。一旦达到这个数字,spout就不会从kinesis中获取任何新的记录。未提交的序列号被定义为尚未提交给zookeeper的任务的所有消息的总和。这与拓扑级别最大待处理消息不同。例如,如果此值设置为10,并且spout将序列号从1发送到10。序号1正在等待,2到10次被告知。在这种情况下,未提交的序列的数量为10,因为1到10范围内的序列号可以被提交到zk。但是,storm仍然可以在端口上调用下一个元组,因为只有一个等待消息。
### Maven dependencies
Aws sdk version that this was tested with is 1.10.77
```
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>${aws-java-sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>
</dependencies>
```
## 将来的工作
处理kinesis中的碎片的合并或分裂,Trident喷口实施和指标
- Storm 基础
- 概念
- Scheduler(调度器)
- Configuration
- Guaranteeing Message Processing
- 守护进程容错
- 命令行客户端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 综述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 语言参考
- Storm SQL 内部实现
- Flux
- Storm 安装和部署
- 设置Storm集群
- 本地模式
- 疑难解答
- 在生产集群上运行 Topology
- Maven
- 安全地运行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 资源感知调度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各种内部行为的 Metrics
- Windows 用户指南
- Storm 中级
- 序列化
- 常见 Topology 模式
- Clojure DSL
- 使用没有jvm的语言编辑storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 状态管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 调试
- 动态日志级别设置
- Storm Logs
- 动态员工分析
- 拓扑事件检查器
- Storm 与外部系统, 以及其它库的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高级
- 针对 Storm 定义一个不是 JVM 的 DSL
- 多语言协议
- Storm 内部实现
- 翻译进度