多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# Storm Elasticsearch 集成 ## Storm Elasticsearch Bolt & Trident State EdIndexBolt,EsPercolateBolt和Estate允许用户将storm中的数据直接传输到Elasticsearch。 详细说明请参考以下内容。 ## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt) EsIndexBolt将tuples直接流入Elasticsearch索。 Tuples以指定的索引和类型组合进行索引。 用户应确保`EsTupleMapper`可以从输入元组中提取“source”,“index”,“type”和“id”,“index”和“type”用于识别目标索引和类型。“source” 一个JSON格式的文档,将在Elasticsearch中编入索引。 ``` EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}); EsTupleMapper tupleMapper = new DefaultEsTupleMapper(); EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper); ``` ## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt) EsPercolateBolt将tuples直接流入Elasticsearch。 tuples用于发送渗透请求到指定的索引和类型组合。 用户应该确保`EsTupleMapper` 可以从输入元组中提取“source”,“index”,“type”,“index”和“type”用于识别目标索引和类型,“source”是一个文档 在JSON格式的字符串将发送到渗透请求到弹性搜索。 ``` EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}); EsTupleMapper tupleMapper = new DefaultEsTupleMapper(); EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper); ``` 如果存在非空的渗漏响应,EsPercolateBolt将会为PercolateResponse中每个Percolate.Match发出具有原始源和Percolate.Match的tuple。 ## EsState (org.apache.storm.elasticsearch.trident.EsState) Elasticsearch Trident state也与EsBolts类似。 它将EsConfig和EsTupleMapper作为参数。 ``` EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}); EsTupleMapper tupleMapper = new DefaultEsTupleMapper(); StateFactory factory = new EsStateFactory(esConfig, tupleMapper); TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields()); ``` ## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt) EsLookupBolt对Elasticsearch执行获取请求。 为了做到这一点,需要满足三个依赖。 除了通常的EsConfig,还必须提供其他两个依赖关系: ElasticsearchGetRequest用于将传入的元组转换为将针对Elasticsearch执行的GetRequest。 EsLookupResultOutput用于声明输出字段,并将GetResponse转换为由bolt发出的值。 传入的tuple被传递给提供的GetRequest创建者,该执行的结果被传递给Elasticsearch客户端。 然后,bolt使用提供程序输出适配器(EsLookupResultOutput)将GetResponse转换为值以发送。 输出字段也由bolt的用户通过输出适配器(EsLookupResultOutput)指定。 ``` EsConfig esConfig = createEsConfig(); ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest(); EsLookupResultOutput output = createOutput(); EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output); ``` ## EsConfig (org.apache.storm.elasticsearch.common.EsConfig) =提供的组件(Bolt,State)以EsConfig作为构造函数arg。 ``` EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}); ``` or ``` Map<String, String> additionalParameters = new HashMap<>(); additionalParameters.put("client.transport.sniff", "true"); EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters); ``` ### EsConfig params | Arg | Description | Type | | --- | --- | --- | | clusterName | Elasticsearch cluster name | String (required) | | nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) | | additionalParameters | Additional Elasticsearch Transport Client configuration parameters | Map &lt;string, string=""&gt;(optional)&lt;/string,&gt; | ## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper) 对于存储在Elasticsearch中的tuple或者从Elasticsearch搜索到的tuple,我们需要定义使用哪些字段。 用户需要通过实现`EsTupleMapper`定义你自己的。 Storm-elasticsearch提供了默认的mapper`org.apache.storm.elasticsearch.common.DefaultEsTupleMapper`,它从相同的字段中提取其源,索引,类型,id值。 您可以参考DefaultEsTupleMapper的实现来看看如何实现自己的。 ## Committer Sponsors * Sriharsha Chintalapani ([@harshach](https://github.com/harshach)) * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))