package org.apache.gobblin.hive;
public class HiveSerDeWrapper {
/**
* Get an instance of {@link HiveSerDeWrapper}.
*
* @param serDeType The SerDe type. If serDeType is one of the available {@link HiveSerDeWrapper.BuiltInHiveSerDe},
* the other two parameters are not used. Otherwise, serDeType should be the class name of a {@link SerDe},
* and the other two parameters must be present.
*/
public static HiveSerDeWrapper get(String serDeType, Optional<String> inputFormatClassName,
Optional<String> outputFormatClassName) {
Optional<BuiltInHiveSerDe> hiveSerDe = Enums.getIfPresent(BuiltInHiveSerDe.class, serDeType.toUpperCase());
if (hiveSerDe.isPresent()) {
return new HiveSerDeWrapper(hiveSerDe.get());
}
Preconditions.checkArgument(inputFormatClassName.isPresent(),
"Missing input format class name for SerDe " + serDeType);
Preconditions.checkArgument(outputFormatClassName.isPresent(),
"Missing output format class name for SerDe " + serDeType);
return new HiveSerDeWrapper(serDeType, inputFormatClassName.get(), outputFormatClassName.get());
}
}
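// Added usage sketch (not part of the Gobblin source). The built-in constant "avro" and the custom
// SerDe / input / output format classes below are assumptions used only to illustrate the two
// branches of get(...):
//
// // 1) Built-in SerDe: the two Optional format parameters are ignored.
// HiveSerDeWrapper avro = HiveSerDeWrapper.get("avro",
//     Optional.<String>absent(), Optional.<String>absent());
//
// // 2) Custom SerDe: pass the SerDe class name plus its input/output format class names.
// HiveSerDeWrapper csv = HiveSerDeWrapper.get("org.apache.hadoop.hive.serde2.OpenCSVSerde",
//     Optional.of("org.apache.hadoop.mapred.TextInputFormat"),
//     Optional.of("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"));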
/**
* Write a source record to the staging file
*
* @param record data record to write
* @throws java.io.IOException if there is anything wrong writing the record
*/
@Override
public void write(byte[] record) throws IOException {
Preconditions.checkNotNull(record);
byte[] toWrite = record;
if (this.recordDelimiter.isPresent()) {
toWrite = Arrays.copyOf(record, record.length + 1);
toWrite[toWrite.length - 1] = this.recordDelimiter.get();
}
if (this.prependSize) {
long recordSize = toWrite.length;
ByteBuffer buf = ByteBuffer.allocate(Longs.BYTES);
buf.putLong(recordSize);
toWrite = ArrayUtils.addAll(buf.array(), toWrite);
}
this.stagingFileOutputStream.write(toWrite);
this.bytesWritten += toWrite.length;
this.recordsWritten++;
}
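// Added illustration (not part of the Gobblin source): how the staging file written above could be
// read back when prependSize is enabled. readFramedRecord is a hypothetical helper; it assumes the
// stream only contains records framed by write(...) above, i.e. an 8-byte big-endian length
// followed by the record bytes (the optional record delimiter is already counted in that length).
//
// static byte[] readFramedRecord(java.io.DataInputStream in) throws IOException {
//   long size = in.readLong();              // big-endian, matching ByteBuffer.putLong above
//   byte[] record = new byte[(int) size];   // sketch only: assumes the record fits in an int
//   in.readFully(record);
//   return record;                          // still ends with the delimiter byte, if one was appended
// }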
```

STEP1.2 Move the data from the stagingFile to the outputFile, i.e. the writer component writes out its file
> The `outputFile` is the path configured by `writer.output.dir`.
`public static final String WRITER_OUTPUT_DIR = WRITER_PREFIX + ".output.dir";`
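For orientation, here is a minimal sketch of the directory-related job properties involved in this step. The key names are the standard Gobblin ones (`writer.staging.dir`, `writer.output.dir`, `data.publisher.final.dir`); the paths are placeholders, not values from the article:

```properties
# where the writer first writes its staging file
writer.staging.dir=/gobblin/task-staging
# where commit() renames the staging file to (the outputFile below)
writer.output.dir=/gobblin/task-output
# where the publisher moves the data afterwards (see BaseDataPublisher below)
data.publisher.final.dir=/gobblin/published
```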
```java
package org.apache.gobblin.writer;
public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, MetadataAwareWriter, SpeculativeAttemptAwareConstruct {
/**
* {@inheritDoc}.
*
* <p>
* This default implementation simply renames the staging file to the output file. If the output file
* already exists, it will delete it first before doing the renaming.
* </p>
*
* @throws IOException if any file operation fails
*/
@Override
public void commit() throws IOException {
this.closer.close();
setStagingFileGroup();
if (!this.fs.exists(this.stagingFile)) {
throw new IOException(String.format("File %s does not exist", this.stagingFile));
}
FileStatus stagingFileStatus = this.fs.getFileStatus(this.stagingFile);
// Double check permission of staging file
if (!stagingFileStatus.getPermission().equals(this.filePermission)) {
this.fs.setPermission(this.stagingFile, this.filePermission);
}
this.bytesWritten = Optional.of(Long.valueOf(stagingFileStatus.getLen()));
LOG.info(String.format("Moving data from %s to %s", this.stagingFile, this.outputFile));
// For the same reason as deleting the staging file if it already exists, deleting
// the output file if it already exists prevents task retry from being blocked.
if (this.fs.exists(this.outputFile)) {
LOG.warn(String.format("Task output file %s already exists", this.outputFile));
HadoopUtils.deletePath(this.fs, this.outputFile, false);
}
// ⚠️ Move the stagingFile to the outputFile, i.e. the file the writer writes out
HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile);
}
}
package org.apache.gobblin.publisher;
public class BaseDataPublisher extends SingleTaskDataPublisher {
protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
Set<Path> writerOutputPathsMoved)
throws IOException {
// Get a ParallelRunner instance for moving files in parallel
ParallelRunner parallelRunner = this.getParallelRunner(this.writerFileSystemByBranches.get(branchId));
// The directory where the workUnitState wrote its output data.
// ⚠️ This fetches the writer output directory written in the previous step
Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId);
if (!this.writerFileSystemByBranches.get(branchId).exists(writerOutputDir)) {
LOG.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId()));
return;
}
// The directory where the final output directory for this job will be placed.
// It is a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH.
Path publisherOutputDir = getPublisherOutputDir(state, branchId);
if (publishSingleTaskData) {
// Create final output directory
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId), publisherOutputDir,
this.permissions.get(branchId), retrierConfig);
addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
} else {
if (writerOutputPathsMoved.contains(writerOutputDir)) {
// This writer output path has already been moved for another task of the same extract
// If publishSingleTaskData=true, writerOutputPathMoved is ignored.
return;
}
if (this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
// The final output directory already exists, check if the job is configured to replace it.
// If publishSingleTaskData=true, final output directory is never replaced.
boolean replaceFinalOutputDir = this.getState().getPropAsBoolean(ForkOperatorUtils
.getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, branchId));
// If the final output directory is not configured to be replaced, put new data to the existing directory.
if (!replaceFinalOutputDir) {
addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
writerOutputPathsMoved.add(writerOutputDir);
return;
}
// Delete the final output directory if it is configured to be replaced
LOG.info("Deleting publisher output dir " + publisherOutputDir);
this.publisherFileSystemByBranches.get(branchId).delete(publisherOutputDir, true);
} else {
// Create the parent directory of the final output directory if it does not exist
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir.getParent(), this.permissions.get(branchId), retrierConfig);
}
// ⚠️ Move writerOutputDir to the final publisherOutputDir
movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, branchId);
writerOutputPathsMoved.add(writerOutputDir);
}
}
/**
* close() appends the published paths in publisherOutputDirs to the value of the
* ConfigurationKeys.PUBLISHER_DIRS (`data.publisher.output.dirs`) property for later use,
* e.g. by HiveRegistrationPublisher.
*/
@Override
public void close()
throws IOException {
try {
for (Path path : this.publisherOutputDirs) {
this.state.appendToSetProp(ConfigurationKeys.PUBLISHER_DIRS, path.toString());
}
} finally {
// Uses Guava's `com.google.common.io.Closer`: multiple Closeables can be registered (`this.stack.push(closeable)`), and when close() is called each of them is closed in turn (`(Closeable) this.stack.pop().close()`), which gives chained closing.
this.closer.close();
}
}
}
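// Added illustration of the Guava Closer behavior mentioned in close() above (not Gobblin code);
// firstCloseable/secondCloseable are hypothetical resources:
//
//   Closer closer = Closer.create();
//   Closeable first = closer.register(firstCloseable);    // pushed onto the Closer's internal stack
//   Closeable second = closer.register(secondCloseable);
//   closer.close();  // closes "second" first, then "first" (reverse registration order),
//                    // which is what lets BaseDataPublisher chain the HiveRegistrationPublisher's
//                    // close() into its own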
package org.apache.gobblin.publisher;
public class BaseDataPublisherWithHiveRegistration extends BaseDataPublisher {
protected final HiveRegistrationPublisher hivePublisher;
public BaseDataPublisherWithHiveRegistration(State state) throws IOException {
super(state);
// Register this HiveRegistrationPublisher with BaseDataPublisher's closer
this.hivePublisher = this.closer.register(new HiveRegistrationPublisher(state));
}
@Override
public void publish(Collection<? extends WorkUnitState> states) throws IOException {
super.publish(states);
this.hivePublisher.publish(states);
}
}
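// Hedged wiring sketch (not from the article): to use this publisher, a job would typically point
// the publisher type at this class and supply a Hive registration policy. Verify the key names
// against your Gobblin version:
//
//   data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisherWithHiveRegistration
//   hive.registration.policy=<fully qualified HiveRegistrationPolicy implementation>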
public class KafkaSimpleSource extends KafkaSource<String, byte[]> {
@Override
public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
return new KafkaSimpleExtractor(state);
}
}
public class KafkaSimpleExtractor extends KafkaExtractor<String, byte[]> {
public KafkaSimpleExtractor(WorkUnitState state) {
super(state);
this.kafkaSchemaRegistry = new SimpleKafkaSchemaRegistry(state.getProperties());
}
@Override
public String getSchema() throws IOException {
try {
return this.kafkaSchemaRegistry.getLatestSchemaByTopic(this.topicName);
} catch (SchemaRegistryException e) {
throw new RuntimeException(e);
}
}
}
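// Hedged wiring sketch (not from the article): typical job properties selecting this source and
// extractor in the Kafka -> HDFS example; key names and values should be checked against your
// Gobblin version:
//
//   source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
//   kafka.brokers=localhost:9092
//   topic.whitelist=test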
package org.apache.gobblin.runtime;
public class Task implements TaskIFace {
private final Converter converter;
private final InstrumentedExtractorBase extractor;
public Task(TaskContext context, ...) {
// this.taskContext.getExtractor() amounts to: getSource().getExtractor(this.taskState)
this.extractor =
closer.register(new InstrumentedExtractorDecorator<>(this.taskState, this.taskContext.getExtractor()));
}
private void runSynchronousModel() throws Exception {
// 1. Convert the schema
Object schema = converter.convertSchema(extractor.getSchema(), this.taskState);
// 2. Convert the data
// extractor.readRecordEnvelope(): fetch a record from the extractor
// converter.convertRecord(): convert the record fetched from the extractor
RecordEnvelope recordEnvelope;
// Extract, convert, and fork one source record at a time.
while (!shutdownRequested() && (recordEnvelope = extractor.readRecordEnvelope()) != null) {
onRecordExtract();
AcknowledgableWatermark ackableWatermark = new AcknowledgableWatermark(recordEnvelope.getWatermark());
if (watermarkTracker.isPresent()) {
watermarkTracker.get().track(ackableWatermark);
}
for (Object convertedRecord : converter.convertRecord(schema, recordEnvelope, this.taskState)) {
processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches,
ackableWatermark.incrementAck());
}
ackableWatermark.ack();
}
}
}
package org.apache.gobblin.runtime;
public abstract class AbstractJobLauncher implements JobLauncher {
@Override
public void launchJob(JobListener jobListener) throws JobException {
// omitted...
Source<?, ?> source = this.jobContext.getSource();
// 1. Ask the source component to generate the workunits
if (source instanceof WorkUnitStreamSource) {
workUnitStream = ((WorkUnitStreamSource) source).getWorkunitStream(jobState);
} else {
workUnitStream = new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
}
// omitted...
// 2. Start the job and wait for it to finish
// See the MRJobLauncher#runWorkUnits code below
runWorkUnitStream(workUnitStream);
// omitted...
}
}
package org.apache.gobblin.runtime.mapreduce;
public class MRJobLauncher extends AbstractJobLauncher {
protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
// omitted...
prepareHadoopJob(workUnits);
// omitted...
this.job.waitForCompletion(true);
// omitted...
}
}
/**
* Prepare the Hadoop MR job, including configuring the job and setting up the input/output paths.
*/
private void prepareHadoopJob(List<WorkUnit> workUnits) throws IOException {
TimingEvent mrJobSetupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_SETUP);
// Add dependent jars/files
addDependencies(this.job.getConfiguration());
this.job.setJarByClass(MRJobLauncher.class);
this.job.setMapperClass(TaskRunner.class);
// The job is mapper-only
this.job.setNumReduceTasks(0);
this.job.setInputFormatClass(GobblinWorkUnitsInputFormat.class);
this.job.setOutputFormatClass(GobblinOutputFormat.class);
this.job.setMapOutputKeyClass(NullWritable.class);
this.job.setMapOutputValueClass(NullWritable.class);
// Set speculative execution
this.job.setSpeculativeExecution(isSpeculativeExecutionEnabled(this.jobProps));
this.job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
// Job input path is where input work unit files are stored
// Prepare job input
// Write the workunit (wu) files to HDFS
prepareJobInput(workUnits);
// Set the input path
FileInputFormat.addInputPath(this.job, this.jobInputPath);
// Job output path is where serialized task states are stored
FileOutputFormat.setOutputPath(this.job, this.jobOutputPath);
// Serialize source state to a file which will be picked up by the mappers
serializeJobState(this.fs, this.mrJobDir, this.conf, this.jobContext.getJobState(), this.job);
if (this.jobProps.containsKey(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
GobblinWorkUnitsInputFormat.setMaxMappers(this.job,
Integer.parseInt(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)));
}
mrJobSetupTimer.stop();
}
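// Hedged configuration note (not from the article): the number of mappers that the workunits get
// packed into (see KafkaSource below) is capped via ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, and
// the MR launcher itself is selected with launcher.type; key names may differ across versions:
//
//   launcher.type=MAPREDUCE
//   mr.job.max.mappers=8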
package org.apache.gobblin.source.extractor.extract.kafka;
public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public List<WorkUnit> getWorkunits(SourceState state) {
// omitted...
Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
// Create a WorkUnitCreator per topic
for (KafkaTopic topic : topics) {
threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
workUnits));
}
// omitted...
// Get the number of mappers
int numOfMultiWorkunits =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
// KafkaWorkUnitPacker merges empty workunits and distributes the workunits across the numOfMultiWorkunits mappers; see KafkaSingleLevelWorkUnitPacker
List<WorkUnit> workUnitList = KafkaWorkUnitPacker.getInstance(this, state).pack(workUnits, numOfMultiWorkunits);
// omitted...
}
private class WorkUnitCreator implements Runnable {
@Override
public void run() {
// omitted...
// Generate the workunits
this.allTopicWorkUnits.put(this.topic.getName(),
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState));
// omitted...
}
}
private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState) {
Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
boolean topicQualified = isTopicQualified(topic);
context.close();
List<WorkUnit> workUnits = Lists.newArrayList();
for (KafkaPartition partition : topic.getPartitions()) {
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
this.partitionsToBeProcessed.add(partition);
if (workUnit != null) {
// For disqualified topics, for each of its workunits set the high watermark to be the same
// as the low watermark, so that it will be skipped.
if (!topicQualified) {
skipWorkUnit(workUnit);
}
workUnits.add(workUnit);
}
}
return workUnits;
}
}
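// Added sketch (not Gobblin code) of the packing idea behind KafkaWorkUnitPacker mentioned above:
// spread the workunits over a fixed number of bins (one per mapper) with a simple worst-fit
// strategy, always adding the next workunit to the currently smallest bin. estimateSize(...) and
// binSize(...) stand in for the packer's real size estimation.
//
// List<List<WorkUnit>> packIntoBins(List<WorkUnit> workUnits, int numBins) {
//   PriorityQueue<List<WorkUnit>> bins =
//       new PriorityQueue<>(numBins, Comparator.comparingLong(this::binSize));
//   for (int i = 0; i < numBins; i++) {
//     bins.add(new ArrayList<>());
//   }
//   workUnits.sort(Comparator.comparingLong(this::estimateSize).reversed()); // biggest first
//   for (WorkUnit wu : workUnits) {
//     List<WorkUnit> smallest = bins.poll(); // bin with the least total estimated size so far
//     smallest.add(wu);
//     bins.offer(smallest);
//   }
//   return new ArrayList<>(bins);
// }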