-rw-r--r--  clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java | 20
-rw-r--r--  config-model-api/src/main/java/com/yahoo/config/application/api/Bcp.java | 122
-rw-r--r--  config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java | 38
-rw-r--r--  config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java | 18
-rw-r--r--  config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java | 56
-rw-r--r--  config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java | 20
-rw-r--r--  config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithBcpTest.java | 254
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java | 9
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java | 125
-rw-r--r--  container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java | 59
-rw-r--r--  container-core/src/main/java/com/yahoo/metrics/Unit.java | 2
-rw-r--r--  container-core/src/main/java/com/yahoo/restapi/Path.java | 2
-rw-r--r--  container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java | 5
-rw-r--r--  controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java | 111
-rw-r--r--  controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java | 7
-rw-r--r--  controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java | 198
-rw-r--r--  indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java | 1
-rw-r--r--  indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java | 17
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java | 73
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java | 13
-rw-r--r--  searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp | 2
-rw-r--r--  searchcore/src/vespa/searchcore/proton/matching/query.cpp | 2
-rw-r--r--  searchlib/src/vespa/searchlib/queryeval/global_filter.cpp | 44
-rw-r--r--  searchlib/src/vespa/searchlib/queryeval/global_filter.h | 8
-rw-r--r--  storage/src/tests/distributor/pendingmessagetrackertest.cpp | 57
-rw-r--r--  storage/src/tests/visiting/visitormanagertest.cpp | 18
-rw-r--r--  storage/src/tests/visiting/visitortest.cpp | 8
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketmanager.cpp | 18
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp | 30
-rw-r--r--  storage/src/vespa/storage/distributor/pendingmessagetracker.cpp | 7
-rw-r--r--  storage/src/vespa/storage/distributor/pendingmessagetracker.h | 16
-rw-r--r--  storage/src/vespa/storage/storageserver/communicationmanager.cpp | 7
-rw-r--r--  storage/src/vespa/storage/visiting/visitor.cpp | 176
-rw-r--r--  storage/src/vespa/storage/visiting/visitor.h | 57
-rw-r--r--  storage/src/vespa/storage/visiting/visitormanager.cpp | 12
-rw-r--r--  storage/src/vespa/storage/visiting/visitormanager.h | 6
-rw-r--r--  storage/src/vespa/storage/visiting/visitorthread.cpp | 29
-rw-r--r--  storage/src/vespa/storage/visiting/visitorthread.h | 4
-rw-r--r--  vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java | 19
-rw-r--r--  vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java | 20
-rw-r--r--  vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java | 52
-rw-r--r--  vespa-athenz/src/test/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapperTest.java | 47
-rw-r--r--  zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java | 4
-rw-r--r--  zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java | 2
-rw-r--r--  zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java | 3
45 files changed, 1306 insertions, 492 deletions
diff --git a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java
index 1ac870c45de..50859b837b1 100644
--- a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java
+++ b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java
@@ -1,8 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.apps.clustercontroller;
-import com.yahoo.component.annotation.Inject;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.annotation.Inject;
import com.yahoo.jdisc.Metric;
import com.yahoo.vespa.clustercontroller.apputil.communication.http.JDiscMetricWrapper;
import com.yahoo.vespa.clustercontroller.core.FleetController;
@@ -10,13 +10,10 @@ import com.yahoo.vespa.clustercontroller.core.FleetControllerOptions;
import com.yahoo.vespa.clustercontroller.core.RemoteClusterControllerTaskScheduler;
import com.yahoo.vespa.clustercontroller.core.restapiv2.ClusterControllerStateRestAPI;
import com.yahoo.vespa.clustercontroller.core.status.StatusHandler;
-import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
-
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
@@ -49,7 +46,6 @@ public class ClusterController extends AbstractComponent
public void setOptions(FleetControllerOptions options, Metric metricImpl) throws Exception {
referents.incrementAndGet();
metricWrapper.updateMetricImplementation(metricImpl);
- verifyThatZooKeeperWorks(options);
synchronized (controllers) {
FleetController controller = controllers.get(options.clusterName());
if (controller == null) {
@@ -99,8 +95,6 @@ public class ClusterController extends AbstractComponent
}
}
- FleetController getController(String name) { return controllers.get(name); }
-
@Override
public StatusHandler.ContainerStatusPageServer get(String cluster) {
return status.get(cluster);
@@ -115,16 +109,4 @@ public class ClusterController extends AbstractComponent
controller.shutdown();
}
- /**
- * Block until we are connected to zookeeper server
- */
- private void verifyThatZooKeeperWorks(FleetControllerOptions options) throws Exception {
- if (options.zooKeeperServerAddress() != null && !"".equals(options.zooKeeperServerAddress())) {
- try (Curator curator = Curator.create(options.zooKeeperServerAddress())) {
- if ( ! curator.framework().blockUntilConnected(600, TimeUnit.SECONDS))
- com.yahoo.protect.Process.logAndDie("Failed to connect to ZK, dying and restarting container");
- }
- }
- }
-
}
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/Bcp.java b/config-model-api/src/main/java/com/yahoo/config/application/api/Bcp.java
new file mode 100644
index 00000000000..af369dc2672
--- /dev/null
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/Bcp.java
@@ -0,0 +1,122 @@
+package com.yahoo.config.application.api;
+
+import com.yahoo.config.provision.RegionName;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Defines the BCP structure for an instance in a deployment spec:
+ * A list of region groups where each group contains a set of regions
+ * which will handle the traffic of a member in the group when it becomes unreachable.
+ *
+ * This is used to make bcp-aware autoscaling decisions. If no explicit BCP spec
+ * is provided, it is assumed that a region's traffic will be divided equally over all
+ * the other regions when it becomes unreachable - i.e. a single BCP group is implicitly
+ * defined having all defined production regions as members with fraction 1.0.
+ *
+ * It is assumed that the traffic of the unreachable region is distributed
+ * evenly to the other members of the group.
+ *
+ * A region can be a fractional member of a group, in which case it is assumed that the
+ * region will only handle that fraction of its share of the unreachable region's traffic,
+ * and symmetrically that the other members of the group will only handle that fraction
+ * of the fractional region's traffic if it becomes unreachable.
+ *
+ * Each production region defined in the instance must have fractional memberships in groups that sum to exactly one.
+ *
+ * If a group has one member, it will not set aside any capacity for BCP.
+ * If a group has more than two members, the system will attempt to provision capacity
+ * for BCP also while a region is unreachable. That is, if there are three member regions, A, B and C,
+ * each handling 100 qps, then each aims to handle 150 in case one goes down. If C goes down,
+ * A and B will each handle 150, but will each aim to handle 300 in case the other goes down.
+ *
+ * @author bratseth
+ */
+public class Bcp {
+
+ private static final Bcp empty = new Bcp(List.of());
+
+ private final List<Group> groups;
+
+ public Bcp(List<Group> groups) {
+ totalMembershipSumsToOne(groups);
+ this.groups = List.copyOf(groups);
+ }
+
+ public List<Group> groups() { return groups; }
+
+ /** Returns the set of regions declared in the groups of this. */
+ public Set<RegionName> regions() {
+ return groups.stream().flatMap(group -> group.members().stream()).map(member -> member.region()).collect(Collectors.toSet());
+ }
+
+ public boolean isEmpty() { return groups.isEmpty(); }
+
+ /** Returns this bcp spec, or if it is empty, the given bcp spec. */
+ public Bcp orElse(Bcp other) {
+ return this.isEmpty() ? other : this;
+ }
+
+ private void totalMembershipSumsToOne(List<Group> groups) {
+ Map<RegionName, Double> totalMembership = new HashMap<>();
+ for (var group : groups) {
+ for (var member : group.members())
+ totalMembership.compute(member.region(), (__, fraction) -> fraction == null ? member.fraction()
+ : fraction + member.fraction());
+ }
+ for (var entry : totalMembership.entrySet()) {
+ if (entry.getValue() != 1.0)
+ throw new IllegalArgumentException("Illegal BCP spec: All regions must have total membership fractions summing to 1.0, but " +
+ entry.getKey() + " sums to " + entry.getValue());
+ }
+ }
+
+ public static Bcp empty() { return empty; }
+
+ @Override
+ public String toString() {
+ if (isEmpty()) return "empty BCP";
+ return "BCP of " + groups;
+ }
+
+ public static class Group {
+
+ private final Duration deadline;
+ private final List<RegionMember> members;
+
+ public Group(List<RegionMember> members, Duration deadline) {
+ this.members = List.copyOf(members);
+ this.deadline = deadline;
+ }
+
+ public List<RegionMember> members() { return members; }
+
+ /**
+ * Returns the max time until the other regions must be able to handle the additional traffic
+ * when a region becomes unreachable, which by default is Duration.ZERO.
+ */
+ public Duration deadline() { return deadline; }
+
+ @Override
+ public String toString() {
+ return "BCP group of " + members;
+ }
+
+ }
+
+ public record RegionMember(RegionName region, double fraction) {
+
+ public RegionMember {
+ if (fraction < 0 || fraction > 1)
+ throw new IllegalArgumentException("Fraction must be a number between 0.0 and 1.0, but got " + fraction);
+ }
+
+
+ }
+
+}
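
For illustration, here is a minimal, hypothetical sketch (not part of this change; the region names, fractions and the class name BcpSketch are invented) showing how the classes added above compose, including the fraction-sums-to-one validation and the capacity arithmetic from the class comment:

import com.yahoo.config.application.api.Bcp;
import com.yahoo.config.provision.RegionName;

import java.time.Duration;
import java.util.List;

// Hypothetical illustration of the Bcp model added above.
public class BcpSketch {

    public static void main(String[] args) {
        // us-central1 is a fractional member of two groups; its memberships sum to 0.3 + 0.7 = 1.0,
        // so the constructor's validation accepts it.
        var eastGroup = new Bcp.Group(List.of(new Bcp.RegionMember(RegionName.from("us-east1"), 1.0),
                                              new Bcp.RegionMember(RegionName.from("us-east2"), 1.0),
                                              new Bcp.RegionMember(RegionName.from("us-central1"), 0.3)),
                                      Duration.ZERO);
        var westGroup = new Bcp.Group(List.of(new Bcp.RegionMember(RegionName.from("us-west1"), 1.0),
                                              new Bcp.RegionMember(RegionName.from("us-west2"), 1.0),
                                              new Bcp.RegionMember(RegionName.from("us-central1"), 0.7)),
                                      Duration.ofMinutes(30));
        Bcp bcp = new Bcp(List.of(eastGroup, westGroup));
        System.out.println(bcp.regions()); // all five regions, as a set

        // Capacity arithmetic from the class comment: with three full members each handling 100 qps,
        // a surviving member must absorb 100 / (3 - 1) = 50 extra qps, i.e. aim to handle 150.

        // Removing westGroup leaves us-central1 with a total membership of only 0.3, which is rejected:
        try {
            new Bcp(List.of(eastGroup));
        }
        catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }

}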
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
index b36c1409459..4b30734365d 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
@@ -61,6 +61,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
private final Notifications notifications;
private final List<Endpoint> endpoints;
private final Map<ClusterSpec.Id, Map<ZoneId, ZoneEndpoint>> zoneEndpoints;
+ private final Bcp bcp;
public DeploymentInstanceSpec(InstanceName name,
Tags tags,
@@ -77,6 +78,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
Notifications notifications,
List<Endpoint> endpoints,
Map<ClusterSpec.Id, Map<ZoneId, ZoneEndpoint>> zoneEndpoints,
+ Bcp bcp,
Instant now) {
super(steps);
this.name = Objects.requireNonNull(name);
@@ -101,8 +103,9 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
Map<ClusterSpec.Id, Map<ZoneId, ZoneEndpoint>> zoneEndpointsCopy = new HashMap<>();
for (var entry : zoneEndpoints.entrySet()) zoneEndpointsCopy.put(entry.getKey(), Collections.unmodifiableMap(new HashMap<>(entry.getValue())));
this.zoneEndpoints = Collections.unmodifiableMap(zoneEndpointsCopy);
+ this.bcp = Objects.requireNonNull(bcp);
validateZones(new HashSet<>(), new HashSet<>(), this);
- validateEndpoints(steps(), globalServiceId, this.endpoints);
+ validateEndpoints(globalServiceId, this.endpoints);
validateChangeBlockers(changeBlockers, now);
}
@@ -144,25 +147,41 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
}
/** Throw an IllegalArgumentException if an endpoint refers to a region that is not declared in 'prod' */
- private void validateEndpoints(List<DeploymentSpec.Step> steps, Optional<String> globalServiceId, List<Endpoint> endpoints) {
+ private void validateEndpoints(Optional<String> globalServiceId, List<Endpoint> endpoints) {
if (globalServiceId.isPresent() && ! endpoints.isEmpty()) {
throw new IllegalArgumentException("Providing both 'endpoints' and 'global-service-id'. Use only 'endpoints'.");
}
- var stepZones = steps.stream()
- .flatMap(s -> s.zones().stream())
- .flatMap(z -> z.region().stream())
- .collect(Collectors.toSet());
-
+ var regions = prodRegions();
for (var endpoint : endpoints){
for (var endpointRegion : endpoint.regions()) {
- if (! stepZones.contains(endpointRegion)) {
+ if (! regions.contains(endpointRegion)) {
throw new IllegalArgumentException("Region used in endpoint that is not declared in 'prod': " + endpointRegion);
}
}
}
}
+ /** Validates the given BCP spec (the one declared in this instance or, if none, the default) against this instance. */
+ void validateBcp(Bcp bcp) {
+ if (bcp.isEmpty()) return;
+ if ( ! prodRegions().equals(bcp.regions()))
+ throw new IllegalArgumentException("BCP and deployment mismatch in " + this + ": " +
+ "A <bcp> element must place all deployed production regions in " +
+ "at least one group, and declare no extra regions. " +
+ "Deployed regions: " + prodRegions() +
+ ". BCP regions: " + bcp.regions());
+}
+ /** Returns the production regions which the steps of this specify a deployment to. */
+ private Set<RegionName> prodRegions() {
+ return steps().stream()
+ .flatMap(s -> s.zones().stream())
+ .filter(zone -> zone.environment().isProduction())
+ .flatMap(z -> z.region().stream())
+ .collect(Collectors.toSet());
+ }
+
+
private void validateChangeBlockers(List<DeploymentSpec.ChangeBlocker> changeBlockers, Instant now) {
// Find all possible dates an upgrade block window can start
Stream<Instant> blockingFrom = changeBlockers.stream()
@@ -256,6 +275,9 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
/** Returns the rotations configuration of these instances */
public List<Endpoint> endpoints() { return endpoints; }
+ /** Returns the BCP spec declared in this instance, or Bcp.empty() if none. */
+ public Bcp bcp() { return bcp; }
+
/** Returns whether this instance deploys to the given zone, either implicitly or explicitly */
public boolean deploysTo(Environment environment, RegionName region) {
return zones().stream().anyMatch(zone -> zone.concerns(environment, Optional.of(region)));
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
index cbdb5bd6bcc..41644ebc87d 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
@@ -18,7 +18,6 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -47,6 +46,7 @@ public class DeploymentSpec {
Optional.empty(),
Optional.empty(),
List.of(),
+ Bcp.empty(),
"<deployment version='1.0'/>",
List.of());
@@ -58,6 +58,7 @@ public class DeploymentSpec {
private final Optional<AthenzService> athenzService;
private final Optional<CloudAccount> cloudAccount;
private final List<Endpoint> endpoints;
+ private final Bcp bcp;
private final List<DeprecatedElement> deprecatedElements;
private final String xmlForm;
@@ -68,6 +69,7 @@ public class DeploymentSpec {
Optional<AthenzService> athenzService,
Optional<CloudAccount> cloudAccount,
List<Endpoint> endpoints,
+ Bcp bcp,
String xmlForm,
List<DeprecatedElement> deprecatedElements) {
this.steps = List.copyOf(Objects.requireNonNull(steps));
@@ -77,11 +79,13 @@ public class DeploymentSpec {
this.cloudAccount = Objects.requireNonNull(cloudAccount);
this.xmlForm = Objects.requireNonNull(xmlForm);
this.endpoints = List.copyOf(Objects.requireNonNull(endpoints));
+ this.bcp = Objects.requireNonNull(bcp);
this.deprecatedElements = List.copyOf(Objects.requireNonNull(deprecatedElements));
validateTotalDelay(steps);
validateUpgradePoliciesOfIncreasingConservativeness(steps);
validateAthenz();
validateApplicationEndpoints();
+ validateBcp();
}
/** Throw an IllegalArgumentException if the total delay exceeds 24 hours */
@@ -158,13 +162,16 @@ public class DeploymentSpec {
}
}
+ private void validateBcp() {
+ for (var instance : instances())
+ instance.validateBcp(instance.bcp().orElse(bcp()));
+ }
+
/** Returns the major version this application is pinned to, or empty (default) to allow all major versions */
public Optional<Integer> majorVersion() { return majorVersion; }
/** Returns the deployment steps of this in the order they will be performed */
- public List<Step> steps() {
- return steps;
- }
+ public List<Step> steps() { return steps; }
/** Returns the Athenz domain set on the root tag, if any */
public Optional<AthenzDomain> athenzDomain() { return athenzDomain; }
@@ -203,6 +210,9 @@ public class DeploymentSpec {
.orElse(ZoneEndpoint.defaultEndpoint);
}
+ /** Returns the default BCP spec for instances, or Bcp.empty() if none are defined. */
+ public Bcp bcp() { return bcp; }
+
/** Returns the XML form of this spec, or null if it was not created by fromXml, nor is empty */
public String xmlForm() { return xmlForm; }
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java b/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java
index fb6d834f783..be6be5566a8 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.config.application.api.xml;
+import com.yahoo.config.application.api.Bcp;
import com.yahoo.config.application.api.DeploymentInstanceSpec;
import com.yahoo.config.application.api.DeploymentSpec;
import com.yahoo.config.application.api.DeploymentSpec.DeclaredTest;
@@ -46,7 +47,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalDouble;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -161,6 +162,7 @@ public class DeploymentSpecXmlReader {
stringAttribute(athenzServiceAttribute, root).map(AthenzService::from),
stringAttribute(cloudAccountAttribute, root).map(CloudAccount::from),
applicationEndpoints,
+ readBcp(root),
xmlForm,
deprecatedElements);
}
@@ -228,6 +230,7 @@ public class DeploymentSpecXmlReader {
notifications,
endpoints,
zoneEndpoints,
+ readBcp(instanceElement),
now))
.toList();
}
@@ -455,6 +458,24 @@ public class DeploymentSpecXmlReader {
validateAndConsolidate(endpointsByZone, zoneEndpoints);
}
+ static Bcp readBcp(Element element) {
+ Element bcpElement = XML.getChild(element, "bcp");
+ if (bcpElement == null) return Bcp.empty();
+
+ List<Bcp.Group> groups = new ArrayList<>();
+ for (Element groupElement : XML.getChildren(bcpElement, "group")) {
+ List<Bcp.RegionMember> regions = new ArrayList<>();
+ for (Element regionElement : XML.getChildren(groupElement, "region")) {
+ RegionName region = RegionName.from(XML.getValue(regionElement));
+ double fraction = toDouble(XML.attribute("fraction", regionElement).orElse(null), "fraction").orElse(1.0);
+ regions.add(new Bcp.RegionMember(region, fraction));
+ }
+ Duration deadline = XML.attribute("deadline", groupElement).map(value -> toDuration(value, "deadline")).orElse(Duration.ZERO);
+ groups.add(new Bcp.Group(regions, deadline));
+ }
+ return new Bcp(groups);
+ }
+
static void validateAndConsolidate(Map<String, Map<RegionName, List<ZoneEndpoint>>> in, Map<ClusterSpec.Id, Map<ZoneId, ZoneEndpoint>> out) {
in.forEach((cluster, regions) -> {
List<ZoneEndpoint> wildcards = regions.remove(null);
@@ -713,6 +734,39 @@ public class DeploymentSpecXmlReader {
.findFirst();
}
+ /**
+ * Converts a string consisting of a number followed by "m" or "M" to a duration of that number of minutes,
+ * or returns zero duration if the string is null or blank.
+ */
+ private static Duration toDuration(String minutesSpec, String sourceDescription) {
+ try {
+ if (minutesSpec == null || minutesSpec.isBlank()) return Duration.ZERO;
+ minutesSpec = minutesSpec.trim().toLowerCase();
+ if ( ! minutesSpec.endsWith("m"))
+ throw new IllegalArgumentException("Must end by 'm'");
+ try {
+ return Duration.ofMinutes(Integer.parseInt(minutesSpec.substring(0, minutesSpec.length() - 1)));
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Must be an integer number of minutes followed by 'm'");
+ }
+ }
+ catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Illegal " + sourceDescription + " '" + minutesSpec + "'", e);
+ }
+ }
+
+ private static OptionalDouble toDouble(String value, String sourceDescription) {
+ try {
+ if (value == null || value.isBlank()) return OptionalDouble.empty();
+ return OptionalDouble.of(Double.parseDouble(value));
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Illegal " + sourceDescription + " '" + value + "': " +
+ "Must be a number between 0.0 and 1.0");
+ }
+ }
+
private static void illegal(String message) {
throw new IllegalArgumentException(message);
}
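
As a usage sketch of readBcp's defaults (hypothetical, not part of this change; it assumes the classes above are on the classpath): an omitted fraction attribute is read as 1.0 and an omitted deadline as Duration.ZERO, while a deadline value must be an integer number of minutes ending in 'm':

import com.yahoo.config.application.api.DeploymentSpec;

import java.io.StringReader;

// Hypothetical sketch of the attribute defaults applied when reading a <bcp> element.
public class ReadBcpSketch {

    public static void main(String[] args) {
        DeploymentSpec spec = DeploymentSpec.fromXml(new StringReader("""
                <deployment version='1.0'>
                  <prod>
                    <region>us-east1</region>
                    <region>us-west1</region>
                  </prod>
                  <bcp>
                    <group deadline='60m'>
                      <region>us-east1</region>               <!-- no fraction attribute: defaults to 1.0 -->
                      <region fraction='1.0'>us-west1</region>
                    </group>
                  </bcp>
                </deployment>
                """));
        var group = spec.bcp().groups().get(0);
        System.out.println(group.deadline());  // PT1H, since "60m" is parsed as 60 minutes
        System.out.println(group.members());   // both regions as full (fraction 1.0) members
    }

}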
diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
index 2e746ff55c8..afa7e3e502b 100644
--- a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
+++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
@@ -106,16 +106,16 @@ public class DeploymentSpecTest {
@Test
public void minimalProductionSpec() {
- StringReader r = new StringReader(
- "<deployment version='1.0'>" +
- " <instance id='default'>" +
- " <prod>" +
- " <region active='false'>us-east1</region>" +
- " <region active='true'>us-west1</region>" +
- " </prod>" +
- " </instance>" +
- "</deployment>"
- );
+ StringReader r = new StringReader( """
+ <deployment version='1.0'>
+ <instance id='default'>
+ <prod>
+ <region active='false'>us-east1</region>
+ <region active='true'>us-west1</region>
+ </prod>
+ </instance>
+ </deployment>
+ """);
DeploymentSpec spec = DeploymentSpec.fromXml(r);
assertEquals(1, spec.steps().size());
diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithBcpTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithBcpTest.java
new file mode 100644
index 00000000000..77aadc88be8
--- /dev/null
+++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithBcpTest.java
@@ -0,0 +1,254 @@
+package com.yahoo.config.application.api;
+
+import com.yahoo.config.provision.RegionName;
+import com.yahoo.yolean.Exceptions;
+import org.junit.Test;
+
+import java.io.StringReader;
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author bratseth
+ */
+public class DeploymentSpecWithBcpTest {
+
+ @Test
+ public void minimalProductionSpecWithExplicitBcp() {
+ StringReader r = new StringReader("""
+ <deployment version='1.0'>
+ <instance id='default'>
+ <prod>
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </prod>
+ </instance>
+ <bcp>
+ <group>
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """);
+ assertTwoRegions(DeploymentSpec.fromXml(r));
+ }
+
+ @Test
+ public void specWithoutInstanceWithBcp() {
+ StringReader r = new StringReader("""
+ <deployment version='1.0'>
+ <prod>
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </prod>
+ <bcp>
+ <group>
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """);
+ assertTwoRegions(DeploymentSpec.fromXml(r));
+ }
+
+ @Test
+ public void complexBcpSetup() {
+ StringReader r = new StringReader("""
+ <deployment version='1.0'>
+ <instance id='beta'>
+ <prod>
+ <region>us-east1</region>
+ <region>us-east2</region>
+ </prod>
+ <bcp>
+ <group deadline="60m">
+ <region>us-east1</region>
+ <region>us-east2</region>
+ </group>
+ </bcp>
+ </instance>
+ <instance id='main'>
+ <prod>
+ <region>us-east1</region>
+ <region>us-east2</region>
+ <region>us-central1</region>
+ <region>us-west1</region>
+ <region>us-west2</region>
+ <region>eu-east1</region>
+ <region>eu-west1</region>
+ </prod>
+ </instance>
+ <bcp>
+ <group>
+ <region>us-east1</region>
+ <region>us-east2</region>
+ <region fraction="0.3">us-central1</region>
+ </group>
+ <group>
+ <region>us-west1</region>
+ <region>us-west2</region>
+ <region fraction="0.7">us-central1</region>
+ </group>
+ <group deadline="30m">
+ <region>eu-east1</region>
+ <region>eu-west1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """);
+ var spec = DeploymentSpec.fromXml(r);
+ var betaBcp = spec.requireInstance("beta").bcp().orElse(spec.bcp());
+ assertEquals(1, betaBcp.groups().size());
+ var betaGroup = betaBcp.groups().get(0);
+ assertEquals(2, betaGroup.members().size());
+ assertEquals(Duration.ofMinutes(60), betaGroup.deadline());
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-east1"), 1.0), betaGroup.members().get(0));
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-east2"), 1.0), betaGroup.members().get(1));
+
+ var mainBcp = spec.requireInstance("main").bcp().orElse(spec.bcp());
+ assertEquals(7, mainBcp.regions().size());
+ assertEquals(3, mainBcp.groups().size());
+
+ var usEast = mainBcp.groups().get(0);
+ assertEquals(3, usEast.members().size());
+ assertEquals(Duration.ofMinutes(0), usEast.deadline());
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-east1"), 1.0), usEast.members().get(0));
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-east2"), 1.0), usEast.members().get(1));
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-central1"), 0.3), usEast.members().get(2));
+
+ var usWest = mainBcp.groups().get(1);
+ assertEquals(3, usWest.members().size());
+ assertEquals(Duration.ofMinutes(0), usWest.deadline());
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-west1"), 1.0), usWest.members().get(0));
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-west2"), 1.0), usWest.members().get(1));
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-central1"), 0.7), usWest.members().get(2));
+
+ var eu = mainBcp.groups().get(2);
+ assertEquals(2, eu.members().size());
+ assertEquals(Duration.ofMinutes(30), eu.deadline());
+ assertEquals(new Bcp.RegionMember(RegionName.from("eu-east1"), 1.0), eu.members().get(0));
+ assertEquals(new Bcp.RegionMember(RegionName.from("eu-west1"), 1.0), eu.members().get(1));
+ }
+
+ @Test
+ public void regionMembershipMatchValidation1() {
+ try {
+ StringReader r = new StringReader("""
+ <deployment version='1.0'>
+ <prod>
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </prod>
+ <bcp>
+ <group>
+ <region>us-west1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """);
+ DeploymentSpec.fromXml(r);
+ fail();
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("BCP and deployment mismatch in instance 'default': " +
+ "A <bcp> element must place all deployed production regions in at least one group, " +
+ "and declare no extra regions. " +
+ "Deployed regions: [us-east1, us-west1]. BCP regions: [us-west1]",
+ Exceptions.toMessageString(e));
+ }
+ }
+
+ @Test
+ public void regionMembershipMatchValidation2() {
+ try {
+ StringReader r = new StringReader("""
+ <deployment version='1.0'>
+ <prod>
+ <region>us-west1</region>
+ </prod>
+ <bcp>
+ <group>
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """);
+ DeploymentSpec.fromXml(r);
+ fail();
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("BCP and deployment mismatch in instance 'default': " +
+ "A <bcp> element must place all deployed production regions in at least one group, " +
+ "and declare no extra regions. " +
+ "Deployed regions: [us-west1]. BCP regions: [us-east1, us-west1]",
+ Exceptions.toMessageString(e));
+ }
+ }
+
+ @Test
+ public void deadlineValidation() {
+ try {
+ StringReader r = new StringReader("""
+ <deployment version='1.0'>
+ <prod>
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </prod>
+ <bcp>
+ <group deadline="fast">
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """);
+ DeploymentSpec.fromXml(r);
+ fail();
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Illegal deadline 'fast': Must end by 'm'", Exceptions.toMessageString(e));
+ }
+ }
+
+ @Test
+ public void fractionalMembershipValidation() {
+ try {
+ StringReader r = new StringReader("""
+ <deployment version='1.0'>
+ <prod>
+ <region>us-east1</region>
+ <region>us-west1</region>
+ </prod>
+ <bcp>
+ <group>
+ <region fraction="0.9">us-east1</region>
+ <region>us-west1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """);
+ DeploymentSpec.fromXml(r);
+ fail();
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Illegal BCP spec: All regions must have total membership fractions summing to 1.0, but us-east1 sums to 0.9",
+ Exceptions.toMessageString(e));
+ }
+ }
+
+ private void assertTwoRegions(DeploymentSpec spec) {
+ var bcp = spec.requireInstance("default").bcp().orElse(spec.bcp());
+ assertEquals(1, bcp.groups().size());
+ var group = bcp.groups().get(0);
+ assertEquals(2, group.members().size());
+ assertEquals(Duration.ZERO, group.deadline());
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-east1"), 1.0), group.members().get(0));
+ assertEquals(new Bcp.RegionMember(RegionName.from("us-west1"), 1.0), group.members().get(1));
+ }
+
+}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java
index d09a4a303c2..a3fdce98c73 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java
@@ -2,6 +2,7 @@
package com.yahoo.vespa.model.admin.monitoring;
import com.yahoo.metrics.ContainerMetrics;
import com.yahoo.metrics.SearchNodeMetrics;
+import com.yahoo.metrics.StorageMetrics;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
@@ -39,10 +40,10 @@ public class AutoscalingMetrics {
metrics.add(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_QUERIES.rate()); // content
// Write rate
- metrics.add("feed.http-requests.rate"); // container
- metrics.add("vds.filestor.allthreads.put.count.rate"); // content
- metrics.add("vds.filestor.allthreads.remove.count.rate"); // content
- metrics.add("vds.filestor.allthreads.update.count.rate"); // content
+ metrics.add(ContainerMetrics.FEED_HTTP_REQUESTS.rate()); // container
+ metrics.add(StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_COUNT.rate()); // content
+ metrics.add(StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT.rate()); // content
+ metrics.add(StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT.rate()); // content
return new MetricSet("autoscaling", toMetrics(metrics));
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java
index 850b25472d2..f3ad181887d 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java
@@ -540,68 +540,69 @@ public class VespaMetricSet {
private static Set<Metric> getStorageMetrics() {
Set<Metric> metrics = new LinkedHashSet<>();
- // TODO: For the purpose of this file and likely elsewhere, all but the last aggregate specifier,
- // TODO: such as 'average' and 'sum' in the metric names below are just confusing and can be mentally
- // TODO: disregarded when considering metric names. Consider cleaning up for Vespa 9.
+ // TODO - Vespa 9: For the purpose of this file and likely elsewhere, all but the last aggregate specifier,
+ // TODO - Vespa 9: such as 'average' and 'sum' in the metric names below are just confusing and can be mentally
+ // TODO - Vespa 9: disregarded when considering metric names. Clean up for Vespa 9.
addMetric(metrics, StorageMetrics.VDS_DATASTORED_ALLDISKS_BUCKETS.average());
addMetric(metrics, StorageMetrics.VDS_DATASTORED_ALLDISKS_DOCS.average());
- addMetric(metrics, "vds.datastored.alldisks.bytes.average");
- addMetric(metrics, "vds.visitor.allthreads.averagevisitorlifetime", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.visitor.allthreads.averagequeuewait", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.visitor.allthreads.queuesize", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.visitor.allthreads.completed.rate");
- addMetric(metrics, "vds.visitor.allthreads.created.rate");
- addMetric(metrics, "vds.visitor.allthreads.failed.rate");
- addMetric(metrics, "vds.visitor.allthreads.averagemessagesendtime", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.visitor.allthreads.averageprocessingtime", List.of("max", "sum", "count"));
-
- addMetric(metrics, "vds.filestor.queuesize", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.averagequeuewait", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.active_operations.size", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.active_operations.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.throttle_window_size", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.throttle_waiting_threads", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.throttle_active_tokens", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.mergemetadatareadlatency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.mergedatareadlatency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.mergedatawritelatency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.put_latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.remove_latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allstripes.throttled_rpc_direct_dispatches.rate");
- addMetric(metrics, "vds.filestor.allstripes.throttled_persistence_thread_polls.rate");
- addMetric(metrics, "vds.filestor.allstripes.timeouts_waiting_for_throttle_token.rate");
-
- addMetric(metrics, "vds.filestor.allthreads.put.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.put.failed.rate");
- addMetric(metrics, "vds.filestor.allthreads.put.test_and_set_failed.rate");
- addMetric(metrics, "vds.filestor.allthreads.put.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.put.request_size", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.remove.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.remove.failed.rate");
- addMetric(metrics, "vds.filestor.allthreads.remove.test_and_set_failed.rate");
- addMetric(metrics, "vds.filestor.allthreads.remove.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.remove.request_size", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.get.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.get.failed.rate");
- addMetric(metrics, "vds.filestor.allthreads.get.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.get.request_size", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.update.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.update.failed.rate");
- addMetric(metrics, "vds.filestor.allthreads.update.test_and_set_failed.rate");
- addMetric(metrics, "vds.filestor.allthreads.update.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.update.request_size", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.createiterator.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.createiterator.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.visit.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.visit.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.remove_location.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.remove_location.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.splitbuckets.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.joinbuckets.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.deletebuckets.count.rate");
- addMetric(metrics, "vds.filestor.allthreads.deletebuckets.failed.rate");
- addMetric(metrics, "vds.filestor.allthreads.deletebuckets.latency", List.of("max", "sum", "count"));
- addMetric(metrics, "vds.filestor.allthreads.setbucketstates.count.rate");
+ addMetric(metrics, StorageMetrics.VDS_DATASTORED_ALLDISKS_BYTES.average());
+ addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_AVERAGEVISITORLIFETIME, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_AVERAGEQUEUEWAIT, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_QUEUESIZE, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_COMPLETED.rate());
+ addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_CREATED.rate());
+ addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_AVERAGEMESSAGESENDTIME, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_AVERAGEPROCESSINGTIME, EnumSet.of(max, sum, count));
+
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_QUEUESIZE, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_AVERAGEQUEUEWAIT, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ACTIVE_OPERATIONS_SIZE, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ACTIVE_OPERATIONS_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_THROTTLE_WINDOW_SIZE, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_THROTTLE_WAITING_THREADS, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_THROTTLE_ACTIVE_TOKENS, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGEMETADATAREADLATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGEDATAREADLATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGEDATAWRITELATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGE_PUT_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGE_REMOVE_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLSTRIPES_THROTTLED_RPC_DIRECT_DISPATCHES.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLSTRIPES_THROTTLED_PERSISTENCE_THREAD_POLLS.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLSTRIPES_TIMEOUTS_WAITING_FOR_THROTTLE_TOKEN.rate());
+
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_TEST_AND_SET_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_REQUEST_SIZE, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_TEST_AND_SET_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_REQUEST_SIZE, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_GET_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_GET_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_GET_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_GET_REQUEST_SIZE, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_TEST_AND_SET_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_REQUEST_SIZE, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_CREATEITERATOR_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_CREATEITERATOR_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_VISIT_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_VISIT_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_LOCATION_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_LOCATION_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_SPLITBUCKETS_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_JOINBUCKETS_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_COUNT.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_FAILED.rate());
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_LATENCY, EnumSet.of(max, sum, count));
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_SETBUCKETSTATES_COUNT.rate());
+
return metrics;
}
private static Set<Metric> getDistributorMetrics() {
@@ -713,6 +714,10 @@ public class VespaMetricSet {
suffixes.forEach(suffix -> metrics.add(new Metric(metric.baseName() + "." + suffix.suffix())));
}
+ private static void addMetric(Set<Metric> metrics, StorageMetrics metric, EnumSet<Suffix> suffixes) {
+ suffixes.forEach(suffix -> metrics.add(new Metric(metric.baseName() + "." + suffix.suffix())));
+ }
+
private static void addMetric(Set<Metric> metrics, String metricName, Iterable<String> aggregateSuffices) {
for (String suffix : aggregateSuffices) {
metrics.add(new Metric(metricName + "." + suffix));
diff --git a/container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java b/container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java
index 12d4085ff4c..825ba67dca1 100644
--- a/container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java
+++ b/container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java
@@ -8,7 +8,64 @@ import java.util.List;
public enum StorageMetrics implements VespaMetrics {
VDS_DATASTORED_ALLDISKS_BUCKETS("vds.datastored.alldisks.buckets", Unit.BUCKET, "Number of buckets managed"),
- VDS_DATASTORED_ALLDISKS_DOCS("vds.datastored.alldisks.docs", Unit.DOCUMENT, "Number of documents stored");
+ VDS_DATASTORED_ALLDISKS_DOCS("vds.datastored.alldisks.docs", Unit.DOCUMENT, "Number of documents stored"),
+ VDS_DATASTORED_ALLDISKS_BYTES("vds.datastored.alldisks.bytes", Unit.BYTE, "Number of bytes stored"),
+ VDS_VISITOR_ALLTHREADS_AVERAGEVISITORLIFETIME("vds.visitor.allthreads.averagevisitorlifetime", Unit.MILLISECOND, "Average lifetime of a visitor"),
+ VDS_VISITOR_ALLTHREADS_AVERAGEQUEUEWAIT("vds.visitor.allthreads.averagequeuewait", Unit.MILLISECOND, "Average time an operation spends in input queue."),
+ VDS_VISITOR_ALLTHREADS_QUEUESIZE("vds.visitor.allthreads.queuesize", Unit.OPERATION, "Size of input message queue."),
+ VDS_VISITOR_ALLTHREADS_COMPLETED("vds.visitor.allthreads.completed", Unit.OPERATION, "Number of visitors completed"),
+ VDS_VISITOR_ALLTHREADS_CREATED("vds.visitor.allthreads.created", Unit.OPERATION, "Number of visitors created."),
+ VDS_VISITOR_ALLTHREADS_FAILED("vds.visitor.allthreads.failed", Unit.OPERATION, "Number of visitors failed"),
+ VDS_VISITOR_ALLTHREADS_AVERAGEMESSAGESENDTIME("vds.visitor.allthreads.averagemessagesendtime", Unit.MILLISECOND, "Average time it takes for messages to be sent to their target (and be replied to)"),
+ VDS_VISITOR_ALLTHREADS_AVERAGEPROCESSINGTIME("vds.visitor.allthreads.averageprocessingtime", Unit.MILLISECOND, "Average time used to process visitor requests"),
+
+ VDS_FILESTOR_QUEUESIZE("vds.filestor.queuesize", Unit.OPERATION, "Size of input message queue."),
+ VDS_FILESTOR_AVERAGEQUEUEWAIT("vds.filestor.averagequeuewait", Unit.MILLISECOND, "Average time an operation spends in input queue."),
+ VDS_FILESTOR_ACTIVE_OPERATIONS_SIZE("vds.filestor.active_operations.size", Unit.OPERATION, "Number of concurrent active operations"),
+ VDS_FILESTOR_ACTIVE_OPERATIONS_LATENCY("vds.filestor.active_operations.latency", Unit.MILLISECOND, "Latency (in ms) for completed operations"), // TODO Vespa 9: Remove 'active' from the metric name
+ VDS_FILESTOR_THROTTLE_WINDOW_SIZE("vds.filestor.throttle_window_size", Unit.OPERATION, "Current size of async operation throttler window size"),
+ VDS_FILESTOR_THROTTLE_WAITING_THREADS("vds.filestor.throttle_waiting_threads", Unit.THREAD, "Number of threads waiting to acquire a throttle token"),
+ VDS_FILESTOR_THROTTLE_ACTIVE_TOKENS("vds.filestor.throttle_active_tokens", Unit.INSTANCE, "Current number of active throttle tokens"),
+ VDS_FILESTOR_ALLTHREADS_MERGEMETADATAREADLATENCY("vds.filestor.allthreads.mergemetadatareadlatency", Unit.MILLISECOND, "Time spent in a merge step to check metadata of current node to see what data it has."),
+ VDS_FILESTOR_ALLTHREADS_MERGEDATAREADLATENCY("vds.filestor.allthreads.mergedatareadlatency", Unit.MILLISECOND, "Time spent in a merge step to read data other nodes need."),
+ VDS_FILESTOR_ALLTHREADS_MERGEDATAWRITELATENCY("vds.filestor.allthreads.mergedatawritelatency", Unit.MILLISECOND, "Time spent in a merge step to write data needed to current node."),
+ VDS_FILESTOR_ALLTHREADS_MERGE_PUT_LATENCY("vds.filestor.allthreads.put_latency", Unit.MILLISECOND, "Latency of individual puts that are part of merge operations"), // TODO Vespa 9: Update metric name to include 'merge'
+ VDS_FILESTOR_ALLTHREADS_MERGE_REMOVE_LATENCY("vds.filestor.allthreads.remove_latency", Unit.MILLISECOND, "Latency of individual removes that are part of merge operations"), // TODO Vespa 9: Update metric name to include 'merge'
+ VDS_FILESTOR_ALLSTRIPES_THROTTLED_RPC_DIRECT_DISPATCHES("vds.filestor.allstripes.throttled_rpc_direct_dispatches", Unit.INSTANCE, "Number of times an RPC thread could not dispatch an async operation directly to Proton because it was disallowed by the throttle policy"),
+ VDS_FILESTOR_ALLSTRIPES_THROTTLED_PERSISTENCE_THREAD_POLLS("vds.filestor.allstripes.throttled_persistence_thread_polls", Unit.INSTANCE, "Number of times a persistence thread could not immediately dispatch a queued async operation because it was disallowed by the throttle policy"),
+ VDS_FILESTOR_ALLSTRIPES_TIMEOUTS_WAITING_FOR_THROTTLE_TOKEN("vds.filestor.allstripes.timeouts_waiting_for_throttle_token", Unit.INSTANCE, "Number of times a persistence thread timed out waiting for an available throttle policy token"),
+
+ VDS_FILESTOR_ALLTHREADS_PUT_COUNT("vds.filestor.allthreads.put.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_PUT_FAILED("vds.filestor.allthreads.put.failed", Unit.OPERATION, "Number of failed requests."),
+ VDS_FILESTOR_ALLTHREADS_PUT_TEST_AND_SET_FAILED("vds.filestor.allthreads.put.test_and_set_failed", Unit.OPERATION, "Number of operations that were skipped due to a test-and-set condition not met"),
+ VDS_FILESTOR_ALLTHREADS_PUT_LATENCY("vds.filestor.allthreads.put.latency", Unit.MILLISECOND, "Latency of successful requests."),
+ VDS_FILESTOR_ALLTHREADS_PUT_REQUEST_SIZE("vds.filestor.allthreads.put.request_size", Unit.BYTE, "Size of requests, in bytes"),
+ VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT("vds.filestor.allthreads.remove.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_REMOVE_FAILED("vds.filestor.allthreads.remove.failed", Unit.OPERATION, "Number of failed requests."),
+ VDS_FILESTOR_ALLTHREADS_REMOVE_TEST_AND_SET_FAILED("vds.filestor.allthreads.remove.test_and_set_failed", Unit.OPERATION, "Number of operations that were skipped due to a test-and-set condition not met"),
+ VDS_FILESTOR_ALLTHREADS_REMOVE_LATENCY("vds.filestor.allthreads.remove.latency", Unit.MILLISECOND, "Latency of successful requests."),
+ VDS_FILESTOR_ALLTHREADS_REMOVE_REQUEST_SIZE("vds.filestor.allthreads.remove.request_size", Unit.BYTE, "Size of requests, in bytes"),
+ VDS_FILESTOR_ALLTHREADS_GET_COUNT("vds.filestor.allthreads.get.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_GET_FAILED("vds.filestor.allthreads.get.failed", Unit.OPERATION, "Number of failed requests."),
+ VDS_FILESTOR_ALLTHREADS_GET_LATENCY("vds.filestor.allthreads.get.latency", Unit.MILLISECOND, "Latency of successful requests."),
+ VDS_FILESTOR_ALLTHREADS_GET_REQUEST_SIZE("vds.filestor.allthreads.get.request_size", Unit.BYTE, "Size of requests, in bytes"),
+ VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT("vds.filestor.allthreads.update.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_UPDATE_FAILED("vds.filestor.allthreads.update.failed", Unit.OPERATION, "Number of failed requests."),
+ VDS_FILESTOR_ALLTHREADS_UPDATE_TEST_AND_SET_FAILED("vds.filestor.allthreads.update.test_and_set_failed", Unit.OPERATION, "Number of operations that were skipped due to a test-and-set condition not met"),
+ VDS_FILESTOR_ALLTHREADS_UPDATE_LATENCY("vds.filestor.allthreads.update.latency", Unit.MILLISECOND, "Latency of successful requests."),
+ VDS_FILESTOR_ALLTHREADS_UPDATE_REQUEST_SIZE("vds.filestor.allthreads.update.request_size", Unit.BYTE, "Size of requests, in bytes"),
+ VDS_FILESTOR_ALLTHREADS_CREATEITERATOR_COUNT("vds.filestor.allthreads.createiterator.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_CREATEITERATOR_LATENCY("vds.filestor.allthreads.createiterator.latency", Unit.MILLISECOND, "Latency of successful requests."),
+ VDS_FILESTOR_ALLTHREADS_VISIT_COUNT("vds.filestor.allthreads.visit.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_VISIT_LATENCY("vds.filestor.allthreads.visit.latency", Unit.MILLISECOND, "Latency of successful requests."),
+ VDS_FILESTOR_ALLTHREADS_REMOVE_LOCATION_COUNT("vds.filestor.allthreads.remove_location.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_REMOVE_LOCATION_LATENCY("vds.filestor.allthreads.remove_location.latency", Unit.MILLISECOND, "Latency of successful requests."),
+ VDS_FILESTOR_ALLTHREADS_SPLITBUCKETS_COUNT("vds.filestor.allthreads.splitbuckets.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_JOINBUCKETS_COUNT("vds.filestor.allthreads.joinbuckets.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_COUNT("vds.filestor.allthreads.deletebuckets.count", Unit.OPERATION, "Number of requests processed."),
+ VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_FAILED("vds.filestor.allthreads.deletebuckets.failed", Unit.OPERATION, "Number of failed requests."),
+ VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_LATENCY("vds.filestor.allthreads.deletebuckets.latency", Unit.MILLISECOND, "Latency of successful requests."),
+ VDS_FILESTOR_ALLTHREADS_SETBUCKETSTATES_COUNT("vds.filestor.allthreads.setbucketstates.count", Unit.OPERATION, "Number of requests processed.");
private final String name;
private final Unit unit;
diff --git a/container-core/src/main/java/com/yahoo/metrics/Unit.java b/container-core/src/main/java/com/yahoo/metrics/Unit.java
index cfc4b6f5d21..bb7718ddb4c 100644
--- a/container-core/src/main/java/com/yahoo/metrics/Unit.java
+++ b/container-core/src/main/java/com/yahoo/metrics/Unit.java
@@ -16,6 +16,7 @@ public enum Unit {
FRACTION(BaseUnit.FRACTION),
HIT(BaseUnit.HIT),
HIT_PER_QUERY(BaseUnit.HIT, BaseUnit.QUERY),
+ INSTANCE(BaseUnit.INSTANCE),
ITEM(BaseUnit.ITEM),
MILLISECOND(BaseUnit.MILLISECOND),
NANOSECOND(BaseUnit.NANOSECOND),
@@ -71,6 +72,7 @@ public enum Unit {
FILE("file"),
FRACTION("fraction"),
HIT("hit"),
+ INSTANCE("instance"),
ITEM("item"),
MILLISECOND("millisecond", "ms"),
NANOSECOND("nanosecond", "ns"),
diff --git a/container-core/src/main/java/com/yahoo/restapi/Path.java b/container-core/src/main/java/com/yahoo/restapi/Path.java
index 01bcb627639..2fae8da0c2d 100644
--- a/container-core/src/main/java/com/yahoo/restapi/Path.java
+++ b/container-core/src/main/java/com/yahoo/restapi/Path.java
@@ -85,7 +85,7 @@ public class Path {
*
* Returns whether this path matches the given template string.
* If the given template has placeholders, their values (accessible by get) are reset by calling this,
- * whether or not the path matches the given template.
+ * whether the path matches the given template.
*
* This will NOT match empty path elements.
*
diff --git a/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java b/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java
index bc9448f39d8..33cd2f48d46 100644
--- a/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java
+++ b/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java
@@ -241,6 +241,11 @@ public class YqlParserTestCase {
}
@Test
+ void testNonEquality() {
+ assertParse("select foo from bar where !(price = 500)", "-price:500");
+ }
+
+ @Test
void testNegativeLessThan() {
assertParse("select foo from bar where price < -500", "price:<-500");
assertParse("select foo from bar where -500 < price", "price:>-500");
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java
index 832dbb6b921..aea01ae36d3 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java
@@ -1,6 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.controller.maintenance;
+import com.yahoo.config.application.api.Bcp;
+import com.yahoo.config.application.api.DeploymentSpec;
+import com.yahoo.config.provision.InstanceName;
+import com.yahoo.config.provision.RegionName;
import com.yahoo.vespa.hosted.controller.ApplicationController;
import com.yahoo.vespa.hosted.controller.Controller;
import com.yahoo.vespa.hosted.controller.Instance;
@@ -8,7 +12,11 @@ import com.yahoo.vespa.hosted.controller.api.integration.configserver.NodeReposi
import com.yahoo.vespa.hosted.controller.application.Deployment;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
+import java.util.stream.Collectors;
/**
* This computes, for every application deployment
@@ -41,12 +49,11 @@ public class TrafficShareUpdater extends ControllerMaintainer {
int failures = 0;
for (var application : applications.asList()) {
for (var instance : application.instances().values()) {
- for (var deployment : instance.deployments().values()) {
- if ( ! deployment.zone().environment().isProduction()) continue;
+ for (var deployment : instance.productionDeployments().values()) {
if (shuttingDown()) return 1.0;
try {
attempts++;
- updateTrafficFraction(instance, deployment);
+ updateTrafficFraction(instance, deployment, application.deploymentSpec());
}
catch (Exception e) {
// Some failures due to locked applications are expected and benign
@@ -62,20 +69,94 @@ public class TrafficShareUpdater extends ControllerMaintainer {
return successFactor;
}
- private void updateTrafficFraction(Instance instance, Deployment deployment) {
- double qpsInZone = deployment.metrics().queriesPerSecond();
- double totalQps = instance.deployments().values().stream()
- .filter(i -> i.zone().environment().isProduction())
- .mapToDouble(i -> i.metrics().queriesPerSecond()).sum();
- long prodRegions = instance.deployments().values().stream()
- .filter(i -> i.zone().environment().isProduction())
- .count();
- double currentReadShare = totalQps == 0 ? 0 : qpsInZone / totalQps;
- double maxReadShare = prodRegions < 2 ? 1.0 : 1.0 / ( prodRegions - 1.0);
- if (currentReadShare > maxReadShare) // This can happen because the assumption of equal traffic
- maxReadShare = currentReadShare; // distribution can be incorrect
+ private void updateTrafficFraction(Instance instance, Deployment deployment, DeploymentSpec deploymentSpec) {
+ // maxReadShare / currentReadShare = how much additional traffic the zone must be able to handle
+ double currentReadShare = 0; // The share of the total traffic of its group(s) this deployment currently receives
+ double maxReadShare = 0; // The share of the total traffic of its group(s) this deployment might receive if another group member fails
+ for (BcpGroup group : BcpGroup.groupsFrom(instance, deploymentSpec)) {
+ if ( ! group.contains(deployment.zone().region())) continue;
+ double deploymentQps = deployment.metrics().queriesPerSecond();
+ double groupQps = group.totalQps();
+ double fraction = group.fraction(deployment.zone().region());
+ currentReadShare += groupQps == 0 ? 0 : fraction * deploymentQps / groupQps;
+ maxReadShare += group.size() == 1
+ ? currentReadShare
+ : fraction * ( deploymentQps + group.maxQpsExcluding(deployment.zone().region()) / (group.size() - 1) ) / groupQps;
+ }
nodeRepository.patchApplication(deployment.zone(), instance.id(), currentReadShare, maxReadShare);
}
+ /**
+ * A set of regions which will take over traffic from each other if one of them fails.
+ * Each region will take an equal share (modulated by fraction) of the failing region's traffic.
+ *
+ * A region's membership in a group may be partial, represented by a fraction in [0, 1],
+ * in which case the other regions will collectively only take that fraction of the failing region's traffic,
+ * and, symmetrically, the region will only take its fraction of its share of the traffic of any other failing region.
+ */
+ private static class BcpGroup {
+
+ /** The instance which has this group. */
+ private final Instance instance;
+
+ /** Regions in this group, with their fractions. */
+ private final Map<RegionName, Double> regions;
+
+ /** Creates a group of a subset of the deployments in this instance. */
+ private BcpGroup(Instance instance, Map<RegionName, Double> regions) {
+ this.instance = instance;
+ this.regions = regions;
+ }
+
+ /** Returns the sum of the fractional memberships of this. */
+ double size() {
+ return regions.values().stream().mapToDouble(f -> f).sum();
+ }
+
+ double fraction(RegionName region) {
+ return regions.getOrDefault(region, 0.0);
+ }
+
+ boolean contains(RegionName region) {
+ return regions.containsKey(region);
+ }
+
+ double totalQps() {
+ return instance.productionDeployments().values().stream()
+ .mapToDouble(i -> i.metrics().queriesPerSecond()).sum();
+ }
+
+ double maxQpsExcluding(RegionName region) {
+ return instance.productionDeployments().values().stream()
+ .filter(d -> ! d.zone().region().equals(region))
+ .mapToDouble(d -> d.metrics().queriesPerSecond() * fraction(d.zone().region()))
+ .max()
+ .orElse(0);
+ }
+ private static Bcp bcpOf(InstanceName instanceName, DeploymentSpec deploymentSpec) {
+ var instanceSpec = deploymentSpec.instance(instanceName);
+ if (instanceSpec.isEmpty()) return deploymentSpec.bcp();
+ return instanceSpec.get().bcp().orElse(deploymentSpec.bcp());
+ }
+
+ private static Map<RegionName, Double> regionsFrom(Instance instance) {
+ return instance.productionDeployments().values().stream()
+ .collect(Collectors.toMap(deployment -> deployment.zone().region(), __ -> 1.0));
+ }
+
+ private static Map<RegionName, Double> regionsFrom(Bcp.Group groupSpec) {
+ return groupSpec.members().stream()
+ .collect(Collectors.toMap(member -> member.region(), member -> member.fraction()));
+ }
+
+ static List<BcpGroup> groupsFrom(Instance instance, DeploymentSpec deploymentSpec) {
+ Bcp bcp = bcpOf(instance.name(), deploymentSpec);
+ if (bcp.isEmpty())
+ return List.of(new BcpGroup(instance, regionsFrom(instance)));
+ return bcp.groups().stream().map(groupSpec -> new BcpGroup(instance, regionsFrom(groupSpec))).toList();
+ }
+
+ }
+
}
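(Illustration, not part of the change: the read-share arithmetic above, worked through for the implicit-BCP case exercised in TrafficShareUpdaterTest below, using that test's QPS values of 53, 45 and 2 across three production regions in one group.)

    total group QPS      = 53 + 45 + 2 = 100
    currentReadShare(r1) = 53 / 100 = 0.53
    maxReadShare(r1)     = (53 + 45 / (3 - 1)) / 100 = 0.755

The largest other member (45 QPS) is assumed to fail, and its traffic is spread evenly over the remaining size - 1 = 2 members.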
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java
index 14835a822e6..b712746b663 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java
@@ -15,6 +15,7 @@ import com.yahoo.security.KeyUtils;
import com.yahoo.security.SignatureAlgorithm;
import com.yahoo.security.X509CertificateBuilder;
import com.yahoo.vespa.hosted.controller.Application;
+import com.yahoo.vespa.hosted.controller.Controller;
import com.yahoo.vespa.hosted.controller.ControllerTester;
import com.yahoo.vespa.hosted.controller.Instance;
import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId;
@@ -52,6 +53,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
@@ -196,8 +198,9 @@ public class DeploymentContext {
Application application = application();
assertTrue(application.revisions().last().isPresent(), "Application package submitted");
assertFalse(application.instances().values().stream()
- .anyMatch(instance -> instance.deployments().values().stream()
- .anyMatch(deployment -> deployment.revision().equals(lastSubmission))), "Submission is not already deployed");
+ .anyMatch(instance -> instance.deployments().values().stream()
+ .anyMatch(deployment -> deployment.revision().equals(lastSubmission))),
+ "Submission is not already deployed");
completeRollout(application.deploymentSpec().instances().size() > 1);
for (var instance : application().instances().values()) {
assertFalse(instance.change().hasTargets());
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java
index fad85ef9b48..5c26e270846 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java
@@ -2,8 +2,10 @@
package com.yahoo.vespa.hosted.controller.maintenance;
import com.yahoo.component.Version;
+import com.yahoo.config.application.api.xml.DeploymentSpecXmlReader;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.zone.ZoneId;
+import com.yahoo.vespa.hosted.controller.Application;
import com.yahoo.vespa.hosted.controller.api.application.v4.model.ClusterMetrics;
import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId;
import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage;
@@ -14,6 +16,7 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Map;
+import java.util.OptionalLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -25,61 +28,184 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class TrafficShareUpdaterTest {
@Test
- void testTrafficUpdater() {
+ void testTrafficUpdaterImplicitBcp() {
DeploymentTester tester = new DeploymentTester();
Version version = Version.fromString("7.1");
- tester.controllerTester().upgradeSystem(version);
- var application = tester.newDeploymentContext();
+ tester.controllerTester().upgradeSystem(Version.fromString("7.1"));
+ var context = tester.newDeploymentContext();
var deploymentMetricsMaintainer = new DeploymentMetricsMaintainer(tester.controller(), Duration.ofDays(1));
var updater = new TrafficShareUpdater(tester.controller(), Duration.ofDays(1));
ZoneId prod1 = ZoneId.from("prod", "ap-northeast-1");
ZoneId prod2 = ZoneId.from("prod", "us-east-3");
ZoneId prod3 = ZoneId.from("prod", "us-west-1");
- application.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.perfUsEast3, new ApplicationPackage(new byte[0]), version); // Ignored
+ context.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version);
- // Single zone
- setQpsMetric(50.0, application.application().id().defaultInstance(), prod1, tester);
+ // One zone
+ context.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version);
+ setQpsMetric(50.0, context.application().id().defaultInstance(), prod1, tester);
deploymentMetricsMaintainer.maintain();
assertEquals(1.0, updater.maintain(), 0.0000001);
- assertTrafficFraction(1.0, 1.0, application.instanceId(), prod1, tester);
+ assertTrafficFraction(1.0, 1.0, context.instanceId(), prod1, tester);
// Two zones
- application.runJob(DeploymentContext.productionUsEast3, new ApplicationPackage(new byte[0]), version);
- // - one cold
- setQpsMetric(50.0, application.application().id().defaultInstance(), prod1, tester);
- setQpsMetric(0.0, application.application().id().defaultInstance(), prod2, tester);
+ context.runJob(DeploymentContext.productionUsEast3, new ApplicationPackage(new byte[0]), version);
+ setQpsMetric(60.0, context.application().id().defaultInstance(), prod1, tester);
+ setQpsMetric(20.0, context.application().id().defaultInstance(), prod2, tester);
deploymentMetricsMaintainer.maintain();
assertEquals(1.0, updater.maintain(), 0.0000001);
- assertTrafficFraction(1.0, 1.0, application.instanceId(), prod1, tester);
- assertTrafficFraction(0.0, 1.0, application.instanceId(), prod2, tester);
- // - both hot
- setQpsMetric(53.0, application.application().id().defaultInstance(), prod1, tester);
- setQpsMetric(47.0, application.application().id().defaultInstance(), prod2, tester);
+ assertTrafficFraction(0.75, 1.0, context.instanceId(), prod1, tester);
+ assertTrafficFraction(0.25, 1.0, context.instanceId(), prod2, tester);
+
+ // Three zones
+ context.runJob(DeploymentContext.productionUsWest1, new ApplicationPackage(new byte[0]), version);
+ setQpsMetric(53.0, context.application().id().defaultInstance(), prod1, tester);
+ setQpsMetric(45.0, context.application().id().defaultInstance(), prod2, tester);
+ setQpsMetric(02.0, context.application().id().defaultInstance(), prod3, tester);
deploymentMetricsMaintainer.maintain();
assertEquals(1.0, updater.maintain(), 0.0000001);
- assertTrafficFraction(0.53, 1.0, application.instanceId(), prod1, tester);
- assertTrafficFraction(0.47, 1.0, application.instanceId(), prod2, tester);
+ assertTrafficFraction(0.53, 0.53 + (double)45/2 / 100, context.instanceId(), prod1, tester);
+ assertTrafficFraction(0.45, 0.45 + (double)53/2 / 100, context.instanceId(), prod2, tester);
+ assertTrafficFraction(0.02, 0.02 + (double)53/2 / 100, context.instanceId(), prod3, tester);
+ }
+
+ @Test
+ void testTrafficUpdaterHotCold() {
+ var spec = """
+ <deployment version="1.0">
+ <staging/>
+ <prod>
+ <region>ap-northeast-1</region>
+ <region>ap-southeast-1</region>
+ <region>us-east-3</region>
+ <region>us-central-1</region>
+ <region>eu-west-1</region>
+ </prod>
+ <bcp>
+ <group>
+ <region>ap-northeast-1</region>
+ <region>ap-southeast-1</region>
+ </group>
+ <group>
+ <region>us-east-3</region>
+ <region>us-central-1</region>
+ </group>
+ <group>
+ <region>eu-west-1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """;
+
+ DeploymentTester tester = new DeploymentTester();
+ Version version = Version.fromString("7.1");
+ tester.controllerTester().upgradeSystem(Version.fromString("7.1"));
+ var context = tester.newDeploymentContext();
+ var deploymentSpec = new DeploymentSpecXmlReader(true).read(spec);
+ tester.controller().applications()
+ .lockApplicationOrThrow(context.application().id(),
+ locked -> tester.controller().applications().store(locked.with(deploymentSpec)));
+
+ var deploymentMetricsMaintainer = new DeploymentMetricsMaintainer(tester.controller(), Duration.ofDays(1));
+ var updater = new TrafficShareUpdater(tester.controller(), Duration.ofDays(1));
+
+ ZoneId ap1 = ZoneId.from("prod", "ap-northeast-1");
+ ZoneId ap2 = ZoneId.from("prod", "ap-southeast-1");
+ ZoneId us1 = ZoneId.from("prod", "us-east-3");
+ ZoneId us2 = ZoneId.from("prod", "us-central-1");
+ ZoneId eu1 = ZoneId.from("prod", "eu-west-1");
+
+ context.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionApSoutheast1, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionUsEast3, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionUsCentral1, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionEuWest1, new ApplicationPackage(new byte[0]), version);
+
+ setQpsMetric(50.0, context.application().id().defaultInstance(), ap1, tester);
+ setQpsMetric(00.0, context.application().id().defaultInstance(), ap2, tester);
+ setQpsMetric(10.0, context.application().id().defaultInstance(), us1, tester);
+ setQpsMetric(00.0, context.application().id().defaultInstance(), us2, tester);
+ setQpsMetric(40.0, context.application().id().defaultInstance(), eu1, tester);
- // Three zones
- application.runJob(DeploymentContext.productionUsWest1, new ApplicationPackage(new byte[0]), version);
- // - one cold
- setQpsMetric(53.0, application.application().id().defaultInstance(), prod1, tester);
- setQpsMetric(47.0, application.application().id().defaultInstance(), prod2, tester);
- setQpsMetric(0.0, application.application().id().defaultInstance(), prod3, tester);
deploymentMetricsMaintainer.maintain();
assertEquals(1.0, updater.maintain(), 0.0000001);
- assertTrafficFraction(0.53, 0.53, application.instanceId(), prod1, tester);
- assertTrafficFraction(0.47, 0.50, application.instanceId(), prod2, tester);
- assertTrafficFraction(0.00, 0.50, application.instanceId(), prod3, tester);
- // - all hot
- setQpsMetric(50.0, application.application().id().defaultInstance(), prod1, tester);
- setQpsMetric(25.0, application.application().id().defaultInstance(), prod2, tester);
- setQpsMetric(25.0, application.application().id().defaultInstance(), prod3, tester);
+ assertTrafficFraction(0.5, 0.5, context.instanceId(), ap1, tester);
+ assertTrafficFraction(0.0, 0.5, context.instanceId(), ap2, tester);
+ assertTrafficFraction(0.1, 0.1, context.instanceId(), us1, tester);
+ assertTrafficFraction(0.0, 0.1, context.instanceId(), us2, tester);
+ assertTrafficFraction(0.4, 0.4, context.instanceId(), eu1, tester);
+ }
+
+ @Test
+ void testTrafficUpdaterOverlappingGroups() {
+ var spec = """
+ <deployment version="1.0">
+ <staging/>
+ <prod>
+ <region>ap-northeast-1</region>
+ <region>ap-southeast-1</region>
+ <region>us-east-3</region>
+ <region>us-central-1</region>
+ <region>us-west-1</region>
+ <region>eu-west-1</region>
+ </prod>
+ <bcp>
+ <group>
+ <region>ap-northeast-1</region>
+ <region>ap-southeast-1</region>
+ <region fraction="0.5">eu-west-1</region>
+ </group>
+ <group>
+ <region>us-east-3</region>
+ <region>us-central-1</region>
+ <region>us-west-1</region>
+ <region fraction="0.5">eu-west-1</region>
+ </group>
+ </bcp>
+ </deployment>
+ """;
+
+ DeploymentTester tester = new DeploymentTester();
+ Version version = Version.fromString("7.1");
+ tester.controllerTester().upgradeSystem(Version.fromString("7.1"));
+ var context = tester.newDeploymentContext();
+ var deploymentSpec = new DeploymentSpecXmlReader(true).read(spec);
+ tester.controller().applications()
+ .lockApplicationOrThrow(context.application().id(),
+ locked -> tester.controller().applications().store(locked.with(deploymentSpec)));
+
+ var deploymentMetricsMaintainer = new DeploymentMetricsMaintainer(tester.controller(), Duration.ofDays(1));
+ var updater = new TrafficShareUpdater(tester.controller(), Duration.ofDays(1));
+
+ ZoneId ap1 = ZoneId.from("prod", "ap-northeast-1");
+ ZoneId ap2 = ZoneId.from("prod", "ap-southeast-1");
+ ZoneId us1 = ZoneId.from("prod", "us-east-3");
+ ZoneId us2 = ZoneId.from("prod", "us-central-1");
+ ZoneId us3 = ZoneId.from("prod", "us-west-1");
+ ZoneId eu1 = ZoneId.from("prod", "eu-west-1");
+
+ context.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionApSoutheast1, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionUsEast3, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionUsCentral1, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionUsWest1, new ApplicationPackage(new byte[0]), version);
+ context.runJob(DeploymentContext.productionEuWest1, new ApplicationPackage(new byte[0]), version);
+
+ setQpsMetric(20.0, context.application().id().defaultInstance(), ap1, tester);
+ setQpsMetric(50.0, context.application().id().defaultInstance(), ap2, tester);
+ setQpsMetric(00.0, context.application().id().defaultInstance(), us1, tester);
+ setQpsMetric(30.0, context.application().id().defaultInstance(), us2, tester);
+ setQpsMetric(40.0, context.application().id().defaultInstance(), us3, tester);
+ setQpsMetric(60.0, context.application().id().defaultInstance(), eu1, tester);
+
deploymentMetricsMaintainer.maintain();
assertEquals(1.0, updater.maintain(), 0.0000001);
- assertTrafficFraction(0.50, 0.5, application.instanceId(), prod1, tester);
- assertTrafficFraction(0.25, 0.5, application.instanceId(), prod2, tester);
- assertTrafficFraction(0.25, 0.5, application.instanceId(), prod3, tester);
+ assertTrafficFraction(0.10, 0.10 + 50 / 200.0 / 1.5, context.instanceId(), ap1, tester);
+ assertTrafficFraction(0.25, 0.25 + 30 / 200.0 / 1.5, context.instanceId(), ap2, tester);
+ assertTrafficFraction(0.00, 0.00 + 40 / 200.0 / 2.5, context.instanceId(), us1, tester);
+ assertTrafficFraction(0.15, 0.15 + 40 / 200.0 / 2.5, context.instanceId(), us2, tester);
+ assertTrafficFraction(0.20, 0.20 + 30 / 200.0 / 2.5, context.instanceId(), us3, tester);
+ assertTrafficFraction(0.30, 0.30 + 0.5 * 50 / 200.0 / 1.5 + 0.5 * 40 / 200.0 / 2.5, context.instanceId(), eu1, tester);
}
private void setQpsMetric(double qps, ApplicationId application, ZoneId zone, DeploymentTester tester) {
@@ -90,8 +216,8 @@ public class TrafficShareUpdaterTest {
private void assertTrafficFraction(double currentReadShare, double maxReadShare,
ApplicationId application, ZoneId zone, DeploymentTester tester) {
NodeRepositoryMock mock = (NodeRepositoryMock)tester.controller().serviceRegistry().configServer().nodeRepository();
- assertEquals(currentReadShare, mock.getTrafficFraction(application, zone).getFirst(), 0.00001);
- assertEquals(maxReadShare, mock.getTrafficFraction(application, zone).getSecond(), 0.00001);
+ assertEquals(currentReadShare, mock.getTrafficFraction(application, zone).getFirst(), 0.00001, "Current read share");
+ assertEquals(maxReadShare, mock.getTrafficFraction(application, zone).getSecond(), 0.00001, "Max read share");
}
}
diff --git a/indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java b/indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java
index 328cd00742f..9f2260e5b94 100644
--- a/indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java
+++ b/indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java
@@ -66,6 +66,7 @@ public class EmbedExpression extends Expression {
@Override
protected void doExecute(ExecutionContext context) {
+ if (context.getValue() == null) return;
Tensor output;
if (context.getValue().getDataType() == DataType.STRING) {
output = embedSingleValue(context);
diff --git a/indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java b/indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java
index c446c04065a..ce42f4e727f 100644
--- a/indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java
+++ b/indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java
@@ -189,6 +189,8 @@ public class ScriptTestCase {
"input text", "[105, 110, 112, 117]");
testEmbedStatement("input myText | embed 'emb1' | attribute 'myTensor'", embedder,
"input text", "[105, 110, 112, 117]");
+ testEmbedStatement("input myText | embed 'emb1' | attribute 'myTensor'", embedder,
+ null, null);
Map<String, Embedder> embedders = Map.of(
"emb1", new MockEmbedder("myDocument.myTensor"),
@@ -200,9 +202,9 @@ public class ScriptTestCase {
"my input", "[110.0, 122.0, 33.0, 106.0]");
assertThrows(() -> testEmbedStatement("input myText | embed | attribute 'myTensor'", embedders, "input text", "[105, 110, 112, 117]"),
- "Multiple embedders are provided but no embedder id is given. Valid embedders are emb1,emb2");
+ "Multiple embedders are provided but no embedder id is given. Valid embedders are emb1,emb2");
assertThrows(() -> testEmbedStatement("input myText | embed emb3 | attribute 'myTensor'", embedders, "input text", "[105, 110, 112, 117]"),
- "Can't find embedder 'emb3'. Valid embedders are emb1,emb2");
+ "Can't find embedder 'emb3'. Valid embedders are emb1,emb2");
}
private void testEmbedStatement(String expressionString, Map<String, Embedder> embedders, String input, String expected) {
@@ -224,9 +226,14 @@ public class ScriptTestCase {
ExecutionContext context = new ExecutionContext(adapter);
expression.execute(context);
- assertTrue(adapter.values.containsKey("myTensor"));
- assertEquals(Tensor.from(tensorType, expected),
- ((TensorFieldValue) adapter.values.get("myTensor")).getTensor().get());
+ if (input == null) {
+ assertFalse(adapter.values.containsKey("myTensor"));
+ }
+ else {
+ assertTrue(adapter.values.containsKey("myTensor"));
+ assertEquals(Tensor.from(tensorType, expected),
+ ((TensorFieldValue) adapter.values.get("myTensor")).getTensor().get());
+ }
}
catch (ParseException e) {
throw new IllegalArgumentException(e);
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java
index 1ba686772c7..a23b1273fe3 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java
@@ -1,37 +1,86 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.provision.restapi;
+import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
-import com.yahoo.restapi.RestApi;
-import com.yahoo.restapi.RestApiRequestHandler;
+import com.yahoo.restapi.ErrorResponse;
+import com.yahoo.restapi.MessageResponse;
+import com.yahoo.restapi.Path;
import com.yahoo.vespa.hosted.provision.NodeRepository;
+import com.yahoo.vespa.hosted.provision.lb.LoadBalancer;
+import com.yahoo.vespa.hosted.provision.lb.LoadBalancerId;
+import com.yahoo.yolean.Exceptions;
import javax.inject.Inject;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.logging.Level;
/**
* @author mpolden
* @author jonmv
*/
-public class LoadBalancersV1ApiHandler extends RestApiRequestHandler<LoadBalancersV1ApiHandler> {
+public class LoadBalancersV1ApiHandler extends ThreadedHttpRequestHandler {
private final NodeRepository nodeRepository;
@Inject
public LoadBalancersV1ApiHandler(ThreadedHttpRequestHandler.Context parentCtx, NodeRepository nodeRepository) {
- super(parentCtx, LoadBalancersV1ApiHandler::createRestApiDefinition);
- this.nodeRepository = nodeRepository;
+ super(parentCtx);
+ this.nodeRepository = Objects.requireNonNull(nodeRepository);
}
- private static RestApi createRestApiDefinition(LoadBalancersV1ApiHandler self) {
- return RestApi.builder()
- .addRoute(RestApi.route("/loadbalancers/v1")
- .get(self::getLoadBalancers))
- .build();
+ @Override
+ public HttpResponse handle(HttpRequest request) {
+ try {
+ return switch (request.getMethod()) {
+ case GET -> get(request);
+ case PUT -> put(request);
+ default -> ErrorResponse.methodNotAllowed("Method '" + request.getMethod() + "' is not supported");
+ };
+ } catch (NotFoundException e) {
+ return ErrorResponse.notFoundError(Exceptions.toMessageString(e));
+ } catch (IllegalArgumentException e) {
+ return ErrorResponse.badRequest(Exceptions.toMessageString(e));
+ } catch (RuntimeException e) {
+ log.log(Level.WARNING, "Unexpected error handling '" + request.getUri() + "'", e);
+ return ErrorResponse.internalServerError(Exceptions.toMessageString(e));
+ }
}
- private HttpResponse getLoadBalancers(RestApi.RequestContext context) {
- return new LoadBalancersResponse(context.request(), nodeRepository);
+ private HttpResponse get(HttpRequest request) {
+ Path path = new Path(request.getUri());
+ if (path.matches("/loadbalancers/v1")) return new LoadBalancersResponse(request, nodeRepository);
+ throw new NotFoundException("Nothing at " + path);
+ }
+
+ private HttpResponse put(HttpRequest request) {
+ Path path = new Path(request.getUri());
+ if (path.matches("/loadbalancers/v1/state/{state}/{id}")) return setState(path.get("state"), path.get("id"));
+ throw new NotFoundException("Nothing at " + path);
+ }
+
+ private HttpResponse setState(String state, String id) {
+ LoadBalancer.State toState = stateFrom(state);
+ LoadBalancerId loadBalancerId = LoadBalancerId.fromSerializedForm(id);
+ try (var lock = nodeRepository.database().lock(loadBalancerId.application(), Duration.ofSeconds(1))) {
+ LoadBalancer loadBalancer = nodeRepository.database().readLoadBalancer(loadBalancerId)
+ .orElseThrow(() -> new NotFoundException(loadBalancerId + " does not exist"));
+ nodeRepository.database().writeLoadBalancer(loadBalancer.with(toState, nodeRepository.clock().instant()),
+ loadBalancer.state());
+ }
+ return new MessageResponse("Moved " + loadBalancerId + " to " + toState);
+ }
+
+ private LoadBalancer.State stateFrom(String state) {
+ return switch (state) {
+ case "reserved" -> LoadBalancer.State.reserved;
+ case "inactive" -> LoadBalancer.State.inactive;
+ case "active" -> LoadBalancer.State.active;
+ case "removable" -> LoadBalancer.State.removable;
+ default -> throw new IllegalArgumentException("Invalid state '" + state + "'");
+ };
}
}
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java
index 3c20f6ddb09..240d0daf96f 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java
@@ -7,6 +7,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+/**
+ * @author mpolden
+ */
public class LoadBalancersV1ApiTest {
private RestApiTester tester;
@@ -22,11 +25,19 @@ public class LoadBalancersV1ApiTest {
}
@Test
- public void test_load_balancers() throws Exception {
+ public void load_balancers() throws Exception {
tester.assertFile(new Request("http://localhost:8080/loadbalancers/v1"), "load-balancers.json");
tester.assertFile(new Request("http://localhost:8080/loadbalancers/v1/"), "load-balancers.json");
tester.assertFile(new Request("http://localhost:8080/loadbalancers/v1/?application=tenant4.application4.instance4"), "load-balancers-single.json");
tester.assertResponse(new Request("http://localhost:8080/loadbalancers/v1/?application=tenant.nonexistent.default"), "{\"loadBalancers\":[]}");
}
+ @Test
+ public void set_state() throws Exception {
+ tester.assertResponse(new Request("http://localhost:8080/loadbalancers/v1/state/removable/tenant42:application42:instance42:id42", "", Request.Method.PUT),
+ 404, "{\"error-code\":\"NOT_FOUND\",\"message\":\"load balancer tenant42:application42:instance42:id42 does not exist\"}");
+ tester.assertResponse(new Request("http://localhost:8080/loadbalancers/v1/state/removable/tenant4:application4:instance4:id4", "", Request.Method.PUT),
+ "{\"message\":\"Moved load balancer tenant4:application4:instance4:id4 to removable\"}");
+ }
+
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp
index 980c3345527..834e256c18f 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp
@@ -231,7 +231,7 @@ MatchToolsFactory(QueryLimiter & queryLimiter,
_match_limiter = std::make_unique<NoMatchPhaseLimiter>();
}
trace.addEvent(4, "Complete query setup");
- if (root_trace.shouldTrace(4)) {
+ if (trace.hasTrace()) {
vespalib::slime::ObjectInserter inserter(root_trace.createCursor("query_setup"), "traces");
vespalib::slime::inject(trace.getTraces(), inserter);
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/query.cpp b/searchcore/src/vespa/searchcore/proton/matching/query.cpp
index 9217b985ba5..b7e3f32a6f6 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/query.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/query.cpp
@@ -292,7 +292,7 @@ Query::handle_global_filter(Blueprint& blueprint, uint32_t docid_limit,
trace->addEvent(5, vespalib::make_string("Calculate global filter (estimated_hit_ratio (%f) <= upper_limit (%f))",
estimated_hit_ratio, global_filter_upper_limit));
}
- global_filter = GlobalFilter::create(blueprint, docid_limit, thread_bundle);
+ global_filter = GlobalFilter::create(blueprint, docid_limit, thread_bundle, trace);
if (!global_filter->is_active() && trace && trace->shouldTrace(5)) {
trace->addEvent(5, "Global filter matches everything");
}
diff --git a/searchlib/src/vespa/searchlib/queryeval/global_filter.cpp b/searchlib/src/vespa/searchlib/queryeval/global_filter.cpp
index 2d88883780a..f4ee9bf9bc2 100644
--- a/searchlib/src/vespa/searchlib/queryeval/global_filter.cpp
+++ b/searchlib/src/vespa/searchlib/queryeval/global_filter.cpp
@@ -2,11 +2,17 @@
#include "global_filter.h"
#include "blueprint.h"
+#include "profiled_iterator.h"
#include <vespa/vespalib/util/require.h>
#include <vespa/vespalib/util/thread_bundle.h>
+#include <vespa/vespalib/util/execution_profiler.h>
#include <vespa/searchlib/common/bitvector.h>
+#include <vespa/searchlib/engine/trace.h>
+#include <vespa/vespalib/data/slime/slime.h>
#include <cassert>
+using search::engine::Trace;
+using vespalib::ExecutionProfiler;
using vespalib::Runnable;
using vespalib::ThreadBundle;
using vespalib::Trinary;
@@ -80,12 +86,15 @@ struct PartResult {
: matches_any(Trinary::Undefined), bits(std::move(bits_in)) {}
};
-PartResult make_part(Blueprint &blueprint, uint32_t begin, uint32_t end) {
+PartResult make_part(Blueprint &blueprint, uint32_t begin, uint32_t end, ExecutionProfiler *profiler) {
bool strict = true;
auto constraint = Blueprint::FilterConstraint::UPPER_BOUND;
auto filter = blueprint.createFilterSearch(strict, constraint);
auto matches_any = filter->matches_any();
if (matches_any == Trinary::Undefined) {
+ if (profiler) {
+ filter = ProfiledIterator::profile(*profiler, std::move(filter));
+ }
filter->initRange(begin, end);
auto bits = filter->get_hits(begin);
// count bits in parallel and cache the results for later
@@ -101,10 +110,31 @@ struct MakePart : Runnable {
uint32_t begin;
uint32_t end;
PartResult result;
- MakePart(Blueprint &blueprint_in, uint32_t begin_in, uint32_t end_in) noexcept
- : blueprint(blueprint_in), begin(begin_in), end(end_in), result() {}
- void run() override { result = make_part(blueprint, begin, end); }
+ std::unique_ptr<ExecutionProfiler> profiler;
+ MakePart(MakePart &&) = default;
+ MakePart(Blueprint &blueprint_in, uint32_t begin_in, uint32_t end_in, int32_t profile_depth)
+ : blueprint(blueprint_in), begin(begin_in), end(end_in), result(), profiler()
+ {
+ if (profile_depth != 0) {
+ profiler = std::make_unique<ExecutionProfiler>(profile_depth);
+ }
+ }
+ void run() override { result = make_part(blueprint, begin, end, profiler.get()); }
+ ~MakePart();
};
+MakePart::~MakePart() = default;
+
+void maybe_insert_profiler_results(Trace *trace, int32_t profile_depth, const std::vector<MakePart> &parts) {
+ if (trace && profile_depth != 0) {
+ auto &obj = trace->createCursor("global_filter_execution");
+ auto &arr = obj.setArray("threads");
+ for (auto &&part: parts) {
+ auto &dst = arr.addObject().setArray("traces").addObject();
+ dst.setString("tag", "global_filter_profiling");
+ part.profiler->report(dst);
+ }
+ }
+}
}
@@ -159,8 +189,9 @@ GlobalFilter::create(std::vector<std::unique_ptr<BitVector>> vectors)
}
std::shared_ptr<GlobalFilter>
-GlobalFilter::create(Blueprint &blueprint, uint32_t docid_limit, ThreadBundle &thread_bundle)
+GlobalFilter::create(Blueprint &blueprint, uint32_t docid_limit, ThreadBundle &thread_bundle, Trace *trace)
{
+ int32_t profile_depth = (trace && trace->getLevel() > 0) ? trace->match_profile_depth() : 0;
uint32_t num_threads = thread_bundle.size();
std::vector<MakePart> parts;
parts.reserve(num_threads);
@@ -169,12 +200,13 @@ GlobalFilter::create(Blueprint &blueprint, uint32_t docid_limit, ThreadBundle &t
uint32_t rest_docs = (docid_limit - docid) % num_threads;
while (docid < docid_limit) {
uint32_t part_size = per_thread + (parts.size() < rest_docs);
- parts.emplace_back(blueprint, docid, docid + part_size);
+ parts.emplace_back(blueprint, docid, docid + part_size, profile_depth);
docid += part_size;
}
assert(parts.size() <= num_threads);
assert((docid == docid_limit) || parts.empty());
thread_bundle.run(parts);
+ maybe_insert_profiler_results(trace, profile_depth, parts);
std::vector<std::unique_ptr<BitVector>> vectors;
vectors.reserve(parts.size());
for (MakePart &part: parts) {
diff --git a/searchlib/src/vespa/searchlib/queryeval/global_filter.h b/searchlib/src/vespa/searchlib/queryeval/global_filter.h
index 66f85299dd1..c2b7b6617fc 100644
--- a/searchlib/src/vespa/searchlib/queryeval/global_filter.h
+++ b/searchlib/src/vespa/searchlib/queryeval/global_filter.h
@@ -8,6 +8,8 @@
namespace vespalib { struct ThreadBundle; }
namespace search { class BitVector; }
+namespace search::engine { class Trace; }
+
namespace search::queryeval {
class Blueprint;
@@ -22,6 +24,7 @@ class Blueprint;
class GlobalFilter : public std::enable_shared_from_this<GlobalFilter>
{
public:
+ using Trace = search::engine::Trace;
GlobalFilter() noexcept;
GlobalFilter(const GlobalFilter &) = delete;
GlobalFilter(GlobalFilter &&) = delete;
@@ -39,7 +42,10 @@ public:
static std::shared_ptr<GlobalFilter> create(std::vector<uint32_t> docids, uint32_t size);
static std::shared_ptr<GlobalFilter> create(std::unique_ptr<BitVector> vector);
static std::shared_ptr<GlobalFilter> create(std::vector<std::unique_ptr<BitVector>> vectors);
- static std::shared_ptr<GlobalFilter> create(Blueprint &blueprint, uint32_t docid_limit, vespalib::ThreadBundle &thread_bundle);
+ static std::shared_ptr<GlobalFilter> create(Blueprint &blueprint, uint32_t docid_limit, vespalib::ThreadBundle &thread_bundle, Trace *trace);
+ static std::shared_ptr<GlobalFilter> create(Blueprint &blueprint, uint32_t docid_limit, vespalib::ThreadBundle &thread_bundle) {
+ return create(blueprint, docid_limit, thread_bundle, nullptr);
+ }
};
} // namespace
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 0ba374f7190..3bfa1027a82 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -175,7 +175,7 @@ TEST_F(PendingMessageTrackerTest, simple) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> "
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> "
"Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
"</ul>\n"));
}
@@ -248,17 +248,17 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
"</ul>\n"
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000011d7))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"));
}
{
@@ -268,44 +268,23 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Node 0 (pending count: 4)</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"
"<b>Node 1 (pending count: 4)</b>\n"
"<ul>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"));
}
}
namespace {
-template <typename T>
-std::string setToString(const std::set<T>& s)
-{
- std::ostringstream ost;
- ost << '{';
- for (typename std::set<T>::const_iterator i(s.begin()), e(s.end());
- i != e; ++i)
- {
- if (i != s.begin()) {
- ost << ',';
- }
- ost << *i;
- }
- ost << '}';
- return ost.str();
-}
-
-}
-
-namespace {
-
class TestChecker : public PendingMessageTracker::Checker
{
public:
@@ -443,7 +422,7 @@ TEST_F(PendingMessageTrackerTest, busy_reply_marks_node_as_busy) {
TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) {
Fixture f;
auto cmd = f.sendPut(RequestBuilder().toNode(0));
- f.tracker().setNodeBusyDuration(std::chrono::seconds(10));
+ f.tracker().setNodeBusyDuration(10s);
f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
EXPECT_TRUE(f.tracker().getNodeInfo().isBusy(0));
f.clock().addSecondsToTime(11);
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index be4e7270c69..a82514acb03 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -217,7 +217,7 @@ VisitorManagerTest::getSession(uint32_t n)
// Wait until we have started the visitor
const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions);
framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
+ vespalib::steady_time endTime = clock.getMonotonicTime() + 30s;
while (true) {
{
std::lock_guard lock(_messageSessionFactory->_accessLock);
@@ -225,9 +225,8 @@ VisitorManagerTest::getSession(uint32_t n)
return *sessions[n];
}
}
- if (clock.getTimeInMillis() > endTime) {
- throw vespalib::IllegalStateException(
- "Timed out waiting for visitor session", VESPA_STRLOC);
+ if (clock.getMonotonicTime() > endTime) {
+ throw vespalib::IllegalStateException("Timed out waiting for visitor session", VESPA_STRLOC);
}
std::this_thread::sleep_for(10ms);
}
@@ -255,12 +254,10 @@ VisitorManagerTest::getMessagesAndReply(
switch (session.sentMessages[i]->getType()) {
case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT:
- docs.push_back(static_cast<documentapi::PutDocumentMessage&>(
- *session.sentMessages[i]).getDocumentSP());
+ docs.push_back(static_cast<documentapi::PutDocumentMessage&>(*session.sentMessages[i]).getDocumentSP());
break;
case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
- docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(
- *session.sentMessages[i]).getDocumentId());
+ docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(*session.sentMessages[i]).getDocumentId());
break;
default:
break;
@@ -355,10 +352,7 @@ TEST_F(VisitorManagerTest, normal_usage) {
getMessagesAndReply(1, getSession(0), docs, docIds);
// All data has been replied to, expecting to get a create visitor reply
- ASSERT_NO_FATAL_FAILURE(
- verifyCreateVisitorReply(api::ReturnCode::OK,
- int(docs.size()),
- getTotalSerializedSize(docs)));
+ ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK, int(docs.size()), getTotalSerializedSize(docs)));
EXPECT_EQ(1u, getMatchingDocuments(docs));
EXPECT_FALSE(_manager->hasPendingMessageState());
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index f3a538b7832..565131b3b99 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -256,11 +256,9 @@ TestVisitorMessageSession&
VisitorTest::getSession(uint32_t n)
{
// Wait until we have started the visitor
- const std::vector<TestVisitorMessageSession*>& sessions(
- _messageSessionFactory->_visitorSessions);
+ const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions);
framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(
- clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
+ vespalib::steady_time endTime = clock.getMonotonicTime() + 30s;
while (true) {
{
std::lock_guard lock(_messageSessionFactory->_accessLock);
@@ -268,7 +266,7 @@ VisitorTest::getSession(uint32_t n)
return *sessions[n];
}
}
- if (clock.getTimeInMillis() > endTime) {
+ if (clock.getMonotonicTime() > endTime) {
throw vespalib::IllegalStateException(
"Timed out waiting for visitor session", VESPA_STRLOC);
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index e68cbd75d52..2f1622750d7 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -182,7 +182,7 @@ struct MetricsUpdater {
void add(const MetricsUpdater& rhs) noexcept {
auto& d = count;
- auto& s = rhs.count;
+ const auto& s = rhs.count;
d.buckets += s.buckets;
d.docs += s.docs;
d.bytes += s.bytes;
@@ -209,7 +209,7 @@ BucketManager::updateMetrics(bool updateDocCount)
if (!updateDocCount || _doneInitialized) {
MetricsUpdater total;
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
MetricsUpdater m;
auto guard = space.second->bucketDatabase().acquire_read_guard();
guard->for_each(std::ref(m));
@@ -238,7 +238,7 @@ BucketManager::updateMetrics(bool updateDocCount)
}
void BucketManager::update_bucket_db_memory_usage_metrics() {
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
auto bm = _metrics->bucket_spaces.find(space.first);
bm->second->bucket_db_metrics.memory_usage.update(space.second->bucketDatabase().detailed_memory_usage());
}
@@ -342,7 +342,7 @@ BucketManager::reportStatus(std::ostream& out,
using vespalib::xml::XmlAttribute;
xmlReporter << vespalib::xml::XmlTag("buckets");
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
xmlReporter << XmlTag("bucket-space")
<< XmlAttribute("name", document::FixedBucketSpaces::to_string(space.first));
BucketDBDumper dumper(xmlReporter.getStream());
@@ -404,7 +404,7 @@ bool BucketManager::onRequestBucketInfo(
api::RequestBucketInfoReply::EntryVector info;
if (!cmd->getBuckets().empty()) {
for (auto bucketId : cmd->getBuckets()) {
- for (auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) {
+ for (const auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) {
info.emplace_back(entry.first, entry.second->getBucketInfo());
}
}
@@ -457,7 +457,7 @@ BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard)
// may alter the relevant state.
--_requestsCurrentlyProcessing;
if (_requestsCurrentlyProcessing == 0) {
- for (auto& qr : _queuedReplies) {
+ for (const auto& qr : _queuedReplies) {
sendUp(qr);
}
_queuedReplies.clear();
@@ -494,7 +494,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
reqs.size(), bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str());
std::lock_guard clusterStateGuard(_clusterStateLock);
- for (auto & req : std::ranges::reverse_view(reqs)) {
+ for (const auto & req : std::ranges::reverse_view(reqs)) {
// Currently small requests should not be forwarded to worker thread
assert(req->hasSystemState());
const auto their_hash = req->getDistributionHash();
@@ -547,7 +547,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
std::ostringstream distrList;
std::unordered_map<uint16_t, api::RequestBucketInfoReply::EntryVector> result;
- for (auto& nodeAndCmd : requests) {
+ for (const auto& nodeAndCmd : requests) {
result[nodeAndCmd.first];
if (LOG_WOULD_LOG(debug)) {
distrList << ' ' << nodeAndCmd.first;
@@ -576,7 +576,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
"BucketManager::processRequestBucketInfoCommands-2");
}
_metrics->fullBucketInfoLatency.addValue(runStartTime.getElapsedTimeAsDouble());
- for (auto& nodeAndCmd : requests) {
+ for (const auto& nodeAndCmd : requests) {
auto reply(std::make_shared<api::RequestBucketInfoReply>(*nodeAndCmd.second));
reply->getBucketInfo().swap(result[nodeAndCmd.first]);
sendUp(reply);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 667afbf67a0..393136de654 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -15,6 +15,7 @@ LOG_SETUP(".distributor.operation.idealstate.merge");
using vespalib::to_utc;
using vespalib::to_string;
+using vespalib::make_string_short::fmt;
namespace storage::distributor {
MergeOperation::~MergeOperation() = default;
@@ -24,8 +25,7 @@ MergeOperation::getStatus() const
{
return
Operation::getStatus() +
- vespalib::make_string(" . Sent MergeBucketCommand at %s",
- to_string(to_utc(_sentMessageTime)).c_str());
+ fmt(" . Sent MergeBucketCommand at %s", to_string(to_utc(_sentMessageTime)).c_str());
}
void
@@ -35,7 +35,7 @@ MergeOperation::addIdealNodes(
std::vector<MergeMetaData>& result)
{
// Add all ideal nodes first. These are never marked source-only.
- for (unsigned short idealNode : idealNodes) {
+ for (uint16_t idealNode : idealNodes) {
const MergeMetaData* entry = nullptr;
for (const auto & node : nodes) {
if (idealNode == node._nodeIndex) {
@@ -56,7 +56,7 @@ MergeOperation::addCopiesNotAlreadyAdded(uint16_t redundancy,
const std::vector<MergeMetaData>& nodes,
std::vector<MergeMetaData>& result)
{
- for (auto node : nodes) {
+ for (const auto & node : nodes) {
bool found = false;
for (const auto & mergeData : result) {
if (mergeData._nodeIndex == node._nodeIndex) {
@@ -123,7 +123,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
std::vector<std::unique_ptr<BucketCopy> > newCopies;
std::vector<MergeMetaData> nodes;
- for (unsigned short node : getNodes()) {
+ for (uint16_t node : getNodes()) {
const BucketCopy* copy = entry->getNode(node);
if (copy == nullptr) { // New copies?
newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, node)));
@@ -153,8 +153,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
msg->set_use_unordered_forwarding(true);
}
- LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(),
- _mnodes[0].index);
+ LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index);
// Set timeout to one hour to prevent hung nodes that manage to keep
// connections open from stalling merges in the cluster indefinitely.
@@ -165,8 +164,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
_sentMessageTime = _manager->node_context().clock().getMonotonicTime();
} else {
- LOGBP(debug,
- "Unable to merge bucket %s, since only one copy is available. System state %s",
+ LOGBP(debug, "Unable to merge bucket %s, since only one copy is available. System state %s",
getBucketId().toString().c_str(), clusterState.toString().c_str());
_ok = false;
done();
@@ -178,7 +176,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge(
const BucketDatabase::Entry& currentState) const
{
assert(currentState.valid());
- for (auto mnode : _mnodes) {
+ for (const auto & mnode : _mnodes) {
const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index));
if (!copyBefore) {
continue;
@@ -206,7 +204,7 @@ MergeOperation::deleteSourceOnlyNodes(
{
assert(currentState.valid());
std::vector<uint16_t> sourceOnlyNodes;
- for (auto & mnode : _mnodes) {
+ for (const auto & mnode : _mnodes) {
const uint16_t nodeIndex = mnode.index;
const BucketCopy* copy = currentState->getNode(nodeIndex);
if (!copy) {
@@ -338,7 +336,7 @@ bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx,
// to enter the merge throttler queues, displacing lower priority merges.
if (!is_global_bucket_merge()) {
const auto& node_info = ctx.pending_message_tracker().getNodeInfo();
- for (auto node : getNodes()) {
+ for (uint16_t node : getNodes()) {
if (node_info.isBusy(node)) {
return true;
}
@@ -364,11 +362,9 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const
MergeBucketMetricSet*
MergeOperation::get_merge_metrics()
{
- if (_manager) {
- return dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get());
- } else {
- return nullptr;
- }
+ return (_manager)
+ ? dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get())
+ : nullptr;
}
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index 533493a79a2..8618d570685 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -3,7 +3,6 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <map>
-#include <algorithm>
#include <vespa/log/log.h>
LOG_SETUP(".pendingmessages");
@@ -15,7 +14,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr, u
vespalib::make_string("Pending messages to storage nodes (stripe %u)", stripe_index)),
_component(cr, "pendingmessagetracker"),
_nodeInfo(_component.getClock()),
- _nodeBusyDuration(60),
+ _nodeBusyDuration(60s),
_deferred_read_tasks(),
_lock()
{
@@ -38,7 +37,7 @@ vespalib::string
PendingMessageTracker::MessageEntry::toHtml() const {
vespalib::asciistream ss;
ss << "<li><i>Node " << nodeIdx << "</i>: "
- << "<b>" << framework::MilliSecTime(timeStamp.count()).toString() << "</b> "
+ << "<b>" << vespalib::to_string(timeStamp) << "</b> "
<< api::MessageType::get(api::MessageType::Id(msgType)).getName() << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n";
return ss.str();
}
@@ -46,7 +45,7 @@ PendingMessageTracker::MessageEntry::toHtml() const {
PendingMessageTracker::TimePoint
PendingMessageTracker::currentTime() const
{
- return TimePoint(_component.getClock().getTimeInMillis().getTime());
+ return _component.getClock().getSystemTime();
}
namespace {
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 93238b5a83f..fb672d5ee31 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -68,13 +68,7 @@ public:
virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0;
};
- /**
- * Time point represented as the millisecond interval from the framework
- * clock's epoch to a given point in time. Note that it'd be more
- * semantically correct to use std::chrono::time_point, but it is bound
- * to specific chrono clock types, their epochs and duration resolution.
- */
- using TimePoint = std::chrono::milliseconds;
+ using TimePoint = vespalib::system_time;
PendingMessageTracker(framework::ComponentRegister&, uint32_t stripe_index);
~PendingMessageTracker() override;
@@ -119,8 +113,8 @@ public:
*/
std::vector<uint64_t> clearMessagesForNode(uint16_t node);
- void setNodeBusyDuration(std::chrono::seconds secs) noexcept {
- _nodeBusyDuration = secs;
+ void setNodeBusyDuration(vespalib::duration duration) noexcept {
+ _nodeBusyDuration = duration;
}
void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task);
@@ -136,7 +130,7 @@ private:
MessageEntry(TimePoint timeStamp, uint32_t msgType, uint32_t priority,
uint64_t msgId, document::Bucket bucket, uint16_t nodeIdx) noexcept;
- vespalib::string toHtml() const;
+ [[nodiscard]] vespalib::string toHtml() const;
};
struct MessageIdKey : boost::multi_index::member<MessageEntry, uint64_t, &MessageEntry::msgId> {};
@@ -187,7 +181,7 @@ private:
Messages _messages;
framework::Component _component;
NodeInfo _nodeInfo;
- std::chrono::seconds _nodeBusyDuration;
+ vespalib::duration _nodeBusyDuration;
DeferredBucketTaskMap _deferred_read_tasks;
// Since distributor is currently single-threaded, this will only
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ec22d7c064e..db88a22d500 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -55,7 +55,7 @@ vespalib::string getNodeId(StorageComponent& sc) {
return ost.str();
}
-vespalib::duration TEN_MINUTES = 600s;
+constexpr vespalib::duration STALE_PROTOCOL_LIFETIME = 1h;
}
@@ -694,7 +694,7 @@ CommunicationManager::run(framework::ThreadHandle& thread)
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
for (auto it(_earlierGenerations.begin());
!_earlierGenerations.empty() &&
- ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime());
+ ((it->first + STALE_PROTOCOL_LIFETIME) < _component.getClock().getMonotonicTime());
it = _earlierGenerations.begin())
{
_earlierGenerations.erase(it);
@@ -709,9 +709,8 @@ CommunicationManager::updateMetrics(const MetricLockGuard &)
}
void
-CommunicationManager::print(std::ostream& out, bool verbose, const std::string& indent) const
+CommunicationManager::print(std::ostream& out, bool, const std::string&) const
{
- (void) verbose; (void) indent;
out << "CommunicationManager";
}
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp
index 91f304ad9a0..6d36abc896e 100644
--- a/storage/src/vespa/storage/visiting/visitor.cpp
+++ b/storage/src/vespa/storage/visiting/visitor.cpp
@@ -121,12 +121,9 @@ Visitor::VisitorTarget::metaForMessageId(uint64_t msgId)
void
Visitor::VisitorTarget::discardQueuedMessages()
{
- for (MessageQueue::iterator
- it(_queuedMessages.begin()), e(_queuedMessages.end());
- it != e; ++it)
- {
- LOG(spam, "Erasing queued message with id %" PRIu64, it->second);
- releaseMetaForMessageId(it->second);
+ for (const auto & entry : _queuedMessages) {
+ LOG(spam, "Erasing queued message with id %" PRIu64, entry.second);
+ releaseMetaForMessageId(entry.second);
}
_queuedMessages.clear();
}
@@ -310,17 +307,14 @@ Visitor::getStateName(VisitorState s)
return "COMPLETED";
default:
assert(!"Unknown visitor state");
- return NULL;
+ return nullptr;
}
}
Visitor::VisitorState
Visitor::transitionTo(VisitorState newState)
{
- LOG(debug, "Visitor '%s' state transition %s -> %s",
- _id.c_str(),
- getStateName(_state),
- getStateName(newState));
+ LOG(debug, "Visitor '%s' state transition %s -> %s", _id.c_str(), getStateName(_state), getStateName(newState));
VisitorState oldState = _state;
_state = newState;
return oldState;
@@ -339,12 +333,10 @@ Visitor::mayTransitionToCompleted() const
void
Visitor::forceClose()
{
- for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin();
- it != _bucketStates.end(); ++it)
- {
+ for (auto * state : _bucketStates) {
// Reset iterator id so no destroy iterator will be sent
- (*it)->setIteratorId(spi::IteratorId(0));
- delete *it;
+ state->setIteratorId(spi::IteratorId(0));
+ delete state;
}
_bucketStates.clear();
transitionTo(STATE_COMPLETED);
@@ -358,7 +350,7 @@ Visitor::sendReplyOnce()
std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply());
_hitCounter->updateVisitorStatistics(_visitorStatistics);
- static_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics);
+ dynamic_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics);
if (shouldAddMbusTrace()) {
_trace.moveTraceTo(reply->getTrace());
}
@@ -373,17 +365,15 @@ void
Visitor::finalize()
{
if (_state != STATE_COMPLETED) {
- LOG(error, "Attempting to finalize non-completed visitor %s",
- _id.c_str());
+ LOG(error, "Attempting to finalize non-completed visitor %s", _id.c_str());
assert(false);
}
assert(_bucketStates.empty());
if (_result.success()) {
- if (_messageSession->pending() > 0)
- {
+ if (_messageSession->pending() > 0) {
_result = api::ReturnCode(api::ReturnCode::ABORTED);
- try{
+ try {
abortedVisiting();
} catch (std::exception& e) {
LOG(warning, "Visitor %s had a problem in abortVisiting(). As "
@@ -404,43 +394,31 @@ Visitor::finalize()
void
Visitor::discardAllNoPendingBucketStates()
{
- for (BucketStateList::iterator
- it(_bucketStates.begin()), e(_bucketStates.end());
- it != e;)
- {
+ for (auto it = _bucketStates.begin(); it != _bucketStates.end();) {
BucketIterationState& bstate(**it);
if (bstate.hasPendingControlCommand() || bstate.hasPendingIterators()) {
- LOG(debug,
- "Visitor '%s' not discarding bucket state %s "
- "since it has pending operations",
- _id.c_str(),
- bstate.toString().c_str());
+ LOG(debug, "Visitor '%s' not discarding bucket state %s since it has pending operations",
+ _id.c_str(), bstate.toString().c_str());
++it;
continue;
}
- LOG(debug, "Visitor '%s' discarding bucket state %s",
- _id.c_str(), bstate.toString().c_str());
+ LOG(debug, "Visitor '%s' discarding bucket state %s", _id.c_str(), bstate.toString().c_str());
delete *it;
it = _bucketStates.erase(it);
}
}
void
-Visitor::fail(const api::ReturnCode& reason,
- bool overrideExistingError)
+Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError)
{
assert(_state != STATE_COMPLETED);
if (_result.getResult() < reason.getResult() || overrideExistingError) {
- LOG(debug, "Setting result of visitor '%s' to %s",
- _id.c_str(), reason.toString().c_str());
+ LOG(debug, "Setting result of visitor '%s' to %s", _id.c_str(), reason.toString().c_str());
_result = reason;
}
if (_visitorTarget.hasQueuedMessages()) {
- LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s "
- "since visitor has failed",
- _id.c_str(),
- _visitorTarget._queuedMessages.size(),
- _controlDestination->toString().c_str());
+ LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s since visitor has failed",
+ _id.c_str(), _visitorTarget._queuedMessages.size(), _controlDestination->toString().c_str());
_visitorTarget.discardQueuedMessages();
}
discardAllNoPendingBucketStates();
@@ -448,8 +426,7 @@ Visitor::fail(const api::ReturnCode& reason,
}
bool
-Visitor::shouldReportProblemToClient(const api::ReturnCode& code,
- size_t retryCount) const
+Visitor::shouldReportProblemToClient(const api::ReturnCode& code, size_t retryCount)
{
// Report _once_ per message if we reach a certain retry threshold.
if (retryCount == TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY) {
@@ -521,7 +498,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId,
_visitorOptions._fromTime = fromTimestamp;
_visitorOptions._toTime = toTimestamp;
_currentBucket = 0;
- _hitCounter.reset(new HitCounter());
+ _hitCounter = std::make_unique<HitCounter>();
_messageSession = std::move(messageSession);
_documentPriority = documentPriority;
@@ -612,8 +589,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
uint64_t messageId = reply->getContext().value.UINT64;
uint32_t removed = _visitorTarget._pendingMessages.erase(messageId);
- LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(),
- reply->toString().c_str(), messageId);
+ LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), reply->toString().c_str(), messageId);
assert(removed == 1);
(void) removed;
@@ -634,20 +610,16 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
metrics.visitorDestinationFailureReplies.inc();
if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) {
- LOG(debug, "Aborting visitor as we failed to talk to controller: %s",
- reply->getError(0).toString().c_str());
- api::ReturnCode returnCode(
- static_cast<api::ReturnCode::Result>(
- reply->getError(0).getCode()),
- reply->getError(0).getMessage());
+ LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str());
+ api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
+ reply->getError(0).getMessage());
fail(returnCode, true);
close();
return;
}
- api::ReturnCode returnCode(
- static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
- reply->getError(0).getMessage());
+ api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
+ reply->getError(0).getMessage());
const bool should_fail = remap_docapi_message_error_code(returnCode);
if (should_fail) {
// Abort - something is wrong with target.
@@ -657,8 +629,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
}
if (failed()) {
- LOG(debug, "Failed to send message from visitor '%s', due to "
- "%s. Not resending since visitor has failed",
+ LOG(debug, "Failed to send message from visitor '%s', due to %s. Not resending since visitor has failed",
_id.c_str(), returnCode.toString().c_str());
return;
}
@@ -709,8 +680,7 @@ Visitor::onCreateIteratorReply(
if (reply->getResult().failed()) {
LOG(debug, "Failed to create iterator for bucket %s: %s",
- bucketId.toString().c_str(),
- reply->getResult().toString().c_str());
+ bucketId.toString().c_str(), reply->getResult().toString().c_str());
fail(reply->getResult());
delete *it;
_bucketStates.erase((++it).base());
@@ -718,17 +688,14 @@ Visitor::onCreateIteratorReply(
}
bucketState.setIteratorId(reply->getIteratorId());
if (failed()) {
- LOG(debug, "Create iterator for bucket %s is OK, "
- "but visitor has failed: %s",
- bucketId.toString().c_str(),
- _result.toString().c_str());
+ LOG(debug, "Create iterator for bucket %s is OK, but visitor has failed: %s",
+ bucketId.toString().c_str(), _result.toString().c_str());
delete *it;
_bucketStates.erase((++it).base());
return;
}
- LOG(debug, "Visitor '%s' starting to visit bucket %s.",
- _id.c_str(), bucketId.toString().c_str());
+ LOG(debug, "Visitor '%s' starting to visit bucket %s.", _id.c_str(), bucketId.toString().c_str());
auto cmd = std::make_shared<GetIterCommand>(bucket, bucketState.getIteratorId(), _docBlockSize);
cmd->getTrace().setLevel(_traceLevel);
cmd->setPriority(_priority);
@@ -737,13 +704,10 @@ Visitor::onCreateIteratorReply(
}
void
-Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
- VisitorThreadMetrics& metrics)
+Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThreadMetrics& metrics)
{
LOG(debug, "Visitor '%s' got get iter reply for bucket %s: %s",
- _id.c_str(),
- reply->getBucketId().toString().c_str(),
- reply->getResult().toString().c_str());
+ _id.c_str(), reply->getBucketId().toString().c_str(), reply->getResult().toString().c_str());
auto it = _bucketStates.rbegin();
// New requests will be pushed on end of list.. So searching
@@ -763,10 +727,8 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
!reply->getResult().isShutdownRelated() &&
!reply->getResult().isBucketDisappearance())
{
- LOG(warning, "Failed to talk to persistence layer for bucket "
- "%s. Aborting visitor '%s': %s",
- reply->getBucketId().toString().c_str(),
- _id.c_str(), reply->getResult().toString().c_str());
+ LOG(warning, "Failed to talk to persistence layer for bucket %s. Aborting visitor '%s': %s",
+ reply->getBucketId().toString().c_str(), _id.c_str(), reply->getResult().toString().c_str());
}
fail(reply->getResult());
BucketIterationState& bucketState(**it);
@@ -783,17 +745,14 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
bucketState.setCompleted(reply->isCompleted());
--bucketState._pendingIterators;
if (!reply->getEntries().empty()) {
- LOG(debug, "Processing documents in handle given from bucket %s.",
- reply->getBucketId().toString().c_str());
+ LOG(debug, "Processing documents in handle given from bucket %s.", reply->getBucketId().toString().c_str());
// While handling documents we should not keep locks, such
// that visitor may process several things at once.
if (isRunning()) {
MBUS_TRACE(reply->getTrace(), 5,
vespalib::make_string("Visitor %s handling block of %zu documents.",
_id.c_str(), reply->getEntries().size()));
- LOG(debug, "Visitor %s handling block of %zu documents.",
- _id.c_str(),
- reply->getEntries().size());
+ LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size());
try {
framework::MilliSecTimer processingTimer(_component.getClock());
handleDocuments(reply->getBucketId(), reply->getEntries(), *_hitCounter);
@@ -913,15 +872,11 @@ Visitor::continueVisitor()
}
}
- LOG(debug, "No pending messages, tagging visitor '%s' complete",
- _id.c_str());
+ LOG(debug, "No pending messages, tagging visitor '%s' complete", _id.c_str());
transitionTo(STATE_COMPLETED);
} else {
- LOG(debug, "Visitor %s waiting for all commands to be replied to "
- "(pending=%zu, queued=%zu)",
- _id.c_str(),
- _visitorTarget._pendingMessages.size(),
- _visitorTarget._queuedMessages.size());
+ LOG(debug, "Visitor %s waiting for all commands to be replied to (pending=%zu, queued=%zu)",
+ _id.c_str(), _visitorTarget._pendingMessages.size(), _visitorTarget._queuedMessages.size());
}
return false;
} else {
@@ -981,14 +936,14 @@ Visitor::getStatus(std::ostream& out, bool verbose) const
<< (_visitorOptions._visitRemoves ? "true" : "false")
<< "</td></tr>\n";
out << "<tr><td>Control destination</td><td>";
- if (_controlDestination.get()) {
+ if (_controlDestination) {
out << xml_content_escaped(_controlDestination->toString());
} else {
out << "nil";
}
out << "</td></tr>\n";
out << "<tr><td>Data destination</td><td>";
- if (_dataDestination.get()) {
+ if (_dataDestination) {
out << xml_content_escaped(_dataDestination->toString());
} else {
out << "nil";
@@ -1078,17 +1033,13 @@ Visitor::getStatus(std::ostream& out, bool verbose) const
bool
Visitor::getIterators()
{
- LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, "
- "_currentBucket = %d",
- _id.c_str(), _buckets.size(),
- _bucketStates.size(), _currentBucket);
+ LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, _currentBucket = %d",
+ _id.c_str(), _buckets.size(), _bucketStates.size(), _currentBucket);
// Don't send any further GetIters if we're closing
if (!isRunning()) {
if (hasPendingIterators()) {
- LOG(debug, "Visitor has failed but waiting for %zu "
- "buckets to finish processing",
- _bucketStates.size());
+ LOG(debug, "Visitor has failed but waiting for %zu buckets to finish processing", _bucketStates.size());
return true;
} else {
return false;
@@ -1097,13 +1048,10 @@ Visitor::getIterators()
// Go through buckets found. Take the first that doesn't have requested
// state and request a new piece.
- for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin();
- it != _bucketStates.end();)
- {
+ for (auto it = _bucketStates.begin(); it != _bucketStates.end();) {
assert(*it);
BucketIterationState& bucketState(**it);
- if ((bucketState._pendingIterators
- >= _visitorOptions._maxParallelOneBucket)
+ if ((bucketState._pendingIterators >= _visitorOptions._maxParallelOneBucket)
|| bucketState.hasPendingControlCommand())
{
++it;
@@ -1118,20 +1066,17 @@ Visitor::getIterators()
}
try{
completedBucket(bucketState.getBucketId(), *_hitCounter);
- _visitorStatistics.setBucketsVisited(
- _visitorStatistics.getBucketsVisited() + 1);
+ _visitorStatistics.setBucketsVisited(_visitorStatistics.getBucketsVisited() + 1);
} catch (std::exception& e) {
std::ostringstream ost;
- ost << "Visitor fail to run completedBucket() notification: "
- << e.what();
+ ost << "Visitor fail to run completedBucket() notification: " << e.what();
reportProblem(ost.str());
}
delete *it;
it = _bucketStates.erase(it);
continue;
}
- auto cmd = std::make_shared<GetIterCommand>(
- bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize);
+ auto cmd = std::make_shared<GetIterCommand>(bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize);
cmd->getTrace().setLevel(_traceLevel);
cmd->setPriority(_priority);
_messageHandler->send(cmd, *this);
@@ -1143,7 +1088,7 @@ Visitor::getIterators()
}
// If there aren't anymore buckets to iterate, we're done
- if (_bucketStates.size() == 0 && _currentBucket >= _buckets.size()) {
+ if (_bucketStates.empty() && _currentBucket >= _buckets.size()) {
LOG(debug, "No more buckets to visit for visitor '%s'.", _id.c_str());
return false;
}
@@ -1157,17 +1102,13 @@ Visitor::getIterators()
_currentBucket < _buckets.size())
{
document::Bucket bucket(_bucketSpace, _buckets[_currentBucket]);
- std::unique_ptr<BucketIterationState> newBucketState(
- new BucketIterationState(*this, *_messageHandler, bucket));
+ auto newBucketState = std::make_unique<BucketIterationState>(*this, *_messageHandler, bucket);
LOG(debug, "Visitor '%s': Sending create iterator for bucket %s.",
_id.c_str(), bucket.getBucketId().toString().c_str());
- spi::Selection selection
- = spi::Selection(spi::DocumentSelection(_documentSelectionString));
- selection.setFromTimestamp(
- spi::Timestamp(_visitorOptions._fromTime.getTime()));
- selection.setToTimestamp(
- spi::Timestamp(_visitorOptions._toTime.getTime()));
+ spi::Selection selection = spi::Selection(spi::DocumentSelection(_documentSelectionString));
+ selection.setFromTimestamp(spi::Timestamp(_visitorOptions._fromTime.getTime()));
+ selection.setToTimestamp(spi::Timestamp(_visitorOptions._toTime.getTime()));
auto cmd = std::make_shared<CreateIteratorCommand>(bucket, selection,_visitorOptions._fieldSet,
_visitorOptions._visitRemoves
@@ -1184,8 +1125,7 @@ Visitor::getIterators()
}
if (sentCount == 0) {
if (LOG_WOULD_LOG(debug)) {
- LOG(debug, "Enough iterators being processed. Doing nothing for "
- "visitor '%s' bucketStates = %zu.",
+ LOG(debug, "Enough iterators being processed. Doing nothing for visitor '%s' bucketStates = %zu.",
_id.c_str(), _bucketStates.size());
for (const auto& state : _bucketStates) {
LOG(debug, "Existing: %s", state->toString().c_str());
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 0737c5612c0..9b6d8e348b9 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -136,28 +136,24 @@ private:
{}
/** Sends DestroyIterator over _messageHandler if _iteratorId != 0 */
- ~BucketIterationState();
+ ~BucketIterationState() override;
void setCompleted(bool completed = true) { _completed = completed; }
- bool isCompleted() const { return _completed; }
+ [[nodiscard]] bool isCompleted() const { return _completed; }
- document::Bucket getBucket() const { return _bucket; }
- document::BucketId getBucketId() const { return _bucket.getBucketId(); }
+ [[nodiscard]] document::Bucket getBucket() const { return _bucket; }
+ [[nodiscard]] document::BucketId getBucketId() const { return _bucket.getBucketId(); }
void setIteratorId(spi::IteratorId iteratorId) {
_iteratorId = iteratorId;
}
- spi::IteratorId getIteratorId() const { return _iteratorId; }
+ [[nodiscard]] spi::IteratorId getIteratorId() const { return _iteratorId; }
- void setPendingControlCommand() {
- _iteratorId = spi::IteratorId(0);
- }
-
- bool hasPendingControlCommand() const {
+ [[nodiscard]] bool hasPendingControlCommand() const {
return _iteratorId == spi::IteratorId(0);
}
- bool hasPendingIterators() const { return _pendingIterators > 0; }
+ [[nodiscard]] bool hasPendingIterators() const { return _pendingIterators > 0; }
void print(std::ostream& out, bool, const std::string& ) const override {
out << "BucketIterationState("
@@ -247,12 +243,10 @@ private:
MessageMeta releaseMetaForMessageId(uint64_t msgId);
void reinsertMeta(MessageMeta);
- bool hasQueuedMessages() const { return !_queuedMessages.empty(); }
+ [[nodiscard]] bool hasQueuedMessages() const { return !_queuedMessages.empty(); }
void discardQueuedMessages();
- uint32_t getMemoryUsage() const noexcept {
- return _memoryUsage;
- }
+ [[nodiscard]] uint32_t getMemoryUsage() const noexcept { return _memoryUsage; }
VisitorTarget();
~VisitorTarget();
@@ -326,9 +320,9 @@ protected:
std::string _documentSelectionString;
vdslib::VisitorStatistics _visitorStatistics;
- bool isCompletedCalled() const { return _calledCompletedVisitor; }
+ [[nodiscard]] bool isCompletedCalled() const { return _calledCompletedVisitor; }
- uint32_t traceLevel() const noexcept { return _traceLevel; }
+ [[nodiscard]] uint32_t traceLevel() const noexcept { return _traceLevel; }
/**
* Attempts to add the given trace message to the internal, memory bounded
@@ -339,7 +333,7 @@ protected:
*/
bool addBoundedTrace(uint32_t level, const vespalib::string& message);
- const vdslib::Parameters& visitor_parameters() const noexcept;
+ [[nodiscard]] const vdslib::Parameters& visitor_parameters() const noexcept;
// Possibly modifies the ReturnCode parameter in-place if its return code should
// be changed based on visitor subclass-specific behavior.
@@ -417,7 +411,7 @@ public:
* The consistency level provided here is propagated through the SPI
* Context object for createIterator calls.
*/
- virtual spi::ReadConsistency getRequiredReadConsistency() const {
+ [[nodiscard]] virtual spi::ReadConsistency getRequiredReadConsistency() const {
return spi::ReadConsistency::STRONG;
}
@@ -428,8 +422,7 @@ public:
/**
* Used to silence transient errors that can happen during normal operation.
*/
- bool shouldReportProblemToClient(const api::ReturnCode&,
- size_t retryCount) const;
+ [[nodiscard]] static bool shouldReportProblemToClient(const api::ReturnCode&, size_t retryCount);
/** Called to send report to client of potential non-critical problems. */
void reportProblem(const std::string& problem);
@@ -492,18 +485,16 @@ public:
void getStatus(std::ostream& out, bool verbose) const;
- void setMaxParallel(uint32_t maxParallel)
- { _visitorOptions._maxParallel = maxParallel; }
- void setMaxParallelPerBucket(uint32_t max)
- { _visitorOptions._maxParallelOneBucket = max; }
+ void setMaxParallel(uint32_t maxParallel) { _visitorOptions._maxParallel = maxParallel; }
+ void setMaxParallelPerBucket(uint32_t max) { _visitorOptions._maxParallelOneBucket = max; }
/**
* Sends a message to the data handler for this visitor.
*/
void sendMessage(std::unique_ptr<documentapi::DocumentMessage> documentMessage);
- bool isRunning() const { return _state == STATE_RUNNING; }
- bool isCompleted() const { return _state == STATE_COMPLETED; }
+ [[nodiscard]] bool isRunning() const { return _state == STATE_RUNNING; }
+ [[nodiscard]] bool isCompleted() const { return _state == STATE_COMPLETED; }
private:
/**
@@ -542,11 +533,9 @@ private:
void sendReplyOnce();
- bool hasFailedVisiting() const { return _result.failed(); }
-
- bool hasPendingIterators() const { return !_bucketStates.empty(); }
-
- bool mayTransitionToCompleted() const;
+ [[nodiscard]] bool hasFailedVisiting() const { return _result.failed(); }
+ [[nodiscard]] bool hasPendingIterators() const { return !_bucketStates.empty(); }
+ [[nodiscard]] bool mayTransitionToCompleted() const;
void discardAllNoPendingBucketStates();
@@ -565,9 +554,7 @@ private:
*
* Precondition: attach() must have been called on `this`.
*/
- bool shouldAddMbusTrace() const noexcept {
- return _traceLevel != 0;
- }
+ [[nodiscard]] bool shouldAddMbusTrace() const noexcept { return _traceLevel != 0; }
/**
* Set internal state to the given state value.
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index a03b9a9a8a3..07938002746 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -187,9 +187,8 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi
for (int32_t i=0; i<config->visitorthreads; ++i) {
_visitorThread.emplace_back(
// Naked new due to a lot of private inheritance in VisitorThread and VisitorManager
- std::shared_ptr<VisitorThread>(
- new VisitorThread(i, _componentRegister, _messageSessionFactory,
- _visitorFactories, *_metrics->threads[i], *this)),
+ std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory,
+ _visitorFactories, *_metrics->threads[i], *this)),
std::map<api::VisitorId, std::string>());
}
}
@@ -450,8 +449,7 @@ VisitorManager::processReply(const std::shared_ptr<api::StorageReply>& reply)
}
void
-VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd,
- Visitor& visitor)
+VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& visitor)
{
assert(cmd->getType() == api::MessageType::INTERNAL);
// Only add to internal state if not destroy iterator command, as
@@ -460,7 +458,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd,
if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) {
MessageInfo inf;
inf.id = visitor.getVisitorId();
- inf.timestamp = _component.getClock().getTimeInSeconds().getTime();
+ inf.timestamp = _component.getClock().getSystemTime();
inf.timeout = cmd->getTimeout();
if (cmd->getAddress()) {
@@ -623,7 +621,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
out << "<tr>"
<< "<td>" << entry.first << "</td>"
<< "<td>" << entry.second.id << "</td>"
- << "<td>" << entry.second.timestamp << "</td>"
+ << "<td>" << vespalib::to_string(entry.second.timestamp) << "</td>"
<< "<td>" << vespalib::count_ms(entry.second.timeout) << "</td>"
<< "<td>" << xml_content_escaped(entry.second.destination) << "</td>"
<< "</tr>\n";
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 33703b392bc..3e331e1c9a2 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -57,7 +57,7 @@ private:
struct MessageInfo {
api::VisitorId id;
- time_t timestamp;
+ vespalib::system_time timestamp;
vespalib::duration timeout;
std::string destination;
};
@@ -168,9 +168,7 @@ private:
* by the formula: fixed + variable * ((255 - priority) / 255)
*/
uint32_t maximumConcurrent(const api::CreateVisitorCommand& cmd) const {
- return _maxFixedConcurrentVisitors + static_cast<uint32_t>(
- _maxVariableConcurrentVisitors
- * ((255.0 - cmd.getPriority()) / 255.0));
+ return _maxFixedConcurrentVisitors + static_cast<uint32_t>(_maxVariableConcurrentVisitors * ((255.0 - cmd.getPriority()) / 255.0));
}
void updateMetrics(const MetricLockGuard &) override;
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index 55ef83ba658..e3ebef3a3ef 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -126,10 +126,10 @@ VisitorThread::shutdown()
if (event._message.get()) {
if (!event._message->getType().isReply()
&& (event._message->getType() != api::MessageType::INTERNAL
- || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
+ || dynamic_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
{
std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(*event._message).makeReply());
+ dynamic_cast<api::StorageCommand&>(*event._message).makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."));
_messageSender.send(reply);
}
@@ -197,7 +197,7 @@ VisitorThread::run(framework::ThreadHandle& thread)
// disappear when no visiting is done)
if (entry._message.get() &&
(entry._message->getType() != api::MessageType::INTERNAL
- || static_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID))
+ || dynamic_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID))
{
entry._timer.stop(_metrics.averageQueueWaitingTime);
}
@@ -290,7 +290,7 @@ VisitorThread::close()
} else {
_metrics.completedVisitors.inc(1);
}
- framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());
+ vespalib::steady_time currentTime(_component.getClock().getMonotonicTime());
trimRecentlyCompletedList(currentTime);
_recentlyCompleted.emplace_back(_currentlyRunningVisitor->first, currentTime);
_visitors.erase(_currentlyRunningVisitor);
@@ -298,9 +298,9 @@ VisitorThread::close()
}
void
-VisitorThread::trimRecentlyCompletedList(framework::SecondTime currentTime)
+VisitorThread::trimRecentlyCompletedList(vespalib::steady_time currentTime)
{
- framework::SecondTime recentLimit(currentTime - framework::SecondTime(30));
+ vespalib::steady_time recentLimit(currentTime - 30s);
// Dump all elements that aren't recent anymore
while (!_recentlyCompleted.empty()
&& _recentlyCompleted.front().second < recentLimit)
@@ -313,8 +313,7 @@ void
VisitorThread::handleNonExistingVisitorCall(const Event& entry, ReturnCode& code)
{
// Get current time. Set the time that is the oldest still recent.
- framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());
- trimRecentlyCompletedList(currentTime);
+ trimRecentlyCompletedList(_component.getClock().getMonotonicTime());
// Go through all recent visitors. Ignore request if recent
for (const auto& e : _recentlyCompleted) {
@@ -344,7 +343,7 @@ VisitorThread::createVisitor(vespalib::stringref libName,
auto it = _visitorFactories.find(str);
if (it == _visitorFactories.end()) {
error << "Visitor library " << str << " not found.";
- return std::shared_ptr<Visitor>();
+ return {};
}
auto libIter = _libs.find(str);
@@ -363,7 +362,7 @@ VisitorThread::createVisitor(vespalib::stringref libName,
} catch (std::exception& e) {
error << "Failed to create visitor instance of type " << libName
<< ": " << e.what();
- return std::shared_ptr<Visitor>();
+ return {};
}
}
@@ -690,7 +689,7 @@ VisitorThread::getStatus(vespalib::asciistream& out,
}
for (const auto& cv : _recentlyCompleted) {
out << "<li> Visitor " << cv.first << " done at "
- << cv.second.getTime() << "\n";
+ << vespalib::to_string(vespalib::to_utc(cv.second)) << "\n";
}
out << "</ul>\n";
out << "<h3>Current queue size: " << _queue.size() << "</h3>\n";
@@ -736,12 +735,10 @@ VisitorThread::getStatus(vespalib::asciistream& out,
if (_visitors.empty()) {
out << "None\n";
}
- for (VisitorMap::const_iterator it = _visitors.begin();
- it != _visitors.end(); ++it)
- {
- out << "<a href=\"?visitor=" << it->first
+ for (const auto & v : _visitors) {
+ out << "<a href=\"?visitor=" << v.first
<< (verbose ? "&verbose" : "") << "\">Visitor "
- << it->first << "</a><br>\n";
+ << v.first << "</a><br>\n";
}
}
}
diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h
index 226e7c0631b..56e40328fda 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.h
+++ b/storage/src/vespa/storage/visiting/visitorthread.h
@@ -38,7 +38,7 @@ class VisitorThread : public framework::Runnable,
using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>;
VisitorMap _visitors;
- std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted;
+ std::deque<std::pair<api::VisitorId, vespalib::steady_time>> _recentlyCompleted;
struct Event {
enum class Type {
@@ -118,7 +118,7 @@ private:
*/
Event popNextQueuedEventIfAvailable();
void tick();
- void trimRecentlyCompletedList(framework::SecondTime currentTime);
+ void trimRecentlyCompletedList(vespalib::steady_time currentTime);
void handleNonExistingVisitorCall(const Event& entry, api::ReturnCode& code);
std::shared_ptr<Visitor> createVisitor(vespalib::stringref libName,
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java
index 110bd5e885e..9b7b666e353 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java
@@ -25,7 +25,7 @@ import static com.yahoo.vespa.athenz.identityprovider.api.VespaUniqueInstanceId.
*/
public class EntityBindingsMapper {
- private static final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
+ static final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
private EntityBindingsMapper() {}
@@ -37,6 +37,14 @@ public class EntityBindingsMapper {
}
}
+ public static SignedIdentityDocument fromInputStream(InputStream in) throws IOException {
+ return EntityBindingsMapper.toSignedIdentityDocument(mapper.readValue(in, SignedIdentityDocumentEntity.class));
+ }
+
+ public static SignedIdentityDocument fromString(String json) throws IOException {
+ return EntityBindingsMapper.toSignedIdentityDocument(mapper.readValue(json, SignedIdentityDocumentEntity.class));
+ }
+
public static SignedIdentityDocument toSignedIdentityDocument(SignedIdentityDocumentEntity entity) {
return new SignedIdentityDocument(
entity.signature(),
@@ -49,7 +57,8 @@ public class EntityBindingsMapper {
entity.createdAt(),
entity.ipAddresses(),
IdentityType.fromId(entity.identityType()),
- Optional.ofNullable(entity.clusterType()).map(ClusterType::from).orElse(null));
+ Optional.ofNullable(entity.clusterType()).map(ClusterType::from).orElse(null),
+ entity.unknownAttributes());
}
public static SignedIdentityDocumentEntity toSignedIdentityDocumentEntity(SignedIdentityDocument model) {
@@ -64,13 +73,13 @@ public class EntityBindingsMapper {
model.createdAt(),
model.ipAddresses(),
model.identityType().id(),
- Optional.ofNullable(model.clusterType()).map(ClusterType::toConfigValue).orElse(null));
+ Optional.ofNullable(model.clusterType()).map(ClusterType::toConfigValue).orElse(null),
+ model.unknownAttributes());
}
public static SignedIdentityDocument readSignedIdentityDocumentFromFile(Path file) {
try (InputStream inputStream = Files.newInputStream(file)) {
- SignedIdentityDocumentEntity entity = mapper.readValue(inputStream, SignedIdentityDocumentEntity.class);
- return EntityBindingsMapper.toSignedIdentityDocument(entity);
+ return fromInputStream(inputStream);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java
index e331fc1f6e8..a08f0d3391d 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java
@@ -4,17 +4,33 @@ package com.yahoo.vespa.athenz.identityprovider.api;
import com.yahoo.vespa.athenz.api.AthenzService;
import java.time.Instant;
+import java.util.Map;
import java.util.Set;
/**
- * A signed identity document
+ * A signed identity document.
+ * The {@link #unknownAttributes()} member provides forward compatibility and ensures that any new/unknown fields are kept intact when the document is serialized back to JSON.
*
* @author bjorncs
*/
public record SignedIdentityDocument(String signature, int signingKeyVersion, VespaUniqueInstanceId providerUniqueId,
AthenzService providerService, int documentVersion, String configServerHostname,
String instanceHostname, Instant createdAt, Set<String> ipAddresses,
- IdentityType identityType, ClusterType clusterType) {
+ IdentityType identityType, ClusterType clusterType, Map<String, Object> unknownAttributes) {
+
+ public SignedIdentityDocument {
+ ipAddresses = Set.copyOf(ipAddresses);
+ unknownAttributes = Map.copyOf(unknownAttributes);
+ }
+
+ public SignedIdentityDocument(String signature, int signingKeyVersion, VespaUniqueInstanceId providerUniqueId,
+ AthenzService providerService, int documentVersion, String configServerHostname,
+ String instanceHostname, Instant createdAt, Set<String> ipAddresses,
+ IdentityType identityType, ClusterType clusterType) {
+ this(signature, signingKeyVersion, providerUniqueId, providerService, documentVersion, configServerHostname,
+ instanceHostname, createdAt, ipAddresses, identityType, clusterType, Map.of());
+ }
+
public static final int DEFAULT_DOCUMENT_VERSION = 2;
}
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java
index 2fb709615da..c37dd2f9147 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java
@@ -1,25 +1,51 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.athenz.identityprovider.api.bindings;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
/**
* @author bjorncs
*/
-@JsonIgnoreProperties(ignoreUnknown = true)
-public record SignedIdentityDocumentEntity(@JsonProperty("signature") String signature,
- @JsonProperty("signing-key-version") int signingKeyVersion,
- @JsonProperty("provider-unique-id") String providerUniqueId,
- @JsonProperty("provider-service") String providerService,
- @JsonProperty("document-version") int documentVersion,
- @JsonProperty("configserver-hostname") String configServerHostname,
- @JsonProperty("instance-hostname") String instanceHostname,
- @JsonProperty("created-at") Instant createdAt,
- @JsonProperty("ip-addresses") Set<String> ipAddresses,
- @JsonProperty("identity-type") String identityType,
- @JsonProperty("cluster-type") String clusterType) {
+public record SignedIdentityDocumentEntity(
+ String signature, int signingKeyVersion, String providerUniqueId, String providerService, int documentVersion,
+ String configServerHostname, String instanceHostname, Instant createdAt, Set<String> ipAddresses,
+ String identityType, String clusterType, Map<String, Object> unknownAttributes) {
+
+ @JsonCreator
+ public SignedIdentityDocumentEntity(@JsonProperty("signature") String signature,
+ @JsonProperty("signing-key-version") int signingKeyVersion,
+ @JsonProperty("provider-unique-id") String providerUniqueId,
+ @JsonProperty("provider-service") String providerService,
+ @JsonProperty("document-version") int documentVersion,
+ @JsonProperty("configserver-hostname") String configServerHostname,
+ @JsonProperty("instance-hostname") String instanceHostname,
+ @JsonProperty("created-at") Instant createdAt,
+ @JsonProperty("ip-addresses") Set<String> ipAddresses,
+ @JsonProperty("identity-type") String identityType,
+ @JsonProperty("cluster-type") String clusterType) {
+ this(signature, signingKeyVersion, providerUniqueId, providerService, documentVersion, configServerHostname,
+ instanceHostname, createdAt, ipAddresses, identityType, clusterType, new HashMap<>());
+ }
+
+ @JsonProperty("signature") @Override public String signature() { return signature; }
+ @JsonProperty("signing-key-version") @Override public int signingKeyVersion() { return signingKeyVersion; }
+ @JsonProperty("provider-unique-id") @Override public String providerUniqueId() { return providerUniqueId; }
+ @JsonProperty("provider-service") @Override public String providerService() { return providerService; }
+ @JsonProperty("document-version") @Override public int documentVersion() { return documentVersion; }
+ @JsonProperty("configserver-hostname") @Override public String configServerHostname() { return configServerHostname; }
+ @JsonProperty("instance-hostname") @Override public String instanceHostname() { return instanceHostname; }
+ @JsonProperty("created-at") @Override public Instant createdAt() { return createdAt; }
+ @JsonProperty("ip-addresses") @Override public Set<String> ipAddresses() { return ipAddresses; }
+ @JsonProperty("identity-type") @Override public String identityType() { return identityType; }
+ @JsonProperty("cluster-type") @Override public String clusterType() { return clusterType; }
+ @JsonAnyGetter @Override public Map<String, Object> unknownAttributes() { return unknownAttributes; }
+ @JsonAnySetter public void set(String name, Object value) { unknownAttributes.put(name, value); }
}
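
The @JsonAnyGetter/@JsonAnySetter pair above is the standard Jackson idiom for round-tripping JSON members the binding does not know about. Below is a minimal, self-contained sketch of the same pattern; the class name, field names, and sample JSON are illustrative assumptions and not part of the Vespa code base.

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

class AnyAttributeDemo {
    public String name;                                          // known member, bound as usual
    private final Map<String, Object> unknown = new HashMap<>(); // everything else lands here

    @JsonAnySetter public void set(String key, Object value) { unknown.put(key, value); } // invoked once per unknown member
    @JsonAnyGetter public Map<String, Object> unknown() { return unknown; }               // written back on serialization

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        AnyAttributeDemo demo = mapper.readValue("{\"name\":\"a\",\"new-field\":42}", AnyAttributeDemo.class);
        // The unknown member survives the round trip, e.g. {"name":"a","new-field":42}
        System.out.println(mapper.writeValueAsString(demo));
    }
}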
diff --git a/vespa-athenz/src/test/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapperTest.java b/vespa-athenz/src/test/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapperTest.java
new file mode 100644
index 00000000000..f8c119190a6
--- /dev/null
+++ b/vespa-athenz/src/test/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapperTest.java
@@ -0,0 +1,47 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.athenz.identityprovider.api;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * @author bjorncs
+ */
+class EntityBindingsMapperTest {
+
+ @Test
+ public void persists_unknown_json_members() throws IOException {
+ var originalJson =
+ """
+ {
+ "signature": "sig",
+ "signing-key-version": 0,
+ "provider-unique-id": "0.cluster.instance.app.tenant.us-west-1.test.node",
+ "provider-service": "domain.service",
+ "document-version": 2,
+ "configserver-hostname": "cfg",
+ "instance-hostname": "host",
+ "created-at": 12345.0,
+ "ip-addresses": [],
+ "identity-type": "node",
+ "cluster-type": "admin",
+ "unknown-string": "string-value",
+ "unknown-object": { "member-in-unknown-object": 123 }
+ }
+ """;
+ var entity = EntityBindingsMapper.fromString(originalJson);
+ assertEquals(2, entity.unknownAttributes().size(), entity.unknownAttributes().toString());
+ var json = EntityBindingsMapper.toAttestationData(entity);
+
+ var expectedMemberInJson = "member-in-unknown-object";
+ assertTrue(json.contains(expectedMemberInJson),
+ () -> "Expected JSON to contain '%s', but got \n'%s'".formatted(expectedMemberInJson, json));
+ assertEquals(EntityBindingsMapper.mapper.readTree(originalJson), EntityBindingsMapper.mapper.readTree(json));
+ }
+
+}
\ No newline at end of file
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java
index 68fcbcc4983..13b395a97f2 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java
@@ -4,7 +4,9 @@ package com.yahoo.vespa.zookeeper;
import java.nio.file.Path;
/**
- * Interface for a component that starts/stops a ZooKeeper server.
+ * Interface for a component that starts/stops a ZooKeeper server. Implementations should make sure
+ * that the server is up and accepts connections (typically by returning from the constructor only after
+ * writing a node to ZooKeeper successfully).
*
* @author hmusum
*/
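
The contract spelled out in this comment, returning from the constructor only once a write to ZooKeeper has succeeded, can be sketched with the plain Apache ZooKeeper client as below. This is a hedged illustration, not the Vespa implementation; the helper name, probe path, session timeout, and retry interval are assumptions.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

class ZooKeeperStartupBarrier {

    /** Blocks until an ephemeral node can be created, i.e. the server is up and accepts connections. */
    static void awaitServerUp(String connectString) throws Exception {
        ZooKeeper zk = new ZooKeeper(connectString, 10_000, event -> { }); // 10s session timeout, no-op watcher
        try {
            while (true) {
                try {
                    String path = zk.create("/startup-probe-", new byte[0],
                                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    zk.delete(path, -1); // remove the probe node again
                    return;              // the write succeeded, so the server is serving requests
                } catch (KeeperException e) {
                    Thread.sleep(100);   // not ready yet (e.g. connection loss), retry shortly
                }
            }
        } finally {
            zk.close();
        }
    }
}

A constructor honoring the interface contract could simply call such a helper as its last statement.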
diff --git a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
index 8161e4bba4b..5bf7c9bd3e9 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
@@ -12,6 +12,8 @@ import java.time.Duration;
* Starts or reconfigures zookeeper cluster.
* The QuorumPeer conditionally created here is owned by the Reconfigurer;
* when it already has a peer, that peer is used here in case start or shutdown is required.
+ * Guarantees that the server is up by writing a node to ZooKeeper successfully before
+ * returning from the constructor.
*
* @author hmusum
*/
diff --git a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
index 8adabeedb1b..bb2b02c5c9b 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
@@ -9,6 +9,9 @@ import java.nio.file.Path;
import java.time.Duration;
/**
+ * ZooKeeper server. Guarantees that the server is up by writing a node to ZooKeeper successfully before
+ * returning from the constructor.
+ *
* @author Ulf Lilleengen
* @author Harald Musum
*/