From eba25fc7704458452460ed72dd686e13e72d1775 Mon Sep 17 00:00:00 2001 From: Haochen Ding Date: Sat, 27 Mar 2021 19:05:01 -0400 Subject: [PATCH] Move finished or failed queries to finished query states and not fetching these query states when submitting new queries --- .../prestosql/dispatcher/DispatchManager.java | 20 ++- .../io/prestosql/execution/QueryTracker.java | 7 +- .../statestore/LocalStateStoreProvider.java | 1 + .../io/prestosql/statestore/StateFetcher.java | 60 ++++++- .../statestore/StateStoreConstants.java | 5 + .../io/prestosql/statestore/StateUpdater.java | 148 ++++++++---------- .../utils/DistributedResourceGroupUtils.java | 2 +- 7 files changed, 142 insertions(+), 101 deletions(-) diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java b/presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java index 35dc4ed55..54ef59902 100644 --- a/presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java +++ b/presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java @@ -39,6 +39,7 @@ import io.prestosql.spi.QueryId; import io.prestosql.spi.resourcegroups.SelectionContext; import io.prestosql.spi.resourcegroups.SelectionCriteria; import io.prestosql.spi.service.PropertyService; +import io.prestosql.spi.statestore.StateStore; import io.prestosql.statestore.SharedQueryState; import io.prestosql.statestore.StateCacheStore; import io.prestosql.statestore.StateFetcher; @@ -64,6 +65,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -310,7 +312,11 @@ public class DispatchManager List queryInfos; if (isMultiCoordinatorEnabled() && StateCacheStore.get().getCachedStates(StateStoreConstants.QUERY_STATE_COLLECTION_NAME) != null) { Map queryStates = StateCacheStore.get().getCachedStates(StateStoreConstants.QUERY_STATE_COLLECTION_NAME); - queryInfos = queryStates.values().stream() + Map finishedQueryStates = StateCacheStore.get().getCachedStates(StateStoreConstants.FINISHED_QUERY_STATE_COLLECTION_NAME); + + queryInfos = Stream.concat( + queryStates.values().stream(), + finishedQueryStates.values().stream()) .map(SharedQueryState::getBasicQueryInfo) .collect(Collectors.toList()); } @@ -395,6 +401,7 @@ public class DispatchManager // Start state fetcher stateFetcher.registerStateCollection(StateStoreConstants.QUERY_STATE_COLLECTION_NAME); + stateFetcher.registerStateCollection(StateStoreConstants.FINISHED_QUERY_STATE_COLLECTION_NAME); stateFetcher.registerStateCollection(StateStoreConstants.OOM_QUERY_STATE_COLLECTION_NAME); stateFetcher.registerStateCollection(StateStoreConstants.CPU_USAGE_STATE_COLLECTION_NAME); stateFetcher.start(); @@ -416,12 +423,13 @@ public class DispatchManager private synchronized void submitQuerySync(DispatchQuery dispatchQuery, SelectionContext selectionContext) throws InterruptedException, PrestoException { - if (stateStoreProvider.getStateStore() == null) { + StateStore stateStore = stateStoreProvider.getStateStore(); + if (stateStore == null) { LOG.error("StateStore is not loaded yet"); throw new PrestoException(GENERIC_INTERNAL_ERROR, "Coordinator is not ready to accept queries"); } - Lock lock = stateStoreProvider.getStateStore().getLock(StateStoreConstants.SUBMIT_QUERY_LOCK_NAME); + Lock lock = stateStore.getLock(StateStoreConstants.SUBMIT_QUERY_LOCK_NAME); // Make sure query submission is synchronized boolean locked = lock.tryLock(hetuConfig.getQuerySubmitTimeout().toMillis(), TimeUnit.MILLISECONDS); long start = 0L; @@ -432,7 +440,7 @@ public class DispatchManager dispatchQuery.getQueryId(), start, new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(start))); - stateFetcher.fetchStates(); + stateFetcher.fetchQueryStates(stateStore); resourceGroupManager.submit(dispatchQuery, selectionContext, queryExecutor); // Register dispatch query to StateUpdater if (PropertyService.getBooleanProperty(HetuConstant.MULTI_COORDINATOR_ENABLED) && stateUpdater != null) { @@ -441,7 +449,7 @@ public class DispatchManager stateUpdater.updateStates(); } catch (IOException e) { - throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Failed to fetch states from or update states to state store: " + e.getMessage())); + throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to fetch states from or update states to state store: " + e.getMessage()); } catch (Throwable e) { // dispatch query has already been registered, so just fail it directly @@ -459,7 +467,7 @@ public class DispatchManager } else { // TODO maybe just queue the query if the queue size is not a problem - throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Coordinator probably too busy at the moment, please try again in a few minutes")); + throw new PrestoException(GENERIC_INTERNAL_ERROR, "Coordinator probably too busy at the moment, please try again in a few minutes"); } } } diff --git a/presto-main/src/main/java/io/prestosql/execution/QueryTracker.java b/presto-main/src/main/java/io/prestosql/execution/QueryTracker.java index fb6b79e5d..d9339e60f 100644 --- a/presto-main/src/main/java/io/prestosql/execution/QueryTracker.java +++ b/presto-main/src/main/java/io/prestosql/execution/QueryTracker.java @@ -22,6 +22,7 @@ import io.prestosql.spi.PrestoException; import io.prestosql.spi.QueryId; import io.prestosql.spi.statestore.StateCollection; import io.prestosql.spi.statestore.StateMap; +import io.prestosql.spi.statestore.StateStore; import io.prestosql.statestore.StateStoreConstants; import io.prestosql.statestore.StateStoreProvider; import org.joda.time.DateTime; @@ -254,7 +255,11 @@ public class QueryTracker private void removeQueryInStateStore(QueryId queryId) { - StateCollection stateCollection = stateStoreProvider.getStateStore().getStateCollection(StateStoreConstants.QUERY_STATE_COLLECTION_NAME); + StateStore stateStore = stateStoreProvider.getStateStore(); + if (stateStore == null) { + return; + } + StateCollection stateCollection = stateStore.getStateCollection(StateStoreConstants.FINISHED_QUERY_STATE_COLLECTION_NAME); if (stateCollection != null && stateCollection.getType().equals(StateCollection.Type.MAP)) { ((StateMap) stateCollection).remove(queryId.getId()); } diff --git a/presto-main/src/main/java/io/prestosql/statestore/LocalStateStoreProvider.java b/presto-main/src/main/java/io/prestosql/statestore/LocalStateStoreProvider.java index 76114fed4..588e6f440 100644 --- a/presto-main/src/main/java/io/prestosql/statestore/LocalStateStoreProvider.java +++ b/presto-main/src/main/java/io/prestosql/statestore/LocalStateStoreProvider.java @@ -120,6 +120,7 @@ public class LocalStateStoreProvider // Create essential state collections stateStore.createStateCollection(StateStoreConstants.DISCOVERY_SERVICE_COLLECTION_NAME, StateCollection.Type.MAP); stateStore.createStateCollection(StateStoreConstants.QUERY_STATE_COLLECTION_NAME, StateCollection.Type.MAP); + stateStore.createStateCollection(StateStoreConstants.FINISHED_QUERY_STATE_COLLECTION_NAME, StateCollection.Type.MAP); stateStore.createStateCollection(StateStoreConstants.OOM_QUERY_STATE_COLLECTION_NAME, StateCollection.Type.MAP); stateStore.createStateCollection(StateStoreConstants.CPU_USAGE_STATE_COLLECTION_NAME, StateCollection.Type.MAP); stateStore.createStateCollection(StateStoreConstants.TRANSACTION_STATE_COLLECTION_NAME, StateCollection.Type.MAP); diff --git a/presto-main/src/main/java/io/prestosql/statestore/StateFetcher.java b/presto-main/src/main/java/io/prestosql/statestore/StateFetcher.java index 5dca3535f..c75444295 100644 --- a/presto-main/src/main/java/io/prestosql/statestore/StateFetcher.java +++ b/presto-main/src/main/java/io/prestosql/statestore/StateFetcher.java @@ -24,6 +24,7 @@ import io.prestosql.server.BasicQueryInfo; import io.prestosql.spi.ErrorType; import io.prestosql.spi.statestore.StateCollection; import io.prestosql.spi.statestore.StateMap; +import io.prestosql.spi.statestore.StateStore; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -43,6 +44,12 @@ import java.util.concurrent.locks.Lock; import static com.google.common.base.Preconditions.checkState; import static io.airlift.concurrent.Threads.threadsNamed; import static io.prestosql.spi.StandardErrorCode.SERVER_SHUTTING_DOWN; +import static io.prestosql.statestore.StateStoreConstants.CPU_USAGE_STATE_COLLECTION_NAME; +import static io.prestosql.statestore.StateStoreConstants.DEFAULT_ACQUIRED_LOCK_TIME_MS; +import static io.prestosql.statestore.StateStoreConstants.FINISHED_QUERY_STATE_COLLECTION_NAME; +import static io.prestosql.statestore.StateStoreConstants.HANDLE_EXPIRED_QUERY_LOCK_NAME; +import static io.prestosql.statestore.StateStoreConstants.OOM_QUERY_STATE_COLLECTION_NAME; +import static io.prestosql.statestore.StateStoreConstants.QUERY_STATE_COLLECTION_NAME; import static io.prestosql.utils.StateUtils.removeState; /** @@ -147,7 +154,7 @@ public class StateFetcher if (stateCollection == null) { continue; } - if (stateCollectionName.equals(StateStoreConstants.CPU_USAGE_STATE_COLLECTION_NAME)) { + if (stateCollectionName.equals(CPU_USAGE_STATE_COLLECTION_NAME)) { StateCacheStore.get().setCachedStates(stateCollectionName, ((StateMap) stateCollection).getAll()); continue; } @@ -169,6 +176,46 @@ public class StateFetcher LOG.warn("Unsupported state collection type: %s", stateCollection.getType()); } } + long end = System.currentTimeMillis(); + LOG.debug("fetchStates ends at current time milliseconds: %s, at format HH:mm:ss:SSS:%s, total time use: %s", + end, + new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(end)), + end - start); + } + } + + /** + * Fetch state from state store to cache store + * + * @throws IOException exception when failed to deserialize states + */ + public void fetchQueryStates(StateStore stateStore) + throws IOException + { + synchronized (this) { + long start = System.currentTimeMillis(); + LOG.debug("fetchStates starts at current time milliseconds: %s, at format HH:mm:ss:SSS:%s", + start, + new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(start))); + + DateTime currentTime = new DateTime(DateTimeZone.UTC); + StateCollection cpuUsageCollection = stateStore.getStateCollection(CPU_USAGE_STATE_COLLECTION_NAME); + StateCollection queryStateCollection = stateStore.getStateCollection(QUERY_STATE_COLLECTION_NAME); + + StateCacheStore.get().setCachedStates(CPU_USAGE_STATE_COLLECTION_NAME, ((StateMap) cpuUsageCollection).getAll()); + + Map states = ((StateMap) queryStateCollection).getAll(); + + ImmutableMap.Builder queryStatesBuilder = ImmutableMap.builder(); + for (Map.Entry entry : states.entrySet()) { + SharedQueryState state = MAPPER.readerFor(SharedQueryState.class).readValue(entry.getValue()); + if (isStateExpired(state, currentTime)) { + handleExpiredQueryState(state); + } + queryStatesBuilder.put(entry.getKey(), state); + } + StateCacheStore.get().setCachedStates(QUERY_STATE_COLLECTION_NAME, queryStatesBuilder.build()); + long end = System.currentTimeMillis(); LOG.debug("updateStates ends at current time milliseconds: %s, at format HH:mm:ss:SSS:%s, total time use: %s", end, @@ -197,15 +244,16 @@ public class StateFetcher private void handleExpiredQueryState(SharedQueryState state) { // State store hasn't been loaded yet - if (stateStoreProvider.getStateStore() == null) { + final StateStore stateStore = stateStoreProvider.getStateStore(); + if (stateStore == null) { return; } Lock lock = null; boolean locked = false; try { - lock = stateStoreProvider.getStateStore().getLock(StateStoreConstants.HANDLE_EXPIRED_QUERY_LOCK_NAME); - locked = lock.tryLock(StateStoreConstants.DEFAULT_ACQUIRED_LOCK_TIME_MS, TimeUnit.MILLISECONDS); + lock = stateStore.getLock(HANDLE_EXPIRED_QUERY_LOCK_NAME); + locked = lock.tryLock(DEFAULT_ACQUIRED_LOCK_TIME_MS, TimeUnit.MILLISECONDS); if (locked) { LOG.debug(String.format("EXPIRED!!! REMOVING... Id: %s, state: %s, uri: %s, query: %s", state.getBasicQueryInfo().getQueryId().getId(), @@ -214,11 +262,11 @@ public class StateFetcher state.getBasicQueryInfo().getQuery())); // remove expired query from oom - StateCollection stateCollection = stateStoreProvider.getStateStore().getStateCollection(StateStoreConstants.OOM_QUERY_STATE_COLLECTION_NAME); + StateCollection stateCollection = stateStore.getStateCollection(OOM_QUERY_STATE_COLLECTION_NAME); removeState(stateCollection, Optional.of(state.getBasicQueryInfo().getQueryId()), LOG); // update query to failed in stateCollection if exists - stateCollection = stateStoreProvider.getStateStore().getStateCollection(StateStoreConstants.QUERY_STATE_COLLECTION_NAME); + stateCollection = stateStore.getStateCollection(FINISHED_QUERY_STATE_COLLECTION_NAME); if (stateCollection != null && stateCollection.getType().equals(StateCollection.Type.MAP)) { Map queryStateMap = ((StateMap) stateCollection).getAll(); if (queryStateMap.get(state.getBasicQueryInfo().getQueryId().getId()) != null) { diff --git a/presto-main/src/main/java/io/prestosql/statestore/StateStoreConstants.java b/presto-main/src/main/java/io/prestosql/statestore/StateStoreConstants.java index e5aedc777..b420e57bb 100644 --- a/presto-main/src/main/java/io/prestosql/statestore/StateStoreConstants.java +++ b/presto-main/src/main/java/io/prestosql/statestore/StateStoreConstants.java @@ -50,6 +50,11 @@ public class StateStoreConstants */ public static final String QUERY_STATE_COLLECTION_NAME = "query"; + /** + * Finished query state collection name + */ + public static final String FINISHED_QUERY_STATE_COLLECTION_NAME = "finished-query"; + /** * OOM Query state collection name */ diff --git a/presto-main/src/main/java/io/prestosql/statestore/StateUpdater.java b/presto-main/src/main/java/io/prestosql/statestore/StateUpdater.java index ce6eb5ac4..8a1991ad0 100644 --- a/presto-main/src/main/java/io/prestosql/statestore/StateUpdater.java +++ b/presto-main/src/main/java/io/prestosql/statestore/StateUpdater.java @@ -18,22 +18,22 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import io.airlift.json.ObjectMapperProvider; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.prestosql.dispatcher.DispatchQuery; import io.prestosql.execution.ManagedQueryExecution; import io.prestosql.execution.QueryState; -import io.prestosql.spi.QueryId; import io.prestosql.spi.statestore.StateCollection; import io.prestosql.spi.statestore.StateMap; +import io.prestosql.spi.statestore.StateStore; import java.text.SimpleDateFormat; import java.util.Date; -import java.util.HashSet; -import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -43,6 +43,9 @@ import static com.google.common.base.Preconditions.checkState; import static io.airlift.concurrent.Threads.threadsNamed; import static io.prestosql.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY; import static io.prestosql.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT; +import static io.prestosql.statestore.StateStoreConstants.FINISHED_QUERY_STATE_COLLECTION_NAME; +import static io.prestosql.statestore.StateStoreConstants.OOM_QUERY_STATE_COLLECTION_NAME; +import static io.prestosql.statestore.StateStoreConstants.QUERY_STATE_COLLECTION_NAME; import static io.prestosql.utils.StateUtils.removeState; /** @@ -56,7 +59,7 @@ public class StateUpdater private final StateStoreProvider stateStoreProvider; private final Duration updateInterval; - private final Multimap states = ArrayListMultimap.create(); + private final Multimap registeredQueries = Multimaps.synchronizedMultimap(ArrayListMultimap.create()); private final ScheduledExecutorService stateUpdateExecutor; private ScheduledFuture backgroundTask; @@ -94,7 +97,7 @@ public class StateUpdater synchronized (this) { if (backgroundTask != null) { backgroundTask.cancel(true); - states.clear(); + registeredQueries.clear(); } } } @@ -108,14 +111,14 @@ public class StateUpdater */ public void registerQuery(String stateCollectionName, DispatchQuery query) { - synchronized (states) { - states.put(stateCollectionName, query); - query.addStateChangeListener(state -> { - if (state.isDone()) { - queryFinished(query); - } - }); + synchronized (registeredQueries) { + registeredQueries.put(stateCollectionName, query); } + query.addStateChangeListener(state -> { + if (state.isDone()) { + queryFinished(query); + } + }); } /** @@ -126,9 +129,7 @@ public class StateUpdater */ public void unregisterQuery(String stateCollectionName, ManagedQueryExecution query) { - synchronized (states) { - states.remove(stateCollectionName, query); - } + registeredQueries.remove(stateCollectionName, query); } /** @@ -139,54 +140,55 @@ public class StateUpdater public void updateStates() throws JsonProcessingException { - synchronized (states) { - // State store hasn't been loaded yet - if (stateStoreProvider.getStateStore() == null) { - return; - } + // State store hasn't been loaded yet + final StateStore stateStore = stateStoreProvider.getStateStore(); + if (stateStore == null) { + return; + } - long start = System.currentTimeMillis(); - LOG.debug("UpdateStates starts at current time milliseconds: %s, at format HH:mm:ss:SSS:%s", - start, - new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(start))); - - for (String stateCollectionName : states.keySet()) { - StateCollection stateCollection = stateStoreProvider.getStateStore().getStateCollection(stateCollectionName); - Set finishedQueries = new HashSet<>(); - for (DispatchQuery query : states.get(stateCollectionName)) { - SharedQueryState state = SharedQueryState.create(query); - String stateJson = MAPPER.writeValueAsString(state); - - switch (stateCollection.getType()) { - case MAP: - ((StateMap) stateCollection).put(state.getBasicQueryInfo().getQueryId().getId(), stateJson); - break; - default: - LOG.error("Unsupported state collection type: %s", stateCollection.getType()); - } - - if (state.getBasicQueryInfo().getState() == QueryState.FINISHED || state.getBasicQueryInfo().getState() == QueryState.FAILED) { - finishedQueries.add(state.getBasicQueryInfo().getQueryId()); - } + long start = System.currentTimeMillis(); + LOG.debug("UpdateStates starts at current time milliseconds: %s, at format HH:mm:ss:SSS:%s", + start, + new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(start))); + + StateCollection finishedQueries = stateStore.getStateCollection(FINISHED_QUERY_STATE_COLLECTION_NAME); + StateCollection queries = stateStore.getStateCollection(QUERY_STATE_COLLECTION_NAME); + + List queriesToUnregister = new LinkedList<>(); + synchronized (registeredQueries) { + for (DispatchQuery query : registeredQueries.get(QUERY_STATE_COLLECTION_NAME)) { + SharedQueryState state = SharedQueryState.create(query); + String stateJson = MAPPER.writeValueAsString(state); + + if (state.getBasicQueryInfo().getState() == QueryState.FINISHED || state.getBasicQueryInfo().getState() == QueryState.FAILED) { + // No need to update states for finished queries + // also move finished queries to finished-query state collection + queriesToUnregister.add(query); + ((StateMap) finishedQueries).put(state.getBasicQueryInfo().getQueryId().getId(), stateJson); + continue; } - // No need to update states for finished queries - unregisterFinishedQueries(stateCollectionName, finishedQueries); + ((StateMap) queries).put(state.getBasicQueryInfo().getQueryId().getId(), stateJson); } + } - long end = System.currentTimeMillis(); - LOG.debug("updateStates ends at current time milliseconds: %s, at format HH:mm:ss:SSS:%s, total time use: %s", - end, - new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(end)), - end - start); + for (DispatchQuery query : queriesToUnregister) { + removeFromStateCollection(stateStore, QUERY_STATE_COLLECTION_NAME, query); } + + long end = System.currentTimeMillis(); + LOG.debug("updateStates ends at current time milliseconds: %s, at format HH:mm:ss:SSS:%s, total time use: %s", + end, + new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(end)), + end - start); } private void queryFinished(ManagedQueryExecution query) { + StateStore stateStore = stateStoreProvider.getStateStore(); // If query killed by OOM remove the query from OOM query state store - if (isQueryKilledByOOMKiller(query)) { - removeFromStateStore(StateStoreConstants.OOM_QUERY_STATE_COLLECTION_NAME, query); + if (stateStore != null && isQueryKilledByOOMKiller(query)) { + removeFromStateCollection(stateStore, OOM_QUERY_STATE_COLLECTION_NAME, query); } } @@ -196,42 +198,14 @@ public class StateUpdater return false; } - if (query.getErrorCode().get().equals(CLUSTER_OUT_OF_MEMORY.toErrorCode()) || - query.getErrorCode().get().equals(EXCEEDED_GLOBAL_MEMORY_LIMIT.toErrorCode())) { - return true; - } - - return false; + return query.getErrorCode().get().equals(CLUSTER_OUT_OF_MEMORY.toErrorCode()) || + query.getErrorCode().get().equals(EXCEEDED_GLOBAL_MEMORY_LIMIT.toErrorCode()); } - private void removeFromStateStore(String stateCollectionName, ManagedQueryExecution query) + private void removeFromStateCollection(StateStore stateStore, String stateCollectionName, ManagedQueryExecution query) { - // State store hasn't been loaded yet - if (stateStoreProvider.getStateStore() == null) { - return; - } - - synchronized (states) { - StateCollection stateCollection = stateStoreProvider.getStateStore().getStateCollection(stateCollectionName); - removeState(stateCollection, Optional.of(query.getBasicQueryInfo().getQueryId()), LOG); - states.remove(stateCollectionName, query); - } - } - - /** - * For finished or failed queries, no need to keep updating their states to state store - * so unregister them from state updater - * - * @param stateCollectionName state collection name - * @param queryIds Queries to be unregistered - */ - private void unregisterFinishedQueries(String stateCollectionName, Set queryIds) - { - Iterator iterator = states.get(stateCollectionName).iterator(); - while (iterator.hasNext()) { - if (queryIds.contains(iterator.next().getQueryId())) { - iterator.remove(); - } - } + StateCollection stateCollection = stateStore.getStateCollection(stateCollectionName); + removeState(stateCollection, Optional.of(query.getBasicQueryInfo().getQueryId()), LOG); + unregisterQuery(stateCollectionName, query); } } diff --git a/presto-main/src/main/java/io/prestosql/utils/DistributedResourceGroupUtils.java b/presto-main/src/main/java/io/prestosql/utils/DistributedResourceGroupUtils.java index 515d509b5..05bae0a2f 100644 --- a/presto-main/src/main/java/io/prestosql/utils/DistributedResourceGroupUtils.java +++ b/presto-main/src/main/java/io/prestosql/utils/DistributedResourceGroupUtils.java @@ -86,7 +86,7 @@ public class DistributedResourceGroupUtils */ public static void accumulateCpuUsage(StateStore stateStore) { - StateCollection queryStateCollection = stateStore.getStateCollection(StateStoreConstants.QUERY_STATE_COLLECTION_NAME); + StateCollection queryStateCollection = stateStore.getStateCollection(StateStoreConstants.FINISHED_QUERY_STATE_COLLECTION_NAME); if (queryStateCollection == null) { return; } -- Gitee