自定义hive配置
问题
sqoop操作hive时,会读取hive的配置文件,但是如果想要通过参数动态的覆盖默认的hive配置该如何?
探讨
首先看这个类org.apache.sqoop.hive.HiveConfig
Copy //class: org.apache.sqoop.hive.HiveConfig
//line: 42
public static Configuration getHiveConf( Configuration conf) throws IOException {
//...
Class HiveConfClass = Class . forName (HIVE_CONF_CLASS);
return ((Configuration)( HiveConfClass . getConstructor ( Configuration . class , Class . class )
. newInstance (conf , Configuration . class )));
//...
}
这段代码获取的是org.apache.hadoop.hive.conf.HiveConf
类的实例。在HiveConf中有这段代码
Copy // class: org.apache.hadoop.hive.conf.HiveConf
// line: 2935
public static Map< String , String > getConfSystemProperties() {
Map < String , String > systemProperties = new HashMap < String , String >();
for ( ConfVars oneVar : ConfVars . values ()) {
if ( System . getProperty ( oneVar . varname ) != null ) {
if ( System . getProperty ( oneVar . varname ) . length () > 0 ) {
systemProperties . put ( oneVar . varname , System . getProperty ( oneVar . varname ));
}
}
}
return systemProperties;
}
在sqoopgetHiveConf
方法中去实例化HiveConf类时,会读取hive的配置,其中会调用到上述的getConfSystemProperties
方法,该方法会读取系统变量中的参数来覆盖默认的配置。这些可用的参数可以在ConfVars
枚举类中看到。
解决
因此如果想要覆盖默认的hive配置只需要添加系统变量即可,例如
Copy java \
-D 'hive.metastore.uris=thrift://service:90831' \
-cp libs \
org.apache.sqoop.Sqoop args
也可以通过修改sqoop配置文件的方式
获取sqoop导入导出记录数量
问题
如何获取sqoop导入或者导出数据条数,以进行例如上报日志等操作?
探讨
mapreduce有一个计数器的东西,其源码已经实现了对map输入输出的计数。
Copy //class: org.apache.sqoop.config.ConfigurationHelper
//line: 73
/**
* @return the number of mapper output records from a job using its counters.
*/
public static long getNumMapOutputRecords( Job job)
throws IOException , InterruptedException {
return job . getCounters () . findCounter (
ConfigurationConstants . COUNTER_GROUP_MAPRED_TASK_COUNTERS ,
ConfigurationConstants . COUNTER_MAP_OUTPUT_RECORDS ) . getValue ();
}
/**
* @return the number of mapper input records from a job using its counters.
*/
public static long getNumMapInputRecords( Job job)
throws IOException , InterruptedException {
return job . getCounters () . findCounter (
ConfigurationConstants . COUNTER_GROUP_MAPRED_TASK_COUNTERS ,
ConfigurationConstants . COUNTER_MAP_INPUT_RECORDS ) . getValue ();
}
对于导入操作来说,数据传入map并从map按照一条条的数据写出到hdfs中,因此getNumMapOutputRecords
便是数据导入的条数,
对于导出操作来说,传入map的数据是hdfs中的数据,一般情况下便是一条条的数据,因此getNumMapInputRecords
便是数据导出的条数。
而对于一些特殊格式的数据,sqoop重写了计数器的计数逻辑。
Copy //class: org.apache.sqoop.mapreduce.mainframe.MainframeDatasetImportMapper
private long numberOfRecords;
//line: 50
public void map( LongWritable key , SqoopRecord val , Context context)
throws IOException , InterruptedException {
String dataset = inputSplit . getCurrentDataset ();
outkey . set ( val . toString ());
numberOfRecords ++ ;
mos . write (outkey , NullWritable . get () , dataset);
}
//line: 68
@ Override
protected void cleanup( Context context)
throws IOException , InterruptedException {
super . cleanup (context);
mos . close ();
context . getCounter (
ConfigurationConstants . COUNTER_GROUP_MAPRED_TASK_COUNTERS ,
ConfigurationConstants . COUNTER_MAP_OUTPUT_RECORDS )
. increment (numberOfRecords);
}
在map的cleanup方法中进行了计数
解决
因此只需要从计数器获取数据即可
Copy //导出(export)操作条数
long inputRecords = ConfigurationHelper . getNumMapInputRecords (job);
//导入(import)操作条数
long outputRecords = ConfigurationHelper . getNumMapOutputRecords (job);