# Beam Interpreter

Original: [http://zeppelin.apache.org/docs/0.7.2/interpreter/beam.html](http://zeppelin.apache.org/docs/0.7.2/interpreter/beam.html)

Translation: [http://www.apache.wiki/pages/viewpage.action?pageId=10030766](http://www.apache.wiki/pages/viewpage.action?pageId=10030766)

Contributors: [片刻](/display/~jiangzhonglian) [ApacheCN](/display/~apachecn) [Apache中文网](/display/~apachechina)

## Overview

[Apache Beam](http://beam.incubator.apache.org/) is an open source, unified platform for data processing pipelines. A pipeline can be built with any one of the Beam SDKs, and its execution is carried out by a Runner. Currently, Beam supports the Apache Flink Runner, the Apache Spark Runner, and the Google Dataflow Runner.

## How to use

Basically, you write normal Beam Java code and choose the Runner inside it. The code must define a `main` method in a class, because the interpreter invokes this `main` to execute the pipeline. Unlike Zeppelin's normal mode, each paragraph is treated as a separate job and has no relation to any other paragraph.

The following is a word count demonstration in which the input data is an array of strings. It can read its data from a file instead if you replace `Create.of(SENTENCES).withCoder(StringUtf8Coder.of())` with `TextIO.Read.from("path/to/filename.txt")`.

```
%beam

// most used imports
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.spark.*;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.options.PipelineOptions;

public class MinimalWordCount {
  // Collects the formatted result rows so that main can print them after the pipeline has run.
  static List<String> s = new ArrayList<>();

  // In-memory input data.
  static final String[] SENTENCES_ARRAY = new String[] {
    "Hadoop is the Elephant King!",
    "A yellow and elegant thing.",
    "He never forgets",
    "Useful data, or lets",
    "An extraneous element cling!",
    "A wonderful king is Hadoop.",
    "The elephant plays well with Sqoop.",
    "But what helps him to thrive",
    "Are Impala, and Hive,",
    "And HDFS in the group.",
    "Hadoop is an elegant fellow.",
    "An elephant gentle and mellow.",
    "He never gets mad,",
    "Or does anything bad,",
    "Because, at his core, he is yellow",
  };

  static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);

  public static void main(String[] args) {
    // Choose the Runner that executes the pipeline.
    Options options = PipelineOptionsFactory.create().as(Options.class);
    options.setRunner(FlinkRunner.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
        // Split each sentence into words on any run of characters
        // that are neither letters nor apostrophes.
        // Newer Beam SDKs annotate this method with @ProcessElement instead of overriding it.
        .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            for (String word : c.element().split("[^a-zA-Z']+")) {
              if (!word.isEmpty()) {
                c.output(word);
              }
            }
          }
        }))
        // Count the occurrences of each distinct word.
        .apply(Count.<String> perElement())
        // Format each (word, count) pair as a tab-separated table row.
        .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
          @Override
          public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
              throws Exception {
            s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
          }
        }));

    p.run();

    // Print the header and the rows using Zeppelin's %table display format.
    System.out.println("%table word\tcount");
    for (int i = 0; i < s.size(); i++) {
      System.out.print(s.get(i));
    }
  }
}
```
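
The splitting and counting logic of the example can be tried out without Zeppelin or a Beam runner. The sketch below is plain Java, not Beam code. It applies the same regular expression `[^a-zA-Z']+` as the `ExtractWords` step, stands in for `Count.perElement()` with a map, and builds the same `%table` text that the example prints. The class and method names (`WordCountSketch`, `extractWords`, `countWords`, `toTable`) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

  // Same split rule as the ExtractWords step: break on any run of
  // characters that are neither letters nor apostrophes.
  static List<String> extractWords(String sentence) {
    List<String> words = new ArrayList<>();
    for (String word : sentence.split("[^a-zA-Z']+")) {
      // split() yields a leading empty string when the sentence starts with a separator.
      if (!word.isEmpty()) {
        words.add(word);
      }
    }
    return words;
  }

  // Plain-Java stand-in for Count.perElement(): maps each word to its number of occurrences.
  // A TreeMap is an assumption made here to get a stable row order; Beam gives no such ordering.
  static Map<String, Long> countWords(List<String> sentences) {
    Map<String, Long> counts = new TreeMap<>();
    for (String sentence : sentences) {
      for (String word : extractWords(sentence)) {
        counts.merge(word, 1L, Long::sum);
      }
    }
    return counts;
  }

  // Builds the text that Zeppelin renders as a table:
  // a %table header row followed by one tab-separated row per word.
  static String toTable(Map<String, Long> counts) {
    StringBuilder sb = new StringBuilder("%table word\tcount");
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      sb.append("\n").append(e.getKey()).append("\t").append(e.getValue());
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    List<String> sentences = Arrays.asList(
        "Hadoop is the Elephant King!",
        "A wonderful king is Hadoop.");
    System.out.println(toTable(countWords(sentences)));
  }
}
```

Counting is case-sensitive, as in the Beam example, so `King` and `king` end up in separate rows.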