# Storm Cassandra 集成
### Apache Cassandra 的 Bolt API 实现
这个库提供了 Apache Cassandra 之上的核心 storm bolt . 提供简单的 DSL 来 map storm _Tuple_ 到 Cassandra Query Language _Statement_ (Cassandra 查询语言 _Statement_).
### Configuration (配置)
以下属性可能会传递给 storm 配置.
| **Property name(属性名称)** | **Description(描述)** | **Default(默认)** |
| --- | --- | --- |
| **cassandra.keyspace** | - | |
| **cassandra.nodes** | - | {"localhost"} |
| **cassandra.username** | - | - |
| **cassandra.password** | - | - |
| **cassandra.port** | - | 9092 |
| **cassandra.output.consistencyLevel** | - | ONE |
| **cassandra.batch.size.rows** | - | 100 |
| **cassandra.retryPolicy** | - | DefaultRetryPolicy |
| **cassandra.reconnectionPolicy.baseDelayMs** | - | 100 (ms) |
| **cassandra.reconnectionPolicy.maxDelayMs** | - | 60000 (ms) |
### CassandraWriterBolt
#### Static import
```
import static org.apache.storm.cassandra.DynamicStatementBuilder.*
```
#### Insert Query Builder (插入查询生成器)
##### Insert query including only the specified tuple fields (插入仅包含指定 tuple 字段的查询).
```
new CassandraWriterBolt(
async(
simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
.with(
fields("title", "year", "performer", "genre", "tracks")
)
)
);
```
##### Insert query including all tuple fields(插入包含所有 tuple 字段的查询).
```
new CassandraWriterBolt(
async(
simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
.with( all() )
)
);
```
##### Insert multiple queries from one input tuple (从一个 input tuple 插入多个查询).
```
new CassandraWriterBolt(
async(
simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
)
);
```
##### Insert query using QueryBuilder(使用 QueryBuilder 插入查询)
```
new CassandraWriterBolt(
async(
simpleQuery("INSERT INTO album (title,year,perfomer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
.with(all()))
)
)
```
##### Insert query with static bound query (使用 static bound query 插入 查询)
```
new CassandraWriterBolt(
async(
boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
.bind(all());
)
);
```
##### Insert query with static bound query using named setters and aliases (使用 named setters 和 aliases 插入带有 static bound query 的查询)
```
new CassandraWriterBolt(
async(
boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
.bind(
field("ti"),as("title"),
field("ye").as("year")),
field("pe").as("performer")),
field("ge").as("genre")),
field("tr").as("tracks"))
).byNamedSetters()
)
);
```
##### Insert query with bound statement load from storm configuration (从 storm 配置插入 bound statement load 的查询)
```
new CassandraWriterBolt(
boundQuery(named("insertIntoAlbum"))
.bind(all());
```
##### Insert query with bound statement load from tuple field (从 tuple 字段插入 bound statement load 的查询)
```
new CassandraWriterBolt(
boundQuery(namedByField("cql"))
.bind(all());
```
##### Insert query with batch statement (使用 batch 语句插入查询)
```
// Logged
new CassandraWriterBolt(loggedBatch(
simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
)
);
// UnLogged
new CassandraWriterBolt(unLoggedBatch(
simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
)
);
```
### 如何处理 query execution results (查询执行结果)
_ExecutionResultHandler_ 接口可用于自定义 execution result (执行结果)应如何处理.
```
public interface ExecutionResultHandler extends Serializable {
void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
void onQuerySuccess(OutputCollector collector, Tuple tuple);
}
```
默认情况下, CassandraBolt 将在所有的 Cassandra Exception 中 fails(失败)一个 tuple (请参阅 [BaseExecutionResultHandler](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)).
```
new CassandraWriterBolt(insertInto("album").values(with(all()).build())
.withResultHandler(new MyCustomResultHandler());
```
### Declare Output fields (声明输出字段)
CassandraBolt 可以声明 declare output fields / stream output fields(输出字段/流输出字段). 例如, 这可以用于在 error (错误)或者 chain queries (链式查询)上 remit (传递)一个 new tuple (新的元组).
```
new CassandraWriterBolt(insertInto("album").values(withFields(all()).build())
.withResultHandler(new EmitOnDriverExceptionResultHandler());
.withStreamOutputFields("stream_error", new Fields("message");
public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
@Override
protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
LOG.error("An error occurred while executing cassandra statement", e);
collector.emit("stream_error", new Values(e.getMessage()));
collector.ack(tuple);
}
}
```
### Murmur3FieldGrouping
[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java) 可以用来优化 cassandra writes (cassandra 的写入). 根据 specified row partition keys (指定的行分区键), 该 stream 在 bolt 的 task 之间进行 partitioned (分区).
```
CassandraWriterBolt bolt = new CassandraWriterBolt(
insertInto("album")
.values(
with(fields("title", "year", "performer", "genre", "tracks")
).build());
builder.setBolt("BOLT_WRITER", bolt, 4)
.customGrouping("spout", new Murmur3StreamGrouping("title"))
```
### Trident API 支持
storm-cassandra 支持 用于将 data `inserting(插入)` Cassandra 的 Trident `state` API . `java CassandraState.Options options = new CassandraState.Options(new CassandraContext()); CQLStatementTupleMapper insertTemperatureValues = boundQuery( "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)") .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature"))); options.withCQLStatementTupleMapper(insertTemperatureValues); CassandraStateFactory insertValuesStateFactory = new CassandraStateFactory(options); TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory); stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name")); stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x")); stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());`
以下 `state` API 用于从 Cassandra `querying(查询)` 数据. `java CassandraState.Options options = new CassandraState.Options(new CassandraContext()); CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?") .bind(with(field("weather_station_id").as("id"))); options.withCQLStatementTupleMapper(insertTemperatureValues); options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name"))); CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options); CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory(); TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory); stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));`
- Storm 基础
- 概念
- Scheduler(调度器)
- Configuration
- Guaranteeing Message Processing
- 守护进程容错
- 命令行客户端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 综述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 语言参考
- Storm SQL 内部实现
- Flux
- Storm 安装和部署
- 设置Storm集群
- 本地模式
- 疑难解答
- 在生产集群上运行 Topology
- Maven
- 安全地运行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 资源感知调度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各种内部行为的 Metrics
- Windows 用户指南
- Storm 中级
- 序列化
- 常见 Topology 模式
- Clojure DSL
- 使用没有jvm的语言编辑storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 状态管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 调试
- 动态日志级别设置
- Storm Logs
- 动态员工分析
- 拓扑事件检查器
- Storm 与外部系统, 以及其它库的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高级
- 针对 Storm 定义一个不是 JVM 的 DSL
- 多语言协议
- Storm 内部实现
- 翻译进度