▌Apache Beam 的架构设计
我们接下来看一下 Beam 架构是怎样的:
1. Apache Beam 的总体架构

文章插图
Apache Beam 的总体架构是这样的,上面有各种语言,编写了不同的 SDKs,Beam 通过连接这些 SDK 的数据源进行管道的逻辑操作,最后发布到大数据引擎上去执行 。需要注意的是,Local 虽然是一个 runner 但是不能用于生产上,它是用于调试/开发使用的 。
2. Apache Beam 的部署流程图

文章插图
让我们一起看下 Apache Beam 总体的部署流程 。首先我们去构建这个 Beam jobAPI .jar 通过 job 服务器以及设置大数据执行平台,最后提交 flink 或 spark 的任务集群去执行任务 。
▌Apache Beam 的核心组件刨析
1. SDks+Pipeline+Runners (前后端分离)

文章插图
如上图,前端是不同语言的 SDKs,读取数据写入管道,最后用这些大数据引擎去运行 。可以发现完整的 beam 程序由 SDks+Pipeline+Runners 构成的 。
2. 什么是 SDK?

文章插图
什么是 SDK,就是一个编写 beam 管道构成的一部分,一个客户端或一个类库组件也可以,最后提交到大数据运行平台上 。
3. Beam 版本和 Kafka-clients 依赖情况表

文章插图
我们以 kafka 为例,看一下 Kafka-client 对版本的依赖情况,从图中可以看出 beam 2.6.0 版本的 api 改变基本是稳定的 。当然,现在用的比较多的2.4、2.5版本 。吐个槽,2.6版本之前的兼容性问题,上个版本还有这个类或方法,下一个版本就没有了,兼容性不是很好 。
4. SDK beam-sdks-java-io-kafka 读取源码剖析

文章插图

文章插图

文章插图

文章插图

文章插图
① 指定 KafkaIO 的模型,从源码中不难看出这个地方的 KafkaIO<K,V> 类型是 Long 和 String 类型,也可以换成其他类型 。
pipeline.Apply(KafkaIO.<Long, String>read() pipeline.apply(KafkaIO.<Long, String>read()
② 设置 Kafka 集群的集群地址 。
.withBootstrapServers("broker_1:9092,broker_2:9092")
③ 设置 Kafka 的主题类型,源码中使用了单个主题类型,如果是多个主题类型则用withTopics(List) 方法进行设置 。设置情况基本跟 Kafka 原生是一样的 。
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
④ 设置序列化类型 。Apache Beam KafkaIO 在序列化的时候做了很大的简化,例如原生 Kafka 可能要通过 Properties 类去设置,还要加上很长一段 jar 包的名字 。
Beam KafkaIO 的写法:
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
原生 Kafka 的设置:
Properties props = new Properties();
props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
⑤ 设置 Kafka 的消费者属性,这个地方还可以设置其他的属性 。源码中是针对消费分组进行设置 。
.updateConsumerProperties(ImmutableMap.of("group.id", my_beam_app_1"))
⑥ 设置 Kafka 吞吐量的时间戳,可以是默认的,也可以自定义 。
.withLogAppendTime()
⑦ 相当于 Kafka 中 "isolation.level" , "read_committed",指定 KafkaConsumer 只应读取非事务性消息,或从其输入主题中提交事务性消息 。流处理应用程序通常在多个读取处理写入阶段处理其数据,每个阶段使用前一阶段的输出作为其输入 。通过指定read_committed 模式,我们可以在所有阶段完成一次处理 。针对 "Exactly-once" 语义,支持 Kafka 0.11 版本 。
.withReadCommitted()
⑧ 设置 Kafka 是否自动提交属性 "AUTO_COMMIT",默认为自动提交,使用 Beam 的方法来设置 。
set CommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize)
推荐阅读
- 微服务架构如何实现网站服务垂直化拆分
- 详细讲解Tomcat系统架构
- Linux下apache安全配置策略
- 支付系统整体架构详解
- Apache ShardingSphere开源的分布式数据库中间件
- 架构师深入剖析JVM体系结构详解
- 微服务架构实践之api-gateway
- 京东架构师分享:微服务分布式一致性模式
- 百度技术架构师总结:微服务架构之访问安全
- 互联网架构演化
