diff --git a/presto-main/src/main/java/io/prestosql/exchange/ExchangeManager.java b/presto-main/src/main/java/io/prestosql/exchange/ExchangeManager.java index 0e1fd2cdcad9c2c43492a04e28b9e019f07e4462..69db1a41fef460e5863d6edb9addef49d1dbd964 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/ExchangeManager.java +++ b/presto-main/src/main/java/io/prestosql/exchange/ExchangeManager.java @@ -40,4 +40,6 @@ public interface ExchangeManager ExchangeSink createSink(ExchangeSinkInstanceHandle handle, DirectSerialisationType serType, boolean preserveRecordsOrder); ExchangeSource createSource(List handles); + + double getCompressionFactor(); } diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchange.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchange.java index 109f622da6e851e14cd6558b8f3a2548f6723201..93c105539048e50194b08c8f17863a392334ce92 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchange.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchange.java @@ -309,4 +309,9 @@ public class FileSystemExchange "." + exchangeContext.getExchangeId() + "." + taskPartitionId + PATH_SEPARATOR); } + + public boolean isExchangeCompressionEnabled() + { + return exchangeCompressionEnabled; + } } diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeConfig.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeConfig.java index d5e71b36a3c22dc1beb1d18be33e0bd12e63b6bd..77550ed2ea73fce3d4838d01d6a535ddc3e27f28 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeConfig.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeConfig.java @@ -48,6 +48,7 @@ public class FileSystemExchangeConfig private String exchangeFilesystemType = "local"; DirectSerialisationType directSerialisationType = DirectSerialisationType.JAVA; private DataSize directSerialisationBufferSize = new DataSize(16, KILOBYTE); + private double compressionFactor = 0.55; public enum DirectSerialisationType { @@ -104,6 +105,18 @@ public class FileSystemExchangeConfig return this; } + @Config("exchange.compression-factor") + public FileSystemExchangeConfig setCompressionFactor(double compressionFactor) + { + this.compressionFactor = compressionFactor; + return this; + } + + public double getCompressionFactor() + { + return compressionFactor; + } + public DataSize getMaxPageStorageSize() { return maxPageStorageSize; diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeManager.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeManager.java index c7fa8b459dd08137bf08abbeb87d57712110e0be..d655fcefa2943486bd17c3b800d3653bd8ac7ac6 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeManager.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeManager.java @@ -62,6 +62,7 @@ public class FileSystemExchangeManager private final DirectSerialisationType directSerialisationType; private final int directSerialisationBufferSize; + private final double compressionFactor; FileSystemExchangeConfig exchangeConfig; @Inject @@ -88,6 +89,7 @@ public class FileSystemExchangeManager this.exchangeConfig = config; //TODO(Kishore): Need to enable compression and encryption for direct serde later directSerialisationType = (exchangeCompressionEnabled || exchangeEncryptionEnabled) ? DirectSerialisationType.OFF : config.getDirectSerializationType(); + this.compressionFactor = config.getCompressionFactor(); } @Override @@ -187,4 +189,10 @@ public class FileSystemExchangeManager directSerialisationType, directSerialisationBufferSize); } + + @Override + public double getCompressionFactor() + { + return compressionFactor; + } } diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSink.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSink.java index d985a4d4624f6b9082800177a5660e1c7c09284f..af32ccc131ec1a84a2c75964939a2c8e176d3e98 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSink.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSink.java @@ -380,6 +380,7 @@ public class FileSystemExchangeSink if (currentFileSize + requiredPageStorageSize > maxFileSizeInBytes && !preserveRecordsOrder) { stats.getFileSizeInBytes().add(currentFileSize); flushIfNeeded(true); + currentWriter.finish(); addExchangeStorageWriter(); currentFileSize = 0; currentBuffer = null; diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSource.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSource.java index 251130833ffd4ca12358229610fa584ba12d303c..6ca49c05ecc76c77127e3d430d4d61bb23d6f40d 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSource.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSource.java @@ -129,6 +129,9 @@ public class FileSystemExchangeSource catch (IOException e) { throw new UncheckedIOException(e); } + finally { + reader.close(); + } } } diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/StageTaskSourceFactory.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/StageTaskSourceFactory.java index 506d7ac39ee5efaca9cb278ba1a3e2d8977914ca..88f040239a2437ce34b7e5cba12224ea4ec8e664 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/StageTaskSourceFactory.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/StageTaskSourceFactory.java @@ -35,9 +35,11 @@ import io.airlift.log.Logger; import io.airlift.units.DataSize; import io.prestosql.Session; import io.prestosql.exchange.Exchange; +import io.prestosql.exchange.ExchangeManagerRegistry; import io.prestosql.exchange.ExchangeSourceHandle; import io.prestosql.exchange.ExchangeSourceSplitter; import io.prestosql.exchange.ExchangeSourceStatistics; +import io.prestosql.exchange.FileSystemExchange; import io.prestosql.execution.ForQueryExecution; import io.prestosql.execution.Lifespan; import io.prestosql.execution.QueryManagerConfig; @@ -69,6 +71,7 @@ import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; @@ -112,36 +115,43 @@ public class StageTaskSourceFactory private final TableExecuteContextManager tableExecuteContextManager; private final int splitBatchSize; private final Executor executor; + private final ExchangeManagerRegistry exchangeManagerRegistry; @Inject public StageTaskSourceFactory( SplitSourceFactory splitSourceFactory, TableExecuteContextManager tableExecuteContextManager, QueryManagerConfig queryManagerConfig, - @ForQueryExecution ExecutorService executor) + @ForQueryExecution ExecutorService executor, + ExchangeManagerRegistry exchangeManagerRegistry) { this( splitSourceFactory, tableExecuteContextManager, requireNonNull(queryManagerConfig, "queryManagerConfig is null").getScheduleSplitBatchSize(), - executor); + executor, + requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null")); } public StageTaskSourceFactory( SplitSourceFactory splitSourceFactory, TableExecuteContextManager tableExecuteContextManager, int splitBatchSize, - ExecutorService executor) + ExecutorService executor, + ExchangeManagerRegistry exchangeManagerRegistry) { this.splitSourceFactory = requireNonNull(splitSourceFactory, "splitSourceFactory is null"); this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null"); this.splitBatchSize = splitBatchSize; this.executor = requireNonNull(executor, "executor is null"); + this.exchangeManagerRegistry = exchangeManagerRegistry; } @Override public TaskSource create(Session session, PlanFragment fragment, Map sourceExchanges, Multimap exchangeSourceHandles, LongConsumer getSplitTimeRecorder, Optional bucketToPartitionMap, Optional bucketNodeMap) { + double compressionFactor = exchangeManagerRegistry.getExchangeManager().getCompressionFactor(); + PartitioningHandle partitioning = fragment.getPartitioning(); if (partitioning.equals(SINGLE_DISTRIBUTION)) { @@ -153,7 +163,8 @@ public class StageTaskSourceFactory fragment, sourceExchanges, exchangeSourceHandles, - getFaultTolerantExecutionTargetTaskInputSize(session)); + getFaultTolerantExecutionTargetTaskInputSize(session), + compressionFactor); } if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getConnectorId().isPresent()) { return HashDistributionTaskSource.create( @@ -169,7 +180,8 @@ public class StageTaskSourceFactory getFaultTolerantExecutionTargetTaskSplitCount(session) * SplitWeight.standard().getRawValue(), getFaultTolerantExecutionTargetTaskInputSize(session), getFaultTolerantPreserveInputPartitionsInWriteStage(session), - executor); + executor, + compressionFactor); } if (partitioning.equals(SOURCE_DISTRIBUTION)) { return SourceDistributionTaskSource.create( @@ -254,7 +266,8 @@ public class StageTaskSourceFactory PlanFragment fragment, Map sourceExchanges, Multimap exchangeSourceHandles, - DataSize targetPartitionSize) + DataSize targetPartitionSize, + double compressionFactor) { checkArgument(fragment.getPartitionedSources().isEmpty(), "no partitioned sources (table scans) expected, got: %s", fragment.getPartitionedSources()); IdentityHashMap exchangeForHandleMap = getExchangeForHandleMap(sourceExchanges, exchangeSourceHandles); @@ -264,7 +277,8 @@ public class StageTaskSourceFactory getPartitionedExchangeSourceHandles(fragment, exchangeSourceHandles), getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles), targetPartitionSize, - getFaultTolerantExecutionDefaultTaskMemory(session)); + getFaultTolerantExecutionDefaultTaskMemory(session), + compressionFactor); } @VisibleForTesting @@ -273,7 +287,8 @@ public class StageTaskSourceFactory Multimap partitionedExchangeSourceHandles, Multimap replicatedExchangeSourceHandles, DataSize targetPartitionSize, - DataSize taskMemory) + DataSize taskMemory, + double compressionFactor) { this.sourceExchanges = new IdentityHashMap<>(requireNonNull(sourceExchanges, "sourceExchanges is null")); this.partitionedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(partitionedExchangeSourceHandles, "partitionedExchangeSourceHandles is null")); @@ -289,7 +304,18 @@ public class StageTaskSourceFactory "Unexpected entries in replicatedExchangeSourceHandles map: %s; allowed keys: %s", replicatedExchangeSourceHandles.values(), sourceExchanges.keySet()); - this.targetPartitionSizeInBytes = requireNonNull(targetPartitionSize, "targetPartitionSize is null").toBytes(); + requireNonNull(targetPartitionSize, "targetPartitionSize is null"); + boolean isExchangeCompressionEnabled = false; + if (!sourceExchanges.values().isEmpty()) { + isExchangeCompressionEnabled = this.sourceExchanges.values().stream().findAny().filter(Objects::nonNull).map(exchange -> (FileSystemExchange) exchange).get().isExchangeCompressionEnabled(); + } + if (isExchangeCompressionEnabled) { + this.targetPartitionSizeInBytes = (long) (targetPartitionSize.toBytes() - (targetPartitionSize.toBytes() * compressionFactor)); + log.info("Spooling File size reduced by %s", compressionFactor); + } + else { + this.targetPartitionSizeInBytes = targetPartitionSize.toBytes(); + } } @Override @@ -398,7 +424,8 @@ public class StageTaskSourceFactory long targetPartitionSplitWeight, DataSize targetPartitionSourceSize, boolean preserveInputPartitionsInWriteStage, - Executor executor) + Executor executor, + double compressionFactor) { checkArgument(bucketNodeMap.isPresent() || fragment.getPartitionedSources().isEmpty(), "bucketNodeMap is expected to be set when the fragment reads partitioned sources (tables)"); Map hashDistributionSplitSources = splitSourceFactory.createSplitSources(session, fragment); @@ -416,7 +443,8 @@ public class StageTaskSourceFactory targetPartitionSplitWeight, (preserveInputPartitionsInWriteStage && isWriteFragment(fragment)) ? new DataSize(0, BYTE) : targetPartitionSourceSize, getFaultTolerantExecutionDefaultTaskMemory(session), - executor); + executor, + compressionFactor); } private static boolean isWriteFragment(PlanFragment fragment) @@ -458,7 +486,8 @@ public class StageTaskSourceFactory long targetPartitionSplitWeight, DataSize targetPartitionSourceSize, DataSize taskMemory, - Executor executor) + Executor executor, + double compressionFactor) { this.splitSources = ImmutableMap.copyOf(requireNonNull(splitSources, "splitSources is null")); this.exchangeForHandle = new IdentityHashMap<>(); @@ -472,7 +501,20 @@ public class StageTaskSourceFactory this.taskMemory = requireNonNull(taskMemory, "taskMemory is null"); checkArgument(bucketNodeMap.isPresent() || splitSources.isEmpty(), "bucketNodeMap is expected to be set when the fragment reads partitioned sources (tables)"); this.catalogRequirement = requireNonNull(catalogRequirement, "catalogRequirement is null"); - this.targetPartitionSourceSizeInBytes = requireNonNull(targetPartitionSourceSize, "targetPartitionSourceSize is null").toBytes(); + requireNonNull(targetPartitionSourceSize, "targetPartitionSourceSize is null"); + boolean isExchangeCompressionEnabled = false; + if (!exchangeForHandle.values().isEmpty()) { + isExchangeCompressionEnabled = this.exchangeForHandle.values().stream().findAny().filter(Objects::nonNull).map(exchange -> (FileSystemExchange) exchange).get().isExchangeCompressionEnabled(); + } + + if (isExchangeCompressionEnabled) { + this.targetPartitionSourceSizeInBytes = (long) (targetPartitionSourceSize.toBytes() - (targetPartitionSourceSize.toBytes() * compressionFactor)); + log.info("Spooling File size reduced by %s", compressionFactor); + } + else { + this.targetPartitionSourceSizeInBytes = targetPartitionSourceSize.toBytes(); + } + this.targetPartitionSplitWeight = targetPartitionSplitWeight; this.executor = requireNonNull(executor, "executor is null"); } diff --git a/presto-main/src/test/java/io/prestosql/exchange/TestExchangeManager.java b/presto-main/src/test/java/io/prestosql/exchange/TestExchangeManager.java index c17262d832272916ae20d7473a6e9f3550878ef2..752d8781ae2e0838ffcc26f4c64ab619d2ed16ed 100644 --- a/presto-main/src/test/java/io/prestosql/exchange/TestExchangeManager.java +++ b/presto-main/src/test/java/io/prestosql/exchange/TestExchangeManager.java @@ -93,6 +93,12 @@ public abstract class TestExchangeManager { return null; } + + @Override + public double getCompressionFactor() + { + return 0.55; + } }; } }