diff options
32 files changed, 384 insertions, 255 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 9afb1819025..f04856ffd29 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,6 +75,7 @@ add_subdirectory(jdisc_http_service) add_subdirectory(jdisc_jetty) add_subdirectory(jrt_test) add_subdirectory(juniper) +add_subdirectory(linguistics) add_subdirectory(logd) add_subdirectory(logserver) add_subdirectory(logforwarder) diff --git a/bundle-plugin-test/pom.xml b/bundle-plugin-test/pom.xml index b27f6edc5f8..67e869a841d 100644 --- a/bundle-plugin-test/pom.xml +++ b/bundle-plugin-test/pom.xml @@ -19,4 +19,10 @@ <module>integration-test</module> <module>test-bundles</module> </modules> + + <properties> + <!-- This project only builds test artifacts --> + <maven.deploy.skip>true</maven.deploy.skip> + </properties> + </project> diff --git a/config-provisioning/CMakeLists.txt b/config-provisioning/CMakeLists.txt index 829ba87fab8..73101f8a7a6 100644 --- a/config-provisioning/CMakeLists.txt +++ b/config-provisioning/CMakeLists.txt @@ -1,3 +1,4 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. install_fat_java_artifact(config-provisioning) install_config_definition(src/main/resources/configdefinitions/flavors.def config.provisioning.flavors.def) +install_config_definition(src/main/resources/configdefinitions/node-repository.def config.provisioning.node-repository.def) diff --git a/container-core/CMakeLists.txt b/container-core/CMakeLists.txt index 43225e38aee..341155457a8 100644 --- a/container-core/CMakeLists.txt +++ b/container-core/CMakeLists.txt @@ -4,7 +4,9 @@ install_config_definition(src/main/resources/configdefinitions/container-documen install_config_definition(src/main/resources/configdefinitions/container-http.def container.core.container-http.def) install_config_definition(src/main/resources/configdefinitions/health-monitor.def container.jdisc.config.health-monitor.def) install_config_definition(src/main/resources/configdefinitions/http-filter.def container.core.http.http-filter.def) +install_config_definition(src/main/resources/configdefinitions/identity.def container.core.identity.identity.def) install_config_definition(src/main/resources/configdefinitions/log-handler.def container.core.log-handler.def) +install_config_definition(src/main/resources/configdefinitions/metrics-packets-handler.def container.jdisc.state.metrics-packets-handler.def) install_config_definition(src/main/resources/configdefinitions/metrics-presentation.def metrics.metrics-presentation.def) install_config_definition(src/main/resources/configdefinitions/mockservice.def container.handler.test.mockservice.def) install_config_definition(src/main/resources/configdefinitions/qr-searchers.def container.qr-searchers.def) diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/RoutingPolicy.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/RoutingPolicy.java index a86bbaa317e..e55bbaf6acc 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/RoutingPolicy.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/RoutingPolicy.java @@ -27,16 +27,18 @@ public class RoutingPolicy { private final HostName canonicalName; private final Optional<String> dnsZone; private final Set<EndpointId> endpoints; + private final boolean active; /** DO NOT USE. Public for serialization purposes */ public RoutingPolicy(ApplicationId owner, ClusterSpec.Id cluster, ZoneId zone, HostName canonicalName, - Optional<String> dnsZone, Set<EndpointId> endpoints) { + Optional<String> dnsZone, Set<EndpointId> endpoints, boolean active) { this.owner = Objects.requireNonNull(owner, "owner must be non-null"); this.cluster = Objects.requireNonNull(cluster, "cluster must be non-null"); this.zone = Objects.requireNonNull(zone, "zone must be non-null"); this.canonicalName = Objects.requireNonNull(canonicalName, "canonicalName must be non-null"); this.dnsZone = Objects.requireNonNull(dnsZone, "dnsZone must be non-null"); this.endpoints = ImmutableSortedSet.copyOf(Objects.requireNonNull(endpoints, "endpoints must be non-null")); + this.active = active; } /** The application owning this */ @@ -69,6 +71,11 @@ public class RoutingPolicy { return endpoints; } + /** Returns whether this is active (the underlying load balancer is in an active state) */ + public boolean active() { + return active; + } + /** Returns the endpoint of this */ public Endpoint endpointIn(SystemName system) { return Endpoint.of(owner).target(cluster, zone).on(Port.tls()).directRouting().in(system); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/RoutingPolicies.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/RoutingPolicies.java index 12108b9d29a..59394b955b8 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/RoutingPolicies.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/RoutingPolicies.java @@ -127,7 +127,7 @@ public class RoutingPolicies { private RoutingPolicy createPolicy(ApplicationId application, ZoneId zone, LoadBalancer loadBalancer, Set<EndpointId> endpointIds) { var routingPolicy = new RoutingPolicy(application, loadBalancer.cluster(), zone, loadBalancer.hostname(), - loadBalancer.dnsZone(), endpointIds); + loadBalancer.dnsZone(), endpointIds, isActive(loadBalancer)); var name = RecordName.from(routingPolicy.endpointIn(controller.system()).dnsName()); var data = RecordData.fqdn(loadBalancer.hostname().value()); controller.nameServiceForwarder().createCname(name, data, Priority.normal); @@ -197,6 +197,15 @@ public class RoutingPolicies { return routingTable; } + private static boolean isActive(LoadBalancer loadBalancer) { + switch (loadBalancer.state()) { + case reserved: // Count reserved as active as we want callers (application API) to see the endpoint as early + // as possible + case active: return true; + } + return false; + } + /** Load balancers allocated to a deployment */ private static class AllocatedLoadBalancers { @@ -209,9 +218,7 @@ public class RoutingPolicies { DeploymentSpec deploymentSpec) { this.application = application; this.zone = zone; - this.list = loadBalancers.stream() - .filter(AllocatedLoadBalancers::shouldUpdatePolicy) - .collect(Collectors.toUnmodifiableList()); + this.list = List.copyOf(loadBalancers); this.deploymentSpec = deploymentSpec; } @@ -232,17 +239,6 @@ public class RoutingPolicies { .collect(Collectors.toSet()); } - private static boolean shouldUpdatePolicy(LoadBalancer loadBalancer) { - switch (loadBalancer.state()) { - case active: - case reserved: // This allows DNS updates to happen early, while an application is being prepared. - return true; - } - // Any other state, such as inactive, is ignored. - LOGGER.log(LogLevel.WARNING, "Ignoring load balancer " + loadBalancer.hostname() + " in state " + loadBalancer.state()); - return false; - } - } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/RoutingPolicySerializer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/RoutingPolicySerializer.java index 890fa31bc4d..54a3ef7551a 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/RoutingPolicySerializer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/RoutingPolicySerializer.java @@ -35,6 +35,7 @@ public class RoutingPolicySerializer { private static final String zoneField = "zone"; private static final String dnsZoneField = "dnsZone"; private static final String rotationsField = "rotations"; + private static final String activeField = "active"; public Slime toSlime(Set<RoutingPolicy> routingPolicies) { var slime = new Slime(); @@ -50,6 +51,7 @@ public class RoutingPolicySerializer { policy.endpoints().forEach(endpointId -> { rotationArray.addString(endpointId.id()); }); + policyObject.setBool(activeField, policy.active()); }); return slime; } @@ -61,12 +63,15 @@ public class RoutingPolicySerializer { field.traverse((ArrayTraverser) (i, inspect) -> { var endpointIds = new LinkedHashSet<EndpointId>(); inspect.field(rotationsField).traverse((ArrayTraverser) (j, endpointId) -> endpointIds.add(EndpointId.of(endpointId.asString()))); + var activeFieldInspector = inspect.field(activeField); + // TODO(mpolden): Remove field presence check after January 2020 + boolean active = !activeFieldInspector.valid() || activeFieldInspector.asBool(); policies.add(new RoutingPolicy(owner, ClusterSpec.Id.from(inspect.field(clusterField).asString()), ZoneId.from(inspect.field(zoneField).asString()), HostName.from(inspect.field(canonicalNameField).asString()), Serializers.optionalString(inspect.field(dnsZoneField)), - endpointIds)); + endpointIds, active)); }); return Collections.unmodifiableSet(policies); } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java index 1901cd6bb37..114df26015f 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java @@ -909,6 +909,7 @@ public class ApplicationApiHandler extends LoggingRequestHandler { // Per-cluster rotations Set<RoutingPolicy> routingPolicies = controller.applications().routingPolicies().get(instance.id()); for (RoutingPolicy policy : routingPolicies) { + if (!policy.active()) continue; policy.rotationEndpointsIn(controller.system()).asList().stream() .map(Endpoint::url) .map(URI::toString) @@ -1009,6 +1010,7 @@ public class ApplicationApiHandler extends LoggingRequestHandler { // Add endpoint(s) defined by routing policies var endpointArray = response.setArray("endpoints"); for (var policy : controller.applications().routingPolicies().get(deploymentId)) { + if (!policy.active()) continue; Cursor endpointObject = endpointArray.addObject(); Endpoint endpoint = policy.endpointIn(controller.system()); endpointObject.setString("cluster", policy.cluster().value()); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java index 9ca50e7f22a..0340cb25d6f 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java @@ -221,13 +221,13 @@ public class InternalStepRunnerTest { JobType.systemTest.zone(system()), HostName.from("host"), Optional.empty(), - emptySet()))); + emptySet(), true))); tester.controller().curator().writeRoutingPolicies(app.testerId().id(), Set.of(new RoutingPolicy(app.testerId().id(), ClusterSpec.Id.from("default"), JobType.systemTest.zone(system()), HostName.from("host"), Optional.empty(), - emptySet()))); + emptySet(), true))); tester.runner().run();; assertEquals(succeeded, tester.jobs().last(app.instanceId(), JobType.systemTest).get().steps().get(Step.installReal)); assertEquals(succeeded, tester.jobs().last(app.instanceId(), JobType.systemTest).get().steps().get(Step.installTester)); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/RoutingPoliciesTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/RoutingPoliciesTest.java index 940e4efdeed..f78873bae09 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/RoutingPoliciesTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/RoutingPoliciesTest.java @@ -320,14 +320,6 @@ public class RoutingPoliciesTest { LoadBalancer.State.active, Optional.of("dns-zone-1"))); } - // Add an inactive load balancers that should be ignored - loadBalancers.add(new LoadBalancer("inactive-LB-0-Z-" + zone.value(), - application, - ClusterSpec.Id.from("c0"), - HostName.from("lb-0--" + application.serializedForm() + - "--" + zone.value()), - LoadBalancer.State.inactive, - Optional.of("dns-zone-1"))); return loadBalancers; } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/RoutingPolicySerializerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/RoutingPolicySerializerTest.java index e99cc302ffe..23355bd6033 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/RoutingPolicySerializerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/RoutingPolicySerializerTest.java @@ -6,6 +6,7 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ClusterSpec; import com.yahoo.config.provision.HostName; import com.yahoo.config.provision.zone.ZoneId; +import com.yahoo.vespa.config.SlimeUtils; import com.yahoo.vespa.hosted.controller.application.EndpointId; import com.yahoo.vespa.hosted.controller.application.RoutingPolicy; import org.junit.Test; @@ -15,6 +16,7 @@ import java.util.Optional; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * @author mortent @@ -24,7 +26,7 @@ public class RoutingPolicySerializerTest { private final RoutingPolicySerializer serializer = new RoutingPolicySerializer(); @Test - public void test_serialization() { + public void serialization() { var owner = ApplicationId.defaultId(); var endpoints = Set.of(EndpointId.of("r1"), EndpointId.of("r2")); var policies = ImmutableSet.of(new RoutingPolicy(owner, @@ -32,13 +34,13 @@ public class RoutingPolicySerializerTest { ZoneId.from("prod", "us-north-1"), HostName.from("long-and-ugly-name"), Optional.of("zone1"), - endpoints), + endpoints, true), new RoutingPolicy(owner, ClusterSpec.Id.from("my-cluster2"), ZoneId.from("prod", "us-north-2"), HostName.from("long-and-ugly-name-2"), Optional.empty(), - endpoints)); + endpoints, false)); var serialized = serializer.fromSlime(owner, serializer.toSlime(policies)); assertEquals(policies.size(), serialized.size()); for (Iterator<RoutingPolicy> it1 = policies.iterator(), it2 = serialized.iterator(); it1.hasNext();) { @@ -50,7 +52,18 @@ public class RoutingPolicySerializerTest { assertEquals(expected.canonicalName(), actual.canonicalName()); assertEquals(expected.dnsZone(), actual.dnsZone()); assertEquals(expected.endpoints(), actual.endpoints()); + assertEquals(expected.active(), actual.active()); } } + @Test + public void legacy_serialization() { + var json = "{\"routingPolicies\":[{\"cluster\":\"default\",\"zone\":\"prod.us-north-1\"," + + "\"canonicalName\":\"lb-0\"," + + "\"dnsZone\":\"dns-zone-id\",\"rotations\":[]}]}"; + var serialized = serializer.fromSlime(ApplicationId.defaultId(), SlimeUtils.jsonToSlime(json)); + assertTrue(serialized.iterator().next().active()); + + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java index 27305f8956f..eba619253b9 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java @@ -1430,12 +1430,18 @@ public class ApplicationApiTest extends ControllerContainerTest { .region("us-west-1") .build(); app.submit(applicationPackage).deploy(); - RoutingPolicy policy = new RoutingPolicy(app.instanceId(), + Set<RoutingPolicy> policies = Set.of(new RoutingPolicy(app.instanceId(), ClusterSpec.Id.from("default"), ZoneId.from(Environment.prod, RegionName.from("us-west-1")), HostName.from("lb-0-canonical-name"), - Optional.of("dns-zone-1"), Set.of(EndpointId.of("c0"))); - tester.controller().curator().writeRoutingPolicies(app.instanceId(), Set.of(policy)); + Optional.of("dns-zone-1"), Set.of(EndpointId.of("c0")), true), + // Inactive policy is not included + new RoutingPolicy(app.instanceId(), + ClusterSpec.Id.from("deleted-cluster"), + ZoneId.from(Environment.prod, RegionName.from("us-west-1")), + HostName.from("lb-1-canonical-name"), + Optional.of("dns-zone-1"), Set.of(), false)); + tester.controller().curator().writeRoutingPolicies(app.instanceId(), policies); // GET application tester.assertResponse(request("/application/v4/tenant/tenant1/application/application1/instance/instance1", GET) diff --git a/jdisc-security-filters/CMakeLists.txt b/jdisc-security-filters/CMakeLists.txt index 9c5b35941bb..fd691ae84ca 100644 --- a/jdisc-security-filters/CMakeLists.txt +++ b/jdisc-security-filters/CMakeLists.txt @@ -1,4 +1,4 @@ # Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. install_fat_java_artifact(jdisc-security-filters) -install_config_definition(src/main/resources/configdefinitions/cors-filter.def) +install_config_definition(src/main/resources/configdefinitions/cors-filter.def jdisc.http.filter.security.cors.cors-filter.def) diff --git a/jdisc_core_test/pom.xml b/jdisc_core_test/pom.xml index 6fa2dd27b83..e5cde58caa1 100644 --- a/jdisc_core_test/pom.xml +++ b/jdisc_core_test/pom.xml @@ -20,4 +20,10 @@ <module>integration_test</module> <module>test_bundles</module> </modules> + + <properties> + <!-- This project only builds test artifacts --> + <maven.deploy.skip>true</maven.deploy.skip> + </properties> + </project> diff --git a/linguistics/CMakeLists.txt b/linguistics/CMakeLists.txt new file mode 100644 index 00000000000..44f645f2b57 --- /dev/null +++ b/linguistics/CMakeLists.txt @@ -0,0 +1,2 @@ +# Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +install_config_definition(src/main/resources/configdefinitions/opennlp-linguistics.def language.opennlp.opennlp-linguistics.def) diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java index c1a05a3c32d..e9e09781e31 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java @@ -4,17 +4,11 @@ package com.yahoo.vespa.hosted.provision.maintenance; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.Deployer; -import com.yahoo.config.provision.Deployment; -import com.yahoo.config.provision.TransientException; import com.yahoo.log.LogLevel; -import com.yahoo.transaction.Mutex; -import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeRepository; -import com.yahoo.yolean.Exceptions; import java.time.Duration; import java.time.Instant; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; @@ -79,21 +73,10 @@ public abstract class ApplicationMaintainer extends Maintainer { /** Redeploy this application. A lock will be taken for the duration of the deployment activation */ protected final void deployWithLock(ApplicationId application) { - // An application might change its state between the time the set of applications is retrieved and the - // time deployment happens. Lock the application and check if it's still active. - // - // Lock is acquired with a low timeout to reduce the chance of colliding with an external deployment. - try (Mutex lock = nodeRepository().lock(application, Duration.ofSeconds(1))) { - if ( ! isActive(application)) return; // became inactive since deployment was requested + try (MaintenanceDeployment deployment = new MaintenanceDeployment(application, deployer, nodeRepository())) { + if ( ! deployment.isValid()) return; // this will be done at another config server if ( ! canDeployNow(application)) return; // redeployment is no longer needed - Optional<Deployment> deployment = deployer.deployFromLocalActive(application); - if ( ! deployment.isPresent()) return; // this will be done at another config server - log.log(LogLevel.DEBUG, this.getClass().getSimpleName() + " deploying " + application); - deployment.get().activate(); - } catch (TransientException e) { - log.log(LogLevel.INFO, "Failed to redeploy " + application + " with a transient error: " + Exceptions.toMessageString(e)); - } catch (RuntimeException e) { - log.log(LogLevel.WARNING, "Exception on maintenance redeploy", e); + deployment.activate(); } finally { pendingDeployments.remove(application); } @@ -104,11 +87,6 @@ public abstract class ApplicationMaintainer extends Maintainer { return deployer.lastDeployTime(application).orElse(Instant.EPOCH); } - /** Returns true when application has at least one active node */ - private boolean isActive(ApplicationId application) { - return ! nodeRepository().getNodes(application, Node.State.active).isEmpty(); - } - @Override public void deconstruct() { super.deconstruct(); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MaintenanceDeployment.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MaintenanceDeployment.java new file mode 100644 index 00000000000..d25ef969c6b --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MaintenanceDeployment.java @@ -0,0 +1,96 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.maintenance; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.ApplicationLockException; +import com.yahoo.config.provision.Deployer; +import com.yahoo.config.provision.Deployment; +import com.yahoo.config.provision.TransientException; +import com.yahoo.log.LogLevel; +import com.yahoo.transaction.Mutex; +import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.NodeRepository; +import com.yahoo.yolean.Exceptions; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Optional; +import java.util.logging.Logger; + +/** + * A wrapper of a deployment suitable for maintenance. + * This is a single-use, single-thread object. + * + * @author bratseth + */ +class MaintenanceDeployment implements Closeable { + + private static final Logger log = Logger.getLogger(MaintenanceDeployment.class.getName()); + + private final ApplicationId application; + private final Optional<Mutex> lock; + private final Optional<Deployment> deployment; + + private boolean closed = false; + + public MaintenanceDeployment(ApplicationId application, Deployer deployer, NodeRepository nodeRepository) { + this.application = application; + lock = tryLock(application, nodeRepository); + deployment = tryDeployment(lock, application, deployer, nodeRepository); + } + + /** Return whether this is - as yet - functional and can be used to carry out the deployment */ + public boolean isValid() { + return deployment.isPresent(); + } + + public boolean prepare() { + return doStep(() -> deployment.get().prepare()); + } + + public boolean activate() { + return doStep(() -> deployment.get().activate()); + } + + private boolean doStep(Runnable action) { + if (closed) throw new IllegalStateException("Deployment of '" + application + "' is closed"); + if ( ! isValid()) return false; + try { + action.run(); + return true; + } catch (TransientException e) { + log.log(LogLevel.INFO, "Failed to maintenance deploy " + application + " with a transient error: " + + Exceptions.toMessageString(e)); + return false; + } catch (RuntimeException e) { + log.log(LogLevel.WARNING, "Exception on maintenance deploy of " + application, e); + return false; + } + } + + private Optional<Mutex> tryLock(ApplicationId application, NodeRepository nodeRepository) { + try { + // Use a short lock to avoid interfering with change deployments + return Optional.of(nodeRepository.lock(application, Duration.ofSeconds(1))); + } + catch (ApplicationLockException e) { + return Optional.empty(); + } + } + + private Optional<Deployment> tryDeployment(Optional<Mutex> lock, + ApplicationId application, + Deployer deployer, + NodeRepository nodeRepository) { + if (lock.isEmpty()) return Optional.empty(); + if (nodeRepository.getNodes(application, Node.State.active).isEmpty()) return Optional.empty(); + return deployer.deployFromLocalActive(application); + } + + @Override + public void close() { + lock.ifPresent(l -> l.close()); + closed = true; + } + +} diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/Rebalancer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/Rebalancer.java index cc3b7def389..d7dd93522e4 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/Rebalancer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/Rebalancer.java @@ -2,14 +2,10 @@ package com.yahoo.vespa.hosted.provision.maintenance; import com.yahoo.config.provision.ApplicationId; -import com.yahoo.config.provision.ApplicationLockException; import com.yahoo.config.provision.Deployer; -import com.yahoo.config.provision.Deployment; import com.yahoo.config.provision.NodeResources; import com.yahoo.config.provision.NodeType; -import com.yahoo.config.provision.TransientException; import com.yahoo.jdisc.Metric; -import com.yahoo.log.LogLevel; import com.yahoo.transaction.Mutex; import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeList; @@ -19,13 +15,10 @@ import com.yahoo.vespa.hosted.provision.provisioning.DockerHostCapacity; import com.yahoo.vespa.hosted.provision.provisioning.HostProvisioner; import com.yahoo.vespa.hosted.provision.provisioning.HostResourcesCalculator; import com.yahoo.vespa.hosted.provision.provisioning.NodePrioritizer; -import com.yahoo.yolean.Exceptions; -import java.io.Closeable; import java.time.Clock; import java.time.Duration; import java.util.Optional; -import java.util.logging.Logger; /** * @author bratseth @@ -203,72 +196,4 @@ public class Rebalancer extends Maintainer { } - private static class MaintenanceDeployment implements Closeable { - - private static final Logger log = Logger.getLogger(MaintenanceDeployment.class.getName()); - - private final ApplicationId application; - private final Optional<Mutex> lock; - private final Optional<Deployment> deployment; - - public MaintenanceDeployment(ApplicationId application, Deployer deployer, NodeRepository nodeRepository) { - this.application = application; - lock = tryLock(application, nodeRepository); - deployment = tryDeployment(lock, application, deployer, nodeRepository); - } - - /** Return whether this is - as yet - functional and can be used to carry out the deployment */ - public boolean isValid() { - return deployment.isPresent(); - } - - private Optional<Mutex> tryLock(ApplicationId application, NodeRepository nodeRepository) { - try { - // Use a short lock to avoid interfering with change deployments - return Optional.of(nodeRepository.lock(application, Duration.ofSeconds(1))); - } - catch (ApplicationLockException e) { - return Optional.empty(); - } - } - - private Optional<Deployment> tryDeployment(Optional<Mutex> lock, - ApplicationId application, - Deployer deployer, - NodeRepository nodeRepository) { - if (lock.isEmpty()) return Optional.empty(); - if (nodeRepository.getNodes(application, Node.State.active).isEmpty()) return Optional.empty(); - return deployer.deployFromLocalActive(application); - } - - public boolean prepare() { - return doStep(() -> deployment.get().prepare()); - } - - public boolean activate() { - return doStep(() -> deployment.get().activate()); - } - - private boolean doStep(Runnable action) { - if ( ! isValid()) return false; - try { - action.run(); - return true; - } catch (TransientException e) { - log.log(LogLevel.INFO, "Failed to deploy " + application + " with a transient error: " + - Exceptions.toMessageString(e)); - return false; - } catch (RuntimeException e) { - log.log(LogLevel.WARNING, "Exception on maintenance deploy of " + application, e); - return false; - } - } - - @Override - public void close() { - lock.ifPresent(l -> l.close()); - } - - } - } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/RetiredExpirer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/RetiredExpirer.java index acf742842c8..1d31917b3e1 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/RetiredExpirer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/RetiredExpirer.java @@ -4,23 +4,17 @@ package com.yahoo.vespa.hosted.provision.maintenance; import com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.Deployer; -import com.yahoo.config.provision.Deployment; -import com.yahoo.config.provision.TransientException; -import com.yahoo.log.LogLevel; import com.yahoo.vespa.applicationmodel.HostName; import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeRepository; import com.yahoo.vespa.hosted.provision.node.History; import com.yahoo.vespa.orchestrator.OrchestrationException; import com.yahoo.vespa.orchestrator.Orchestrator; -import com.yahoo.yolean.Exceptions; import java.time.Clock; import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.logging.Level; import java.util.stream.Collectors; /** @@ -62,28 +56,18 @@ public class RetiredExpirer extends Maintainer { ApplicationId application = entry.getKey(); List<Node> retiredNodes = entry.getValue(); - try { - Optional<Deployment> deployment = deployer.deployFromLocalActive(application); - if ( ! deployment.isPresent()) continue; // this will be done at another config server + try (MaintenanceDeployment deployment = new MaintenanceDeployment(application, deployer, nodeRepository())) { + if ( ! deployment.isValid()) continue; // this will be done at another config server List<Node> nodesToRemove = retiredNodes.stream().filter(this::canRemove).collect(Collectors.toList()); - if (nodesToRemove.isEmpty()) { - continue; - } + if (nodesToRemove.isEmpty()) continue; nodeRepository().setRemovable(application, nodesToRemove); - deployment.get().activate(); - + boolean success = deployment.activate(); + if ( ! success) return; String nodeList = nodesToRemove.stream().map(Node::hostname).collect(Collectors.joining(", ")); log.info("Redeployed " + application + " to deactivate retired nodes: " + nodeList); - } catch (TransientException e) { - log.log(LogLevel.INFO, "Failed to redeploy " + application + - " with a transient error, will be retried by application maintainer: " + Exceptions.toMessageString(e)); - } catch (RuntimeException e) { - String nodeList = retiredNodes.stream().map(Node::hostname).collect(Collectors.joining(", ")); - log.log(Level.WARNING, "Exception trying to deactivate retired nodes from " + application - + ": " + nodeList, e); } } } diff --git a/searchlib/src/tests/queryeval/nearest_neighbor/nearest_neighbor_test.cpp b/searchlib/src/tests/queryeval/nearest_neighbor/nearest_neighbor_test.cpp index 6a96b7720b1..25ff459c005 100644 --- a/searchlib/src/tests/queryeval/nearest_neighbor/nearest_neighbor_test.cpp +++ b/searchlib/src/tests/queryeval/nearest_neighbor/nearest_neighbor_test.cpp @@ -100,7 +100,7 @@ SimpleResult find_matches(Fixture &env, const DenseTensorView &qtv) { auto &tfmd = *(md->resolveTermField(0)); auto &attr = *(env._tensorAttr); NearestNeighborDistanceHeap dh(2); - auto search = NearestNeighborIteratorFactory::createIterator(strict, tfmd, qtv, attr, dh); + auto search = NearestNeighborIterator::create(strict, tfmd, qtv, attr, dh); if (strict) { return SimpleResult().searchStrict(*search, attr.getNumDocs()); } else { @@ -137,7 +137,7 @@ std::vector<feature_t> get_rawscores(Fixture &env, const DenseTensorView &qtv) { auto &tfmd = *(md->resolveTermField(0)); auto &attr = *(env._tensorAttr); NearestNeighborDistanceHeap dh(2); - auto search = NearestNeighborIteratorFactory::createIterator(strict, tfmd, qtv, attr, dh); + auto search = NearestNeighborIterator::create(strict, tfmd, qtv, attr, dh); uint32_t limit = attr.getNumDocs(); uint32_t docid = 1; search->initRange(docid, limit); diff --git a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp index 572707323c9..6a844a6bec0 100644 --- a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp +++ b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp @@ -30,7 +30,7 @@ NearestNeighborBlueprint::createLeafSearch(const search::fef::TermFieldMatchData fef::TermFieldMatchData &tfmd = *tfmda[0]; // always search in only one field const vespalib::tensor::DenseTensorView &qT = *_query_tensor; - return NearestNeighborIteratorFactory::createIterator(strict, tfmd, qT, _attr_tensor, _distance_heap); + return NearestNeighborIterator::create(strict, tfmd, qT, _attr_tensor, _distance_heap); } void diff --git a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_iterator.cpp b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_iterator.cpp index d17ed024fce..4617bb0e374 100644 --- a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_iterator.cpp +++ b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_iterator.cpp @@ -2,29 +2,13 @@ #include "nearest_neighbor_iterator.h" +using search::tensor::DenseTensorAttribute; using vespalib::ConstArrayRef; +using vespalib::tensor::DenseTensorView; +using vespalib::tensor::MutableDenseTensorView; using vespalib::tensor::TypedCells; -namespace { - -struct SumSquaredDiff -{ - template <typename LCT, typename RCT> - static double - call(const ConstArrayRef<LCT> &lhs, const ConstArrayRef<RCT> &rhs) - { - double sum = 0.0; - size_t sz = lhs.size(); - assert(sz == rhs.size()); - for (size_t i = 0; i < sz; ++i) { - double diff = lhs[i] - rhs[i]; - sum += diff*diff; - } - return sum; - } -}; - -} +using CellType = vespalib::eval::ValueType::CellType; namespace search::queryeval { @@ -34,32 +18,24 @@ namespace search::queryeval { * Keeps a heap of the K best hit distances. * Currently always does brute-force scanning, which is very expensive. **/ -template <bool strict> -class NearestNeighborIterator : public SearchIterator +template <bool strict, typename LCT, typename RCT> +class NearestNeighborImpl : public NearestNeighborIterator { public: - using DenseTensorView = vespalib::tensor::DenseTensorView; - using DenseTensorAttribute = search::tensor::DenseTensorAttribute; - using MutableDenseTensorView = vespalib::tensor::MutableDenseTensorView; - - NearestNeighborIterator(fef::TermFieldMatchData &tfmd, - const DenseTensorView &queryTensor, - const DenseTensorAttribute &tensorAttribute, - NearestNeighborDistanceHeap &distanceHeap) - : _tfmd(tfmd), - _queryTensor(queryTensor), - _tensorAttribute(tensorAttribute), - _fieldTensor(_tensorAttribute.getTensorType()), - _distanceHeap(distanceHeap), + + NearestNeighborImpl(Params params_in) + : NearestNeighborIterator(params_in), + _lhs(params().queryTensor.cellsRef().typify<LCT>()), + _fieldTensor(params().tensorAttribute.getTensorType()), _lastScore(0.0) { - assert(_fieldTensor.fast_type() == _queryTensor.fast_type()); + assert(_fieldTensor.fast_type() == params().queryTensor.fast_type()); } - ~NearestNeighborIterator(); + ~NearestNeighborImpl(); void doSeek(uint32_t docId) override { - double distanceLimit = _distanceHeap.distanceLimit(); + double distanceLimit = params().distanceHeap.distanceLimit(); while (__builtin_expect((docId < getEndId()), true)) { double d = computeDistance(docId); if (d <= distanceLimit) { @@ -77,53 +53,97 @@ public: } void doUnpack(uint32_t docId) override { - _tfmd.setRawScore(docId, sqrt(_lastScore)); - _distanceHeap.used(_lastScore); + params().tfmd.setRawScore(docId, sqrt(_lastScore)); + params().distanceHeap.used(_lastScore); } Trinary is_strict() const override { return strict ? Trinary::True : Trinary::False ; } private: - double computeDistance(uint32_t docId); - - fef::TermFieldMatchData &_tfmd; - const DenseTensorView &_queryTensor; - const DenseTensorAttribute &_tensorAttribute; - MutableDenseTensorView _fieldTensor; - NearestNeighborDistanceHeap &_distanceHeap; - double _lastScore; + static double computeSum(ConstArrayRef<LCT> lhs, ConstArrayRef<RCT> rhs) { + double sum = 0.0; + size_t sz = lhs.size(); + assert(sz == rhs.size()); + for (size_t i = 0; i < sz; ++i) { + double diff = lhs[i] - rhs[i]; + sum += diff*diff; + } + return sum; + } + + double computeDistance(uint32_t docId) { + params().tensorAttribute.getTensor(docId, _fieldTensor); + return computeSum(_lhs, _fieldTensor.cellsRef().typify<RCT>()); + } + + ConstArrayRef<LCT> _lhs; + MutableDenseTensorView _fieldTensor; + double _lastScore; }; -template <bool strict> -NearestNeighborIterator<strict>::~NearestNeighborIterator() = default; +template <bool strict, typename LCT, typename RCT> +NearestNeighborImpl<strict, LCT, RCT>::~NearestNeighborImpl() = default; -template <bool strict> -double -NearestNeighborIterator<strict>::computeDistance(uint32_t docId) +namespace { + +template<bool strict, typename LCT, typename RCT> +std::unique_ptr<NearestNeighborIterator> +create_impl(const NearestNeighborIterator::Params ¶ms) { - _tensorAttribute.getTensor(docId, _fieldTensor); - TypedCells lhsCells = _queryTensor.cellsRef(); - TypedCells rhsCells = _fieldTensor.cellsRef(); - return vespalib::tensor::dispatch_2<SumSquaredDiff>(lhsCells, rhsCells); + using NNI = NearestNeighborImpl<strict, LCT, RCT>; + return std::make_unique<NNI>(params); } +template<bool strict, typename LCT> +std::unique_ptr<NearestNeighborIterator> +resolve_RCT(const NearestNeighborIterator::Params ¶ms) +{ + CellType ct = params.tensorAttribute.getTensorType().cell_type(); + if (ct == CellType::FLOAT) { + return create_impl<strict, LCT, float>(params); + } + if (ct == CellType::DOUBLE) { + return create_impl<strict, LCT, double>(params); + } + abort(); +} -std::unique_ptr<SearchIterator> -NearestNeighborIteratorFactory::createIterator( +template<bool strict> +std::unique_ptr<NearestNeighborIterator> +resolve_LCT_RCT(const NearestNeighborIterator::Params ¶ms) +{ + CellType ct = params.queryTensor.fast_type().cell_type(); + if (ct == CellType::FLOAT) { + return resolve_RCT<strict, float>(params); + } + if (ct == CellType::DOUBLE) { + return resolve_RCT<strict, double>(params); + } + abort(); +} + +std::unique_ptr<NearestNeighborIterator> +resolve_strict_LCT_RCT(bool strict, const NearestNeighborIterator::Params ¶ms) +{ + if (strict) { + return resolve_LCT_RCT<true>(params); + } else { + return resolve_LCT_RCT<false>(params); + } +} + +} // namespace <unnamed> + +std::unique_ptr<NearestNeighborIterator> +NearestNeighborIterator::create( bool strict, fef::TermFieldMatchData &tfmd, const vespalib::tensor::DenseTensorView &queryTensor, const search::tensor::DenseTensorAttribute &tensorAttribute, NearestNeighborDistanceHeap &distanceHeap) { - using StrictNN = NearestNeighborIterator<true>; - using UnStrict = NearestNeighborIterator<false>; - - if (strict) { - return std::make_unique<StrictNN>(tfmd, queryTensor, tensorAttribute, distanceHeap); - } else { - return std::make_unique<UnStrict>(tfmd, queryTensor, tensorAttribute, distanceHeap); - } + Params params(tfmd, queryTensor, tensorAttribute, distanceHeap); + return resolve_strict_LCT_RCT(strict, params); } } // namespace diff --git a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_iterator.h b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_iterator.h index 140e92ad37d..34eb547fe39 100644 --- a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_iterator.h +++ b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_iterator.h @@ -13,15 +13,43 @@ namespace search::queryeval { -class NearestNeighborIteratorFactory +class NearestNeighborIterator : public SearchIterator { public: - static std::unique_ptr<SearchIterator> createIterator( - bool strict, - fef::TermFieldMatchData &tfmd, - const vespalib::tensor::DenseTensorView &queryTensor, - const search::tensor::DenseTensorAttribute &tensorAttribute, - NearestNeighborDistanceHeap &distanceHeap); + using DenseTensorAttribute = search::tensor::DenseTensorAttribute; + using DenseTensorView = vespalib::tensor::DenseTensorView; + + struct Params { + fef::TermFieldMatchData &tfmd; + const DenseTensorView &queryTensor; + const DenseTensorAttribute &tensorAttribute; + NearestNeighborDistanceHeap &distanceHeap; + + Params(fef::TermFieldMatchData &tfmd_in, + const DenseTensorView &queryTensor_in, + const DenseTensorAttribute &tensorAttribute_in, + NearestNeighborDistanceHeap &distanceHeap_in) + : tfmd(tfmd_in), + queryTensor(queryTensor_in), + tensorAttribute(tensorAttribute_in), + distanceHeap(distanceHeap_in) + {} + }; + + NearestNeighborIterator(Params params_in) + : _params(params_in) + {} + + static std::unique_ptr<NearestNeighborIterator> create( + bool strict, + fef::TermFieldMatchData &tfmd, + const vespalib::tensor::DenseTensorView &queryTensor, + const search::tensor::DenseTensorAttribute &tensorAttribute, + NearestNeighborDistanceHeap &distanceHeap); + + const Params& params() const { return _params; } +private: + Params _params; }; -} +} // namespace diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 94d33e50047..8fa8a6bcede 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -189,6 +189,8 @@ struct DistributorTest : Test, DistributorTestUtil { void configure_mutation_sequencing(bool enabled); void configure_merge_busy_inhibit_duration(int seconds); void do_test_pending_merge_getnodestate_reply_edge(BucketSpace space); + + void set_up_and_start_get_op_with_stale_reads_enabled(bool enabled); }; DistributorTest::DistributorTest() @@ -1025,14 +1027,18 @@ TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled()); } -TEST_F(DistributorTest, gets_are_started_outside_main_distributor_logic_if_btree_db_and_stale_reads_enabled) { +void DistributorTest::set_up_and_start_get_op_with_stale_reads_enabled(bool enabled) { createLinks(true); setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - configure_stale_reads_enabled(true); + configure_stale_reads_enabled(enabled); document::BucketId bucket(16, 1); addNodesToBucketDB(bucket, "0=1/1/1/t"); _distributor->onDown(make_dummy_get_command_for_bucket_1()); +} + +TEST_F(DistributorTest, gets_are_started_outside_main_distributor_logic_if_btree_db_and_stale_reads_enabled) { + set_up_and_start_get_op_with_stale_reads_enabled(true); ASSERT_THAT(_sender.commands(), SizeIs(1)); EXPECT_THAT(_sender.replies(), SizeIs(0)); @@ -1044,16 +1050,27 @@ TEST_F(DistributorTest, gets_are_started_outside_main_distributor_logic_if_btree } TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_stale_reads_disabled) { - createLinks(true); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - configure_stale_reads_enabled(false); - - document::BucketId bucket(16, 1); - addNodesToBucketDB(bucket, "0=1/1/1/t"); - _distributor->onDown(make_dummy_get_command_for_bucket_1()); + set_up_and_start_get_op_with_stale_reads_enabled(false); // Get has been placed into distributor queue, so no external messages are produced. EXPECT_THAT(_sender.commands(), SizeIs(0)); EXPECT_THAT(_sender.replies(), SizeIs(0)); } +// There's no need or desire to track "lockfree" Gets in the main pending message tracker, +// as we only have to track mutations to inhibit maintenance ops safely. Furthermore, +// the message tracker is a multi-index and therefore has some runtime cost. +TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) { + set_up_and_start_get_op_with_stale_reads_enabled(true); + Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1)); + EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage( + 0, bucket, api::MessageType::GET_ID)); +} + +TEST_F(DistributorTest, closing_aborts_gets_started_outside_main_distributor_thread) { + set_up_and_start_get_op_with_stale_reads_enabled(true); + _distributor->close(); + ASSERT_EQ(1, _sender.replies().size()); + EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult()); +} + } diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 84f7d34d069..f2f949a6d56 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -532,7 +532,6 @@ TEST_F(ExternalOperationHandlerTest, gets_are_busy_bounced_during_transition_per makeGetCommandForUser(non_owned_bucket.withoutCountBits()))); EXPECT_EQ("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)", _sender.reply(0)->getResult().toString()); - } // TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with diff --git a/storage/src/vespa/storage/common/messagesender.h b/storage/src/vespa/storage/common/messagesender.h index 659fccad412..2f24b750d66 100644 --- a/storage/src/vespa/storage/common/messagesender.h +++ b/storage/src/vespa/storage/common/messagesender.h @@ -27,7 +27,7 @@ namespace storage::api { namespace storage { struct MessageSender { - virtual ~MessageSender() {} + virtual ~MessageSender() = default; virtual void sendCommand(const std::shared_ptr<api::StorageCommand>&) = 0; virtual void sendReply(const std::shared_ptr<api::StorageReply>&) = 0; @@ -36,7 +36,7 @@ struct MessageSender { }; struct ChainedMessageSender { - virtual ~ChainedMessageSender() {} + virtual ~ChainedMessageSender() = default; virtual void sendUp(const std::shared_ptr<api::StorageMessage>&) = 0; virtual void sendDown(const std::shared_ptr<api::StorageMessage>&) = 0; }; diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index 97b357a3bf7..f73eb3ea36d 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -173,7 +173,7 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) ost << vespalib::getStackTrace(0); if (!msg->getType().isReply()) { LOGBP(warning, "%s", ost.str().c_str()); - StorageCommand& cmd = static_cast<StorageCommand&>(*msg); + auto& cmd = static_cast<StorageCommand&>(*msg); shared_ptr<StorageReply> reply(cmd.makeReply().release()); if (reply.get()) { diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index d903db4f6ed..5c6773ea5bf 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -217,15 +217,13 @@ void Distributor::onClose() { LOG(debug, "Distributor::onClose invoked"); _bucketDBUpdater.flush(); + _externalOperationHandler.close_pending(); _operationOwner.onClose(); _maintenanceOperationOwner.onClose(); } -void -Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg) -{ - _pendingMessageTracker.insert(msg); - if (_messageSender != 0) { +void Distributor::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>& msg) { + if (_messageSender) { _messageSender->sendUp(msg); } else { StorageLink::sendUp(msg); @@ -233,9 +231,16 @@ Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg) } void +Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg) +{ + _pendingMessageTracker.insert(msg); + send_up_without_tracking(msg); +} + +void Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg) { - if (_messageSender != 0) { + if (_messageSender) { _messageSender->sendDown(msg); } else { StorageLink::sendDown(msg); diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 9fd055023e2..ac6e306a4fb 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -55,13 +55,15 @@ public: HostInfo& hostInfoReporterRegistrar, ChainedMessageSender* = nullptr); - ~Distributor(); + ~Distributor() override; void onOpen() override; void onClose() override; bool onDown(const std::shared_ptr<api::StorageMessage>&) override; void sendUp(const std::shared_ptr<api::StorageMessage>&) override; void sendDown(const std::shared_ptr<api::StorageMessage>&) override; + // Bypasses message tracker component. Thread safe. + void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&); ChainedMessageSender& getMessageSender() override { return (_messageSender == 0 ? *this : *_messageSender); diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index ecf02ad3c30..adeab7ba132 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -26,16 +26,42 @@ LOG_SETUP(".distributor.manager"); namespace storage::distributor { +class DirectDispatchSender : public DistributorMessageSender { + Distributor& _distributor; +public: + explicit DirectDispatchSender(Distributor& distributor) + : _distributor(distributor) + {} + ~DirectDispatchSender() override = default; + + void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override { + _distributor.send_up_without_tracking(cmd); + } + void sendReply(const std::shared_ptr<api::StorageReply>& reply) override { + _distributor.send_up_without_tracking(reply); + } + int getDistributorIndex() const override { + return _distributor.getDistributorIndex(); // Thread safe + } + const std::string& getClusterName() const override { + return _distributor.getClusterName(); // Thread safe + } + const PendingMessageTracker& getPendingMessageTracker() const override { + abort(); // Never called by the messages using this component. + } +}; + ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const MaintenanceOperationGenerator& gen, DistributorComponentRegister& compReg) : DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"), + _direct_dispatch_sender(std::make_unique<DirectDispatchSender>(owner)), _operationGenerator(gen), _rejectFeedBeforeTimeReached(), // At epoch _non_main_thread_ops_mutex(), - _non_main_thread_ops_owner(owner, getClock()), + _non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()), _concurrent_gets_enabled(false) { } @@ -51,6 +77,12 @@ ExternalOperationHandler::handleMessage(const std::shared_ptr<api::StorageMessag return retVal; } +void ExternalOperationHandler::close_pending() { + std::lock_guard g(_non_main_thread_ops_mutex); + // Make sure we drain any pending operations upon close. + _non_main_thread_ops_owner.onClose(); +} + api::ReturnCode ExternalOperationHandler::makeSafeTimeRejectionResult(TimePoint unsafeTime) { diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 2c1a87267eb..e38f6792717 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -20,6 +20,7 @@ namespace distributor { class Distributor; class MaintenanceOperationGenerator; +class DirectDispatchSender; class ExternalOperationHandler : public DistributorComponent, public api::MessageHandler @@ -55,6 +56,8 @@ public: // Returns true iff message was handled and should not be processed further by the caller. bool try_handle_message_outside_main_thread(const std::shared_ptr<api::StorageMessage>& msg); + void close_pending(); + void set_concurrent_gets_enabled(bool enabled) noexcept { _concurrent_gets_enabled.store(enabled, std::memory_order_relaxed); } @@ -64,6 +67,7 @@ public: } private: + std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender; const MaintenanceOperationGenerator& _operationGenerator; OperationSequencer _mutationSequencer; Operation::SP _op; diff --git a/vespa-testrunner-components/CMakeLists.txt b/vespa-testrunner-components/CMakeLists.txt index fe2cb84b7bb..35ddb17cbc6 100644 --- a/vespa-testrunner-components/CMakeLists.txt +++ b/vespa-testrunner-components/CMakeLists.txt @@ -1,3 +1,3 @@ install_java_artifact(vespa-testrunner-components) install_fat_java_artifact(vespa-testrunner-components) -install_config_definition(src/main/resources/configdefinitions/test-runner.def test-runner.def) +install_config_definition(src/main/resources/configdefinitions/test-runner.def com.yahoo.vespa.hosted.testrunner.test-runner.def) |