diff options
70 files changed, 944 insertions, 715 deletions
diff --git a/clustercontroller-reindexer/CMakeLists.txt b/clustercontroller-reindexer/CMakeLists.txt new file mode 100644 index 00000000000..4d3fde15fca --- /dev/null +++ b/clustercontroller-reindexer/CMakeLists.txt @@ -0,0 +1,2 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +install_fat_java_artifact(clustercontroller-reindexer) diff --git a/clustercontroller-reindexer/pom.xml b/clustercontroller-reindexer/pom.xml new file mode 100644 index 00000000000..3791ea7d3f4 --- /dev/null +++ b/clustercontroller-reindexer/pom.xml @@ -0,0 +1,80 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>parent</artifactId> + <groupId>com.yahoo.vespa</groupId> + <version>7-SNAPSHOT</version> + <relativePath>../parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>clustercontroller-reindexer</artifactId> + <packaging>container-plugin</packaging> + + <dependencies> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>container-dev</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>zkfacade</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>zookeeper-server-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-provisioning</artifactId> + <version>${project.version}</version> + 
<scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-model-api</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-model</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>com.yahoo.vespa</groupId> + <artifactId>bundle-plugin</artifactId> + <version>${project.version}</version> + <extensions>true</extensions> + <configuration> + <useCommonAssemblyIds>true</useCommonAssemblyIds> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project>
\ No newline at end of file diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java new file mode 100644 index 00000000000..c9ca0d94995 --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java @@ -0,0 +1,193 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import com.yahoo.document.DocumentType; +import com.yahoo.documentapi.ProgressToken; + +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; + +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Reindexing status per document type. + * + * @author jonmv + */ +public class Reindexing { + + private static Reindexing empty = new Reindexing(Map.of()); + + private final Map<DocumentType, Status> status; + + Reindexing(Map<DocumentType, Status> status) { + this.status = Map.copyOf(status); + } + + public static Reindexing empty() { + return empty; + } + + public Reindexing with(DocumentType documentType, Status updated) { + return new Reindexing(Stream.concat(Stream.of(documentType), + status.keySet().stream()) + .distinct() + .collect(toUnmodifiableMap(type -> type, + type -> documentType.equals(type) ? updated : status.get(type)))); + } + + /** Reindexing status per document type, for types where this is known. 
*/ + public Map<DocumentType, Status> status() { + return status; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Reindexing that = (Reindexing) o; + return status.equals(that.status); + } + + @Override + public int hashCode() { + return Objects.hash(status); + } + + @Override + public String toString() { + return "Reindexing status " + status; + } + + /** + * Reindexing status for a single document type, in an application. Immutable. + * + * Reindexing starts at a given instant, and is progressed by visitors. + */ + public static class Status { + + private final Instant startedAt; + private final Instant endedAt; + private final ProgressToken progress; + private final State state; + private final String message; + + Status(Instant startedAt, Instant endedAt, ProgressToken progress, State state, String message) { + this.startedAt = startedAt; + this.endedAt = endedAt; + this.progress = progress; + this.state = state; + this.message = message; + } + + /** Returns a new, empty status, with no progress or result, in state READY. */ + public static Status ready(Instant now) { + return new Status(requireNonNull(now), null, new ProgressToken(), State.READY, null); + } + + /** Returns a copy of this, in state RUNNING. */ + public Status running() { + if (state != State.READY) + throw new IllegalStateException("Current state must be READY when changing to RUNNING"); + return new Status(startedAt, null, progress, State.RUNNING, null); + } + + /** Returns a copy of this with the given progress. */ + public Status progressed(ProgressToken progress) { + if (state != State.RUNNING) + throw new IllegalStateException("Current state must be RUNNING when updating progress"); + return new Status(startedAt, null, requireNonNull(progress), state, null); + } + + /** Returns a copy of this in state HALTED. 
*/ + public Status halted() { + if (state != State.RUNNING) + throw new IllegalStateException("Current state must be RUNNING when changing to READY"); + return new Status(startedAt, null, progress, State.READY, null); + } + + /** Returns a copy of this with the given end instant, in state SUCCESSFUL. */ + public Status successful(Instant now) { + if (state != State.RUNNING) + throw new IllegalStateException("Current state must be RUNNING when changing to SUCCESSFUL"); + return new Status(startedAt, requireNonNull(now), null, State.SUCCESSFUL, null); + } + + /** Returns a copy of this with the given end instant and failure message, in state FAILED. */ + public Status failed(Instant now, String message) { + if (state != State.RUNNING) + throw new IllegalStateException("Current state must be RUNNING when changing to FAILED"); + return new Status(startedAt, requireNonNull(now), null, State.FAILED, requireNonNull(message)); + } + + public Instant startedAt() { + return startedAt; + } + + public Optional<Instant> endedAt() { + return Optional.ofNullable(endedAt); + } + + public Optional<ProgressToken> progress() { + return Optional.ofNullable(progress); + } + + public State state() { + return state; + } + + public Optional<String> message() { + return Optional.ofNullable(message); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return startedAt.equals(status.startedAt) && + Objects.equals(endedAt, status.endedAt) && + Objects.equals(progress().map(ProgressToken::serializeToString), + status.progress().map(ProgressToken::serializeToString)) && + state == status.state && + Objects.equals(message, status.message); + } + + @Override + public int hashCode() { + return Objects.hash(startedAt, endedAt, progress().map(ProgressToken::serializeToString), state, message); + } + + @Override + public String toString() { + return state + (message != null ? 
" (" + message + ")" : "") + + ", started at " + startedAt + + (endedAt != null ? ", ended at " + endedAt : "") + + (progress != null ? ", with progress " + progress : ""); + } + + } + + + public enum State { + + /** Visit ready to be started. */ + READY, + + /** Visit currently running. */ + RUNNING, + + /** Visit completed successfully. */ + SUCCESSFUL, + + /** Visit failed fatally. */ + FAILED + + } + +} diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java new file mode 100644 index 00000000000..5eef93dad23 --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -0,0 +1,121 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexing.Status; +import com.google.inject.Inject; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.path.Path; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Inspector; +import com.yahoo.slime.Slime; +import com.yahoo.slime.SlimeUtils; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.yolean.Exceptions; + +import java.time.Instant; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Reads and writes status of initiated reindexing jobs. 
+ * + * @author jonmv + */ +public class ReindexingCurator { + + private static final String STATUS = "status"; + private static final String TYPE = "type"; + private static final String STARTED_MILLIS = "startedMillis"; + private static final String ENDED_MILLIS = "endedMillis"; + private static final String PROGRESS = "progress"; + private static final String STATE = "state"; + private static final String MESSAGE = "message"; + + private static final Path statusPath = Path.fromString("/reindexing/v1/status"); + + private final Curator curator; + private final ReindexingSerializer serializer; + + public ReindexingCurator(Curator curator, DocumentTypeManager manager) { + this.curator = curator; + this.serializer = new ReindexingSerializer(manager); + } + + public Reindexing readReindexing() { + return curator.getData(statusPath).map(serializer::deserialize) + .orElse(Reindexing.empty()); + } + + public void writeReindexing(Reindexing reindexing) { + curator.set(statusPath, serializer.serialize(reindexing)); + } + + + private static class ReindexingSerializer { + + private final DocumentTypeManager types; + + public ReindexingSerializer(DocumentTypeManager types) { + this.types = types; + } + + private byte[] serialize(Reindexing reindexing) { + Cursor root = new Slime().setObject(); + Cursor statusArray = root.setArray(STATUS); + reindexing.status().forEach((type, status) -> { + Cursor statusObject = statusArray.addObject(); + statusObject.setString(TYPE, type.getName()); + statusObject.setLong(STARTED_MILLIS, status.startedAt().toEpochMilli()); + status.endedAt().ifPresent(endedAt -> statusObject.setLong(ENDED_MILLIS, endedAt.toEpochMilli())); + status.progress().ifPresent(progress -> statusObject.setString(PROGRESS, progress.serializeToString())); + statusObject.setString(STATE, toString(status.state())); + status.message().ifPresent(message -> statusObject.setString(MESSAGE, message)); + }); + return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root)); + } + 
+ private Reindexing deserialize(byte[] json) { + return new Reindexing(SlimeUtils.entriesStream(SlimeUtils.jsonToSlimeOrThrow(json).get().field(STATUS)) + .filter(object -> require(TYPE, object, field -> types.hasDataType(field.asString()))) // Forget unknown documents. + .collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())), + object -> new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())), + get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())), + get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())), + require(STATE, object, field -> toState(field.asString())), + get(MESSAGE, object, field -> field.asString()))))); + } + + private static <T> T get(String name, Inspector object, Function<Inspector, T> mapper) { + return object.field(name).valid() ? mapper.apply(object.field(name)) : null; + } + + private static <T> T require(String name, Inspector object, Function<Inspector, T> mapper) { + return requireNonNull(get(name, object, mapper)); + } + + private static String toString(Reindexing.State state) { + switch (state) { + case READY: return "ready"; + case RUNNING: return "running"; + case SUCCESSFUL: return "successful"; + case FAILED: return "failed"; + default: throw new IllegalArgumentException("Unexpected state '" + state + "'"); + } + } + + private static Reindexing.State toState(String value) { + switch (value) { + case "ready": return Reindexing.State.READY; + case "running": return Reindexing.State.RUNNING; + case "successful": return Reindexing.State.SUCCESSFUL; + case "failed": return Reindexing.State.FAILED; + default: throw new IllegalArgumentException("Unknown state '" + value + "'"); + } + } + + } + +} diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java new file mode 
100644 index 00000000000..0501ff474ca --- /dev/null +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java @@ -0,0 +1,48 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.searchdefinition.derived.Deriver; +import com.yahoo.vespa.curator.mock.MockCurator; +import org.junit.jupiter.api.Test; + +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author jonmv + */ +class ReindexingCuratorTest { + + @Test + void testSerialization() { + DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); + DocumentmanagerConfig emptyConfig = new DocumentmanagerConfig.Builder().build(); + DocumentTypeManager manager = new DocumentTypeManager(musicConfig); + DocumentType music = manager.getDocumentType("music"); + MockCurator mockCurator = new MockCurator(); + ReindexingCurator curator = new ReindexingCurator(mockCurator, manager); + + assertEquals(Reindexing.empty(), curator.readReindexing()); + + Reindexing.Status status = Reindexing.Status.ready(Instant.ofEpochMilli(123)) + .running() + .progressed(new ProgressToken()); + Reindexing reindexing = Reindexing.empty().with(music, status); + curator.writeReindexing(reindexing); + assertEquals(reindexing, curator.readReindexing()); + + status = status.halted().running().failed(Instant.ofEpochMilli(321), "error"); + reindexing = reindexing.with(music, status); + curator.writeReindexing(reindexing); + assertEquals(reindexing, curator.readReindexing()); + + // Unknown document types are forgotten. 
+ assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, new DocumentTypeManager(emptyConfig)).readReindexing()); + } + +} diff --git a/clustercontroller-reindexer/src/test/resources/schemas/music.sd b/clustercontroller-reindexer/src/test/resources/schemas/music.sd new file mode 100644 index 00000000000..a289f5a686b --- /dev/null +++ b/clustercontroller-reindexer/src/test/resources/schemas/music.sd @@ -0,0 +1,6 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +search music { + document music { + field artist type string { } + } +} diff --git a/configdefinitions/src/vespa/reindexing.def b/configdefinitions/src/vespa/reindexing.def new file mode 100644 index 00000000000..468a10e8199 --- /dev/null +++ b/configdefinitions/src/vespa/reindexing.def @@ -0,0 +1,7 @@ +# Copyright 2020 Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +# Reindexing status per document type, for a Vespa application + +namespace=vespa.config.content.reindexing + +# Epoch millis after which latest reprocessing may begin, per document type +status{}.readyAtMillis int diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchive.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchive.java index a00992da815..8f91a8127bd 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchive.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchive.java @@ -221,6 +221,9 @@ public class SystemFlagsDataArchive { } else if (dimension.isEqualTo(DimensionHelper.toWire(FetchVector.Dimension.CONSOLE_USER_EMAIL))) { condition.get("values").forEachArrayElement(conditionValue -> conditionValue.asString() .orElseThrow(() -> new IllegalArgumentException("Non-string email 
address: " + conditionValue))); + } else if (dimension.isEqualTo(DimensionHelper.toWire(FetchVector.Dimension.TENANT_ID))) { + condition.get("values").forEachArrayElement(conditionValue -> conditionValue.asString() + .orElseThrow(() -> new IllegalArgumentException("Non-string tenant ID: " + conditionValue))); } })); } diff --git a/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchiveTest.java b/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchiveTest.java index aca991ec637..771e42e85f9 100644 --- a/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchiveTest.java +++ b/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchiveTest.java @@ -236,6 +236,30 @@ public class SystemFlagsDataArchiveTest { } } + @Test + public void normalize_json_fail_on_invalid_tenant_id() { + try { + SystemFlagsDataArchive.normalizeJson("{\n" + + " \"id\": \"foo\",\n" + + " \"rules\": [\n" + + " {\n" + + " \"conditions\": [\n" + + " {\n" + + " \"type\": \"whitelist\",\n" + + " \"dimension\": \"tenant\",\n" + + " \"values\": [ 123 ]\n" + + " }\n" + + " ],\n" + + " \"value\": true\n" + + " }\n" + + " ]\n" + + "}\n"); + fail(); + } catch (IllegalArgumentException e) { + assertEquals("Non-string tenant ID: 123", e.getMessage()); + } + } + private static void assertArchiveReturnsCorrectTestFlagDataForTarget(SystemFlagsDataArchive archive) { assertFlagDataHasValue(archive, MY_TEST_FLAG, mainControllerTarget, "main.controller"); assertFlagDataHasValue(archive, MY_TEST_FLAG, prodUsWestCfgTarget, "main.prod.us-west-1"); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/routing/RoutingApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/routing/RoutingApiHandler.java index 114a2967e9a..ace176bd91e 100644 --- 
a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/routing/RoutingApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/routing/RoutingApiHandler.java @@ -265,6 +265,7 @@ public class RoutingApiHandler extends AuditLoggingRequestHandler { // Include status from routing policies var routingPolicies = controller.routing().policies().get(deploymentId); for (var policy : routingPolicies.values()) { + if (policy.endpoints().isEmpty()) continue; // This policy does not apply to a global endpoint if (!controller.zoneRegistry().routingMethods(policy.id().zone()).contains(RoutingMethod.exclusive)) continue; deploymentStatusToSlime(deploymentsArray.addObject(), new DeploymentId(policy.id().owner(), policy.id().zone()), diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/routing/RoutingApiTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/routing/RoutingApiTest.java index b0549662ab0..9bd8485db77 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/routing/RoutingApiTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/routing/RoutingApiTest.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.hosted.controller.restapi.routing; import com.yahoo.application.container.handler.Request; +import com.yahoo.config.application.api.ValidationId; import com.yahoo.config.provision.AthenzDomain; import com.yahoo.config.provision.AthenzService; import com.yahoo.config.provision.zone.RoutingMethod; @@ -184,6 +185,21 @@ public class RoutingApiTest extends ControllerContainerTest { tester.assertResponse(operatorRequest("http://localhost:8080/routing/v1/status/environment/prod/region/us-west-1", "", Request.Method.GET), new File("policy/zone-status-in.json")); + + // Endpoint is removed + applicationPackage = new ApplicationPackageBuilder() + .athenzIdentity(AthenzDomain.from("domain"), 
AthenzService.from("service")) + .compileVersion(RoutingController.DIRECT_ROUTING_MIN_VERSION) + .region(westZone.region()) + .region(eastZone.region()) + .allow(ValidationId.globalEndpointChange) + .build(); + context.submit(applicationPackage).deploy(); + + // GET deployment status. Now empty as no routing policies have global endpoints + tester.assertResponse(operatorRequest("http://localhost:8080/routing/v1/status/tenant/tenant/application/application/instance/default/environment/prod/region/us-west-1", + "", Request.Method.GET), + "{\"deployments\":[]}"); } @Test diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java index 489128c211e..05e66ef95f5 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java @@ -20,9 +20,9 @@ public class VisitorControlHandler { SUCCESS, /** Aborted by user. */ ABORTED, - /** Failure */ + /** Fatal failure. */ FAILURE, - /** Create visitor reply did not return within the specified timeframe. */ + /** Create visitor reply did not return within the specified timeframe, or the session timed out. 
*/ TIMEOUT }; diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 6203b724e95..e1d18080faf 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -735,7 +735,7 @@ public class MessageBusVisitorSession implements VisitorSession { String msg = "Got exception of type " + e.getClass().getName() + " with message '" + e.getMessage() + "' while processing reply in visitor session"; - e.printStackTrace(); + log.log(Level.WARNING, msg, e); transitionTo(new StateDescription(State.FAILED, msg)); } catch (Throwable t) { // We can't reliably handle this; take a nosedive diff --git a/eval/src/vespa/eval/eval/value.h b/eval/src/vespa/eval/eval/value.h index 0902a7c1752..e876ba7b472 100644 --- a/eval/src/vespa/eval/eval/value.h +++ b/eval/src/vespa/eval/eval/value.h @@ -103,6 +103,9 @@ public: static const ValueType &shared_type() { return _type; } }; +extern template class ScalarValue<double>; +extern template class ScalarValue<float>; + using DoubleValue = ScalarValue<double>; /** diff --git a/flags/src/main/java/com/yahoo/vespa/flags/FetchVector.java b/flags/src/main/java/com/yahoo/vespa/flags/FetchVector.java index 89c4f16e27b..37849b65adf 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/FetchVector.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/FetchVector.java @@ -24,6 +24,9 @@ public class FetchVector { * Note: If this enum is changed, you must also change {@link DimensionHelper}. */ public enum Dimension { + /** A legal value for TenantName, e.g. vespa-team */ + TENANT_ID, + /** Value from ApplicationId::serializedForm of the form tenant:applicationName:instance. 
*/ APPLICATION_ID, diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index e622e1dd419..bf3b497bc3f 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -15,6 +15,7 @@ import static com.yahoo.vespa.flags.FetchVector.Dimension.APPLICATION_ID; import static com.yahoo.vespa.flags.FetchVector.Dimension.CONSOLE_USER_EMAIL; import static com.yahoo.vespa.flags.FetchVector.Dimension.HOSTNAME; import static com.yahoo.vespa.flags.FetchVector.Dimension.NODE_TYPE; +import static com.yahoo.vespa.flags.FetchVector.Dimension.TENANT_ID; import static com.yahoo.vespa.flags.FetchVector.Dimension.VESPA_VERSION; import static com.yahoo.vespa.flags.FetchVector.Dimension.ZONE_ID; @@ -266,7 +267,7 @@ public class Flags { "tenant-budget-quota", -1, "The budget in cents/hr a tenant is allowed spend per instance, as calculated by NodeResources", "Only takes effect on next deployment, if set to a value other than the default for flag!", - APPLICATION_ID + TENANT_ID ); public static final UnboundBooleanFlag ONLY_PUBLIC_ACCESS = defineFeatureFlag( diff --git a/flags/src/main/java/com/yahoo/vespa/flags/json/DimensionHelper.java b/flags/src/main/java/com/yahoo/vespa/flags/json/DimensionHelper.java index 4b989b8f819..f109d3c950b 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/json/DimensionHelper.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/json/DimensionHelper.java @@ -20,6 +20,7 @@ public class DimensionHelper { serializedDimensions.put(FetchVector.Dimension.CLUSTER_TYPE, "cluster-type"); serializedDimensions.put(FetchVector.Dimension.VESPA_VERSION, "vespa-version"); serializedDimensions.put(FetchVector.Dimension.CONSOLE_USER_EMAIL, "console-user-email"); + serializedDimensions.put(FetchVector.Dimension.TENANT_ID, "tenant"); if (serializedDimensions.size() != FetchVector.Dimension.values().length) { throw new 
IllegalStateException(FetchVectorHelper.class.getName() + " is not in sync with " + diff --git a/node-admin/.gitignore b/node-admin/.gitignore index 5881f4b513b..adbb97d2d31 100644 --- a/node-admin/.gitignore +++ b/node-admin/.gitignore @@ -1 +1 @@ -node-admin-zone-app/components +data/
\ No newline at end of file diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoreCollector.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoreCollector.java index 63bfe87b662..db9460ee07a 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoreCollector.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoreCollector.java @@ -24,9 +24,11 @@ import java.util.regex.Pattern; public class CoreCollector { private static final Logger logger = Logger.getLogger(CoreCollector.class.getName()); + private static final Pattern JAVA_HEAP_DUMP_PATTERN = Pattern.compile("java_pid.*\\.hprof"); private static final Pattern CORE_GENERATOR_PATH_PATTERN = Pattern.compile("^Core was generated by `(?<path>.*?)'.$"); private static final Pattern EXECFN_PATH_PATTERN = Pattern.compile("^.* execfn: '(?<path>.*?)'"); private static final Pattern FROM_PATH_PATTERN = Pattern.compile("^.* from '(?<path>.*?)'"); + static final String GDB_PATH = "/opt/rh/devtoolset-9/root/bin/gdb"; private final ContainerOperations docker; @@ -35,8 +37,7 @@ public class CoreCollector { } Path readBinPathFallback(NodeAgentContext context, Path coredumpPath) { - String command = GDBPath().toString() - + " -n -batch -core " + coredumpPath + " | grep \'^Core was generated by\'"; + String command = GDB_PATH + " -n -batch -core " + coredumpPath + " | grep \'^Core was generated by\'"; String[] wrappedCommand = {"/bin/sh", "-c", command}; ProcessResult result = docker.executeCommandInContainerAsRoot(context, wrappedCommand); @@ -48,10 +49,6 @@ public class CoreCollector { return Paths.get(matcher.group("path").split(" ")[0]); } - Path GDBPath() { - return Paths.get("/opt/rh/devtoolset-9/root/bin/gdb"); - } - Path readBinPath(NodeAgentContext context, Path coredumpPath) { String[] command = {"file", coredumpPath.toString()}; try { @@ -79,7 +76,7 @@ public 
class CoreCollector { List<String> readBacktrace(NodeAgentContext context, Path coredumpPath, Path binPath, boolean allThreads) { String threads = allThreads ? "thread apply all bt" : "bt"; - String[] command = {GDBPath().toString(), "-n", "-ex", threads, "-batch", binPath.toString(), coredumpPath.toString()}; + String[] command = {GDB_PATH, "-n", "-ex", threads, "-batch", binPath.toString(), coredumpPath.toString()}; ProcessResult result = docker.executeCommandInContainerAsRoot(context, command); if (result.getExitStatus() != 0) @@ -105,6 +102,9 @@ public class CoreCollector { * @return map of relevant metadata about the core dump */ Map<String, Object> collect(NodeAgentContext context, Path coredumpPath) { + if (JAVA_HEAP_DUMP_PATTERN.matcher(coredumpPath.getFileName().toString()).matches()) + return Map.of("bin_path", "java"); + Map<String, Object> data = new HashMap<>(); try { Path binPath = readBinPath(context, coredumpPath); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoredumpHandler.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoredumpHandler.java index 293f2c688d6..20d710cbad8 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoredumpHandler.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoredumpHandler.java @@ -37,7 +37,6 @@ import static com.yahoo.yolean.Exceptions.uncheck; */ public class CoredumpHandler { - private static final Pattern JAVA_CORE_PATTERN = Pattern.compile("java_pid.*\\.hprof"); private static final Pattern HS_ERR_PATTERN = Pattern.compile("hs_err_pid[0-9]+\\.log"); private static final String LZ4_PATH = "/usr/bin/lz4"; private static final String PROCESSING_DIRECTORY_NAME = "processing"; @@ -84,12 +83,6 @@ public class CoredumpHandler { Path containerCrashPathOnHost = context.pathOnHostFromPathInNode(crashPatchInContainer); Path containerProcessingPathOnHost = 
containerCrashPathOnHost.resolve(PROCESSING_DIRECTORY_NAME); - // Remove java core dumps - FileFinder.files(containerCrashPathOnHost) - .match(nameMatches(JAVA_CORE_PATTERN)) - .maxDepth(1) - .deleteRecursively(context); - updateMetrics(context, containerCrashPathOnHost); // Check if we have already started to process a core dump or we can enqueue a new core one diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoreCollectorTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoreCollectorTest.java index 0be68e905ca..9f8f8953424 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoreCollectorTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoreCollectorTest.java @@ -12,6 +12,7 @@ import java.nio.file.Paths; import java.util.List; import java.util.Map; +import static com.yahoo.vespa.hosted.node.admin.maintenance.coredump.CoreCollector.GDB_PATH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -21,7 +22,6 @@ import static org.mockito.Mockito.when; * @author freva */ public class CoreCollectorTest { - private final String GDB_PATH = "/opt/rh/devtoolset-9/root/bin/gdb"; private final String JDK_PATH = "/path/to/jdk/java"; private final ContainerOperations docker = mock(ContainerOperations.class); private final CoreCollector coreCollector = new CoreCollector(docker); @@ -161,6 +161,11 @@ public class CoreCollectorTest { assertEquals(expectedData, coreCollector.collect(context, TEST_CORE_PATH)); } + @Test + public void metadata_for_java_heap_dump() { + assertEquals(Map.of("bin_path", "java"), coreCollector.collect(context, Paths.get("java_pid123.hprof"))); + } + private void mockExec(String[] cmd, String output) { mockExec(cmd, output, ""); } diff --git 
a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoredumpHandlerTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoredumpHandlerTest.java index e5c2e35f2b2..fe0a7a52a62 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoredumpHandlerTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/coredump/CoredumpHandlerTest.java @@ -45,7 +45,6 @@ import static org.mockito.Mockito.when; */ public class CoredumpHandlerTest { private final FileSystem fileSystem = TestFileSystem.create(); - private final Path donePath = fileSystem.getPath("/home/docker/dumps"); private final NodeAgentContext context = new NodeAgentContextImpl.Builder("container-123.domain.tld") .fileSystem(fileSystem).build(); private final Path crashPathInContainer = fileSystem.getPath("/var/crash"); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java index b1585922f38..070bf98bf87 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java @@ -49,6 +49,8 @@ public class QuestMetricsDb implements MetricsDb { private final String dataDir; private final CairoEngine engine; + private long highestTimestampAdded = 0; + @Inject public QuestMetricsDb() { this(Defaults.getDefaults().underVespaHome("var/db/vespa/autoscaling"), Clock.systemUTC()); @@ -67,6 +69,7 @@ public class QuestMetricsDb implements MetricsDb { // silence Questdb's custom logging system IOUtils.writeFile(new File(dataDir, "quest-log.conf"), new byte[0]); System.setProperty("questdbLog", dataDir + "/quest-log.conf"); + System.setProperty("org.jooq.no-logo", "true"); CairoConfiguration configuration = new 
DefaultCairoConfiguration(dataDir); engine = new CairoEngine(configuration); @@ -77,7 +80,9 @@ public class QuestMetricsDb implements MetricsDb { public void add(Collection<Pair<String, MetricSnapshot>> snapshots) { try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), tableName)) { for (var snapshot : snapshots) { - long atMillis = snapshot.getSecond().at().toEpochMilli(); + long atMillis = adjustIfRecent(snapshot.getSecond().at().toEpochMilli(), highestTimestampAdded); + if (atMillis < highestTimestampAdded) continue; // Ignore old data + highestTimestampAdded = atMillis; TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds row.putStr(0, snapshot.getFirst()); row.putFloat(2, (float)snapshot.getSecond().cpu()); @@ -154,6 +159,17 @@ public class QuestMetricsDb implements MetricsDb { } } + private long adjustIfRecent(long timestamp, long highestTimestampAdded) { + if (timestamp >= highestTimestampAdded) return timestamp; + + // We cannot add old data to QuestDb, but we want to use all recent information + long oneMinute = 60 * 1000; + if (timestamp >= highestTimestampAdded - oneMinute) return highestTimestampAdded; + + // Too old; discard + return timestamp; + } + private ListMap<String, MetricSnapshot> getSnapshots(Instant startTime, Set<String> hostnames, SqlCompiler compiler, diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/DynamicProvisioningMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/DynamicProvisioningMaintainer.java index a43b2655ea3..0195466b689 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/DynamicProvisioningMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/DynamicProvisioningMaintainer.java @@ -116,7 +116,6 @@ public class DynamicProvisioningMaintainer extends NodeRepositoryMaintainer { }); } - /** * Provision hosts to ensure there is 
room to allocate spare nodes. * diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java index f1a006b1359..597a48ad0c9 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java @@ -75,8 +75,8 @@ public class CapacityPolicies { * Whether or not the nodes requested can share physical host with other applications. * A security feature which only makes sense for prod. */ - public boolean decideExclusivity(boolean requestedExclusivity) { - return requestedExclusivity && zone.environment() == Environment.prod; + public boolean decideExclusivity(Capacity capacity, boolean requestedExclusivity) { + return requestedExclusivity && (capacity.isRequired() || zone.environment() == Environment.prod); } /** diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java index 1e98160955c..68e11c4c995 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java @@ -127,11 +127,7 @@ class NodeAllocation { ++rejectedDueToClashingParentHost; continue; } - if ( ! exclusiveTo(application, candidate.parentHostname())) { - ++rejectedDueToExclusivity; - continue; - } - if ( requestedNodes.isExclusive() && ! hostsOnly(application, candidate.parentHostname())) { + if ( violatesExclusivity(candidate)) { ++rejectedDueToExclusivity; continue; } @@ -158,7 +154,7 @@ class NodeAllocation { if (violatesParentHostPolicy(candidate)) return true; if ( ! 
hasCompatibleFlavor(candidate)) return true; if (candidate.wantToRetire()) return true; - if (requestedNodes.isExclusive() && ! hostsOnly(application, candidate.parentHostname())) return true; + if (violatesExclusivity(candidate)) return true; return false; } @@ -182,35 +178,23 @@ class NodeAllocation { return false; } - /** - * If a parent host is given, and it hosts another application which requires exclusive access - * to the physical host, then we cannot host this application on it. - */ - private boolean exclusiveTo(ApplicationId applicationId, Optional<String> parentHostname) { - if (parentHostname.isEmpty()) return true; - for (Node nodeOnHost : allNodes.childrenOf(parentHostname.get())) { - if (nodeOnHost.allocation().isEmpty()) continue; - if ( nodeOnHost.allocation().get().membership().cluster().isExclusive() && - ! allocatedTo(applicationId, nodeOnHost)) - return false; - } - return true; - } + private boolean violatesExclusivity(NodeCandidate candidate) { + if (candidate.parentHostname().isEmpty()) return false; - /** Returns true if this host only hosts the given application (in any instance) */ - private boolean hostsOnly(ApplicationId application, Optional<String> parentHostname) { - if (parentHostname.isEmpty()) return true; // yes, as host is exclusive + // In dynamic provisioned zones a node requiring exclusivity must be on a host that has exclusiveTo equal to its owner + if (nodeRepository.zone().getCloud().dynamicProvisioning()) + return requestedNodes.isExclusive() && + ! candidate.parent.flatMap(Node::exclusiveTo).map(application::equals).orElse(false); - for (Node nodeOnHost : allNodes.childrenOf(parentHostname.get())) { + // In non-dynamic provisioned zones we require that if either of the nodes on the host requires exclusivity, + // then all the nodes on the host must have the same owner + for (Node nodeOnHost : allNodes.childrenOf(candidate.parentHostname().get())) { if (nodeOnHost.allocation().isEmpty()) continue; - if ( ! 
allocatedTo(application, nodeOnHost)) return false; + if (requestedNodes.isExclusive() || nodeOnHost.allocation().get().membership().cluster().isExclusive()) { + if ( ! nodeOnHost.allocation().get().owner().equals(application)) return true; + } } - return true; - } - - private boolean allocatedTo(ApplicationId applicationId, Node node) { - if (node.allocation().isEmpty()) return false; - return node.allocation().get().owner().equals(applicationId); + return false; } /** @@ -390,7 +374,7 @@ class NodeAllocation { /** Prefer to unretire nodes we don't want to retire, and otherwise those with lower index */ private List<NodeCandidate> byUnretiringPriority(Collection<NodeCandidate> candidates) { return candidates.stream() - .sorted(Comparator.comparing((NodeCandidate n) -> n.wantToRetire()) + .sorted(Comparator.comparing(NodeCandidate::wantToRetire) .thenComparing(n -> n.allocation().get().membership().index())) .collect(Collectors.toList()); } @@ -407,7 +391,7 @@ class NodeAllocation { reasons.add("insufficient real resources on hosts"); if (reasons.isEmpty()) return ""; - return ": Not enough nodes available due to " + reasons.stream().collect(Collectors.joining(", ")); + return ": Not enough nodes available due to " + String.join(", ", reasons); } static class FlavorCount { diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java index eaa1b2bfad3..731090b9cc7 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java @@ -107,7 +107,7 @@ public class NodeRepositoryProvisioner implements Provisioner { int nodeCount = capacityPolicies.decideSize(target.nodes(), requested, cluster, application); groups = Math.min(target.groups(), 
nodeCount); // cannot have more groups than nodes resources = capacityPolicies.decideNodeResources(target.nodeResources(), requested, cluster); - boolean exclusive = capacityPolicies.decideExclusivity(cluster.isExclusive()); + boolean exclusive = capacityPolicies.decideExclusivity(requested, cluster.isExclusive()); nodeSpec = NodeSpec.from(nodeCount, resources, exclusive, requested.canFail()); logIfDownscaled(target.nodes(), nodeCount, cluster, logger); } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDbTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDbTest.java index a1cc66ffa28..6d52fb29160 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDbTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDbTest.java @@ -69,6 +69,30 @@ public class QuestMetricsDbTest { } @Test + public void testWriteOldData() { + String dataDir = "data/QuestMetricsDbWriteOldData"; + IOUtils.recursiveDeleteDir(new File(dataDir)); + IOUtils.createDirectory(dataDir + "/metrics"); + ManualClock clock = new ManualClock("2020-10-01T00:00:00"); + QuestMetricsDb db = new QuestMetricsDb(dataDir, clock); + Instant startTime = clock.instant(); + clock.advance(Duration.ofSeconds(300)); + db.add(timeseriesAt(10, clock.instant(), "host1", "host2", "host3")); + clock.advance(Duration.ofSeconds(1)); + + List<NodeTimeseries> nodeTimeSeries1 = db.getNodeTimeseries(startTime, Set.of("host1")); + assertEquals(10, nodeTimeSeries1.get(0).size()); + + db.add(timeseriesAt(10, clock.instant().minus(Duration.ofSeconds(20)), "host1", "host2", "host3")); + List<NodeTimeseries> nodeTimeSeries2 = db.getNodeTimeseries(startTime, Set.of("host1")); + assertEquals("Recent data is accepted", 20, nodeTimeSeries2.get(0).size()); + + db.add(timeseriesAt(10, clock.instant().minus(Duration.ofSeconds(200)), "host1", "host2", "host3")); + 
List<NodeTimeseries> nodeTimeSeries3 = db.getNodeTimeseries(startTime, Set.of("host1")); + assertEquals("Too old data is rejected", 20, nodeTimeSeries3.get(0).size()); + } + + @Test public void testGc() { String dataDir = "data/QuestMetricsDbGc"; IOUtils.recursiveDeleteDir(new File(dataDir)); @@ -102,4 +126,16 @@ public class QuestMetricsDbTest { return timeseries; } + private Collection<Pair<String, MetricSnapshot>> timeseriesAt(int countPerHost, Instant at, String ... hosts) { + Collection<Pair<String, MetricSnapshot>> timeseries = new ArrayList<>(); + for (int i = 1; i <= countPerHost; i++) { + for (String host : hosts) + timeseries.add(new Pair<>(host, new MetricSnapshot(at, + i * 0.1, + i * 0.2, + i * 0.4, + i % 100))); + } + return timeseries; + } } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerProvisionTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerProvisionTest.java index b871404aa9d..40d0f52dc37 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerProvisionTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerProvisionTest.java @@ -96,6 +96,7 @@ public class DynamicDockerProvisionTest { // Deploy new exclusive application ApplicationId application3 = ProvisioningTester.makeApplicationId(); + mockHostProvisioner(hostProvisioner, "large", 3, application3); prepareAndActivate(application3, clusterSpec("mycluster", true), 4, 1, resources); verify(hostProvisioner).provisionHosts(List.of(104, 105, 106, 107), resources, application3, Version.emptyVersion, HostSharing.exclusive); @@ -159,6 +160,28 @@ public class DynamicDockerProvisionTest { } @Test + public void retires_on_exclusivity_violation() { + ApplicationId application1 = ProvisioningTester.makeApplicationId(); + NodeResources resources = new NodeResources(1, 4, 10, 1); + + 
mockHostProvisioner(hostProvisioner, "large", 3, null); // Provision shared hosts + prepareAndActivate(application1, clusterSpec("mycluster"), 4, 1, resources); + Set<Node> initialNodes = tester.nodeRepository().list(application1).stream().collect(Collectors.toSet()); + assertEquals(4, initialNodes.size()); + + // Redeploy same application with exclusive=true + mockHostProvisioner(hostProvisioner, "large", 3, application1); + prepareAndActivate(application1, clusterSpec("mycluster", true), 4, 1, resources); + assertEquals(8, tester.nodeRepository().list(application1).size()); + assertEquals(initialNodes, tester.nodeRepository().list(application1).retired().stream().collect(Collectors.toSet())); + + // Redeploy without exclusive again is no-op + prepareAndActivate(application1, clusterSpec("mycluster"), 4, 1, resources); + assertEquals(8, tester.nodeRepository().list(application1).size()); + assertEquals(initialNodes, tester.nodeRepository().list(application1).retired().stream().collect(Collectors.toSet())); + } + + @Test public void node_indices_are_unique_even_when_a_node_is_left_in_reserved_state() { NodeResources resources = new NodeResources(10, 10, 10, 10); ApplicationId app = ProvisioningTester.makeApplicationId(); @@ -37,6 +37,7 @@ <module>clustercontroller-apps</module> <module>clustercontroller-apputil</module> <module>clustercontroller-core</module> + <module>clustercontroller-reindexer</module> <module>clustercontroller-standalone</module> <module>clustercontroller-utils</module> <module>component</module> diff --git a/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp b/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp index 3b42a399888..b276ed9e46d 100644 --- a/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp +++ b/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp @@ -86,24 +86,4 @@ TEST("test pendinglidtracker for needcommit") { EXPECT_EQUAL(ILidCommitState::State::COMPLETED, 
tracker.getState(LIDV_2_1_3)); } -TEST("test two phase pendinglidtracker for needcommit") { - TwoPhasePendingLidTracker tracker; - ILidCommitState::State incomplete = ILidCommitState::State::NEED_COMMIT; - verifyPhase1ProduceAndNeedCommit(tracker, incomplete); - EXPECT_EQUAL(incomplete, tracker.getState()); - EXPECT_EQUAL(incomplete, tracker.getState(LID_1)); - EXPECT_EQUAL(incomplete, tracker.getState(LIDV_2_1_3)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_3)); - { - ILidCommitState::State waiting = ILidCommitState::State::WAITING; - auto snapshot = tracker.produceSnapshot(); - EXPECT_EQUAL(waiting, tracker.getState()); - EXPECT_EQUAL(waiting, tracker.getState(LID_1)); - EXPECT_EQUAL(waiting, tracker.getState(LIDV_2_1_3)); - } - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState()); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LID_1)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_1_3)); -} - TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp index b2903f00226..203b6646880 100644 --- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp +++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp @@ -7,7 +7,6 @@ #include <vespa/searchcore/proton/attribute/attributemanager.h> #include <vespa/searchcore/proton/attribute/imported_attributes_repo.h> #include <vespa/searchcore/proton/docsummary/summarymanager.h> -#include <vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h> #include <vespa/searchcore/proton/index/index_writer.h> #include <vespa/searchcore/proton/index/indexmanager.h> #include <vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h> @@ -51,7 +50,6 @@ using Configurer = SearchableDocSubDBConfigurer; using ConfigurerUP = 
std::unique_ptr<SearchableDocSubDBConfigurer>; using SummarySetup = SummaryManager::SummarySetup; using DocumenttypesConfigSP = proton::DocumentDBConfig::DocumenttypesConfigSP; -using LidReuseDelayerConfig = documentmetastore::LidReuseDelayerConfig; const vespalib::string BASE_DIR("baseDir"); const vespalib::string DOC_TYPE("invalid"); @@ -153,6 +151,7 @@ struct Fixture EmptyConstantValueFactory _constantValueFactory; ConstantValueRepo _constantValueRepo; vespalib::ThreadStackExecutor _summaryExecutor; + std::shared_ptr<PendingLidTrackerBase> _pendingLidsForCommit; ViewSet _views; MyDocumentDBReferenceResolver _resolver; ConfigurerUP _configurer; @@ -167,6 +166,7 @@ Fixture::Fixture() _constantValueFactory(), _constantValueRepo(_constantValueFactory), _summaryExecutor(8, 128*1024), + _pendingLidsForCommit(std::make_shared<PendingLidTracker>()), _views(), _resolver(), _configurer() @@ -175,7 +175,7 @@ Fixture::Fixture() vespalib::mkdir(BASE_DIR); initViewSet(_views); _configurer = std::make_unique<Configurer>(_views._summaryMgr, _views.searchView, _views.feedView, _queryLimiter, - _constantValueRepo, _clock, "test", 0); + _constantValueRepo, _clock, "test", 0); } Fixture::~Fixture() = default; @@ -210,20 +210,16 @@ Fixture::initViewSet(ViewSet &views) std::move(matchView))); views.feedView.set( make_shared<SearchableFeedView>(StoreOnlyFeedView::Context(summaryAdapter, - schema, - views.searchView.get()->getDocumentMetaStore(), - *views._gidToLidChangeHandler, - views.repo, - views._writeService, - LidReuseDelayerConfig()), - SearchableFeedView::PersistentParams( - views.serialNum, - views.serialNum, - views._docTypeName, - 0u /* subDbId */, - SubDbType::READY), - FastAccessFeedView::Context(attrWriter, views._docIdLimit), - SearchableFeedView::Context(indexWriter))); + schema, + views.searchView.get()->getDocumentMetaStore(), + views.repo, + _pendingLidsForCommit, + *views._gidToLidChangeHandler, + views._writeService), + 
SearchableFeedView::PersistentParams(views.serialNum, views.serialNum, + views._docTypeName, 0u, SubDbType::READY), + FastAccessFeedView::Context(attrWriter, views._docIdLimit), + SearchableFeedView::Context(indexWriter))); } @@ -238,6 +234,7 @@ struct MyFastAccessFeedView proton::IDocumentMetaStoreContext::SP _dmsc; std::shared_ptr<IGidToLidChangeHandler> _gidToLidChangeHandler; + std::shared_ptr<PendingLidTrackerBase> _pendingLidsForCommit; VarHolder<FastAccessFeedView::SP> _feedView; explicit MyFastAccessFeedView(IThreadingService &writeService) @@ -247,6 +244,7 @@ struct MyFastAccessFeedView _hwInfo(), _dmsc(), _gidToLidChangeHandler(make_shared<DummyGidToLidChangeHandler>()), + _pendingLidsForCommit(std::make_shared<PendingLidTracker>()), _feedView() { init(); @@ -255,18 +253,18 @@ struct MyFastAccessFeedView ~MyFastAccessFeedView(); void init() { - ISummaryAdapter::SP summaryAdapter(new MySummaryAdapter()); - Schema::SP schema(new Schema()); + MySummaryAdapter::SP summaryAdapter = std::make_shared<MySummaryAdapter>(); + Schema::SP schema = std::make_shared<Schema>(); _dmsc = make_shared<DocumentMetaStoreContext>(std::make_shared<BucketDBOwner>()); std::shared_ptr<const DocumentTypeRepo> repo = createRepo(); - StoreOnlyFeedView::Context storeOnlyCtx(summaryAdapter, schema, _dmsc, *_gidToLidChangeHandler, repo, - _writeService, LidReuseDelayerConfig()); + StoreOnlyFeedView::Context storeOnlyCtx(summaryAdapter, schema, _dmsc, repo, + _pendingLidsForCommit, *_gidToLidChangeHandler, _writeService); StoreOnlyFeedView::PersistentParams params(1, 1, DocTypeName(DOC_TYPE), 0, SubDbType::NOTREADY); auto mgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), _fileHeaderContext, _writeService.attributeFieldWriter(), _writeService.shared(), _hwInfo); auto writer = std::make_shared<AttributeWriter>(mgr); FastAccessFeedView::Context fastUpdateCtx(writer, _docIdLimit); - _feedView.set(std::make_shared<FastAccessFeedView>(storeOnlyCtx, params, 
fastUpdateCtx)); + _feedView.set(std::make_shared<FastAccessFeedView>(std::move(storeOnlyCtx), params, fastUpdateCtx)); } }; @@ -446,11 +444,7 @@ TEST_F("require that we can reconfigure index searchable", Fixture) } { // verify feed view FeedViewComparer cmp(o.fv, n.fv); - cmp.expect_not_equal(); - cmp.expect_equal_index_adapter(); - cmp.expect_equal_attribute_writer(); - cmp.expect_equal_summary_adapter(); - cmp.expect_equal_schema(); + cmp.expect_equal(); } } @@ -604,11 +598,7 @@ TEST_F("require that we can reconfigure matchers", Fixture) } { // verify feed view FeedViewComparer cmp(o.fv, n.fv); - cmp.expect_not_equal(); - cmp.expect_equal_index_adapter(); - cmp.expect_equal_attribute_writer(); - cmp.expect_equal_summary_adapter(); - cmp.expect_equal_schema(); + cmp.expect_equal(); } } diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index 9bb8865707d..b875ab8e058 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -4,7 +4,6 @@ #include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h> #include <vespa/searchcore/proton/test/bucketfactory.h> #include <vespa/searchcore/proton/common/feedtoken.h> -#include <vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h> #include <vespa/searchcore/proton/index/i_index_writer.h> #include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/isummaryadapter.h> @@ -34,7 +33,6 @@ using document::DocumentId; using document::DocumentUpdate; using proton::matching::SessionManager; using proton::test::MockGidToLidChangeHandler; -using proton::documentmetastore::LidReuseDelayerConfig; using search::AttributeVector; using search::CacheStats; using search::DocumentMetaData; @@ -491,6 +489,7 @@ FeedTokenContext::~FeedTokenContext() = default; struct FixtureBase { MyTracer 
_tracer; + std::shared_ptr<PendingLidTracker> _pendingLidsForCommit; SchemaContext sc; IIndexWriter::SP iw; ISummaryAdapter::SP sa; @@ -505,10 +504,9 @@ struct FixtureBase vespalib::ThreadStackExecutor _sharedExecutor; ExecutorThreadingService _writeServiceReal; test::ThreadingServiceObserver _writeService; - vespalib::duration _visibilityDelay; SerialNum serial; std::shared_ptr<MyGidToLidChangeHandler> _gidToLidChangeHandler; - FixtureBase(vespalib::duration visibilityDelay); + FixtureBase(); virtual ~FixtureBase(); @@ -678,8 +676,9 @@ struct FixtureBase }; -FixtureBase::FixtureBase(vespalib::duration visibilityDelay) +FixtureBase::FixtureBase() : _tracer(), + _pendingLidsForCommit(std::make_shared<PendingLidTracker>()), sc(), iw(std::make_shared<MyIndexWriter>(_tracer)), sa(std::make_shared<MySummaryAdapter>(*sc._builder->getDocumentTypeRepo())), @@ -694,7 +693,6 @@ FixtureBase::FixtureBase(vespalib::duration visibilityDelay) _sharedExecutor(1, 0x10000), _writeServiceReal(_sharedExecutor), _writeService(_writeServiceReal), - _visibilityDelay(visibilityDelay), serial(0), _gidToLidChangeHandler(std::make_shared<MyGidToLidChangeHandler>()) { @@ -710,44 +708,44 @@ FixtureBase::populateBeforeCompactLidSpace() { putAndWait(makeDummyDocs(0, 2, 1000)); removeAndWait(makeDummyDocs(1, 1, 2000)); + forceCommitAndWait(); } struct SearchableFeedViewFixture : public FixtureBase { SearchableFeedView fv; - SearchableFeedViewFixture(vespalib::duration visibilityDelay = 0ms) : - FixtureBase(visibilityDelay), - fv(StoreOnlyFeedView::Context(sa, - sc._schema, - _dmsc, - *_gidToLidChangeHandler, - sc.getRepo(), - _writeService, - LidReuseDelayerConfig(_visibilityDelay, true)), + SearchableFeedViewFixture() : + FixtureBase(), + fv(StoreOnlyFeedView::Context(sa, sc._schema, _dmsc, + sc.getRepo(), _pendingLidsForCommit, + *_gidToLidChangeHandler, _writeService), pc.getParams(), FastAccessFeedView::Context(aw, _docIdLimit), SearchableFeedView::Context(iw)) { } + 
~SearchableFeedViewFixture() override + { + forceCommitAndWait(); + } IFeedView &getFeedView() override { return fv; } }; struct FastAccessFeedViewFixture : public FixtureBase { FastAccessFeedView fv; - FastAccessFeedViewFixture(vespalib::duration visibilityDelay = vespalib::duration::zero()) : - FixtureBase(visibilityDelay), - fv(StoreOnlyFeedView::Context(sa, - sc._schema, - _dmsc, - *_gidToLidChangeHandler, - sc.getRepo(), - _writeService, - LidReuseDelayerConfig(_visibilityDelay, false)), + FastAccessFeedViewFixture() : + FixtureBase(), + fv(StoreOnlyFeedView::Context(sa, sc._schema, _dmsc, sc.getRepo(), _pendingLidsForCommit, + *_gidToLidChangeHandler, _writeService), pc.getParams(), FastAccessFeedView::Context(aw, _docIdLimit)) { } + ~FastAccessFeedViewFixture() override + { + forceCommitAndWait(); + } IFeedView &getFeedView() override { return fv; } }; @@ -907,12 +905,14 @@ TEST_F("require that remove() calls removeComplete() via delayed thread service" { EXPECT_TRUE(assertThreadObserver(0, 0, 0, f.writeServiceObserver())); f.putAndWait(f.doc1(10)); + f.forceCommitAndWait(); // put index fields handled in index thread - EXPECT_TRUE(assertThreadObserver(1, 1, 1, f.writeServiceObserver())); + EXPECT_TRUE(assertThreadObserver(2, 2, 2, f.writeServiceObserver())); f.removeAndWait(f.doc1(20)); + f.forceCommitAndWait(); // remove index fields handled in index thread // delayed remove complete handled in same index thread, then master thread - EXPECT_TRUE(assertThreadObserver(3, 2, 2, f.writeServiceObserver())); + EXPECT_TRUE(assertThreadObserver(5, 4, 4, f.writeServiceObserver())); EXPECT_EQUAL(1u, f.metaStoreObserver()._removeCompleteCnt); EXPECT_EQUAL(1u, f.metaStoreObserver()._removeCompleteLid); } @@ -995,18 +995,25 @@ TEST_F("require that removes are not remembered", SearchableFeedViewFixture) docs.push_back(f.doc("id:test:searchdocument:n=2:2", 14)); f.putAndWait(docs); + f.forceCommitAndWait(); f.removeAndWait(docs[0]); + f.forceCommitAndWait(); 
f.removeAndWait(docs[3]); + f.forceCommitAndWait(); assertPostConditionAfterRemoves(docs, f); // try to remove again : should have little effect f.removeAndWait(docs[0]); + f.forceCommitAndWait(); f.removeAndWait(docs[3]); + f.forceCommitAndWait(); assertPostConditionAfterRemoves(docs, f); // re-add docs f.putAndWait(docs[3]); + f.forceCommitAndWait(); f.putAndWait(docs[0]); + f.forceCommitAndWait(); EXPECT_EQUAL(5u, f.getMetaStore().getNumUsedLids()); EXPECT_TRUE(f.getMetaData(docs[0]).valid()); EXPECT_TRUE(f.getMetaData(docs[1]).valid()); @@ -1030,7 +1037,9 @@ TEST_F("require that removes are not remembered", SearchableFeedViewFixture) EXPECT_EQUAL(5u, f.msa._store._docs.size()); f.removeAndWait(docs[0]); + f.forceCommitAndWait(); f.removeAndWait(docs[3]); + f.forceCommitAndWait(); EXPECT_EQUAL(3u, f.msa._store._docs.size()); } @@ -1047,11 +1056,13 @@ void putDocumentAndUpdate(Fixture &f, const vespalib::string &fieldName) { DocumentContext dc1 = f.doc1(); f.putAndWait(dc1); + f.forceCommitAndWait(); EXPECT_EQUAL(1u, f.msa._store._lastSyncToken); DocumentContext dc2("id:ns:searchdocument::1", 20, f.getBuilder()); dc2.addFieldUpdate(f.getBuilder(), fieldName); f.updateAndWait(dc2); + f.forceCommitAndWait(); } template <typename Fixture> @@ -1127,11 +1138,11 @@ TEST_F("require that compactLidSpace() propagates to document meta store and doc SearchableFeedViewFixture) { f.populateBeforeCompactLidSpace(); - EXPECT_TRUE(assertThreadObserver(4, 3, 3, f.writeServiceObserver())); + EXPECT_TRUE(assertThreadObserver(5, 4, 4, f.writeServiceObserver())); f.compactLidSpaceAndWait(2); // performIndexForceCommit in index thread, then completion callback // in master thread. 
- EXPECT_TRUE(assertThreadObserver(6, 5, 5, f.writeServiceObserver())); + EXPECT_TRUE(assertThreadObserver(7, 6, 6, f.writeServiceObserver())); EXPECT_EQUAL(2u, f.metaStoreObserver()._compactLidSpaceLidLimit); EXPECT_EQUAL(2u, f.getDocumentStore()._compactLidSpaceLidLimit); EXPECT_EQUAL(1u, f.metaStoreObserver()._holdUnblockShrinkLidSpaceCnt); @@ -1144,12 +1155,12 @@ TEST_F("require that compactLidSpace() doesn't propagate to " SearchableFeedViewFixture) { f.populateBeforeCompactLidSpace(); - EXPECT_TRUE(assertThreadObserver(4, 3, 3, f.writeServiceObserver())); + EXPECT_TRUE(assertThreadObserver(5, 4, 4, f.writeServiceObserver())); CompactLidSpaceOperation op(0, 2); op.setSerialNum(0); f.runInMaster([&] () { f.fv.handleCompactLidSpace(op); }); // Delayed holdUnblockShrinkLidSpace() in index thread, then master thread - EXPECT_TRUE(assertThreadObserver(5, 4, 3, f.writeServiceObserver())); + EXPECT_TRUE(assertThreadObserver(6, 5, 4, f.writeServiceObserver())); EXPECT_EQUAL(0u, f.metaStoreObserver()._compactLidSpaceLidLimit); EXPECT_EQUAL(0u, f.getDocumentStore()._compactLidSpaceLidLimit); EXPECT_EQUAL(0u, f.metaStoreObserver()._holdUnblockShrinkLidSpaceCnt); @@ -1171,40 +1182,14 @@ TEST_F("require that compactLidSpace() propagates to index writer", EXPECT_EQUAL(2u, f.miw._wantedLidLimit); } -const vespalib::duration LONG_DELAY = 60s; -const vespalib::duration SHORT_DELAY = 500ms; - -TEST_F("require that commit is not called when inside a commit interval", - SearchableFeedViewFixture(LONG_DELAY)) -{ - DocumentContext dc = f.doc1(); - f.putAndWait(dc); - EXPECT_EQUAL(0u, f.miw._commitCount); - EXPECT_EQUAL(0u, f.maw._commitCount); - EXPECT_EQUAL(0u, f._docIdLimit.get()); - f.removeAndWait(dc); - EXPECT_EQUAL(0u, f.miw._commitCount); - EXPECT_EQUAL(0u, f.maw._commitCount); - EXPECT_EQUAL(0u, f._docIdLimit.get()); - f.assertTrace("put(adapter=attribute,serialNum=1,lid=1)," - "put(adapter=index,serialNum=1,lid=1)," - "ack(Result(0, ))," - 
"remove(adapter=attribute,serialNum=2,lid=1)," - "remove(adapter=index,serialNum=2,lid=1)," - "ack(Result(0, ))"); - f.forceCommitAndWait(); -} - TEST_F("require that commit is not implicitly called", - SearchableFeedViewFixture(SHORT_DELAY)) + SearchableFeedViewFixture) { - std::this_thread::sleep_for(SHORT_DELAY + 100ms); DocumentContext dc = f.doc1(); f.putAndWait(dc); EXPECT_EQUAL(0u, f.miw._commitCount); EXPECT_EQUAL(0u, f.maw._commitCount); EXPECT_EQUAL(0u, f._docIdLimit.get()); - std::this_thread::sleep_for(SHORT_DELAY + 100ms); f.removeAndWait(dc); EXPECT_EQUAL(0u, f.miw._commitCount); EXPECT_EQUAL(0u, f.maw._commitCount); @@ -1219,7 +1204,7 @@ TEST_F("require that commit is not implicitly called", } TEST_F("require that forceCommit updates docid limit", - SearchableFeedViewFixture(LONG_DELAY)) + SearchableFeedViewFixture) { DocumentContext dc = f.doc1(); f.putAndWait(dc); @@ -1237,7 +1222,7 @@ TEST_F("require that forceCommit updates docid limit", "commit(adapter=index,serialNum=1)"); } -TEST_F("require that forceCommit updates docid limit during shrink", SearchableFeedViewFixture(LONG_DELAY)) +TEST_F("require that forceCommit updates docid limit during shrink", SearchableFeedViewFixture) { f.putAndWait(f.makeDummyDocs(0, 3, 1000)); EXPECT_EQUAL(0u, f._docIdLimit.get()); @@ -1262,13 +1247,17 @@ TEST_F("require that move() notifies gid to lid change handler", SearchableFeedV DocumentContext dc1 = f.doc("id::searchdocument::1", 10); DocumentContext dc2 = f.doc("id::searchdocument::2", 20); f.putAndWait(dc1); + f.forceCommitAndWait(); TEST_DO(f.assertChangeHandler(dc1.gid(), 1u, 1u)); f.putAndWait(dc2); + f.forceCommitAndWait(); TEST_DO(f.assertChangeHandler(dc2.gid(), 2u, 2u)); DocumentContext dc3 = f.doc("id::searchdocument::1", 30); f.removeAndWait(dc3); + f.forceCommitAndWait(); TEST_DO(f.assertChangeHandler(dc3.gid(), 0u, 3u)); f.moveAndWait(dc2, 2, 1); + f.forceCommitAndWait(); TEST_DO(f.assertChangeHandler(dc2.gid(), 1u, 4u)); } diff --git 
a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index a3148aa9372..cd6b23e5e26 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -9,7 +9,6 @@ #include <vespa/searchcore/proton/attribute/i_attribute_manager.h> #include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h> #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/common/transient_memory_usage_provider.h> #include <vespa/searchcore/proton/documentmetastore/operation_listener.h> #include <vespa/searchcore/proton/feedoperation/moveoperation.h> diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp index 3a75f8cd494..c874ce5e84a 100644 --- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp @@ -3,7 +3,6 @@ #include <vespa/document/base/documentid.h> #include <vespa/document/datatype/datatype.h> #include <vespa/searchcommon/common/schema.h> -#include <vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h> #include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/putdonecontext.h> #include <vespa/searchcore/proton/server/removedonecontext.h> @@ -44,7 +43,7 @@ private: int &_heartbeatCount; public: - MySummaryAdapter(int &removeCount, int &putCount, int &heartbeatCount) + MySummaryAdapter(int &removeCount, int &putCount, int &heartbeatCount) noexcept : _rmCount(removeCount), _putCount(putCount), 
_heartbeatCount(heartbeatCount) { @@ -86,17 +85,17 @@ struct MyMinimalFeedView : public MyMinimalFeedViewBase, public StoreOnlyFeedVie MyMinimalFeedView(const ISummaryAdapter::SP &summaryAdapter, const DocumentMetaStore::SP &metaStore, searchcorespi::index::IThreadingService &writeService, - documentmetastore::LidReuseDelayerConfig &lidReuseDelayerConfig, const PersistentParams ¶ms, + std::shared_ptr<PendingLidTrackerBase> pendingLidsForCommit, int &outstandingMoveOps_) : MyMinimalFeedViewBase(), StoreOnlyFeedView(StoreOnlyFeedView::Context(summaryAdapter, - search::index::Schema::SP(), - std::make_shared<DocumentMetaStoreContext>(metaStore), - *gidToLidChangeHandler, + search::index::Schema::SP(), + std::make_shared<DocumentMetaStoreContext>(metaStore), myGetDocumentTypeRepo(), - writeService, - lidReuseDelayerConfig), + std::move(pendingLidsForCommit), + *gidToLidChangeHandler, + writeService), params), removeMultiAttributesCount(0), removeMultiIndexFieldsCount(0), @@ -134,11 +133,11 @@ struct MoveOperationFeedView : public MyMinimalFeedView { MoveOperationFeedView(const ISummaryAdapter::SP &summaryAdapter, const DocumentMetaStore::SP &metaStore, searchcorespi::index::IThreadingService &writeService, - documentmetastore::LidReuseDelayerConfig &lidReuseDelayerConfig, const PersistentParams ¶ms, + std::shared_ptr<PendingLidTrackerBase> pendingLidsForCommit, int &outstandingMoveOps_) : - MyMinimalFeedView(summaryAdapter, metaStore, writeService, lidReuseDelayerConfig, - params, outstandingMoveOps_), + MyMinimalFeedView(summaryAdapter, metaStore, writeService, + params, std::move(pendingLidsForCommit), outstandingMoveOps_), putAttributesCount(0), putIndexFieldsCount(0), removeAttributesCount(0), @@ -191,33 +190,35 @@ struct FixtureBase { DocumentMetaStore::SP metaStore; vespalib::ThreadStackExecutor sharedExecutor; ExecutorThreadingService writeService; - documentmetastore::LidReuseDelayerConfig lidReuseDelayerConfig; + std::shared_ptr<PendingLidTrackerBase> 
pendingLidsForCommit; typename FeedViewType::UP feedview; + SerialNum serial_num; explicit FixtureBase(SubDbType subDbType = SubDbType::READY) : removeCount(0), putCount(0), heartbeatCount(0), outstandingMoveOps(0), - metaStore(new DocumentMetaStore(std::make_shared<BucketDBOwner>(), - DocumentMetaStore::getFixedName(), - search::GrowStrategy(), - std::make_shared<DocumentMetaStore::DefaultGidCompare>(), - subDbType)), + metaStore(std::make_shared<DocumentMetaStore>(std::make_shared<BucketDBOwner>(), + DocumentMetaStore::getFixedName(), + search::GrowStrategy(), + std::make_shared<DocumentMetaStore::DefaultGidCompare>(), + subDbType)), sharedExecutor(1, 0x10000), writeService(sharedExecutor), - lidReuseDelayerConfig(), - feedview() + pendingLidsForCommit(std::make_shared<PendingLidTracker>()), + feedview(), + serial_num(2u) { StoreOnlyFeedView::PersistentParams params(0, 0, DocTypeName("foo"), subdb_id, subDbType); metaStore->constructFreeList(); - ISummaryAdapter::SP adapter = std::make_unique<MySummaryAdapter>(removeCount, putCount, heartbeatCount); - feedview = std::make_unique<FeedViewType>(adapter, metaStore, writeService, lidReuseDelayerConfig, - params, outstandingMoveOps); + ISummaryAdapter::SP adapter = std::make_shared<MySummaryAdapter>(removeCount, putCount, heartbeatCount); + feedview = std::make_unique<FeedViewType>(adapter, metaStore, writeService, + params, pendingLidsForCommit, outstandingMoveOps); } ~FixtureBase() { - writeService.sync(); + this->force_commit(); } void addSingleDocToMetaStore(uint32_t expected_lid) { @@ -243,6 +244,10 @@ struct FixtureBase { test::runInMaster(writeService, func); } + void force_commit() { + runInMaster([this] () { static_cast<IFeedView&>(*feedview).forceCommit(serial_num); }); + writeService.sync(); + } }; using Fixture = FixtureBase<MyMinimalFeedView>; diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp 
b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp index 453f7eb638e..6e4fe34a3c9 100644 --- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp @@ -128,7 +128,6 @@ class Fixture { public: using LidReuseDelayer = documentmetastore::LidReuseDelayer; - using LidReuseDelayerConfig = documentmetastore::LidReuseDelayerConfig; vespalib::ThreadStackExecutor _sharedExecutor; ExecutorThreadingService _writeServiceReal; test::ThreadingServiceObserver _writeService; @@ -140,7 +139,7 @@ public: _writeServiceReal(_sharedExecutor), _writeService(_writeServiceReal), _store(), - _lidReuseDelayer(std::make_unique<LidReuseDelayer>(_writeService, _store, LidReuseDelayerConfig())) + _lidReuseDelayer(std::make_unique<LidReuseDelayer>(_writeService, _store)) { } @@ -195,15 +194,6 @@ public: return res; } - void - configureLidReuseDelayer(bool immediateCommit, bool hasIndexedOrAttributeFields) { - runInMaster([&] () { - _lidReuseDelayer = std::make_unique<LidReuseDelayer>(_writeService, _store, - LidReuseDelayerConfig(immediateCommit ? 
vespalib::duration::zero() : 1ms, - hasIndexedOrAttributeFields)); - } ); - } - void commit() { runInMaster([&] () { cycleLids(_lidReuseDelayer->getReuseLids()); }); } @@ -230,88 +220,43 @@ public: TEST_F("require that nothing happens before free list is active", Fixture) { - f.configureLidReuseDelayer(true, true); EXPECT_FALSE(f.delayReuse(4)); EXPECT_FALSE(f.delayReuse({ 5, 6})); EXPECT_TRUE(f._store.assertWork(0, 0, 0)); - EXPECT_TRUE(assertThreadObserver(3, 0, 0, f._writeService)); -} - - -TEST_F("require that single lid is delayed", Fixture) -{ - f._store._freeListActive = true; - f.configureLidReuseDelayer(true, true); - EXPECT_TRUE(f.delayReuse(4)); - f.scheduleDelayReuseLid(4); - EXPECT_TRUE(f._store.assertWork(1, 0, 1)); - EXPECT_TRUE(assertThreadObserver(4, 1, 0, f._writeService)); -} - - -TEST_F("require that lid vector is delayed", Fixture) -{ - f._store._freeListActive = true; - f.configureLidReuseDelayer(true, true); - EXPECT_TRUE(f.delayReuse({ 5, 6, 7})); - f.scheduleDelayReuseLids({ 5, 6, 7}); - EXPECT_TRUE(f._store.assertWork(0, 1, 3)); - EXPECT_TRUE(assertThreadObserver(4, 1, 0, f._writeService)); + EXPECT_TRUE(assertThreadObserver(2, 0, 0, f._writeService)); } TEST_F("require that reuse can be batched", Fixture) { f._store._freeListActive = true; - f.configureLidReuseDelayer(false, true); EXPECT_FALSE(f.delayReuse(4)); EXPECT_FALSE(f.delayReuse({ 5, 6, 7})); EXPECT_TRUE(f._store.assertWork(0, 0, 0)); - EXPECT_TRUE(assertThreadObserver(3, 0, 0, f._writeService)); + EXPECT_TRUE(assertThreadObserver(2, 0, 0, f._writeService)); f.commit(); EXPECT_TRUE(f._store.assertWork(0, 1, 4)); - EXPECT_TRUE(assertThreadObserver(5, 1, 0, f._writeService)); + EXPECT_TRUE(assertThreadObserver(4, 1, 0, f._writeService)); EXPECT_FALSE(f.delayReuse(8)); EXPECT_FALSE(f.delayReuse({ 9, 10})); EXPECT_TRUE(f._store.assertWork(0, 1, 4)); - EXPECT_TRUE(assertThreadObserver(7, 1, 0, f._writeService)); + EXPECT_TRUE(assertThreadObserver(6, 1, 0, f._writeService)); } 
TEST_F("require that single element array is optimized", Fixture) { f._store._freeListActive = true; - f.configureLidReuseDelayer(false, true); EXPECT_FALSE(f.delayReuse({ 4})); EXPECT_TRUE(f._store.assertWork(0, 0, 0)); - EXPECT_TRUE(assertThreadObserver(2, 0, 0, f._writeService)); + EXPECT_TRUE(assertThreadObserver(1, 0, 0, f._writeService)); f.commit(); - f.configureLidReuseDelayer(true, true); EXPECT_TRUE(f._store.assertWork(1, 0, 1)); - EXPECT_TRUE(assertThreadObserver(5, 1, 0, f._writeService)); - EXPECT_TRUE(f.delayReuse({ 8})); - f.scheduleDelayReuseLids({ 8}); - EXPECT_TRUE(f._store.assertWork(2, 0, 2)); - EXPECT_TRUE(assertThreadObserver(8, 2, 0, f._writeService)); + EXPECT_TRUE(assertThreadObserver(3, 1, 0, f._writeService)); } - -TEST_F("require that lids are reused faster with no indexed fields", Fixture) -{ - f._store._freeListActive = true; - f.configureLidReuseDelayer(true, false); - EXPECT_FALSE(f.delayReuse(4)); - EXPECT_TRUE(f._store.assertWork(1, 0, 1)); - EXPECT_TRUE(assertThreadObserver(2, 0, 0, f._writeService)); - EXPECT_FALSE(f.delayReuse({ 5, 6, 7})); - EXPECT_TRUE(f._store.assertWork(1, 1, 4)); - EXPECT_TRUE(assertThreadObserver(3, 0, 0, f._writeService)); } -} - - - TEST_MAIN() { TEST_RUN_ALL(); diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp index dd6ca70248b..24ddacafaa8 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "pendinglidtracker.h" -#include <vespa/vespalib/stllike/hash_map.hpp> -#include <algorithm> #include <cassert> namespace proton { @@ -89,104 +87,4 @@ PendingLidTracker::pendingLids() const { return lids; } -TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() - : _sequenceId(0), - _lastCommitStarted(0), - _lastCommitCompleted(0), - _pending() -{} - -TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() { - assert(_pending.empty()); -} - -IPendingLidTracker::Token -TwoPhasePendingLidTracker::produce(uint32_t lid) { - std::lock_guard guard(_mutex); - _pending[lid] = ++_sequenceId; - return Token(lid, *this); -} -void -TwoPhasePendingLidTracker::consume(uint32_t lid) { - (void) lid; -} - -ILidCommitState::State -TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t lid) const { - for (auto found = _pending.find(lid); found != _pending.end(); found = _pending.find(lid)) { - if (state == State::NEED_COMMIT) { - if (found->second > _lastCommitStarted) { - return State::NEED_COMMIT; - } - return State::WAITING; - } - _cond.wait(guard); - } - return State::COMPLETED; -} - -void -TwoPhasePendingLidTracker::consumeSnapshot(uint64_t sequenceIdWhenStarted) { - MonitorGuard guard(_mutex); - assert(sequenceIdWhenStarted >= _lastCommitCompleted); - _lastCommitCompleted = sequenceIdWhenStarted; - std::vector<uint32_t> committed; - for (const auto & entry : _pending) { - if (entry.second <= sequenceIdWhenStarted) - committed.push_back(entry.first); - } - for (uint32_t lid : committed) { - _pending.erase(lid); - } - _cond.notify_all(); -} - -ILidCommitState::LidList -TwoPhasePendingLidTracker::pendingLids() const { - MonitorGuard guard(_mutex); - LidList lids; - lids.reserve(_pending.size()); - for (const auto & entry : _pending) { - lids.push_back(entry.first); - } - return lids; -} - -namespace common::internal { - -class CommitList : public PendingLidTrackerBase::Payload { -public: - using LidList = ILidCommitState::LidList; - 
CommitList(uint64_t commitStarted, TwoPhasePendingLidTracker & tracker) - : _tracker(&tracker), - _commitStarted(commitStarted) - { } - CommitList(const CommitList &) = delete; - CommitList & operator = (const CommitList &) = delete; - CommitList & operator = (CommitList &&) = delete; - CommitList(CommitList && rhs) noexcept - : _tracker(rhs._tracker), - _commitStarted(rhs._commitStarted) - { - rhs._tracker = nullptr; - } - ~CommitList() override { - if (_tracker != nullptr) { - _tracker->consumeSnapshot(_commitStarted); - } - } -private: - TwoPhasePendingLidTracker * _tracker; - uint64_t _commitStarted; -}; - -} - -PendingLidTrackerBase::Snapshot -TwoPhasePendingLidTracker::produceSnapshot() { - MonitorGuard guard(_mutex); - _lastCommitStarted = _sequenceId; - return std::make_unique<common::internal::CommitList>(_lastCommitStarted, *this); -} - } diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h index ef0a1dbb1a3..079634f56bb 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h @@ -60,30 +60,4 @@ private: vespalib::hash_map<uint32_t, uint32_t> _pending; }; -namespace common::internal { - class CommitList; -} -/** - * Use for tracking lids in 2 phases which is needed when visibility-delay is non-zero. - * It tracks lids that are in feed pipeline, lids where commit has been started and when they fully complete. 
- */ -class TwoPhasePendingLidTracker : public PendingLidTrackerBase -{ -public: - TwoPhasePendingLidTracker(); - ~TwoPhasePendingLidTracker() override; - Token produce(uint32_t lid) override; - Snapshot produceSnapshot() override; -private: - friend common::internal::CommitList; - void consume(uint32_t lid) override; - void consumeSnapshot(uint64_t sequenceIdWhenStarted); - LidList pendingLids() const override; - State waitFor(MonitorGuard & guard, State state, uint32_t lid) const override; - uint64_t _sequenceId; - uint64_t _lastCommitStarted; - uint64_t _lastCommitCompleted; - vespalib::hash_map<uint32_t, uint64_t> _pending; -}; - } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt index d257f29c9df..10e0c149ad2 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt @@ -12,7 +12,6 @@ vespa_add_library(searchcore_documentmetastore STATIC search_context.cpp lid_allocator.cpp lid_gid_key_comparator.cpp - lid_reuse_delayer_config.cpp lidreusedelayer.cpp lidstatevector.cpp lid_hold_list.cpp diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp deleted file mode 100644 index b04bac5ef26..00000000000 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
- -#include "lid_reuse_delayer_config.h" -#include <vespa/searchcore/proton/server/documentdbconfig.h> - -namespace proton::documentmetastore { - -LidReuseDelayerConfig::LidReuseDelayerConfig(const DocumentDBConfig & configSnapshot) - : _visibilityDelay(configSnapshot.getMaintenanceConfigSP()->getVisibilityDelay()), - _hasIndexedOrAttributeFields(configSnapshot.getSchemaSP()->getNumIndexFields() > 0 || - configSnapshot.getSchemaSP()->getNumAttributeFields() > 0) -{ -} - -LidReuseDelayerConfig::LidReuseDelayerConfig() - : LidReuseDelayerConfig(vespalib::duration::zero(), false) -{} - -LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in) - : _visibilityDelay(visibilityDelay), - _hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in) -{ -} - -} diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h deleted file mode 100644 index 82dab433a22..00000000000 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/vespalib/util/time.h> - -namespace proton { class DocumentDBConfig; } - -namespace proton::documentmetastore { - -/* - * Class representing configuration for lid reuse delayer. 
- */ -class LidReuseDelayerConfig -{ -private: - vespalib::duration _visibilityDelay; - bool _hasIndexedOrAttributeFields; -public: - LidReuseDelayerConfig(); - LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool _hasIndexedOrAttributeFields_in); - explicit LidReuseDelayerConfig(const DocumentDBConfig &configSnapshot); - vespalib::duration visibilityDelay() const { return _visibilityDelay; } - bool hasIndexedOrAttributeFields() const { return _hasIndexedOrAttributeFields; } -}; - -} diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp index 03dfd83a132..003812589d1 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp @@ -11,12 +11,9 @@ using searchcorespi::index::IThreadingService; using vespalib::makeClosure; using vespalib::makeTask; -LidReuseDelayer::LidReuseDelayer(IThreadingService &writeService, IStore &documentMetaStore, - const LidReuseDelayerConfig & config) +LidReuseDelayer::LidReuseDelayer(IThreadingService &writeService, IStore &documentMetaStore) : _writeService(writeService), _documentMetaStore(documentMetaStore), - _immediateCommit(config.visibilityDelay() == vespalib::duration::zero()), - _config(config), _pendingLids() { } @@ -31,15 +28,8 @@ LidReuseDelayer::delayReuse(uint32_t lid) assert(_writeService.master().isCurrentThread()); if ( ! _documentMetaStore.getFreeListActive()) return false; - if ( ! _immediateCommit) { - _pendingLids.push_back(lid); - return false; - } - if ( ! _config.hasIndexedOrAttributeFields() ) { - _documentMetaStore.removeComplete(lid); - return false; - } - return true; + _pendingLids.push_back(lid); + return false; } bool @@ -48,15 +38,8 @@ LidReuseDelayer::delayReuse(const std::vector<uint32_t> &lids) assert(_writeService.master().isCurrentThread()); if ( ! 
_documentMetaStore.getFreeListActive() || lids.empty()) return false; - if ( ! _immediateCommit) { - _pendingLids.insert(_pendingLids.end(), lids.cbegin(), lids.cend()); - return false; - } - if ( ! _config.hasIndexedOrAttributeFields()) { - _documentMetaStore.removeBatchComplete(lids); - return false; - } - return true; + _pendingLids.insert(_pendingLids.end(), lids.cbegin(), lids.cend()); + return false; } std::vector<uint32_t> diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h index 0fe16636e1d..07cfbda1dba 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h @@ -2,8 +2,8 @@ #pragma once -#include "lid_reuse_delayer_config.h" #include <vector> +#include <cstdint> namespace searchcorespi::index { struct IThreadingService; } @@ -26,19 +26,14 @@ class LidReuseDelayer { searchcorespi::index::IThreadingService &_writeService; IStore &_documentMetaStore; - const bool _immediateCommit; - LidReuseDelayerConfig _config; std::vector<uint32_t> _pendingLids; // lids waiting for commit public: - LidReuseDelayer(searchcorespi::index::IThreadingService &writeService, IStore &documentMetaStore, - const LidReuseDelayerConfig & config); + LidReuseDelayer(searchcorespi::index::IThreadingService &writeService, IStore &documentMetaStore); ~LidReuseDelayer(); bool delayReuse(uint32_t lid); bool delayReuse(const std::vector<uint32_t> &lids); std::vector<uint32_t> getReuseLids(); - - const LidReuseDelayerConfig & getConfig() const { return _config; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp index 6f52a49749e..b3e32beba15 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp +++ 
b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp @@ -233,22 +233,6 @@ CombiningFeedView::forceCommit(search::SerialNum serialNum, DoneCallback onDone) } } -ILidCommitState & -CombiningFeedView::getUncommittedLidsTracker() { - LOG_ABORT("CombiningFeedView::getUncommittedLidsTracker should never be called."); -} - -bool -CombiningFeedView::isDrained() const -{ - for (const auto &view : _views) { - if ( ! view->isDrained()) { - return false; - } - } - return true; -} - void CombiningFeedView:: handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h index ce5cb303aba..194f64cdfaa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h @@ -79,8 +79,6 @@ public: void sync() override; void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; - ILidCommitState & getUncommittedLidsTracker() override; - bool isDrained() const override; // Called by document db executor void setCalculator(const IBucketStateCalculator::SP &newCalc); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index a176b1cf8c2..f509bcf31ef 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -479,6 +479,15 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum } } + +namespace { +void +doNothing(IFeedView::SP) +{ + // Called by index executor, delays when feed view is dropped. 
+} +} // namespace + void DocumentDB::performDropFeedView(IFeedView::SP feedView) { @@ -499,21 +508,7 @@ DocumentDB::performDropFeedView2(IFeedView::SP feedView) { // Also called by DocumentDB::receive() method to keep feed view alive _writeService.indexFieldInverter().sync(); _writeService.indexFieldWriter().sync(); - masterExecute([this, feedView]() { performDropFeedView3(feedView, 10); }); -} - -void -DocumentDB::performDropFeedView3(IFeedView::SP feedView, uint32_t numRetries) { - // We must keep the feedView allive until all operations are drained. - // TODO: This is a very brittle appraoch that we should reconsider. - if (feedView && ! feedView->isDrained()) { - LOG(warning, "FeedView for document type '%s' has not been drained. Reposting to check again. %d retries left", - getName().c_str(), numRetries); - if (numRetries > 0) { - _writeService.sync(); - masterExecute([this, feedView, numRetries]() { performDropFeedView3(feedView, numRetries - 1); }); - } - } + masterExecute([feedView]() { doNothing(feedView); }); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index c94b8ffca46..a171861b590 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -189,7 +189,6 @@ private: */ void performDropFeedView(IFeedView::SP feedView); void performDropFeedView2(IFeedView::SP feedView); - void performDropFeedView3(IFeedView::SP feedView, uint32_t numRetries); /** * Implements IFeedHandlerOwner diff --git a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp index bb1cbcf9371..85d9c767535 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp @@ -129,9 +129,9 @@ 
DocumentSubDBCollection::createRetrievers() namespace { IDocumentRetriever::SP -wrapRetriever(IDocumentRetriever::SP retriever, ILidCommitState & unCommitedLidsTracker) +wrapRetriever(IDocumentRetriever::SP retriever, ILidCommitState & unCommittedLidsTracker) { - return std::make_shared<CommitAndWaitDocumentRetriever>(std::move(retriever), unCommitedLidsTracker); + return std::make_shared<CommitAndWaitDocumentRetriever>(std::move(retriever), unCommittedLidsTracker); } } @@ -145,11 +145,11 @@ DocumentSubDBCollection::getRetrievers(IDocumentRetriever::ReadConsistency consi wrappedList->reserve(list->size()); assert(list->size() == 3); wrappedList->push_back(wrapRetriever((*list)[_readySubDbId], - getReadySubDB()->getFeedView()->getUncommittedLidsTracker())); + getReadySubDB()->getUncommittedLidsTracker())); wrappedList->push_back(wrapRetriever((*list)[_remSubDbId], - getRemSubDB()->getFeedView()->getUncommittedLidsTracker())); + getRemSubDB()->getUncommittedLidsTracker())); wrappedList->push_back(wrapRetriever((*list)[_notReadySubDbId], - getNotReadySubDB()->getFeedView()->getUncommittedLidsTracker())); + getNotReadySubDB()->getUncommittedLidsTracker())); return wrappedList; } else { return list; @@ -162,18 +162,18 @@ void DocumentSubDBCollection::maintenanceSync(MaintenanceController &mc) { _readySubDbId, getReadySubDB()->getDocumentMetaStoreContext().getSP(), wrapRetriever((*retrievers)[_readySubDbId], - getReadySubDB()->getFeedView()->getUncommittedLidsTracker()), + getReadySubDB()->getUncommittedLidsTracker()), getReadySubDB()->getFeedView()); MaintenanceDocumentSubDB remSubDB(getRemSubDB()->getName(), _remSubDbId, getRemSubDB()->getDocumentMetaStoreContext().getSP(), - wrapRetriever((*retrievers)[_remSubDbId], getRemSubDB()->getFeedView()->getUncommittedLidsTracker()), + wrapRetriever((*retrievers)[_remSubDbId], getRemSubDB()->getUncommittedLidsTracker()), getRemSubDB()->getFeedView()); MaintenanceDocumentSubDB notReadySubDB(getNotReadySubDB()->getName(), 
_notReadySubDbId, getNotReadySubDB()->getDocumentMetaStoreContext().getSP(), wrapRetriever((*retrievers)[_notReadySubDbId], - getNotReadySubDB()->getFeedView()->getUncommittedLidsTracker()), + getNotReadySubDB()->getUncommittedLidsTracker()), getNotReadySubDB()->getFeedView()); mc.syncSubDBs(readySubDB, remSubDB, notReadySubDB); } diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp index e0d9f28252f..a55267e3eb2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp @@ -15,22 +15,21 @@ namespace proton { using ARIConfig = AttributeReprocessingInitializer::Config; void -FastAccessDocSubDBConfigurer::reconfigureFeedView(const FastAccessFeedView::SP &curr, - const Schema::SP &schema, - const std::shared_ptr<const DocumentTypeRepo> &repo, - IAttributeWriter::SP writer, - const LidReuseDelayerConfig & lidReuseDelayerConfig) +FastAccessDocSubDBConfigurer::reconfigureFeedView(FastAccessFeedView & curr, + Schema::SP schema, + std::shared_ptr<const DocumentTypeRepo> repo, + IAttributeWriter::SP writer) { _feedView.set(std::make_shared<FastAccessFeedView>( - StoreOnlyFeedView::Context(curr->getSummaryAdapter(), - schema, - curr->getDocumentMetaStore(), - curr->getGidToLidChangeHandler(), - repo, - curr->getWriteService(), - lidReuseDelayerConfig), - curr->getPersistentParams(), - FastAccessFeedView::Context(std::move(writer),curr->getDocIdLimit()))); + StoreOnlyFeedView::Context(curr.getSummaryAdapter(), + std::move(schema), + curr.getDocumentMetaStore(), + std::move(repo), + curr.getUncommittedLidTracker(), + curr.getGidToLidChangeHandler(), + curr.getWriteService()), + curr.getPersistentParams(), + FastAccessFeedView::Context(std::move(writer),curr.getDocIdLimit()))); } 
FastAccessDocSubDBConfigurer::FastAccessDocSubDBConfigurer(FeedViewVarHolder &feedView, @@ -51,7 +50,7 @@ FastAccessDocSubDBConfigurer::reconfigure(const DocumentDBConfig &newConfig, { FastAccessFeedView::SP oldView = _feedView.get(); IAttributeWriter::SP writer = _factory->create(oldView->getAttributeWriter(), attrSpec); - reconfigureFeedView(oldView, newConfig.getSchemaSP(), newConfig.getDocumentTypeRepoSP(), writer, LidReuseDelayerConfig(newConfig)); + reconfigureFeedView(*oldView, newConfig.getSchemaSP(), newConfig.getDocumentTypeRepoSP(), writer); const document::DocumentType *newDocType = newConfig.getDocumentType(); const document::DocumentType *oldDocType = oldConfig.getDocumentType(); diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h index 2c07d904339..0321ee1335c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h @@ -5,7 +5,6 @@ #include "fast_access_feed_view.h" #include "i_attribute_writer_factory.h" #include <vespa/searchcore/proton/reprocessing/i_reprocessing_initializer.h> -#include <vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h> namespace proton { @@ -17,18 +16,16 @@ class FastAccessDocSubDBConfigurer { public: using FeedViewVarHolder = vespalib::VarHolder<FastAccessFeedView::SP>; - using LidReuseDelayerConfig = documentmetastore::LidReuseDelayerConfig; private: FeedViewVarHolder &_feedView; IAttributeWriterFactory::UP _factory; vespalib::string _subDbName; - void reconfigureFeedView(const FastAccessFeedView::SP &curr, - const search::index::Schema::SP &schema, - const std::shared_ptr<const document::DocumentTypeRepo> &repo, - IAttributeWriter::SP attrWriter, - const LidReuseDelayerConfig & lidReuseDelayerConfig); + void reconfigureFeedView(FastAccessFeedView & curr, + 
search::index::Schema::SP schema, + std::shared_ptr<const document::DocumentTypeRepo> repo, + IAttributeWriter::SP attrWriter); public: FastAccessDocSubDBConfigurer(FeedViewVarHolder &feedView, diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp index 52b4d869ce8..c8fa5c1e6c4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp @@ -61,9 +61,8 @@ FastAccessFeedView::heartBeatAttributes(SerialNum serialNum) _attributeWriter->heartBeat(serialNum); } -FastAccessFeedView::FastAccessFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, - const PersistentParams &params, const Context &ctx) - : Parent(storeOnlyCtx, params), +FastAccessFeedView::FastAccessFeedView(StoreOnlyFeedView::Context storeOnlyCtx, const PersistentParams &params, const Context &ctx) + : Parent(std::move(storeOnlyCtx), params), _attributeWriter(ctx._attrWriter), _docIdLimit(ctx._docIdLimit) {} diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h index e0823be3e43..5956368b3dd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h @@ -51,8 +51,7 @@ protected: void internalForceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override; public: - FastAccessFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, - const PersistentParams &params, const Context &ctx); + FastAccessFeedView(StoreOnlyFeedView::Context storeOnlyCtx, const PersistentParams &params, const Context &ctx); ~FastAccessFeedView(); virtual const IAttributeWriter::SP &getAttributeWriter() const { diff --git a/searchcore/src/vespa/searchcore/proton/server/idocumentsubdb.h 
b/searchcore/src/vespa/searchcore/proton/server/idocumentsubdb.h index 65724e66913..c19f38b738f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/idocumentsubdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/idocumentsubdb.h @@ -41,6 +41,7 @@ class ISummaryAdapter; class ISummaryManager; class ReconfigParams; class RemoveDocumentsOperation; +class PendingLidTrackerBase; /** * Interface for a document sub database that handles a subset of the documents that belong to a @@ -119,6 +120,7 @@ public: virtual std::shared_ptr<IDocumentDBReference> getDocumentDBReference() = 0; virtual void tearDownReferences(IDocumentDBReferenceResolver &resolver) = 0; virtual void validateDocStore(FeedHandler &op, SerialNum serialNum) const = 0; + virtual PendingLidTrackerBase & getUncommittedLidsTracker() = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index aea8c909b8c..0fc645fdf78 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -4,7 +4,6 @@ #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchlib/common/serialnum.h> -#include <vespa/searchcore/proton/common/ipendinglidtracker.h> namespace document { class DocumentTypeRepo; } @@ -64,8 +63,6 @@ public: void forceCommit(search::SerialNum serialNum) { forceCommit(serialNum, DoneCallback()); } virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation & pruneOp) = 0; virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) = 0; - virtual ILidCommitState & getUncommittedLidsTracker() = 0; - virtual bool isDrained() const = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp index 8f34484dfe2..1e5c558af0b 100644 --- 
a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp @@ -26,39 +26,22 @@ using matching::OnnxModels; typedef AttributeReprocessingInitializer::Config ARIConfig; void -SearchableDocSubDBConfigurer::reconfigureFeedView(const SearchView::SP &searchView) -{ - SearchableFeedView::SP curr = _feedView.get(); - reconfigureFeedView(curr->getIndexWriter(), - curr->getSummaryAdapter(), - curr->getAttributeWriter(), - curr->getSchema(), - curr->getDocumentTypeRepo(), - searchView, - curr->getLidReuseDelayerConfig()); -} - -void -SearchableDocSubDBConfigurer::reconfigureFeedView(const IIndexWriter::SP &indexWriter, - const ISummaryAdapter::SP &summaryAdapter, - IAttributeWriter::SP attrWriter, - const Schema::SP &schema, - const std::shared_ptr<const DocumentTypeRepo> &repo, - const SearchView::SP &searchView, - const LidReuseDelayerConfig & lidReuseDelayerConfig) +SearchableDocSubDBConfigurer::reconfigureFeedView(IAttributeWriter::SP attrWriter, + Schema::SP schema, + std::shared_ptr<const DocumentTypeRepo> repo) { SearchableFeedView::SP curr = _feedView.get(); _feedView.set(std::make_shared<SearchableFeedView>( - StoreOnlyFeedView::Context(summaryAdapter, - schema, - searchView->getDocumentMetaStore(), + StoreOnlyFeedView::Context(curr->getSummaryAdapter(), + std::move(schema), + curr->getDocumentMetaStore(), + std::move(repo), + curr->getUncommittedLidTracker(), curr->getGidToLidChangeHandler(), - repo, - curr->getWriteService(), - lidReuseDelayerConfig), + curr->getWriteService()), curr->getPersistentParams(), FastAccessFeedView::Context(std::move(attrWriter), curr->getDocIdLimit()), - SearchableFeedView::Context(indexWriter))); + SearchableFeedView::Context(curr->getIndexWriter()))); } void @@ -147,8 +130,6 @@ SearchableDocSubDBConfigurer::reconfigureIndexSearchable() const IIndexWriter::SP &indexWriter = feedView->getIndexWriter(); const 
searchcorespi::IIndexManager::SP &indexManager = indexWriter->getIndexManager(); reconfigureMatchView(indexManager->getSearchable()); - const SearchView::SP searchView(_searchView.get()); - reconfigureFeedView(searchView); } void @@ -249,7 +230,6 @@ SearchableDocSubDBConfigurer::reconfigure(const DocumentDBConfig &newConfig, IndexSearchable::SP indexSearchable = searchView->getIndexSearchable(); reconfigureMatchView(matchers, indexSearchable, attrMgr); searchView = _searchView.get(); - shouldFeedViewChange = true; } if (shouldSearchViewChange) { @@ -257,14 +237,9 @@ SearchableDocSubDBConfigurer::reconfigure(const DocumentDBConfig &newConfig, } if (shouldFeedViewChange) { - SearchableFeedView::SP curr = _feedView.get(); - reconfigureFeedView(curr->getIndexWriter(), - curr->getSummaryAdapter(), - std::move(attrWriter), + reconfigureFeedView(std::move(attrWriter), newConfig.getSchemaSP(), - newConfig.getDocumentTypeRepoSP(), - searchView, - LidReuseDelayerConfig(newConfig)); + newConfig.getDocumentTypeRepoSP()); } return initializer; } diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h index 0f86520fd0b..d460bb6506f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h @@ -36,7 +36,6 @@ class SearchableDocSubDBConfigurer private: typedef vespalib::VarHolder<SearchView::SP> SearchViewHolder; typedef vespalib::VarHolder<SearchableFeedView::SP> FeedViewHolder; - using LidReuseDelayerConfig = documentmetastore::LidReuseDelayerConfig; const ISummaryManager::SP &_summaryMgr; SearchViewHolder &_searchView; FeedViewHolder &_feedView; @@ -46,15 +45,9 @@ private: vespalib::string _subDbName; uint32_t _distributionKey; - void reconfigureFeedView(const SearchView::SP &searchView); - - void reconfigureFeedView(const IIndexWriter::SP 
&indexWriter, - const ISummaryAdapter::SP &summaryAdapter, - IAttributeWriter::SP attrWriter, - const search::index::Schema::SP &schema, - const std::shared_ptr<const document::DocumentTypeRepo> &repo, - const SearchView::SP &searchView, - const LidReuseDelayerConfig & lidReuseDelayerConfig); + void reconfigureFeedView(IAttributeWriter::SP attrWriter, + search::index::Schema::SP schema, + std::shared_ptr<const document::DocumentTypeRepo> repo); void reconfigureMatchView(const searchcorespi::IndexSearchable::SP &indexSearchable); diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index ebef7b4b6d4..7cfad4f1ac1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -30,9 +30,9 @@ SearchableFeedView::Context::Context(const IIndexWriter::SP &indexWriter) SearchableFeedView::Context::~Context() = default; -SearchableFeedView::SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, const PersistentParams &params, +SearchableFeedView::SearchableFeedView(StoreOnlyFeedView::Context storeOnlyCtx, const PersistentParams &params, const FastAccessFeedView::Context &fastUpdateCtx, Context ctx) - : Parent(storeOnlyCtx, params, fastUpdateCtx), + : Parent(std::move(storeOnlyCtx), params, fastUpdateCtx), _indexWriter(ctx._indexWriter), _hasIndexedFields(_schema->getNumIndexFields() > 0) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h index 944d383e06d..5eee23c60f2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h @@ -54,7 +54,7 @@ private: void internalForceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override; public: - 
SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, const PersistentParams &params, + SearchableFeedView(StoreOnlyFeedView::Context storeOnlyCtx, const PersistentParams &params, const FastAccessFeedView::Context &fastUpdateCtx, Context ctx); ~SearchableFeedView() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp index aaae7621562..7f2b8fcaa63 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp @@ -16,7 +16,6 @@ #include <vespa/searchlib/fef/indexproperties.h> #include <vespa/searchlib/fef/properties.h> #include <vespa/vespalib/util/closuretask.h> -#include <vespa/eval/tensor/default_tensor_engine.h> using vespa::config::search::RankProfilesConfig; using proton::matching::MatchingStats; @@ -241,9 +240,8 @@ SearchableDocSubDB::initFeedView(IAttributeWriter::SP attrWriter, /** * Handle reconfigure caused by index manager changing state. * - * Flush engine is disabled (for all document dbs) during initial replay and - * recovery feed modes, the flush engine has not started. For a resurrected - * document type, flushing might occur during replay. + * Flush engine is disabled (for all document dbs) during initial replay, the + * flush engine has not started. */ bool SearchableDocSubDB::reconfigure(vespalib::Closure0<bool>::UP closure) @@ -255,7 +253,6 @@ SearchableDocSubDB::reconfigure(vespalib::Closure0<bool>::UP closure) // Everything should be quiet now. 
SearchView::SP oldSearchView = _rSearchView.get(); - IFeedView::SP oldFeedView = _iFeedView.get(); bool ret = true; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp index 07ae2adba99..908daf02206 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp @@ -44,7 +44,6 @@ using vespalib::GenericHeader; using search::common::FileHeaderContext; using proton::initializer::InitializerTask; using searchcorespi::IFlushTarget; -using proton::documentmetastore::LidReuseDelayerConfig; namespace proton { @@ -118,6 +117,7 @@ StoreOnlyDocSubDB::StoreOnlyDocSubDB(const Config &cfg, const Context &ctx) _tlsSyncer(ctx._writeService.master(), ctx._getSerialNum, ctx._tlSyncer), _dmsFlushTarget(), _dmsShrinkTarget(), + _pendingLidsForCommit(std::make_shared<PendingLidTracker>()), _subDbId(cfg._subDbId), _subDbType(cfg._subDbType), _fileHeaderContext(*this, ctx._fileHeaderContext, _docTypeName, _baseDir), @@ -338,8 +338,8 @@ StoreOnlyFeedView::Context StoreOnlyDocSubDB::getStoreOnlyFeedViewContext(const DocumentDBConfig &configSnapshot) { return StoreOnlyFeedView::Context(getSummaryAdapter(), configSnapshot.getSchemaSP(), _metaStoreCtx, - *_gidToLidChangeHandler, configSnapshot.getDocumentTypeRepoSP(), - _writeService, LidReuseDelayerConfig(configSnapshot)); + configSnapshot.getDocumentTypeRepoSP(), _pendingLidsForCommit, + *_gidToLidChangeHandler, _writeService); } StoreOnlyFeedView::PersistentParams @@ -354,7 +354,7 @@ void StoreOnlyDocSubDB::initViews(const DocumentDBConfig &configSnapshot, const SessionManager::SP &sessionManager) { assert(_writeService.master().isCurrentThread()); - _iSearchView.set(ISearchHandler::SP(new EmptySearchView)); + _iSearchView.set(std::make_shared<EmptySearchView>()); { std::lock_guard<std::mutex> guard(_configMutex); initFeedView(configSnapshot); @@ 
-390,11 +390,11 @@ void StoreOnlyDocSubDB::initFeedView(const DocumentDBConfig &configSnapshot) { assert(_writeService.master().isCurrentThread()); - auto feedView = std::make_unique<StoreOnlyFeedView>(getStoreOnlyFeedViewContext(configSnapshot), + auto feedView = std::make_shared<StoreOnlyFeedView>(getStoreOnlyFeedViewContext(configSnapshot), getFeedViewPersistentParams()); // XXX: Not exception safe. - _iFeedView.set(StoreOnlyFeedView::SP(feedView.release())); + _iFeedView.set(std::move(feedView)); } vespalib::string diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h index 8d7fce4544d..1cdd22fcc41 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h @@ -156,6 +156,7 @@ private: TlsSyncer _tlsSyncer; DocumentMetaStoreFlushTarget::SP _dmsFlushTarget; std::shared_ptr<ShrinkLidSpaceFlushTarget> _dmsShrinkTarget; + std::shared_ptr<PendingLidTrackerBase> _pendingLidsForCommit; IFlushTargetList getFlushTargets() override; protected: @@ -233,6 +234,7 @@ public: void close() override; std::shared_ptr<IDocumentDBReference> getDocumentDBReference() override; void tearDownReferences(IDocumentDBReferenceResolver &resolver) override; + PendingLidTrackerBase & getUncommittedLidsTracker() override { return *_pendingLidsForCommit; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 56f25b11d9f..72eed56415e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -209,24 +209,19 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c meta_store.move(op.getPrevLid(), op.getLid(), op.get_prepare_serial_num()); } -std::unique_ptr<PendingLidTrackerBase> 
-createUncommitedLidTracker() { - return std::make_unique<PendingLidTracker>(); -} - } // namespace -StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams &params) +StoreOnlyFeedView::StoreOnlyFeedView(Context ctx, const PersistentParams &params) : IFeedView(), FeedDebugger(), - _summaryAdapter(ctx._summaryAdapter), - _documentMetaStoreContext(ctx._documentMetaStoreContext), + _summaryAdapter(std::move(ctx._summaryAdapter)), + _documentMetaStoreContext(std::move(ctx._documentMetaStoreContext)), _repo(ctx._repo), _docType(nullptr), - _lidReuseDelayer(ctx._writeService, _documentMetaStoreContext->get(), ctx._lidReuseDelayerConfig), + _lidReuseDelayer(ctx._writeService, _documentMetaStoreContext->get()), _pendingLidsForDocStore(), - _pendingLidsForCommit(createUncommitedLidTracker()), - _schema(ctx._schema), + _pendingLidsForCommit(std::move(ctx._pendingLidsForCommit)), + _schema(std::move(ctx._schema)), _writeService(ctx._writeService), _params(params), _metaStore(_documentMetaStoreContext->get()), @@ -236,6 +231,8 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams } StoreOnlyFeedView::~StoreOnlyFeedView() = default; +StoreOnlyFeedView::Context::Context(Context &&) noexcept = default; +StoreOnlyFeedView::Context::~Context() = default; void StoreOnlyFeedView::sync() @@ -243,11 +240,6 @@ StoreOnlyFeedView::sync() _writeService.summary().sync(); } -ILidCommitState & -StoreOnlyFeedView::getUncommittedLidsTracker() { - return *_pendingLidsForCommit; -} - void StoreOnlyFeedView::forceCommit(SerialNum serialNum, DoneCallback onDone) { @@ -383,8 +375,6 @@ StoreOnlyFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp) void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, FutureStream futureStream, OnOperationDoneType onDone) { -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Winline" // Avoid spurious inlining warning from GCC related to lambda destructor. 
summaryExecutor().execute( makeLambdaTask([serialNum, lid, futureStream = std::move(futureStream), trackerToken = _pendingLidsForDocStore.produce(lid), onDone, this] () mutable { (void) onDone; @@ -394,20 +384,16 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, _summaryAdapter->put(serialNum, lid, os); } })); -#pragma GCC diagnostic pop } void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, Document::SP doc, OnOperationDoneType onDone) { -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Winline" // Avoid spurious inlining warning from GCC related to lambda destructor. summaryExecutor().execute( makeLambdaTask([serialNum, doc = std::move(doc), trackerToken = _pendingLidsForDocStore.produce(lid), onDone, lid, this] { (void) onDone; (void) trackerToken; _summaryAdapter->put(serialNum, lid, *doc); })); -#pragma GCC diagnostic pop } void StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone) { summaryExecutor().execute( @@ -852,10 +838,4 @@ StoreOnlyFeedView::getDocumentMetaStorePtr() const return &_documentMetaStoreContext->get(); } -bool -StoreOnlyFeedView::isDrained() const -{ - return _pendingLidsForDocStore.getState() == ILidCommitState::State::COMPLETED; -} - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index f2819170d5e..da7d5e53a88 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -13,7 +13,6 @@ #include <vespa/searchcore/proton/common/feeddebugger.h> #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> #include <vespa/searchcore/proton/documentmetastore/documentmetastorecontext.h> -#include <vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h> #include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h> #include 
<vespa/searchcore/proton/feedoperation/lidvectorcontext.h> #include <vespa/searchcore/proton/persistenceengine/resulthandler.h> @@ -68,36 +67,37 @@ public: using PromisedStream = std::promise<vespalib::nbostream>; using DocumentSP = std::shared_ptr<Document>; using DocumentUpdateSP = std::shared_ptr<DocumentUpdate>; - using LidReuseDelayerConfig = documentmetastore::LidReuseDelayerConfig; using LidReuseDelayer = documentmetastore::LidReuseDelayer; using Lid = search::DocumentIdT; struct Context { - const ISummaryAdapter::SP &_summaryAdapter; - const search::index::Schema::SP &_schema; - const IDocumentMetaStoreContext::SP &_documentMetaStoreContext; - IGidToLidChangeHandler &_gidToLidChangeHandler; - const std::shared_ptr<const document::DocumentTypeRepo> &_repo; - searchcorespi::index::IThreadingService &_writeService; - LidReuseDelayerConfig _lidReuseDelayerConfig; - - Context(const ISummaryAdapter::SP &summaryAdapter, - const search::index::Schema::SP &schema, - const IDocumentMetaStoreContext::SP &documentMetaStoreContext, + ISummaryAdapter::SP _summaryAdapter; + search::index::Schema::SP _schema; + IDocumentMetaStoreContext::SP _documentMetaStoreContext; + std::shared_ptr<const document::DocumentTypeRepo> _repo; + std::shared_ptr<PendingLidTrackerBase> _pendingLidsForCommit; + IGidToLidChangeHandler &_gidToLidChangeHandler; + searchcorespi::index::IThreadingService &_writeService; + + Context(ISummaryAdapter::SP summaryAdapter, + search::index::Schema::SP schema, + IDocumentMetaStoreContext::SP documentMetaStoreContext, + std::shared_ptr<const document::DocumentTypeRepo> repo, + std::shared_ptr<PendingLidTrackerBase> pendingLidsForCommit, IGidToLidChangeHandler &gidToLidChangeHandler, - const std::shared_ptr<const document::DocumentTypeRepo> &repo, - searchcorespi::index::IThreadingService &writeService, - const LidReuseDelayerConfig & lidReuseDelayerConfig) - : _summaryAdapter(summaryAdapter), - _schema(schema), - 
_documentMetaStoreContext(documentMetaStoreContext), + searchcorespi::index::IThreadingService &writeService) + : _summaryAdapter(std::move(summaryAdapter)), + _schema(std::move(schema)), + _documentMetaStoreContext(std::move(documentMetaStoreContext)), + _repo(std::move(repo)), + _pendingLidsForCommit(std::move(pendingLidsForCommit)), _gidToLidChangeHandler(gidToLidChangeHandler), - _repo(repo), - _writeService(writeService), - _lidReuseDelayerConfig(lidReuseDelayerConfig) + _writeService(writeService) {} + Context(Context &&) noexcept; + ~Context(); }; struct PersistentParams @@ -144,7 +144,7 @@ private: const document::DocumentType *_docType; LidReuseDelayer _lidReuseDelayer; PendingLidTracker _pendingLidsForDocStore; - std::unique_ptr<PendingLidTrackerBase> _pendingLidsForCommit; + std::shared_ptr<PendingLidTrackerBase> _pendingLidsForCommit; protected: const search::index::Schema::SP _schema; @@ -212,7 +212,7 @@ protected: virtual void removeIndexedFields(SerialNum serialNum, const LidVector &lidsToRemove, OnWriteDoneType onWriteDone); virtual void internalForceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone); public: - StoreOnlyFeedView(const Context &ctx, const PersistentParams &params); + StoreOnlyFeedView(Context ctx, const PersistentParams &params); ~StoreOnlyFeedView() override; const ISummaryAdapter::SP &getSummaryAdapter() const { return _summaryAdapter; } @@ -222,7 +222,6 @@ public: const IDocumentMetaStoreContext::SP &getDocumentMetaStore() const { return _documentMetaStoreContext; } searchcorespi::index::IThreadingService &getWriteService() { return _writeService; } IGidToLidChangeHandler &getGidToLidChangeHandler() const { return _gidToLidChangeHandler; } - LidReuseDelayerConfig getLidReuseDelayerConfig() const { return _lidReuseDelayer.getConfig(); } const std::shared_ptr<const document::DocumentTypeRepo> &getDocumentTypeRepo() const override { return _repo; } const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override; @@ 
-240,7 +239,6 @@ public: void heartBeat(search::SerialNum serialNum) override; void sync() override; void forceCommit(SerialNum serialNum, DoneCallback onDone) override; - bool isDrained() const override; /** * Prune lids present in operation. Caller must call doneSegment() @@ -250,7 +248,7 @@ public: */ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; - ILidCommitState & getUncommittedLidsTracker() override; + std::shared_ptr<PendingLidTrackerBase> getUncommittedLidTracker() { return _pendingLidsForCommit; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_document_sub_db.h b/searchcore/src/vespa/searchcore/proton/test/dummy_document_sub_db.h index 7358a78de61..4c3839b3d0c 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_document_sub_db.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_document_sub_db.h @@ -14,6 +14,7 @@ #include <vespa/searchcore/proton/summaryengine/isearchhandler.h> #include <vespa/searchcore/proton/persistenceengine/i_document_retriever.h> #include <vespa/searchcore/proton/server/reconfig_params.h> +#include <vespa/searchcore/proton/common/pendinglidtracker.h> namespace proton::test { @@ -28,6 +29,7 @@ struct DummyDocumentSubDb : public IDocumentSubDB IIndexWriter::SP _indexWriter; vespalib::ThreadStackExecutor _sharedExecutor; std::unique_ptr<ExecutorThreadingService> _writeService; + PendingLidTracker _pendingLidTracker; DummyDocumentSubDb(std::shared_ptr<BucketDBOwner> bucketDB, uint32_t subDbId) : _subDbId(subDbId), @@ -96,6 +98,11 @@ struct DummyDocumentSubDb : public IDocumentSubDB std::shared_ptr<IDocumentDBReference> getDocumentDBReference() override { return std::shared_ptr<IDocumentDBReference>(); } + + PendingLidTrackerBase &getUncommittedLidsTracker() override { + return _pendingLidTracker; + } + void tearDownReferences(IDocumentDBReferenceResolver &) override { } }; diff --git 
a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp index 2507abcc9ea..631dce24044 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp @@ -17,9 +17,4 @@ DummyFeedView::DummyFeedView(std::shared_ptr<const document::DocumentTypeRepo> d DummyFeedView::~DummyFeedView() = default; -ILidCommitState & -DummyFeedView::getUncommittedLidsTracker() { - assert(false); -} - } diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h index b50e47e8f9c..4ed50177965 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h @@ -33,8 +33,6 @@ struct DummyFeedView : public IFeedView void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {} void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} void forceCommit(search::SerialNum, DoneCallback) override { } - ILidCommitState & getUncommittedLidsTracker() override; - bool isDrained() const override { return true; } }; } diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 02527883022..335863322d9 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -985,7 +985,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp); handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); auto diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>(); - auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x4); + auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x2); 
diffCmd->getDiff() = dummyDiff->getDiff(); api::GetBucketDiffReply diffReply(*diffCmd); @@ -1294,8 +1294,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) // Node 4 has been eliminated before the first ApplyBucketDiff command EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); EXPECT_EQ(baseline_diff_size + 2u, s.diff.size()); - EXPECT_EQ(EntryCheck(20000, 8u), s.diff[baseline_diff_size]); - EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size + 1]); + EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]); + EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]); auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); // ApplyBucketDiffCommand has a shorter node list, node 2 is not present EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes()); @@ -1321,15 +1321,15 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size]); + EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]); auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes()); auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4); auto& diff = reply->getDiff(); EXPECT_EQ(1u, diff.size()); EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry); - fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo()); - diff[0]._entry._hasMask |= 2u; + // Simulate that node 3 somehow lost doc2 when trying to fill diff entry. 
+ diff[0]._entry._hasMask &= ~4u; handler.handleApplyBucketDiffReply(*reply, messageKeeper()); LOG(debug, "handled second ApplyBucketDiffReply"); } @@ -1341,7 +1341,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList); - EXPECT_EQ(baseline_diff_size, s.diff.size()); + EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); + EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]); auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]); EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes()); auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5); @@ -1355,7 +1356,27 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) LOG(debug, "handled third ApplyBucketDiffReply"); } ASSERT_EQ(5u, messageKeeper()._msgs.size()); - ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[4]->getType()); + ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[4]->getType()); + { + LOG(debug, "checking fourth ApplyBucketDiff command"); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + // All nodes in use again due to failure to fill diff entry for doc2 + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); + EXPECT_EQ(1u, s.diff.size()); + EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]); + auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes()); + auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6); + auto& diff = reply->getDiff(); + EXPECT_EQ(1u, diff.size()); + fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo()); + diff[0]._entry._hasMask |= 2u; + 
handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "handled fourth ApplyBucketDiffReply"); + } + ASSERT_EQ(6u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType()); LOG(debug, "got mergebucket reply"); } diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp index 2ecef59b567..2e390db69be 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp @@ -12,7 +12,7 @@ namespace storage { MergeStatus::MergeStatus(const framework::Clock& clock, const metrics::LoadType& lt, api::StorageMessage::Priority priority, uint32_t traceLevel) - : reply(), nodeList(), maxTimestamp(0), diff(), pendingId(0), + : reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0), pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock), context(lt, priority, traceLevel) {} diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h index 18ced81c280..51930f337c6 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h @@ -18,6 +18,7 @@ public: using SP = std::shared_ptr<MergeStatus>; std::shared_ptr<api::StorageReply> reply; + std::vector<api::MergeBucketCommand::Node> full_node_list; std::vector<api::MergeBucketCommand::Node> nodeList; framework::MicroSecTime maxTimestamp; std::deque<api::GetBucketDiffCommand::Entry> diff; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 6e7fc30bd6c..51b575548d8 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -651,18 +651,22 @@ 
MergeHandler::applyDiffLocally( } namespace { - void findCandidates(MergeStatus& status, bool constrictHasMask, uint16_t hasMask, + void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) { uint32_t chunkSize = 0; for (const auto& entry : status.diff) { - if (constrictHasMask && entry._hasMask != hasMask) { + uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); + if ((entry_has_mask == 0u) || + (constrictHasMask && (entry_has_mask != hasMask))) { continue; } chunkSize += entry._bodySize + entry._headerSize; cmd.getDiff().emplace_back(entry); if (constrictHasMask) { cmd.getDiff().back()._entry._hasMask = newHasMask; + } else { + cmd.getDiff().back()._entry._hasMask = entry_has_mask; } } } @@ -690,52 +694,70 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, LOG(spam, "Processing merge of %s. %u entries left to merge.", bucket.toString().c_str(), (uint32_t) status.diff.size()); std::shared_ptr<api::ApplyBucketDiffCommand> cmd; - - // If we still have a source only node, eliminate that one from the - // merge. - while (status.nodeList.back().sourceOnly) { - std::vector<api::MergeBucketCommand::Node> nodes; - for (const auto& node : status.nodeList) { - if (!node.sourceOnly) { - nodes.emplace_back(node); + std::map<uint16_t, uint32_t> counts; + + uint16_t active_nodes_mask; + do { + active_nodes_mask = (1u << status.nodeList.size()) - 1; + // If we still have a source only node, eliminate that one from the + // merge. 
+ while (status.nodeList.back().sourceOnly) { + std::vector<api::MergeBucketCommand::Node> nodes; + for (const auto& node : status.nodeList) { + if (!node.sourceOnly) { + nodes.emplace_back(node); + } + } + nodes.push_back(status.nodeList.back()); + assert(nodes.size() > 1); + + cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes); + cmd->setAddress(createAddress(_clusterName, nodes[1].index)); + findCandidates(status, + active_nodes_mask, + true, + 1 << (status.nodeList.size() - 1), + 1 << (nodes.size() - 1), + *cmd); + if (cmd->getDiff().size() != 0) { + break; + } + cmd.reset(); + // If we found no data to merge from the last source only node, + // remove it and retry. + status.nodeList.pop_back(); + active_nodes_mask = (1u << status.nodeList.size()) - 1; + // If only one node left in the merge, return ok. + if (status.nodeList.size() == 1) { + LOG(debug, "Done with merge of %s as there is only one node " + "that is not source only left in the merge.", + bucket.toString().c_str()); + return status.reply; } } - nodes.push_back(status.nodeList.back()); - assert(nodes.size() > 1); - - cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes); - cmd->setAddress(createAddress(_clusterName, nodes[1].index)); - findCandidates(status, - true, - 1 << (status.nodeList.size() - 1), - 1 << (nodes.size() - 1), - *cmd); - if (cmd->getDiff().size() != 0) break; - cmd.reset(); - // If we found no data to merge from the last source only node, - // remove it and retry. (Clear it out of the hasmask such that we - // can match hasmask with operator==) - status.nodeList.pop_back(); - uint16_t mask = ~(1 << status.nodeList.size()); - for (auto& e : status.diff) { - e._hasMask &= mask; - } - // If only one node left in the merge, return ok. 
- if (status.nodeList.size() == 1) { - LOG(debug, "Done with merge of %s as there is only one node " - "that is not source only left in the merge.", - bucket.toString().c_str()); - return status.reply; + if (!cmd) { + // If we did not have a source only node, check if we have a path with + // many documents within it that we'll merge separately + counts.clear(); + for (const auto& e : status.diff) { + ++counts[e._hasMask & active_nodes_mask]; + } + if (counts.size() == 1 && + counts.begin()->first == 0u && + status.nodeList.size() < status.full_node_list.size()) { + // Diff not empty, but none of the remaining nodes have any merge entries. + // Bring back source only nodes that might still have merge entries. + status.nodeList = status.full_node_list; + continue; + } } - } - // If we did not have a source only node, check if we have a path with - // many documents within it that we'll merge separately + break; + } while (true); if (!cmd) { - std::map<uint16_t, uint32_t> counts; - for (const auto& e : status.diff) { - ++counts[e._hasMask]; - } for (const auto& e : counts) { + if (e.first == 0u) { + continue; + } if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize) || counts.size() == 1) { @@ -769,7 +791,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, cmd->setAddress(createAddress(_clusterName, nodes[1].index)); // Add all the metadata, and thus use big limit. Max // data to fetch parameter will control amount added. - findCandidates(status, true, e.first, newMask, *cmd); + findCandidates(status, active_nodes_mask, true, e.first, newMask, *cmd); break; } } @@ -780,7 +802,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, if ( ! 
cmd ) { cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), status.nodeList); cmd->setAddress(createAddress(_clusterName, status.nodeList[1].index)); - findCandidates(status, false, 0, 0, *cmd); + findCandidates(status, active_nodes_mask, false, 0, 0, *cmd); } cmd->setPriority(status.context.getPriority()); cmd->setTimeout(status.timeout); @@ -868,6 +890,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP _clock, cmd.getLoadType(), cmd.getPriority(), cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); + s->full_node_list = cmd.getNodes(); s->nodeList = cmd.getNodes(); s->maxTimestamp = Timestamp(cmd.getMaxTimestamp()); s->timeout = cmd.getTimeout(); |