diff options
47 files changed, 254 insertions, 777 deletions
diff --git a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java index 75ea7000e4c..b04f04abfb6 100644 --- a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java +++ b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.apps.clustercontroller; import com.google.inject.Inject; @@ -10,6 +10,7 @@ import com.yahoo.vespa.clustercontroller.core.FleetControllerOptions; import com.yahoo.vespa.clustercontroller.core.RemoteClusterControllerTaskScheduler; import com.yahoo.vespa.clustercontroller.core.restapiv2.ClusterControllerStateRestAPI; import com.yahoo.vespa.clustercontroller.core.status.StatusHandler; +import com.yahoo.vespa.curator.Curator; import java.util.LinkedHashMap; import java.util.Map; @@ -33,7 +34,6 @@ public class ClusterController extends AbstractComponent * to ensure that zookeeper has started before we start polling it. */ @Inject - @SuppressWarnings("unused") public ClusterController(ZooKeeperProvider zooKeeperProvider) { this(); } @@ -45,8 +45,17 @@ public class ClusterController extends AbstractComponent public void setOptions(String clusterName, FleetControllerOptions options, Metric metricImpl) throws Exception { metricWrapper.updateMetricImplementation(metricImpl); + if (options.zooKeeperServerAddress != null && !"".equals(options.zooKeeperServerAddress)) { + // Wipe this path ... it's unclear why + String path = "/" + options.clusterName + options.fleetControllerIndex; + Curator curator = Curator.create(options.zooKeeperServerAddress); + if (curator.framework().checkExists().forPath(path) != null) + curator.framework().delete().deletingChildrenIfNeeded().forPath(path); + curator.framework().create().creatingParentsIfNeeded().forPath(path); + } synchronized (controllers) { FleetController controller = controllers.get(clusterName); + if (controller == null) { StatusHandler.ContainerStatusPageServer statusPageServer = new StatusHandler.ContainerStatusPageServer(); controller = FleetController.create(options, statusPageServer, metricWrapper); diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 19dfd031dfc..fd887a4196b 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -49,10 +49,10 @@ public class Reindexer { private final ReindexingMetrics metrics; private final Clock clock; private final Phaser phaser = new Phaser(2); // Reindexer and visitor. + private final double windowSizeIncrement; - @Inject public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - DocumentAccess access, Metric metric, Clock clock) { + DocumentAccess access, Metric metric, Clock clock, double windowSizeIncrement) { this(cluster, ready, database, @@ -65,11 +65,13 @@ public class Reindexer { } }, metric, - clock); + clock, + windowSizeIncrement); } Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock) { + Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock, + double windowSizeIncrement) { for (DocumentType type : ready.keySet()) cluster.bucketSpaceOf(type); // Verifies this is known. @@ -79,6 +81,7 @@ public class Reindexer { this.visitorSessions = visitorSessions; this.metrics = new ReindexingMetrics(metric, cluster.name); this.clock = clock; + this.windowSizeIncrement = windowSizeIncrement; } /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */ @@ -91,6 +94,9 @@ public class Reindexer { if (phaser.isTerminated()) throw new IllegalStateException("Already shut down"); + // Keep metrics in sync across cluster controller containers. + metrics.dump(database.readReindexing(cluster.name)); + try (Lock lock = database.lockReindexing(cluster.name())) { AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name())); reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant())); @@ -191,7 +197,7 @@ public class Reindexer { VisitorParameters createParameters(DocumentType type, ProgressToken progress) { VisitorParameters parameters = new VisitorParameters(type.getName()); - parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(0.2) + parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(windowSizeIncrement) .setWindowSizeDecrementFactor(5) .setResizeRate(10) .setMinWindowSize(1)); diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 9a114eabbb5..8668ed037ef 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -72,7 +72,8 @@ public class ReindexingMaintainer extends AbstractComponent { access.getDocumentTypeManager()), access, metric, - clock)) + clock, + reindexingConfig.windowSizeIncrement())) .collect(toUnmodifiableList()); this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 7086c36af3f..9a88d8aad1f 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -59,17 +59,12 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", "id", Map.of()), - Map.of(music, Instant.EPOCH), - database, - failIfCalled, - metric, - clock)); + () -> new Reindexer(new Cluster("cluster", "id", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2); Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -77,13 +72,13 @@ class ReindexerTest { @Test @Timeout(10) void nothingToDoWithEmptyConfig() throws ReindexingLockException { - new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(Map.of(), metric.metrics()); } @Test void testParameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -100,7 +95,7 @@ class ReindexerTest { void testReindexing() throws ReindexingLockException { // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now. // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. - new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2).reindex(); Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); assertEquals(reindexing, database.readReindexing("cluster")); assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music", @@ -124,7 +119,7 @@ class ReindexerTest { // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // It's time to reindex the "music" documents — let this complete successfully. @@ -132,10 +127,10 @@ class ReindexerTest { AtomicBoolean shutDown = new AtomicBoolean(); Executor executor = Executors.newSingleThreadExecutor(); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); - return () -> shutDown.set(true); - }, metric, clock).reindex(); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); + return () -> shutDown.set(true); + }, metric, clock, 0.2).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); @@ -146,14 +141,14 @@ class ReindexerTest { shutDown.set(false); AtomicReference<Reindexer> aborted = new AtomicReference<>(); aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - parameters.getControlHandler().onProgress(new ProgressToken()); - aborted.get().shutdown(); - return () -> { - shutDown.set(true); - parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); - }; - }, metric, clock)); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + parameters.getControlHandler().onProgress(new ProgressToken()); + aborted.get().shutdown(); + return () -> { + shutDown.set(true); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); + }; + }, metric, clock, 0.2)); aborted.get().reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); assertEquals(reindexing, database.readReindexing("cluster")); @@ -168,16 +163,16 @@ class ReindexerTest { clock.advance(Duration.ofMillis(10)); shutDown.set(false); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); - return () -> shutDown.set(true); - }, metric, clock).reindex(); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); + return () -> shutDown.set(true); + }, metric, clock, 0.2).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error")); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); // Document type is ignored in next run, as it has failed fatally. - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); } diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java index e940bf5ff04..381f0ace69e 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java @@ -65,6 +65,7 @@ public interface ModelContext { */ interface FeatureFlags { @ModelFeatureFlag(owners = {"bjorncs", "jonmv"}) default boolean enableAutomaticReindexing() { return false; } + @ModelFeatureFlag(owners = {"bjorncs", "jonmv"}) default double reindexerWindowSizeIncrement() { return 0.2; } @ModelFeatureFlag(owners = {"baldersheim"}, comment = "Revisit in May or June 2020") default double defaultTermwiseLimit() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"vekterli"}) default boolean useThreePhaseUpdates() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"geirst"}, comment = "Remove on 7.XXX when this is default on") default boolean useDirectStorageApiRpc() { throw new UnsupportedOperationException("TODO specify default value"); } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java index b128cc5a20b..14fbeb17aaf 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java @@ -124,17 +124,15 @@ public class ClusterControllerContainer extends Container implements } private void configureReindexing() { - if (reindexingContext().reindexing().enabled()) { - addFileBundle(REINDEXING_CONTROLLER_BUNDLE.getName()); - addComponent(new SimpleComponent(DocumentAccessProvider.class.getName())); - addComponent("reindexing-maintainer", - "ai.vespa.reindexing.ReindexingMaintainer", - REINDEXING_CONTROLLER_BUNDLE); - addHandler("reindexing-status", - "ai.vespa.reindexing.http.ReindexingV1ApiHandler", - "/reindexing/v1/*", - REINDEXING_CONTROLLER_BUNDLE); - } + addFileBundle(REINDEXING_CONTROLLER_BUNDLE.getName()); + addComponent(new SimpleComponent(DocumentAccessProvider.class.getName())); + addComponent("reindexing-maintainer", + "ai.vespa.reindexing.ReindexingMaintainer", + REINDEXING_CONTROLLER_BUNDLE); + addHandler("reindexing-status", + "ai.vespa.reindexing.http.ReindexingV1ApiHandler", + "/reindexing/v1/*", + REINDEXING_CONTROLLER_BUNDLE); } @@ -151,9 +149,13 @@ public class ClusterControllerContainer extends Container implements @Override public void getConfig(ReindexingConfig.Builder builder) { ReindexingContext ctx = reindexingContext(); - if (!ctx.reindexing().enabled()) return; + if (!ctx.reindexing().enabled()) { + builder.enabled(false); + return; + } builder.enabled(ctx.reindexing().enabled()); + builder.windowSizeIncrement(ctx.windowSizeIncrement()); for (String clusterId : ctx.clusterIds()) { ReindexingConfig.Clusters.Builder clusterBuilder = new ReindexingConfig.Clusters.Builder(); for (NewDocumentType type : ctx.documentTypesForCluster(clusterId)) { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java index 63fc0b4515f..3fe6ce3ff27 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java @@ -35,7 +35,7 @@ public class ClusterControllerContainerCluster extends ContainerCluster<ClusterC Reindexing reindexing = deployState.featureFlags().enableAutomaticReindexing() ? deployState.reindexing().orElse(Reindexing.DISABLED_INSTANCE) : Reindexing.DISABLED_INSTANCE; - return new ReindexingContext(reindexing); + return new ReindexingContext(reindexing, deployState.featureFlags().reindexerWindowSizeIncrement()); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java index 712498f78cb..7380b950fb2 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java @@ -21,9 +21,11 @@ public class ReindexingContext { private final Object monitor = new Object(); private final Map<String, Set<NewDocumentType>> documentTypesPerCluster = new HashMap<>(); private final Reindexing reindexing; + private final double windowSizeIncrement; - public ReindexingContext(Reindexing reindexing) { + public ReindexingContext(Reindexing reindexing, double windowSizeIncrement) { this.reindexing = Objects.requireNonNull(reindexing); + this.windowSizeIncrement = windowSizeIncrement; } public void addDocumentType(String clusterId, NewDocumentType type) { @@ -46,4 +48,7 @@ public class ReindexingContext { } public Reindexing reindexing() { return reindexing; } + + public double windowSizeIncrement() { return windowSizeIncrement; } + } diff --git a/configdefinitions/src/vespa/reindexing.def b/configdefinitions/src/vespa/reindexing.def index e020aec3f65..93dc767fed0 100644 --- a/configdefinitions/src/vespa/reindexing.def +++ b/configdefinitions/src/vespa/reindexing.def @@ -6,13 +6,16 @@ namespace=vespa.config.content.reindexing # Whether reindexing should run at all enabled bool default=false -# TODO jonmv: remove after 7.310 is gone +# TODO jonmv: remove after 7.324 is gone # The name of the content cluster to reindex documents from clusterName string default="" -# TODO jonmv: remove after 7.310 is gone +# TODO jonmv: remove after 7.324 is gone # Epoch millis after which latest reprocessing may begin, per document type status{}.readyAtMillis long # Epoch millis after which latest reprocessing may begin, per document type, per cluster clusters{}.documentTypes{}.readyAtMillis long + +# Window size increment used for the dynamic throttling policy of the reindexing visitor session +windowSizeIncrement double default=0.2 diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 9f3ae86b2cf..2d96c92bd88 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -149,6 +149,7 @@ public class ModelContextImpl implements ModelContext { public static class FeatureFlags implements ModelContext.FeatureFlags { private final boolean enableAutomaticReindexing; + private final double reindexerWindowSizeIncrement; private final double defaultTermwiseLimit; private final boolean useThreePhaseUpdates; private final boolean useDirectStorageApiRpc; @@ -167,24 +168,26 @@ public class ModelContextImpl implements ModelContext { public FeatureFlags(FlagSource source, ApplicationId appId) { this.enableAutomaticReindexing = flagValue(source, appId, Flags.ENABLE_AUTOMATIC_REINDEXING); - defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT); - useThreePhaseUpdates = flagValue(source, appId, Flags.USE_THREE_PHASE_UPDATES); - useDirectStorageApiRpc = flagValue(source, appId, Flags.USE_DIRECT_STORAGE_API_RPC); - useFastValueTensorImplementation = flagValue(source, appId, Flags.USE_FAST_VALUE_TENSOR_IMPLEMENTATION); - feedSequencer = flagValue(source, appId, Flags.FEED_SEQUENCER_TYPE); - responseSequencer = flagValue(source, appId, Flags.RESPONSE_SEQUENCER_TYPE); - numResponseThreads = flagValue(source, appId, Flags.RESPONSE_NUM_THREADS); - skipCommunicationManagerThread = flagValue(source, appId, Flags.SKIP_COMMUNICATIONMANAGER_THREAD); - skipMbusRequestThread = flagValue(source, appId, Flags.SKIP_MBUS_REQUEST_THREAD); - skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD); + this.reindexerWindowSizeIncrement = flagValue(source, appId, Flags.REINDEXER_WINDOW_SIZE_INCREMENT); + this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT); + this.useThreePhaseUpdates = flagValue(source, appId, Flags.USE_THREE_PHASE_UPDATES); + this.useDirectStorageApiRpc = flagValue(source, appId, Flags.USE_DIRECT_STORAGE_API_RPC); + this.useFastValueTensorImplementation = flagValue(source, appId, Flags.USE_FAST_VALUE_TENSOR_IMPLEMENTATION); + this.feedSequencer = flagValue(source, appId, Flags.FEED_SEQUENCER_TYPE); + this.responseSequencer = flagValue(source, appId, Flags.RESPONSE_SEQUENCER_TYPE); + this.numResponseThreads = flagValue(source, appId, Flags.RESPONSE_NUM_THREADS); + this.skipCommunicationManagerThread = flagValue(source, appId, Flags.SKIP_COMMUNICATIONMANAGER_THREAD); + this.skipMbusRequestThread = flagValue(source, appId, Flags.SKIP_MBUS_REQUEST_THREAD); + this.skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD); this.useAccessControlTlsHandshakeClientAuth = flagValue(source, appId, Flags.USE_ACCESS_CONTROL_CLIENT_AUTHENTICATION); - useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE); - contentNodeBucketDBStripeBits = flagValue(source, appId, Flags.CONTENT_NODE_BUCKET_DB_STRIPE_BITS); - mergeChunkSize = flagValue(source, appId, Flags.MERGE_CHUNK_SIZE); - feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY); + this.useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE); + this.contentNodeBucketDBStripeBits = flagValue(source, appId, Flags.CONTENT_NODE_BUCKET_DB_STRIPE_BITS); + this.mergeChunkSize = flagValue(source, appId, Flags.MERGE_CHUNK_SIZE); + this.feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY); } @Override public boolean enableAutomaticReindexing() { return enableAutomaticReindexing; } + @Override public double reindexerWindowSizeIncrement() { return reindexerWindowSizeIncrement; } @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @Override public boolean useThreePhaseUpdates() { return useThreePhaseUpdates; } @Override public boolean useDirectStorageApiRpc() { return useDirectStorageApiRpc; } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainer.java index cd3e1d97b28..22eb95261bd 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainer.java @@ -35,8 +35,6 @@ public class ReindexingMaintainer extends ConfigServerMaintainer { /** Timeout per service when getting config generations. */ private static final Duration timeout = Duration.ofSeconds(10); - static final Duration reindexingInterval = Duration.ofDays(28); - private final ConfigConvergenceChecker convergence; private final Clock clock; @@ -89,11 +87,6 @@ public class ReindexingMaintainer extends ConfigServerMaintainer { reindexing = reindexing.withReady(cluster.getKey(), pending.getKey(), now) .withoutPending(cluster.getKey(), pending.getKey()); - // Additionally, reindex the whole application with a fixed interval. - Instant nextPeriodicReindexing = reindexing.common().ready(); - while ((nextPeriodicReindexing = nextPeriodicReindexing.plus(reindexingInterval)).isBefore(now)) - reindexing = reindexing.withReady(nextPeriodicReindexing); // Deterministic timestamp. - return reindexing; } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java index b6177a11da8..d75b91f45e3 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java @@ -30,20 +30,15 @@ public class ReindexingMaintainerTest { withNewReady(reindexing, () -> -1L, Instant.EPOCH)); // Status for (one, a) changes, but not (two, b). - - assertEquals(reindexing.withReady("one", "a", Instant.EPOCH) - .withoutPending("one", "a"), - withNewReady(reindexing, () -> 19L, Instant.EPOCH)); - - Instant later = Instant.EPOCH.plus(ReindexingMaintainer.reindexingInterval.multipliedBy(3)); + Instant later = Instant.ofEpochMilli(3 << 10); assertEquals(reindexing.withoutPending("one", "a") // Converged, no longer pending. - .withReady(later), // Had EPOCH as previous, so is updated, overwriting all more specific status. - withNewReady(reindexing, () -> 19L, later.plusMillis(1))); + .withReady("one", "a", later), // Converged, now ready. + withNewReady(reindexing, () -> 19L, later)); assertEquals(reindexing.withoutPending("one", "a") // Converged, no longer pending. - .withoutPending("two", "b") // Converged, no LOnger pending. - .withReady(later), // Had EPOCH as previous, so is updated, overwriting all more specific status. - withNewReady(reindexing, () -> 20L, later.plusMillis(1))); + .withoutPending("two", "b") // Converged, no Longer pending. + .withReady(later), // Outsider calls withReady(later), overriding more specific status. + withNewReady(reindexing, () -> 20L, later).withReady(later)); // Verify generation supplier isn't called when no pending document types. withNewReady(reindexing.withoutPending("one", "a").withoutPending("two", "b"), diff --git a/eval/src/tests/tensor/instruction_benchmark/.gitignore b/eval/src/tests/tensor/instruction_benchmark/.gitignore index dc5c408cf29..5b3eab59ff6 100644 --- a/eval/src/tests/tensor/instruction_benchmark/.gitignore +++ b/eval/src/tests/tensor/instruction_benchmark/.gitignore @@ -1 +1,3 @@ vespa-tensor-instructions-benchmark +/result.json +/ghost.json diff --git a/eval/src/tests/tensor/instruction_benchmark/instruction_benchmark.cpp b/eval/src/tests/tensor/instruction_benchmark/instruction_benchmark.cpp index 2beaacb7723..aa1da07bc91 100644 --- a/eval/src/tests/tensor/instruction_benchmark/instruction_benchmark.cpp +++ b/eval/src/tests/tensor/instruction_benchmark/instruction_benchmark.cpp @@ -39,6 +39,10 @@ #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/stash.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/io/mapped_file_input.h> +#include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/data/slime/slime.h> +#include <vespa/vespalib/data/smart_buffer.h> #include <optional> #include <algorithm> @@ -47,6 +51,8 @@ using namespace vespalib::eval; using namespace vespalib::eval::instruction; using vespalib::make_string_short::fmt; +using vespalib::slime::JsonFormat; + using Instruction = InterpretedFunction::Instruction; using EvalSingle = InterpretedFunction::EvalSingle; @@ -255,6 +261,8 @@ Impl optimized_fast_value_impl(0, " Optimized FastValue", " Impl fast_value_impl(1, " FastValue", " FastV", FastValueBuilderFactory::get(), false); Impl simple_value_impl(2, " SimpleValue", " SimpleV", SimpleValueBuilderFactory::get(), false); vespalib::string short_header("--------"); +vespalib::string ghost_name(" loaded from ghost.json"); +vespalib::string ghost_short_name(" ghost"); constexpr double budget = 5.0; constexpr double best_limit = 0.95; // everything within 95% of best performance gets a star @@ -265,6 +273,10 @@ std::vector<CREF<Impl>> impl_list = {simple_value_impl, optimized_fast_value_impl, fast_value_impl}; +Slime ghost; // loaded from 'ghost.json' +bool has_ghost = false; +Slime prod_result; // saved to 'result.json' + //----------------------------------------------------------------------------- struct BenchmarkHeader { @@ -274,6 +286,9 @@ struct BenchmarkHeader { for (const Impl &impl: impl_list) { short_names[impl.order] = impl.short_name; } + if (has_ghost) { + short_names.push_back(ghost_short_name); + } } void print_header(const vespalib::string &desc) const { for (const auto &name: short_names) { @@ -299,12 +314,17 @@ struct BenchmarkResult { ~BenchmarkResult(); void sample(size_t order, double time) { relative_perf[order] = time; - if (order == 1) { - if (ref_time.has_value()) { - ref_time = std::min(ref_time.value(), time); - } else { - ref_time = time; + if (order == 0) { + prod_result.get().setDouble(desc, time); + if (has_ghost && (relative_perf.size() == impl_list.size())) { + double ghost_time = ghost.get()[desc].asDouble(); + size_t ghost_order = relative_perf.size(); + fprintf(stderr, " %s(%s): %10.3f us\n", ghost_name.c_str(), ghost_short_name.c_str(), ghost_time); + relative_perf.resize(ghost_order + 1); + return sample(ghost_order, ghost_time); } + } else if (order == 1) { + ref_time = time; } } void normalize() { @@ -332,6 +352,23 @@ std::vector<BenchmarkResult> benchmark_results; //----------------------------------------------------------------------------- +void load_ghost(const vespalib::string &file_name) { + MappedFileInput input(file_name); + has_ghost = JsonFormat::decode(input, ghost); +} + +void save_result(const vespalib::string &file_name) { + SmartBuffer output(4096); + JsonFormat::encode(prod_result, output, false); + Memory memory = output.obtain(); + File file(file_name); + file.open(File::CREATE | File::TRUNC); + file.write(memory.data, memory.size, 0); + file.close(); +} + +//----------------------------------------------------------------------------- + struct MyParam : LazyParams { Value::UP my_value; MyParam() : my_value() {} @@ -432,8 +469,8 @@ void benchmark(const vespalib::string &desc, const std::vector<EvalOp::UP> &list } for (const auto &eval: list) { double time = eval->estimate_cost_us(loop_cnt[eval->impl.order], loop_cnt[1]); - result.sample(eval->impl.order, time); fprintf(stderr, " %s(%s): %10.3f us\n", eval->impl.name.c_str(), eval->impl.short_name.c_str(), time); + result.sample(eval->impl.order, time); } result.normalize(); benchmark_results.push_back(result); @@ -673,9 +710,9 @@ void benchmark_encode_decode(const vespalib::string &desc, const TensorSpec &pro } double encode_us = encode_timer.min_time() * 1000.0 * 1000.0 / double(loop_cnt); double decode_us = decode_timer.min_time() * 1000.0 * 1000.0 / double(loop_cnt); - fprintf(stderr, " %s (%s) <encode>: %10.3f us\n", impl.name.c_str(), impl.short_name.c_str(), encode_us); - fprintf(stderr, " %s (%s) <decode>: %10.3f us\n", impl.name.c_str(), impl.short_name.c_str(), decode_us); + fprintf(stderr, " %s(%s): %10.3f us <encode>\n", impl.name.c_str(), impl.short_name.c_str(), encode_us); encode_result.sample(impl.order, encode_us); + fprintf(stderr, " %s(%s): %10.3f us <decode>\n", impl.name.c_str(), impl.short_name.c_str(), decode_us); decode_result.sample(impl.order, decode_us); } encode_result.normalize(); @@ -1076,16 +1113,26 @@ void print_summary() { } int main(int argc, char **argv) { + prod_result.setObject(); + load_ghost("ghost.json"); const std::string run_only_prod_option = "--limit-implementations"; + const std::string ghost_mode_option = "--ghost-mode"; if ((argc > 1) && (argv[1] == run_only_prod_option )) { impl_list.clear(); impl_list.push_back(optimized_fast_value_impl); impl_list.push_back(fast_value_impl); ++argv; --argc; + } else if ((argc > 1) && (argv[1] == ghost_mode_option )) { + impl_list.clear(); + impl_list.push_back(optimized_fast_value_impl); + has_ghost = true; + ++argv; + --argc; } ::testing::InitGoogleTest(&argc, argv); int result = RUN_ALL_TESTS(); + save_result("result.json"); print_summary(); return result; } diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index 7c00277d8dd..a53c0bd6a78 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -272,6 +272,13 @@ public class Flags { "Takes effect on next internal redeployment", APPLICATION_ID); + public static final UnboundDoubleFlag REINDEXER_WINDOW_SIZE_INCREMENT = defineDoubleFlag( + "reindexer-window-size-increment", 0.2, + List.of("jonmv"), "2020-12-09", "2021-02-07", + "Window size increment for dynamic throttle policy used by reindexer visitor session — more means more aggressive reindexing", + "Takes effect on (re)deployment", + APPLICATION_ID); + public static final UnboundBooleanFlag USE_POWER_OF_TWO_CHOICES_LOAD_BALANCING = defineFeatureFlag( "use-power-of-two-choices-load-balancing", false, List.of("tokle"), "2020-12-02", "2021-02-01", diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index ce76a9f98c9..4a49b5d7953 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -27,7 +27,6 @@ #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h> #include <vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h> -#include <vespa/storage/storageserver/framework.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storageframework/defaultimplementation/component/testcomponentregister.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index e8401914abf..320c55f9998 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -8,6 +8,8 @@ #include <vespa/storage/distributor/operations/external/visitoroperation.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/pendingmessagetracker.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/visitor.h> #include <tests/distributor/distributortestutil.h> @@ -25,6 +27,11 @@ Bucket default_bucket(BucketId id) { return Bucket(document::FixedBucketSpaces::default_space(), id); } +api::StorageMessageAddress make_storage_address(uint16_t node) { + static vespalib::string _storage("storage"); + return {&_storage, lib::NodeType::STORAGE, node}; +} + } struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { @@ -46,6 +53,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { createLinks(); setupDistributor(1, 1, "version:1 distributor:1 storage:1"); _op_owner = std::make_unique<OperationOwner>(_sender, getClock()); + _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker()); addNodesToBucketDB(_sub_bucket, "0=1/2/3/t"); } @@ -81,6 +89,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_that_fails_precondition_checks_is_immediately_failed) { auto op = create_rfw_op(create_nested_visitor_op(false)); _op_owner->start(op, OperationStarter::Priority(120)); + ASSERT_EQ("", _sender.getCommands(true)); EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " "ReturnCode(ILLEGAL_PARAMETERS, No buckets in CreateVisitorCommand for visitor 'foo')", _sender.getLastReply()); @@ -92,6 +101,21 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_n ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); } +TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pending_for_bucket) { + auto op = create_rfw_op(create_nested_visitor_op(true)); + std::vector<api::MergeBucketCommand::Node> nodes({{0, false}, {1, false}}); + auto merge = std::make_shared<api::MergeBucketCommand>(default_bucket(_sub_bucket), + std::move(nodes), + api::Timestamp(123456)); + merge->setAddress(make_storage_address(0)); + getDistributor().getPendingMessageTracker().insert(merge); + _op_owner->start(op, OperationStarter::Priority(120)); + ASSERT_EQ("", _sender.getCommands(true)); + EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " + "ReturnCode(BUSY, A merge operation is pending for this bucket)", + _sender.getLastReply()); +} + namespace { struct ConcurrentMutationFixture { diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 7b934af4bdd..5be6f310c71 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -169,12 +169,11 @@ namespace { Count() : docs(0), bytes(0), buckets(0), active(0), ready(0) {} }; - uint16_t diskCount; - std::vector<Count> disk; + Count count; uint32_t lowestUsedBit; - explicit MetricsUpdater(uint16_t diskCnt) - : diskCount(diskCnt), disk(diskCnt), lowestUsedBit(58) {} + MetricsUpdater() + : count(), lowestUsedBit(58) {} void operator()(document::BucketId::Type bucketId, const StorBucketDatabase::Entry& data) @@ -183,15 +182,15 @@ namespace { document::BucketId::keyToBucketId(bucketId)); if (data.valid()) { - ++disk[0].buckets; + ++count.buckets; if (data.getBucketInfo().isActive()) { - ++disk[0].active; + ++count.active; } if (data.getBucketInfo().isReady()) { - ++disk[0].ready; + ++count.ready; } - disk[0].docs += data.getBucketInfo().getDocumentCount(); - disk[0].bytes += data.getBucketInfo().getTotalDocumentSize(); + count.docs += data.getBucketInfo().getDocumentCount(); + count.bytes += data.getBucketInfo().getTotalDocumentSize(); if (bucket.getUsedBits() < lowestUsedBit) { lowestUsedBit = bucket.getUsedBits(); @@ -200,16 +199,13 @@ namespace { }; void add(const MetricsUpdater& rhs) { - assert(diskCount == rhs.diskCount); - for (uint16_t i = 0; i < diskCount; i++) { - auto& d = disk[i]; - auto& s = rhs.disk[i]; - d.buckets += s.buckets; - d.docs += s.docs; - d.bytes += s.bytes; - d.ready += s.ready; - d.active += s.active; - } + auto& d = count; + auto& s = rhs.count; + d.buckets += s.buckets; + d.docs += s.docs; + d.bytes += s.bytes; + d.ready += s.ready; + d.active += s.active; } }; @@ -230,29 +226,26 @@ BucketManager::updateMetrics(bool updateDocCount) updateDocCount ? "" : ", minusedbits only", _doneInitialized ? "" : ", server is not done initializing"); - const uint16_t diskCount = 1; if (!updateDocCount || _doneInitialized) { - MetricsUpdater total(diskCount); + MetricsUpdater total; for (auto& space : _component.getBucketSpaceRepo()) { - MetricsUpdater m(diskCount); + MetricsUpdater m; auto guard = space.second->bucketDatabase().acquire_read_guard(); guard->for_each(std::ref(m)); total.add(m); if (updateDocCount) { auto bm = _metrics->bucket_spaces.find(space.first); assert(bm != _metrics->bucket_spaces.end()); - // No system with multiple bucket spaces has more than 1 "disk" - // TODO remove disk concept entirely as it's a VDS relic - bm->second->buckets_total.set(m.disk[0].buckets); - bm->second->docs.set(m.disk[0].docs); - bm->second->bytes.set(m.disk[0].bytes); - bm->second->active_buckets.set(m.disk[0].active); - bm->second->ready_buckets.set(m.disk[0].ready); + bm->second->buckets_total.set(m.count.buckets); + bm->second->docs.set(m.count.docs); + bm->second->bytes.set(m.count.bytes); + bm->second->active_buckets.set(m.count.active); + bm->second->ready_buckets.set(m.count.ready); } } if (updateDocCount) { auto & dest = *_metrics->disk; - const auto & src = total.disk[0]; + const auto & src = total.count; dest.buckets.addValue(src.buckets); dest.docs.addValue(src.docs); dest.bytes.addValue(src.bytes); @@ -273,7 +266,7 @@ void BucketManager::update_bucket_db_memory_usage_metrics() { void BucketManager::updateMinUsedBits() { - MetricsUpdater m(1); + MetricsUpdater m; _component.getBucketSpaceRepo().for_each_bucket(std::ref(m)); // When going through to get sizes, we also record min bits MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker()); diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt index efbd62a45a0..741d97f78ef 100644 --- a/storage/src/vespa/storage/common/CMakeLists.txt +++ b/storage/src/vespa/storage/common/CMakeLists.txt @@ -2,7 +2,6 @@ vespa_add_library(storage_common OBJECT SOURCES bucketmessages.cpp - bucketoperationlogger.cpp content_bucket_space.cpp content_bucket_space_repo.cpp distributorcomponent.cpp diff --git a/storage/src/vespa/storage/common/bucketmessages.h b/storage/src/vespa/storage/common/bucketmessages.h index 428b5268293..6cc65cc1501 100644 --- a/storage/src/vespa/storage/common/bucketmessages.h +++ b/storage/src/vespa/storage/common/bucketmessages.h @@ -11,7 +11,7 @@ namespace storage { * @class ReadBucketList * @ingroup common * - * @brief List buckets existing on a partition. + * @brief List buckets existing in a bucket space. */ class ReadBucketList : public api::InternalCommand { document::BucketSpace _bucketSpace; diff --git a/storage/src/vespa/storage/common/bucketoperationlogger.cpp b/storage/src/vespa/storage/common/bucketoperationlogger.cpp deleted file mode 100644 index 905b704409f..00000000000 --- a/storage/src/vespa/storage/common/bucketoperationlogger.cpp +++ /dev/null @@ -1,330 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "bucketoperationlogger.h" -#include <vespa/storage/bucketdb/storbucketdb.h> -#include <vespa/storage/bucketdb/bucketcopy.h> - -#include <vespa/storageapi/buckets/bucketinfo.h> -#include <vespa/storageframework/defaultimplementation/clock/realclock.h> -#include <vespa/vespalib/util/backtrace.h> -#include <vespa/vespalib/stllike/asciistream.h> - -#ifdef ENABLE_BUCKET_OPERATION_LOGGING -#include <vespa/log/log.h> -LOG_SETUP(".debuglogger"); - -namespace storage { - -namespace debug { - -BucketOperationLogger opLogger; - -void -BucketOperationLogger::log(const document::BucketId& id, - const vespalib::string& text, - bool requireLock, - State::LockUpdate lockUpdate) -{ - LogEntry entry; - framework::defaultimplementation::RealClock rclock; - entry._frameCount = vespalib::getStackTraceFrames(entry._stackFrames, MAX_STACK_FRAMES); - entry._text = text; - entry._timestamp = rclock.getTimeInMicros(); - entry._threadId = FastOS_Thread::GetCurrentThreadId() & 0xffff; - uint32_t lockedByThread = 0; - bool hasError = false; - - { - std::lock_guard<std:.mutex> guard(_logLock); - BucketMapType::iterator i = _bucketMap.lower_bound(id); - if (i != _bucketMap.end() && i->first == id) { - if (i->second._history.size() >= MAX_ENTRIES) { - i->second._history.pop_front(); - } - i->second._history.push_back(entry); - if (lockUpdate == State::BUCKET_LOCKED) { - if (i->second._lockedByThread != 0) { - LOG(warning, "Attempting to acquire lock, but lock " - "is already held by thread %u", i->second._lockedByThread); - hasError = true; - } - i->second._lockedByThread = entry._threadId; - } - lockedByThread = i->second._lockedByThread; - if (lockUpdate == State::BUCKET_UNLOCKED) { - if (i->second._lockedByThread == 0) { - LOG(warning, "Attempting to release lock, but lock " - "is not held"); - hasError = true; - } - i->second._lockedByThread = 0; - } - } else { - State addState; - addState._lockedByThread = 0; - addState._history.push_back(entry); - if (lockUpdate == State::BUCKET_LOCKED) { - addState._lockedByThread = entry._threadId; - } else if (lockUpdate == State::BUCKET_UNLOCKED) { - LOG(warning, "Attempting to release lock, but lock " - "is not held"); - hasError = true; - } - _bucketMap.insert(i, BucketMapType::value_type(id, addState)); - } - } - - if (requireLock && !lockedByThread) { - LOG(warning, "Operation '%s' requires lock, but lock is " - "not registered as held", text.c_str()); - hasError = true; - } - if (hasError) { - LOG(warning, "%s", getHistory(id).c_str()); - } -} - -namespace { - -// Must hold logger lock -template <typename LineHandler> -void -processHistory(const BucketOperationLogger& opLogger, - const document::BucketId& id, LineHandler& handler) -{ - BucketOperationLogger::BucketMapType::const_iterator i( - opLogger._bucketMap.find(id)); - if (i == opLogger._bucketMap.end()) { - vespalib::asciistream ss; - ss << "No history recorded for bucket '" - << id.toString() << "'"; - handler(ss.str()); - return; - } - - { - vespalib::asciistream ss; - ss << "Showing last " << i->second._history.size() << " operations on " - << "bucket " << id.toString() << " (newest first):"; - handler(ss.str()); - } - for (BucketOperationLogger::State::LogEntryListType::const_reverse_iterator j( - i->second._history.rbegin()), end(i->second._history.rend()); - j != end; ++j) - { - vespalib::asciistream ss; - ss << storage::framework::getTimeString( - j->_timestamp.getTime(), - storage::framework::DATETIME_WITH_MICROS) - << " " << j->_threadId << " " - << j->_text << ". " - << vespalib::getStackTrace(1, j->_stackFrames, j->_frameCount); - handler(ss.str()); - } -} - -struct LogWarnAppender -{ - void operator()(const vespalib::string& line) - { - LOG(warning, "%s", line.c_str()); - } -}; - -struct LogStringBuilder -{ - vespalib::asciistream ss; - void operator()(const vespalib::string& line) - { - ss << line << "\n"; - } -}; - -} - -void -BucketOperationLogger::dumpHistoryToLog(const document::BucketId& id) const -{ - LogWarnAppender handler; - std::lock_guard<std::mutex> guard(_logLock); - processHistory(*this, id, handler); -} - -vespalib::string -BucketOperationLogger::getHistory(const document::BucketId& id) const -{ - LogStringBuilder handler; - std::lock_guard<std::mutex> lock(_logLock); - processHistory(*this, id, handler); - return handler.ss.str(); -} - -vespalib::string -BucketOperationLogger::searchBucketHistories( - const vespalib::string& sub, - const vespalib::string& urlPrefix) const -{ - vespalib::asciistream ss; - ss << "<ul>\n"; - // This may block for a while... Assuming such searches run when system - // is otherwise idle. - std::lock_guard<std::mutex> guard(_logLock); - for (BucketMapType::const_iterator - bIt(_bucketMap.begin()), bEnd(_bucketMap.end()); - bIt != bEnd; ++bIt) - { - for (State::LogEntryListType::const_iterator - sIt(bIt->second._history.begin()), - sEnd(bIt->second._history.end()); - sIt != sEnd; ++sIt) - { - if (sIt->_text.find(sub.c_str()) != vespalib::string::npos) { - ss << "<li><a href=\"" << urlPrefix - << "0x" << vespalib::hex << bIt->first.getId() - << vespalib::dec << "\">" << bIt->first.toString() - << "</a>:\n"; - ss << sIt->_text << "</li>\n"; - } - } - } - ss << "</ul>\n"; - return ss.str(); -} - -BucketOperationLogger& -BucketOperationLogger::getInstance() -{ - return opLogger; -} - -// Storage node -void logBucketDbInsert(uint64_t key, const bucketdb::StorageBucketInfo& entry) -{ - LOG_BUCKET_OPERATION_NO_LOCK( - document::BucketId(document::BucketId::keyToBucketId(key)), - vespalib::make_string( - "bucketdb insert Bucket(crc=%x, docs=%u, size=%u, " - "metacount=%u, usedfilesize=%u, ready=%s, " - "active=%s, lastModified=%zu) disk=%u", - entry.info.getChecksum(), - entry.info.getDocumentCount(), - entry.info.getTotalDocumentSize(), - entry.info.getMetaCount(), - entry.info.getUsedFileSize(), - (entry.info.isReady() ? "true" : "false"), - (entry.info.isActive() ? "true" : "false"), - entry.info.getLastModified(), - entry.disk)); -} - -void logBucketDbErase(uint64_t key, const TypeTag<bucketdb::StorageBucketInfo>&) -{ - LOG_BUCKET_OPERATION_NO_LOCK( - document::BucketId(document::BucketId::keyToBucketId(key)), - "bucketdb erase"); -} - -// Distributor -void -checkAllConsistentNodesImpliesTrusted( - const document::BucketId& bucket, - const BucketInfo& entry) -{ - // If all copies are consistent, they should also be trusted - if (entry.validAndConsistent() && entry.getNodeCount() > 1) { - for (std::size_t i = 0; i < entry.getNodeCount(); ++i) { - const BucketCopy& copy = entry.getNodeRef(i); - if (copy.trusted() == false) { - LOG(warning, "Bucket DB entry %s for %s is consistent, but " - "contains non-trusted copy %s", entry.toString().c_str(), - bucket.toString().c_str(), copy.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(bucket); - } - } - } -} - -std::size_t -firstTrustedNode(const BucketInfo& entry) -{ - for (std::size_t i = 0; i < entry.getNodeCount(); ++i) { - const distributor::BucketCopy& copy = entry.getNodeRef(i); - if (copy.trusted()) { - return i; - } - } - return std::numeric_limits<std::size_t>::max(); -} - -void -checkNotInSyncImpliesNotTrusted( - const document::BucketId& bucket, - const BucketInfo& entry) -{ - // If there are copies out of sync, different copies should not - // be set to trusted - std::size_t trustedNode = firstTrustedNode(entry); - if (trustedNode != std::numeric_limits<std::size_t>::max()) { - // Ensure all other trusted copies match the metadata of the - // first trusted bucket - const BucketCopy& trustedCopy = entry.getNodeRef(trustedNode); - for (std::size_t i = 0; i < entry.getNodeCount(); ++i) { - if (i == trustedNode) { - continue; - } - const BucketCopy& copy = entry.getNodeRef(i); - const api::BucketInfo& copyInfo = copy.getBucketInfo(); - const api::BucketInfo& trustedInfo = trustedCopy.getBucketInfo(); - if (copy.trusted() - && ((copyInfo.getChecksum() != trustedInfo.getChecksum()))) - //|| (copyInfo.getTotalDocumentSize() != trustedInfo.getTotalDocumentSize()))) - { - LOG(warning, "Bucket DB entry %s for %s has trusted node copy " - "with differing metadata %s", entry.toString().c_str(), - bucket.toString().c_str(), copy.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(bucket); - } - } - } -} - -void -checkInvalidImpliesNotTrusted( - const document::BucketId& bucket, - const BucketInfo& entry) -{ - for (std::size_t i = 0; i < entry.getNodeCount(); ++i) { - const BucketCopy& copy = entry.getNodeRef(i); - if (!copy.valid() && copy.trusted()) { - LOG(warning, "Bucket DB entry %s for %s has invalid copy %s " - "marked as trusted", entry.toString().c_str(), - bucket.toString().c_str(), copy.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(bucket); - } - } -} - -void -logBucketDbInsert(uint64_t key, const BucketInfo& entry) -{ - document::BucketId bucket(document::BucketId::keyToBucketId(key)); - LOG_BUCKET_OPERATION_NO_LOCK( - bucket, vespalib::make_string( - "bucketdb insert of %s", entry.toString().c_str())); - // Do some sanity checking of the inserted entry - checkAllConsistentNodesImpliesTrusted(bucket, entry); - checkNotInSyncImpliesNotTrusted(bucket, entry); - checkInvalidImpliesNotTrusted(bucket, entry); -} - -void -logBucketDbErase(uint64_t key, const TypeTag<BucketInfo>&) -{ - document::BucketId bucket(document::BucketId::keyToBucketId(key)); - LOG_BUCKET_OPERATION_NO_LOCK(bucket, "bucketdb erase"); -} - -} // namespace debug - -} // namespace storage - -#endif // ENABLE_BUCKET_OPERATION_LOGGING diff --git a/storage/src/vespa/storage/common/bucketoperationlogger.h b/storage/src/vespa/storage/common/bucketoperationlogger.h deleted file mode 100644 index af4b539a4c8..00000000000 --- a/storage/src/vespa/storage/common/bucketoperationlogger.h +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/vespalib/stllike/string.h> -#include <vespa/document/bucket/bucketid.h> -#include <map> -#include <list> -#include <mutex> - -/** - * Enable this to log most slotfile operations (such as all mutations) as - * well as common bucket operations such as splitting, joining and bucket db - * updates. Each log entry contains the stack frames for the logging callsite, - * a timestamp, the ID of the thread performing the operation as well as a - * message. The stack trace is cheaply acquired and does thus not affect runtime - * performance to a great degree. Expect some overhead from the logging itself - * since it requires a global mutex around the log state. - * - * All relevant bucket/slotfile operations are checked to ensure that the - * filestor lock is held during the operation and that the thread performing - * it is the same as the one that acquired the lock. - * - * Similarly, code has been added to distributor bucket database and ideal - * state handling to log these. - * - * In the case of an invariant violation (such as a locking bug), the last - * BUCKET_OPERATION_LOG_ENTRIES log entries will be dumped to the vespalog. - * Code may also dump the logged history for a bucket by calling - * DUMP_LOGGED_BUCKET_OPERATIONS(bucketid) - */ -//#define ENABLE_BUCKET_OPERATION_LOGGING -#define BUCKET_OPERATION_LOG_ENTRIES 40 - -#ifdef ENABLE_BUCKET_OPERATION_LOGGING -#define LOG_BUCKET_OPERATION_NO_LOCK(bucket, string) \ -debug::BucketOperationLogger::getInstance().log( \ - (bucket), (string), false) - -#define LOG_BUCKET_OPERATION(bucket, string) \ -debug::BucketOperationLogger::getInstance().log( \ - (bucket), (string), true) - -#define LOG_BUCKET_OPERATION_SPECIFY_LOCKED(bucket, string, require_locked) \ -debug::BucketOperationLogger::getInstance().log( \ - (bucket), (string), (require_locked)) - -#define LOG_BUCKET_OPERATION_SET_LOCK_STATE(bucket, string, require_locked, new_state) \ -debug::BucketOperationLogger::getInstance().log( \ - (bucket), (string), (require_locked), (new_state)) - -#define DUMP_LOGGED_BUCKET_OPERATIONS(bucket) \ - debug::BucketOperationLogger::getInstance().dumpHistoryToLog(bucket) - -namespace storage { - -// Debug stuff for tracking the last n operations to buckets -namespace debug { - -struct BucketOperationLogger -{ - static const std::size_t MAX_ENTRIES = BUCKET_OPERATION_LOG_ENTRIES; - static const std::size_t MAX_STACK_FRAMES = 25; - - struct LogEntry - { - void* _stackFrames[MAX_STACK_FRAMES]; - vespalib::string _text; - framework::MicroSecTime _timestamp; - int _frameCount; - int32_t _threadId; - }; - - struct State - { - typedef std::list<LogEntry> LogEntryListType; - enum LockUpdate - { - NO_UPDATE = 0, - BUCKET_LOCKED = 1, - BUCKET_UNLOCKED = 2 - }; - LogEntryListType _history; - uint32_t _lockedByThread; - }; - - typedef std::map<document::BucketId, State> BucketMapType; - - std::mutex _logLock; - BucketMapType _bucketMap; - - void log(const document::BucketId& id, - const vespalib::string& text, - bool requireLock = true, - State::LockUpdate update = State::NO_UPDATE); - - vespalib::string getHistory(const document::BucketId& id) const; - void dumpHistoryToLog(const document::BucketId& id) const; - //void dumpAllBucketHistoriesToFile(const vespalib::string& filename) const; - /** - * Search through all bucket history entry descriptions to find substring, - * creating a itemized list of buckets containing it as well as a preview. - * @param sub the exact substring to search for. - * @param urlPrefix the URL used for creating bucket links. - */ - vespalib::string searchBucketHistories(const vespalib::string& sub, - const vespalib::string& urlPrefix) const; - static BucketOperationLogger& getInstance(); -}; - -} - -} - -#else - -#define LOG_BUCKET_OPERATION_NO_LOCK(bucket, string) -#define LOG_BUCKET_OPERATION(bucket, string) -#define LOG_BUCKET_OPERATION_SPECIFY_LOCKED(bucket, string, require_locked) -#define DUMP_LOGGED_BUCKET_OPERATIONS(bucket) -#define LOG_BUCKET_OPERATION_SET_LOCK_STATE(bucket, string, require_locked, new_state) - -#endif - diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index d7fa770ef12..a8c4e7c3544 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -8,7 +8,6 @@ #include "distributormetricsset.h" #include "simpleclusterinformation.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/vespalib/util/xmlstream.h> diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp index dde5281a15f..86c98cc7b78 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp +++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp @@ -4,7 +4,6 @@ #include "distributor_bucket_space.h" #include "pendingmessagetracker.h" #include <vespa/document/select/parser.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/vdslib/state/cluster_state_bundle.h> diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp index 1e87172e870..5ebf20138a4 100644 --- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp @@ -2,6 +2,7 @@ #include "read_for_write_visitor_operation.h" #include "visitoroperation.h" +#include <vespa/storage/distributor/distributormessagesender.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/distributor/operationowner.h> #include <cassert> @@ -40,6 +41,11 @@ void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& send assert(_visitor_op->has_sent_reply()); return; } + if (bucket_has_pending_merge(*maybe_bucket, sender.getPendingMessageTracker())) { + LOG(debug, "A merge is pending for bucket %s, failing visitor", maybe_bucket->toString().c_str()); + _visitor_op->fail_with_merge_pending(sender); + return; + } auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket); if (!bucket_handle.valid()) { LOG(debug, "An operation is already pending for bucket %s, failing visitor", @@ -71,4 +77,26 @@ void ReadForWriteVisitorOperationStarter::onReceive(DistributorMessageSender& se _visitor_op->onReceive(sender, msg); } +namespace { + +struct MergePendingChecker : PendingMessageTracker::Checker { + bool has_pending_merge = false; + bool check(uint32_t message_type, [[maybe_unused]] uint16_t node, [[maybe_unused]] uint8_t priority) override { + if (message_type == api::MessageType::MERGEBUCKET_ID) { + has_pending_merge = true; + } + return true; + } +}; + +} + +bool ReadForWriteVisitorOperationStarter::bucket_has_pending_merge(const document::Bucket& bucket, + const PendingMessageTracker& tracker) const { + MergePendingChecker merge_checker; + tracker.checkPendingMessages(bucket, merge_checker); + return merge_checker.has_pending_merge; +} + + } diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h index 06b4f60307e..a6b414e6fb5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h +++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h @@ -43,6 +43,8 @@ public: void onStart(DistributorMessageSender& sender) override; void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override; +private: + bool bucket_has_pending_merge(const document::Bucket&, const PendingMessageTracker& tracker) const; }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index 3d79aa176d7..5a8adb26cd8 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -887,6 +887,13 @@ VisitorOperation::fail_with_bucket_already_locked(DistributorMessageSender& send sendReply(api::ReturnCode(api::ReturnCode::BUSY, "This bucket is already locked by another operation"), sender); } +void +VisitorOperation::fail_with_merge_pending(DistributorMessageSender& sender) +{ + assert(is_read_for_write()); + sendReply(api::ReturnCode(api::ReturnCode::BUSY, "A merge operation is pending for this bucket"), sender); +} + std::optional<document::Bucket> VisitorOperation::first_bucket_to_visit() const { diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h index 4043b0b2c50..e6ad7a042dd 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h @@ -51,6 +51,7 @@ public: // Only valid to call if is_read_for_write() == true void fail_with_bucket_already_locked(DistributorMessageSender& sender); + void fail_with_merge_pending(DistributorMessageSender& sender); [[nodiscard]] bool verify_command_and_expand_buckets(DistributorMessageSender& sender); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index a74dc1c0d65..ea9cb56fae8 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -2,7 +2,6 @@ #include "splitoperation.h" #include <vespa/storage/distributor/idealstatemanager.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <climits> @@ -107,11 +106,6 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP (DatabaseUpdate::CREATE_IF_NONEXISTING | DatabaseUpdate::RESET_TRUSTED)); - LOG_BUCKET_OPERATION_NO_LOCK( - sinfo.first, vespalib::make_string( - "Split from bucket %s: %s", - getBucketId().toString().c_str(), - copy.toString().c_str())); } } else if ( rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND @@ -137,21 +131,6 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP getBucketId().toString().c_str(), rep.getResult().toString().c_str()); } -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - if (_ok) { - LOG_BUCKET_OPERATION_NO_LOCK( - getBucketId(), vespalib::make_string( - "Split OK on node %d: %s. Finished: %s", - node, ost.str().c_str(), - _tracker.finished() ? "yes" : "no")); - } else { - LOG_BUCKET_OPERATION_NO_LOCK( - getBucketId(), vespalib::make_string( - "Split FAILED on node %d: %s. Finished: %s", - node, rep.getResult().toString().c_str(), - _tracker.finished() ? "yes" : "no")); - } -#endif if (_tracker.finished()) { LOG(debug, "Split done on node %d: %s completed operation", diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index 6983e3594af..7eb2e2bf236 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -4,7 +4,6 @@ #include "clusterinformation.h" #include "pendingclusterstate.h" #include "distributor_bucket_space.h" -#include <vespa/storage/common/bucketoperationlogger.h> #include <algorithm> #include <vespa/log/log.h> diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 3dda989ff74..d009375a641 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -6,7 +6,6 @@ #include "distributor_bucket_space_repo.h" #include "distributor_bucket_space.h" #include <vespa/storageframework/defaultimplementation/clock/realclock.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/vespalib/util/xmlstream.hpp> diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp index e861adda428..9616ca018b5 100644 --- a/storage/src/vespa/storage/distributor/statecheckers.cpp +++ b/storage/src/vespa/storage/distributor/statecheckers.cpp @@ -9,7 +9,6 @@ #include <vespa/storage/distributor/operations/idealstate/setbucketstateoperation.h> #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> #include <vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/log/log.h> diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp index 9a2fe2cd6ce..0ff12ac71bc 100644 --- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp +++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp @@ -2,7 +2,6 @@ #include "bucketownershipnotifier.h" #include <vespa/storage/common/nodestateupdater.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storage/common/content_bucket_space_repo.h> #include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/storageapi/message/bucket.h> @@ -83,12 +82,6 @@ BucketOwnershipNotifier::logNotification(const document::Bucket &bucket, currentOwnerIndex, sourceIndex, newInfo.toString().c_str()); - LOG_BUCKET_OPERATION_NO_LOCK( - bucket, - vespalib::make_string( - "Sending notify to distributor %u " - "(ownership changed away from %u)", - currentOwnerIndex, sourceIndex)); } void diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 8b031af4b69..7723d0ee765 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -86,7 +86,6 @@ public: enum DiskState { AVAILABLE, - DISABLED, CLOSED }; @@ -108,11 +107,6 @@ public: /** Check whether it is enabled or not. */ bool enabled() { return (getDiskState() == AVAILABLE); } bool closed() { return (getDiskState() == CLOSED); } - /** - * Disable the disk. Operations towards threads using this disk will - * start to fail. Typically called when disk errors are detected. - */ - void disable() { setDiskState(DISABLED); } /** Closes all disk threads. */ virtual void close() = 0; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index d60b48a54ae..c97965969bc 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -9,7 +9,6 @@ #include <vespa/storage/bucketdb/storbucketdb.h> #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/common/statusmessages.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storage/common/messagebucket.h> #include <vespa/storage/persistence/asynchandler.h> #include <vespa/storage/persistence/messages.h> @@ -196,20 +195,6 @@ FileStorHandlerImpl::flush(bool killPendingMerges) } void -FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const -{ - if (!msg.getType().isReply()) { - std::shared_ptr<api::StorageReply> rep = static_cast<api::StorageCommand&>(msg).makeReply(); - if (state == FileStorHandler::DISABLED) { - rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled")); - } else { - rep->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); - } - _messageSender.sendReply(rep); - } -} - -void FileStorHandlerImpl::setDiskState(DiskState state) { // Mark disk closed @@ -494,16 +479,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck if (idx > -1) { cmd.remapBucketId(targets[idx]->bucket.getBucketId()); targets[idx]->foundInQueue = true; -#if defined(ENABLE_BUCKET_OPERATION_LOGGING) - { - vespalib::string desc = vespalib::make_string( - "Remapping %s from %s to %s, targetDisk = %u", - cmd.toString().c_str(), source.toString().c_str(), - targets[idx]->bid.toString().c_str(), targetDisk); - LOG_BUCKET_OPERATION_NO_LOCK(source, desc); - LOG_BUCKET_OPERATION_NO_LOCK(targets[idx]->bid, desc); - } -#endif newBucket = targets[idx]->bucket; } else { document::DocumentId did(getDocId(msg)); @@ -536,16 +511,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck cmd.toString().c_str(), targets[0]->bucket.getBucketId().toString().c_str()); cmd.remapBucketId(targets[0]->bucket.getBucketId()); newBucket = targets[0]->bucket; -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - { - vespalib::string desc = vespalib::make_string( - "Remapping %s from %s to %s, targetDisk = %u", - cmd.toString().c_str(), source.toString().c_str(), - targets[0]->bid.toString().c_str(), targetDisk); - LOG_BUCKET_OPERATION_NO_LOCK(source, desc); - LOG_BUCKET_OPERATION_NO_LOCK(targets[0]->bid, desc); - } -#endif } } else { LOG(debug, "Did not remap %s with bucket %s from bucket %s", @@ -1260,7 +1225,6 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& out << "Disk state: "; switch (getState()) { case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break; - case FileStorHandler::DISABLED: out << "DISABLED"; break; case FileStorHandler::CLOSED: out << "CLOSED"; break; } out << "<h4>Active operations</h4>\n"; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 6006fdeb7fd..688a4b96def 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -249,8 +249,6 @@ private: mutable std::condition_variable _pauseCond; std::atomic<bool> _paused; - void reply(api::StorageMessage&, DiskState state) const; - // Returns the index in the targets array we are sending to, or -1 if none of them match. int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index fc0f7cd0b15..9fde88cfe51 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -289,9 +289,6 @@ FileStorManager::handlePersistenceMessage(const shared_ptr<api::StorageMessage>& } } switch (_filestorHandler->getDiskState()) { - case FileStorHandler::DISABLED: - errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"); - break; case FileStorHandler::CLOSED: errorCode = api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."); break; diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index 5a5d3dcabca..9c31a1c81bc 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -3,7 +3,6 @@ #include "simplemessagehandler.h" #include "persistenceutil.h" #include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/document/base/exceptions.h> #include <vespa/document/fieldset/fieldsetrepo.h> @@ -105,7 +104,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); } spi::Bucket spiBucket(cmd.getBucket()); _spi.createBucket(spiBucket, tracker->context()); @@ -147,7 +145,6 @@ SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageT { tracker->setMetric(_env._metrics.deleteBuckets); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); - LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); diff --git a/storage/src/vespa/storage/storageserver/distributornodecontext.h b/storage/src/vespa/storage/storageserver/distributornodecontext.h index 152218707e0..3e3541498c0 100644 --- a/storage/src/vespa/storage/storageserver/distributornodecontext.h +++ b/storage/src/vespa/storage/storageserver/distributornodecontext.h @@ -8,10 +8,6 @@ * This utility class sets up the default component register implementation. * It also sets up the clock and the threadpool, such that the most basic * features are available to the provider, before the service layer is set up. - * - * The service layer still provides the memory manager functionality though, - * so you cannot retrieve the memory manager before the service layer has - * started up. (Before getPartitionStates() have been called on provider) */ #pragma once diff --git a/storage/src/vespa/storage/storageserver/framework.cpp b/storage/src/vespa/storage/storageserver/framework.cpp deleted file mode 100644 index 1cbed2ea39d..00000000000 --- a/storage/src/vespa/storage/storageserver/framework.cpp +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "framework.h" - -#include <vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h> - -using storage::framework::defaultimplementation::AllocationLogic; - -namespace storage { - -Framework::Framework(framework::Clock::UP clock) - : _componentRegister(), - _clock(clock), - _threadPool(*_clock), - _memoryLogic(new framework::defaultimplementation::PriorityMemoryLogic( - *_clock, 1024 * 1024 * 1024)), - _memoryManager(AllocationLogic::UP(_memoryLogic)) -{ - framework::defaultimplementation::ComponentRegisterImpl& cri( - _componentRegister.getComponentRegisterImpl()); - cri.setClock(*_clock); - cri.setThreadPool(_threadPool); - cri.setMemoryManager(_memoryManager); -} - -void -Framework::setMaximumMemoryUsage(uint64_t max) -{ - using storage::framework::defaultimplementation::PriorityMemoryLogic; - static_cast<PriorityMemoryLogic*>(_memoryLogic)->setMaximumMemoryUsage(max); -} - -} // storage diff --git a/storage/src/vespa/storage/storageserver/framework.h b/storage/src/vespa/storage/storageserver/framework.h deleted file mode 100644 index f0ea1d71aa6..00000000000 --- a/storage/src/vespa/storage/storageserver/framework.h +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @class storage::Framework - * @ingroup storageserver - * - * @brief Data available to both provider implementations and storage server - * - * This utility class sets up the default component register implementation. - * It also sets up the clock and the threadpool, such that the most basic - * features are available to the provider, before the service layer is set up. - * - * The service layer still provides the memory manager functionality though, - * so you cannot retrieve the memory manager before the service layer has - * started up. (Before getPartitionStates() have been called on provider) - */ - -#pragma once - -#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> -#include <vespa/storageframework/defaultimplementation/clock/realclock.h> -#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> - -namespace storage { - -struct Framework { - // Typedefs to simplify the remainder of the interface - typedef StorageComponentRegisterImpl CompReg; - typedef framework::defaultimplementation::RealClock RealClock; - - /** - * You can provide your own clock implementation. Useful in testing where - * you want to fake the clock. - */ - Framework(framework::Clock::UP clock = framework::Clock::UP(new RealClock)); - - /** - * Get the actual component register. Available as the actual type as the - * storage server need to set implementations, and the components need the - * actual component register interface. - */ - CompReg& getComponentRegister() { return _componentRegister; } - - /** - * There currently exist threads that doesn't use the component model. - * Let the backend threadpool be accessible for now. - */ - FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); } - -private: - CompReg _componentRegister; - framework::Clock::UP _clock; - framework::defaultimplementation::ThreadPoolImpl _threadPool; - -}; - -} // storage - diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 68b53738b20..e4bc1757493 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -35,8 +35,7 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceL _persistenceProvider(persistenceProvider), _externalVisitors(externalVisitors), _fileStorManager(nullptr), - _init_has_been_called(false), - _noUsablePartitionMode(false) + _init_has_been_called(false) { } @@ -162,14 +161,6 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) _communicationManager = communication_manager.get(); builder.add(std::move(communication_manager)); builder.add(std::make_unique<Bouncer>(compReg, _configUri)); - if (_noUsablePartitionMode) { - /* - * No usable partitions. Use minimal chain. Still needs to be - * able to report state back to cluster controller. - */ - builder.add(releaseStateManager()); - return; - } builder.add(std::make_unique<OpsLogger>(compReg, _configUri)); auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg); auto merge_throttler = merge_throttler_up.get(); diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index 9513888fd8d..9153e085033 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -35,7 +35,6 @@ class ServiceLayerNode std::unique_ptr<config::ConfigFetcher> _configFetcher; FileStorManager* _fileStorManager; bool _init_has_been_called; - bool _noUsablePartitionMode; public: typedef std::unique_ptr<ServiceLayerNode> UP; diff --git a/storage/src/vespa/storage/storageserver/servicelayernodecontext.h b/storage/src/vespa/storage/storageserver/servicelayernodecontext.h index 0516c9e3bda..0f2ca2a9048 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernodecontext.h +++ b/storage/src/vespa/storage/storageserver/servicelayernodecontext.h @@ -8,10 +8,6 @@ * This utility class sets up the default component register implementation. * It also sets up the clock and the threadpool, such that the most basic * features are available to the provider, before the service layer is set up. - * - * The service layer still provides the memory manager functionality though, - * so you cannot retrieve the memory manager before the service layer has - * started up. (Before getPartitionStates() have been called on provider) */ #pragma once diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index 653822626ed..395e33a0393 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -6,7 +6,6 @@ #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/metrics/jsonwriter.h> #include <vespa/metrics/metricmanager.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/vespalib/io/fileutil.h> diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h index 163c02ef5af..34afac43ab3 100644 --- a/storage/src/vespa/storage/storageserver/storagenodecontext.h +++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h @@ -8,10 +8,6 @@ * This utility class sets up the default component register implementation. * It also sets up the clock and the threadpool, such that the most basic * features are available to the provider, before the service layer is set up. - * - * The service layer still provides the memory manager functionality though, - * so you cannot retrieve the memory manager before the service layer has - * started up. (Before getPartitionStates() have been called on provider) */ #pragma once diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java index 492cfffa1ad..9b2c80cb3b1 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java @@ -56,7 +56,11 @@ public class ZooKeeperRunner implements Runnable { Path path = Paths.get(getDefaults().underVespaHome(zookeeperServerConfig.zooKeeperConfigFile())); log.log(Level.INFO, "Starting ZooKeeper server with config file " + path.toFile().getAbsolutePath() + ". Trying to establish ZooKeeper quorum (members: " + zookeeperServerHostnames(zookeeperServerConfig) + ")"); - server.start(path); + try { + server.start(path); + } catch (Throwable e) { + log.log(Level.SEVERE, "Starting ZooKeeper server failed:", e); + } } } |