diff options
45 files changed, 1306 insertions, 492 deletions
diff --git a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java index 1ac870c45de..50859b837b1 100644 --- a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java +++ b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java @@ -1,8 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.apps.clustercontroller; -import com.yahoo.component.annotation.Inject; import com.yahoo.component.AbstractComponent; +import com.yahoo.component.annotation.Inject; import com.yahoo.jdisc.Metric; import com.yahoo.vespa.clustercontroller.apputil.communication.http.JDiscMetricWrapper; import com.yahoo.vespa.clustercontroller.core.FleetController; @@ -10,13 +10,10 @@ import com.yahoo.vespa.clustercontroller.core.FleetControllerOptions; import com.yahoo.vespa.clustercontroller.core.RemoteClusterControllerTaskScheduler; import com.yahoo.vespa.clustercontroller.core.restapiv2.ClusterControllerStateRestAPI; import com.yahoo.vespa.clustercontroller.core.status.StatusHandler; -import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.zookeeper.VespaZooKeeperServer; - import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; @@ -49,7 +46,6 @@ public class ClusterController extends AbstractComponent public void setOptions(FleetControllerOptions options, Metric metricImpl) throws Exception { referents.incrementAndGet(); metricWrapper.updateMetricImplementation(metricImpl); - 
verifyThatZooKeeperWorks(options); synchronized (controllers) { FleetController controller = controllers.get(options.clusterName()); if (controller == null) { @@ -99,8 +95,6 @@ public class ClusterController extends AbstractComponent } } - FleetController getController(String name) { return controllers.get(name); } - @Override public StatusHandler.ContainerStatusPageServer get(String cluster) { return status.get(cluster); @@ -115,16 +109,4 @@ public class ClusterController extends AbstractComponent controller.shutdown(); } - /** - * Block until we are connected to zookeeper server - */ - private void verifyThatZooKeeperWorks(FleetControllerOptions options) throws Exception { - if (options.zooKeeperServerAddress() != null && !"".equals(options.zooKeeperServerAddress())) { - try (Curator curator = Curator.create(options.zooKeeperServerAddress())) { - if ( ! curator.framework().blockUntilConnected(600, TimeUnit.SECONDS)) - com.yahoo.protect.Process.logAndDie("Failed to connect to ZK, dying and restarting container"); - } - } - } - } diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/Bcp.java b/config-model-api/src/main/java/com/yahoo/config/application/api/Bcp.java new file mode 100644 index 00000000000..af369dc2672 --- /dev/null +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/Bcp.java @@ -0,0 +1,122 @@ +package com.yahoo.config.application.api; + +import com.yahoo.config.provision.RegionName; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Defines the BCP structure for an instance in a deployment spec: + * A list of region groups where each group contains a set of regions + * which will handle the traffic of a member in the group when it becomes unreachable. + * + * This is used to make bcp-aware autoscaling decisions. 
If no explicit BCP spec + * is provided, it is assumed that a region's traffic will be divided equally over all + * the other regions when it becomes unreachable - i.e. a single BCP group is implicitly + * defined having all defined production regions as members with fraction 1.0. + * + * It is assumed that the traffic of the unreachable region is distributed + * evenly to the other members of the group. + * + * A region can be a fractional member of a group, in which case it is assumed that + * the region will only handle that fraction of its share of the unreachable region's traffic, + * and symmetrically that the other members of the group will only handle that fraction + * of the fractional region's traffic if it becomes unreachable. + * + * Each production region defined in the instance must have fractional memberships in groups that sum to exactly one. + * + * If a group has one member it will not set aside any capacity for BCP. + * If a group has more than two members, the system will attempt to provision capacity + * for BCP also when a region is unreachable. That is, if there are three member regions, A, B and C, + * each handling 100 qps, then they each aim to handle 150 in case one goes down. If C goes down, + * A and B will now handle 150 each, but will each aim to handle 300 in case the other goes down. + * + * @author bratseth + */ +public class Bcp { + + private static final Bcp empty = new Bcp(List.of()); + + private final List<Group> groups; + + public Bcp(List<Group> groups) { + totalMembershipSumsToOne(groups); + this.groups = List.copyOf(groups); + } + + public List<Group> groups() { return groups; } + + /** Returns the set of regions declared in the groups of this.
*/ + public Set<RegionName> regions() { + return groups.stream().flatMap(group -> group.members().stream()).map(member -> member.region()).collect(Collectors.toSet()); + } + + public boolean isEmpty() { return groups.isEmpty(); } + + /** Returns this bcp spec, or if it is empty, the given bcp spec. */ + public Bcp orElse(Bcp other) { + return this.isEmpty() ? other : this; + } + + private void totalMembershipSumsToOne(List<Group> groups) { + Map<RegionName, Double> totalMembership = new HashMap<>(); + for (var group : groups) { + for (var member : group.members()) + totalMembership.compute(member.region(), (__, fraction) -> fraction == null ? member.fraction() + : fraction + member.fraction()); + } + for (var entry : totalMembership.entrySet()) { + if ( ! (Math.abs(entry.getValue() - 1.0) <= 1e-9)) /* tolerate floating-point rounding when summing fractions; the negated form also rejects a NaN sum */ + throw new IllegalArgumentException("Illegal BCP spec: All regions must have total membership fractions summing to 1.0, but " + + entry.getKey() + " sums to " + entry.getValue()); + } + } + + public static Bcp empty() { return empty; } + + @Override + public String toString() { + if (isEmpty()) return "empty BCP"; + return "BCP of " + groups; + } + + public static class Group { + + private final Duration deadline; + private final List<RegionMember> members; + + public Group(List<RegionMember> members, Duration deadline) { + this.members = List.copyOf(members); + this.deadline = deadline; + } + + public List<RegionMember> members() { return members; } + + /** + * Returns the max time until the other regions must be able to handle the additional traffic + * when a region becomes unreachable, which by default is Duration.ZERO.
+ */ + public Duration deadline() { return deadline; } + + @Override + public String toString() { + return "BCP group of " + members; + } + + } + + public record RegionMember(RegionName region, double fraction) { + + public RegionMember { + if ( ! (fraction >= 0 && fraction <= 1)) /* negated range check, so that NaN is rejected along with out-of-range values */ + throw new IllegalArgumentException("Fraction must be a number between 0.0 and 1.0, but got " + fraction); + } + + + } + +} diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java index b36c1409459..4b30734365d 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java @@ -61,6 +61,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { private final Notifications notifications; private final List<Endpoint> endpoints; private final Map<ClusterSpec.Id, Map<ZoneId, ZoneEndpoint>> zoneEndpoints; + private final Bcp bcp; public DeploymentInstanceSpec(InstanceName name, Tags tags, @@ -77,6 +78,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { Notifications notifications, List<Endpoint> endpoints, Map<ClusterSpec.Id, Map<ZoneId, ZoneEndpoint>> zoneEndpoints, + Bcp bcp, Instant now) { super(steps); this.name = Objects.requireNonNull(name); @@ -101,8 +103,9 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { Map<ClusterSpec.Id, Map<ZoneId, ZoneEndpoint>> zoneEndpointsCopy = new HashMap<>(); for (var entry : zoneEndpoints.entrySet()) zoneEndpointsCopy.put(entry.getKey(), Collections.unmodifiableMap(new HashMap<>(entry.getValue()))); this.zoneEndpoints = Collections.unmodifiableMap(zoneEndpointsCopy); + this.bcp = Objects.requireNonNull(bcp); validateZones(new HashSet<>(), new HashSet<>(), this); - validateEndpoints(steps(), globalServiceId, this.endpoints); + 
validateEndpoints(globalServiceId, this.endpoints); validateChangeBlockers(changeBlockers, now); } @@ -144,25 +147,41 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { } /** Throw an IllegalArgumentException if an endpoint refers to a region that is not declared in 'prod' */ - private void validateEndpoints(List<DeploymentSpec.Step> steps, Optional<String> globalServiceId, List<Endpoint> endpoints) { + private void validateEndpoints(Optional<String> globalServiceId, List<Endpoint> endpoints) { if (globalServiceId.isPresent() && ! endpoints.isEmpty()) { throw new IllegalArgumentException("Providing both 'endpoints' and 'global-service-id'. Use only 'endpoints'."); } - var stepZones = steps.stream() - .flatMap(s -> s.zones().stream()) - .flatMap(z -> z.region().stream()) - .collect(Collectors.toSet()); - + var regions = prodRegions(); for (var endpoint : endpoints){ for (var endpointRegion : endpoint.regions()) { - if (! stepZones.contains(endpointRegion)) { + if (! regions.contains(endpointRegion)) { throw new IllegalArgumentException("Region used in endpoint that is not declared in 'prod': " + endpointRegion); } } } } + /** Validates the given BCP spec (which is owned by this, or if none, a default) against this instance: An empty spec is always accepted, otherwise an IllegalArgumentException is thrown unless its regions are exactly the production regions this deploys to. */ + void validateBcp(Bcp bcp) { + if (bcp.isEmpty()) return; + if ( ! prodRegions().equals(bcp.regions())) + throw new IllegalArgumentException("BCP and deployment mismatch in " + this + ": " + + "A <bcp> element must place all deployed production regions in " + + "at least one group, and declare no extra regions. " + + "Deployed regions: " + prodRegions() + + ". BCP regions: " + bcp.regions()); +} + /** Returns the production regions the steps of this specifies a deployment to.
*/ + private Set<RegionName> prodRegions() { + return steps().stream() + .flatMap(s -> s.zones().stream()) + .filter(zone -> zone.environment().isProduction()) + .flatMap(z -> z.region().stream()) + .collect(Collectors.toSet()); + } + + private void validateChangeBlockers(List<DeploymentSpec.ChangeBlocker> changeBlockers, Instant now) { // Find all possible dates an upgrade block window can start Stream<Instant> blockingFrom = changeBlockers.stream() @@ -256,6 +275,9 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { /** Returns the rotations configuration of these instances */ public List<Endpoint> endpoints() { return endpoints; } + /** Returns the BCP spec declared in this instance, or Bcp.empty() if none. */ + public Bcp bcp() { return bcp; } + /** Returns whether this instance deploys to the given zone, either implicitly or explicitly */ public boolean deploysTo(Environment environment, RegionName region) { return zones().stream().anyMatch(zone -> zone.concerns(environment, Optional.of(region))); diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java index cbdb5bd6bcc..41644ebc87d 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java @@ -18,7 +18,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -47,6 +46,7 @@ public class DeploymentSpec { Optional.empty(), Optional.empty(), List.of(), + Bcp.empty(), "<deployment version='1.0'/>", List.of()); @@ -58,6 +58,7 @@ public class DeploymentSpec { private final Optional<AthenzService> athenzService; private final Optional<CloudAccount> cloudAccount; private final 
List<Endpoint> endpoints; + private final Bcp bcp; private final List<DeprecatedElement> deprecatedElements; private final String xmlForm; @@ -68,6 +69,7 @@ public class DeploymentSpec { Optional<AthenzService> athenzService, Optional<CloudAccount> cloudAccount, List<Endpoint> endpoints, + Bcp bcp, String xmlForm, List<DeprecatedElement> deprecatedElements) { this.steps = List.copyOf(Objects.requireNonNull(steps)); @@ -77,11 +79,13 @@ public class DeploymentSpec { this.cloudAccount = Objects.requireNonNull(cloudAccount); this.xmlForm = Objects.requireNonNull(xmlForm); this.endpoints = List.copyOf(Objects.requireNonNull(endpoints)); + this.bcp = Objects.requireNonNull(bcp); this.deprecatedElements = List.copyOf(Objects.requireNonNull(deprecatedElements)); validateTotalDelay(steps); validateUpgradePoliciesOfIncreasingConservativeness(steps); validateAthenz(); validateApplicationEndpoints(); + validateBcp(); } /** Throw an IllegalArgumentException if the total delay exceeds 24 hours */ @@ -158,13 +162,16 @@ public class DeploymentSpec { } } + private void validateBcp() { + for (var instance : instances()) + instance.validateBcp(instance.bcp().orElse(bcp())); + } + /** Returns the major version this application is pinned to, or empty (default) to allow all major versions */ public Optional<Integer> majorVersion() { return majorVersion; } /** Returns the deployment steps of this in the order they will be performed */ - public List<Step> steps() { - return steps; - } + public List<Step> steps() { return steps; } /** Returns the Athenz domain set on the root tag, if any */ public Optional<AthenzDomain> athenzDomain() { return athenzDomain; } @@ -203,6 +210,9 @@ public class DeploymentSpec { .orElse(ZoneEndpoint.defaultEndpoint); } + /** Returns the default BCP spec for instances, or Bcp.empty() if none are defined. 
*/ + public Bcp bcp() { return bcp; } + /** Returns the XML form of this spec, or null if it was not created by fromXml, nor is empty */ public String xmlForm() { return xmlForm; } diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java b/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java index fb6d834f783..be6be5566a8 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.application.api.xml; +import com.yahoo.config.application.api.Bcp; import com.yahoo.config.application.api.DeploymentInstanceSpec; import com.yahoo.config.application.api.DeploymentSpec; import com.yahoo.config.application.api.DeploymentSpec.DeclaredTest; @@ -46,7 +47,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalDouble; import java.util.Set; import java.util.function.Function; import java.util.stream.Stream; @@ -161,6 +162,7 @@ public class DeploymentSpecXmlReader { stringAttribute(athenzServiceAttribute, root).map(AthenzService::from), stringAttribute(cloudAccountAttribute, root).map(CloudAccount::from), applicationEndpoints, + readBcp(root), xmlForm, deprecatedElements); } @@ -228,6 +230,7 @@ public class DeploymentSpecXmlReader { notifications, endpoints, zoneEndpoints, + readBcp(instanceElement), now)) .toList(); } @@ -455,6 +458,24 @@ public class DeploymentSpecXmlReader 
{ validateAndConsolidate(endpointsByZone, zoneEndpoints); } + static Bcp readBcp(Element element) { + Element bcpElement = XML.getChild(element, "bcp"); + if (bcpElement == null) return Bcp.empty(); + + List<Bcp.Group> groups = new ArrayList<>(); + for (Element groupElement : XML.getChildren(bcpElement, "group")) { + List<Bcp.RegionMember> regions = new ArrayList<>(); + for (Element regionElement : XML.getChildren(groupElement, "region")) { + RegionName region = RegionName.from(XML.getValue(regionElement)); + double fraction = toDouble(XML.attribute("fraction", regionElement).orElse(null), "fraction").orElse(1.0); + regions.add(new Bcp.RegionMember(region, fraction)); + } + Duration deadline = XML.attribute("deadline", groupElement).map(value -> toDuration(value, "deadline")).orElse(Duration.ZERO); + groups.add(new Bcp.Group(regions, deadline)); + } + return new Bcp(groups); + } + static void validateAndConsolidate(Map<String, Map<RegionName, List<ZoneEndpoint>>> in, Map<ClusterSpec.Id, Map<ZoneId, ZoneEndpoint>> out) { in.forEach((cluster, regions) -> { List<ZoneEndpoint> wildcards = regions.remove(null); @@ -713,6 +734,39 @@ public class DeploymentSpecXmlReader { .findFirst(); } + /** + * Converts a string consisting of an integer followed by "m" or "M" to a duration of that number of minutes, + * or to the zero duration if the string is null or blank. + */ + private static Duration toDuration(String minutesSpec, String sourceDescription) { + try { + if (minutesSpec == null || minutesSpec.isBlank()) return Duration.ZERO; + minutesSpec = minutesSpec.trim().toLowerCase(); + if ( ! 
minutesSpec.endsWith("m")) + throw new IllegalArgumentException("Must end by 'm'"); + try { + return Duration.ofMinutes(Integer.parseInt(minutesSpec.substring(0, minutesSpec.length() - 1))); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("Must be an integer number of minutes followed by 'm'"); + } + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Illegal " + sourceDescription + " '" + minutesSpec + "'", e); + } + } + + private static OptionalDouble toDouble(String value, String sourceDescription) { + try { + if (value == null || value.isBlank()) return OptionalDouble.empty(); + return OptionalDouble.of(Double.parseDouble(value)); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("Illegal " + sourceDescription + " '" + value + "': " + + "Must be a number between 0.0 and 1.0"); + } + } + private static void illegal(String message) { throw new IllegalArgumentException(message); } diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java index 2e746ff55c8..afa7e3e502b 100644 --- a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java +++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java @@ -106,16 +106,16 @@ public class DeploymentSpecTest { @Test public void minimalProductionSpec() { - StringReader r = new StringReader( - "<deployment version='1.0'>" + - " <instance id='default'>" + - " <prod>" + - " <region active='false'>us-east1</region>" + - " <region active='true'>us-west1</region>" + - " </prod>" + - " </instance>" + - "</deployment>" - ); + StringReader r = new StringReader( """ + <deployment version='1.0'> + <instance id='default'> + <prod> + <region active='false'>us-east1</region> + <region active='true'>us-west1</region> + </prod> + </instance> + </deployment> + """); 
DeploymentSpec spec = DeploymentSpec.fromXml(r); assertEquals(1, spec.steps().size()); diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithBcpTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithBcpTest.java new file mode 100644 index 00000000000..77aadc88be8 --- /dev/null +++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithBcpTest.java @@ -0,0 +1,254 @@ +package com.yahoo.config.application.api; + +import com.yahoo.config.provision.RegionName; +import com.yahoo.yolean.Exceptions; +import org.junit.Test; + +import java.io.StringReader; +import java.time.Duration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * @author bratseth + */ +public class DeploymentSpecWithBcpTest { + + @Test + public void minimalProductionSpecWithExplicitBcp() { + StringReader r = new StringReader(""" + <deployment version='1.0'> + <instance id='default'> + <prod> + <region>us-east1</region> + <region>us-west1</region> + </prod> + </instance> + <bcp> + <group> + <region>us-east1</region> + <region>us-west1</region> + </group> + </bcp> + </deployment> + """); + assertTwoRegions(DeploymentSpec.fromXml(r)); + } + + @Test + public void specWithoutInstanceWithBcp() { + StringReader r = new StringReader(""" + <deployment version='1.0'> + <prod> + <region>us-east1</region> + <region>us-west1</region> + </prod> + <bcp> + <group> + <region>us-east1</region> + <region>us-west1</region> + </group> + </bcp> + </deployment> + """); + assertTwoRegions(DeploymentSpec.fromXml(r)); + } + + @Test + public void complexBcpSetup() { + StringReader r = new StringReader(""" + <deployment version='1.0'> + <instance id='beta'> + <prod> + <region>us-east1</region> + <region>us-east2</region> + </prod> + <bcp> + <group deadline="60m"> + <region>us-east1</region> + <region>us-east2</region> + </group> + </bcp> + </instance> + <instance id='main'> + 
<prod> + <region>us-east1</region> + <region>us-east2</region> + <region>us-central1</region> + <region>us-west1</region> + <region>us-west2</region> + <region>eu-east1</region> + <region>eu-west1</region> + </prod> + </instance> + <bcp> + <group> + <region>us-east1</region> + <region>us-east2</region> + <region fraction="0.3">us-central1</region> + </group> + <group> + <region>us-west1</region> + <region>us-west2</region> + <region fraction="0.7">us-central1</region> + </group> + <group deadline="30m"> + <region>eu-east1</region> + <region>eu-west1</region> + </group> + </bcp> + </deployment> + """); + var spec = DeploymentSpec.fromXml(r); + var betaBcp = spec.requireInstance("beta").bcp().orElse(spec.bcp()); + assertEquals(1, betaBcp.groups().size()); + var betaGroup = betaBcp.groups().get(0); + assertEquals(2, betaGroup.members().size()); + assertEquals(Duration.ofMinutes(60), betaGroup.deadline()); + assertEquals(new Bcp.RegionMember(RegionName.from("us-east1"), 1.0), betaGroup.members().get(0)); + assertEquals(new Bcp.RegionMember(RegionName.from("us-east2"), 1.0), betaGroup.members().get(1)); + + var mainBcp = spec.requireInstance("main").bcp().orElse(spec.bcp()); + assertEquals(7, mainBcp.regions().size()); + assertEquals(3, mainBcp.groups().size()); + + var usEast = mainBcp.groups().get(0); + assertEquals(3, usEast.members().size()); + assertEquals(Duration.ofMinutes(0), usEast.deadline()); + assertEquals(new Bcp.RegionMember(RegionName.from("us-east1"), 1.0), usEast.members().get(0)); + assertEquals(new Bcp.RegionMember(RegionName.from("us-east2"), 1.0), usEast.members().get(1)); + assertEquals(new Bcp.RegionMember(RegionName.from("us-central1"), 0.3), usEast.members().get(2)); + + var usWest = mainBcp.groups().get(1); + assertEquals(3, usWest.members().size()); + assertEquals(Duration.ofMinutes(0), usWest.deadline()); + assertEquals(new Bcp.RegionMember(RegionName.from("us-west1"), 1.0), usWest.members().get(0)); + assertEquals(new 
Bcp.RegionMember(RegionName.from("us-west2"), 1.0), usWest.members().get(1)); + assertEquals(new Bcp.RegionMember(RegionName.from("us-central1"), 0.7), usWest.members().get(2)); + + var eu = mainBcp.groups().get(2); + assertEquals(2, eu.members().size()); + assertEquals(Duration.ofMinutes(30), eu.deadline()); + assertEquals(new Bcp.RegionMember(RegionName.from("eu-east1"), 1.0), eu.members().get(0)); + assertEquals(new Bcp.RegionMember(RegionName.from("eu-west1"), 1.0), eu.members().get(1)); + } + + @Test + public void regionMembershipMatchValidation1() { + try { + StringReader r = new StringReader(""" + <deployment version='1.0'> + <prod> + <region>us-east1</region> + <region>us-west1</region> + </prod> + <bcp> + <group> + <region>us-west1</region> + </group> + </bcp> + </deployment> + """); + DeploymentSpec.fromXml(r); + fail(); + } + catch (IllegalArgumentException e) { + assertEquals("BCP and deployment mismatch in instance 'default': " + + "A <bcp> element must place all deployed production regions in at least one group, " + + "and declare no extra regions. " + + "Deployed regions: [us-east1, us-west1]. BCP regions: [us-west1]", + Exceptions.toMessageString(e)); + } + } + + @Test + public void regionMembershipMatchValidation2() { + try { + StringReader r = new StringReader(""" + <deployment version='1.0'> + <prod> + <region>us-west1</region> + </prod> + <bcp> + <group> + <region>us-east1</region> + <region>us-west1</region> + </group> + </bcp> + </deployment> + """); + DeploymentSpec.fromXml(r); + fail(); + } + catch (IllegalArgumentException e) { + assertEquals("BCP and deployment mismatch in instance 'default': " + + "A <bcp> element must place all deployed production regions in at least one group, " + + "and declare no extra regions. " + + "Deployed regions: [us-west1]. 
BCP regions: [us-east1, us-west1]", + Exceptions.toMessageString(e)); + } + } + + @Test + public void deadlineValidation() { + try { + StringReader r = new StringReader(""" + <deployment version='1.0'> + <prod> + <region>us-east1</region> + <region>us-west1</region> + </prod> + <bcp> + <group deadline="fast"> + <region>us-east1</region> + <region>us-west1</region> + </group> + </bcp> + </deployment> + """); + DeploymentSpec.fromXml(r); + fail(); + } + catch (IllegalArgumentException e) { + assertEquals("Illegal deadline 'fast': Must end by 'm'", Exceptions.toMessageString(e)); + } + } + + @Test + public void fractionalMembershipValidation() { + try { + StringReader r = new StringReader(""" + <deployment version='1.0'> + <prod> + <region>us-east1</region> + <region>us-west1</region> + </prod> + <bcp> + <group> + <region fraction="0.9">us-east1</region> + <region>us-west1</region> + </group> + </bcp> + </deployment> + """); + DeploymentSpec.fromXml(r); + fail(); + } + catch (IllegalArgumentException e) { + assertEquals("Illegal BCP spec: All regions must have total membership fractions summing to 1.0, but us-east1 sums to 0.9", + Exceptions.toMessageString(e)); + } + } + + private void assertTwoRegions(DeploymentSpec spec) { + var bcp = spec.requireInstance("default").bcp().orElse(spec.bcp()); + assertEquals(1, bcp.groups().size()); + var group = bcp.groups().get(0); + assertEquals(2, group.members().size()); + assertEquals(Duration.ZERO, group.deadline()); + assertEquals(new Bcp.RegionMember(RegionName.from("us-east1"), 1.0), group.members().get(0)); + assertEquals(new Bcp.RegionMember(RegionName.from("us-west1"), 1.0), group.members().get(1)); + } + +} diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java index d09a4a303c2..a3fdce98c73 100644 --- 
a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/AutoscalingMetrics.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.model.admin.monitoring; import com.yahoo.metrics.ContainerMetrics; import com.yahoo.metrics.SearchNodeMetrics; +import com.yahoo.metrics.StorageMetrics; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; @@ -39,10 +40,10 @@ public class AutoscalingMetrics { metrics.add(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_QUERIES.rate()); // content // Write rate - metrics.add("feed.http-requests.rate"); // container - metrics.add("vds.filestor.allthreads.put.count.rate"); // content - metrics.add("vds.filestor.allthreads.remove.count.rate"); // content - metrics.add("vds.filestor.allthreads.update.count.rate"); // content + metrics.add(ContainerMetrics.FEED_HTTP_REQUESTS.rate()); // container + metrics.add(StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_COUNT.rate()); // content + metrics.add(StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT.rate()); // content + metrics.add(StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT.rate()); // content return new MetricSet("autoscaling", toMetrics(metrics)); } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java index 850b25472d2..f3ad181887d 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java @@ -540,68 +540,69 @@ public class VespaMetricSet { private static Set<Metric> getStorageMetrics() { Set<Metric> metrics = new LinkedHashSet<>(); - // TODO: For the purpose of this file and likely elsewhere, all but the last aggregate specifier, - // TODO: such as 'average' and 'sum' in the metric names below are 
just confusing and can be mentally - // TODO: disregarded when considering metric names. Consider cleaning up for Vespa 9. + // TODO - Vespa 9: For the purpose of this file and likely elsewhere, all but the last aggregate specifier, + // TODO - Vespa 9: such as 'average' and 'sum' in the metric names below are just confusing and can be mentally + // TODO - Vespa 9: disregarded when considering metric names. Clean up for Vespa 9. addMetric(metrics, StorageMetrics.VDS_DATASTORED_ALLDISKS_BUCKETS.average()); addMetric(metrics, StorageMetrics.VDS_DATASTORED_ALLDISKS_DOCS.average()); - addMetric(metrics, "vds.datastored.alldisks.bytes.average"); - addMetric(metrics, "vds.visitor.allthreads.averagevisitorlifetime", List.of("max", "sum", "count")); - addMetric(metrics, "vds.visitor.allthreads.averagequeuewait", List.of("max", "sum", "count")); - addMetric(metrics, "vds.visitor.allthreads.queuesize", List.of("max", "sum", "count")); - addMetric(metrics, "vds.visitor.allthreads.completed.rate"); - addMetric(metrics, "vds.visitor.allthreads.created.rate"); - addMetric(metrics, "vds.visitor.allthreads.failed.rate"); - addMetric(metrics, "vds.visitor.allthreads.averagemessagesendtime", List.of("max", "sum", "count")); - addMetric(metrics, "vds.visitor.allthreads.averageprocessingtime", List.of("max", "sum", "count")); - - addMetric(metrics, "vds.filestor.queuesize", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.averagequeuewait", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.active_operations.size", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.active_operations.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.throttle_window_size", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.throttle_waiting_threads", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.throttle_active_tokens", List.of("max", "sum", "count")); - addMetric(metrics, 
"vds.filestor.allthreads.mergemetadatareadlatency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.mergedatareadlatency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.mergedatawritelatency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.put_latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.remove_latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allstripes.throttled_rpc_direct_dispatches.rate"); - addMetric(metrics, "vds.filestor.allstripes.throttled_persistence_thread_polls.rate"); - addMetric(metrics, "vds.filestor.allstripes.timeouts_waiting_for_throttle_token.rate"); - - addMetric(metrics, "vds.filestor.allthreads.put.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.put.failed.rate"); - addMetric(metrics, "vds.filestor.allthreads.put.test_and_set_failed.rate"); - addMetric(metrics, "vds.filestor.allthreads.put.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.put.request_size", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.remove.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.remove.failed.rate"); - addMetric(metrics, "vds.filestor.allthreads.remove.test_and_set_failed.rate"); - addMetric(metrics, "vds.filestor.allthreads.remove.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.remove.request_size", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.get.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.get.failed.rate"); - addMetric(metrics, "vds.filestor.allthreads.get.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.get.request_size", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.update.count.rate"); - addMetric(metrics, 
"vds.filestor.allthreads.update.failed.rate"); - addMetric(metrics, "vds.filestor.allthreads.update.test_and_set_failed.rate"); - addMetric(metrics, "vds.filestor.allthreads.update.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.update.request_size", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.createiterator.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.createiterator.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.visit.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.visit.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.remove_location.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.remove_location.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.splitbuckets.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.joinbuckets.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.deletebuckets.count.rate"); - addMetric(metrics, "vds.filestor.allthreads.deletebuckets.failed.rate"); - addMetric(metrics, "vds.filestor.allthreads.deletebuckets.latency", List.of("max", "sum", "count")); - addMetric(metrics, "vds.filestor.allthreads.setbucketstates.count.rate"); + addMetric(metrics, StorageMetrics.VDS_DATASTORED_ALLDISKS_BYTES.average()); + addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_AVERAGEVISITORLIFETIME, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_AVERAGEQUEUEWAIT, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_QUEUESIZE, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_COMPLETED.rate()); + addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_CREATED.rate()); + addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_FAILED.rate()); + addMetric(metrics, 
StorageMetrics.VDS_VISITOR_ALLTHREADS_AVERAGEMESSAGESENDTIME, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_VISITOR_ALLTHREADS_AVERAGEPROCESSINGTIME, EnumSet.of(max, sum, count)); + + addMetric(metrics, StorageMetrics.VDS_FILESTOR_QUEUESIZE, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_AVERAGEQUEUEWAIT, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ACTIVE_OPERATIONS_SIZE, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ACTIVE_OPERATIONS_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_THROTTLE_WINDOW_SIZE, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_THROTTLE_WAITING_THREADS, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_THROTTLE_ACTIVE_TOKENS, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGEMETADATAREADLATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGEDATAREADLATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGEDATAWRITELATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGE_PUT_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_MERGE_REMOVE_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLSTRIPES_THROTTLED_RPC_DIRECT_DISPATCHES.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLSTRIPES_THROTTLED_PERSISTENCE_THREAD_POLLS.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLSTRIPES_TIMEOUTS_WAITING_FOR_THROTTLE_TOKEN.rate()); + + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_FAILED.rate()); + addMetric(metrics, 
StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_TEST_AND_SET_FAILED.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_REQUEST_SIZE, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_FAILED.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_TEST_AND_SET_FAILED.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_REQUEST_SIZE, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_GET_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_GET_FAILED.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_GET_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_GET_REQUEST_SIZE, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_FAILED.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_TEST_AND_SET_FAILED.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_REQUEST_SIZE, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_CREATEITERATOR_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_CREATEITERATOR_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_VISIT_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_VISIT_LATENCY, EnumSet.of(max, sum, count)); 
+ addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_LOCATION_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_LOCATION_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_SPLITBUCKETS_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_JOINBUCKETS_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_COUNT.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_FAILED.rate()); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_LATENCY, EnumSet.of(max, sum, count)); + addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_SETBUCKETSTATES_COUNT.rate()); + return metrics; } private static Set<Metric> getDistributorMetrics() { @@ -713,6 +714,10 @@ public class VespaMetricSet { suffixes.forEach(suffix -> metrics.add(new Metric(metric.baseName() + "." + suffix.suffix()))); } + private static void addMetric(Set<Metric> metrics, StorageMetrics metric, EnumSet<Suffix> suffixes) { + suffixes.forEach(suffix -> metrics.add(new Metric(metric.baseName() + "." + suffix.suffix()))); + } + private static void addMetric(Set<Metric> metrics, String metricName, Iterable<String> aggregateSuffices) { for (String suffix : aggregateSuffices) { metrics.add(new Metric(metricName + "." 
+ suffix)); diff --git a/container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java b/container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java index 12d4085ff4c..825ba67dca1 100644 --- a/container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java +++ b/container-core/src/main/java/com/yahoo/metrics/StorageMetrics.java @@ -8,7 +8,64 @@ import java.util.List; public enum StorageMetrics implements VespaMetrics { VDS_DATASTORED_ALLDISKS_BUCKETS("vds.datastored.alldisks.buckets", Unit.BUCKET, "Number of buckets managed"), - VDS_DATASTORED_ALLDISKS_DOCS("vds.datastored.alldisks.docs", Unit.DOCUMENT, "Number of documents stored"); + VDS_DATASTORED_ALLDISKS_DOCS("vds.datastored.alldisks.docs", Unit.DOCUMENT, "Number of documents stored"), + VDS_DATASTORED_ALLDISKS_BYTES("vds.datastored.alldisks.bytes", Unit.BYTE, "Number of bytes stored"), + VDS_VISITOR_ALLTHREADS_AVERAGEVISITORLIFETIME("vds.visitor.allthreads.averagevisitorlifetime", Unit.MILLISECOND, "Average lifetime of a visitor"), + VDS_VISITOR_ALLTHREADS_AVERAGEQUEUEWAIT("vds.visitor.allthreads.averagequeuewait", Unit.MILLISECOND, "Average time an operation spends in input queue."), + VDS_VISITOR_ALLTHREADS_QUEUESIZE("vds.visitor.allthreads.queuesize", Unit.OPERATION, "Size of input message queue."), + VDS_VISITOR_ALLTHREADS_COMPLETED("vds.visitor.allthreads.completed", Unit.OPERATION, "Number of visitors completed"), + VDS_VISITOR_ALLTHREADS_CREATED("vds.visitor.allthreads.created", Unit.OPERATION, "Number of visitors created."), + VDS_VISITOR_ALLTHREADS_FAILED("vds.visitor.allthreads.failed", Unit.OPERATION, "Number of visitors failed"), + VDS_VISITOR_ALLTHREADS_AVERAGEMESSAGESENDTIME("vds.visitor.allthreads.averagemessagesendtime", Unit.MILLISECOND, "Average time it takes for messages to be sent to their target (and be replied to)"), + VDS_VISITOR_ALLTHREADS_AVERAGEPROCESSINGTIME("vds.visitor.allthreads.averageprocessingtime", Unit.MILLISECOND, "Average time used to process visitor 
requests"), + + VDS_FILESTOR_QUEUESIZE("vds.filestor.queuesize", Unit.OPERATION, "Size of input message queue."), + VDS_FILESTOR_AVERAGEQUEUEWAIT("vds.filestor.averagequeuewait", Unit.MILLISECOND, "Average time an operation spends in input queue."), + VDS_FILESTOR_ACTIVE_OPERATIONS_SIZE("vds.filestor.active_operations.size", Unit.OPERATION, "Number of concurrent active operations"), + VDS_FILESTOR_ACTIVE_OPERATIONS_LATENCY("vds.filestor.active_operations.latency", Unit.MILLISECOND, "Latency (in ms) for completed operations"), // TODO Vespa 9: Remove 'active' from the metric name + VDS_FILESTOR_THROTTLE_WINDOW_SIZE("vds.filestor.throttle_window_size", Unit.OPERATION, "Current size of async operation throttler window size"), + VDS_FILESTOR_THROTTLE_WAITING_THREADS("vds.filestor.throttle_waiting_threads", Unit.THREAD, "Number of threads waiting to acquire a throttle token"), + VDS_FILESTOR_THROTTLE_ACTIVE_TOKENS("vds.filestor.throttle_active_tokens", Unit.INSTANCE, "Current number of active throttle tokens"), + VDS_FILESTOR_ALLTHREADS_MERGEMETADATAREADLATENCY("vds.filestor.allthreads.mergemetadatareadlatency", Unit.MILLISECOND, "Time spent in a merge step to check metadata of current node to see what data it has."), + VDS_FILESTOR_ALLTHREADS_MERGEDATAREADLATENCY("vds.filestor.allthreads.mergedatareadlatency", Unit.MILLISECOND, "Time spent in a merge step to read data other nodes need."), + VDS_FILESTOR_ALLTHREADS_MERGEDATAWRITELATENCY("vds.filestor.allthreads.mergedatawritelatency", Unit.MILLISECOND, "Time spent in a merge step to write data needed to current node."), + VDS_FILESTOR_ALLTHREADS_MERGE_PUT_LATENCY("vds.filestor.allthreads.put_latency", Unit.MILLISECOND, "Latency of individual puts that are part of merge operations"), // TODO Vespa 9: Update metric name to include 'merge' + VDS_FILESTOR_ALLTHREADS_MERGE_REMOVE_LATENCY("vds.filestor.allthreads.remove_latency", Unit.MILLISECOND, "Latency of individual removes that are part of merge operations"), // TODO 
Vespa 9: Update metric name to include 'merge' + VDS_FILESTOR_ALLSTRIPES_THROTTLED_RPC_DIRECT_DISPATCHES("vds.filestor.allstripes.throttled_rpc_direct_dispatches", Unit.INSTANCE, "Number of times an RPC thread could not directly dispatch an async operation directly to Proton because it was disallowed by the throttle policy"), + VDS_FILESTOR_ALLSTRIPES_THROTTLED_PERSISTENCE_THREAD_POLLS("vds.filestor.allstripes.throttled_persistence_thread_polls", Unit.INSTANCE, "Number of times a persistence thread could not immediately dispatch a queued async operation because it was disallowed by the throttle policy"), + VDS_FILESTOR_ALLSTRIPES_TIMEOUTS_WAITING_FOR_THROTTLE_TOKEN("vds.filestor.allstripes.timeouts_waiting_for_throttle_token", Unit.INSTANCE, "Number of times a persistence thread timed out waiting for an available throttle policy token"), + + VDS_FILESTOR_ALLTHREADS_PUT_COUNT("vds.filestor.allthreads.put.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_PUT_FAILED("vds.filestor.allthreads.put.failed", Unit.OPERATION, "Number of failed requests."), + VDS_FILESTOR_ALLTHREADS_PUT_TEST_AND_SET_FAILED("vds.filestor.allthreads.put.test_and_set_failed", Unit.OPERATION, "Number of operations that were skipped due to a test-and-set condition not met"), + VDS_FILESTOR_ALLTHREADS_PUT_LATENCY("vds.filestor.allthreads.put.latency", Unit.MILLISECOND, "Latency of successful requests."), + VDS_FILESTOR_ALLTHREADS_PUT_REQUEST_SIZE("vds.filestor.allthreads.put.request_size", Unit.BYTE, "Size of requests, in bytes"), + VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT("vds.filestor.allthreads.remove.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_REMOVE_FAILED("vds.filestor.allthreads.remove.failed", Unit.OPERATION, "Number of failed requests."), + VDS_FILESTOR_ALLTHREADS_REMOVE_TEST_AND_SET_FAILED("vds.filestor.allthreads.remove.test_and_set_failed", Unit.OPERATION, "Number of operations that were skipped due to a test-and-set 
condition not met"), + VDS_FILESTOR_ALLTHREADS_REMOVE_LATENCY("vds.filestor.allthreads.remove.latency", Unit.MILLISECOND, "Latency of successful requests."), + VDS_FILESTOR_ALLTHREADS_REMOVE_REQUEST_SIZE("vds.filestor.allthreads.remove.request_size", Unit.BYTE, "Size of requests, in bytes"), + VDS_FILESTOR_ALLTHREADS_GET_COUNT("vds.filestor.allthreads.get.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_GET_FAILED("vds.filestor.allthreads.get.failed", Unit.OPERATION, "Number of failed requests."), + VDS_FILESTOR_ALLTHREADS_GET_LATENCY("vds.filestor.allthreads.get.latency", Unit.MILLISECOND, "Latency of successful requests."), + VDS_FILESTOR_ALLTHREADS_GET_REQUEST_SIZE("vds.filestor.allthreads.get.request_size", Unit.BYTE, "Size of requests, in bytes"), + VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT("vds.filestor.allthreads.update.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_UPDATE_FAILED("vds.filestor.allthreads.update.failed", Unit.OPERATION, "Number of failed requests."), + VDS_FILESTOR_ALLTHREADS_UPDATE_TEST_AND_SET_FAILED("vds.filestor.allthreads.update.test_and_set_failed", Unit.OPERATION, "Number of operations that were skipped due to a test-and-set condition not met"), + VDS_FILESTOR_ALLTHREADS_UPDATE_LATENCY("vds.filestor.allthreads.update.latency", Unit.MILLISECOND, "Latency of successful requests."), + VDS_FILESTOR_ALLTHREADS_UPDATE_REQUEST_SIZE("vds.filestor.allthreads.update.request_size", Unit.BYTE, "Size of requests, in bytes"), + VDS_FILESTOR_ALLTHREADS_CREATEITERATOR_COUNT("vds.filestor.allthreads.createiterator.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_CREATEITERATOR_LATENCY("vds.filestor.allthreads.createiterator.latency", Unit.MILLISECOND, "Latency of successful requests."), + VDS_FILESTOR_ALLTHREADS_VISIT_COUNT("vds.filestor.allthreads.visit.count", Unit.OPERATION, "Number of requests processed."), + 
VDS_FILESTOR_ALLTHREADS_VISIT_LATENCY("vds.filestor.allthreads.visit.latency", Unit.MILLISECOND, "Latency of successful requests."), + VDS_FILESTOR_ALLTHREADS_REMOVE_LOCATION_COUNT("vds.filestor.allthreads.remove_location.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_REMOVE_LOCATION_LATENCY("vds.filestor.allthreads.remove_location.latency", Unit.MILLISECOND, "Latency of successful requests."), + VDS_FILESTOR_ALLTHREADS_SPLITBUCKETS_COUNT("vds.filestor.allthreads.splitbuckets.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_JOINBUCKETS_COUNT("vds.filestor.allthreads.joinbuckets.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_COUNT("vds.filestor.allthreads.deletebuckets.count", Unit.OPERATION, "Number of requests processed."), + VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_FAILED("vds.filestor.allthreads.deletebuckets.failed", Unit.OPERATION, "Number of failed requests."), + VDS_FILESTOR_ALLTHREADS_DELETEBUCKETS_LATENCY("vds.filestor.allthreads.deletebuckets.latency", Unit.MILLISECOND, "Latency of successful requests."), + VDS_FILESTOR_ALLTHREADS_SETBUCKETSTATES_COUNT("vds.filestor.allthreads.setbucketstates.count", Unit.OPERATION, "Number of requests processed."); private final String name; private final Unit unit; diff --git a/container-core/src/main/java/com/yahoo/metrics/Unit.java b/container-core/src/main/java/com/yahoo/metrics/Unit.java index cfc4b6f5d21..bb7718ddb4c 100644 --- a/container-core/src/main/java/com/yahoo/metrics/Unit.java +++ b/container-core/src/main/java/com/yahoo/metrics/Unit.java @@ -16,6 +16,7 @@ public enum Unit { FRACTION(BaseUnit.FRACTION), HIT(BaseUnit.HIT), HIT_PER_QUERY(BaseUnit.HIT, BaseUnit.QUERY), + INSTANCE(BaseUnit.INSTANCE), ITEM(BaseUnit.ITEM), MILLISECOND(BaseUnit.MILLISECOND), NANOSECOND(BaseUnit.NANOSECOND), @@ -71,6 +72,7 @@ public enum Unit { FILE("file"), FRACTION("fraction"), HIT("hit"), + 
INSTANCE("instance"), ITEM("item"), MILLISECOND("millisecond", "ms"), NANOSECOND("nanosecond", "ns"), diff --git a/container-core/src/main/java/com/yahoo/restapi/Path.java b/container-core/src/main/java/com/yahoo/restapi/Path.java index 01bcb627639..2fae8da0c2d 100644 --- a/container-core/src/main/java/com/yahoo/restapi/Path.java +++ b/container-core/src/main/java/com/yahoo/restapi/Path.java @@ -85,7 +85,7 @@ public class Path { * * Returns whether this path matches the given template string. * If the given template has placeholders, their values (accessible by get) are reset by calling this, - * whether or not the path matches the given template. + * regardless of whether the path matches the given template. * * This will NOT match empty path elements. * diff --git a/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java b/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java index bc9448f39d8..33cd2f48d46 100644 --- a/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java @@ -241,6 +241,11 @@ public class YqlParserTestCase { } @Test + void testNonEquality() { + assertParse("select foo from bar where !(price = 500)", "-price:500"); + } + + @Test void testNegativeLessThan() { assertParse("select foo from bar where price < -500", "price:<-500"); assertParse("select foo from bar where -500 < price", "price:>-500"); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java index 832dbb6b921..aea01ae36d3 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdater.java @@ -1,6 +1,10 @@ // Copyright Yahoo. 
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.controller.maintenance; +import com.yahoo.config.application.api.Bcp; +import com.yahoo.config.application.api.DeploymentSpec; +import com.yahoo.config.provision.InstanceName; +import com.yahoo.config.provision.RegionName; import com.yahoo.vespa.hosted.controller.ApplicationController; import com.yahoo.vespa.hosted.controller.Controller; import com.yahoo.vespa.hosted.controller.Instance; @@ -8,7 +12,11 @@ import com.yahoo.vespa.hosted.controller.api.integration.configserver.NodeReposi import com.yahoo.vespa.hosted.controller.application.Deployment; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.logging.Level; +import java.util.stream.Collectors; /** * This computes, for every application deployment @@ -41,12 +49,11 @@ public class TrafficShareUpdater extends ControllerMaintainer { int failures = 0; for (var application : applications.asList()) { for (var instance : application.instances().values()) { - for (var deployment : instance.deployments().values()) { - if ( ! 
deployment.zone().environment().isProduction()) continue; + for (var deployment : instance.productionDeployments().values()) { if (shuttingDown()) return 1.0; try { attempts++; - updateTrafficFraction(instance, deployment); + updateTrafficFraction(instance, deployment, application.deploymentSpec()); } catch (Exception e) { // Some failures due to locked applications are expected and benign @@ -62,20 +69,94 @@ public class TrafficShareUpdater extends ControllerMaintainer { return successFactor; } - private void updateTrafficFraction(Instance instance, Deployment deployment) { - double qpsInZone = deployment.metrics().queriesPerSecond(); - double totalQps = instance.deployments().values().stream() - .filter(i -> i.zone().environment().isProduction()) - .mapToDouble(i -> i.metrics().queriesPerSecond()).sum(); - long prodRegions = instance.deployments().values().stream() - .filter(i -> i.zone().environment().isProduction()) - .count(); - double currentReadShare = totalQps == 0 ? 0 : qpsInZone / totalQps; - double maxReadShare = prodRegions < 2 ? 1.0 : 1.0 / ( prodRegions - 1.0); - if (currentReadShare > maxReadShare) // This can happen because the assumption of equal traffic - maxReadShare = currentReadShare; // distribution can be incorrect + private void updateTrafficFraction(Instance instance, Deployment deployment, DeploymentSpec deploymentSpec) { + // maxReadShare / currentReadShare = how much additional traffic must the zone be able to handle + double currentReadShare = 0; // How much of the total traffic of the group(s) this is a member of does this deployment receive + double maxReadShare = 0; // How much of the total traffic of the group(s) this is a member of might this deployment receive if a member of the group fails + for (BcpGroup group : BcpGroup.groupsFrom(instance, deploymentSpec)) { + if ( ! 
group.contains(deployment.zone().region())) continue; + double deploymentQps = deployment.metrics().queriesPerSecond(); + double groupQps = group.totalQps(); + double fraction = group.fraction(deployment.zone().region()); + currentReadShare += groupQps == 0 ? 0 : fraction * deploymentQps / groupQps; + maxReadShare += group.size() == 1 + ? currentReadShare + : fraction * ( deploymentQps + group.maxQpsExcluding(deployment.zone().region()) / (group.size() - 1) ) / groupQps; + } nodeRepository.patchApplication(deployment.zone(), instance.id(), currentReadShare, maxReadShare); } + /** + * A set of regions which will take over traffic from each other if one of them fails. + * Each region will take an equal share (modulated by fraction) of the failing region's traffic. + * + * A region's membership in a group may be partial, represented by a fraction [0, 1], + * in which case the other regions will collectively only take that fraction of the failing region's traffic, + * and symmetrically, the region will only take its fraction of its share of traffic of any other failing region. + */ + private static class BcpGroup { + + /** The instance which has this group. */ + private final Instance instance; + + /** Regions in this group, with their fractions. */ + private final Map<RegionName, Double> regions; + + /** Creates a group of a subset of the deployments in this instance. */ + private BcpGroup(Instance instance, Map<RegionName, Double> regions) { + this.instance = instance; + this.regions = regions; + } + + /** Returns the sum of the fractional memberships of this. 
*/ + double size() { + return regions.values().stream().mapToDouble(f -> f).sum(); + } + + double fraction(RegionName region) { + return regions.getOrDefault(region, 0.0); + } + + boolean contains(RegionName region) { + return regions.containsKey(region); + } + + double totalQps() { + return instance.productionDeployments().values().stream() + .mapToDouble(i -> i.metrics().queriesPerSecond()).sum(); + } + + double maxQpsExcluding(RegionName region) { + return instance.productionDeployments().values().stream() + .filter(d -> ! d.zone().region().equals(region)) + .mapToDouble(d -> d.metrics().queriesPerSecond() * fraction(d.zone().region())) + .max() + .orElse(0); + } + private static Bcp bcpOf(InstanceName instanceName, DeploymentSpec deploymentSpec) { + var instanceSpec = deploymentSpec.instance(instanceName); + if (instanceSpec.isEmpty()) return deploymentSpec.bcp(); + return instanceSpec.get().bcp().orElse(deploymentSpec.bcp()); + } + + private static Map<RegionName, Double> regionsFrom(Instance instance) { + return instance.productionDeployments().values().stream() + .collect(Collectors.toMap(deployment -> deployment.zone().region(), __ -> 1.0)); + } + + private static Map<RegionName, Double> regionsFrom(Bcp.Group groupSpec) { + return groupSpec.members().stream() + .collect(Collectors.toMap(member -> member.region(), member -> member.fraction())); + } + + static List<BcpGroup> groupsFrom(Instance instance, DeploymentSpec deploymentSpec) { + Bcp bcp = bcpOf(instance.name(), deploymentSpec); + if (bcp.isEmpty()) + return List.of(new BcpGroup(instance, regionsFrom(instance))); + return bcp.groups().stream().map(groupSpec -> new BcpGroup(instance, regionsFrom(groupSpec))).toList(); + } + + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java index 14835a822e6..b712746b663 100644 --- 
a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentContext.java @@ -15,6 +15,7 @@ import com.yahoo.security.KeyUtils; import com.yahoo.security.SignatureAlgorithm; import com.yahoo.security.X509CertificateBuilder; import com.yahoo.vespa.hosted.controller.Application; +import com.yahoo.vespa.hosted.controller.Controller; import com.yahoo.vespa.hosted.controller.ControllerTester; import com.yahoo.vespa.hosted.controller.Instance; import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -196,8 +198,9 @@ public class DeploymentContext { Application application = application(); assertTrue(application.revisions().last().isPresent(), "Application package submitted"); assertFalse(application.instances().values().stream() - .anyMatch(instance -> instance.deployments().values().stream() - .anyMatch(deployment -> deployment.revision().equals(lastSubmission))), "Submission is not already deployed"); + .anyMatch(instance -> instance.deployments().values().stream() + .anyMatch(deployment -> deployment.revision().equals(lastSubmission))), + "Submission is not already deployed"); completeRollout(application.deploymentSpec().instances().size() > 1); for (var instance : application().instances().values()) { assertFalse(instance.change().hasTargets()); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java index fad85ef9b48..5c26e270846 100644 --- 
a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/TrafficShareUpdaterTest.java @@ -2,8 +2,10 @@ package com.yahoo.vespa.hosted.controller.maintenance; import com.yahoo.component.Version; +import com.yahoo.config.application.api.xml.DeploymentSpecXmlReader; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.zone.ZoneId; +import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.api.application.v4.model.ClusterMetrics; import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage; @@ -14,6 +16,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Map; +import java.util.OptionalLong; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -25,61 +28,184 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class TrafficShareUpdaterTest { @Test - void testTrafficUpdater() { + void testTrafficUpdaterImplicitBcp() { DeploymentTester tester = new DeploymentTester(); Version version = Version.fromString("7.1"); - tester.controllerTester().upgradeSystem(version); - var application = tester.newDeploymentContext(); + tester.controllerTester().upgradeSystem(Version.fromString("7.1")); + var context = tester.newDeploymentContext(); var deploymentMetricsMaintainer = new DeploymentMetricsMaintainer(tester.controller(), Duration.ofDays(1)); var updater = new TrafficShareUpdater(tester.controller(), Duration.ofDays(1)); ZoneId prod1 = ZoneId.from("prod", "ap-northeast-1"); ZoneId prod2 = ZoneId.from("prod", "us-east-3"); ZoneId prod3 = ZoneId.from("prod", "us-west-1"); - application.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.perfUsEast3, new 
ApplicationPackage(new byte[0]), version); // Ignored + context.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version); - // Single zone - setQpsMetric(50.0, application.application().id().defaultInstance(), prod1, tester); + // One zone + context.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version); + setQpsMetric(50.0, context.application().id().defaultInstance(), prod1, tester); deploymentMetricsMaintainer.maintain(); assertEquals(1.0, updater.maintain(), 0.0000001); - assertTrafficFraction(1.0, 1.0, application.instanceId(), prod1, tester); + assertTrafficFraction(1.0, 1.0, context.instanceId(), prod1, tester); // Two zones - application.runJob(DeploymentContext.productionUsEast3, new ApplicationPackage(new byte[0]), version); - // - one cold - setQpsMetric(50.0, application.application().id().defaultInstance(), prod1, tester); - setQpsMetric(0.0, application.application().id().defaultInstance(), prod2, tester); + context.runJob(DeploymentContext.productionUsEast3, new ApplicationPackage(new byte[0]), version); + setQpsMetric(60.0, context.application().id().defaultInstance(), prod1, tester); + setQpsMetric(20.0, context.application().id().defaultInstance(), prod2, tester); deploymentMetricsMaintainer.maintain(); assertEquals(1.0, updater.maintain(), 0.0000001); - assertTrafficFraction(1.0, 1.0, application.instanceId(), prod1, tester); - assertTrafficFraction(0.0, 1.0, application.instanceId(), prod2, tester); - // - both hot - setQpsMetric(53.0, application.application().id().defaultInstance(), prod1, tester); - setQpsMetric(47.0, application.application().id().defaultInstance(), prod2, tester); + assertTrafficFraction(0.75, 1.0, context.instanceId(), prod1, tester); + assertTrafficFraction(0.25, 1.0, context.instanceId(), prod2, tester); + + // Three zones + context.runJob(DeploymentContext.productionUsWest1, new ApplicationPackage(new byte[0]), version); + setQpsMetric(53.0, 
context.application().id().defaultInstance(), prod1, tester); + setQpsMetric(45.0, context.application().id().defaultInstance(), prod2, tester); + setQpsMetric(02.0, context.application().id().defaultInstance(), prod3, tester); deploymentMetricsMaintainer.maintain(); assertEquals(1.0, updater.maintain(), 0.0000001); - assertTrafficFraction(0.53, 1.0, application.instanceId(), prod1, tester); - assertTrafficFraction(0.47, 1.0, application.instanceId(), prod2, tester); + assertTrafficFraction(0.53, 0.53 + (double)45/2 / 100, context.instanceId(), prod1, tester); + assertTrafficFraction(0.45, 0.45 + (double)53/2 / 100, context.instanceId(), prod2, tester); + assertTrafficFraction(0.02, 0.02 + (double)53/2 / 100, context.instanceId(), prod3, tester); + } + + @Test + void testTrafficUpdaterHotCold() { + var spec = """ + <deployment version="1.0"> + <staging/> + <prod> + <region>ap-northeast-1</region> + <region>ap-southeast-1</region> + <region>us-east-3</region> + <region>us-central-1</region> + <region>eu-west-1</region> + </prod> + <bcp> + <group> + <region>ap-northeast-1</region> + <region>ap-southeast-1</region> + </group> + <group> + <region>us-east-3</region> + <region>us-central-1</region> + </group> + <group> + <region>eu-west-1</region> + </group> + </bcp> + </deployment> + """; + + DeploymentTester tester = new DeploymentTester(); + Version version = Version.fromString("7.1"); + tester.controllerTester().upgradeSystem(Version.fromString("7.1")); + var context = tester.newDeploymentContext(); + var deploymentSpec = new DeploymentSpecXmlReader(true).read(spec); + tester.controller().applications() + .lockApplicationOrThrow(context.application().id(), + locked -> tester.controller().applications().store(locked.with(deploymentSpec))); + + var deploymentMetricsMaintainer = new DeploymentMetricsMaintainer(tester.controller(), Duration.ofDays(1)); + var updater = new TrafficShareUpdater(tester.controller(), Duration.ofDays(1)); + + ZoneId ap1 = ZoneId.from("prod", 
"ap-northeast-1"); + ZoneId ap2 = ZoneId.from("prod", "ap-southeast-1"); + ZoneId us1 = ZoneId.from("prod", "us-east-3"); + ZoneId us2 = ZoneId.from("prod", "us-central-1"); + ZoneId eu1 = ZoneId.from("prod", "eu-west-1"); + + context.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionApSoutheast1, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionUsEast3, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionUsCentral1, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionEuWest1, new ApplicationPackage(new byte[0]), version); + + setQpsMetric(50.0, context.application().id().defaultInstance(), ap1, tester); + setQpsMetric(00.0, context.application().id().defaultInstance(), ap2, tester); + setQpsMetric(10.0, context.application().id().defaultInstance(), us1, tester); + setQpsMetric(00.0, context.application().id().defaultInstance(), us2, tester); + setQpsMetric(40.0, context.application().id().defaultInstance(), eu1, tester); - // Three zones - application.runJob(DeploymentContext.productionUsWest1, new ApplicationPackage(new byte[0]), version); - // - one cold - setQpsMetric(53.0, application.application().id().defaultInstance(), prod1, tester); - setQpsMetric(47.0, application.application().id().defaultInstance(), prod2, tester); - setQpsMetric(0.0, application.application().id().defaultInstance(), prod3, tester); deploymentMetricsMaintainer.maintain(); assertEquals(1.0, updater.maintain(), 0.0000001); - assertTrafficFraction(0.53, 0.53, application.instanceId(), prod1, tester); - assertTrafficFraction(0.47, 0.50, application.instanceId(), prod2, tester); - assertTrafficFraction(0.00, 0.50, application.instanceId(), prod3, tester); - // - all hot - setQpsMetric(50.0, application.application().id().defaultInstance(), prod1, tester); - setQpsMetric(25.0, 
application.application().id().defaultInstance(), prod2, tester); - setQpsMetric(25.0, application.application().id().defaultInstance(), prod3, tester); + assertTrafficFraction(0.5, 0.5, context.instanceId(), ap1, tester); + assertTrafficFraction(0.0, 0.5, context.instanceId(), ap2, tester); + assertTrafficFraction(0.1, 0.1, context.instanceId(), us1, tester); + assertTrafficFraction(0.0, 0.1, context.instanceId(), us2, tester); + assertTrafficFraction(0.4, 0.4, context.instanceId(), eu1, tester); + } + + @Test + void testTrafficUpdaterOverlappingGroups() { + var spec = """ + <deployment version="1.0"> + <staging/> + <prod> + <region>ap-northeast-1</region> + <region>ap-southeast-1</region> + <region>us-east-3</region> + <region>us-central-1</region> + <region>us-west-1</region> + <region>eu-west-1</region> + </prod> + <bcp> + <group> + <region>ap-northeast-1</region> + <region>ap-southeast-1</region> + <region fraction="0.5">eu-west-1</region> + </group> + <group> + <region>us-east-3</region> + <region>us-central-1</region> + <region>us-west-1</region> + <region fraction="0.5">eu-west-1</region> + </group> + </bcp> + </deployment> + """; + + DeploymentTester tester = new DeploymentTester(); + Version version = Version.fromString("7.1"); + tester.controllerTester().upgradeSystem(Version.fromString("7.1")); + var context = tester.newDeploymentContext(); + var deploymentSpec = new DeploymentSpecXmlReader(true).read(spec); + tester.controller().applications() + .lockApplicationOrThrow(context.application().id(), + locked -> tester.controller().applications().store(locked.with(deploymentSpec))); + + var deploymentMetricsMaintainer = new DeploymentMetricsMaintainer(tester.controller(), Duration.ofDays(1)); + var updater = new TrafficShareUpdater(tester.controller(), Duration.ofDays(1)); + + ZoneId ap1 = ZoneId.from("prod", "ap-northeast-1"); + ZoneId ap2 = ZoneId.from("prod", "ap-southeast-1"); + ZoneId us1 = ZoneId.from("prod", "us-east-3"); + ZoneId us2 = 
ZoneId.from("prod", "us-central-1"); + ZoneId us3 = ZoneId.from("prod", "us-west-1"); + ZoneId eu1 = ZoneId.from("prod", "eu-west-1"); + + context.runJob(DeploymentContext.productionApNortheast1, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionApSoutheast1, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionUsEast3, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionUsCentral1, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionUsWest1, new ApplicationPackage(new byte[0]), version); + context.runJob(DeploymentContext.productionEuWest1, new ApplicationPackage(new byte[0]), version); + + setQpsMetric(20.0, context.application().id().defaultInstance(), ap1, tester); + setQpsMetric(50.0, context.application().id().defaultInstance(), ap2, tester); + setQpsMetric(00.0, context.application().id().defaultInstance(), us1, tester); + setQpsMetric(30.0, context.application().id().defaultInstance(), us2, tester); + setQpsMetric(40.0, context.application().id().defaultInstance(), us3, tester); + setQpsMetric(60.0, context.application().id().defaultInstance(), eu1, tester); + deploymentMetricsMaintainer.maintain(); assertEquals(1.0, updater.maintain(), 0.0000001); - assertTrafficFraction(0.50, 0.5, application.instanceId(), prod1, tester); - assertTrafficFraction(0.25, 0.5, application.instanceId(), prod2, tester); - assertTrafficFraction(0.25, 0.5, application.instanceId(), prod3, tester); + assertTrafficFraction(0.10, 0.10 + 50 / 200.0 / 1.5, context.instanceId(), ap1, tester); + assertTrafficFraction(0.25, 0.25 + 30 / 200.0 / 1.5, context.instanceId(), ap2, tester); + assertTrafficFraction(0.00, 0.00 + 40 / 200.0 / 2.5, context.instanceId(), us1, tester); + assertTrafficFraction(0.15, 0.15 + 40 / 200.0 / 2.5, context.instanceId(), us2, tester); + assertTrafficFraction(0.20, 0.20 + 30 / 200.0 / 2.5, 
context.instanceId(), us3, tester); + assertTrafficFraction(0.30, 0.30 + 0.5 * 50 / 200.0 / 1.5 + 0.5 * 40 / 200.0 / 2.5, context.instanceId(), eu1, tester); } private void setQpsMetric(double qps, ApplicationId application, ZoneId zone, DeploymentTester tester) { @@ -90,8 +216,8 @@ public class TrafficShareUpdaterTest { private void assertTrafficFraction(double currentReadShare, double maxReadShare, ApplicationId application, ZoneId zone, DeploymentTester tester) { NodeRepositoryMock mock = (NodeRepositoryMock)tester.controller().serviceRegistry().configServer().nodeRepository(); - assertEquals(currentReadShare, mock.getTrafficFraction(application, zone).getFirst(), 0.00001); - assertEquals(maxReadShare, mock.getTrafficFraction(application, zone).getSecond(), 0.00001); + assertEquals(currentReadShare, mock.getTrafficFraction(application, zone).getFirst(), 0.00001, "Current read share"); + assertEquals(maxReadShare, mock.getTrafficFraction(application, zone).getSecond(), 0.00001, "Max read share"); } } diff --git a/indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java b/indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java index 328cd00742f..9f2260e5b94 100644 --- a/indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java +++ b/indexinglanguage/src/main/java/com/yahoo/vespa/indexinglanguage/expressions/EmbedExpression.java @@ -66,6 +66,7 @@ public class EmbedExpression extends Expression { @Override protected void doExecute(ExecutionContext context) { + if (context.getValue() == null) return; Tensor output; if (context.getValue().getDataType() == DataType.STRING) { output = embedSingleValue(context); diff --git a/indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java b/indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java index c446c04065a..ce42f4e727f 100644 --- 
a/indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java +++ b/indexinglanguage/src/test/java/com/yahoo/vespa/indexinglanguage/ScriptTestCase.java @@ -189,6 +189,8 @@ public class ScriptTestCase { "input text", "[105, 110, 112, 117]"); testEmbedStatement("input myText | embed 'emb1' | attribute 'myTensor'", embedder, "input text", "[105, 110, 112, 117]"); + testEmbedStatement("input myText | embed 'emb1' | attribute 'myTensor'", embedder, + null, null); Map<String, Embedder> embedders = Map.of( "emb1", new MockEmbedder("myDocument.myTensor"), @@ -200,9 +202,9 @@ public class ScriptTestCase { "my input", "[110.0, 122.0, 33.0, 106.0]"); assertThrows(() -> testEmbedStatement("input myText | embed | attribute 'myTensor'", embedders, "input text", "[105, 110, 112, 117]"), - "Multiple embedders are provided but no embedder id is given. Valid embedders are emb1,emb2"); + "Multiple embedders are provided but no embedder id is given. Valid embedders are emb1,emb2"); assertThrows(() -> testEmbedStatement("input myText | embed emb3 | attribute 'myTensor'", embedders, "input text", "[105, 110, 112, 117]"), - "Can't find embedder 'emb3'. Valid embedders are emb1,emb2"); + "Can't find embedder 'emb3'. 
Valid embedders are emb1,emb2"); } private void testEmbedStatement(String expressionString, Map<String, Embedder> embedders, String input, String expected) { @@ -224,9 +226,14 @@ public class ScriptTestCase { ExecutionContext context = new ExecutionContext(adapter); expression.execute(context); - assertTrue(adapter.values.containsKey("myTensor")); - assertEquals(Tensor.from(tensorType, expected), - ((TensorFieldValue) adapter.values.get("myTensor")).getTensor().get()); + if (input == null) { + assertFalse(adapter.values.containsKey("myTensor")); + } + else { + assertTrue(adapter.values.containsKey("myTensor")); + assertEquals(Tensor.from(tensorType, expected), + ((TensorFieldValue) adapter.values.get("myTensor")).getTensor().get()); + } } catch (ParseException e) { throw new IllegalArgumentException(e); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java index 1ba686772c7..a23b1273fe3 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiHandler.java @@ -1,37 +1,86 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package com.yahoo.vespa.hosted.provision.restapi; +import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; -import com.yahoo.restapi.RestApi; -import com.yahoo.restapi.RestApiRequestHandler; +import com.yahoo.restapi.ErrorResponse; +import com.yahoo.restapi.MessageResponse; +import com.yahoo.restapi.Path; import com.yahoo.vespa.hosted.provision.NodeRepository; +import com.yahoo.vespa.hosted.provision.lb.LoadBalancer; +import com.yahoo.vespa.hosted.provision.lb.LoadBalancerId; +import com.yahoo.yolean.Exceptions; import javax.inject.Inject; +import java.time.Duration; +import java.util.Objects; +import java.util.logging.Level; /** * @author mpolden * @author jonmv */ -public class LoadBalancersV1ApiHandler extends RestApiRequestHandler<LoadBalancersV1ApiHandler> { +public class LoadBalancersV1ApiHandler extends ThreadedHttpRequestHandler { private final NodeRepository nodeRepository; @Inject public LoadBalancersV1ApiHandler(ThreadedHttpRequestHandler.Context parentCtx, NodeRepository nodeRepository) { - super(parentCtx, LoadBalancersV1ApiHandler::createRestApiDefinition); - this.nodeRepository = nodeRepository; + super(parentCtx); + this.nodeRepository = Objects.requireNonNull(nodeRepository); } - private static RestApi createRestApiDefinition(LoadBalancersV1ApiHandler self) { - return RestApi.builder() - .addRoute(RestApi.route("/loadbalancers/v1") - .get(self::getLoadBalancers)) - .build(); + @Override + public HttpResponse handle(HttpRequest request) { + try { + return switch (request.getMethod()) { + case GET -> get(request); + case PUT -> put(request); + default -> ErrorResponse.methodNotAllowed("Method '" + request.getMethod() + "' is not supported"); + }; + } catch (NotFoundException e) { + return ErrorResponse.notFoundError(Exceptions.toMessageString(e)); + } catch (IllegalArgumentException e) { + return ErrorResponse.badRequest(Exceptions.toMessageString(e)); + 
} catch (RuntimeException e) { + log.log(Level.WARNING, "Unexpected error handling '" + request.getUri() + "'", e); + return ErrorResponse.internalServerError(Exceptions.toMessageString(e)); + } } - private HttpResponse getLoadBalancers(RestApi.RequestContext context) { - return new LoadBalancersResponse(context.request(), nodeRepository); + private HttpResponse get(HttpRequest request) { + Path path = new Path(request.getUri()); + if (path.matches("/loadbalancers/v1")) return new LoadBalancersResponse(request, nodeRepository); + throw new NotFoundException("Nothing at " + path); + } + + private HttpResponse put(HttpRequest request) { + Path path = new Path(request.getUri()); + if (path.matches("/loadbalancers/v1/state/{state}/{id}")) return setState(path.get("state"), path.get("id")); + throw new NotFoundException("Nothing at " + path); + } + + private HttpResponse setState(String state, String id) { + LoadBalancer.State toState = stateFrom(state); + LoadBalancerId loadBalancerId = LoadBalancerId.fromSerializedForm(id); + try (var lock = nodeRepository.database().lock(loadBalancerId.application(), Duration.ofSeconds(1))) { + LoadBalancer loadBalancer = nodeRepository.database().readLoadBalancer(loadBalancerId) + .orElseThrow(() -> new NotFoundException(loadBalancerId + " does not exist")); + nodeRepository.database().writeLoadBalancer(loadBalancer.with(toState, nodeRepository.clock().instant()), + loadBalancer.state()); + } + return new MessageResponse("Moved " + loadBalancerId + " to " + toState); + } + + private LoadBalancer.State stateFrom(String state) { + return switch (state) { + case "reserved" -> LoadBalancer.State.reserved; + case "inactive" -> LoadBalancer.State.inactive; + case "active" -> LoadBalancer.State.active; + case "removable" -> LoadBalancer.State.removable; + default -> throw new IllegalArgumentException("Invalid state '" + state + "'"); + }; } } diff --git 
a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java index 3c20f6ddb09..240d0daf96f 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/LoadBalancersV1ApiTest.java @@ -7,6 +7,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +/** + * @author mpolden + */ public class LoadBalancersV1ApiTest { private RestApiTester tester; @@ -22,11 +25,19 @@ public class LoadBalancersV1ApiTest { } @Test - public void test_load_balancers() throws Exception { + public void load_balancers() throws Exception { tester.assertFile(new Request("http://localhost:8080/loadbalancers/v1"), "load-balancers.json"); tester.assertFile(new Request("http://localhost:8080/loadbalancers/v1/"), "load-balancers.json"); tester.assertFile(new Request("http://localhost:8080/loadbalancers/v1/?application=tenant4.application4.instance4"), "load-balancers-single.json"); tester.assertResponse(new Request("http://localhost:8080/loadbalancers/v1/?application=tenant.nonexistent.default"), "{\"loadBalancers\":[]}"); } + @Test + public void set_state() throws Exception { + tester.assertResponse(new Request("http://localhost:8080/loadbalancers/v1/state/removable/tenant42:application42:instance42:id42", "", Request.Method.PUT), + 404, "{\"error-code\":\"NOT_FOUND\",\"message\":\"load balancer tenant42:application42:instance42:id42 does not exist\"}"); + tester.assertResponse(new Request("http://localhost:8080/loadbalancers/v1/state/removable/tenant4:application4:instance4:id4", "", Request.Method.PUT), + "{\"message\":\"Moved load balancer tenant4:application4:instance4:id4 to removable\"}"); + } + } diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp 
b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp index 980c3345527..834e256c18f 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp @@ -231,7 +231,7 @@ MatchToolsFactory(QueryLimiter & queryLimiter, _match_limiter = std::make_unique<NoMatchPhaseLimiter>(); } trace.addEvent(4, "Complete query setup"); - if (root_trace.shouldTrace(4)) { + if (trace.hasTrace()) { vespalib::slime::ObjectInserter inserter(root_trace.createCursor("query_setup"), "traces"); vespalib::slime::inject(trace.getTraces(), inserter); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/query.cpp b/searchcore/src/vespa/searchcore/proton/matching/query.cpp index 9217b985ba5..b7e3f32a6f6 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/query.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/query.cpp @@ -292,7 +292,7 @@ Query::handle_global_filter(Blueprint& blueprint, uint32_t docid_limit, trace->addEvent(5, vespalib::make_string("Calculate global filter (estimated_hit_ratio (%f) <= upper_limit (%f))", estimated_hit_ratio, global_filter_upper_limit)); } - global_filter = GlobalFilter::create(blueprint, docid_limit, thread_bundle); + global_filter = GlobalFilter::create(blueprint, docid_limit, thread_bundle, trace); if (!global_filter->is_active() && trace && trace->shouldTrace(5)) { trace->addEvent(5, "Global filter matches everything"); } diff --git a/searchlib/src/vespa/searchlib/queryeval/global_filter.cpp b/searchlib/src/vespa/searchlib/queryeval/global_filter.cpp index 2d88883780a..f4ee9bf9bc2 100644 --- a/searchlib/src/vespa/searchlib/queryeval/global_filter.cpp +++ b/searchlib/src/vespa/searchlib/queryeval/global_filter.cpp @@ -2,11 +2,17 @@ #include "global_filter.h" #include "blueprint.h" +#include "profiled_iterator.h" #include <vespa/vespalib/util/require.h> #include <vespa/vespalib/util/thread_bundle.h> +#include 
<vespa/vespalib/util/execution_profiler.h> #include <vespa/searchlib/common/bitvector.h> +#include <vespa/searchlib/engine/trace.h> +#include <vespa/vespalib/data/slime/slime.h> #include <cassert> +using search::engine::Trace; +using vespalib::ExecutionProfiler; using vespalib::Runnable; using vespalib::ThreadBundle; using vespalib::Trinary; @@ -80,12 +86,15 @@ struct PartResult { : matches_any(Trinary::Undefined), bits(std::move(bits_in)) {} }; -PartResult make_part(Blueprint &blueprint, uint32_t begin, uint32_t end) { +PartResult make_part(Blueprint &blueprint, uint32_t begin, uint32_t end, ExecutionProfiler *profiler) { bool strict = true; auto constraint = Blueprint::FilterConstraint::UPPER_BOUND; auto filter = blueprint.createFilterSearch(strict, constraint); auto matches_any = filter->matches_any(); if (matches_any == Trinary::Undefined) { + if (profiler) { + filter = ProfiledIterator::profile(*profiler, std::move(filter)); + } filter->initRange(begin, end); auto bits = filter->get_hits(begin); // count bits in parallel and cache the results for later @@ -101,10 +110,31 @@ struct MakePart : Runnable { uint32_t begin; uint32_t end; PartResult result; - MakePart(Blueprint &blueprint_in, uint32_t begin_in, uint32_t end_in) noexcept - : blueprint(blueprint_in), begin(begin_in), end(end_in), result() {} - void run() override { result = make_part(blueprint, begin, end); } + std::unique_ptr<ExecutionProfiler> profiler; + MakePart(MakePart &&) = default; + MakePart(Blueprint &blueprint_in, uint32_t begin_in, uint32_t end_in, int32_t profile_depth) + : blueprint(blueprint_in), begin(begin_in), end(end_in), result(), profiler() + { + if (profile_depth != 0) { + profiler = std::make_unique<ExecutionProfiler>(profile_depth); + } + } + void run() override { result = make_part(blueprint, begin, end, profiler.get()); } + ~MakePart(); }; +MakePart::~MakePart() = default; + +void maybe_insert_profiler_results(Trace *trace, int32_t profile_depth, const std::vector<MakePart> 
&parts) { + if (trace && profile_depth != 0) { + auto &obj = trace->createCursor("global_filter_execution"); + auto &arr = obj.setArray("threads"); + for (auto &&part: parts) { + auto &dst = arr.addObject().setArray("traces").addObject(); + dst.setString("tag", "global_filter_profiling"); + part.profiler->report(dst); + } + } +} } @@ -159,8 +189,9 @@ GlobalFilter::create(std::vector<std::unique_ptr<BitVector>> vectors) } std::shared_ptr<GlobalFilter> -GlobalFilter::create(Blueprint &blueprint, uint32_t docid_limit, ThreadBundle &thread_bundle) +GlobalFilter::create(Blueprint &blueprint, uint32_t docid_limit, ThreadBundle &thread_bundle, Trace *trace) { + int32_t profile_depth = (trace && trace->getLevel() > 0) ? trace->match_profile_depth() : 0; uint32_t num_threads = thread_bundle.size(); std::vector<MakePart> parts; parts.reserve(num_threads); @@ -169,12 +200,13 @@ GlobalFilter::create(Blueprint &blueprint, uint32_t docid_limit, ThreadBundle &t uint32_t rest_docs = (docid_limit - docid) % num_threads; while (docid < docid_limit) { uint32_t part_size = per_thread + (parts.size() < rest_docs); - parts.emplace_back(blueprint, docid, docid + part_size); + parts.emplace_back(blueprint, docid, docid + part_size, profile_depth); docid += part_size; } assert(parts.size() <= num_threads); assert((docid == docid_limit) || parts.empty()); thread_bundle.run(parts); + maybe_insert_profiler_results(trace, profile_depth, parts); std::vector<std::unique_ptr<BitVector>> vectors; vectors.reserve(parts.size()); for (MakePart &part: parts) { diff --git a/searchlib/src/vespa/searchlib/queryeval/global_filter.h b/searchlib/src/vespa/searchlib/queryeval/global_filter.h index 66f85299dd1..c2b7b6617fc 100644 --- a/searchlib/src/vespa/searchlib/queryeval/global_filter.h +++ b/searchlib/src/vespa/searchlib/queryeval/global_filter.h @@ -8,6 +8,8 @@ namespace vespalib { struct ThreadBundle; } namespace search { class BitVector; } +namespace search::engine { class Trace; } + namespace 
search::queryeval { class Blueprint; @@ -22,6 +24,7 @@ class Blueprint; class GlobalFilter : public std::enable_shared_from_this<GlobalFilter> { public: + using Trace = search::engine::Trace; GlobalFilter() noexcept; GlobalFilter(const GlobalFilter &) = delete; GlobalFilter(GlobalFilter &&) = delete; @@ -39,7 +42,10 @@ public: static std::shared_ptr<GlobalFilter> create(std::vector<uint32_t> docids, uint32_t size); static std::shared_ptr<GlobalFilter> create(std::unique_ptr<BitVector> vector); static std::shared_ptr<GlobalFilter> create(std::vector<std::unique_ptr<BitVector>> vectors); - static std::shared_ptr<GlobalFilter> create(Blueprint &blueprint, uint32_t docid_limit, vespalib::ThreadBundle &thread_bundle); + static std::shared_ptr<GlobalFilter> create(Blueprint &blueprint, uint32_t docid_limit, vespalib::ThreadBundle &thread_bundle, Trace *trace); + static std::shared_ptr<GlobalFilter> create(Blueprint &blueprint, uint32_t docid_limit, vespalib::ThreadBundle &thread_bundle) { + return create(blueprint, docid_limit, thread_bundle, nullptr); + } }; } // namespace diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 0ba374f7190..3bfa1027a82 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -175,7 +175,7 @@ TEST_F(PendingMessageTrackerTest, simple) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> " + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> " "Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" "</ul>\n")); } @@ -248,17 +248,17 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 
00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" "</ul>\n" "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000011d7))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n")); } { @@ -268,44 +268,23 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Node 0 
(pending count: 4)</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n" "<b>Node 1 (pending count: 4)</b>\n" "<ul>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n")); } } namespace { -template <typename T> -std::string setToString(const std::set<T>& s) -{ - 
std::ostringstream ost; - ost << '{'; - for (typename std::set<T>::const_iterator i(s.begin()), e(s.end()); - i != e; ++i) - { - if (i != s.begin()) { - ost << ','; - } - ost << *i; - } - ost << '}'; - return ost.str(); -} - -} - -namespace { - class TestChecker : public PendingMessageTracker::Checker { public: @@ -443,7 +422,7 @@ TEST_F(PendingMessageTrackerTest, busy_reply_marks_node_as_busy) { TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); - f.tracker().setNodeBusyDuration(std::chrono::seconds(10)); + f.tracker().setNodeBusyDuration(10s); f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); EXPECT_TRUE(f.tracker().getNodeInfo().isBusy(0)); f.clock().addSecondsToTime(11); diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index be4e7270c69..a82514acb03 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -217,7 +217,7 @@ VisitorManagerTest::getSession(uint32_t n) // Wait until we have started the visitor const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions); framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); + vespalib::steady_time endTime = clock.getMonotonicTime() + 30s; while (true) { { std::lock_guard lock(_messageSessionFactory->_accessLock); @@ -225,9 +225,8 @@ VisitorManagerTest::getSession(uint32_t n) return *sessions[n]; } } - if (clock.getTimeInMillis() > endTime) { - throw vespalib::IllegalStateException( - "Timed out waiting for visitor session", VESPA_STRLOC); + if (clock.getMonotonicTime() > endTime) { + throw vespalib::IllegalStateException("Timed out waiting for visitor session", VESPA_STRLOC); } std::this_thread::sleep_for(10ms); } @@ -255,12 +254,10 @@ 
VisitorManagerTest::getMessagesAndReply( switch (session.sentMessages[i]->getType()) { case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT: - docs.push_back(static_cast<documentapi::PutDocumentMessage&>( - *session.sentMessages[i]).getDocumentSP()); + docs.push_back(static_cast<documentapi::PutDocumentMessage&>(*session.sentMessages[i]).getDocumentSP()); break; case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>( - *session.sentMessages[i]).getDocumentId()); + docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(*session.sentMessages[i]).getDocumentId()); break; default: break; @@ -355,10 +352,7 @@ TEST_F(VisitorManagerTest, normal_usage) { getMessagesAndReply(1, getSession(0), docs, docIds); // All data has been replied to, expecting to get a create visitor reply - ASSERT_NO_FATAL_FAILURE( - verifyCreateVisitorReply(api::ReturnCode::OK, - int(docs.size()), - getTotalSerializedSize(docs))); + ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK, int(docs.size()), getTotalSerializedSize(docs))); EXPECT_EQ(1u, getMatchingDocuments(docs)); EXPECT_FALSE(_manager->hasPendingMessageState()); diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index f3a538b7832..565131b3b99 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -256,11 +256,9 @@ TestVisitorMessageSession& VisitorTest::getSession(uint32_t n) { // Wait until we have started the visitor - const std::vector<TestVisitorMessageSession*>& sessions( - _messageSessionFactory->_visitorSessions); + const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions); framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime( - clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); + vespalib::steady_time endTime = clock.getMonotonicTime() + 30s; 
while (true) { { std::lock_guard lock(_messageSessionFactory->_accessLock); @@ -268,7 +266,7 @@ VisitorTest::getSession(uint32_t n) return *sessions[n]; } } - if (clock.getTimeInMillis() > endTime) { + if (clock.getMonotonicTime() > endTime) { throw vespalib::IllegalStateException( "Timed out waiting for visitor session", VESPA_STRLOC); } diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index e68cbd75d52..2f1622750d7 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -182,7 +182,7 @@ struct MetricsUpdater { void add(const MetricsUpdater& rhs) noexcept { auto& d = count; - auto& s = rhs.count; + const auto& s = rhs.count; d.buckets += s.buckets; d.docs += s.docs; d.bytes += s.bytes; @@ -209,7 +209,7 @@ BucketManager::updateMetrics(bool updateDocCount) if (!updateDocCount || _doneInitialized) { MetricsUpdater total; - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { MetricsUpdater m; auto guard = space.second->bucketDatabase().acquire_read_guard(); guard->for_each(std::ref(m)); @@ -238,7 +238,7 @@ BucketManager::updateMetrics(bool updateDocCount) } void BucketManager::update_bucket_db_memory_usage_metrics() { - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { auto bm = _metrics->bucket_spaces.find(space.first); bm->second->bucket_db_metrics.memory_usage.update(space.second->bucketDatabase().detailed_memory_usage()); } @@ -342,7 +342,7 @@ BucketManager::reportStatus(std::ostream& out, using vespalib::xml::XmlAttribute; xmlReporter << vespalib::xml::XmlTag("buckets"); - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { xmlReporter << XmlTag("bucket-space") << XmlAttribute("name", 
document::FixedBucketSpaces::to_string(space.first)); BucketDBDumper dumper(xmlReporter.getStream()); @@ -404,7 +404,7 @@ bool BucketManager::onRequestBucketInfo( api::RequestBucketInfoReply::EntryVector info; if (!cmd->getBuckets().empty()) { for (auto bucketId : cmd->getBuckets()) { - for (auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) { + for (const auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) { info.emplace_back(entry.first, entry.second->getBucketInfo()); } } @@ -457,7 +457,7 @@ BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard) // may alter the relevant state. --_requestsCurrentlyProcessing; if (_requestsCurrentlyProcessing == 0) { - for (auto& qr : _queuedReplies) { + for (const auto& qr : _queuedReplies) { sendUp(qr); } _queuedReplies.clear(); @@ -494,7 +494,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac reqs.size(), bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str()); std::lock_guard clusterStateGuard(_clusterStateLock); - for (auto & req : std::ranges::reverse_view(reqs)) { + for (const auto & req : std::ranges::reverse_view(reqs)) { // Currently small requests should not be forwarded to worker thread assert(req->hasSystemState()); const auto their_hash = req->getDistributionHash(); @@ -547,7 +547,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac std::ostringstream distrList; std::unordered_map<uint16_t, api::RequestBucketInfoReply::EntryVector> result; - for (auto& nodeAndCmd : requests) { + for (const auto& nodeAndCmd : requests) { result[nodeAndCmd.first]; if (LOG_WOULD_LOG(debug)) { distrList << ' ' << nodeAndCmd.first; @@ -576,7 +576,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac "BucketManager::processRequestBucketInfoCommands-2"); } 
_metrics->fullBucketInfoLatency.addValue(runStartTime.getElapsedTimeAsDouble()); - for (auto& nodeAndCmd : requests) { + for (const auto& nodeAndCmd : requests) { auto reply(std::make_shared<api::RequestBucketInfoReply>(*nodeAndCmd.second)); reply->getBucketInfo().swap(result[nodeAndCmd.first]); sendUp(reply); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 667afbf67a0..393136de654 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -15,6 +15,7 @@ LOG_SETUP(".distributor.operation.idealstate.merge"); using vespalib::to_utc; using vespalib::to_string; +using vespalib::make_string_short::fmt; namespace storage::distributor { MergeOperation::~MergeOperation() = default; @@ -24,8 +25,7 @@ MergeOperation::getStatus() const { return Operation::getStatus() + - vespalib::make_string(" . Sent MergeBucketCommand at %s", - to_string(to_utc(_sentMessageTime)).c_str()); + fmt(" . Sent MergeBucketCommand at %s", to_string(to_utc(_sentMessageTime)).c_str()); } void @@ -35,7 +35,7 @@ MergeOperation::addIdealNodes( std::vector<MergeMetaData>& result) { // Add all ideal nodes first. These are never marked source-only. 
- for (unsigned short idealNode : idealNodes) { + for (uint16_t idealNode : idealNodes) { const MergeMetaData* entry = nullptr; for (const auto & node : nodes) { if (idealNode == node._nodeIndex) { @@ -56,7 +56,7 @@ MergeOperation::addCopiesNotAlreadyAdded(uint16_t redundancy, const std::vector<MergeMetaData>& nodes, std::vector<MergeMetaData>& result) { - for (auto node : nodes) { + for (const auto & node : nodes) { bool found = false; for (const auto & mergeData : result) { if (mergeData._nodeIndex == node._nodeIndex) { @@ -123,7 +123,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) std::vector<std::unique_ptr<BucketCopy> > newCopies; std::vector<MergeMetaData> nodes; - for (unsigned short node : getNodes()) { + for (uint16_t node : getNodes()) { const BucketCopy* copy = entry->getNode(node); if (copy == nullptr) { // New copies? newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, node))); @@ -153,8 +153,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) msg->set_use_unordered_forwarding(true); } - LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), - _mnodes[0].index); + LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index); // Set timeout to one hour to prevent hung nodes that manage to keep // connections open from stalling merges in the cluster indefinitely. @@ -165,8 +164,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) _sentMessageTime = _manager->node_context().clock().getMonotonicTime(); } else { - LOGBP(debug, - "Unable to merge bucket %s, since only one copy is available. System state %s", + LOGBP(debug, "Unable to merge bucket %s, since only one copy is available. 
System state %s", getBucketId().toString().c_str(), clusterState.toString().c_str()); _ok = false; done(); @@ -178,7 +176,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge( const BucketDatabase::Entry& currentState) const { assert(currentState.valid()); - for (auto mnode : _mnodes) { + for (const auto & mnode : _mnodes) { const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index)); if (!copyBefore) { continue; @@ -206,7 +204,7 @@ MergeOperation::deleteSourceOnlyNodes( { assert(currentState.valid()); std::vector<uint16_t> sourceOnlyNodes; - for (auto & mnode : _mnodes) { + for (const auto & mnode : _mnodes) { const uint16_t nodeIndex = mnode.index; const BucketCopy* copy = currentState->getNode(nodeIndex); if (!copy) { @@ -338,7 +336,7 @@ bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx, // to enter the merge throttler queues, displacing lower priority merges. if (!is_global_bucket_merge()) { const auto& node_info = ctx.pending_message_tracker().getNodeInfo(); - for (auto node : getNodes()) { + for (uint16_t node : getNodes()) { if (node_info.isBusy(node)) { return true; } @@ -364,11 +362,9 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const MergeBucketMetricSet* MergeOperation::get_merge_metrics() { - if (_manager) { - return dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get()); - } else { - return nullptr; - } + return (_manager) + ? 
dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get()) + : nullptr; } } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index 533493a79a2..8618d570685 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -3,7 +3,6 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> #include <map> -#include <algorithm> #include <vespa/log/log.h> LOG_SETUP(".pendingmessages"); @@ -15,7 +14,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr, u vespalib::make_string("Pending messages to storage nodes (stripe %u)", stripe_index)), _component(cr, "pendingmessagetracker"), _nodeInfo(_component.getClock()), - _nodeBusyDuration(60), + _nodeBusyDuration(60s), _deferred_read_tasks(), _lock() { @@ -38,7 +37,7 @@ vespalib::string PendingMessageTracker::MessageEntry::toHtml() const { vespalib::asciistream ss; ss << "<li><i>Node " << nodeIdx << "</i>: " - << "<b>" << framework::MilliSecTime(timeStamp.count()).toString() << "</b> " + << "<b>" << vespalib::to_string(timeStamp) << "</b> " << api::MessageType::get(api::MessageType::Id(msgType)).getName() << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n"; return ss.str(); } @@ -46,7 +45,7 @@ PendingMessageTracker::MessageEntry::toHtml() const { PendingMessageTracker::TimePoint PendingMessageTracker::currentTime() const { - return TimePoint(_component.getClock().getTimeInMillis().getTime()); + return _component.getClock().getSystemTime(); } namespace { diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 93238b5a83f..fb672d5ee31 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ 
b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -68,13 +68,7 @@ public: virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0; }; - /** - * Time point represented as the millisecond interval from the framework - * clock's epoch to a given point in time. Note that it'd be more - * semantically correct to use std::chrono::time_point, but it is bound - * to specific chrono clock types, their epochs and duration resolution. - */ - using TimePoint = std::chrono::milliseconds; + using TimePoint = vespalib::system_time; PendingMessageTracker(framework::ComponentRegister&, uint32_t stripe_index); ~PendingMessageTracker() override; @@ -119,8 +113,8 @@ public: */ std::vector<uint64_t> clearMessagesForNode(uint16_t node); - void setNodeBusyDuration(std::chrono::seconds secs) noexcept { - _nodeBusyDuration = secs; + void setNodeBusyDuration(vespalib::duration duration) noexcept { + _nodeBusyDuration = duration; } void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task); @@ -136,7 +130,7 @@ private: MessageEntry(TimePoint timeStamp, uint32_t msgType, uint32_t priority, uint64_t msgId, document::Bucket bucket, uint16_t nodeIdx) noexcept; - vespalib::string toHtml() const; + [[nodiscard]] vespalib::string toHtml() const; }; struct MessageIdKey : boost::multi_index::member<MessageEntry, uint64_t, &MessageEntry::msgId> {}; @@ -187,7 +181,7 @@ private: Messages _messages; framework::Component _component; NodeInfo _nodeInfo; - std::chrono::seconds _nodeBusyDuration; + vespalib::duration _nodeBusyDuration; DeferredBucketTaskMap _deferred_read_tasks; // Since distributor is currently single-threaded, this will only diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index ec22d7c064e..db88a22d500 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ 
b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -55,7 +55,7 @@ vespalib::string getNodeId(StorageComponent& sc) { return ost.str(); } -vespalib::duration TEN_MINUTES = 600s; +constexpr vespalib::duration STALE_PROTOCOL_LIFETIME = 1h; } @@ -694,7 +694,7 @@ CommunicationManager::run(framework::ThreadHandle& thread) std::lock_guard<std::mutex> guard(_earlierGenerationsLock); for (auto it(_earlierGenerations.begin()); !_earlierGenerations.empty() && - ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime()); + ((it->first + STALE_PROTOCOL_LIFETIME) < _component.getClock().getMonotonicTime()); it = _earlierGenerations.begin()) { _earlierGenerations.erase(it); @@ -709,9 +709,8 @@ CommunicationManager::updateMetrics(const MetricLockGuard &) } void -CommunicationManager::print(std::ostream& out, bool verbose, const std::string& indent) const +CommunicationManager::print(std::ostream& out, bool , const std::string& ) const { - (void) verbose; (void) indent; out << "CommunicationManager"; } diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index 91f304ad9a0..6d36abc896e 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -121,12 +121,9 @@ Visitor::VisitorTarget::metaForMessageId(uint64_t msgId) void Visitor::VisitorTarget::discardQueuedMessages() { - for (MessageQueue::iterator - it(_queuedMessages.begin()), e(_queuedMessages.end()); - it != e; ++it) - { - LOG(spam, "Erasing queued message with id %" PRIu64, it->second); - releaseMetaForMessageId(it->second); + for (const auto & entry : _queuedMessages) { + LOG(spam, "Erasing queued message with id %" PRIu64, entry.second); + releaseMetaForMessageId(entry.second); } _queuedMessages.clear(); } @@ -310,17 +307,14 @@ Visitor::getStateName(VisitorState s) return "COMPLETED"; default: assert(!"Unknown visitor state"); - return NULL; + return nullptr; } } 
Visitor::VisitorState Visitor::transitionTo(VisitorState newState) { - LOG(debug, "Visitor '%s' state transition %s -> %s", - _id.c_str(), - getStateName(_state), - getStateName(newState)); + LOG(debug, "Visitor '%s' state transition %s -> %s", _id.c_str(), getStateName(_state), getStateName(newState)); VisitorState oldState = _state; _state = newState; return oldState; @@ -339,12 +333,10 @@ Visitor::mayTransitionToCompleted() const void Visitor::forceClose() { - for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin(); - it != _bucketStates.end(); ++it) - { + for (auto * state : _bucketStates) { // Reset iterator id so no destroy iterator will be sent - (*it)->setIteratorId(spi::IteratorId(0)); - delete *it; + state->setIteratorId(spi::IteratorId(0)); + delete state; } _bucketStates.clear(); transitionTo(STATE_COMPLETED); @@ -358,7 +350,7 @@ Visitor::sendReplyOnce() std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply()); _hitCounter->updateVisitorStatistics(_visitorStatistics); - static_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics); + dynamic_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics); if (shouldAddMbusTrace()) { _trace.moveTraceTo(reply->getTrace()); } @@ -373,17 +365,15 @@ void Visitor::finalize() { if (_state != STATE_COMPLETED) { - LOG(error, "Attempting to finalize non-completed visitor %s", - _id.c_str()); + LOG(error, "Attempting to finalize non-completed visitor %s", _id.c_str()); assert(false); } assert(_bucketStates.empty()); if (_result.success()) { - if (_messageSession->pending() > 0) - { + if (_messageSession->pending() > 0) { _result = api::ReturnCode(api::ReturnCode::ABORTED); - try{ + try { abortedVisiting(); } catch (std::exception& e) { LOG(warning, "Visitor %s had a problem in abortVisiting(). 
As " @@ -404,43 +394,31 @@ Visitor::finalize() void Visitor::discardAllNoPendingBucketStates() { - for (BucketStateList::iterator - it(_bucketStates.begin()), e(_bucketStates.end()); - it != e;) - { + for (auto it = _bucketStates.begin(); it !=_bucketStates.end();) { BucketIterationState& bstate(**it); if (bstate.hasPendingControlCommand() || bstate.hasPendingIterators()) { - LOG(debug, - "Visitor '%s' not discarding bucket state %s " - "since it has pending operations", - _id.c_str(), - bstate.toString().c_str()); + LOG(debug, "Visitor '%s' not discarding bucket state %s since it has pending operations", + _id.c_str(), bstate.toString().c_str()); ++it; continue; } - LOG(debug, "Visitor '%s' discarding bucket state %s", - _id.c_str(), bstate.toString().c_str()); + LOG(debug, "Visitor '%s' discarding bucket state %s", _id.c_str(), bstate.toString().c_str()); delete *it; it = _bucketStates.erase(it); } } void -Visitor::fail(const api::ReturnCode& reason, - bool overrideExistingError) +Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError) { assert(_state != STATE_COMPLETED); if (_result.getResult() < reason.getResult() || overrideExistingError) { - LOG(debug, "Setting result of visitor '%s' to %s", - _id.c_str(), reason.toString().c_str()); + LOG(debug, "Setting result of visitor '%s' to %s", _id.c_str(), reason.toString().c_str()); _result = reason; } if (_visitorTarget.hasQueuedMessages()) { - LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s " - "since visitor has failed", - _id.c_str(), - _visitorTarget._queuedMessages.size(), - _controlDestination->toString().c_str()); + LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s since visitor has failed", + _id.c_str(), _visitorTarget._queuedMessages.size(), _controlDestination->toString().c_str()); _visitorTarget.discardQueuedMessages(); } discardAllNoPendingBucketStates(); @@ -448,8 +426,7 @@ Visitor::fail(const api::ReturnCode& reason, } bool 
-Visitor::shouldReportProblemToClient(const api::ReturnCode& code, - size_t retryCount) const +Visitor::shouldReportProblemToClient(const api::ReturnCode& code, size_t retryCount) { // Report _once_ per message if we reach a certain retry threshold. if (retryCount == TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY) { @@ -521,7 +498,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId, _visitorOptions._fromTime = fromTimestamp; _visitorOptions._toTime = toTimestamp; _currentBucket = 0; - _hitCounter.reset(new HitCounter()); + _hitCounter = std::make_unique<HitCounter>(); _messageSession = std::move(messageSession); _documentPriority = documentPriority; @@ -612,8 +589,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met uint64_t messageId = reply->getContext().value.UINT64; uint32_t removed = _visitorTarget._pendingMessages.erase(messageId); - LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), - reply->toString().c_str(), messageId); + LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), reply->toString().c_str(), messageId); assert(removed == 1); (void) removed; @@ -634,20 +610,16 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met metrics.visitorDestinationFailureReplies.inc(); if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) { - LOG(debug, "Aborting visitor as we failed to talk to controller: %s", - reply->getError(0).toString().c_str()); - api::ReturnCode returnCode( - static_cast<api::ReturnCode::Result>( - reply->getError(0).getCode()), - reply->getError(0).getMessage()); + LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str()); + api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), + reply->getError(0).getMessage()); fail(returnCode, true); close(); return; } - api::ReturnCode returnCode( - 
static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), - reply->getError(0).getMessage()); + api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), + reply->getError(0).getMessage()); const bool should_fail = remap_docapi_message_error_code(returnCode); if (should_fail) { // Abort - something is wrong with target. @@ -657,8 +629,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met } if (failed()) { - LOG(debug, "Failed to send message from visitor '%s', due to " - "%s. Not resending since visitor has failed", + LOG(debug, "Failed to send message from visitor '%s', due to %s. Not resending since visitor has failed", _id.c_str(), returnCode.toString().c_str()); return; } @@ -709,8 +680,7 @@ Visitor::onCreateIteratorReply( if (reply->getResult().failed()) { LOG(debug, "Failed to create iterator for bucket %s: %s", - bucketId.toString().c_str(), - reply->getResult().toString().c_str()); + bucketId.toString().c_str(), reply->getResult().toString().c_str()); fail(reply->getResult()); delete *it; _bucketStates.erase((++it).base()); @@ -718,17 +688,14 @@ Visitor::onCreateIteratorReply( } bucketState.setIteratorId(reply->getIteratorId()); if (failed()) { - LOG(debug, "Create iterator for bucket %s is OK, " - "but visitor has failed: %s", - bucketId.toString().c_str(), - _result.toString().c_str()); + LOG(debug, "Create iterator for bucket %s is OK, but visitor has failed: %s", + bucketId.toString().c_str(), _result.toString().c_str()); delete *it; _bucketStates.erase((++it).base()); return; } - LOG(debug, "Visitor '%s' starting to visit bucket %s.", - _id.c_str(), bucketId.toString().c_str()); + LOG(debug, "Visitor '%s' starting to visit bucket %s.", _id.c_str(), bucketId.toString().c_str()); auto cmd = std::make_shared<GetIterCommand>(bucket, bucketState.getIteratorId(), _docBlockSize); cmd->getTrace().setLevel(_traceLevel); cmd->setPriority(_priority); @@ -737,13 +704,10 @@ 
Visitor::onCreateIteratorReply( } void -Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, - VisitorThreadMetrics& metrics) +Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThreadMetrics& metrics) { LOG(debug, "Visitor '%s' got get iter reply for bucket %s: %s", - _id.c_str(), - reply->getBucketId().toString().c_str(), - reply->getResult().toString().c_str()); + _id.c_str(), reply->getBucketId().toString().c_str(), reply->getResult().toString().c_str()); auto it = _bucketStates.rbegin(); // New requests will be pushed on end of list.. So searching @@ -763,10 +727,8 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, !reply->getResult().isShutdownRelated() && !reply->getResult().isBucketDisappearance()) { - LOG(warning, "Failed to talk to persistence layer for bucket " - "%s. Aborting visitor '%s': %s", - reply->getBucketId().toString().c_str(), - _id.c_str(), reply->getResult().toString().c_str()); + LOG(warning, "Failed to talk to persistence layer for bucket %s. Aborting visitor '%s': %s", + reply->getBucketId().toString().c_str(), _id.c_str(), reply->getResult().toString().c_str()); } fail(reply->getResult()); BucketIterationState& bucketState(**it); @@ -783,17 +745,14 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, bucketState.setCompleted(reply->isCompleted()); --bucketState._pendingIterators; if (!reply->getEntries().empty()) { - LOG(debug, "Processing documents in handle given from bucket %s.", - reply->getBucketId().toString().c_str()); + LOG(debug, "Processing documents in handle given from bucket %s.", reply->getBucketId().toString().c_str()); // While handling documents we should not keep locks, such // that visitor may process several things at once. 
if (isRunning()) { MBUS_TRACE(reply->getTrace(), 5, vespalib::make_string("Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size())); - LOG(debug, "Visitor %s handling block of %zu documents.", - _id.c_str(), - reply->getEntries().size()); + LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size()); try { framework::MilliSecTimer processingTimer(_component.getClock()); handleDocuments(reply->getBucketId(), reply->getEntries(), *_hitCounter); @@ -913,15 +872,11 @@ Visitor::continueVisitor() } } - LOG(debug, "No pending messages, tagging visitor '%s' complete", - _id.c_str()); + LOG(debug, "No pending messages, tagging visitor '%s' complete", _id.c_str()); transitionTo(STATE_COMPLETED); } else { - LOG(debug, "Visitor %s waiting for all commands to be replied to " - "(pending=%zu, queued=%zu)", - _id.c_str(), - _visitorTarget._pendingMessages.size(), - _visitorTarget._queuedMessages.size()); + LOG(debug, "Visitor %s waiting for all commands to be replied to (pending=%zu, queued=%zu)", + _id.c_str(), _visitorTarget._pendingMessages.size(), _visitorTarget._queuedMessages.size()); } return false; } else { @@ -981,14 +936,14 @@ Visitor::getStatus(std::ostream& out, bool verbose) const << (_visitorOptions._visitRemoves ? 
"true" : "false") << "</td></tr>\n"; out << "<tr><td>Control destination</td><td>"; - if (_controlDestination.get()) { + if (_controlDestination) { out << xml_content_escaped(_controlDestination->toString()); } else { out << "nil"; } out << "</td></tr>\n"; out << "<tr><td>Data destination</td><td>"; - if (_dataDestination.get()) { + if (_dataDestination) { out << xml_content_escaped(_dataDestination->toString()); } else { out << "nil"; @@ -1078,17 +1033,13 @@ Visitor::getStatus(std::ostream& out, bool verbose) const bool Visitor::getIterators() { - LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, " - "_currentBucket = %d", - _id.c_str(), _buckets.size(), - _bucketStates.size(), _currentBucket); + LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, _currentBucket = %d", + _id.c_str(), _buckets.size(), _bucketStates.size(), _currentBucket); // Don't send any further GetIters if we're closing if (!isRunning()) { if (hasPendingIterators()) { - LOG(debug, "Visitor has failed but waiting for %zu " - "buckets to finish processing", - _bucketStates.size()); + LOG(debug, "Visitor has failed but waiting for %zu buckets to finish processing", _bucketStates.size()); return true; } else { return false; @@ -1097,13 +1048,10 @@ Visitor::getIterators() // Go through buckets found. Take the first that doesn't have requested // state and request a new piece. 
- for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin(); - it != _bucketStates.end();) - { + for (auto it = _bucketStates.begin();it != _bucketStates.end();) { assert(*it); BucketIterationState& bucketState(**it); - if ((bucketState._pendingIterators - >= _visitorOptions._maxParallelOneBucket) + if ((bucketState._pendingIterators >= _visitorOptions._maxParallelOneBucket) || bucketState.hasPendingControlCommand()) { ++it; @@ -1118,20 +1066,17 @@ Visitor::getIterators() } try{ completedBucket(bucketState.getBucketId(), *_hitCounter); - _visitorStatistics.setBucketsVisited( - _visitorStatistics.getBucketsVisited() + 1); + _visitorStatistics.setBucketsVisited(_visitorStatistics.getBucketsVisited() + 1); } catch (std::exception& e) { std::ostringstream ost; - ost << "Visitor fail to run completedBucket() notification: " - << e.what(); + ost << "Visitor fail to run completedBucket() notification: " << e.what(); reportProblem(ost.str()); } delete *it; it = _bucketStates.erase(it); continue; } - auto cmd = std::make_shared<GetIterCommand>( - bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize); + auto cmd = std::make_shared<GetIterCommand>(bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize); cmd->getTrace().setLevel(_traceLevel); cmd->setPriority(_priority); _messageHandler->send(cmd, *this); @@ -1143,7 +1088,7 @@ Visitor::getIterators() } // If there aren't anymore buckets to iterate, we're done - if (_bucketStates.size() == 0 && _currentBucket >= _buckets.size()) { + if (_bucketStates.empty() && _currentBucket >= _buckets.size()) { LOG(debug, "No more buckets to visit for visitor '%s'.", _id.c_str()); return false; } @@ -1157,17 +1102,13 @@ Visitor::getIterators() _currentBucket < _buckets.size()) { document::Bucket bucket(_bucketSpace, _buckets[_currentBucket]); - std::unique_ptr<BucketIterationState> newBucketState( - new BucketIterationState(*this, *_messageHandler, bucket)); + auto newBucketState = 
std::make_unique<BucketIterationState>(*this, *_messageHandler, bucket); LOG(debug, "Visitor '%s': Sending create iterator for bucket %s.", _id.c_str(), bucket.getBucketId().toString().c_str()); - spi::Selection selection - = spi::Selection(spi::DocumentSelection(_documentSelectionString)); - selection.setFromTimestamp( - spi::Timestamp(_visitorOptions._fromTime.getTime())); - selection.setToTimestamp( - spi::Timestamp(_visitorOptions._toTime.getTime())); + spi::Selection selection = spi::Selection(spi::DocumentSelection(_documentSelectionString)); + selection.setFromTimestamp(spi::Timestamp(_visitorOptions._fromTime.getTime())); + selection.setToTimestamp(spi::Timestamp(_visitorOptions._toTime.getTime())); auto cmd = std::make_shared<CreateIteratorCommand>(bucket, selection,_visitorOptions._fieldSet, _visitorOptions._visitRemoves @@ -1184,8 +1125,7 @@ Visitor::getIterators() } if (sentCount == 0) { if (LOG_WOULD_LOG(debug)) { - LOG(debug, "Enough iterators being processed. Doing nothing for " - "visitor '%s' bucketStates = %zu.", + LOG(debug, "Enough iterators being processed. 
Doing nothing for visitor '%s' bucketStates = %zu.", _id.c_str(), _bucketStates.size()); for (const auto& state : _bucketStates) { LOG(debug, "Existing: %s", state->toString().c_str()); diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 0737c5612c0..9b6d8e348b9 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -136,28 +136,24 @@ private: {} /** Sends DestroyIterator over _messageHandler if _iteratorId != 0 */ - ~BucketIterationState(); + ~BucketIterationState() override; void setCompleted(bool completed = true) { _completed = completed; } - bool isCompleted() const { return _completed; } + [[nodiscard]] bool isCompleted() const { return _completed; } - document::Bucket getBucket() const { return _bucket; } - document::BucketId getBucketId() const { return _bucket.getBucketId(); } + [[nodiscard]] document::Bucket getBucket() const { return _bucket; } + [[nodiscard]] document::BucketId getBucketId() const { return _bucket.getBucketId(); } void setIteratorId(spi::IteratorId iteratorId) { _iteratorId = iteratorId; } - spi::IteratorId getIteratorId() const { return _iteratorId; } + [[nodiscard]] spi::IteratorId getIteratorId() const { return _iteratorId; } - void setPendingControlCommand() { - _iteratorId = spi::IteratorId(0); - } - - bool hasPendingControlCommand() const { + [[nodiscard]] bool hasPendingControlCommand() const { return _iteratorId == spi::IteratorId(0); } - bool hasPendingIterators() const { return _pendingIterators > 0; } + [[nodiscard]] bool hasPendingIterators() const { return _pendingIterators > 0; } void print(std::ostream& out, bool, const std::string& ) const override { out << "BucketIterationState(" @@ -247,12 +243,10 @@ private: MessageMeta releaseMetaForMessageId(uint64_t msgId); void reinsertMeta(MessageMeta); - bool hasQueuedMessages() const { return !_queuedMessages.empty(); } + [[nodiscard]] bool hasQueuedMessages() 
const { return !_queuedMessages.empty(); } void discardQueuedMessages(); - uint32_t getMemoryUsage() const noexcept { - return _memoryUsage; - } + [[nodiscard]] uint32_t getMemoryUsage() const noexcept { return _memoryUsage; } VisitorTarget(); ~VisitorTarget(); @@ -326,9 +320,9 @@ protected: std::string _documentSelectionString; vdslib::VisitorStatistics _visitorStatistics; - bool isCompletedCalled() const { return _calledCompletedVisitor; } + [[nodiscard]] bool isCompletedCalled() const { return _calledCompletedVisitor; } - uint32_t traceLevel() const noexcept { return _traceLevel; } + [[nodiscard]] uint32_t traceLevel() const noexcept { return _traceLevel; } /** * Attempts to add the given trace message to the internal, memory bounded @@ -339,7 +333,7 @@ protected: */ bool addBoundedTrace(uint32_t level, const vespalib::string& message); - const vdslib::Parameters& visitor_parameters() const noexcept; + [[nodiscard]] const vdslib::Parameters& visitor_parameters() const noexcept; // Possibly modifies the ReturnCode parameter in-place if its return code should // be changed based on visitor subclass-specific behavior. @@ -417,7 +411,7 @@ public: * The consistency level provided here is propagated through the SPI * Context object for createIterator calls. */ - virtual spi::ReadConsistency getRequiredReadConsistency() const { + [[nodiscard]] virtual spi::ReadConsistency getRequiredReadConsistency() const { return spi::ReadConsistency::STRONG; } @@ -428,8 +422,7 @@ public: /** * Used to silence transient errors that can happen during normal operation. */ - bool shouldReportProblemToClient(const api::ReturnCode&, - size_t retryCount) const; + [[nodiscard]] static bool shouldReportProblemToClient(const api::ReturnCode&, size_t retryCount) ; /** Called to send report to client of potential non-critical problems. 
*/ void reportProblem(const std::string& problem); @@ -492,18 +485,16 @@ public: void getStatus(std::ostream& out, bool verbose) const; - void setMaxParallel(uint32_t maxParallel) - { _visitorOptions._maxParallel = maxParallel; } - void setMaxParallelPerBucket(uint32_t max) - { _visitorOptions._maxParallelOneBucket = max; } + void setMaxParallel(uint32_t maxParallel) { _visitorOptions._maxParallel = maxParallel; } + void setMaxParallelPerBucket(uint32_t max) { _visitorOptions._maxParallelOneBucket = max; } /** * Sends a message to the data handler for this visitor. */ void sendMessage(std::unique_ptr<documentapi::DocumentMessage> documentMessage); - bool isRunning() const { return _state == STATE_RUNNING; } - bool isCompleted() const { return _state == STATE_COMPLETED; } + [[nodiscard]] bool isRunning() const { return _state == STATE_RUNNING; } + [[nodiscard]] bool isCompleted() const { return _state == STATE_COMPLETED; } private: /** @@ -542,11 +533,9 @@ private: void sendReplyOnce(); - bool hasFailedVisiting() const { return _result.failed(); } - - bool hasPendingIterators() const { return !_bucketStates.empty(); } - - bool mayTransitionToCompleted() const; + [[nodiscard]] bool hasFailedVisiting() const { return _result.failed(); } + [[nodiscard]] bool hasPendingIterators() const { return !_bucketStates.empty(); } + [[nodiscard]] bool mayTransitionToCompleted() const; void discardAllNoPendingBucketStates(); @@ -565,9 +554,7 @@ private: * * Precondition: attach() must have been called on `this`. */ - bool shouldAddMbusTrace() const noexcept { - return _traceLevel != 0; - } + [[nodiscard]] bool shouldAddMbusTrace() const noexcept { return _traceLevel != 0; } /** * Set internal state to the given state value. 
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index a03b9a9a8a3..07938002746 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -187,9 +187,8 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi for (int32_t i=0; i<config->visitorthreads; ++i) { _visitorThread.emplace_back( // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager - std::shared_ptr<VisitorThread>( - new VisitorThread(i, _componentRegister, _messageSessionFactory, - _visitorFactories, *_metrics->threads[i], *this)), + std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory, + _visitorFactories, *_metrics->threads[i], *this)), std::map<api::VisitorId, std::string>()); } } @@ -450,8 +449,7 @@ VisitorManager::processReply(const std::shared_ptr<api::StorageReply>& reply) } void -VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, - Visitor& visitor) +VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& visitor) { assert(cmd->getType() == api::MessageType::INTERNAL); // Only add to internal state if not destroy iterator command, as @@ -460,7 +458,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) { MessageInfo inf; inf.id = visitor.getVisitorId(); - inf.timestamp = _component.getClock().getTimeInSeconds().getTime(); + inf.timestamp = _component.getClock().getSystemTime(); inf.timeout = cmd->getTimeout(); if (cmd->getAddress()) { @@ -623,7 +621,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out, out << "<tr>" << "<td>" << entry.first << "</td>" << "<td>" << entry.second.id << "</td>" - << "<td>" << entry.second.timestamp << "</td>" + << "<td>" << vespalib::to_string(entry.second.timestamp) << 
"</td>" << "<td>" << vespalib::count_ms(entry.second.timeout) << "</td>" << "<td>" << xml_content_escaped(entry.second.destination) << "</td>" << "</tr>\n"; diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 33703b392bc..3e331e1c9a2 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -57,7 +57,7 @@ private: struct MessageInfo { api::VisitorId id; - time_t timestamp; + vespalib::system_time timestamp; vespalib::duration timeout; std::string destination; }; @@ -168,9 +168,7 @@ private: * by the formula: fixed + variable * ((255 - priority) / 255) */ uint32_t maximumConcurrent(const api::CreateVisitorCommand& cmd) const { - return _maxFixedConcurrentVisitors + static_cast<uint32_t>( - _maxVariableConcurrentVisitors - * ((255.0 - cmd.getPriority()) / 255.0)); + return _maxFixedConcurrentVisitors + static_cast<uint32_t>(_maxVariableConcurrentVisitors * ((255.0 - cmd.getPriority()) / 255.0)); } void updateMetrics(const MetricLockGuard &) override; diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index 55ef83ba658..e3ebef3a3ef 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -126,10 +126,10 @@ VisitorThread::shutdown() if (event._message.get()) { if (!event._message->getType().isReply() && (event._message->getType() != api::MessageType::INTERNAL - || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) + || dynamic_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) { std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*event._message).makeReply()); + dynamic_cast<api::StorageCommand&>(*event._message).makeReply()); 
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); _messageSender.send(reply); } @@ -197,7 +197,7 @@ VisitorThread::run(framework::ThreadHandle& thread) // disappear when no visiting is done) if (entry._message.get() && (entry._message->getType() != api::MessageType::INTERNAL - || static_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID)) + || dynamic_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID)) { entry._timer.stop(_metrics.averageQueueWaitingTime); } @@ -290,7 +290,7 @@ VisitorThread::close() } else { _metrics.completedVisitors.inc(1); } - framework::SecondTime currentTime(_component.getClock().getTimeInSeconds()); + vespalib::steady_time currentTime(_component.getClock().getMonotonicTime()); trimRecentlyCompletedList(currentTime); _recentlyCompleted.emplace_back(_currentlyRunningVisitor->first, currentTime); _visitors.erase(_currentlyRunningVisitor); @@ -298,9 +298,9 @@ VisitorThread::close() } void -VisitorThread::trimRecentlyCompletedList(framework::SecondTime currentTime) +VisitorThread::trimRecentlyCompletedList(vespalib::steady_time currentTime) { - framework::SecondTime recentLimit(currentTime - framework::SecondTime(30)); + vespalib::steady_time recentLimit(currentTime - 30s); // Dump all elements that aren't recent anymore while (!_recentlyCompleted.empty() && _recentlyCompleted.front().second < recentLimit) @@ -313,8 +313,7 @@ void VisitorThread::handleNonExistingVisitorCall(const Event& entry, ReturnCode& code) { // Get current time. Set the time that is the oldest still recent. - framework::SecondTime currentTime(_component.getClock().getTimeInSeconds()); - trimRecentlyCompletedList(currentTime); + trimRecentlyCompletedList(_component.getClock().getMonotonicTime()); // Go through all recent visitors. 
Ignore request if recent for (const auto& e : _recentlyCompleted) { @@ -344,7 +343,7 @@ VisitorThread::createVisitor(vespalib::stringref libName, auto it = _visitorFactories.find(str); if (it == _visitorFactories.end()) { error << "Visitor library " << str << " not found."; - return std::shared_ptr<Visitor>(); + return {}; } auto libIter = _libs.find(str); @@ -363,7 +362,7 @@ VisitorThread::createVisitor(vespalib::stringref libName, } catch (std::exception& e) { error << "Failed to create visitor instance of type " << libName << ": " << e.what(); - return std::shared_ptr<Visitor>(); + return {}; } } @@ -690,7 +689,7 @@ VisitorThread::getStatus(vespalib::asciistream& out, } for (const auto& cv : _recentlyCompleted) { out << "<li> Visitor " << cv.first << " done at " - << cv.second.getTime() << "\n"; + << vespalib::to_string(vespalib::to_utc(cv.second)) << "\n"; } out << "</ul>\n"; out << "<h3>Current queue size: " << _queue.size() << "</h3>\n"; @@ -736,12 +735,10 @@ VisitorThread::getStatus(vespalib::asciistream& out, if (_visitors.empty()) { out << "None\n"; } - for (VisitorMap::const_iterator it = _visitors.begin(); - it != _visitors.end(); ++it) - { - out << "<a href=\"?visitor=" << it->first + for (const auto & v : _visitors) { + out << "<a href=\"?visitor=" << v.first << (verbose ? 
"&verbose" : "") << "\">Visitor " - << it->first << "</a><br>\n"; + << v.first << "</a><br>\n"; } } } diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index 226e7c0631b..56e40328fda 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -38,7 +38,7 @@ class VisitorThread : public framework::Runnable, using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>; VisitorMap _visitors; - std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted; + std::deque<std::pair<api::VisitorId, vespalib::steady_time>> _recentlyCompleted; struct Event { enum class Type { @@ -118,7 +118,7 @@ private: */ Event popNextQueuedEventIfAvailable(); void tick(); - void trimRecentlyCompletedList(framework::SecondTime currentTime); + void trimRecentlyCompletedList(vespalib::steady_time currentTime); void handleNonExistingVisitorCall(const Event& entry, api::ReturnCode& code); std::shared_ptr<Visitor> createVisitor(vespalib::stringref libName, diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java index 110bd5e885e..9b7b666e353 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapper.java @@ -25,7 +25,7 @@ import static com.yahoo.vespa.athenz.identityprovider.api.VespaUniqueInstanceId. 
*/ public class EntityBindingsMapper { - private static final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule()); + static final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule()); private EntityBindingsMapper() {} @@ -37,6 +37,14 @@ public class EntityBindingsMapper { } } + public static SignedIdentityDocument fromInputStream(InputStream in) throws IOException { + return EntityBindingsMapper.toSignedIdentityDocument(mapper.readValue(in, SignedIdentityDocumentEntity.class)); + } + + public static SignedIdentityDocument fromString(String json) throws IOException { + return EntityBindingsMapper.toSignedIdentityDocument(mapper.readValue(json, SignedIdentityDocumentEntity.class)); + } + public static SignedIdentityDocument toSignedIdentityDocument(SignedIdentityDocumentEntity entity) { return new SignedIdentityDocument( entity.signature(), @@ -49,7 +57,8 @@ public class EntityBindingsMapper { entity.createdAt(), entity.ipAddresses(), IdentityType.fromId(entity.identityType()), - Optional.ofNullable(entity.clusterType()).map(ClusterType::from).orElse(null)); + Optional.ofNullable(entity.clusterType()).map(ClusterType::from).orElse(null), + entity.unknownAttributes()); } public static SignedIdentityDocumentEntity toSignedIdentityDocumentEntity(SignedIdentityDocument model) { @@ -64,13 +73,13 @@ public class EntityBindingsMapper { model.createdAt(), model.ipAddresses(), model.identityType().id(), - Optional.ofNullable(model.clusterType()).map(ClusterType::toConfigValue).orElse(null)); + Optional.ofNullable(model.clusterType()).map(ClusterType::toConfigValue).orElse(null), + model.unknownAttributes()); } public static SignedIdentityDocument readSignedIdentityDocumentFromFile(Path file) { try (InputStream inputStream = Files.newInputStream(file)) { - SignedIdentityDocumentEntity entity = mapper.readValue(inputStream, SignedIdentityDocumentEntity.class); - return EntityBindingsMapper.toSignedIdentityDocument(entity); + 
return fromInputStream(inputStream); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java index e331fc1f6e8..a08f0d3391d 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/SignedIdentityDocument.java @@ -4,17 +4,33 @@ package com.yahoo.vespa.athenz.identityprovider.api; import com.yahoo.vespa.athenz.api.AthenzService; import java.time.Instant; +import java.util.Map; import java.util.Set; /** - * A signed identity document + * A signed identity document. + * The {@link #unknownAttributes()} member provides forward compatibility and ensures any new/unknown fields are kept intact when serialized to JSON. * * @author bjorncs */ public record SignedIdentityDocument(String signature, int signingKeyVersion, VespaUniqueInstanceId providerUniqueId, AthenzService providerService, int documentVersion, String configServerHostname, String instanceHostname, Instant createdAt, Set<String> ipAddresses, - IdentityType identityType, ClusterType clusterType) { + IdentityType identityType, ClusterType clusterType, Map<String, Object> unknownAttributes) { + + public SignedIdentityDocument { + ipAddresses = Set.copyOf(ipAddresses); + unknownAttributes = Map.copyOf(unknownAttributes); + } + + public SignedIdentityDocument(String signature, int signingKeyVersion, VespaUniqueInstanceId providerUniqueId, + AthenzService providerService, int documentVersion, String configServerHostname, + String instanceHostname, Instant createdAt, Set<String> ipAddresses, + IdentityType identityType, ClusterType clusterType) { + this(signature, signingKeyVersion, providerUniqueId, providerService, documentVersion, configServerHostname, + 
instanceHostname, createdAt, ipAddresses, identityType, clusterType, Map.of()); + } + public static final int DEFAULT_DOCUMENT_VERSION = 2; } diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java index 2fb709615da..c37dd2f9147 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/api/bindings/SignedIdentityDocumentEntity.java @@ -1,25 +1,51 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.athenz.identityprovider.api.bindings; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; import java.util.Set; /** * @author bjorncs */ -@JsonIgnoreProperties(ignoreUnknown = true) -public record SignedIdentityDocumentEntity(@JsonProperty("signature") String signature, - @JsonProperty("signing-key-version") int signingKeyVersion, - @JsonProperty("provider-unique-id") String providerUniqueId, - @JsonProperty("provider-service") String providerService, - @JsonProperty("document-version") int documentVersion, - @JsonProperty("configserver-hostname") String configServerHostname, - @JsonProperty("instance-hostname") String instanceHostname, - @JsonProperty("created-at") Instant createdAt, - @JsonProperty("ip-addresses") Set<String> ipAddresses, - @JsonProperty("identity-type") String identityType, - @JsonProperty("cluster-type") String clusterType) { +public record 
SignedIdentityDocumentEntity( + String signature, int signingKeyVersion, String providerUniqueId, String providerService, int documentVersion, + String configServerHostname, String instanceHostname, Instant createdAt, Set<String> ipAddresses, + String identityType, String clusterType, Map<String, Object> unknownAttributes) { + + @JsonCreator + public SignedIdentityDocumentEntity(@JsonProperty("signature") String signature, + @JsonProperty("signing-key-version") int signingKeyVersion, + @JsonProperty("provider-unique-id") String providerUniqueId, + @JsonProperty("provider-service") String providerService, + @JsonProperty("document-version") int documentVersion, + @JsonProperty("configserver-hostname") String configServerHostname, + @JsonProperty("instance-hostname") String instanceHostname, + @JsonProperty("created-at") Instant createdAt, + @JsonProperty("ip-addresses") Set<String> ipAddresses, + @JsonProperty("identity-type") String identityType, + @JsonProperty("cluster-type") String clusterType) { + this(signature, signingKeyVersion, providerUniqueId, providerService, documentVersion, configServerHostname, + instanceHostname, createdAt, ipAddresses, identityType, clusterType, new HashMap<>()); + } + + @JsonProperty("signature") @Override public String signature() { return signature; } + @JsonProperty("signing-key-version") @Override public int signingKeyVersion() { return signingKeyVersion; } + @JsonProperty("provider-unique-id") @Override public String providerUniqueId() { return providerUniqueId; } + @JsonProperty("provider-service") @Override public String providerService() { return providerService; } + @JsonProperty("document-version") @Override public int documentVersion() { return documentVersion; } + @JsonProperty("configserver-hostname") @Override public String configServerHostname() { return configServerHostname; } + @JsonProperty("instance-hostname") @Override public String instanceHostname() { return instanceHostname; } + @JsonProperty("created-at") 
@Override public Instant createdAt() { return createdAt; } + @JsonProperty("ip-addresses") @Override public Set<String> ipAddresses() { return ipAddresses; } + @JsonProperty("identity-type") @Override public String identityType() { return identityType; } + @JsonProperty("cluster-type") @Override public String clusterType() { return clusterType; } + @JsonAnyGetter @Override public Map<String, Object> unknownAttributes() { return unknownAttributes; } + @JsonAnySetter public void set(String name, Object value) { unknownAttributes.put(name, value); } } diff --git a/vespa-athenz/src/test/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapperTest.java b/vespa-athenz/src/test/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapperTest.java new file mode 100644 index 00000000000..f8c119190a6 --- /dev/null +++ b/vespa-athenz/src/test/java/com/yahoo/vespa/athenz/identityprovider/api/EntityBindingsMapperTest.java @@ -0,0 +1,47 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +package com.yahoo.vespa.athenz.identityprovider.api; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * @author bjorncs + */ +class EntityBindingsMapperTest { + + @Test + public void persists_unknown_json_members() throws IOException { + var originalJson = + """ + { + "signature": "sig", + "signing-key-version": 0, + "provider-unique-id": "0.cluster.instance.app.tenant.us-west-1.test.node", + "provider-service": "domain.service", + "document-version": 2, + "configserver-hostname": "cfg", + "instance-hostname": "host", + "created-at": 12345.0, + "ip-addresses": [], + "identity-type": "node", + "cluster-type": "admin", + "unknown-string": "string-value", + "unknown-object": { "member-in-unknown-object": 123 } + } + """; + var entity = EntityBindingsMapper.fromString(originalJson); + assertEquals(2, entity.unknownAttributes().size(), entity.unknownAttributes().toString()); + var json = EntityBindingsMapper.toAttestationData(entity); + + var expectedMemberInJson = "member-in-unknown-object"; + assertTrue(json.contains(expectedMemberInJson), + () -> "Expected JSON to contain '%s', but got \n'%s'".formatted(expectedMemberInJson, json)); + assertEquals(EntityBindingsMapper.mapper.readTree(originalJson), EntityBindingsMapper.mapper.readTree(json)); + } + +}
\ No newline at end of file diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java index 68fcbcc4983..13b395a97f2 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServer.java @@ -4,7 +4,9 @@ package com.yahoo.vespa.zookeeper; import java.nio.file.Path; /** - * Interface for a component that starts/stops a ZooKeeper server. + * Interface for a component that starts/stops a ZooKeeper server. Implementations should make sure + * that the server is up and accepts connection (typically by returning from constructor only after + * writing a node to ZooKeeper successfully). * * @author hmusum */ diff --git a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java index 8161e4bba4b..5bf7c9bd3e9 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java @@ -12,6 +12,8 @@ import java.time.Duration; * Starts or reconfigures zookeeper cluster. * The QuorumPeer conditionally created here is owned by the Reconfigurer; * when it already has a peer, that peer is used here in case start or shutdown is required. + * Guarantees that server is up by writing a node to ZooKeeper successfully before + * returning from constructor. 
* * @author hmusum */ diff --git a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java index 8adabeedb1b..bb2b02c5c9b 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java +++ b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java @@ -9,6 +9,9 @@ import java.nio.file.Path; import java.time.Duration; /** + * ZooKeeper server. Guarantees that the server is up by writing a node to ZooKeeper successfully before + * returning from constructor. + * * @author Ulf Lilleengen * @author Harald Musum */ |