diff --git a/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java b/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java index 5046517cbe7612ba41f86479c1fbd783530c4dec..417b3cc6af1981bba05702e0baf1b83dfd2dddd8 100644 --- a/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java @@ -493,7 +493,12 @@ public class ScanFilterAndProjectOperator page = recordMaterializedBytes(page, sizeInBytes -> processedBytes += sizeInBytes); // update operator stats - processedPositions += page.getPositionCount(); + if (pageSource.getCompletedPositionCount().isPresent()) { + processedPositions = pageSource.getCompletedPositionCount().getAsLong(); + } + else { + processedPositions += page.getPositionCount(); + } physicalBytes = pageSource.getCompletedBytes(); readTimeNanos = pageSource.getReadTimeNanos(); diff --git a/presto-main/src/main/java/io/prestosql/operator/TableScanOperator.java b/presto-main/src/main/java/io/prestosql/operator/TableScanOperator.java index ccf28b635f2514504ee7a3d2d2a5358447c33bd8..6dcd2654b17f01e3a41928206b7f3ddb1369a409 100644 --- a/presto-main/src/main/java/io/prestosql/operator/TableScanOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/TableScanOperator.java @@ -267,6 +267,8 @@ public class TableScanOperator private boolean finished; + // completedPositionCount is used only if connectorPageSource.getCompletedPositionCount is present. + private long completedPositionCount; private long completedBytes; private long readTimeNanos; Optional tableScanNodeOptional; @@ -707,17 +709,34 @@ public class TableScanOperator } Page page = source.getNextPage(); - if (page != null) { - // assure the page is in memory before handing to another operator - page = page.getLoadedPage(); - // update operator stats + // if pageSource.getCompletedPositionCount is present, get operator statistics from pageSource + if (source.getCompletedPositionCount().isPresent()) { + long endCompletedPositionCount = source.getCompletedPositionCount().getAsLong(); long endCompletedBytes = source.getCompletedBytes(); long endReadTimeNanos = source.getReadTimeNanos(); - operatorContext.recordPhysicalInputWithTiming(endCompletedBytes - completedBytes, page.getPositionCount(), endReadTimeNanos - readTimeNanos); - operatorContext.recordProcessedInput(page.getSizeInBytes(), page.getPositionCount()); + long currentPositionCount = endCompletedPositionCount - completedPositionCount; + long currentCompletedBytes = endCompletedBytes - completedBytes; + operatorContext.recordPhysicalInputWithTiming(currentCompletedBytes, currentPositionCount, endReadTimeNanos - readTimeNanos); + operatorContext.recordProcessedInput(currentCompletedBytes, currentPositionCount); + completedPositionCount = endCompletedPositionCount; completedBytes = endCompletedBytes; readTimeNanos = endReadTimeNanos; + } + + if (page != null) { + // assure the page is in memory before handing to another operator + page = page.getLoadedPage(); + + // update operator stats + if (!source.getCompletedPositionCount().isPresent()) { + long endCompletedBytes = source.getCompletedBytes(); + long endReadTimeNanos = source.getReadTimeNanos(); + operatorContext.recordPhysicalInputWithTiming(endCompletedBytes - completedBytes, page.getPositionCount(), endReadTimeNanos - readTimeNanos); + operatorContext.recordProcessedInput(page.getSizeInBytes(), page.getPositionCount()); + completedBytes = endCompletedBytes; + readTimeNanos = endReadTimeNanos; + } // pull bloomFilter from stateStore and filter page if (existsCrossFilter) { diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorPageSource.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorPageSource.java index f0db117723787c9504f8cc5f6cd7d2d489f6a1f4..0049cc63fd2a6eb1b7201879a08071d00639a190 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorPageSource.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorPageSource.java @@ -17,6 +17,7 @@ import io.prestosql.spi.Page; import java.io.Closeable; import java.io.IOException; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; public interface ConnectorPageSource @@ -75,4 +76,14 @@ public interface ConnectorPageSource { return false; } + + /** + * Some components can push down some operators to other component for calculation. + * This API is used to return the number of position count before push down for statistics. + * If this API is not used, just return empty. + */ + default OptionalLong getCompletedPositionCount() + { + return OptionalLong.empty(); + } }