# Beam 解释器
原文链接 : [http://zeppelin.apache.org/docs/0.7.2/interpreter/beam.html](http://zeppelin.apache.org/docs/0.7.2/interpreter/beam.html)
译文链接 : [http://www.apache.wiki/pages/viewpage.action?pageId=10030766](http://www.apache.wiki/pages/viewpage.action?pageId=10030766)
贡献者 : [片刻](/display/~jiangzhonglian) [ApacheCN](/display/~apachecn) [Apache中文网](/display/~apachechina)
## 概观
[Apache Beam](http://beam.incubator.apache.org/)是数据处理流水线的开源统一平台。可以使用其中一个Beam SDK构建管道。管道的执行由不同的跑步者完成。目前,Beam支持Apache Flink Runner,Apache Spark Runner和Google Dataflow Runner。
## 如何使用
基本上,您可以编写正常的Beam java代码,您可以在其中确定Runner。您应该在类中写入main方法,因为解释器调用此main来执行管道。与Zeppelin正常模式不同,每段都被视为单独的工作,与任何其他段落没有任何关系。
下面是一个字的一个示范与字符串数组表示的数据计数例如但它可以通过更换从文件中读取数据`Create.of(SENTENCES).withCoder(StringUtf8Coder.of())`用`TextIO.Read.from("path/to/filename.txt")`
```
%beam
// most used imports
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.spark.*;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.options.PipelineOptions;
public class MinimalWordCount {
static List<String> s = new ArrayList<>();
static final String[] SENTENCES_ARRAY = new String[] {
"Hadoop is the Elephant King!",
"A yellow and elegant thing.",
"He never forgets",
"Useful data, or lets",
"An extraneous element cling!",
"A wonderful king is Hadoop.",
"The elephant plays well with Sqoop.",
"But what helps him to thrive",
"Are Impala, and Hive,",
"And HDFS in the group.",
"Hadoop is an elegant fellow.",
"An elephant gentle and mellow.",
"He never gets mad,",
"Or does anything bad,",
"Because, at his core, he is yellow",
};
static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
public static void main(String[] args) {
Options options = PipelineOptionsFactory.create().as(Options.class);
options.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
.apply(Count.<String> perElement())
.apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
@Override
public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
throws Exception {
s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
}
}));
p.run();
System.out.println("%table word\tcount");
for (int i = 0; i < s.size(); i++) {
System.out.print(s.get(i));
}
}
}
```
- 快速入门
- 什么是Apache Zeppelin?
- 安装
- 配置
- 探索Apache Zeppelin UI
- 教程
- 动态表单
- 发表你的段落
- 自定义Zeppelin主页
- 升级Zeppelin版本
- 从源码编译
- 使用Flink和Spark Clusters安装Zeppelin教程
- 解释器
- 概述
- 解释器安装
- 解释器依赖管理
- 解释器的模拟用户
- 解释员执行Hook(实验)
- Alluxio 解释器
- Beam 解释器
- BigQuery 解释器
- Cassandra CQL 解释器
- Elasticsearch 解释器
- Flink 解释器
- Geode/Gemfire OQL 解释器
- HBase Shell 解释器
- HDFS文件系统 解释器
- Hive 解释器
- Ignite 解释器
- JDBC通用 解释器
- Kylin 解释器
- Lens 解释器
- Livy 解释器
- Markdown 解释器
- Pig 解释器
- PostgreSQL, HAWQ 解释器
- Python 2&3解释器
- R 解释器
- Scalding 解释器
- Scio 解释器
- Shell 解释器
- Spark 解释器
- 系统显示
- 系统基本显示
- 后端Angular API
- 前端Angular API
- 更多
- 笔记本存储
- REST API
- 解释器 API
- 笔记本 API
- 笔记本资源 API
- 配置 API
- 凭据 API
- Helium API
- Security ( 安全 )
- Shiro 授权
- 笔记本 授权
- 数据源 授权
- Helium 授权
- Advanced ( 高级 )
- Zeppelin on Vagrant VM ( Zeppelin 在 Vagrant 虚拟机上 )
- Zeppelin on Spark Cluster Mode( Spark 集群模式下的 Zeppelin )
- Zeppelin on CDH ( Zeppelin 在 CDH 上 )
- Contibute ( 贡献 )
- Writing a New Interpreter ( 写一个新的解释器 )
- Writing a new Visualization (Experimental) ( 编写新的可视化(实验) )
- Writing a new Application (Experimental) ( 写一个新的应用程序( 实验 ) )
- Contributing to Apache Zeppelin ( Code ) ( 向 Apache Zeppelin 贡献( 代码 ) )
- Contributing to Apache Zeppelin ( Website ) ( 向 Apache Zeppelin 贡献(website) )