# Storm Druid 集成
## Storm Druid Bolt 和 TridentState
该模块提供了将数据写入[Druid](http://druid.io/) 数据存储的核心Strom和Trident bolt(螺栓)的实现。 该实现使用Druid's的[Tranquility库](https://github.com/druid-io/tranquility)向druid发送消息。
一些实施细节从现有的借用 [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md). 这个新的Bolt(螺栓)增加了支持最新的storm释放,并保持在storm回购的bolt(螺栓)。
### Core Bolt
下面的例子描述了使用 `org.apache.storm.druid.bolt.DruidBeamBolt`的核心bolt(螺栓)默认情况下,该bolt(螺栓)希望收到元组,其中"事件"字段提供您的事件类型。可以通过实现ITupleDruidEventMapper接口来更改此逻辑。
```
DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
```
### Trident State
```
DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
stream.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
LOG.info("########### Received tuple: [{}]", input);
}
}).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
```
### 样品工厂实现
Druid bolt 必须配置一个 BeamFactory. 您可以使用它们其中一个来实现 [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method. See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details. For more details refer [Tranquility library](https://github.com/druid-io/tranquility) docs.
```
public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
@Override
public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory(
"click"
)
);
// Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
{
@Override
public DateTime timestamp(Map<String, Object> theMap)
{
return new DateTime(theMap.get("timestamp"));
}
};
// Tranquility uses ZooKeeper (through Curator) for coordination.
final CuratorFramework curator = CuratorFrameworkFactory
.builder()
.connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
.retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
.build();
curator.start();
// The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
// Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
// Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
// done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
// In this case, we won't provide one, so we're just using Jackson.
final Beam<Map<String, Object>> beam = DruidBeams
.builder(timestamper)
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation.create(indexService, dataSource))
.timestampSpec(timestampSpec)
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
.tuning(
ClusteredBeamTuning
.builder()
.segmentGranularity(Granularity.HOUR)
.windowPeriod(new Period("PT10M"))
.partitions(1)
.replicants(1)
.build()
)
.druidBeamConfig(
DruidBeamConfig
.builder()
.indexRetryPeriod(new Period("PT10M"))
.build())
.buildBeam();
return beam;
}
}
```
Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)
- Storm 基础
- 概念
- Scheduler(调度器)
- Configuration
- Guaranteeing Message Processing
- 守护进程容错
- 命令行客户端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 综述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 语言参考
- Storm SQL 内部实现
- Flux
- Storm 安装和部署
- 设置Storm集群
- 本地模式
- 疑难解答
- 在生产集群上运行 Topology
- Maven
- 安全地运行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 资源感知调度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各种内部行为的 Metrics
- Windows 用户指南
- Storm 中级
- 序列化
- 常见 Topology 模式
- Clojure DSL
- 使用没有jvm的语言编辑storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 状态管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 调试
- 动态日志级别设置
- Storm Logs
- 动态员工分析
- 拓扑事件检查器
- Storm 与外部系统, 以及其它库的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高级
- 针对 Storm 定义一个不是 JVM 的 DSL
- 多语言协议
- Storm 内部实现
- 翻译进度