1. The symptom
insert into test123 values(2,'aa1');
It turns out that every insert statement executes alter_table_with_environmentContext twice:
2023-05-23T09:45:02,453 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode
2023-05-23T09:45:02,453 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - Starting task [Stage-0:MOVE] in serial mode
2023-05-23T09:45:02,454 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: exec.Task (:()) - Loading data to table ccc.test123 from hdfs://master-52600d0:8020/journey/test123/.hive-staging_hive_2023-05-23_09-44-50_674_1941813141939530227-1/-ext-10000
2023-05-23T09:45:02,454 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - xxxx getTable: dbName:ccc, tableName:test123
2023-05-23T09:45:02,477 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - xxxx getTable: dbName:ccc, tableName:test123
2023-05-23T09:45:02,507 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.xxxxCatalogHiveClient (:()) - alter_table_with_environmentContext: dbName: ccc, tblName: test123, newTable: Table(tableName:test123, dbName:ccc, owner:hdfs, createTime:1684805491, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:), FieldSchema(name:name, type:string, comment:)], location:hdfs://master-52600d0:8020/journey/test123/, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{totalSize=292, EXTERNAL=FALSE, numRows=1, rawDataSize=90, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}, numFiles=1, bucketing_version=2}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, catName:hive, ownerType:USER), EnvironmentContext : EnvironmentContext(properties:{DO_NOT_UPDATE_STATS=true})
2023-05-23T09:45:02,537 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - create or alter table: {"database":"ccc","tableName":"test123","tableType":"MANAGED","locationId":"_ccc_test123","relativePath":"","storage":{"format":"ORC","tableProperties":{"serialization.format":"1"},"rewriteEnabled":false,"dataLakeType":"none"},"columns":[{"name":"id","type":"INT","nullable":true,"secretLevel":"L0","description":""},{"name":"name","type":"STRING","nullable":true,"secretLevel":"L0","description":""}],"partitions":[],"ownerType":"USER","owner":"hdfs","properties":{"totalSize":"292","EXTERNAL":"FALSE","numRows":"1","rawDataSize":"90","COLUMN_STATS_ACCURATE":"{\"BASIC_STATS\":\"true\"}","numFiles":"1","bucketing_version":"2"}}
2023-05-23T09:45:02,593 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: exec.MoveTask (:()) - Releasing 2 locks
2023-05-23T09:45:02,616 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - Starting task [Stage-3:STATS] in serial mode
2023-05-23T09:45:02,616 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: stats.BasicStatsTask (:()) - Executing stats task
2023-05-23T09:45:02,617 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: fs.FSStatsPublisher (:()) - created : hdfs://master-52600d0:8020/journey/test123/.hive-staging_hive_2023-05-23_09-44-50_674_1941813141939530227-1/-ext-10001
2023-05-23T09:45:02,622 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - ugi current userName: hdfs
2023-05-23T09:45:02,623 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.xxxxCatalogHiveClient (:()) - Connected to xxxx metastore.
2023-05-23T09:45:02,623 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.RetryingMetaStoreClient (:()) - RetryingMetaStoreClient proxy=class com.baidubce.xxxx.catalog.metastore.xxxxCatalogHiveClient ugi=hdfs (auth:SIMPLE) retries=24 delay=5 lifetime=0
2023-05-23T09:45:02,623 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - xxxx getTable: dbName:ccc, tableName:test123
2023-05-23T09:45:02,642 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: FileOperations (:()) - Read stats for : ccc.test123/ numRows 1
2023-05-23T09:45:02,642 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: FileOperations (:()) - Read stats for : ccc.test123/ rawDataSize 91
2023-05-23T09:45:02,642 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.xxxxCatalogHiveClient (:()) - alter_table_with_environmentContext: dbName: ccc, tblName: test123, newTable: Table(tableName:test123, dbName:ccc, owner:hdfs, createTime:1684805491, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:), FieldSchema(name:name, type:string, comment:)], location:hdfs://master-52600d0:8020/journey/test123/, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{totalSize=585, EXTERNAL=FALSE, numRows=2, rawDataSize=181, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}, numFiles=2, bucketing_version=2}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, catName:hive, ownerType:USER), EnvironmentContext : EnvironmentContext(properties:{DO_NOT_UPDATE_STATS=true})
2023-05-23T09:45:02,642 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - tblLocation ->hdfs://master-52600d0:8020/journey/test123/
2023-05-23T09:45:02,654 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - dbLocation ->bos://01bmr/
2023-05-23T09:45:02,669 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - create or alter table: {"database":"ccc","tableName":"test123","tableType":"MANAGED","locationId":"_ccc_test123","relativePath":"","storage":{"format":"ORC","tableProperties":{"serialization.format":"1"},"rewriteEnabled":false,"dataLakeType":"none"},"columns":[{"name":"id","type":"INT","nullable":true,"secretLevel":"L0","description":""},{"name":"name","type":"STRING","nullable":true,"secretLevel":"L0","description":""}],"partitions":[],"ownerType":"USER","owner":"hdfs","properties":{"totalSize":"585","EXTERNAL":"FALSE","numRows":"2","rawDataSize":"181","COLUMN_STATS_ACCURATE":"{\"BASIC_STATS\":\"true\"}","numFiles":"2","bucketing_version":"2"}}
2023-05-23T09:45:02,721 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: stats.BasicStatsTask (:()) - Table ccc.test123 stats: [numFiles=2, numRows=2, totalSize=585, rawDataSize=181]
2023-05-23T09:45:02,721 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: stats.BasicStatsTask (:()) - Table ccc.test123 stats: [numFiles=2, numRows=2, totalSize=585, rawDataSize=181]
2023-05-23T09:45:02,724 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - Completed executing command(queryId=hdfs_20230523094450_11173651-ff84-44f4-8348-659ab663fa07); Time taken: 11.463 seconds
2023-05-23T09:45:02,724 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - OK
2023-05-23T09:45:02,969 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: CliDriver (:()) - Time taken: 12.25 seconds
2023-05-23T09:45:02,969 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: conf.HiveConf (HiveConf.java:getLogIdVar(5050)) - Using the default value passed in for log id: daa8d359-22c3-4d7d-90de-613688f0191a
2023-05-23T09:45:02,969 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: session.SessionState (SessionState.java:resetThreadName(453)) - Resetting thread name to main
Why does alter_table_with_environmentContext show up twice in the log?
How it works
What do the first and the second alter table actually do?
The first alter table updates table metadata, for example 'transient_lastDdlTime'='1684823516' (the full set of properties it touches was not dug into here).
The second alter table writes the table statistics. Can it be skipped? Yes: run set hive.stats.autogather=false; and the difference shows up immediately.
With stats auto-gathering enabled:
> insert into test123 values(2,'aa1');
Query ID = hdfs_20230523094450_11173651-ff84-44f4-8348-659ab663fa07
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.
Session re-established.
Status: Running (Executing on YARN cluster with App id application_1682255920077_0929)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 5.21 s
----------------------------------------------------------------------------------------------
Status: DAG finished successfully in 5.21 seconds
Query Execution Summary
----------------------------------------------------------------------------------------------
OPERATION DURATION
----------------------------------------------------------------------------------------------
Compile Query 0.39s
Prepare Plan 0.36s
Get Query Coordinator (AM) 0.00s
Submit Plan 5.66s
Start DAG 0.13s
Run DAG 5.21s
----------------------------------------------------------------------------------------------
Task Execution Summary
----------------------------------------------------------------------------------------------
VERTICES DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS
----------------------------------------------------------------------------------------------
Map 1 2147.00 5,570 83 3 0
----------------------------------------------------------------------------------------------
Loading data to table ccc.test123
Table ccc.test123 stats: [numFiles=2, numRows=2, totalSize=585, rawDataSize=181]
OK
Time taken: 12.25 seconds
With stats auto-gathering disabled:
hive> set hive.stats.autogather=false;
hive> insert into test123 values(3,'aa1');
Query ID = hdfs_20230523141725_691f0733-eb6a-45c5-858a-c755aca8bd7b
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.
Session re-established.
Status: Running (Executing on YARN cluster with App id application_1682255920077_0941)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 5.18 s
----------------------------------------------------------------------------------------------
Status: DAG finished successfully in 5.18 seconds
Query Execution Summary
----------------------------------------------------------------------------------------------
OPERATION DURATION
----------------------------------------------------------------------------------------------
Compile Query 0.44s
Prepare Plan 0.39s
Get Query Coordinator (AM) 0.00s
Submit Plan 7.08s
Start DAG 0.12s
Run DAG 5.18s
----------------------------------------------------------------------------------------------
Task Execution Summary
----------------------------------------------------------------------------------------------
VERTICES DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS
----------------------------------------------------------------------------------------------
Map 1 2093.00 5,530 111 3 0
----------------------------------------------------------------------------------------------
Loading data to table ccc.test123
OK
Time taken: 13.609 seconds
Notice the difference? With auto-gathering disabled, the line "Table ccc.test123 stats: [numFiles=2, numRows=2, totalSize=585, rawDataSize=181]" is no longer printed.
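To see what each alter actually persisted, the table parameters can be read back from the metastore. A minimal sketch (not from the original post) using the standard HiveMetaStoreClient API; it assumes a hive-site.xml on the classpath that points at the same metastore:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class ShowTableStats {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    try {
      Table t = client.getTable("ccc", "test123");
      // numRows / totalSize / rawDataSize / COLUMN_STATS_ACCURATE live in the table
      // parameters, which is exactly what the second alter rewrites.
      t.getParameters().forEach((k, v) -> System.out.println(k + " = " + v));
    } finally {
      client.close();
    }
  }
}

The parameters printed here correspond to the properties blocks visible in the alter_table_with_environmentContext log lines above.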
Source code analysis
org.apache.hadoop.hive.ql.exec.MoveTask#execute
Locate the code from the log message:
2023-05-23T09:45:02,454 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: exec.Task (:()) - Loading data to table ccc.test123 from hdfs://master-52600d0:8020/journey/test123/.hive-staging_hive_2023-05-23_09-44-50_674_1941813141939530227-1/-ext-10000
There are also two getTable calls:
2023-05-23T09:45:02,454 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - xxxx getTable: dbName:ccc, tableName:test123
2023-05-23T09:45:02,477 INFO [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - xxxx getTable: dbName:ccc, tableName:test123
@Override
public int execute(DriverContext driverContext) {
  .....
  LoadTableDesc tbd = work.getLoadTableWork();
  if (tbd != null) {
    // This is what prints the "Loading data to table ..." line seen in the log above
    logMessage(tbd);
    // First getTable call
    Table table = db.getTable(tbd.getTable().getTableName());
    // Validate the file format
    checkFileFormats(db, tbd, table);
    boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID
        && !tbd.isMmTable(); //it seems that LoadTableDesc has Operation.INSERT only for CTAS...
    // Create a data container
    DataContainer dc = null;
    if (tbd.getPartitionSpec().size() == 0) {
      dc = new DataContainer(table.getTTable());
      if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
        Utilities.FILE_OP_LOGGER.trace("loadTable called from " + tbd.getSourcePath()
            + " into " + tbd.getTable().getTableName());
      }
      // loadTable performs the second getTable internally (see below)
      db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
          work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
          tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite());
      if (work.getOutputs() != null) {
        DDLTask.addIfAbsentByName(new WriteEntity(table,
            getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
      }
    } else {
      LOG.info("Partition is: {}", tbd.getPartitionSpec());
      // Check if the bucketing and/or sorting columns were inferred
      TaskInformation ti = new TaskInformation(this, tbd.getSourcePath().toUri().toString());
      inferTaskInformation(ti);
      // deal with dynamic partitions
      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
        dc = handleDynParts(db, table, tbd, ti, dpCtx);
      } else { // static partitions
        dc = handleStaticParts(db, table, tbd, ti);
      }
    }
  }
  .....
}
The second getTable, and the first alterTable, happen inside org.apache.hadoop.hive.ql.metadata.Hive#loadTable:
public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
    boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
    Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
  List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
  // Second getTable call
  Table tbl = getTable(tableName);
  assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
  boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
  boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
  boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
  // Note: this assumes both paths are qualified; which they are, currently.
  if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) {
    /**
     * some operations on Transactional tables (e.g. Import) write directly to the final location
     * and avoid the 'move' operation. Since MoveTask does other things, setting 'loadPath' to be
     * the table/partition path indicates that the 'file move' part of MoveTask is not needed.
     */
    if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
      Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + tbl.getPath() + " (MM)");
    }
    newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
  } else {
    // Either a non-MM query, or a load into MM table from an external source.
    Path tblPath = tbl.getPath();
    Path destPath = tblPath;
    if (isMmTable) {
      assert !isAcidIUDoperation;
      // We will load into MM directory, and hide previous directories if needed.
      destPath = new Path(destPath, isInsertOverwrite
          ? AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId));
    }
    if (!isAcidIUDoperation && isFullAcidTable) {
      destPath = fixFullAcidPathForLoadData(loadFileType, destPath, writeId, stmtId, tbl);
    }
    Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
        + " (replace = " + loadFileType + ")");
    boolean isManaged = tbl.getTableType() == TableType.MANAGED_TABLE;
    if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) {
      //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
      boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
      boolean needRecycle = !tbl.isTemporary()
          && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
      replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge,
          newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged);
    } else {
      try {
        FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
        copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
            loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles,
            tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
      } catch (IOException e) {
        throw new HiveException("addFiles: filesystem error in check phase", e);
      }
    }
  }
  // Important: hive.stats.autogather defaults to true, so this branch is normally not taken
  if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
    StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
  }
  // column stats will be inaccurate
  // Clear the column statistics
  if (!hasFollowingStatsTask) {
    StatsSetupConst.clearColumnStatsState(tbl.getParameters());
  }
  try {
    if (isSkewedStoreAsSubdir) {
      SkewedInfo skewedInfo = tbl.getSkewedInfo();
      // Construct list bucketing location mappings from sub-directory name.
      Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(tbl.getPath(), skewedInfo);
      // Add list bucketing location mappings.
      skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
    }
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw new HiveException(e);
  }
  EnvironmentContext environmentContext = null;
  if (hasFollowingStatsTask) {
    environmentContext = new EnvironmentContext();
    environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
  }
  // First alterTable happens here
  alterTable(tbl, environmentContext);
  if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
    fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
  } else {
    fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), null);
  }
}
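So the first alter_table_with_environmentContext in the log comes from this loadTable call: because a stats task follows, it attaches DO_NOT_UPDATE_STATS=true so the metastore does not recompute basic stats during this metadata update. For reference, a minimal standalone sketch of the same metastore call (the client setup and the parameter being modified are illustrative assumptions, not code from MoveTask):

import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.Table;

public class AlterWithoutStatsUpdate {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    try {
      Table t = client.getTable("ccc", "test123");
      // Illustrative metadata change; in MoveTask the table object carries the updated parameters.
      t.getParameters().put("comment", "altered without stats recomputation");
      EnvironmentContext ctx = new EnvironmentContext();
      // Same flag loadTable sets: tells the metastore not to touch basic stats on this alter.
      ctx.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
      client.alter_table_with_environmentContext("ccc", "test123", t, ctx);
    } finally {
      client.close();
    }
  }
}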
Next, look at org.apache.hadoop.hive.ql.stats.BasicStatsTask#aggregateStats:
private int aggregateStats(Hive db) {
  StatsAggregator statsAggregator = null;
  int ret = 0;
  StatsCollectionContext scc = null;
  EnvironmentContext environmentContext = null;
  environmentContext = new EnvironmentContext();
  environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
  try {
    // Stats setup:
    final Warehouse wh = new Warehouse(conf);
    if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
      try {
        scc = getContext();
        statsAggregator = createStatsAggregator(scc, conf);
      } catch (HiveException e) {
        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
          throw e;
        }
        console.printError(ErrorMsg.STATS_SKIPPING_BY_ERROR.getErrorCodedMsg(e.toString()));
      }
    }
    List<Partition> partitions = getPartitionsList(db);
    String tableFullName = table.getDbName() + "." + table.getTableName();
    List<Partish> partishes = new ArrayList<>();
    if (partitions == null) {
      Partish p;
      partishes.add(p = new Partish.PTable(table));
      BasicStatsProcessor basicStatsProcessor = new BasicStatsProcessor(p, work, conf, followedColStats);
      basicStatsProcessor.collectFileStatus(wh, conf);
      // Also key: this aggregates the stats, picking up the file sizes collected above
      Table res = (Table) basicStatsProcessor.process(statsAggregator);
      if (res == null) {
        return 0;
      }
      // The second alterTable update happens here
      db.alterTable(tableFullName, res, environmentContext);
      .....
  return ret;
}
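To sum up: the first alter comes from Hive#loadTable (a metadata update carrying DO_NOT_UPDATE_STATS=true), and the second from BasicStatsTask#aggregateStats (the actual statistics). Turning off hive.stats.autogather removes the stats task, so only the first alter remains. A minimal sketch of flipping the same switch from Java instead of the CLI (an embedded HiveConf-driven session is assumed):

import org.apache.hadoop.hive.conf.HiveConf;

public class DisableStatsAutogather {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Same switch as "set hive.stats.autogather=false;" in the CLI session above.
    // With it off, no stats task is planned, so the second alter never happens, and
    // loadTable marks basic stats as not accurate (see the HIVESTATSAUTOGATHER check above).
    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
    System.out.println("hive.stats.autogather = "
        + conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER));
  }
}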