您现在的位置:首页 >> 装修问答

通过 Java 来自学 Apache Beam

发布时间:2025/09/04 12:17    来源:江山家居装修网

egers).via((Integer number) -> number * 2));

为了解析结果,我们可以写一个假定。

PAssert.that(output).containsInAnyOrder(2, 4, 6, 8, 10);

注意,结果不排序,因为 Beam 将每一个特性作为法理的项透过并行处理事件。

测试者到这里就完并成了,我们通过加载后面的方法有开始运行管线:

pipeline.run;

Reduce 操纵

Reduce 操纵将多个输出特性透过交联,造并成了一个较小的不可数,通常只包涵一个特性。

MapReduce

那时候我们来延展上会的举例来说,将所有项等于 2 后求和,造并成了一个 MapReduce 转化操纵。

每一个 PCollection 转化都才会造并成了一个新的的 PCollection 程序中,这反之亦然我们可以常用 apply 方法有将转化URL上来。对于这个举例来说,将在每个特性等于 2 后常用 Sum 操纵:

PCollection numbers =pipeline.apply(Create.of(1, 2, 3, 4, 5));

PCollection output = numbers.apply(MapElements.into(TypeDeors.integers).via((Integer number) -> number * 2)).apply(Sum.integersGlobally);

PAssert.that(output).containsInAnyOrder(30);

pipeline.run;

FlatMap 操纵

FlatMap 先对每个输出特性应用映射,回到一个新的不可数,从而造并成了一个不可数的不可数。然后再应用 Flat 操纵将所有嵌套的不可数合并,之后生并成一个不可数。

下一个举例来说将把字符串运算符转化并成包涵唯一性后缀的运算符。

首先,我们道歉信将作为管线输出的后缀列出:

final String[] WORDS_ARRAY = new String[] {"hi bob", "hello alice", "hi sue"};

final List WORDS = Arrays.asList(WORDS_ARRAY);

然后,我们常用上会的列出创始人输出 PCollection:

PCollection input = pipeline.apply(Create.of(WORDS));

那时候,我们透过 FlatMap 转化,它将拆分每个嵌套运算符之中的后缀,并将结果合并并成一个列出:

PCollection output = input.apply(FlatMapElements.into(TypeDeors.strings).via((String line) -> Arrays.asList(line.split(" "))));

PAssert.that(output).containsInAnyOrder("hi", "bob", "hello", "alice", "hi", "sue");

pipeline.run;

Group 操纵

样本处理事件的一个类似的战斗任务是根据特定的键透过交联或计数。我们将计数上一个举例之中每个后缀出现的次数。

在有了扁平的字符串运算符以后,我们可以URL另一个 PTransform:

PCollection> output = input.apply(FlatMapElements.into(TypeDeors.strings).via((String line) -> Arrays.asList(line.split(" ")))).apply(Count.perElement);

造并成了结果:

PAssert.that(output).containsInAnyOrder(KV.of("hi", 2L),KV.of("hello", 1L),KV.of("alice", 1L),KV.of("sue", 1L),KV.of("bob", 1L));

从文档之中传输

Beam 的一个原则是可以从任何地方传输样本,所以我们来再来在实质当之中如何常用文本文档作为样本源。

后面的举例来说将传输包涵“An advanced unified programming model”文本的文档“words.txt”。然后转化表达型式将回到一个包涵每一个后缀的 PCollection。

PCollection input =pipeline.apply(TextIO.read.from("./src/main/resources/words.txt"));

PCollection output = input.apply(FlatMapElements.into(TypeDeors.strings).via((String line) -> Arrays.asList(line.split(" "))));

PAssert.that(output).containsInAnyOrder("An", "advanced", "unified", "programming", "model");

pipeline.run;

将结果撰写文档

从前面的输出举例来说可以想到,Beam 共享了多个配有的输出插槽。在后面的举例之中,我们将计数文本文档“words.txt”(只包涵一个词组“An advanced unified programming model")之中出现的每个后缀的总共,输出结果将撰写一个文本文档。

PCollection input =pipeline.apply(TextIO.read.from("./src/main/resources/words.txt"));

PCollection> output = input.apply(FlatMapElements.into(TypeDeors.strings).via((String line) -> Arrays.asList(line.split(" ")))).apply(Count.perElement);;

PAssert.that(output).containsInAnyOrder(KV.of("An", 1L),KV.of("advanced", 1L),KV.of("unified", 1L),KV.of("programming", 1L),KV.of("model", 1L));

output.apply(MapElements.into(TypeDeors.strings).via((KV kv) -> kv.getKey + " " + kv.getValue)).apply(TextIO.write.to("./src/main/resources/wordscount"));

pipeline.run;

普通用户情形,文档撰写也针对并行性透过了优化,这反之亦然 Beam 将决定保存结果的最佳移位(文档)总共。这些文档设于 src/main/resources 文档夹之中,文档名包涵了前缀“wordcount”、碎片序号和碎片总共。

在我的iPad上开始运行它生并成了 4 个移位:

第一个移位(文档名:wordscount-00001-of-00003):

An 1advanced 1

第二个移位(文档名:wordscount-00002-of-00003):

unified 1model 1

第三个移位(文档名:wordscount-00003-of-00003):

programming 1

之前一个移位是空的,因为所有的后缀都仍然被处理事件完了。

延展 Beam

我们可以通过撰写快捷键转化表达型式来延展 Beam。快捷键转化器将提高code的可维护性,并消除重复兼职。

基本上,我们所需创始人一个 PTransform 的子类,将输出和输出的型型式道歉信为 Java 泛型。然后撰写 expand 方法有,沙入我们的逻辑,它将接受单个字符串并回到包涵每个后缀的 PCollection。

public class WordsFileParser extends PTransform {

@Overridepublic PCollection expand(PCollection input) {return input.apply(FlatMapElements.into(TypeDeors.strings).via((String line) -> Arrays.asList(line.split(" "))));} }

用 WordsFileParser 来重构测试者布景就变并成了:

public class FileIOTest {

@Rulepublic final transient TestPipeline pipeline = TestPipeline.create;

@Testpublic void testReadInputFromFile {PCollection input =pipeline.apply(TextIO.read.from("./src/main/resources/words.txt"));

PCollection output = input.apply(new WordsFileParser);

PAssert.that(output).containsInAnyOrder("An", "advanced", "unified", "programming", "model");

pipeline.run;}

@Testpublic void testWriteOutputToFile {PCollection input =pipeline.apply(TextIO.read.from("./src/main/resources/words.txt"));

PCollection> output = input.apply(new WordsFileParser).apply(Count.perElement);

PAssert.that(output).containsInAnyOrder(KV.of("An", 1L),KV.of("advanced", 1L),KV.of("unified", 1L),KV.of("programming", 1L),KV.of("model", 1L));

output.apply(MapElements.into(TypeDeors.strings).via((KV kv) -> kv.getKey + " " + kv.getValue)).apply(TextIO.write.to ("./src/main/resources/wordscount"));

pipeline.run;}}

结果变并成了越来越模糊不清和越来越小型化的管线。

小时售票厅

Beam 的小时售票厅

都只处理事件之中一个类似的问题是将传入的样本按照一定的小时间隔透过配对,相比之下处理事件大量样本时。在这种情形,样本分析每小时或每天的交联样本比样本分析样本集的每个特性越来越有用。

在后面的举例之中,我们将假定我们身处金融各个领域,我们早就发送到包涵金额和结算小时的事件,我们决心获取每天的结算总值。

Beam 共享了一种用小时撕来花纹每个 PCollection 特性的方法有。我们可以通过这种方型式创始人一个代表 5 笔结算的 PCollection:

金额 10 和 20 是在 2022 年 02 月末 01 日转账的; 金额 30、40 和 50 是在 2022 年 02 月末 05 日转账的。 PCollection transactions =pipeline.apply(Create.timestamped(TimestampedValue.of(10, Instant.parse("2022-02-01T00:00:00+00:00")),TimestampedValue.of(20, Instant.parse("2022-02-01T00:00:00+00:00")),TimestampedValue.of(30, Instant.parse("2022-02-05T00:00:00+00:00")),TimestampedValue.of(40, Instant.parse("2022-02-05T00:00:00+00:00")),TimestampedValue.of(50, Instant.parse("2022-02-05T00:00:00+00:00"))));

接下来,我们将应用两个转化表达型式:

常用一天的小时售票厅对结算透过配对; 把每组的总共沙上来。 PCollection output =Transactions.apply(Window.into(FixedWindows.of(Duration.standardDays(1)))).apply(Combine.globally(Sum.ofIntegers).withoutDefaults);

在第一个小时售票厅(2022-02-01)之中,原计划投资总额为 30(10+20),而在第二个售票厅(2022-02-05)之中,我们无论如何想到投资总额为 120(30+40+50)。

PAssert.that(output).inWindow(new IntervalWindow(Instant.parse("2022-02-01T00:00:00+00:00"),Instant.parse("2022-02-02T00:00:00+00:00"))).containsInAnyOrder(30);

PAssert.that(output).inWindow(new IntervalWindow(Instant.parse("2022-02-05T00:00:00+00:00"),Instant.parse("2022-02-06T00:00:00+00:00"))).containsInAnyOrder(120);

每个 IntervalWindow 程序中所需匹配所落选小时段的确切开始和之前小时撕,因此所落选小时不能是“00:00:00”。

总 结

Beam 是一个强盛的经过实战检验的样本框架,反对程序中事件和都只处理事件。我们常用 Java SDK 透过了 Map、Reduce、Group 和小时售票厅等操纵。

Beam 适宜那些督导并行战斗任务的技术开发小组,可以简化大规模样本处理事件的的系统。

它的插槽、SDK 和对各种 Runner 的反对为我们带来了灵活性,你只要落选取一个原生 Runner,如 Google Cloud Dataflow,就可以借助计数资源的自动化管理。

作者简介:

Fabio Hiroki 是一位在 Mollie 母公司从事金融服务的的软件工程师。

原文URL:

进制化不是试出来,而是蹚出来的 | 行知进制之中国

此情可待并成珍重:Java Mac 旧版的20世纪

百度所有事业群一直执法人员减半;字节抖动新增的信念;传阿里正逐步撤除与蚂蚁集团的业务必经之路 | Q 网页

容计数的全球变局与之中国童话故事

点个在看少个 bug👇

贵州癫痫医院哪家最好
杭州白癜风专业医院
北京男科医院哪好
泉州白癜风医院排行榜
江苏男科医院哪个比较好
经常拉肚子
补益药
腰椎管狭窄
新冠后遗症
有没有什么办法可以快速止咳

上一篇: 你知道人工智能企业申请专利布局有哪些模式吗

下一篇: 武汉:1-2年完成不少于100公里开放测试道路智能化改造,智慧公交、自动驾驶大客车逐步落地

友情链接