# Storm Elasticsearch 集成
## Storm Elasticsearch Bolt & Trident State
EdIndexBolt,EsPercolateBolt和Estate允许用户将storm中的数据直接传输到Elasticsearch。 详细说明请参考以下内容。
## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
EsIndexBolt将tuples直接流入Elasticsearch索。 Tuples以指定的索引和类型组合进行索引。 用户应确保`EsTupleMapper`可以从输入元组中提取“source”,“index”,“type”和“id”,“index”和“type”用于识别目标索引和类型。“source” 一个JSON格式的文档,将在Elasticsearch中编入索引。
```
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
```
## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
EsPercolateBolt将tuples直接流入Elasticsearch。 tuples用于发送渗透请求到指定的索引和类型组合。 用户应该确保`EsTupleMapper` 可以从输入元组中提取“source”,“index”,“type”,“index”和“type”用于识别目标索引和类型,“source”是一个文档 在JSON格式的字符串将发送到渗透请求到弹性搜索。
```
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
```
如果存在非空的渗漏响应,EsPercolateBolt将会为PercolateResponse中每个Percolate.Match发出具有原始源和Percolate.Match的tuple。
## EsState (org.apache.storm.elasticsearch.trident.EsState)
Elasticsearch Trident state也与EsBolts类似。 它将EsConfig和EsTupleMapper作为参数。
```
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
```
## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
EsLookupBolt对Elasticsearch执行获取请求。 为了做到这一点,需要满足三个依赖。 除了通常的EsConfig,还必须提供其他两个依赖关系: ElasticsearchGetRequest用于将传入的元组转换为将针对Elasticsearch执行的GetRequest。 EsLookupResultOutput用于声明输出字段,并将GetResponse转换为由bolt发出的值。
传入的tuple被传递给提供的GetRequest创建者,该执行的结果被传递给Elasticsearch客户端。 然后,bolt使用提供程序输出适配器(EsLookupResultOutput)将GetResponse转换为值以发送。 输出字段也由bolt的用户通过输出适配器(EsLookupResultOutput)指定。
```
EsConfig esConfig = createEsConfig();
ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
EsLookupResultOutput output = createOutput();
EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
```
## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
=提供的组件(Bolt,State)以EsConfig作为构造函数arg。
```
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
```
or
```
Map<String, String> additionalParameters = new HashMap<>();
additionalParameters.put("client.transport.sniff", "true");
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters);
```
### EsConfig params
| Arg | Description | Type |
| --- | --- | --- |
| clusterName | Elasticsearch cluster name | String (required) |
| nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
| additionalParameters | Additional Elasticsearch Transport Client configuration parameters | Map <string, string="">(optional)</string,> |
## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
对于存储在Elasticsearch中的tuple或者从Elasticsearch搜索到的tuple,我们需要定义使用哪些字段。 用户需要通过实现`EsTupleMapper`定义你自己的。 Storm-elasticsearch提供了默认的mapper`org.apache.storm.elasticsearch.common.DefaultEsTupleMapper`,它从相同的字段中提取其源,索引,类型,id值。 您可以参考DefaultEsTupleMapper的实现来看看如何实现自己的。
## Committer Sponsors
* Sriharsha Chintalapani ([@harshach](https://github.com/harshach))
* Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))
- Storm 基础
- 概念
- Scheduler(调度器)
- Configuration
- Guaranteeing Message Processing
- 守护进程容错
- 命令行客户端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 综述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 语言参考
- Storm SQL 内部实现
- Flux
- Storm 安装和部署
- 设置Storm集群
- 本地模式
- 疑难解答
- 在生产集群上运行 Topology
- Maven
- 安全地运行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 资源感知调度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各种内部行为的 Metrics
- Windows 用户指南
- Storm 中级
- 序列化
- 常见 Topology 模式
- Clojure DSL
- 使用没有jvm的语言编辑storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 状态管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 调试
- 动态日志级别设置
- Storm Logs
- 动态员工分析
- 拓扑事件检查器
- Storm 与外部系统, 以及其它库的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高级
- 针对 Storm 定义一个不是 JVM 的 DSL
- 多语言协议
- Storm 内部实现
- 翻译进度