gobblin
docker安装
Docker Integration - Gobblin Documentation
docker run -v /home/gobblin/conf:/etc/opt/job-conf \
-v /home/gobblin/work-dir:/home/gobblin/work-dir \
-v /home/gobblin/logs:/var/log/gobblin \
gobblin/gobblin-standalone:ubuntu-gobblin-latest采集kafka数据
Gobblin采集kafka数据 - Syn良子 - 博客园
概念
Gobblin--一个用于Hadoop的统一"数据抽取框架" - CSDN博客
组件
1、source
2、extractor
3、convertor
4、quality checker
5、writer
6、publisherSource主要负责将源数据整合到一系列workunits中,并指出对应的extractor是什么。这有点类似于Hadoop的InputFormat。
Extractor则通过workunit指定数据源的信息,例如kafka,指出topic中每个partition的起始offset,用于本次抽取使用。Gobblin使用了watermark的概念,记录每次抽取的数据的起始位置信息。
Converter顾名思义是转换器的意思,即对抽取的数据进行一些过滤、转换操作,例如将byte arrays 或者JSON格式的数据转换为需要输出的格式。转换操作也可以将一条数据映射成0条或多条数据(类似于flatmap操作)。
Quality Checker即质量检测器,有2中类型的checker:record-level和task-level的策略。通过手动策略或可选的策略,将被check的数据输出到外部文件或者给出warning。
Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中。
Publisher就是根据writer写出的路径,将数据输出到最终的路径。同时其提供2种提交机制:完全提交和部分提交;如果是完全提交,则需要等到task成功后才pub,如果是部分提交模式,则当task失败时,有部分在staging directory的数据已经被pub到输出路径了。
配置
Configuration Glossary - Gobblin Documentation
HiveWritableHdfsDataWriterBuilder和HiveSerDeConverter配置
压缩配置
writer..codec.type 配置压缩格式
以在 org.apache.gobblin.writer.SimpleDataWriterBuilder中为例:
源码分析
HiveWritableHdfsDataWriterBuilder和HiveSerDeConverter
配置方法见 HiveWritableHdfsDataWriterBuilder和HiveSerDeConverter配置
HiveWritableHdfsDataWriter的getWriter
HiveSerDeWrapper的get
数据写出
Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中 原文:https://blog.csdn.net/lmalds/article/details/53940549
STEP1.1 写到stagingFileOutputStream
```java SimpleDataWriter.java package org.apache.gobblin.writer;
public class SimpleDataWriter extends FsDataWriter {
}
STEP2 写出到最终目录,获取writer写出的文件发布到最终目录
public static final String DATA_PUBLISHER_FINAL_DIR = DATA_PUBLISHER_PREFIX + ".final.dir";
STEP4 publish之后的后续publish操作
如果在配置文件中配置了这个publish:
Source Extractor Converter 关系
Source获取Extractor Source配置source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaDeserializerSource
Extractor中配置了readRecord方法用于读取数据 Extractor中配置了getSchema方法用于获取schema
Converter 转换Schema,Schema从Extractor中获取 Converter配置converter.classes=org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter,org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter 这个配置是链式的。按顺序依次转换,Source Schema and Converters
workunit的流转
在JobLauncher中调用source组件生成workunit然后生成hadoopjob去执行wu。
mapreduce相关
map分块 hadoopjob的GobblinWorkUnitsInputFormat中的getSplits方法决定了map分块的方式和数量, 在KafkaSource中,getWorkunits方法是按照partition和mr.job.max.mappers参数配置来决定生成的wu的数量, 比如map数量为默认值100,topic的分区数量为2,那么就会生成100个multiworkunit,其中有两个是有job的,对应topic的两个分区,其余的job均为空,如果分区数量大于map数量,那么其中便会有一些multiworkunit去获取多个分区的数据。
Last updated
Was this helpful?