1. Symptom

Running insert into test123 values(2,'aa1'); shows that every single insert statement ends up executing alter_table_with_environmentContext twice:
2023-05-23T09:45:02,453 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode
2023-05-23T09:45:02,453 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - Starting task [Stage-0:MOVE] in serial mode
2023-05-23T09:45:02,454 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: exec.Task (:()) - Loading data to table ccc.test123 from hdfs://master-52600d0:8020/journey/test123/.hive-staging_hive_2023-05-23_09-44-50_674_1941813141939530227-1/-ext-10000
2023-05-23T09:45:02,454 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - xxxx getTable: dbName:ccc, tableName:test123
2023-05-23T09:45:02,477 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - xxxx getTable: dbName:ccc, tableName:test123
2023-05-23T09:45:02,507 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.xxxxCatalogHiveClient (:()) - alter_table_with_environmentContext: dbName: ccc, tblName: test123, newTable: Table(tableName:test123, dbName:ccc, owner:hdfs, createTime:1684805491, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:), FieldSchema(name:name, type:string, comment:)], location:hdfs://master-52600d0:8020/journey/test123/, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{totalSize=292, EXTERNAL=FALSE, numRows=1, rawDataSize=90, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}, numFiles=1, bucketing_version=2}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, catName:hive, ownerType:USER), EnvironmentContext : EnvironmentContext(properties:{DO_NOT_UPDATE_STATS=true})
2023-05-23T09:45:02,537 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - create or alter table: {"database":"ccc","tableName":"test123","tableType":"MANAGED","locationId":"_ccc_test123","relativePath":"","storage":{"format":"ORC","tableProperties":{"serialization.format":"1"},"rewriteEnabled":false,"dataLakeType":"none"},"columns":[{"name":"id","type":"INT","nullable":true,"secretLevel":"L0","description":""},{"name":"name","type":"STRING","nullable":true,"secretLevel":"L0","description":""}],"partitions":[],"ownerType":"USER","owner":"hdfs","properties":{"totalSize":"292","EXTERNAL":"FALSE","numRows":"1","rawDataSize":"90","COLUMN_STATS_ACCURATE":"{\"BASIC_STATS\":\"true\"}","numFiles":"1","bucketing_version":"2"}}
2023-05-23T09:45:02,593 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: exec.MoveTask (:()) - Releasing 2 locks
2023-05-23T09:45:02,616 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - Starting task [Stage-3:STATS] in serial mode
2023-05-23T09:45:02,616 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: stats.BasicStatsTask (:()) - Executing stats task
2023-05-23T09:45:02,617 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: fs.FSStatsPublisher (:()) - created : hdfs://master-52600d0:8020/journey/test123/.hive-staging_hive_2023-05-23_09-44-50_674_1941813141939530227-1/-ext-10001
2023-05-23T09:45:02,622 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - ugi current userName: hdfs
2023-05-23T09:45:02,623 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.xxxxCatalogHiveClient (:()) - Connected to xxxx metastore.
2023-05-23T09:45:02,623 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.RetryingMetaStoreClient (:()) - RetryingMetaStoreClient proxy=class com.baidubce.xxxx.catalog.metastore.xxxxCatalogHiveClient ugi=hdfs (auth:SIMPLE) retries=24 delay=5 lifetime=0
2023-05-23T09:45:02,623 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - xxxx getTable: dbName:ccc, tableName:test123
2023-05-23T09:45:02,642 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: FileOperations (:()) - Read stats for : ccc.test123/    numRows    1
2023-05-23T09:45:02,642 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: FileOperations (:()) - Read stats for : ccc.test123/    rawDataSize    91
2023-05-23T09:45:02,642 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.xxxxCatalogHiveClient (:()) - alter_table_with_environmentContext: dbName: ccc, tblName: test123, newTable: Table(tableName:test123, dbName:ccc, owner:hdfs, createTime:1684805491, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:), FieldSchema(name:name, type:string, comment:)], location:hdfs://master-52600d0:8020/journey/test123/, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{totalSize=585, EXTERNAL=FALSE, numRows=2, rawDataSize=181, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}, numFiles=2, bucketing_version=2}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, catName:hive, ownerType:USER), EnvironmentContext : EnvironmentContext(properties:{DO_NOT_UPDATE_STATS=true})
2023-05-23T09:45:02,642 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - tblLocation ->hdfs://master-52600d0:8020/journey/test123/
2023-05-23T09:45:02,654 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - dbLocation ->bos://01bmr/
2023-05-23T09:45:02,669 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.XxxCatalogProxy (:()) - create or alter table: {"database":"ccc","tableName":"test123","tableType":"MANAGED","locationId":"_ccc_test123","relativePath":"","storage":{"format":"ORC","tableProperties":{"serialization.format":"1"},"rewriteEnabled":false,"dataLakeType":"none"},"columns":[{"name":"id","type":"INT","nullable":true,"secretLevel":"L0","description":""},{"name":"name","type":"STRING","nullable":true,"secretLevel":"L0","description":""}],"partitions":[],"ownerType":"USER","owner":"hdfs","properties":{"totalSize":"585","EXTERNAL":"FALSE","numRows":"2","rawDataSize":"181","COLUMN_STATS_ACCURATE":"{\"BASIC_STATS\":\"true\"}","numFiles":"2","bucketing_version":"2"}}
2023-05-23T09:45:02,721 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: stats.BasicStatsTask (:()) - Table ccc.test123 stats: [numFiles=2, numRows=2, totalSize=585, rawDataSize=181]
2023-05-23T09:45:02,721 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: stats.BasicStatsTask (:()) - Table ccc.test123 stats: [numFiles=2, numRows=2, totalSize=585, rawDataSize=181]
2023-05-23T09:45:02,724 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - Completed executing command(queryId=hdfs_20230523094450_11173651-ff84-44f4-8348-659ab663fa07); Time taken: 11.463 seconds
2023-05-23T09:45:02,724 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: ql.Driver (:()) - OK
2023-05-23T09:45:02,969 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: CliDriver (:()) - Time taken: 12.25 seconds
2023-05-23T09:45:02,969 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: conf.HiveConf (HiveConf.java:getLogIdVar(5050)) - Using the default value passed in for log id: daa8d359-22c3-4d7d-90de-613688f0191a
2023-05-23T09:45:02,969 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: session.SessionState (SessionState.java:resetThreadName(453)) - Resetting thread name to  main

Why is alter_table_with_environmentContext logged twice?

2. How it works

What do the first and the second alter table actually do?

The first alter table updates things such as 'transient_lastDdlTime'='1684823516'; honestly I did not dig into exactly which properties it touches.
The second alter table is the one that updates the table statistics. Can that second update be skipped? Yes: run set hive.stats.autogather=false; and compare the two runs below.

With stats autogather enabled, the output looks like this:

> insert into test123 values(2,'aa1');
Query ID = hdfs_20230523094450_11173651-ff84-44f4-8348-659ab663fa07
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.
Session re-established.
Status: Running (Executing on YARN cluster with App id application_1682255920077_0929)
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 5.21 s
----------------------------------------------------------------------------------------------
Status: DAG finished successfully in 5.21 seconds
Query Execution Summary
----------------------------------------------------------------------------------------------
OPERATION                            DURATION
----------------------------------------------------------------------------------------------
Compile Query                           0.39s
Prepare Plan                            0.36s
Get Query Coordinator (AM)              0.00s
Submit Plan                             5.66s
Start DAG                               0.13s
Run DAG                                 5.21s
----------------------------------------------------------------------------------------------
Task Execution Summary
----------------------------------------------------------------------------------------------
  VERTICES      DURATION(ms)   CPU_TIME(ms)    GC_TIME(ms)   INPUT_RECORDS   OUTPUT_RECORDS
----------------------------------------------------------------------------------------------
     Map 1           2147.00          5,570             83               3                0
----------------------------------------------------------------------------------------------
Loading data to table ccc.test123
Table ccc.test123 stats: [numFiles=2, numRows=2, totalSize=585, rawDataSize=181]
OK
Time taken: 12.25 seconds

With stats autogather disabled, the output looks like this:

hive> set hive.stats.autogather=false;
hive> insert into test123 values(3,'aa1');
Query ID = hdfs_20230523141725_691f0733-eb6a-45c5-858a-c755aca8bd7b
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.
Session re-established.
Status: Running (Executing on YARN cluster with App id application_1682255920077_0941)
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 5.18 s
----------------------------------------------------------------------------------------------
Status: DAG finished successfully in 5.18 seconds
Query Execution Summary
----------------------------------------------------------------------------------------------
OPERATION                            DURATION
----------------------------------------------------------------------------------------------
Compile Query                           0.44s
Prepare Plan                            0.39s
Get Query Coordinator (AM)              0.00s
Submit Plan                             7.08s
Start DAG                               0.12s
Run DAG                                 5.18s
----------------------------------------------------------------------------------------------
Task Execution Summary
----------------------------------------------------------------------------------------------
  VERTICES      DURATION(ms)   CPU_TIME(ms)    GC_TIME(ms)   INPUT_RECORDS   OUTPUT_RECORDS
----------------------------------------------------------------------------------------------
     Map 1           2093.00          5,530            111               3                0
----------------------------------------------------------------------------------------------
Loading data to table ccc.test123
OK
Time taken: 13.609 seconds

Spot the difference? With stats autogather disabled, the line "Table ccc.test123 stats: [numFiles=2, numRows=2, totalSize=585, rawDataSize=181]" is no longer printed, which means the stats task, and therefore the second alter table, never ran.
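
As a reference point, here is a minimal standalone sketch (not Hive's own code; the class name StatsAutogatherCheck is made up for illustration) of the configuration flag that gates the stats task:

import org.apache.hadoop.hive.conf.HiveConf;

public class StatsAutogatherCheck {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // equivalent to running `set hive.stats.autogather=false;` in the CLI
    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);

    // When this is false, no Stage-3:STATS task runs after the MoveTask,
    // so the second alter_table_with_environmentContext never happens.
    boolean autogather = conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER);
    System.out.println("hive.stats.autogather = " + autogather);
  }
}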

3. Source code analysis

org.apache.hadoop.hive.ql.exec.MoveTask#execute
Locate the code by searching for this log message:

2023-05-23T09:45:02,454 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: exec.Task (:()) - Loading data to table ccc.test123 from hdfs://master-52600d0:8020/journey/test123/.hive-staging_hive_2023-05-23_09-44-50_674_1941813141939530227-1/-ext-10000

There are also two getTable calls:

2023-05-23T09:45:02,454 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.EdapCatalogProxy (:()) - edap getTable: dbName:ccc, tableName:test123
2023-05-23T09:45:02,477 INFO  [daa8d359-22c3-4d7d-90de-613688f0191a main]: metastore.EdapCatalogProxy (:()) - edap getTable: dbName:ccc, tableName:test123
@Override
public int execute(DriverContext driverContext) {
    .....
    LoadTableDesc tbd = work.getLoadTableWork();
    if (tbd != null) {
      // TODO execution starts here; this prints the "Loading data to table ..." log line quoted above
      logMessage(tbd);
      // TODO the first getTable
      Table table = db.getTable(tbd.getTable().getTableName());
      // TODO validate the file format
      checkFileFormats(db, tbd, table);
      boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID
          && !tbd.isMmTable(); //it seems that LoadTableDesc has Operation.INSERT only for CTAS...
      // Create a data container
      DataContainer dc = null;
      if (tbd.getPartitionSpec().size() == 0) {
        dc = new DataContainer(table.getTTable());
        if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
          Utilities.FILE_OP_LOGGER.trace("loadTable called from " + tbd.getSourcePath()
            + " into " + tbd.getTable().getTableName());
        }
        // TODO loadTable does another getTable internally (the second one)
        db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
            work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
            tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite());
        if (work.getOutputs() != null) {
          DDLTask.addIfAbsentByName(new WriteEntity(table,
            getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
        }
      } else {
        LOG.info("Partition is: {}", tbd.getPartitionSpec());
        // Check if the bucketing and/or sorting columns were inferred
        TaskInformation ti = new TaskInformation(this, tbd.getSourcePath().toUri().toString());
        inferTaskInformation(ti);
        // deal with dynamic partitions
        DynamicPartitionCtx dpCtx = tbd.getDPCtx();
        if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
          dc = handleDynParts(db, table, tbd, ti, dpCtx);
        } else { // static partitions
          dc = handleStaticParts(db, table, tbd, ti);
        }
      }
      .....
}
public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
    boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
    Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
  List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
  // TODO here is the second getTable
  Table tbl = getTable(tableName);
  assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
  boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
  boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
  boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
  // Note: this assumes both paths are qualified; which they are, currently.
  if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) {
    /**
     * some operations on Transactional tables (e.g. Import) write directly to the final location
     * and avoid the 'move' operation.  Since MoveTask does other things, setting 'loadPath' to be
     * the table/partition path indicates that the 'file move' part of MoveTask is not needed.
     */
    if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
      Utilities.FILE_OP_LOGGER.debug(
          "not moving " + loadPath + " to " + tbl.getPath() + " (MM)");
    }
    newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
  } else {
    // Either a non-MM query, or a load into MM table from an external source.
    Path tblPath = tbl.getPath();
    Path destPath = tblPath;
    if (isMmTable) {
      assert !isAcidIUDoperation;
      // We will load into MM directory, and hide previous directories if needed.
      destPath = new Path(destPath, isInsertOverwrite
          ? AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId));
    }
    if (!isAcidIUDoperation && isFullAcidTable) {
      destPath = fixFullAcidPathForLoadData(loadFileType, destPath, writeId, stmtId, tbl);
    }
    Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
        + " (replace = " + loadFileType + ")");
    boolean isManaged = tbl.getTableType() == TableType.MANAGED_TABLE;
    if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) {
      //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
      boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
      boolean needRecycle = !tbl.isTemporary()
              && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
      replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge,
          newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged);
    } else {
      try {
        FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
        copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
            loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles,
            tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
      } catch (IOException e) {
        throw new HiveException("addFiles: filesystem error in check phase", e);
      }
    }
  }
  // TODO important: hive.stats.autogather defaults to true, so by default this branch is skipped; set it aside for now
  if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
    StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
  }
  // column stats will be inaccurate
  // clear the column statistics
  if (!hasFollowingStatsTask) {
    StatsSetupConst.clearColumnStatsState(tbl.getParameters());
  }
  try {
    if (isSkewedStoreAsSubdir) {
      SkewedInfo skewedInfo = tbl.getSkewedInfo();
      // Construct list bucketing location mappings from sub-directory name.
      Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
          tbl.getPath(), skewedInfo);
      // Add list bucketing location mappings.
      skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
    }
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw new HiveException(e);
  }
  EnvironmentContext environmentContext = null;
  if (hasFollowingStatsTask) {
    environmentContext = new EnvironmentContext();
    environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
  }
  // TODO this is where the first alterTable happens
  alterTable(tbl, environmentContext);
  if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
    fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
  } else {
    fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), null);
  }
}
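
This alterTable(tbl, environmentContext) is what produces the first alter_table_with_environmentContext entry in the log, carrying EnvironmentContext(properties:{DO_NOT_UPDATE_STATS=true}) because a stats task follows. At the metastore-client level it boils down to roughly the following sketch (a hypothetical standalone snippet built on the public HiveMetaStoreClient API, not the exact internal code path):

import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.Table;

public class AlterWithoutStatsSketch {
  public static void main(String[] args) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    Table tbl = client.getTable("ccc", "test123");   // analogous to the second getTable above

    // Tell the metastore not to recompute/overwrite stats, because a StatsTask will follow.
    EnvironmentContext ctx = new EnvironmentContext();
    ctx.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);

    // This is the kind of call that shows up as "alter_table_with_environmentContext" in the log.
    client.alter_table_with_environmentContext("ccc", "test123", tbl, ctx);
    client.close();
  }
}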

Next, look at org.apache.hadoop.hive.ql.stats.BasicStatsTask#aggregateStats:

private int aggregateStats(Hive db) {
    StatsAggregator statsAggregator = null;
    int ret = 0;
    StatsCollectionContext scc = null;
    EnvironmentContext environmentContext = null;
    environmentContext = new EnvironmentContext();
    environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
    try {
      // Stats setup:
      final Warehouse wh = new Warehouse(conf);
      if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
        try {
          scc = getContext();
          statsAggregator = createStatsAggregator(scc, conf);
        } catch (HiveException e) {
          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
            throw e;
          }
          console.printError(ErrorMsg.STATS_SKIPPING_BY_ERROR.getErrorCodedMsg(e.toString()));
        }
      }
      List<Partition> partitions = getPartitionsList(db);
      String tableFullName = table.getDbName() + "." + table.getTableName();
      List<Partish> partishes = new ArrayList<>();
      if (partitions == null) {
        Partish p;
        partishes.add(p = new Partish.PTable(table));
        BasicStatsProcessor basicStatsProcessor = new BasicStatsProcessor(p, work, conf, followedColStats);
        basicStatsProcessor.collectFileStatus(wh, conf);
        // TODO this is the key part: it reads the file sizes to compute the basic stats
        Table res = (Table) basicStatsProcessor.process(statsAggregator);
        if (res == null) {
          return 0;
        }
        // TODO here is the second alterTable, which writes the updated stats
        db.alterTable(tableFullName, res, environmentContext);
        .....
    return ret;
  }
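
In other words, the second alter_table_with_environmentContext is just the StatsTask writing the freshly aggregated basic stats back as table parameters, which is exactly what the second log entry shows (numFiles=2, numRows=2, totalSize=585, rawDataSize=181). A rough standalone sketch of that update, with the stat values copied from the log above (again an illustrative snippet, not Hive's internal code):

import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.Table;

public class WriteBasicStatsSketch {
  public static void main(String[] args) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    Table tbl = client.getTable("ccc", "test123");

    // The aggregated basic stats become plain table parameters (values taken from the log above).
    tbl.getParameters().put(StatsSetupConst.NUM_FILES, "2");
    tbl.getParameters().put(StatsSetupConst.ROW_COUNT, "2");
    tbl.getParameters().put(StatsSetupConst.TOTAL_SIZE, "585");
    tbl.getParameters().put(StatsSetupConst.RAW_DATA_SIZE, "181");

    // DO_NOT_UPDATE_STATS=true tells the metastore to take these values as-is
    // instead of recomputing them itself.
    EnvironmentContext ctx = new EnvironmentContext();
    ctx.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);

    client.alter_table_with_environmentContext("ccc", "test123", tbl, ctx);
    client.close();
  }
}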