gobblin

docker安装

Docker Integration - Gobblin Documentation

docker run -v /home/gobblin/conf:/etc/opt/job-conf \
-v /home/gobblin/work-dir:/home/gobblin/work-dir \
-v /home/gobblin/logs:/var/log/gobblin \
gobblin/gobblin-standalone:ubuntu-gobblin-latest

Gobblin编译支持CDH5.4.0 - CSDN博客

采集kafka数据

Gobblin采集kafka数据 - Syn良子 - 博客园

概念

Gobblin--一个用于Hadoop的统一"数据抽取框架" - CSDN博客

组件

1、source
2、extractor
3、convertor
4、quality checker
5、writer
6、publisher
  • Source主要负责将源数据整合到一系列workunits中,并指出对应的extractor是什么。这有点类似于Hadoop的InputFormat。

  • Extractor则通过workunit指定数据源的信息,例如kafka,指出topic中每个partition的起始offset,用于本次抽取使用。Gobblin使用了watermark的概念,记录每次抽取的数据的起始位置信息。

  • Converter顾名思义是转换器的意思,即对抽取的数据进行一些过滤、转换操作,例如将byte arrays 或者JSON格式的数据转换为需要输出的格式。转换操作也可以将一条数据映射成0条或多条数据(类似于flatmap操作)。

  • Quality Checker即质量检测器,有2中类型的checker:record-level和task-level的策略。通过手动策略或可选的策略,将被check的数据输出到外部文件或者给出warning。

  • Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中。

  • Publisher就是根据writer写出的路径,将数据输出到最终的路径。同时其提供2种提交机制:完全提交和部分提交;如果是完全提交,则需要等到task成功后才pub,如果是部分提交模式,则当task失败时,有部分在staging directory的数据已经被pub到输出路径了。

配置

Configuration Glossary - Gobblin Documentation

# TaskContext.getDataWriterBuilder
## DEFAULT_WRITER_BUILDER_CLASS = "org.apache.gobblin.writer.AvroDataWriterBuilder";
writer.builder.class=
# 要和writerBuilder保持统一
# writer输出文件的后缀 以及outputformat配置(但是writer里面好像并没用上?)
writer.output.format=ORC
#这两个配合决定了 可以让publisher分成job和task两个来配置
publish.data.at.job.level=false
job.commit.policy=successful
data.publisher.job.type=datacenter.plugins.gobblin.publisher.MyHiveRegistrationPublisher
data.publisher.task.type=org.apache.gobblin.publisher.BaseDataPublisher

HiveWritableHdfsDataWriterBuilder和HiveSerDeConverter配置

# 这个converter里面要配置 一个 HiveSerDeWrapper 用于反序列化输入数据(serde.deserializer.type) 和序列化输出(serde.serializer.type)数据,有两种配置方式
converter.classes=org.apache.gobblin.converter.serde.HiveSerDeConverter
# 这个配置是HiveSerDeConverter解析数据的schema
avro.schema.literal={"namespace":"demo.hive","type":"record","name":"hiveorc","fields":[{"name":"jobRoles","type":["int"]},{"name":"peopleWeightAvg","type":["float"]},{"name":"peopleOrg","type":["string","null"]}]}
#配置序列化和反序列化
## this.serializer = HiveSerDeWrapper.getSerializer(state).getSerDe();
## this.deserializer = HiveSerDeWrapper.getDeserializer(state).getSerDe();
# style1
## 配置一个已经定义好的wrapper(一个wrapper包括inputformat和outputformat)
## org.apache.gobblin.hive.HiveSerDeWrapper.BuiltInHiveSerDe
serde.deserializer.type=TEXTFILE
serde.serializer.type=ORC
# style2
serde.deserializer.type=TEXTFILE
## 自定义一个wrapper
### 这个type必须是存在的org.apache.hadoop.hive.serde2.SerDe的实现类
### 如果这个class是在org.apache.gobblin.hive.HiveSerDeWrapper.BuiltInHiveSerDe中已经定义的,那么不会在使用'serde.serializer.input.format.type'和'serde.serializer.output.format.type',见[HiveSerDeWrapper的get](####HiveSerDeWrapper的get)
serde.serializer.type=org.apache.hadoop.hive.serde2.SerDe
### 定义inputformat
serde.serializer.input.format.type=org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
### 定义outputformat
serde.serializer.output.format.type=org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
# HiveSerDeConverter通常可以配合HiveWritableHdfsDataWriterBuilder使用,
# HiveWritableHdfsDataWriterBuilder有两种配置方式
## 1. 定义 WRITER_WRITABLE_CLASS,和WRITER_OUTPUT_FORMAT_CLASS
writer.writable.class=org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
writer.output.format.class=org.apache.hadoop.mapred.TextOutputFormat
## 2.定义 SERDE_SERIALIZER_TYPE,用于获取WRITER_WRITABLE_CLASS和WRITER_OUTPUT_FORMAT_CLASS
## 这个serde.serializer.type和convert中定义是一个东西,所以也是有两种风格定义,见HiveSerDeConverter配置
## 这个配置和HiveSerDeConverter是共用一个配置,HiveSerDeConverter序列化输出的数据,将由HiveWritableHdfsDataWriter使用相同的序列化配置去写出到文件 见[HiveWritableHdfsDataWriter的getWriter](####HiveWritableHdfsDataWriter的getWriter)
serde.serializer.type=ORC

压缩配置

writer..codec.type 配置压缩格式

以在 org.apache.gobblin.writer.SimpleDataWriterBuilder中为例:

@Override
protected List<StreamCodec> buildEncoders() {
Preconditions.checkNotNull(this.destination, "Destination must be set before building encoders");
List<StreamCodec> encoders = new ArrayList<>();
//从 writer..codec.type 中读取压缩类型
Map<String, Object> compressionConfig =
CompressionConfigParser.getConfigForBranch(this.destination.getProperties(), this.branches, this.branch);
if (compressionConfig != null) {
// 生成StreamCodec
encoders.add(CompressionFactory.buildStreamCompressor(compressionConfig));
}
Map<String, Object> encryptionConfig = EncryptionConfigParser
.getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, this.destination.getProperties(), this.branches,
this.branch);
if (encryptionConfig != null) {
encoders.add(EncryptionFactory.buildStreamCryptoProvider(encryptionConfig));
}
return encoders;
}

源码分析

HiveWritableHdfsDataWriterBuilder和HiveSerDeConverter

配置方法见 HiveWritableHdfsDataWriterBuilder和HiveSerDeConverter配置

package org.apache.gobblin.writer;
public class HiveWritableHdfsDataWriterBuilder<S> extends FsDataWriterBuilder<S, Writable> {
public DataWriter<Writable> build() throws IOException {
Preconditions.checkNotNull(this.destination);
Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
State properties = this.destination.getProperties();
// 如果定义了WRITER_WRITABLE_CLASS和WRITER_OUTPUT_FORMAT_CLASS则直接用这两个创建writer
// 如果没定义而是配置了serde.serializer.type则用其去生成writer
if (!properties.contains(WRITER_WRITABLE_CLASS) || !properties.contains(WRITER_OUTPUT_FORMAT_CLASS)) {
HiveSerDeWrapper serializer = HiveSerDeWrapper.getSerializer(properties);
// 从org.apache.hadoop.hive.serde2.SerDe的实现类中获取serializedClass
properties.setProp(WRITER_WRITABLE_CLASS, serializer.getSerDe().getSerializedClass().getName());
properties.setProp(WRITER_OUTPUT_FORMAT_CLASS, serializer.getOutputFormatClassName());
}
return new HiveWritableHdfsDataWriter(this, properties);
}
}

HiveWritableHdfsDataWriter的getWriter

package org.apache.gobblin.writer;
public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {
private RecordWriter getWriter() throws IOException {
try {
HiveOutputFormat<?, ?> outputFormat = HiveOutputFormat.class
.cast(Class.forName(this.properties.getProp(HiveWritableHdfsDataWriterBuilder.WRITER_OUTPUT_FORMAT_CLASS))
.newInstance());
@SuppressWarnings("unchecked")
Class<? extends Writable> writableClass = (Class<? extends Writable>) Class
.forName(this.properties.getProp(HiveWritableHdfsDataWriterBuilder.WRITER_WRITABLE_CLASS));
return outputFormat.getHiveRecordWriter(new JobConf(), this.stagingFile, writableClass, true,
this.properties.getProperties(), null);
} catch (Throwable t) {
throw new IOException(String.format("Failed to create writer"), t);
}
}
}

HiveSerDeWrapper的get

package org.apache.gobblin.hive;
public class HiveSerDeWrapper {
/**
* Get an instance of {@link HiveSerDeWrapper}.
*
* @param serDeType The SerDe type. If serDeType is one of the available {@link HiveSerDeWrapper.BuiltInHiveSerDe},
* the other three parameters are not used. Otherwise, serDeType should be the class name of a {@link SerDe},
* and the other three parameters must be present.
*/
public static HiveSerDeWrapper get(String serDeType, Optional<String> inputFormatClassName,
Optional<String> outputFormatClassName) {
Optional<BuiltInHiveSerDe> hiveSerDe = Enums.getIfPresent(BuiltInHiveSerDe.class, serDeType.toUpperCase());
if (hiveSerDe.isPresent()) {
return new HiveSerDeWrapper(hiveSerDe.get());
}
Preconditions.checkArgument(inputFormatClassName.isPresent(),
"Missing input format class name for SerDe " + serDeType);
Preconditions.checkArgument(outputFormatClassName.isPresent(),
"Missing output format class name for SerDe " + serDeType);
return new HiveSerDeWrapper(serDeType, inputFormatClassName.get(), outputFormatClassName.get());
}
}

数据写出

Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中 原文:https://blog.csdn.net/lmalds/article/details/53940549

STEP1.1 写到stagingFileOutputStream

```java SimpleDataWriter.java package org.apache.gobblin.writer;

public class SimpleDataWriter extends FsDataWriter {

/**
* Write a source record to the staging file
*
* @param record data record to write
* @throws java.io.IOException if there is anything wrong writing the record
*/
@Override
public void write(byte[] record) throws IOException {
Preconditions.checkNotNull(record);
byte[] toWrite = record;
if (this.recordDelimiter.isPresent()) {
toWrite = Arrays.copyOf(record, record.length + 1);
toWrite[toWrite.length - 1] = this.recordDelimiter.get();
}
if (this.prependSize) {
long recordSize = toWrite.length;
ByteBuffer buf = ByteBuffer.allocate(Longs.BYTES);
buf.putLong(recordSize);
toWrite = ArrayUtils.addAll(buf.array(), toWrite);
}
this.stagingFileOutputStream.write(toWrite);
this.bytesWritten += toWrite.length;
this.recordsWritten++;
}

}

STEP1.2 从stagingFile 移动到outputFile,即writer组件写出文件
>`outputFile`文件就是`writer.output.dir`配置的路径.
`public static final String WRITER_OUTPUT_DIR = WRITER_PREFIX + ".output.dir";`
```java
package org.apache.gobblin.writer;
public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, MetadataAwareWriter, SpeculativeAttemptAwareConstruct {
/**
* {@inheritDoc}.
*
* <p>
* This default implementation simply renames the staging file to the output file. If the output file
* already exists, it will delete it first before doing the renaming.
* </p>
*
* @throws IOException if any file operation fails
*/
@Override
public void commit() throws IOException {
this.closer.close();
setStagingFileGroup();
if (!this.fs.exists(this.stagingFile)) {
throw new IOException(String.format("File %s does not exist", this.stagingFile));
}
FileStatus stagingFileStatus = this.fs.getFileStatus(this.stagingFile);
// Double check permission of staging file
if (!stagingFileStatus.getPermission().equals(this.filePermission)) {
this.fs.setPermission(this.stagingFile, this.filePermission);
}
this.bytesWritten = Optional.of(Long.valueOf(stagingFileStatus.getLen()));
LOG.info(String.format("Moving data from %s to %s", this.stagingFile, this.outputFile));
// For the same reason as deleting the staging file if it already exists, deleting
// the output file if it already exists prevents task retry from being blocked.
if (this.fs.exists(this.outputFile)) {
LOG.warn(String.format("Task output file %s already exists", this.outputFile));
HadoopUtils.deletePath(this.fs, this.outputFile, false);
}
// ⚠️移动stagingFile到outputFile也就是writer的写出文件
HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile);
}
}

STEP2 写出到最终目录,获取writer写出的文件发布到最终目录

public static final String DATA_PUBLISHER_FINAL_DIR = DATA_PUBLISHER_PREFIX + ".final.dir";

package org.apache.gobblin.publisher;
public class BaseDataPublisher extends SingleTaskDataPublisher {
protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
Set<Path> writerOutputPathsMoved)
throws IOException {
// Get a ParallelRunner instance for moving files in parallel
ParallelRunner parallelRunner = this.getParallelRunner(this.writerFileSystemByBranches.get(branchId));
// The directory where the workUnitState wrote its output data.
// ⚠️这里就是获取上一步写出的writer文件
Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId);
if (!this.writerFileSystemByBranches.get(branchId).exists(writerOutputDir)) {
LOG.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId()));
return;
}
// The directory where the final output directory for this job will be placed.
// It is a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH.
Path publisherOutputDir = getPublisherOutputDir(state, branchId);
if (publishSingleTaskData) {
// Create final output directory
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId), publisherOutputDir,
this.permissions.get(branchId), retrierConfig);
addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
} else {
if (writerOutputPathsMoved.contains(writerOutputDir)) {
// This writer output path has already been moved for another task of the same extract
// If publishSingleTaskData=true, writerOutputPathMoved is ignored.
return;
}
if (this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
// The final output directory already exists, check if the job is configured to replace it.
// If publishSingleTaskData=true, final output directory is never replaced.
boolean replaceFinalOutputDir = this.getState().getPropAsBoolean(ForkOperatorUtils
.getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, branchId));
// If the final output directory is not configured to be replaced, put new data to the existing directory.
if (!replaceFinalOutputDir) {
addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
writerOutputPathsMoved.add(writerOutputDir);
return;
}
// Delete the final output directory if it is configured to be replaced
LOG.info("Deleting publisher output dir " + publisherOutputDir);
this.publisherFileSystemByBranches.get(branchId).delete(publisherOutputDir, true);
} else {
// Create the parent directory of the final output directory if it does not exist
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir.getParent(), this.permissions.get(branchId), retrierConfig);
}
// ⚠️移动writerOutputDir到最终输出的publisherOutputDir
movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, branchId);
writerOutputPathsMoved.add(writerOutputDir);
}
}
/**
* close操作将发布的文件路径publisherOutputDirs 附加到ConfigurationKeys.PUBLISHER_DIRS(`data.publisher.output.dirs`)属性的值中,供后续使用,比如 HiveRegistrationPublisher
*/
@Override
public void close()
throws IOException {
try {
for (Path path : this.publisherOutputDirs) {
this.state.appendToSetProp(ConfigurationKeys.PUBLISHER_DIRS, path.toString());
}
} finally {
// 调用google的`com.google.common.io.Closer`这个closer可以注册多个closable(`this.stack.push(closeable)`)在执行close方法时就会调用(`(Closeable)this.stack.pop().close()`)因此可以实现链式的关闭
this.closer.close();
}
}
}

STEP4 publish之后的后续publish操作

如果在配置文件中配置了这个publish:

##配置方案1
data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisherWithHiveRegistration
## 配置方案2
### org.apache.gobblin.runtime.Task.shouldPublishDataInTask()
publish.data.at.job.level=false
job.commit.policy=successful
data.publisher.job.type=org.apache.gobblin.publisher.HiveRegistrationPublisher
data.publisher.task.type=org.apache.gobblin.publisher.BaseDataPublisher
package org.apache.gobblin.publisher;
public class BaseDataPublisherWithHiveRegistration extends BaseDataPublisher {
protected final HiveRegistrationPublisher hivePublisher;
public BaseDataPublisherWithHiveRegistration(State state) throws IOException {
super(state);
//向BaseDataPublisher的closer中注册该HiveRegistrationPublisher
this.hivePublisher = this.closer.register(new HiveRegistrationPublisher(state));
}
@Override
public void publish(Collection<? extends WorkUnitState> states) throws IOException {
super.publish(states);
this.hivePublisher.publish(states);
}
}

Source Extractor Converter 关系

Source获取Extractor Source配置source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaDeserializerSource

public class KafkaSimpleSource extends KafkaSource<String, byte[]> {
@Override
public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
return new KafkaSimpleExtractor(state);
}
}

Extractor中配置了readRecord方法用于读取数据 Extractor中配置了getSchema方法用于获取schema

public class KafkaSimpleExtractor extends KafkaExtractor<String, byte[]> {
public KafkaSimpleExtractor(WorkUnitState state) {
super(state);
this.kafkaSchemaRegistry = new SimpleKafkaSchemaRegistry(state.getProperties());
}
@Override
public String getSchema() throws IOException {
try {
return this.kafkaSchemaRegistry.getLatestSchemaByTopic(this.topicName);
} catch (SchemaRegistryException e) {
throw new RuntimeException(e);
}
}
}

Converter 转换Schema,Schema从Extractor中获取 Converter配置converter.classes=org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter,org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter 这个配置是链式的。按顺序依次转换,Source Schema and Converters

package org.apache.gobblin.runtime;
public class Task implements TaskIFace {
private final Converter converter;
private final InstrumentedExtractorBase extractor;
public Task(TaskContext context, ...) {
//this.taskContext.getExtractor() 的内容: getSource().getExtractor(this.taskState)
this.extractor =
closer.register(new InstrumentedExtractorDecorator<>(this.taskState, this.taskContext.getExtractor()));
}
private void runSynchronousModel() throws Exception {
// 1。 转换schema
Object schema = converter.convertSchema(extractor.getSchema(), this.taskState);
// 2.转换数据
// extractor.readRecordEnvelope():从extractor获取数据
// converter.convertRecord():将从extractor获取的数据做转换
RecordEnvelope recordEnvelope;
// Extract, convert, and fork one source record at a time.
while (!shutdownRequested() && (recordEnvelope = extractor.readRecordEnvelope()) != null) {
onRecordExtract();
AcknowledgableWatermark ackableWatermark = new AcknowledgableWatermark(recordEnvelope.getWatermark());
if (watermarkTracker.isPresent()) {
watermarkTracker.get().track(ackableWatermark);
}
for (Object convertedRecord : converter.convertRecord(schema, recordEnvelope, this.taskState)) {
processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches,
ackableWatermark.incrementAck());
}
ackableWatermark.ack();
}
}
}

workunit的流转

在JobLauncher中调用source组件生成workunit然后生成hadoopjob去执行wu。

package org.apache.gobblin.runtime;
public abstract class AbstractJobLauncher implements JobLauncher {
@Override
public void launchJob(JobListener jobListener) hrows JobException {
//省略...
Source<?, ?> source = this.jobContext.getSource();
//1. 调用source组件生成workunit
if (source instanceof WorkUnitStreamSource) {
workUnitStream = ((WorkUnitStreamSource) source).getWorkunitStream(jobState);
} else {
workUnitStream = new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
}
//省略...
//2. Start the job and wait for it to finish
// 见下方 MRJobLauncher$runWorkUnits 代码
runWorkUnitStream(workUnitStream);
//省略...
}
}
package org.apache.gobblin.runtime.mapreduce;
public class MRJobLauncher extends AbstractJobLauncher {
protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
//省略...
prepareHadoopJob(workUnits);
//省略...
this.job.waitForCompletion(true);
//省略...
}
}
/**
* Prepare the Hadoop MR job, including configuring the job and setting up the input/output paths.
*/
private void prepareHadoopJob(List<WorkUnit> workUnits) throws IOException {
TimingEvent mrJobSetupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_SETUP);
// Add dependent jars/files
addDependencies(this.job.getConfiguration());
this.job.setJarByClass(MRJobLauncher.class);
this.job.setMapperClass(TaskRunner.class);
// The job is mapper-only
this.job.setNumReduceTasks(0);
this.job.setInputFormatClass(GobblinWorkUnitsInputFormat.class);
this.job.setOutputFormatClass(GobblinOutputFormat.class);
this.job.setMapOutputKeyClass(NullWritable.class);
this.job.setMapOutputValueClass(NullWritable.class);
// Set speculative execution
this.job.setSpeculativeExecution(isSpeculativeExecutionEnabled(this.jobProps));
this.job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
// Job input path is where input work unit files are stored
// Prepare job input
// wu文件写入hdfs
prepareJobInput(workUnits);
// 设置inputPath
FileInputFormat.addInputPath(this.job, this.jobInputPath);
// Job output path is where serialized task states are stored
FileOutputFormat.setOutputPath(this.job, this.jobOutputPath);
// Serialize source state to a file which will be picked up by the mappers
serializeJobState(this.fs, this.mrJobDir, this.conf, this.jobContext.getJobState(), this.job);
if (this.jobProps.containsKey(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
GobblinWorkUnitsInputFormat.setMaxMappers(this.job,
Integer.parseInt(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)));
}
mrJobSetupTimer.stop();
}

mapreduce相关

map分块 hadoopjob的GobblinWorkUnitsInputFormat中的getSplits方法决定了map分块的方式和数量, 在KafkaSource中,getWorkunits方法是按照partition和mr.job.max.mappers参数配置来决定生成的wu的数量, 比如map数量为默认值100,topic的分区数量为2,那么就会生成100个multiworkunit,其中有两个是有job的,对应topic的两个分区,其余的job均为空,如果分区数量大于map数量,那么其中便会有一些multiworkunit去获取多个分区的数据。

package org.apache.gobblin.source.extractor.extract.kafka;
public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public List<WorkUnit> getWorkunits(SourceState state) {
//省略...
Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
//根据topic创建WorkUnitCreator
for (KafkaTopic topic : topics) {
threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
workUnits));
}
//省略...
//获取map数量
int numOfMultiWorkunits =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
//KafkaWorkUnitPacker是用来合并空的任务以及将workunit分配到numOfMultiWorkunits指定的数量的map上,见KafkaSingleLevelWorkUnitPacker
List<WorkUnit> workUnitList = KafkaWorkUnitPacker.getInstance(this, state).pack(workUnits, numOfMultiWorkunits);
//省略...
}
private class WorkUnitCreator implements Runnable {
@Override
public void run() {
//省略...
//生成workunit
this.allTopicWorkUnits.put(this.topic.getName(),
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState));
//省略...
}
}
private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState) {
Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
boolean topicQualified = isTopicQualified(topic);
context.close();
List<WorkUnit> workUnits = Lists.newArrayList();
for (KafkaPartition partition : topic.getPartitions()) {
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
this.partitionsToBeProcessed.add(partition);
if (workUnit != null) {
// For disqualified topics, for each of its workunits set the high watermark to be the same
// as the low watermark, so that it will be skipped.
if (!topicQualified) {
skipWorkUnit(workUnit);
}
workUnits.add(workUnit);
}
}
return workUnits;
}
}