diff options
10 files changed, 36 insertions, 55 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java index 37cf2922a6c..6517c3e57fc 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java @@ -77,7 +77,7 @@ public interface ModelContext { @ModelFeatureFlag(owners = {"tokle"}) default boolean useAccessControlTlsHandshakeClientAuth() { return false; } @ModelFeatureFlag(owners = {"baldersheim"}) default boolean useAsyncMessageHandlingOnSchedule() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"baldersheim"}, removeAfter = "7.348") default int contentNodeBucketDBStripeBits() { return 4; } - @ModelFeatureFlag(owners = {"baldersheim"}) default int mergeChunkSize() { throw new UnsupportedOperationException("TODO specify default value"); } + @ModelFeatureFlag(owners = {"baldersheim"}, removeAfter = "7.350") default int mergeChunkSize() { return 0x2000000; } @ModelFeatureFlag(owners = {"baldersheim"}) default double feedConcurrency() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"baldersheim"}) default boolean useBucketExecutorForLidSpaceCompact() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"musum", "mpolden"}, comment = "Revisit in February 2021") default boolean reconfigurableZookeeperServer() { return false; } diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java index 686cb7ce7c6..6ca1c8f2b79 100644 --- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java +++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java @@ -47,7 +47,6 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea private Quota quota = Quota.unlimited(); private boolean useAccessControlTlsHandshakeClientAuth; private boolean useAsyncMessageHandlingOnSchedule = false; - private int mergeChunkSize = 0x400000 - 0x1000; // 4M -4k private double feedConcurrency = 0.5; private boolean enableAutomaticReindexing = false; private boolean reconfigurableZookeeperServer = false; @@ -82,7 +81,6 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea @Override public Quota quota() { return quota; } @Override public boolean useAccessControlTlsHandshakeClientAuth() { return useAccessControlTlsHandshakeClientAuth; } @Override public boolean useAsyncMessageHandlingOnSchedule() { return useAsyncMessageHandlingOnSchedule; } - @Override public int mergeChunkSize() { return mergeChunkSize; } @Override public double feedConcurrency() { return feedConcurrency; } @Override public boolean enableAutomaticReindexing() { return enableAutomaticReindexing; } @Override public boolean reconfigurableZookeeperServer() { return reconfigurableZookeeperServer; } @@ -94,11 +92,6 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea return this; } - public TestProperties setMergeChunkSize(int size) { - mergeChunkSize = size; - return this; - } - public TestProperties setAsyncMessageHandlingOnSchedule(boolean value) { useAsyncMessageHandlingOnSchedule = value; return this; diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java index ab5dab4fbb9..57292b32a35 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java @@ -47,7 +47,6 @@ public class FileStorProducer implements StorFilestorConfig.Producer { private final int reponseNumThreads; private final StorFilestorConfig.Response_sequencer_type.Enum responseSequencerType; private final boolean useAsyncMessageHandlingOnSchedule; - private final int mergeChunkSize; private static StorFilestorConfig.Response_sequencer_type.Enum convertResponseSequencerType(String sequencerType) { try { @@ -56,17 +55,13 @@ public class FileStorProducer implements StorFilestorConfig.Producer { return StorFilestorConfig.Response_sequencer_type.Enum.ADAPTIVE; } } - private static int alignUp2MiB(int value) { - final int twoMB = 0x200000; - return ((value + twoMB - 1)/twoMB) * twoMB; - } + public FileStorProducer(ModelContext.FeatureFlags featureFlags, ContentCluster parent, Integer numThreads) { this.numThreads = numThreads; this.cluster = parent; this.reponseNumThreads = featureFlags.defaultNumResponseThreads(); this.responseSequencerType = convertResponseSequencerType(featureFlags.responseSequencerType()); useAsyncMessageHandlingOnSchedule = featureFlags.useAsyncMessageHandlingOnSchedule(); - mergeChunkSize = alignUp2MiB(featureFlags.mergeChunkSize()); // Align up to default huge page size. } @Override @@ -78,7 +73,6 @@ public class FileStorProducer implements StorFilestorConfig.Producer { builder.num_response_threads(reponseNumThreads); builder.response_sequencer_type(responseSequencerType); builder.use_async_message_handling_on_schedule(useAsyncMessageHandlingOnSchedule); - builder.bucket_merge_chunk_size(mergeChunkSize); } } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java index 9c857414717..5cf57430f91 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java @@ -287,19 +287,6 @@ public class StorageClusterTest { assertEquals(StorFilestorConfig.Response_sequencer_type.THROUGHPUT, config.response_sequencer_type()); } - private void verifyMergeChunkSize(int expected, int value) { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - simpleCluster(new TestProperties().setMergeChunkSize(value)).getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); - assertEquals(expected, config.bucket_merge_chunk_size()); - } - - @Test - public void testFeatureFlagControlOfMergeChunkSize() { - verifyMergeChunkSize(0x200000, 13); - verifyMergeChunkSize(0x1600000, 0x1500000); - } - private void verifyAsyncMessageHandlingOnSchedule(boolean expected, boolean value) { StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value)).getConfig(builder); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 4c1c0a2d738..aa9ae65394f 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -158,7 +158,6 @@ public class ModelContextImpl implements ModelContext { private final boolean skipMbusReplyThread; private final boolean useAccessControlTlsHandshakeClientAuth; private final boolean useAsyncMessageHandlingOnSchedule; - private final int mergeChunkSize; private final double feedConcurrency; private final boolean reconfigurableZookeeperServer; private final boolean enableJdiscConnectionLog; @@ -178,7 +177,6 @@ public class ModelContextImpl implements ModelContext { this.skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD); this.useAccessControlTlsHandshakeClientAuth = flagValue(source, appId, Flags.USE_ACCESS_CONTROL_CLIENT_AUTHENTICATION); this.useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE); - this.mergeChunkSize = flagValue(source, appId, Flags.MERGE_CHUNK_SIZE); this.feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY); this.reconfigurableZookeeperServer = flagValue(source, appId, Flags.RECONFIGURABLE_ZOOKEEPER_SERVER_FOR_CLUSTER_CONTROLLER); this.enableJdiscConnectionLog = flagValue(source, appId, Flags.ENABLE_JDISC_CONNECTION_LOG); @@ -198,7 +196,6 @@ public class ModelContextImpl implements ModelContext { @Override public boolean skipMbusReplyThread() { return skipMbusReplyThread; } @Override public boolean useAccessControlTlsHandshakeClientAuth() { return useAccessControlTlsHandshakeClientAuth; } @Override public boolean useAsyncMessageHandlingOnSchedule() { return useAsyncMessageHandlingOnSchedule; } - @Override public int mergeChunkSize() { return mergeChunkSize; } @Override public double feedConcurrency() { return feedConcurrency; } @Override public boolean reconfigurableZookeeperServer() { return reconfigurableZookeeperServer; } @Override public boolean enableJdiscConnectionLog() { return enableJdiscConnectionLog; } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java index 582ecd13ce5..b1edc031e0b 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java @@ -95,7 +95,6 @@ public class ModelContextImplTest { assertEquals(new Version(8), context.wantedNodeVespaVersion()); assertEquals(1.0, context.properties().featureFlags().defaultTermwiseLimit(), 0.0); assertFalse(context.properties().featureFlags().useAsyncMessageHandlingOnSchedule()); - assertEquals(0x2000000, context.properties().featureFlags().mergeChunkSize()); assertEquals(0.5, context.properties().featureFlags().feedConcurrency(), 0.0); } diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index 532f19b911f..f29860c79bb 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -60,14 +60,14 @@ public class Flags { public static final UnboundDoubleFlag DEFAULT_TERM_WISE_LIMIT = defineDoubleFlag( "default-term-wise-limit", 1.0, - List.of("baldersheim"), "2020-12-02", "2021-02-01", + List.of("baldersheim"), "2020-12-02", "2022-01-01", "Default limit for when to apply termwise query evaluation", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); public static final UnboundStringFlag FEED_SEQUENCER_TYPE = defineStringFlag( "feed-sequencer-type", "LATENCY", - List.of("baldersheim"), "2020-12-02", "2021-02-01", + List.of("baldersheim"), "2020-12-02", "2022-01-01", "Selects type of sequenced executor used for feeding, valid values are LATENCY, ADAPTIVE, THROUGHPUT", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); @@ -81,28 +81,28 @@ public class Flags { public static final UnboundIntFlag RESPONSE_NUM_THREADS = defineIntFlag( "response-num-threads", 2, - List.of("baldersheim"), "2020-12-02", "2021-02-01", + List.of("baldersheim"), "2020-12-02", "2022-01-01", "Number of threads used for mbus responses, default is 2, negative number = numcores/4", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); public static final UnboundBooleanFlag SKIP_COMMUNICATIONMANAGER_THREAD = defineFeatureFlag( "skip-communicatiomanager-thread", false, - List.of("baldersheim"), "2020-12-02", "2021-02-01", + List.of("baldersheim"), "2020-12-02", "2022-01-01", "Should we skip the communicationmanager thread", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); public static final UnboundBooleanFlag SKIP_MBUS_REQUEST_THREAD = defineFeatureFlag( "skip-mbus-request-thread", false, - List.of("baldersheim"), "2020-12-02", "2021-02-01", + List.of("baldersheim"), "2020-12-02", "2022-01-01", "Should we skip the mbus request thread", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); public static final UnboundBooleanFlag SKIP_MBUS_REPLY_THREAD = defineFeatureFlag( "skip-mbus-reply-thread", false, - List.of("baldersheim"), "2020-12-02", "2021-02-01", + List.of("baldersheim"), "2020-12-02", "2022-01-01", "Should we skip the mbus reply thread", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); @@ -233,21 +233,14 @@ public class Flags { public static final UnboundBooleanFlag USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE = defineFeatureFlag( "async-message-handling-on-schedule", false, - List.of("baldersheim"), "2020-12-02", "2021-02-01", + List.of("baldersheim"), "2020-12-02", "2022-01-01", "Optionally deliver async messages in own thread", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); - public static final UnboundIntFlag MERGE_CHUNK_SIZE = defineIntFlag( - "merge-chunk-size", 0x2000000, - List.of("baldersheim"), "2020-12-02", "2021-02-01", - "Size of baldersheim buffer in service layer", - "Takes effect at redeployment", - ZONE_ID, APPLICATION_ID); - public static final UnboundDoubleFlag FEED_CONCURRENCY = defineDoubleFlag( "feed-concurrency", 0.5, - List.of("baldersheim"), "2020-12-02", "2021-02-01", + List.of("baldersheim"), "2020-12-02", "2022-01-01", "How much concurrency should be allowed for feed", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 42f9e876eee..371b63e442c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -820,6 +820,13 @@ FileStorManager::sendUp(const std::shared_ptr<api::StorageMessage>& msg) void FileStorManager::onClose() { LOG(debug, "Start closing"); + std::unique_ptr<vespalib::IDestructorCallback> toDestruct; + { + std::lock_guard guard(_executeLock); + toDestruct = std::move(_bucketExecutorRegistration); + } + toDestruct.reset(); + _resource_usage_listener_registration.reset(); // Avoid getting config during shutdown _configFetcher.close(); LOG(debug, "Closed _configFetcher."); @@ -985,18 +992,18 @@ void FileStorManager::initialize_bucket_databases_from_provider() { class FileStorManager::TrackExecutedTasks : public vespalib::Executor::Task { public: - TrackExecutedTasks(FileStorManager & manager); + TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager); void run() override; private: FileStorManager & _manager; size_t _serialNum; }; -FileStorManager::TrackExecutedTasks::TrackExecutedTasks(FileStorManager & manager) +FileStorManager::TrackExecutedTasks::TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager) : _manager(manager), _serialNum(0) { - std::lock_guard guard(_manager._executeLock); + (void) guard; _serialNum = _manager._executeCount++; _manager._tasksInExecute.insert(_serialNum); } @@ -1015,8 +1022,16 @@ FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketT StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::execute")); if (entry.exist()) { - auto trackBuckets = std::make_unique<TrackExecutedTasks>(*this); - _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackBuckets), std::move(task))); + std::unique_ptr<TrackExecutedTasks> trackTasks; + { + std::lock_guard guard(_executeLock); + if (_bucketExecutorRegistration) { + trackTasks = std::make_unique<TrackExecutedTasks>(guard, *this); + } + } + if (trackTasks) { + _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackTasks), std::move(task))); + } } return task; } diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 14f41850a4f..6b1e0e50c65 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -192,7 +192,11 @@ RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, _bucket(bucket) { } -RunTaskCommand::~RunTaskCommand() = default; +RunTaskCommand::~RunTaskCommand() { + if (_afterRun) { + _afterRun->run(); + } +} void RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) @@ -202,6 +206,7 @@ RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestr } if (_afterRun) { _afterRun->run(); + _afterRun.reset(); } } diff --git a/yolean/src/main/java/com/yahoo/yolean/chain/Chain.java b/yolean/src/main/java/com/yahoo/yolean/chain/Chain.java index e57e83c644f..4d638847ed9 100644 --- a/yolean/src/main/java/com/yahoo/yolean/chain/Chain.java +++ b/yolean/src/main/java/com/yahoo/yolean/chain/Chain.java @@ -3,10 +3,8 @@ package com.yahoo.yolean.chain; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; |