diff options
51 files changed, 189 insertions, 219 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java index db716866f15..d8c59ebda65 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java @@ -797,6 +797,9 @@ public class VespaMetricSet { metrics.add(new Metric("vds.mergethrottler.queuesize.max")); metrics.add(new Metric("vds.mergethrottler.queuesize.sum")); metrics.add(new Metric("vds.mergethrottler.queuesize.count")); + metrics.add(new Metric("vds.mergethrottler.active_window_size.max")); + metrics.add(new Metric("vds.mergethrottler.active_window_size.sum")); + metrics.add(new Metric("vds.mergethrottler.active_window_size.count")); metrics.add(new Metric("vds.mergethrottler.bounced_due_to_back_pressure.rate")); metrics.add(new Metric("vds.mergethrottler.locallyexecutedmerges.ok.rate")); metrics.add(new Metric("vds.mergethrottler.mergechains.ok.rate")); diff --git a/metrics/src/vespa/metrics/valuemetric.h b/metrics/src/vespa/metrics/valuemetric.h index 3470aa067e0..9d5f0a82735 100644 --- a/metrics/src/vespa/metrics/valuemetric.h +++ b/metrics/src/vespa/metrics/valuemetric.h @@ -37,7 +37,6 @@ protected: void logWarning(const char* msg, const char *op) const; void logNonFiniteValueWarning() const; - void sendLogEvent(Metric::String name, double value) const; }; template<typename AvgVal, typename TotVal, bool SumOnAdd> @@ -86,7 +85,7 @@ public: ~ValueMetric(); MetricValueClass::UP getValues() const override { - return MetricValueClass::UP(new Values(_values.getValues())); + return std::make_unique<Values>(_values.getValues()); } void unsetOnZeroValue() { _values.setFlag(UNSET_ON_ZERO_VALUE); } diff --git a/metrics/src/vespa/metrics/valuemetric.hpp b/metrics/src/vespa/metrics/valuemetric.hpp index a9522dcec29..5e0ef95e9e5 100644 --- a/metrics/src/vespa/metrics/valuemetric.hpp +++ b/metrics/src/vespa/metrics/valuemetric.hpp @@ -26,7 +26,7 @@ ValueMetric<AvgVal, TotVal, SumOnAdd>::ValueMetric( {} template<typename AvgVal, typename TotVal, bool SumOnAdd> -ValueMetric<AvgVal, TotVal, SumOnAdd>::~ValueMetric() { } +ValueMetric<AvgVal, TotVal, SumOnAdd>::~ValueMetric() = default; template<typename AvgVal, typename TotVal, bool SumOnAdd> void ValueMetric<AvgVal, TotVal, SumOnAdd>::inc(AvgVal incVal) @@ -239,8 +239,7 @@ ValueMetric<AvgVal, TotVal, SumOnAdd>::getDoubleValue(stringref id) const template<typename AvgVal, typename TotVal, bool SumOnAdd> void -ValueMetric<AvgVal, TotVal, SumOnAdd>::addMemoryUsage( - MemoryConsumption& mc) const +ValueMetric<AvgVal, TotVal, SumOnAdd>::addMemoryUsage(MemoryConsumption& mc) const { ++mc._valueMetricCount; mc._valueMetricValues += _values.getMemoryUsageAllocatedInternally(); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java index a237ec6dd4f..280e58c91f1 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java @@ -24,7 +24,6 @@ import com.yahoo.vespa.hosted.node.admin.container.ContainerName; import com.yahoo.vespa.hosted.node.admin.component.ConfigServerInfo; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentTask; -import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import com.yahoo.vespa.hosted.node.admin.task.util.file.FileFinder; import com.yahoo.vespa.hosted.node.admin.task.util.file.UnixPath; import com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerPath; @@ -207,7 +206,7 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer { EntityBindingsMapper.toAttestationData(signedIdentityDocument), csr); EntityBindingsMapper.writeSignedIdentityDocumentToFile(identityDocumentFile, signedIdentityDocument); - writePrivateKeyAndCertificate(context.vespaUser(), + writePrivateKeyAndCertificate(context.userNamespace().vespaUserId(), privateKeyFile, keyPair.getPrivate(), certificateFile, instanceIdentity.certificate()); context.log(logger, "Instance successfully registered and credentials written to file"); } @@ -235,7 +234,7 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer { context.identity(), identityDocument.providerUniqueId().asDottedString(), csr); - writePrivateKeyAndCertificate(context.vespaUser(), + writePrivateKeyAndCertificate(context.userNamespace().vespaUserId(), privateKeyFile, keyPair.getPrivate(), certificateFile, instanceIdentity.certificate()); context.log(logger, "Instance successfully refreshed and credentials written to file"); } catch (ZtsClientException e) { @@ -252,19 +251,19 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer { } - private static void writePrivateKeyAndCertificate(VespaUser vespaUser, + private static void writePrivateKeyAndCertificate(int vespaUid, ContainerPath privateKeyFile, PrivateKey privateKey, ContainerPath certificateFile, X509Certificate certificate) { - writeFile(privateKeyFile, vespaUser, KeyUtils.toPem(privateKey)); - writeFile(certificateFile, vespaUser, X509CertificateUtils.toPem(certificate)); + writeFile(privateKeyFile, vespaUid, KeyUtils.toPem(privateKey)); + writeFile(certificateFile, vespaUid, X509CertificateUtils.toPem(certificate)); } - private static void writeFile(ContainerPath path, VespaUser vespaUser, String utf8Content) { + private static void writeFile(ContainerPath path, int vespaUid, String utf8Content) { new UnixPath(path.resolveSibling(path.getFileName() + ".tmp")) .writeUtf8File(utf8Content, "r--------") - .setOwnerId(vespaUser.uid()) + .setOwnerId(vespaUid) .atomicMove(path); } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java index b299e1f3f0d..0a9496be0a6 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java @@ -101,8 +101,8 @@ public class VespaServiceDumperImpl implements VespaServiceDumper { } context.log(log, Level.INFO, "Creating '" + unixPathDirectory +"'."); unixPathDirectory.createDirectory("rwxr-x---") - .setOwner(context.vespaUser().name()) - .setGroup(context.vespaUser().group()); + .setOwner(context.userNamespace().vespaUser()) + .setGroup(context.userNamespace().vespaGroup()); URI destination = serviceDumpDestination(nodeSpec, createDumpId(request)); ProducerContext producerCtx = new ProducerContext(context, directory, request); List<Artifact> producedArtifacts = new ArrayList<>(); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdater.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdater.java index dda404797d9..53c9e741f59 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdater.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdater.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.node.admin.nodeadmin; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.config.provision.HostName; +import com.yahoo.vespa.flags.FlagSource; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.Acl; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeRepository; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; @@ -12,6 +13,7 @@ import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContextFactory; import com.yahoo.yolean.Exceptions; +import java.time.Clock; import java.time.Duration; import java.util.ArrayList; import java.util.EnumSet; @@ -57,7 +59,9 @@ public class NodeAdminStateUpdater { NodeRepository nodeRepository, Orchestrator orchestrator, NodeAdmin nodeAdmin, - HostName hostHostname) { + HostName hostHostname, + Clock clock, + FlagSource flagSource) { this.nodeAgentContextFactory = nodeAgentContextFactory; this.nodeRepository = nodeRepository; this.orchestrator = orchestrator; diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContext.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContext.java index f3148cc7859..8cf8553bc34 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContext.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContext.java @@ -42,8 +42,7 @@ public interface NodeAgentContext extends TaskContext { ZoneApi zone(); - /** @return information about the Vespa user inside the container */ - VespaUser vespaUser(); + UserNamespace userNamespace(); default boolean isDisabled(NodeAgentTask task) { return false; diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java index 771528324e0..9bcf5d58d6e 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java @@ -42,14 +42,16 @@ public class NodeAgentContextImpl implements NodeAgentContext { private final ZoneApi zone; private final ContainerFileSystem containerFs; private final ContainerPath pathToVespaHome; + private final UserNamespace userNamespace; private final double cpuSpeedup; private final Set<NodeAgentTask> disabledNodeAgentTasks; private final Optional<ApplicationId> hostExclusiveTo; public NodeAgentContextImpl(NodeSpec node, Acl acl, AthenzIdentity identity, ContainerNetworkMode containerNetworkMode, ZoneApi zone, - FlagSource flagSource, ContainerFileSystem containerFs, String pathToVespaHome, - double cpuSpeedup, Optional<ApplicationId> hostExclusiveTo) { + FlagSource flagSource, Path pathToContainerStorage, String pathToVespaHome, + UserNamespace userNamespace, double cpuSpeedup, + Optional<ApplicationId> hostExclusiveTo) { if (cpuSpeedup <= 0) throw new IllegalArgumentException("cpuSpeedUp must be positive, was: " + cpuSpeedup); @@ -59,9 +61,10 @@ public class NodeAgentContextImpl implements NodeAgentContext { this.identity = Objects.requireNonNull(identity); this.containerNetworkMode = Objects.requireNonNull(containerNetworkMode); this.zone = Objects.requireNonNull(zone); - this.containerFs = Objects.requireNonNull(containerFs); + this.containerFs = ContainerFileSystem.create(pathToContainerStorage.resolve(containerName.asString()), userNamespace); this.pathToVespaHome = containerFs.getPath(pathToVespaHome); this.logPrefix = containerName.asString() + ": "; + this.userNamespace = Objects.requireNonNull(userNamespace); this.cpuSpeedup = cpuSpeedup; this.disabledNodeAgentTasks = NodeAgentTask.fromString( PermanentFlags.DISABLED_HOST_ADMIN_TASKS.bindTo(flagSource).with(FetchVector.Dimension.HOSTNAME, node.hostname()).value()); @@ -99,8 +102,8 @@ public class NodeAgentContextImpl implements NodeAgentContext { } @Override - public VespaUser vespaUser() { - return containerFs.getUserPrincipalLookupService().vespaUser(); + public UserNamespace userNamespace() { + return userNamespace; } @Override @@ -188,7 +191,6 @@ public class NodeAgentContextImpl implements NodeAgentContext { private ContainerNetworkMode containerNetworkMode; private ZoneApi zone; private UserNamespace userNamespace; - private VespaUser vespaUser; private Path containerStorage; private FlagSource flagSource; private double cpuSpeedUp = 1; @@ -228,12 +230,6 @@ public class NodeAgentContextImpl implements NodeAgentContext { return this; } - public Builder vespaUser(VespaUser vespaUser) { - this.vespaUser = vespaUser; - return this; - } - - /** Sets the file system to use for paths. */ public Builder fileSystem(FileSystem fileSystem) { return containerStorage(fileSystem.getPath(DEFAULT_CONTAINER_STORAGE.toString())); @@ -262,13 +258,6 @@ public class NodeAgentContextImpl implements NodeAgentContext { public NodeAgentContextImpl build() { Objects.requireNonNull(containerStorage, "Must set one of containerStorage or fileSystem"); - UserNamespace userNamespace = Optional.ofNullable(this.userNamespace) - .orElseGet(() -> new UserNamespace(100000, 100000)); - VespaUser vespaUser = Optional.ofNullable(this.vespaUser) - .orElseGet(() -> new VespaUser("vespa", "vespa", 1000, 100)); - ContainerFileSystem containerFs = ContainerFileSystem.create(containerStorage - .resolve(nodeSpecBuilder.hostname().split("\\.")[0]), userNamespace, vespaUser); - return new NodeAgentContextImpl( nodeSpecBuilder.build(), Optional.ofNullable(acl).orElse(Acl.EMPTY), @@ -296,8 +285,9 @@ public class NodeAgentContextImpl implements NodeAgentContext { } }), Optional.ofNullable(flagSource).orElseGet(InMemoryFlagSource::new), - containerFs, + containerStorage, "/opt/vespa", + Optional.ofNullable(userNamespace).orElseGet(() -> new UserNamespace(100000, 100000, "vespa", "vespa", 1000, 100)), cpuSpeedUp, hostExclusiveTo); } } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespace.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespace.java index 005452411bd..1a25b5c3c5e 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespace.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespace.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.node.admin.nodeagent; +import java.util.Objects; + /** * @author valerijf */ @@ -14,12 +16,20 @@ public class UserNamespace { * https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/Documentation/admin-guide/sysctl/fs.rst#overflowgid--overflowuid */ private static final int OVERFLOW_ID = 65_534; - private volatile int uidOffset; - private volatile int gidOffset; + private final int uidOffset; + private final int gidOffset; + private final String vespaUser; + private final String vespaGroup; + private final int vespaUserId; + private final int vespaGroupId; - public UserNamespace(int uidOffset, int gidOffset) { + public UserNamespace(int uidOffset, int gidOffset, String vespaUser, String vespaGroup, int vespaUserId, int vespaGroupId) { this.uidOffset = uidOffset; this.gidOffset = gidOffset; + this.vespaUser = Objects.requireNonNull(vespaUser); + this.vespaGroup = Objects.requireNonNull(vespaGroup); + this.vespaUserId = vespaUserId; + this.vespaGroupId = vespaGroupId; } public int userIdOnHost(int containerUid) { return toHostId(containerUid, uidOffset); } @@ -27,15 +37,14 @@ public class UserNamespace { public int userIdInContainer(int hostUid) { return toContainerId(hostUid, uidOffset); } public int groupIdInContainer(int hostGid) { return toContainerId(hostGid, gidOffset); } + public String vespaUser() { return vespaUser; } + public String vespaGroup() { return vespaGroup; } + public int vespaUserId() { return vespaUserId; } + public int vespaGroupId() { return vespaGroupId; } + public int idRange() { return ID_RANGE; } public int overflowId() { return OVERFLOW_ID; } - // Remove after migration to mapped namespaces is complete, make fields final - public void setOffsets(int idOffset) { - this.uidOffset = idOffset; - this.gidOffset = idOffset; - } - private static int toHostId(int containerId, int idOffset) { if (containerId < 0 || containerId > ID_RANGE) throw new IllegalArgumentException("Invalid container id: " + containerId); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/VespaUser.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/VespaUser.java deleted file mode 100644 index 78ccca80beb..00000000000 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/VespaUser.java +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.node.admin.nodeagent; - -import java.util.Objects; - -/** - * Describes Vespa user inside the container user namespace. - * - * @author valerijf - */ -public class VespaUser { - - private final String name; - private final String group; - private final int uid; - private final int gid; - - public VespaUser(String name, String group, int uid, int gid) { - this.name = Objects.requireNonNull(name); - this.group = Objects.requireNonNull(group); - this.uid = uid; - this.gid = gid; - } - - public String name() { return name; } - public String group() { return group; } - public int uid() { return uid; } - public int gid() { return gid; } -} diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystem.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystem.java index cda806f533a..078a60ba7a5 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystem.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystem.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; -import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import java.io.IOException; import java.nio.file.FileStore; @@ -11,6 +10,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.WatchService; +import java.nio.file.attribute.UserPrincipalLookupService; import java.util.Set; import static com.yahoo.yolean.Exceptions.uncheck; @@ -52,7 +52,7 @@ public class ContainerFileSystem extends FileSystem { } @Override - public ContainerUserPrincipalLookupService getUserPrincipalLookupService() { + public UserPrincipalLookupService getUserPrincipalLookupService() { return containerFsProvider.userPrincipalLookupService(); } @@ -86,8 +86,8 @@ public class ContainerFileSystem extends FileSystem { throw new UnsupportedOperationException(); } - public static ContainerFileSystem create(Path containerStorageRoot, UserNamespace userNamespace, VespaUser vespaUser) { + public static ContainerFileSystem create(Path containerStorageRoot, UserNamespace userNamespace) { uncheck(() -> Files.createDirectories(containerStorageRoot)); - return new ContainerFileSystemProvider(containerStorageRoot, userNamespace, vespaUser).getFileSystem(null); + return new ContainerFileSystemProvider(containerStorageRoot, userNamespace).getFileSystem(null); } } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java index bcc35426d25..909c6c9cbc1 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; -import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import java.io.IOException; import java.net.URI; @@ -46,10 +45,10 @@ class ContainerFileSystemProvider extends FileSystemProvider { private final ContainerUserPrincipalLookupService userPrincipalLookupService; private final Path containerRootOnHost; - ContainerFileSystemProvider(Path containerRootOnHost, UserNamespace userNamespace, VespaUser vespaUser) { + ContainerFileSystemProvider(Path containerRootOnHost, UserNamespace userNamespace) { this.containerFs = new ContainerFileSystem(this); this.userPrincipalLookupService = new ContainerUserPrincipalLookupService( - containerRootOnHost.getFileSystem().getUserPrincipalLookupService(), userNamespace, vespaUser); + containerRootOnHost.getFileSystem().getUserPrincipalLookupService(), userNamespace); this.containerRootOnHost = containerRootOnHost; } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupService.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupService.java index 8e35bdccc23..ae65f6a7f7f 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupService.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupService.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; -import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import java.io.IOException; import java.nio.file.attribute.GroupPrincipal; @@ -14,22 +13,16 @@ import java.util.Objects; /** * @author valerijf */ -public class ContainerUserPrincipalLookupService extends UserPrincipalLookupService { +class ContainerUserPrincipalLookupService extends UserPrincipalLookupService { private final UserPrincipalLookupService baseFsUserPrincipalLookupService; private final UserNamespace userNamespace; - private final VespaUser vespaUser; - ContainerUserPrincipalLookupService( - UserPrincipalLookupService baseFsUserPrincipalLookupService, UserNamespace userNamespace, VespaUser vespaUser) { + ContainerUserPrincipalLookupService(UserPrincipalLookupService baseFsUserPrincipalLookupService, UserNamespace userNamespace) { this.baseFsUserPrincipalLookupService = Objects.requireNonNull(baseFsUserPrincipalLookupService); this.userNamespace = Objects.requireNonNull(userNamespace); - this.vespaUser = Objects.requireNonNull(vespaUser); } - public UserNamespace userNamespace() { return userNamespace; } - public VespaUser vespaUser() { return vespaUser; } - public int userIdOnHost(int containerUid) { return userNamespace.userIdOnHost(containerUid); } public int groupIdOnHost(int containerGid) { return userNamespace.groupIdOnHost(containerGid); } public int userIdInContainer(int hostUid) { return userNamespace.userIdInContainer(hostUid); } @@ -37,27 +30,27 @@ public class ContainerUserPrincipalLookupService extends UserPrincipalLookupServ @Override public ContainerUserPrincipal lookupPrincipalByName(String name) throws IOException { - int containerUid = resolveName(name, vespaUser.name(), vespaUser.uid()); - String user = resolveId(containerUid, vespaUser.name(), vespaUser.uid()); + int containerUid = resolveName(name, userNamespace.vespaUser(), userNamespace.vespaUserId()); + String user = resolveId(containerUid, userNamespace.vespaUser(), userNamespace.vespaUserId()); String hostUid = String.valueOf(userIdOnHost(containerUid)); return new ContainerUserPrincipal(containerUid, user, baseFsUserPrincipalLookupService.lookupPrincipalByName(hostUid)); } @Override public ContainerGroupPrincipal lookupPrincipalByGroupName(String group) throws IOException { - int containerGid = resolveName(group, vespaUser.group(), vespaUser.gid()); - String name = resolveId(containerGid, vespaUser.group(), vespaUser.gid()); + int containerGid = resolveName(group, userNamespace.vespaGroup(), userNamespace.vespaGroupId()); + String name = resolveId(containerGid, userNamespace.vespaGroup(), userNamespace.vespaGroupId()); String hostGid = String.valueOf(groupIdOnHost(containerGid)); return new ContainerGroupPrincipal(containerGid, name, baseFsUserPrincipalLookupService.lookupPrincipalByGroupName(hostGid)); } public ContainerUserPrincipal userPrincipal(int uid, UserPrincipal baseFsPrincipal) { - String name = resolveId(uid, vespaUser.name(), vespaUser.uid()); + String name = resolveId(uid, userNamespace.vespaUser(), userNamespace.vespaUserId()); return new ContainerUserPrincipal(uid, name, baseFsPrincipal); } public ContainerGroupPrincipal groupPrincipal(int gid, GroupPrincipal baseFsPrincipal) { - String name = resolveId(gid, vespaUser.group(), vespaUser.gid()); + String name = resolveId(gid, userNamespace.vespaGroup(), userNamespace.vespaGroupId()); return new ContainerGroupPrincipal(gid, name, baseFsPrincipal); } diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integration/ContainerTester.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integration/ContainerTester.java index 4a26195dd3a..08e335f188a 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integration/ContainerTester.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integration/ContainerTester.java @@ -93,7 +93,7 @@ public class ContainerTester implements AutoCloseable { NodeAgentContextFactory nodeAgentContextFactory = (nodeSpec, acl) -> NodeAgentContextImpl.builder(nodeSpec).acl(acl).fileSystem(fileSystem).build(); nodeAdminStateUpdater = new NodeAdminStateUpdater(nodeAgentContextFactory, nodeRepository, orchestrator, - nodeAdmin, HOST_HOSTNAME); + nodeAdmin, HOST_HOSTNAME, clock, flagSource); loopThread = new Thread(() -> { nodeAdminStateUpdater.start(); diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdaterTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdaterTest.java index 5436f84f467..f9b0070a3d6 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdaterTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdaterTest.java @@ -3,6 +3,8 @@ package com.yahoo.vespa.hosted.node.admin.nodeadmin; import com.yahoo.config.provision.HostName; import com.yahoo.config.provision.NodeType; +import com.yahoo.test.ManualClock; +import com.yahoo.vespa.flags.InMemoryFlagSource; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.Acl; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeState; @@ -48,9 +50,11 @@ public class NodeAdminStateUpdaterTest { private final Orchestrator orchestrator = mock(Orchestrator.class); private final NodeAdmin nodeAdmin = mock(NodeAdmin.class); private final HostName hostHostname = HostName.from("basehost1.test.yahoo.com"); + private final ManualClock clock = new ManualClock(); + private final InMemoryFlagSource flagSource = new InMemoryFlagSource(); private final NodeAdminStateUpdater updater = spy(new NodeAdminStateUpdater( - nodeAgentContextFactory, nodeRepository, orchestrator, nodeAdmin, hostHostname)); + nodeAgentContextFactory, nodeRepository, orchestrator, nodeAdmin, hostHostname, clock, flagSource)); @Test diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespaceTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespaceTest.java index bb02667a550..73b59a17c37 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespaceTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespaceTest.java @@ -11,7 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; */ class UserNamespaceTest { - private final UserNamespace userNamespace = new UserNamespace(1000, 2000); + private final UserNamespace userNamespace = new UserNamespace(1000, 2000, "vespa", "users", 1000, 100); @Test public void translates_between_ids() { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java index 242a2458f07..4e85052a176 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; -import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import com.yahoo.vespa.hosted.node.admin.task.util.file.UnixPath; import com.yahoo.vespa.test.file.TestFileSystem; import org.junit.jupiter.api.Test; @@ -25,10 +24,8 @@ class ContainerFileSystemTest { private final FileSystem fileSystem = TestFileSystem.create(); private final UnixPath containerRootOnHost = new UnixPath(fileSystem.getPath("/data/storage/ctr1")); - private final UserNamespace userNamespace = new UserNamespace(10_000, 11_000); - private final VespaUser vespaUser = new VespaUser("vespa", "users", 1000, 100); - private final ContainerFileSystem containerFs = ContainerFileSystem.create( - containerRootOnHost.createDirectories().toPath(), userNamespace, vespaUser); + private final UserNamespace userNamespace = new UserNamespace(10_000, 11_000, "vespa", "users", 1000, 100); + private final ContainerFileSystem containerFs = ContainerFileSystem.create(containerRootOnHost.createDirectories().toPath(), userNamespace); @Test public void creates_files_and_directories_with_container_root_as_owner() throws IOException { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPathTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPathTest.java index a2a036008bc..6bca8c2f0b1 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPathTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPathTest.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; -import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import com.yahoo.vespa.test.file.TestFileSystem; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -28,7 +27,7 @@ import java.nio.file.Path; class ContainerPathTest { private final FileSystem baseFs = TestFileSystem.create(); - private final ContainerFileSystem containerFs = ContainerFileSystem.create(baseFs.getPath("/data/storage/ctr1"), mock(UserNamespace.class), mock(VespaUser.class)); + private final ContainerFileSystem containerFs = ContainerFileSystem.create(baseFs.getPath("/data/storage/ctr1"), mock(UserNamespace.class)); @Test public void create_new_container_path() { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupServiceTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupServiceTest.java index 70c837e6fb2..bc26cfa73f3 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupServiceTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupServiceTest.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; -import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import com.yahoo.vespa.test.file.TestFileSystem; import org.junit.jupiter.api.Test; @@ -19,10 +18,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; */ class ContainerUserPrincipalLookupServiceTest { - private final UserNamespace userNamespace = new UserNamespace(10_000, 11_000); - private final VespaUser vespaUser = new VespaUser("vespa", "users", 1000, 100); + private final UserNamespace userNamespace = new UserNamespace(10_000, 11_000, "vespa", "users", 1000, 100); private final ContainerUserPrincipalLookupService userPrincipalLookupService = - new ContainerUserPrincipalLookupService(TestFileSystem.create().getUserPrincipalLookupService(), userNamespace, vespaUser); + new ContainerUserPrincipalLookupService(TestFileSystem.create().getUserPrincipalLookupService(), userNamespace); @Test public void correctly_resolves_ids() throws IOException { diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 71ae26180ad..09c782cef07 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -15,7 +15,6 @@ #include <vespa/searchcore/proton/feedoperation/removeoperation.h> #include <vespa/searchcore/proton/server/blockable_maintenance_job.h> #include <vespa/searchcore/proton/server/executor_thread_service.h> -#include <vespa/searchcore/proton/server/i_document_scan_iterator.h> #include <vespa/searchcore/proton/server/i_operation_storer.h> #include <vespa/searchcore/proton/server/ibucketmodifiedhandler.h> #include <vespa/searchcore/proton/server/idocumentmovehandler.h> @@ -728,7 +727,7 @@ MyExecutor::isIdle() { (void) getStats(); sync(); - Stats stats(getStats()); + auto stats = getStats(); return stats.acceptedTasks == 0u; } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 068d7bd033c..4eaa722e0ba 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -114,7 +114,7 @@ public: * * @return executor stats **/ - vespalib::ThreadStackExecutor::Stats getExecutorStats() { return _executor.getStats(); } + vespalib::ExecutorStats getExecutorStats() { return _executor.getStats(); } /** * Returns the underlying executor. Only used for state explorers. diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h index 9364fc7b097..74a39a3ec78 100644 --- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h +++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h @@ -62,7 +62,7 @@ public: * * @return executor stats **/ - vespalib::ThreadStackExecutor::Stats getExecutorStats() { return _executor.getStats(); } + vespalib::ExecutorStats getExecutorStats() { return _executor.getStats(); } /** * Returns the underlying executor. Only used for state explorers. diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp index fa825024878..74e0971178c 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp @@ -5,7 +5,7 @@ namespace proton { void -ExecutorMetrics::update(const vespalib::ThreadStackExecutorBase::Stats &stats) +ExecutorMetrics::update(const vespalib::ExecutorStats &stats) { maxPending.set(stats.queueSize.max()); accepted.inc(stats.acceptedTasks); diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h index 5f7dfdf45b0..273c4ed8979 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h @@ -5,7 +5,7 @@ #include <vespa/metrics/metricset.h> #include <vespa/metrics/countmetric.h> #include <vespa/metrics/valuemetric.h> -#include <vespa/vespalib/util/threadstackexecutorbase.h> +#include <vespa/vespalib/util/executor_stats.h> namespace proton { @@ -16,7 +16,7 @@ struct ExecutorMetrics : metrics::MetricSet metrics::LongCountMetric rejected; metrics::LongAverageMetric queueSize; - void update(const vespalib::ThreadStackExecutorBase::Stats &stats); + void update(const vespalib::ExecutorStats &stats); ExecutorMetrics(const std::string &name, metrics::MetricSet *parent); ~ExecutorMetrics(); }; diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.h index 0a113207f65..e2c53af11b5 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.h @@ -11,10 +11,8 @@ namespace proton { * document db. */ class ExecutorThreadingServiceStats { -public: - using Stats = vespalib::ExecutorStats; - private: + using Stats = vespalib::ExecutorStats; Stats _masterExecutorStats; Stats _indexExecutorStats; Stats _summaryExecutorStats; diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp index 3b4f12b7c85..684132b34e7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp @@ -73,7 +73,7 @@ ExecutorThreadService::isCurrentThread() const return FastOS_Thread::CompareThreadIds(_threadId->_id, currentThreadId); } -vespalib::ThreadExecutor::Stats ExecutorThreadService::getStats() { +vespalib::ExecutorStats ExecutorThreadService::getStats() { return _executor.getStats(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h index 8adb80889e7..44a330ca696 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h @@ -21,7 +21,7 @@ public: ExecutorThreadService(vespalib::SyncableThreadExecutor &executor); ~ExecutorThreadService(); - Stats getStats() override; + vespalib::ExecutorStats getStats() override; vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) override { return _executor.execute(std::move(task)); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 17c255d506a..edf68633124 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -737,8 +737,7 @@ Proton::prepareRestart() namespace { void -updateExecutorMetrics(ExecutorMetrics &metrics, - const vespalib::ThreadStackExecutor::Stats &stats) +updateExecutorMetrics(ExecutorMetrics &metrics, const vespalib::ExecutorStats &stats) { metrics.update(stats); } diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h index a439306b69b..34eebdc839d 100644 --- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h +++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h @@ -66,7 +66,7 @@ public: * * @return executor stats **/ - vespalib::ThreadStackExecutor::Stats getExecutorStats() { return _executor.getStats(); } + vespalib::ExecutorStats getExecutorStats() { return _executor.getStats(); } /** * Returns the underlying executor. Only used for state explorers. diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h index f2e1e64eeb3..26a92841999 100644 --- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h @@ -40,7 +40,7 @@ public: } size_t getNumThreads() const override { return _service.getNumThreads(); } - Stats getStats() override { + vespalib::ExecutorStats getStats() override { return _service.getStats(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index e4b64b19739..869ff0456e1 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -324,12 +324,12 @@ AdaptiveSequencedExecutor::setTaskLimit(uint32_t task_limit) } } -AdaptiveSequencedExecutor::Stats +ExecutorStats AdaptiveSequencedExecutor::getStats() { auto guard = std::lock_guard(_mutex); - Stats stats = _stats; - _stats = Stats(); + ExecutorStats stats = _stats; + _stats = ExecutorStats(); _stats.queueSize.add(_self.pending_tasks); return stats; } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index 4e0388caf8a..fdcdf35fbbb 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -22,8 +22,7 @@ namespace vespalib { class AdaptiveSequencedExecutor : public ISequencedTaskExecutor { private: - using Stats = vespalib::ExecutorStats; - using Task = vespalib::Executor::Task; + using Task = Executor::Task; struct TaggedTask { Task::UP task; @@ -128,7 +127,7 @@ private: vespalib::ArrayQueue<Worker*> _worker_stack; EventBarrier<BarrierCompletion> _barrier; Self _self; - Stats _stats; + ExecutorStats _stats; Config _cfg; void maybe_block_self(std::unique_lock<std::mutex> &lock); @@ -147,7 +146,7 @@ public: void executeTask(ExecutorId id, Task::UP task) override; void sync() override; void setTaskLimit(uint32_t task_limit) override; - vespalib::ExecutorStats getStats() override; + ExecutorStats getStats() override; Config get_config() const; }; diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h index f3dbd512047..3bd5ca3d49a 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -22,7 +22,7 @@ public: return Task::UP(); } size_t getNumThreads() const override { return 0; } - Stats getStats() override { + ExecutorStats getStats() override { return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); } void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index adb4356c4c8..727894397a7 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -99,10 +99,10 @@ SequencedTaskExecutor::wakeup() { } } -SequencedTaskExecutor::Stats +ExecutorStats SequencedTaskExecutor::getStats() { - Stats accumulatedStats; + ExecutorStats accumulatedStats; for (auto &executor :* _executors) { accumulatedStats += executor->getStats(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 38844535021..7b49f7aac75 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -16,7 +16,6 @@ class SyncableThreadExecutor; class SequencedTaskExecutor final : public ISequencedTaskExecutor { public: - using Stats = vespalib::ExecutorStats; using ISequencedTaskExecutor::getExecutorId; using OptimizeFor = vespalib::Executor::OptimizeFor; @@ -26,7 +25,7 @@ public: void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; ExecutorId getExecutorId(uint64_t componentId) const override; void sync() override; - Stats getStats() override; + ExecutorStats getStats() override; void wakeup() override; /* diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 7fba68e2092..803ec4f3f7c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -158,13 +158,13 @@ SingleExecutor::wait_for_room(Lock & lock) { } } -ThreadExecutor::Stats +ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); uint64_t accepted = _wp.load(std::memory_order_relaxed); - Stats stats(_queueSize, (accepted - _lastAccepted), 0); + ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0); _lastAccepted = accepted; - _queueSize = Stats::QueueSizeT() ; + _queueSize = ExecutorStats::QueueSizeT() ; return stats; } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 20e5a560e52..8e9c1ae3fa1 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -29,7 +29,7 @@ public: uint32_t getTaskLimit() const override { return _taskLimit.load(std::memory_order_relaxed); } uint32_t get_watermark() const { return _watermark; } duration get_reaction_time() const { return _reactionTime; } - Stats getStats() override; + ExecutorStats getStats() override; SingleExecutor & shutdown() override; private: using Lock = std::unique_lock<std::mutex>; @@ -55,7 +55,7 @@ private: std::condition_variable _producerCondition; vespalib::Thread _thread; uint64_t _lastAccepted; - Stats::QueueSizeT _queueSize; + ExecutorStats::QueueSizeT _queueSize; std::atomic<uint64_t> _wakeupConsumerAt; std::atomic<uint64_t> _producerNeedWakeupAt; std::atomic<uint64_t> _wp; diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index e3b63c1bdf9..60030004594 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -1275,8 +1275,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(1, cmd2.getAddress()->getIndex()); EXPECT_EQ(1234, cmd2.getSourceIndex()); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList); baseline_diff_size = cmd2.getDiff().size(); auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2); auto &diff = reply->getDiff(); @@ -1292,12 +1292,12 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking first ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // Node 4 has been eliminated before the first ApplyBucketDiff command - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 2u, s.diff.size()); - EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]); - EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 2u, s->diff.size()); + EXPECT_EQ(EntryCheck(20000, 24u), s->diff[baseline_diff_size]); + EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size + 1]); auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); // ApplyBucketDiffCommand has a shorter node list, node 2 is not present EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes()); @@ -1320,10 +1320,10 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking second ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size]); auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes()); auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4); @@ -1340,11 +1340,11 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking third ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command - EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]); + EXPECT_EQ((NodeList{{0, false}, {1, false}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 16u), s->diff[baseline_diff_size]); auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]); EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes()); auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5); @@ -1362,11 +1362,11 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking fourth ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // All nodes in use again due to failure to fill diff entry for doc2 - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); - EXPECT_EQ(1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList); + EXPECT_EQ(1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 16u), s->diff[0]); auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes()); auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6); diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 568eee1e92c..5c192942521 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1151,6 +1151,10 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); ASSERT_LT(maxPending, 100); + + EXPECT_EQ(_throttlers[0]->getMetrics().active_window_size.getLast(), 0); + EXPECT_EQ(_throttlers[0]->getMetrics().queueSize.getLast(), 0); + for (size_t i = 0; i < maxPending + maxQueue; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); // No chain set, i.e. merge command is freshly squeezed from a distributor. @@ -1162,6 +1166,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist // Wait till we have maxPending replies and maxQueue queued _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); waitUntilMergeQueueIs(*_throttlers[0], maxQueue, _messageWaitTime); + EXPECT_EQ(_throttlers[0]->getMetrics().active_window_size.getLast(), maxPending); EXPECT_EQ(maxQueue, _throttlers[0]->getMetrics().queueSize.getMaximum()); // Clear all forwarded merges diff --git a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp index d3096e6864e..618e49c4238 100644 --- a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp @@ -39,13 +39,13 @@ MergeBucketMetricSet::MergeBucketMetricSet(const std::string& name, metrics::Met : OperationMetricSet(name, std::move(tags), description, owner), source_only_copy_changed("source_only_copy_changed", {{"logdefault"},{"yamasdefault"}}, - "The number of merge operations where source-only copy changed"), + "The number of merge operations where source-only copy changed", this), source_only_copy_delete_blocked("source_only_copy_delete_blocked", {{"logdefault"},{"yamasdefault"}}, - "The number of merge operations where delete of unchanged source-only copies was blocked"), + "The number of merge operations where delete of unchanged source-only copies was blocked", this), source_only_copy_delete_failed("source_only_copy_delete_failed", {{"logdefault"},{"yamasdefault"}}, - "The number of merge operations where delete of unchanged source-only copies failed") + "The number of merge operations where delete of unchanged source-only copies failed", this) { } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 0dcb8539bff..70ed9845cb0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -200,13 +200,13 @@ public: virtual void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) = 0; /** - * Returns the reference to the current merge status for the given bucket. + * Returns a shared pointer to the current merge status for the given bucket. * This allows unlocked access to an internal variable, so users should * first check that noone else is using it by calling isMerging() first. * * @param bucket The bucket to start merging. */ - virtual MergeStatus& editMergeStatus(const document::Bucket& bucket) = 0; + virtual std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket& bucket) = 0; /** * Returns true if the bucket is currently being merged on this node. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 62f9fa12a21..e395a7df9e0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -86,7 +86,7 @@ FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, std::shared_ _mergeStates[bucket] = status; } -MergeStatus& +std::shared_ptr<MergeStatus> FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) { std::lock_guard mlock(_mergeStatesLock); @@ -94,7 +94,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) if ( ! status ) { throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC); } - return *status; + return status; } bool diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index a3dc316cdde..5f212b18a7f 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -218,7 +218,7 @@ public: } void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) override; - MergeStatus& editMergeStatus(const document::Bucket&) override; + std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket&) override; bool isMerging(const document::Bucket&) const override; void clearMergeStatus(const document::Bucket& bucket) override; void clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) override; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index bc7d1d7d282..963fddd9fb5 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -1138,49 +1138,49 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); - if (s.pendingId != reply.getMsgId()) { + auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); + if (s->pendingId != reply.getMsgId()) { LOG(warning, "Got GetBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", - bucket.toString().c_str(), reply.getMsgId(), s.pendingId); + bucket.toString().c_str(), reply.getMsgId(), s->pendingId); return; } api::StorageReply::SP replyToSend; bool clearState = true; try { - if (s.isFirstNode()) { + if (s->isFirstNode()) { if (reply.getResult().failed()) { // We failed, so we should reply to the pending message. - replyToSend = s.reply; + replyToSend = s->reply; } else { // If we didn't fail, reply should have good content // Sanity check for nodes assert(reply.getNodes().size() >= 2); // Get bucket diff should retrieve all info at once - assert(s.diff.size() == 0); - s.diff.insert(s.diff.end(), + assert(s->diff.size() == 0); + s->diff.insert(s->diff.end(), reply.getDiff().begin(), reply.getDiff().end()); - replyToSend = processBucketMerge(bucket, s, sender, s.context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context); if (!replyToSend.get()) { // We have sent something on, and shouldn't reply now. clearState = false; } else { _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue( - s.startTime.getElapsedTimeAsDouble()); + s->startTime.getElapsedTimeAsDouble()); } } } else { // Exists in send on list, send on! - replyToSend = s.pendingGetDiff; + replyToSend = s->pendingGetDiff; LOG(spam, "Received GetBucketDiffReply for %s with diff of " "size %zu. Sending it on.", bucket.toString().c_str(), reply.getDiff().size()); - s.pendingGetDiff->getDiff().swap(reply.getDiff()); + s->pendingGetDiff->getDiff().swap(reply.getDiff()); } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( @@ -1299,11 +1299,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); - if (s.pendingId != reply.getMsgId()) { + auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); + if (s->pendingId != reply.getMsgId()) { LOG(warning, "Got ApplyBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", - bucket.toString().c_str(), reply.getMsgId(), s.pendingId); + bucket.toString().c_str(), reply.getMsgId(), s->pendingId); return; } bool clearState = true; @@ -1318,12 +1318,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag uint8_t index = findOwnIndex(reply.getNodes(), _env._nodeIndex); if (applyDiffNeedLocalData(diff, index, false)) { framework::MilliSecTimer startTime(_clock); - fetchLocalData(bucket, diff, index, s.context); + fetchLocalData(bucket, diff, index, s->context); _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - applyDiffLocally(bucket, diff, index, s.context, async_results); + applyDiffLocally(bucket, diff, index, s->context, async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); async_results.check(); async_results.sync_bucket_info(); @@ -1335,50 +1335,50 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } } - if (s.isFirstNode()) { + if (s->isFirstNode()) { uint16_t hasMask = 0; for (uint16_t i=0; i<reply.getNodes().size(); ++i) { hasMask |= (1 << i); } - const size_t diffSizeBefore = s.diff.size(); - const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes()); + const size_t diffSizeBefore = s->diff.size(); + const bool altered = s->removeFromDiff(diff, hasMask, reply.getNodes()); if (reply.getResult().success() - && s.diff.size() == diffSizeBefore + && s->diff.size() == diffSizeBefore && !altered) { std::string msg( vespalib::make_string( "Completed merge cycle without fixing " "any entries (merge state diff at %zu entries)", - s.diff.size())); + s->diff.size())); returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, msg); LOG(warning, "Got reply indicating merge cycle did not fix any entries: %s", reply.toString(true).c_str()); LOG(warning, "Merge state for which there was no progress across a full merge cycle: %s", - s.toString().c_str()); + s->toString().c_str()); } if (returnCode.failed()) { // Should reply now, since we failed. - replyToSend = s.reply; + replyToSend = s->reply; } else { - replyToSend = processBucketMerge(bucket, s, sender, s.context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context); if (!replyToSend.get()) { // We have sent something on and shouldn't reply now. clearState = false; } else { - _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s.startTime.getElapsedTimeAsDouble()); + _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble()); } } } else { - replyToSend = s.pendingApplyDiff; + replyToSend = s->pendingApplyDiff; LOG(debug, "ApplyBucketDiff(%s) finished. Sending reply.", bucket.toString().c_str()); - s.pendingApplyDiff->getDiff().swap(reply.getDiff()); + s->pendingApplyDiff->getDiff().swap(reply.getDiff()); } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index fa9ab22c1cb..a17c77f6ca4 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -72,6 +72,7 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner) : metrics::MetricSet("mergethrottler", {}, "", owner), averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this), queueSize("queuesize", {}, "Length of merge queue", this), + active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this), bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this), chaining("mergechains", this), local("locallyexecutedmerges", this) @@ -366,6 +367,7 @@ MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter) LOG(debug, "Removed merge for %s from internal state", mergeIter->first.toString().c_str()); _merges.erase(mergeIter); + update_active_merge_window_size_metric(); } api::StorageMessage::SP @@ -815,6 +817,7 @@ MergeThrottler::processNewMergeCommand( // merge throttling window. assert(_merges.find(mergeCmd.getBucket()) == _merges.end()); auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first; + update_active_merge_window_size_metric(); LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str()); @@ -1247,6 +1250,11 @@ MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) } void +MergeThrottler::update_active_merge_window_size_metric() noexcept { + _metrics->active_window_size.set(static_cast<int64_t>(_merges.size())); +} + +void MergeThrottler::print(std::ostream& out, bool /*verbose*/, const std::string& /*indent*/) const { diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 9b0fb125b2f..997477a4b70 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -64,6 +64,7 @@ public: public: metrics::DoubleAverageMetric averageQueueWaitingTime; metrics::LongValueMetric queueSize; + metrics::LongValueMetric active_window_size; metrics::LongCountMetric bounced_due_to_back_pressure; MergeOperationMetrics chaining; MergeOperationMetrics local; @@ -388,6 +389,8 @@ private: void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion); void markActiveMergesAsAborted(uint32_t minimumStateVersion); + void update_active_merge_window_size_metric() noexcept; + // const function, but metrics are mutable void updateOperationMetrics( const api::ReturnCode& result, diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index 63c6856afd2..b55f54f9339 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -33,17 +33,18 @@ std::atomic<uint32_t> MyTask::runCnt(0); std::atomic<uint32_t> MyTask::deleteCnt(0); struct MyState { + static constexpr uint32_t NUM_THREADS = 10; Gate gate; // to block workers CountDownLatch latch; // to wait for workers ThreadStackExecutor executor; bool checked; - MyState() : gate(), latch(10), executor(10, 128000, 20), checked(false) + MyState() : gate(), latch(10), executor(NUM_THREADS, 128000, 20), checked(false) { MyTask::resetStats(); } MyState &execute(uint32_t cnt) { for (uint32_t i = 0; i < cnt; ++i) { - executor.execute(Task::UP(new MyTask(gate, latch))); + executor.execute(std::make_unique<MyTask>(gate, latch)); } return *this; } @@ -70,7 +71,7 @@ struct MyState { { ASSERT_TRUE(!checked); checked = true; - ThreadStackExecutor::Stats stats = executor.getStats(); + ExecutorStats stats = executor.getStats(); EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt); EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt); EXPECT_EQUAL(expect_queue + expect_running + expect_deleted, @@ -187,11 +188,11 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor( } TEST("require that stats can be accumulated") { - ThreadStackExecutor::Stats stats(ThreadExecutor::Stats::QueueSizeT(1) ,2,3); + ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3); EXPECT_EQUAL(1u, stats.queueSize.max()); EXPECT_EQUAL(2u, stats.acceptedTasks); EXPECT_EQUAL(3u, stats.rejectedTasks); - stats += ThreadStackExecutor::Stats(ThreadExecutor::Stats::QueueSizeT(7),8,9); + stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9); EXPECT_EQUAL(2u, stats.queueSize.count()); EXPECT_EQUAL(8u, stats.queueSize.total()); EXPECT_EQUAL(8u, stats.queueSize.max()); diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h index 49d83c96714..f1f58685570 100644 --- a/vespalib/src/vespa/vespalib/util/executor_stats.h +++ b/vespalib/src/vespa/vespalib/util/executor_stats.h @@ -3,6 +3,7 @@ #pragma once #include <limits> +#include <cstdint> namespace vespalib { @@ -15,10 +16,10 @@ public: AggregatedAverage() : AggregatedAverage(0ul, T(0), std::numeric_limits<T>::max(), std::numeric_limits<T>::min()) { } explicit AggregatedAverage(T value) : AggregatedAverage(1, value, value, value) { } AggregatedAverage(size_t count_in, T total_in, T min_in, T max_in) - : _count(count_in), - _total(total_in), - _min(min_in), - _max(max_in) + : _count(count_in), + _total(total_in), + _min(min_in), + _max(max_in) { } AggregatedAverage & operator += (const AggregatedAverage & rhs) { add(rhs); diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h index 9ab7aacfc4b..36c72fa4bb0 100644 --- a/vespalib/src/vespa/vespalib/util/threadexecutor.h +++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h @@ -12,11 +12,6 @@ class ThreadExecutor : public Executor { public: /** - * Internal stats that we want to observe externally. Note that - * all stats are reset each time they are observed. - **/ - using Stats = ExecutorStats; - /** * Get number of threads in the executor pool. * @return number of threads in the pool */ @@ -26,7 +21,7 @@ public: * Observe and reset stats for this object. * @return stats **/ - virtual Stats getStats() = 0; + virtual ExecutorStats getStats() = 0; /** * Sets a new upper limit for accepted number of tasks. diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 32e47f366cc..f80a5b4ce32 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -203,12 +203,12 @@ ThreadStackExecutorBase::num_idle_workers() const return _workers.size(); } -ThreadStackExecutorBase::Stats +ExecutorStats ThreadStackExecutorBase::getStats() { std::unique_lock guard(_lock); - Stats stats = _stats; - _stats = Stats(); + ExecutorStats stats = _stats; + _stats = ExecutorStats(); _stats.queueSize.add(_taskCount); return stats; } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 59ed385b4f4..66a34bfde95 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -80,7 +80,7 @@ private: std::unique_ptr<FastOS_ThreadPool> _pool; mutable std::mutex _lock; std::condition_variable _cond; - Stats _stats; + ExecutorStats _stats; Gate _executorCompletion; ArrayQueue<TaggedTask> _tasks; ArrayQueue<Worker*> _workers; @@ -188,7 +188,7 @@ public: **/ size_t num_idle_workers() const; - Stats getStats() override; + ExecutorStats getStats() override; Task::UP execute(Task::UP task) override; |