diff options
101 files changed, 1138 insertions, 611 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java index 12d5e8d32ed..a28475c94f3 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java @@ -118,7 +118,9 @@ public class DomAdminV4Builder extends DomAdminBuilderBase { return nodesSpecification.provision(hostSystem, ClusterSpec.Type.admin, ClusterSpec.Id.from(clusterId), - context.getDeployLogger()).keySet(); + context.getDeployLogger(), + false) + .keySet(); } /** diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java index 526e88749fc..aace6818ba6 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java @@ -202,15 +202,17 @@ public class NodesSpecification { public Map<HostResource, ClusterMembership> provision(HostSystem hostSystem, ClusterSpec.Type clusterType, ClusterSpec.Id clusterId, - DeployLogger logger) { + DeployLogger logger, + boolean stateful) { if (combinedId.isPresent()) clusterType = ClusterSpec.Type.combined; ClusterSpec cluster = ClusterSpec.request(clusterType, clusterId) - .vespaVersion(version) - .exclusive(exclusive) - .combinedId(combinedId.map(ClusterSpec.Id::from)) - .dockerImageRepository(dockerImageRepo) - .build(); + .vespaVersion(version) + .exclusive(exclusive) + .combinedId(combinedId.map(ClusterSpec.Id::from)) + .dockerImageRepository(dockerImageRepo) + .stateful(stateful) + .build(); return hostSystem.allocateHosts(cluster, Capacity.from(min, max, required, canFail), logger); } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java index 63426979649..193d12c428c 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java @@ -207,8 +207,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { } private void addZooKeeper(ApplicationContainerCluster cluster, Element spec) { - Element zkElement = XML.getChild(spec, "zookeeper"); - if (zkElement == null) return; + if (!hasZooKeeper(spec)) return; Element nodesElement = XML.getChild(spec, "nodes"); boolean isCombined = nodesElement != null && nodesElement.hasAttribute("of"); if (isCombined) { @@ -589,7 +588,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { Element nodesElement, ConfigModelContext context) { applyNodesTagJvmArgs(nodes, getJvmOptions(cluster, nodesElement, context.getDeployLogger())); - if (!cluster.getJvmGCOptions().isPresent()) { + if (cluster.getJvmGCOptions().isEmpty()) { String jvmGCOptions = extractAttribute(nodesElement, VespaDomBuilder.JVM_GC_OPTIONS); cluster.setJvmGCOptions(buildJvmGCOptions(context.getDeployState(), jvmGCOptions)); } @@ -617,7 +616,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { if (nodesElement == null) { cluster.addContainers(allocateWithoutNodesTag(cluster, context)); } else { - List<ApplicationContainer> nodes = createNodes(cluster, nodesElement, context); + List<ApplicationContainer> nodes = createNodes(cluster, containerElement, nodesElement, context); Element jvmElement = XML.getChild(nodesElement, "jvm"); if (jvmElement == null) { @@ -628,7 +627,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { applyRoutingAliasProperties(nodes, cluster); applyDefaultPreload(nodes, nodesElement); String environmentVars = getEnvironmentVariables(XML.getChild(nodesElement, ENVIRONMENT_VARIABLES_ELEMENT)); - if (environmentVars != null && !environmentVars.isEmpty()) { + if (!environmentVars.isEmpty()) { cluster.setEnvironmentVars(environmentVars); } if (useCpuSocketAffinity(nodesElement)) @@ -648,15 +647,15 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { return sb.toString(); } - private List<ApplicationContainer> createNodes(ApplicationContainerCluster cluster, Element nodesElement, ConfigModelContext context) { + private List<ApplicationContainer> createNodes(ApplicationContainerCluster cluster, Element containerElement, Element nodesElement, ConfigModelContext context) { if (nodesElement.hasAttribute("type")) // internal use for hosted system infrastructure nodes return createNodesFromNodeType(cluster, nodesElement, context); else if (nodesElement.hasAttribute("of")) // hosted node spec referencing a content cluster return createNodesFromContentServiceReference(cluster, nodesElement, context); else if (nodesElement.hasAttribute("count")) // regular, hosted node spec - return createNodesFromNodeCount(cluster, nodesElement, context); + return createNodesFromNodeCount(cluster, containerElement, nodesElement, context); else if (cluster.isHostedVespa() && cluster.getZone().environment().isManuallyDeployed()) // default to 1 in manual zones - return createNodesFromNodeCount(cluster, nodesElement, context); + return createNodesFromNodeCount(cluster, containerElement, nodesElement, context); else // the non-hosted option return createNodesFromNodeList(context.getDeployState(), cluster, nodesElement); } @@ -720,12 +719,13 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { return List.of(node); } - private List<ApplicationContainer> createNodesFromNodeCount(ApplicationContainerCluster cluster, Element nodesElement, ConfigModelContext context) { + private List<ApplicationContainer> createNodesFromNodeCount(ApplicationContainerCluster cluster, Element containerElement, Element nodesElement, ConfigModelContext context) { NodesSpecification nodesSpecification = NodesSpecification.from(new ModelElement(nodesElement), context); Map<HostResource, ClusterMembership> hosts = nodesSpecification.provision(cluster.getRoot().hostSystem(), ClusterSpec.Type.container, ClusterSpec.Id.from(cluster.getName()), - log); + log, + hasZooKeeper(containerElement)); return createNodesFromHosts(context.getDeployLogger(), hosts, cluster); } @@ -862,7 +862,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { private void addIncludes(Element parentElement) { List<Element> includes = XML.getChildren(parentElement, IncludeDirs.INCLUDE); - if (includes == null || includes.isEmpty()) { + if (includes.isEmpty()) { return; } if (app == null) { @@ -940,6 +940,10 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { )); } + private static boolean hasZooKeeper(Element spec) { + return XML.getChild(spec, "zookeeper") != null; + } + /** Disallow renderers named "XmlRenderer" or "JsonRenderer" */ private static void validateRendererElement(Element element) { String idAttr = element.getAttribute("id"); diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java b/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java index 894022d936d..3c0f8996c22 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java @@ -1,12 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.model.content; +import com.yahoo.config.application.api.DeployLogger; import com.yahoo.config.model.ConfigModelContext; import com.yahoo.config.model.deploy.DeployState; import com.yahoo.config.provision.ClusterMembership; import com.yahoo.config.provision.ClusterSpec; -import com.yahoo.config.application.api.DeployLogger; -import java.util.logging.Level; import com.yahoo.vespa.config.content.StorDistributionConfig; import com.yahoo.vespa.model.HostResource; import com.yahoo.vespa.model.HostSystem; @@ -25,6 +24,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.logging.Level; /** * A group of storage nodes/distributors. @@ -189,7 +189,7 @@ public class StorageGroup { HostSystem hostSystem, DeployLogger logger) { ClusterSpec.Id clusterId = ClusterSpec.Id.from(clusterIdString); - return nodesSpecification.provision(hostSystem, ClusterSpec.Type.content, clusterId, logger); + return nodesSpecification.provision(hostSystem, ClusterSpec.Type.content, clusterId, logger, true); } public static class Builder { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java index 8ae895b0695..93367f1a286 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java @@ -354,7 +354,7 @@ public class ContentCluster extends AbstractConfigProducer implements } private Collection<HostResource> getControllerHosts(NodesSpecification nodesSpecification, Admin admin, String clusterName, ConfigModelContext context) { - return nodesSpecification.provision(admin.hostSystem(), ClusterSpec.Type.admin, ClusterSpec.Id.from(clusterName), context.getDeployLogger()).keySet(); + return nodesSpecification.provision(admin.hostSystem(), ClusterSpec.Type.admin, ClusterSpec.Id.from(clusterName), context.getDeployLogger(), false).keySet(); } private List<HostResource> drawControllerHosts(int count, StorageGroup rootGroup, Collection<ContainerModel> containers) { diff --git a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java index 4ee05b56bca..f8695a76b11 100644 --- a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java +++ b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java @@ -1815,6 +1815,44 @@ public class ModelProvisioningTest { assertEquals(1, controller.getContainers().size()); } + @Test + public void testStatefulProperty() { + String servicesXml = + "<?xml version='1.0' encoding='utf-8' ?>" + + "<services>" + + " <container version='1.0' id='qrs'>" + + " <nodes count='1'/>" + + " </container>" + + " <container version='1.0' id='zk'>" + + " <zookeeper/>" + + " <nodes count='3'/>" + + " </container>" + + " <content version='1.0' id='content'>" + + " <redundancy>2</redundancy>" + + " <documents>" + + " <document type='type1' mode='index'/>" + + " </documents>" + + " <nodes count='2'/>" + + " </content>" + + "</services>"; + VespaModelTester tester = new VespaModelTester(); + tester.addHosts(6); + VespaModel model = tester.createModel(servicesXml, true); + + Map<String, Boolean> tests = Map.of("qrs", false, + "zk", true, + "content", true); + Map<String, List<HostResource>> hostsByCluster = model.hostSystem().getHosts().stream() + .collect(Collectors.groupingBy(h -> h.spec().membership().get().cluster().id().value())); + tests.forEach((clusterId, stateful) -> { + List<HostResource> hosts = hostsByCluster.getOrDefault(clusterId, List.of()); + assertFalse("Hosts are provisioned for '" + clusterId + "'", hosts.isEmpty()); + assertEquals("Hosts in cluster '" + clusterId + "' are " + (stateful ? "" : "not ") + "stateful", + stateful, + hosts.stream().allMatch(h -> h.spec().membership().get().cluster().isStateful())); + }); + } + private VespaModel createNonProvisionedMultitenantModel(String services) { return createNonProvisionedModel(true, null, services); } diff --git a/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java b/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java index 9fb1689782f..031fce88354 100644 --- a/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java +++ b/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java @@ -72,8 +72,7 @@ public class ClusterMembership { "/" + index + ( cluster.isExclusive() ? "/exclusive" : "") + ( retired ? "/retired" : "") + - // TODO(mpolden): Write stateful tag once all nodes can read it - // ( cluster.isStateful() ? "/stateful" : "") + + ( cluster.isStateful() ? "/stateful" : "") + ( cluster.combinedId().isPresent() ? "/" + cluster.combinedId().get().value() : ""); } diff --git a/config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java b/config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java index 1f170cca9a0..fc1cdd74169 100644 --- a/config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java +++ b/config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java @@ -48,10 +48,10 @@ public class ClusterMembershipTest { assertEquals(dockerImageRepo.get(), instance.cluster().dockerImageRepo().get()); } { - ClusterMembership instance = ClusterMembership.from("container/id1/4/37", Vtag.currentVersion, Optional.empty()); + ClusterMembership instance = ClusterMembership.from("container/id1/4/37/stateful", Vtag.currentVersion, Optional.empty()); ClusterMembership serialized = ClusterMembership.from(instance.stringValue(), Vtag.currentVersion, Optional.empty()); assertEquals(instance, serialized); - assertFalse("Skips serialization of stateful property", instance.cluster().isStateful()); + assertTrue(instance.cluster().isStateful()); } } @@ -117,7 +117,7 @@ public class ClusterMembershipTest { assertFalse(instance.cluster().group().isPresent()); assertEquals(37, instance.index()); assertFalse(instance.retired()); - assertEquals("content/id1/37", instance.stringValue()); + assertEquals("content/id1/37/stateful", instance.stringValue()); } private void assertContentServiceWithGroup(ClusterMembership instance) { @@ -126,7 +126,7 @@ public class ClusterMembershipTest { assertEquals(4, instance.cluster().group().get().index()); assertEquals(37, instance.index()); assertFalse(instance.retired()); - assertEquals("content/id1/4/37", instance.stringValue()); + assertEquals("content/id1/4/37/stateful", instance.stringValue()); } /** Serializing a spec without a group assigned works, but not deserialization */ @@ -135,7 +135,7 @@ public class ClusterMembershipTest { assertEquals("id1", instance.cluster().id().value()); assertEquals(37, instance.index()); assertTrue(instance.retired()); - assertEquals("content/id1/37/retired", instance.stringValue()); + assertEquals("content/id1/37/retired/stateful", instance.stringValue()); } private void assertContentServiceWithGroupAndRetire(ClusterMembership instance) { @@ -144,7 +144,7 @@ public class ClusterMembershipTest { assertEquals(4, instance.cluster().group().get().index()); assertEquals(37, instance.index()); assertTrue(instance.retired()); - assertEquals("content/id1/4/37/retired", instance.stringValue()); + assertEquals("content/id1/4/37/retired/stateful", instance.stringValue()); } } diff --git a/config/src/apps/vespa-get-config/getconfig.cpp b/config/src/apps/vespa-get-config/getconfig.cpp index dc12d2bbf0e..a5e400bd354 100644 --- a/config/src/apps/vespa-get-config/getconfig.cpp +++ b/config/src/apps/vespa-get-config/getconfig.cpp @@ -1,13 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> #include <vespa/config/config.h> #include <vespa/config/frt/frtconfigrequestfactory.h> #include <vespa/config/frt/frtconnection.h> #include <vespa/config/common/payload_converter.h> #include <vespa/fastos/app.h> - #include <string> #include <sstream> #include <fstream> @@ -28,7 +28,7 @@ private: public: GetConfig() : _server(), _target(nullptr) {} - virtual ~GetConfig(); + ~GetConfig() override; int usage(); void initRPC(const char *spec); void finiRPC(); diff --git a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp index 208f9312ada..a47fd25f9af 100644 --- a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp +++ b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fastos/app.h> #include <sstream> @@ -15,12 +17,11 @@ private: std::unique_ptr<fnet::frt::StandaloneFRT> _server; FRT_Target *_target; - PingProxy(const PingProxy &); - PingProxy &operator=(const PingProxy &); - public: + PingProxy(const PingProxy &) = delete; + PingProxy &operator=(const PingProxy &) = delete; PingProxy() : _server(), _target(nullptr) {} - virtual ~PingProxy(); + ~PingProxy() override ; int usage(); void initRPC(const char *spec); void finiRPC(); diff --git a/config/src/tests/failover/failover.cpp b/config/src/tests/failover/failover.cpp index 99b6967c929..2e039081716 100644 --- a/config/src/tests/failover/failover.cpp +++ b/config/src/tests/failover/failover.cpp @@ -5,7 +5,9 @@ #include <vespa/config/frt/protocol.h> #include <vespa/config/config.h> #include <vespa/config/common/configcontext.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> + #include "config-my.h" #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/data/simple_buffer.h> diff --git a/config/src/tests/file_acquirer/file_acquirer_test.cpp b/config/src/tests/file_acquirer/file_acquirer_test.cpp index 33bb8f47e09..da4bd71b82b 100644 --- a/config/src/tests/file_acquirer/file_acquirer_test.cpp +++ b/config/src/tests/file_acquirer/file_acquirer_test.cpp @@ -1,7 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/config/file_acquirer/file_acquirer.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/vespalib/util/stringfmt.h> using namespace config; diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp index 85b9789821d..0d70605fa62 100644 --- a/config/src/tests/frt/frt.cpp +++ b/config/src/tests/frt/frt.cpp @@ -11,9 +11,8 @@ #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/data/slime/json_format.h> #include <vespa/vespalib/data/simple_buffer.h> -#include <vespa/fnet/fnet.h> -#include <vespa/fnet/frt/frt.h> #include <vespa/fnet/frt/error.h> +#include <vespa/fnet/frt/supervisor.h> #include <vespa/config/frt/protocol.h> #include <lz4.h> #include "config-my.h" diff --git a/config/src/vespa/config/frt/frtconfigresponsev3.cpp b/config/src/vespa/config/frt/frtconfigresponsev3.cpp index 9635b4c811c..80cdf88a79a 100644 --- a/config/src/vespa/config/frt/frtconfigresponsev3.cpp +++ b/config/src/vespa/config/frt/frtconfigresponsev3.cpp @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "frtconfigresponsev3.h" #include "compressioninfo.h" -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/values.h> #include <vespa/vespalib/data/simple_buffer.h> #include <vespa/log/log.h> diff --git a/config/src/vespa/config/frt/slimeconfigrequest.cpp b/config/src/vespa/config/frt/slimeconfigrequest.cpp index 696789f74c1..8a6706974f6 100644 --- a/config/src/vespa/config/frt/slimeconfigrequest.cpp +++ b/config/src/vespa/config/frt/slimeconfigrequest.cpp @@ -1,12 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "slimeconfigrequest.h" #include "connection.h" -#include <vespa/fnet/frt/frt.h> #include <vespa/config/common/configkey.h> #include <vespa/config/common/configstate.h> #include <vespa/config/common/configdefinition.h> #include <vespa/config/common/trace.h> #include <vespa/config/common/vespa_version.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/vespalib/data/simple_buffer.h> diff --git a/config/src/vespa/config/frt/slimeconfigresponse.cpp b/config/src/vespa/config/frt/slimeconfigresponse.cpp index 181ab58b184..155515d2ed6 100644 --- a/config/src/vespa/config/frt/slimeconfigresponse.cpp +++ b/config/src/vespa/config/frt/slimeconfigresponse.cpp @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "slimeconfigresponse.h" #include <vespa/config/common/misc.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/values.h> #include <vespa/vespalib/stllike/string.h> #include <vespa/log/log.h> LOG_SETUP(".config.frt.slimeconfigresponse"); diff --git a/configd/src/apps/sentinel/cmdq.cpp b/configd/src/apps/sentinel/cmdq.cpp index 8fa3726c7f6..489ae97228f 100644 --- a/configd/src/apps/sentinel/cmdq.cpp +++ b/configd/src/apps/sentinel/cmdq.cpp @@ -1,7 +1,7 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "cmdq.h" -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/rpcrequest.h> namespace config::sentinel { diff --git a/configd/src/apps/sentinel/rpchooks.cpp b/configd/src/apps/sentinel/rpchooks.cpp index 99bcd404402..aef58b8a1dc 100644 --- a/configd/src/apps/sentinel/rpchooks.cpp +++ b/configd/src/apps/sentinel/rpchooks.cpp @@ -2,7 +2,8 @@ #include "rpchooks.h" #include "cmdq.h" -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/log/log.h> LOG_SETUP(".rpchooks"); diff --git a/configutil/src/lib/configstatus.cpp b/configutil/src/lib/configstatus.cpp index 254fa94a8ec..e56da67fade 100644 --- a/configutil/src/lib/configstatus.cpp +++ b/configutil/src/lib/configstatus.cpp @@ -2,7 +2,6 @@ #include "configstatus.h" #include "tags.h" -#include <vespa/fnet/frt/frt.h> #include <vespa/vespalib/data/slime/slime.h> #include <vbench/http/http_result_handler.h> #include <vbench/http/server_spec.h> diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp index 25112a00b99..312cd2d89cb 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp @@ -2,10 +2,11 @@ #include "externpolicy.h" #include <boost/tokenizer.hpp> #include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/messagebus/emptyreply.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/slobrok/sbmirror.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fastos/thread.h> #include <vespa/log/log.h> LOG_SETUP(".externpolicy"); @@ -52,13 +53,13 @@ ExternPolicy::ExternPolicy(const string ¶m) : spec.push_back(*it); } - if (spec.size() == 0) { + if (spec.empty()) { _error = vespalib::make_string("Extern policy needs at least one slobrok: Slobrok list '%s' resolved to no slobroks", lst.c_str()); return; } slobrok::ConfiguratorFactory config(spec); - _mirror.reset(new MirrorAPI(*_orb, config)); + _mirror = std::make_unique<MirrorAPI>(*_orb, config); _started = _transport->Start(_threadPool.get()); if (!_started) { _error = "Failed to start FNET supervisor."; diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp index 800aa8c4520..9eb28432234 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp @@ -4,10 +4,10 @@ #include <vespa/messagebus/routing/routingcontext.h> #include <vespa/config/common/configcontext.h> #include <vespa/vespalib/text/stringtokenizer.h> -#include <vespa/vespalib/util/time.h> -#include <vespa/fnet/frt/frt.h> #include <vespa/slobrok/sbmirror.h> +#include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> #include <thread> using slobrok::api::IMirrorAPI; diff --git a/eval/src/vespa/eval/streamed/streamed_value_index.cpp b/eval/src/vespa/eval/streamed/streamed_value_index.cpp index 38b57e9c660..17cf7316554 100644 --- a/eval/src/vespa/eval/streamed/streamed_value_index.cpp +++ b/eval/src/vespa/eval/streamed/streamed_value_index.cpp @@ -39,7 +39,7 @@ struct StreamedFilterView : Value::Index::View bool next_result(ConstArrayRef<vespalib::stringref*> addr_out, size_t &idx_out) override { while (const auto block = label_blocks.next_block()) { - idx_out = block.ss_idx; + idx_out = block.subspace_index; bool matches = true; size_t out_idx = 0; size_t vdm_idx = 0; @@ -73,7 +73,7 @@ struct StreamedIterationView : Value::Index::View bool next_result(ConstArrayRef<vespalib::stringref*> addr_out, size_t &idx_out) override { if (auto block = label_blocks.next_block()) { - idx_out = block.ss_idx; + idx_out = block.subspace_index; size_t i = 0; assert(addr_out.size() == block.address.size()); for (auto ptr : addr_out) { diff --git a/eval/src/vespa/eval/streamed/streamed_value_utils.h b/eval/src/vespa/eval/streamed/streamed_value_utils.h index 3e3da82dd22..b88d4df8581 100644 --- a/eval/src/vespa/eval/streamed/streamed_value_utils.h +++ b/eval/src/vespa/eval/streamed/streamed_value_utils.h @@ -29,9 +29,9 @@ struct LabelStream { **/ struct LabelBlock { static constexpr size_t npos = -1; - size_t ss_idx; + size_t subspace_index; ConstArrayRef<vespalib::stringref> address; - operator bool() const { return ss_idx != npos; } + operator bool() const { return subspace_index != npos; } }; /** diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp index 06f4ef0ee5b..bb2ef66c6fa 100644 --- a/fnet/src/examples/frt/rpc/echo_client.cpp +++ b/fnet/src/examples/frt/rpc/echo_client.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fastos/app.h> #include <vespa/log/log.h> diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp index 726a500cc55..c63352d8f24 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fastos/app.h> #include <vespa/log/log.h> diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp index 872894b190d..9832a59abad 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp @@ -1,6 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/signalshutdown.h> +#include <vespa/fnet/transport.h> + #include <vespa/fastos/app.h> #include <thread> diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp index fc1d54d3440..1c634f4b704 100644 --- a/fnet/src/examples/frt/rpc/rpc_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_client.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fastos/app.h> #include <vespa/log/log.h> diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp index d90d22d1986..0f8b8422241 100644 --- a/fnet/src/examples/frt/rpc/rpc_info.cpp +++ b/fnet/src/examples/frt/rpc/rpc_info.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fastos/app.h> #include <vespa/log/log.h> diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp index fb82622a537..d1f35429352 100644 --- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp +++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fastos/app.h> #include <vespa/vespalib/locale/c.h> diff --git a/fnet/src/examples/frt/rpc/rpc_proxy.cpp b/fnet/src/examples/frt/rpc/rpc_proxy.cpp index a61e2d37197..93076344ce2 100644 --- a/fnet/src/examples/frt/rpc/rpc_proxy.cpp +++ b/fnet/src/examples/frt/rpc/rpc_proxy.cpp @@ -1,6 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/invoker.h> +#include <vespa/fnet/channel.h> +#include <vespa/fnet/transport_thread.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/signalshutdown.h> + #include <vespa/fastos/app.h> #include <chrono> @@ -15,12 +23,10 @@ struct Session uint32_t id; uint32_t finiCnt; - Session(uint32_t xid) : client(nullptr), server(nullptr), id(xid), finiCnt(0) {} + explicit Session(uint32_t xid) : client(nullptr), server(nullptr), id(xid), finiCnt(0) {} ~Session() { assert(client == nullptr && server == nullptr && finiCnt == 2); } - -private: - Session(const Session &); - Session &operator=(const Session &); + Session(const Session &) = delete; + Session &operator=(const Session &) = delete; }; //----------------------------------------------------------------------------- @@ -34,10 +40,9 @@ private: uint32_t _currID; char _prefixStr[256]; - RPCProxy(const RPCProxy &); - RPCProxy &operator=(const RPCProxy &); - public: + RPCProxy(const RPCProxy &) = delete; + RPCProxy &operator=(const RPCProxy &) = delete; RPCProxy(FRT_Supervisor &supervisor, const char *spec, bool verbose) @@ -69,7 +74,7 @@ private: RPCProxy &_proxy; public: - ReqDone(RPCProxy &proxy) : _proxy(proxy) {} + explicit ReqDone(RPCProxy &proxy) : _proxy(proxy) {} void RequestDone(FRT_RPCRequest *req) override; }; diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp index aa521080538..4333f182cc0 100644 --- a/fnet/src/examples/frt/rpc/rpc_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_server.cpp @@ -1,6 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/signalshutdown.h> +#include <vespa/fnet/transport.h> #include <vespa/fastos/app.h> #include <vespa/log/log.h> diff --git a/fnet/src/examples/ping/packets.cpp b/fnet/src/examples/ping/packets.cpp index e5e9f645c9a..6aa54838c8b 100644 --- a/fnet/src/examples/ping/packets.cpp +++ b/fnet/src/examples/ping/packets.cpp @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/fnet.h> #include "packets.h" +#include <vespa/fnet/databuffer.h> uint32_t PingRequest::GetPCODE() diff --git a/fnet/src/examples/ping/pingclient.cpp b/fnet/src/examples/ping/pingclient.cpp index 1d65ef3b69c..6a7bd21e715 100644 --- a/fnet/src/examples/ping/pingclient.cpp +++ b/fnet/src/examples/ping/pingclient.cpp @@ -1,8 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/simplepacketstreamer.h> +#include <vespa/fnet/channel.h> +#include <vespa/fnet/connection.h> #include <examples/ping/packets.h> #include <vespa/fastos/app.h> +#include <vespa/fastos/thread.h> #include <vespa/log/log.h> LOG_SETUP("pingclient"); diff --git a/fnet/src/examples/ping/pingserver.cpp b/fnet/src/examples/ping/pingserver.cpp index c4543134912..cb0ab02aa0d 100644 --- a/fnet/src/examples/ping/pingserver.cpp +++ b/fnet/src/examples/ping/pingserver.cpp @@ -1,6 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/signalshutdown.h> +#include <vespa/fnet/simplepacketstreamer.h> +#include <vespa/fnet/channel.h> +#include <vespa/fnet/iserveradapter.h> +#include <vespa/fnet/connector.h> #include <examples/ping/packets.h> #include <vespa/fastos/app.h> diff --git a/fnet/src/examples/proxy/proxy.cpp b/fnet/src/examples/proxy/proxy.cpp index a01a16ead9c..062a0d52627 100644 --- a/fnet/src/examples/proxy/proxy.cpp +++ b/fnet/src/examples/proxy/proxy.cpp @@ -1,6 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/transport_thread.h> +#include <vespa/fnet/connection.h> +#include <vespa/fnet/signalshutdown.h> +#include <vespa/fnet/packet.h> +#include <vespa/fnet/iserveradapter.h> +#include <vespa/fnet/ipacketstreamer.h> +#include <vespa/fnet/channel.h> +#include <vespa/fnet/connector.h> #include <vespa/fastos/app.h> #include <vespa/log/log.h> @@ -140,7 +148,7 @@ private: public: Proxy() : _transport() {} - ~Proxy() { } + ~Proxy() override { } bool GetPacketInfo(FNET_DataBuffer *src, uint32_t *plen, uint32_t *pcode, uint32_t *chid, bool *) override; FNET_Packet *Decode(FNET_DataBuffer *src, uint32_t plen, uint32_t pcode, FNET_Context) override; void Encode(FNET_Packet *packet, uint32_t chid, FNET_DataBuffer *dst) override; diff --git a/fnet/src/examples/timeout/timeout.cpp b/fnet/src/examples/timeout/timeout.cpp index 23dfbeb9070..9f363ccd864 100644 --- a/fnet/src/examples/timeout/timeout.cpp +++ b/fnet/src/examples/timeout/timeout.cpp @@ -1,7 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/signalshutdown.h> +#include <vespa/fnet/packetqueue.h> +#include <vespa/fnet/controlpacket.h> #include <vespa/fastos/app.h> +#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/time.h> #include <thread> diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 3fe7b5b7614..2b4a2bbe9f0 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -1,7 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/transport_thread.h> +#include <vespa/fnet/simplepacketstreamer.h> +#include <vespa/fnet/ipackethandler.h> +#include <vespa/fnet/connection.h> +#include <vespa/fnet/controlpacket.h> #include <vespa/vespalib/net/server_socket.h> #include <vespa/vespalib/net/crypto_engine.h> #include <vespa/vespalib/util/stringfmt.h> @@ -90,13 +95,13 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { transport.Start(&pool); } TransportFixture(AsyncResolver::HostResolver::SP host_resolver) - : streamer(nullptr), pool(128 * 1024), transport(make_resolver(std::move(host_resolver)), 1), + : streamer(nullptr), pool(128 * 1024), transport(TransportConfig().resolver(make_resolver(std::move(host_resolver)))), conn_lost(), conn_deleted() { transport.Start(&pool); } TransportFixture(CryptoEngine::SP crypto) - : streamer(nullptr), pool(128 * 1024), transport(crypto, 1), + : streamer(nullptr), pool(128 * 1024), transport(TransportConfig().crypto(std::move(crypto))), conn_lost(), conn_deleted() { transport.Start(&pool); @@ -114,7 +119,7 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { conn->SetCleanupHandler(this); return conn; } - ~TransportFixture() { + ~TransportFixture() override { transport.ShutDown(true); pool.Close(); } diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp index 11120ebc3dc..caeb4211ab2 100644 --- a/fnet/src/tests/connection_spread/connection_spread_test.cpp +++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp @@ -1,6 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/transport_thread.h> +#include <vespa/fnet/iserveradapter.h> +#include <vespa/fnet/ipacketstreamer.h> +#include <vespa/fnet/connector.h> +#include <vespa/fnet/connection.h> +#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/stringfmt.h> #include <thread> #include <chrono> diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp index 53960d73466..450731fe1aa 100644 --- a/fnet/src/tests/frt/method_pt/method_pt.cpp +++ b/fnet/src/tests/frt/method_pt/method_pt.cpp @@ -1,8 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/fnet/frt/frt.h> - +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> class Test; class SimpleHandler; diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index ed4911175a0..2577b4e6155 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -1,7 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/benchmark_timer.h> #include <vespa/vespalib/net/crypto_engine.h> #include <vespa/vespalib/net/tls/tls_crypto_engine.h> @@ -15,7 +18,7 @@ struct Rpc : FRT_Invokable { FNET_Transport transport; FRT_Supervisor orb; Rpc(CryptoEngine::SP crypto, size_t num_threads) - : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport) {} + : thread_pool(128 * 1024), transport(TransportConfig(num_threads).crypto(std::move(crypto))), orb(&transport) {} void start() { ASSERT_TRUE(transport.Start(&thread_pool)); } @@ -26,7 +29,7 @@ struct Rpc : FRT_Invokable { FRT_Target *connect(uint32_t port) { return orb.GetTarget(port); } - ~Rpc() { + ~Rpc() override { transport.ShutDown(true); thread_pool.Close(); } @@ -34,7 +37,7 @@ struct Rpc : FRT_Invokable { struct Server : Rpc { uint32_t port; - Server(CryptoEngine::SP crypto, size_t num_threads) : Rpc(crypto, num_threads), port(listen()) { + Server(CryptoEngine::SP crypto, size_t num_threads) : Rpc(std::move(crypto), num_threads), port(listen()) { init_rpc(); start(); } @@ -54,7 +57,7 @@ struct Server : Rpc { struct Client : Rpc { uint32_t port; - Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server) : Rpc(crypto, num_threads), port(server.port) { + Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server) : Rpc(std::move(crypto), num_threads), port(server.port) { start(); } FRT_Target *connect() { return Rpc::connect(port); } @@ -62,7 +65,7 @@ struct Client : Rpc { struct Result { std::vector<double> req_per_sec; - Result(size_t num_threads) : req_per_sec(num_threads, 0.0) {} + explicit Result(size_t num_threads) : req_per_sec(num_threads, 0.0) {} double throughput() const { double sum = 0.0; for (double sample: req_per_sec) { diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp index cdb2636a8c1..8a954db26e0 100644 --- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp +++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp @@ -6,7 +6,9 @@ #include <vespa/vespalib/net/tls/tls_crypto_engine.h> #include <vespa/vespalib/test/make_tls_options_for_testing.h> #include <vespa/vespalib/test/time_tracer.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <thread> #include <chrono> diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index 95dbe672909..68bd58d2f82 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -1,6 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/invoker.h> #include <vespa/vespalib/util/stringfmt.h> struct Receptor : public FRT_IRequestWait diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index 410a60fa08a..bd2f6fa9e1d 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -3,7 +3,10 @@ #include <vespa/vespalib/net/socket_spec.h> #include <vespa/vespalib/util/benchmark_timer.h> #include <vespa/vespalib/util/latch.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/invoker.h> #include <mutex> #include <condition_variable> @@ -25,7 +28,7 @@ private: vespalib::Latch<FRT_RPCRequest*> _latch; public: RequestLatch() : _latch() {} - ~RequestLatch() { ASSERT_TRUE(!has_req()); } + ~RequestLatch() override { ASSERT_TRUE(!has_req()); } bool has_req() { return _latch.has_value(); } FRT_RPCRequest *read() { return _latch.read(); } void write(FRT_RPCRequest *req) { _latch.write(req); } @@ -38,8 +41,8 @@ class MyReq { private: FRT_RPCRequest *_req; public: - MyReq(FRT_RPCRequest *req) : _req(req) {} - MyReq(const char *method_name) + explicit MyReq(FRT_RPCRequest *req) : _req(req) {} + explicit MyReq(const char *method_name) : _req(new FRT_RPCRequest()) { _req->SetMethodName(method_name); @@ -270,8 +273,6 @@ public: _testRPC(&_server.supervisor()), _echoTest(&_server.supervisor()) { - _client.supervisor().GetTransport()->SetTCPNoDelay(true); - _server.supervisor().GetTransport()->SetTCPNoDelay(true); ASSERT_TRUE(_server.supervisor().Listen("tcp/0")); _peerSpec = SocketSpec::from_host_port("localhost", _server.supervisor().GetListenPort()).spec(); _target = _client.supervisor().GetTarget(_peerSpec.c_str()); diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp index 24cbedb3ff7..b96c881ba27 100644 --- a/fnet/src/tests/frt/rpc/session.cpp +++ b/fnet/src/tests/frt/rpc/session.cpp @@ -1,7 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <mutex> //------------------------------------------------------------- diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp index 1c0503454c7..09297bbf1c3 100644 --- a/fnet/src/tests/frt/rpc/sharedblob.cpp +++ b/fnet/src/tests/frt/rpc/sharedblob.cpp @@ -1,7 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vector> constexpr size_t ALLOC_LIMIT=1024; diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index 1d26f2b0fa1..3422efe1da6 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -1,6 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/channel.h> +#include <vespa/fnet/info.h> #include <mutex> #include <condition_variable> diff --git a/fnet/src/tests/locking/castspeed.cpp b/fnet/src/tests/locking/castspeed.cpp index dc82073ea57..2f784e625be 100644 --- a/fnet/src/tests/locking/castspeed.cpp +++ b/fnet/src/tests/locking/castspeed.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> #include <chrono> class B; diff --git a/fnet/src/tests/locking/drainpackets.cpp b/fnet/src/tests/locking/drainpackets.cpp index 9db43a5eb52..2d0ab6e2808 100644 --- a/fnet/src/tests/locking/drainpackets.cpp +++ b/fnet/src/tests/locking/drainpackets.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/packetqueue.h> +#include <vespa/fnet/packet.h> #include <mutex> #include <chrono> diff --git a/fnet/src/tests/locking/lockspeed.cpp b/fnet/src/tests/locking/lockspeed.cpp index ae6b983d724..a3ab48edd22 100644 --- a/fnet/src/tests/locking/lockspeed.cpp +++ b/fnet/src/tests/locking/lockspeed.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> #include "dummy.h" #include <chrono> diff --git a/fnet/src/tests/scheduling/schedule.cpp b/fnet/src/tests/scheduling/schedule.cpp index a1b02a688bf..fc5b5687c87 100644 --- a/fnet/src/tests/scheduling/schedule.cpp +++ b/fnet/src/tests/scheduling/schedule.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/scheduler.h> +#include <vespa/fnet/task.h> using my_clock = FNET_Scheduler::clock; using time_point = my_clock::time_point; diff --git a/fnet/src/tests/scheduling/sloweventloop.cpp b/fnet/src/tests/scheduling/sloweventloop.cpp index 5ddc9b1502c..a36afbc9d92 100644 --- a/fnet/src/tests/scheduling/sloweventloop.cpp +++ b/fnet/src/tests/scheduling/sloweventloop.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/scheduler.h> +#include <vespa/fnet/task.h> class MyTask : public FNET_Task { diff --git a/fnet/src/tests/thread_selection/thread_selection_test.cpp b/fnet/src/tests/thread_selection/thread_selection_test.cpp index ecfe0c57d5c..35df07221a2 100644 --- a/fnet/src/tests/thread_selection/thread_selection_test.cpp +++ b/fnet/src/tests/thread_selection/thread_selection_test.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> +#include <vespa/fnet/transport.h> #include <vespa/vespalib/util/stringfmt.h> #include <thread> #include <chrono> diff --git a/fnet/src/tests/time/timespeed.cpp b/fnet/src/tests/time/timespeed.cpp index e6d2af5a278..eda367f1990 100644 --- a/fnet/src/tests/time/timespeed.cpp +++ b/fnet/src/tests/time/timespeed.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> -#include <vespa/fnet/fnet.h> #include <vespa/vespalib/util/benchmark_timer.h> #include <chrono> diff --git a/fnet/src/vespa/fnet/config.cpp b/fnet/src/vespa/fnet/config.cpp index a546d38f78b..aee9ac6f2c1 100644 --- a/fnet/src/vespa/fnet/config.cpp +++ b/fnet/src/vespa/fnet/config.cpp @@ -3,7 +3,8 @@ #include "config.h" FNET_Config::FNET_Config() - : _iocTimeOut(0), + : _events_before_wakeup(1), + _iocTimeOut(0), _maxInputBufferSize(0x10000), _maxOutputBufferSize(0x10000), _tcpNoDelay(true) diff --git a/fnet/src/vespa/fnet/config.h b/fnet/src/vespa/fnet/config.h index 3f34c1511b6..f512d7cde75 100644 --- a/fnet/src/vespa/fnet/config.h +++ b/fnet/src/vespa/fnet/config.h @@ -11,6 +11,7 @@ class FNET_Config { public: + uint32_t _events_before_wakeup; uint32_t _iocTimeOut; uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index c5afd627a5a..a2e6fe25edc 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -347,7 +347,7 @@ done_read: } UpdateTimeOut(); - uint32_t maxSize = GetConfig()->_maxInputBufferSize; + uint32_t maxSize = getConfig()._maxInputBufferSize; if (maxSize > 0 && _input.GetBufSize() > maxSize) { if (!_flags._gotheader || _packetLength < maxSize) { @@ -430,7 +430,7 @@ FNET_Connection::Write() } } - uint32_t maxSize = GetConfig()->_maxOutputBufferSize; + uint32_t maxSize = getConfig()._maxOutputBufferSize; if (maxSize > 0 && _output.GetBufSize() > maxSize) { _output.Shrink(maxSize); } diff --git a/fnet/src/vespa/fnet/fnet.h b/fnet/src/vespa/fnet/fnet.h deleted file mode 100644 index c7570e025ec..00000000000 --- a/fnet/src/vespa/fnet/fnet.h +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/vespalib/component/vtag.h> - -// DEPRECATED - -#define DEPRECATED __attribute__((deprecated)) - -// FORWARD DECLARATIONS - -class FNET_IPacketFactory; -class FNET_IPacketHandler; -class FNET_IPacketStreamer; -class FNET_IServerAdapter; -class FNET_IExecutable; - -class FNET_Channel; -class FNET_ChannelLookup; -class FNET_ChannelPool; -class FNET_Config; -class FNET_Connection; -class FNET_Connector; -class FNET_Context; -class FNET_ControlPacket; -class FNET_DataBuffer; -class FNET_DummyPacket; -class FNET_Info; -class FNET_IOComponent; -class FNET_Packet; -class FNET_PacketQueue; -class FNET_Scheduler; -class FNET_SimplePacketStreamer; -class FNET_Task; -class FNET_Transport; -class FNET_TransportThread; - -// CONTEXT CLASS (union of types) -#include "context.h" - -// INTERFACES -#include "ipacketfactory.h" -#include "ipackethandler.h" -#include "ipacketstreamer.h" -#include "iserveradapter.h" -#include "iexecutable.h" - -// CLASSES -#include "task.h" -#include "scheduler.h" -#include "config.h" -#include "databuffer.h" -#include "packet.h" -#include "dummypacket.h" -#include "controlpacket.h" -#include "packetqueue.h" -#include "channel.h" -#include "channellookup.h" -#include "simplepacketstreamer.h" -#include "transport_thread.h" -#include "iocomponent.h" -#include "transport.h" -#include "connection.h" -#include "connector.h" -#include "info.h" -#include "signalshutdown.h" - diff --git a/fnet/src/vespa/fnet/frt/frt.h b/fnet/src/vespa/fnet/frt/frt.h deleted file mode 100644 index 490b9b0a2b1..00000000000 --- a/fnet/src/vespa/fnet/frt/frt.h +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -class FRT_Invokable; -class FRT_IAbortHandler; -class FRT_IReturnHandler; -class FRT_ICleanupHandler; -class FRT_ISharedBlob; - -class FRT_Method; -class FRT_PacketFactory; -class FRT_ReflectionBuilder; -class FRT_ReflectionManager; -class FRT_RPCErrorPacket; -class FRT_RPCInvoker; -class FRT_RPCReplyPacket; -class FRT_RPCRequest; -class FRT_RPCRequestPacket; -class FRT_Supervisor; -class FRT_Target; -class FRT_Values; - -#include <vespa/fnet/fnet.h> -#include "error.h" -#include "isharedblob.h" -#include "invokable.h" -#include "values.h" -#include "reflection.h" -#include "rpcrequest.h" -#include "packets.h" -#include "invoker.h" -#include "supervisor.h" -#include "target.h" - diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index 7e92855583e..388d754ece4 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -419,7 +419,7 @@ StandaloneFRT::StandaloneFRT() StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto) : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)), - _transport(std::make_unique<FNET_Transport>(std::move(crypto), 1)), + _transport(std::make_unique<FNET_Transport>(TransportConfig().crypto(std::move(crypto)))), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())) { _transport->Start(_threadPool.get()); diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index 81d1bf47567..7afab188b66 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -32,9 +32,9 @@ FNET_IOComponent::~FNET_IOComponent() assert(_ioc_selector == nullptr); } -FNET_Config * -FNET_IOComponent::GetConfig() { - return _ioc_owner->GetConfig(); +const FNET_Config & +FNET_IOComponent::getConfig() const { + return _ioc_owner->getConfig(); } void diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index 84e3c8bd412..5fd5079a8af 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -138,7 +138,7 @@ public: * * @return config object. **/ - FNET_Config *GetConfig(); + const FNET_Config & getConfig() const; /** diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index e59186069ce..ce5f44efb7c 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -25,15 +25,33 @@ VESPA_THREAD_STACK_TAG(fnet_work_pool); } // namespace <unnamed> -FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads) - : _async_resolver(std::move(resolver)), - _crypto_engine(std::move(crypto)), +TransportConfig::TransportConfig(int num_threads) + : _config(), + _resolver(), + _crypto(), + _num_threads(num_threads) +{} + +TransportConfig::~TransportConfig() = default; + +vespalib::AsyncResolver::SP +TransportConfig::resolver() const { + return _resolver ? _resolver : vespalib::AsyncResolver::get_shared(); +} +vespalib::CryptoEngine::SP +TransportConfig::crypto() const { + return _crypto ? _crypto : vespalib::CryptoEngine::get_default(); +} + +FNET_Transport::FNET_Transport(TransportConfig cfg) + : _async_resolver(cfg.resolver()), + _crypto_engine(cfg.crypto()), _work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, fnet_work_pool, 1024)), _threads(), - _events_before_wakeup(1) + _config(cfg.config()) { - assert(num_threads >= 1); - for (size_t i = 0; i < num_threads; ++i) { + assert(cfg.num_threads() >= 1); + for (size_t i = 0; i < cfg.num_threads(); ++i) { _threads.emplace_back(std::make_unique<FNET_TransportThread>(*this)); } } @@ -104,38 +122,6 @@ FNET_Transport::GetNumIOComponents() } void -FNET_Transport::SetIOCTimeOut(uint32_t ms) -{ - for (const auto &thread: _threads) { - thread->SetIOCTimeOut(ms); - } -} - -void -FNET_Transport::SetMaxInputBufferSize(uint32_t bytes) -{ - for (const auto &thread: _threads) { - thread->SetMaxInputBufferSize(bytes); - } -} - -void -FNET_Transport::SetMaxOutputBufferSize(uint32_t bytes) -{ - for (const auto &thread: _threads) { - thread->SetMaxOutputBufferSize(bytes); - } -} - -void -FNET_Transport::SetTCPNoDelay(bool noDelay) -{ - for (const auto &thread: _threads) { - thread->SetTCPNoDelay(noDelay); - } -} - -void FNET_Transport::sync() { for (const auto &thread: _threads) { diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 1126f68e69f..766eaa3ccaa 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -3,6 +3,7 @@ #pragma once #include "context.h" +#include "config.h" #include <memory> #include <vector> #include <vespa/vespalib/net/async_resolver.h> @@ -16,6 +17,48 @@ class FNET_IServerAdapter; class FNET_IPacketHandler; class FNET_Scheduler; +class TransportConfig { +public: + TransportConfig() : TransportConfig(1) {} + explicit TransportConfig(int num_threads); + ~TransportConfig(); + vespalib::AsyncResolver::SP resolver() const; + vespalib::CryptoEngine::SP crypto() const; + TransportConfig & resolver(vespalib::AsyncResolver::SP resolver_in) { + _resolver = std::move(resolver_in); + return *this; + } + TransportConfig & crypto(vespalib::CryptoEngine::SP crypto_in) { + _crypto = std::move(crypto_in); + return *this; + } + const FNET_Config & config() const { return _config; } + uint32_t num_threads() const { return _num_threads; } + + TransportConfig & events_before_wakeup(uint32_t v) { + if (v > 1) { + _config._events_before_wakeup = v; + } + return *this; + } + TransportConfig & maxInputBufferSize(uint32_t v) { + _config._maxInputBufferSize = v; + return *this; + } + TransportConfig & maxOutputBufferSize(uint32_t v) { + _config._maxOutputBufferSize = v; + return *this; + } + TransportConfig & tcpNoDelay(bool v) { + _config._tcpNoDelay = v; + return *this; + } +private: + FNET_Config _config; + vespalib::AsyncResolver::SP _resolver; + vespalib::CryptoEngine::SP _crypto; + uint32_t _num_threads; +}; /** * This class represents the transport layer and handles a collection * of transport threads. Note: remember to shut down your transport @@ -28,12 +71,14 @@ private: using Threads = std::vector<Thread>; vespalib::AsyncResolver::SP _async_resolver; - vespalib::CryptoEngine::SP _crypto_engine; + vespalib::CryptoEngine::SP _crypto_engine; std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool; - Threads _threads; - size_t _events_before_wakeup; + Threads _threads; + const FNET_Config _config; public: + FNET_Transport(const FNET_Transport &) = delete; + FNET_Transport & operator = (const FNET_Transport &) = delete; /** * Construct a transport layer. To activate your newly created * transport object you need to call either the Start method to @@ -41,22 +86,15 @@ public: * the current thread become the transport thread. Main may only * be called for single-threaded transports. **/ - FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads); + explicit FNET_Transport(TransportConfig config); - FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads) - : FNET_Transport(std::move(resolver), vespalib::CryptoEngine::get_default(), num_threads) {} - FNET_Transport(vespalib::CryptoEngine::SP crypto, size_t num_threads) - : FNET_Transport(vespalib::AsyncResolver::get_shared(), std::move(crypto), num_threads) {} - FNET_Transport(size_t num_threads) - : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), num_threads) {} + explicit FNET_Transport(uint32_t num_threads) + : FNET_Transport(TransportConfig(num_threads)) {} FNET_Transport() - : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), 1) {} + : FNET_Transport(TransportConfig()) {} ~FNET_Transport(); - size_t events_before_wakeup() const { return _events_before_wakeup; } - void events_before_wakeup(size_t events_before_wakeup_in) { - _events_before_wakeup = events_before_wakeup_in; - } + const FNET_Config & getConfig() const { return _config; } /** * Try to execute the given task on the internal work pool @@ -173,54 +211,6 @@ public: uint32_t GetNumIOComponents(); /** - * Set the I/O Component timeout. Idle I/O Components with timeout - * enabled (determined by calling the ShouldTimeOut method) will - * time out if idle for the given number of milliseconds. An I/O - * component reports its un-idle-ness by calling the UpdateTimeOut - * method in the owning transport object. Calling this method with 0 - * as parameter will disable I/O Component timeouts. Note that newly - * created transport objects begin their lives with I/O Component - * timeouts disabled. An I/O Component timeout has the same effect - * as calling the Close method in the transport object with the - * target I/O Component as parameter. - * - * @param ms number of milliseconds before IOC idle timeout occurs. - **/ - void SetIOCTimeOut(uint32_t ms); - - /** - * Set maximum input buffer size. This value will only affect - * connections that use a common input buffer when decoding - * incoming packets. Note that this value is not an absolute - * max. The buffer will still grow larger than this value if - * needed to decode big packets. However, when the buffer becomes - * larger than this value, it will be shrunk back when possible. - * - * @param bytes buffer size in bytes. 0 means unlimited. - **/ - void SetMaxInputBufferSize(uint32_t bytes); - - /** - * Set maximum output buffer size. This value will only affect - * connections that use a common output buffer when encoding - * outgoing packets. Note that this value is not an absolute - * max. The buffer will still grow larger than this value if needed - * to encode big packets. However, when the buffer becomes larger - * than this value, it will be shrunk back when possible. - * - * @param bytes buffer size in bytes. 0 means unlimited. - **/ - void SetMaxOutputBufferSize(uint32_t bytes); - - /** - * Enable or disable use of the TCP_NODELAY flag with sockets - * created by this transport object. - * - * @param noDelay true if TCP_NODELAY flag should be used. - **/ - void SetTCPNoDelay(bool noDelay); - - /** * Synchronize with all transport threads. This method will block * until all events posted before this method was invoked has been * processed. If a transport thread has been shut down (or is in diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index d61eaffa24f..6b66b1ebe4b 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -125,7 +125,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, _queue.QueuePacket_NoLock(cpacket, context); qLen = _queue.GetPacketCnt_NoLock(); } - if (qLen == _owner.events_before_wakeup()) { + if (qLen == getConfig()._events_before_wakeup) { _selector.wakeup(); } return true; @@ -209,7 +209,6 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) : _owner(owner_in), _now(clock::now()), _scheduler(&_now), - _config(), _componentsHead(nullptr), _timeOutHead(nullptr), _componentsTail(nullptr), @@ -242,13 +241,17 @@ FNET_TransportThread::~FNET_TransportThread() } } +const FNET_Config & +FNET_TransportThread::getConfig() const { + return _owner.getConfig(); +} bool FNET_TransportThread::tune(SocketHandle &handle) const { handle.set_keepalive(true); handle.set_linger(true, 0); - handle.set_nodelay(_config._tcpNoDelay); + handle.set_nodelay(getConfig()._tcpNoDelay); return handle.set_blocking(false); } @@ -486,8 +489,8 @@ FNET_TransportThread::EventLoopIteration() _selector.dispatch(*this); // handle IOC time-outs - if (_config._iocTimeOut > 0) { - time_point oldest = (_now - std::chrono::milliseconds(_config._iocTimeOut)); + if (getConfig()._iocTimeOut > 0) { + time_point oldest = (_now - std::chrono::milliseconds(getConfig()._iocTimeOut)); while (_timeOutHead != nullptr && oldest > _timeOutHead->_ioc_timestamp) { diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index b4319d4e2bc..c649a677cb4 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -37,7 +37,6 @@ private: FNET_Transport &_owner; // owning transport layer time_point _now; // current time sampler FNET_Scheduler _scheduler; // transport thread scheduler - FNET_Config _config; // FNET configuration [static] FNET_IOComponent *_componentsHead; // I/O component list head FNET_IOComponent *_timeOutHead; // first IOC in list to time out FNET_IOComponent *_componentsTail; // I/O component list tail @@ -140,7 +139,7 @@ private: * * @return config object. **/ - FNET_Config *GetConfig() { return &_config; } + const FNET_Config & getConfig() const; void handle_add_cmd(FNET_IOComponent *ioc); @@ -185,7 +184,7 @@ public: * * @param owner owning transport layer **/ - FNET_TransportThread(FNET_Transport &owner_in); + explicit FNET_TransportThread(FNET_Transport &owner_in); /** @@ -269,60 +268,6 @@ public: **/ uint32_t GetNumIOComponents() { return _componentCnt; } - - /** - * Set the I/O Component timeout. Idle I/O Components with timeout - * enabled (determined by calling the ShouldTimeOut method) will - * time out if idle for the given number of milliseconds. An I/O - * component reports its un-idle-ness by calling the UpdateTimeOut - * method in the owning transport object. Calling this method with 0 - * as parameter will disable I/O Component timeouts. Note that newly - * created transport objects begin their lives with I/O Component - * timeouts disabled. An I/O Component timeout has the same effect - * as calling the Close method in the transport object with the - * target I/O Component as parameter. - * - * @param ms number of milliseconds before IOC idle timeout occurs. - **/ - void SetIOCTimeOut(uint32_t ms) { _config._iocTimeOut = ms; } - - - /** - * Set maximum input buffer size. This value will only affect - * connections that use a common input buffer when decoding - * incoming packets. Note that this value is not an absolute - * max. The buffer will still grow larger than this value if - * needed to decode big packets. However, when the buffer becomes - * larger than this value, it will be shrunk back when possible. - * - * @param bytes buffer size in bytes. 0 means unlimited. - **/ - void SetMaxInputBufferSize(uint32_t bytes) - { _config._maxInputBufferSize = bytes; } - - - /** - * Set maximum output buffer size. This value will only affect - * connections that use a common output buffer when encoding - * outgoing packets. Note that this value is not an absolute - * max. The buffer will still grow larger than this value if needed - * to encode big packets. However, when the buffer becomes larger - * than this value, it will be shrunk back when possible. - * - * @param bytes buffer size in bytes. 0 means unlimited. - **/ - void SetMaxOutputBufferSize(uint32_t bytes) - { _config._maxOutputBufferSize = bytes; } - - /** - * Enable or disable use of the TCP_NODELAY flag with sockets - * created by this transport object. - * - * @param noDelay true if TCP_NODELAY flag should be used. - **/ - void SetTCPNoDelay(bool noDelay) { _config._tcpNoDelay = noDelay; } - - /** * Add an I/O component to the working set of this transport * object. Note that the actual work is performed by the transport diff --git a/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp b/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp index ff6c84b5b18..cf23a027905 100644 --- a/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp +++ b/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/transport.h> #include <vespa/fastos/app.h> class Server : public FRT_Invokable diff --git a/jrt_test/src/tests/echo/echo-client.cpp b/jrt_test/src/tests/echo/echo-client.cpp index 3a87e38e6e0..4bc9ac743e4 100644 --- a/jrt_test/src/tests/echo/echo-client.cpp +++ b/jrt_test/src/tests/echo/echo-client.cpp @@ -1,6 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> + #include <vespa/fastos/app.h> class EchoClient : public FastOS_Application diff --git a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp index cd1ad7e6eed..af9f60f84d3 100644 --- a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp +++ b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp @@ -1,7 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/fastos/app.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/vespalib/util/time.h> #include <thread> diff --git a/jrt_test/src/tests/mockup-invoke/mockup-server.cpp b/jrt_test/src/tests/mockup-invoke/mockup-server.cpp index 4101bb8e9aa..1abe27407b3 100644 --- a/jrt_test/src/tests/mockup-invoke/mockup-server.cpp +++ b/jrt_test/src/tests/mockup-invoke/mockup-server.cpp @@ -1,15 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/transport.h> #include <vespa/fastos/app.h> class MockupServer : public FRT_Invokable { -private: - MockupServer(const MockupServer &); - MockupServer &operator=(const MockupServer &); - public: + MockupServer(const MockupServer &) = delete; + MockupServer &operator=(const MockupServer &) = delete; MockupServer(FRT_Supervisor *s) { FRT_ReflectionBuilder rb(s); diff --git a/jrt_test/src/tests/rpc-error/test-errors.cpp b/jrt_test/src/tests/rpc-error/test-errors.cpp index c30af8ea579..1c0c057e433 100644 --- a/jrt_test/src/tests/rpc-error/test-errors.cpp +++ b/jrt_test/src/tests/rpc-error/test-errors.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/testapp.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> class TestErrors : public vespalib::TestApp { diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp index ffc43bce0bb..90adf7c0b66 100644 --- a/logd/src/logd/rpc_forwarder.cpp +++ b/logd/src/logd/rpc_forwarder.cpp @@ -7,6 +7,10 @@ #include <vespa/log/exceptions.h> #include <vespa/vespalib/util/buffer.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/supervisor.h> + + #include <vespa/log/log.h> LOG_SETUP(".logd.rpc_forwarder"); diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h index 37729db088f..fbc3ecbc09a 100644 --- a/logd/src/logd/rpc_forwarder.h +++ b/logd/src/logd/rpc_forwarder.h @@ -5,10 +5,12 @@ #include "forwarder.h" #include "proto_converter.h" #include <vespa/log/log_message.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/target.h> #include <memory> #include <vector> +class FRT_Supervisor; + namespace logdemon { struct Metrics; diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp index d39a9ade0a8..370ca7f458e 100644 --- a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp +++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp @@ -5,6 +5,10 @@ #include <logd/rpc_forwarder.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/metrics/dummy_metrics_manager.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> + + using namespace logdemon; using vespalib::metrics::DummyMetricsManager; diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 893b8d1d2ca..9ecc57fd9de 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -79,6 +79,14 @@ struct TargetPoolTask : public FNET_Task { } }; +TransportConfig +toFNETConfig(const RPCNetworkParams & params) { + return TransportConfig() + .maxInputBufferSize(params.getMaxInputBufferSize()) + .maxOutputBufferSize(params.getMaxOutputBufferSize()) + .tcpNoDelay(params.getTcpNoDelay()); +} + } RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, @@ -117,7 +125,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), - _transport(std::make_unique<FNET_Transport>()), + _transport(std::make_unique<FNET_Transport>(toFNETConfig(params))), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())), @@ -135,9 +143,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _allowDispatchForEncode(params.getDispatchOnEncode()), _allowDispatchForDecode(params.getDispatchOnDecode()) { - _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); - _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); - _transport->SetTCPNoDelay(params.getTcpNoDelay()); } RPCNetwork::~RPCNetwork() diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java index b1213b2da41..9eb4b796970 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java @@ -26,20 +26,19 @@ public class AllocatableClusterResources { private final NodeResources realResources; private final NodeResources advertisedResources; - private final ClusterSpec.Type clusterType; + private final ClusterSpec clusterSpec; private final double fulfilment; /** Fake allocatable resources from requested capacity */ public AllocatableClusterResources(ClusterResources requested, - ClusterSpec.Type clusterType, - boolean exclusive, + ClusterSpec clusterSpec, NodeRepository nodeRepository) { this.nodes = requested.nodes(); this.groups = requested.groups(); - this.realResources = nodeRepository.resourcesCalculator().requestToReal(requested.nodeResources(), exclusive); + this.realResources = nodeRepository.resourcesCalculator().requestToReal(requested.nodeResources(), clusterSpec.isExclusive()); this.advertisedResources = requested.nodeResources(); - this.clusterType = clusterType; + this.clusterSpec = clusterSpec; this.fulfilment = 1; } @@ -48,19 +47,19 @@ public class AllocatableClusterResources { this.groups = (int)nodes.stream().map(node -> node.allocation().get().membership().cluster().group()).distinct().count(); this.realResources = averageRealResourcesOf(nodes, nodeRepository, exclusive); // Average since we average metrics over nodes this.advertisedResources = nodes.get(0).resources(); - this.clusterType = nodes.get(0).allocation().get().membership().cluster().type(); + this.clusterSpec = nodes.get(0).allocation().get().membership().cluster(); this.fulfilment = 1; } public AllocatableClusterResources(ClusterResources realResources, NodeResources advertisedResources, NodeResources idealResources, - ClusterSpec.Type clusterType) { + ClusterSpec clusterSpec) { this.nodes = realResources.nodes(); this.groups = realResources.groups(); this.realResources = realResources.nodeResources(); this.advertisedResources = advertisedResources; - this.clusterType = clusterType; + this.clusterSpec = clusterSpec; this.fulfilment = fulfilment(realResources.nodeResources(), idealResources); } @@ -88,7 +87,7 @@ public class AllocatableClusterResources { return (int)Math.ceil((double)nodes / groups); } - public ClusterSpec.Type clusterType() { return clusterType; } + public ClusterSpec clusterSpec() { return clusterSpec; } public double cost() { return nodes * advertisedResources.cost(); } @@ -133,23 +132,23 @@ public class AllocatableClusterResources { } public static Optional<AllocatableClusterResources> from(ClusterResources wantedResources, - boolean exclusive, - ClusterSpec.Type clusterType, + ClusterSpec clusterSpec, Limits applicationLimits, NodeRepository nodeRepository) { var systemLimits = new NodeResourceLimits(nodeRepository); - if ( !exclusive && !nodeRepository.zone().getCloud().dynamicProvisioning()) { + boolean exclusive = clusterSpec.isExclusive(); + if ( !clusterSpec.isExclusive() && !nodeRepository.zone().getCloud().dynamicProvisioning()) { // We decide resources: Add overhead to what we'll request (advertised) to make sure real becomes (at least) cappedNodeResources NodeResources advertisedResources = nodeRepository.resourcesCalculator().realToRequest(wantedResources.nodeResources(), exclusive); - advertisedResources = systemLimits.enlargeToLegal(advertisedResources, clusterType, exclusive); // Attempt to ask for something legal + advertisedResources = systemLimits.enlargeToLegal(advertisedResources, clusterSpec.type(), exclusive); // Attempt to ask for something legal advertisedResources = applicationLimits.cap(advertisedResources); // Overrides other conditions, even if it will then fail NodeResources realResources = nodeRepository.resourcesCalculator().requestToReal(advertisedResources, exclusive); // ... thus, what we really get may change - if ( ! systemLimits.isWithinRealLimits(realResources, clusterType)) return Optional.empty(); + if ( ! systemLimits.isWithinRealLimits(realResources, clusterSpec.type())) return Optional.empty(); if (matchesAny(nodeRepository.flavors().getFlavors(), advertisedResources)) return Optional.of(new AllocatableClusterResources(wantedResources.with(realResources), advertisedResources, wantedResources.nodeResources(), - clusterType)); + clusterSpec)); else return Optional.empty(); } @@ -172,11 +171,11 @@ public class AllocatableClusterResources { } if ( ! between(applicationLimits.min().nodeResources(), applicationLimits.max().nodeResources(), advertisedResources)) continue; - if ( ! systemLimits.isWithinRealLimits(realResources, clusterType)) continue; + if ( ! systemLimits.isWithinRealLimits(realResources, clusterSpec.type())) continue; var candidate = new AllocatableClusterResources(wantedResources.with(realResources), advertisedResources, wantedResources.nodeResources(), - clusterType); + clusterSpec); if (best.isEmpty() || candidate.preferableTo(best.get())) best = Optional.of(candidate); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java index e57011b0e4a..fb97e803a35 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java @@ -58,7 +58,7 @@ public class AllocationOptimizer { groups, nodeResourcesWith(nodesAdjustedForRedundancy, groupsAdjustedForRedundancy, limits, current, target)); - var allocatableResources = AllocatableClusterResources.from(next, exclusive, current.clusterType(), limits, nodeRepository); + var allocatableResources = AllocatableClusterResources.from(next, current.clusterSpec(), limits, nodeRepository); if (allocatableResources.isEmpty()) continue; if (bestAllocation.isEmpty() || allocatableResources.get().preferableTo(bestAllocation.get())) bestAllocation = allocatableResources; @@ -79,7 +79,7 @@ public class AllocationOptimizer { int groupSize = nodes / groups; - if (current.clusterType().isContent()) { // load scales with node share of content + if (current.clusterSpec().isStateful()) { // load scales with node share of content // The fixed cost portion of cpu does not scale with changes to the node count // TODO: Only for the portion of cpu consumed by queries double cpuPerGroup = fixedCpuCostFraction * target.nodeCpu() + diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java index 023eb5860ee..8fcba452d26 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java @@ -59,7 +59,7 @@ public class Autoscaler { } private Advice autoscale(Cluster cluster, List<Node> clusterNodes, Limits limits, boolean exclusive) { - ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); + ClusterSpec clusterSpec = clusterNodes.get(0).allocation().get().membership().cluster(); if ( ! stable(clusterNodes, nodeRepository)) return Advice.none("Cluster change in progress"); @@ -69,7 +69,7 @@ public class Autoscaler { ClusterTimeseries clusterTimeseries = new ClusterTimeseries(cluster, clusterNodes, metricsDb, nodeRepository); int measurementsPerNode = clusterTimeseries.measurementsPerNode(); - if (measurementsPerNode < minimumMeasurementsPerNode(clusterType)) + if (measurementsPerNode < minimumMeasurementsPerNode(clusterSpec)) return Advice.none("Collecting more data before making new scaling decisions" + ": Has " + measurementsPerNode + " data points per node" + "(all: " + clusterTimeseries.measurementCount + @@ -124,14 +124,14 @@ public class Autoscaler { } private boolean recentlyScaled(Cluster cluster, List<Node> clusterNodes) { - Duration downscalingDelay = downscalingDelay(clusterNodes.get(0).allocation().get().membership().cluster().type()); + Duration downscalingDelay = downscalingDelay(clusterNodes.get(0).allocation().get().membership().cluster()); return cluster.lastScalingEvent().map(event -> event.at()).orElse(Instant.MIN) .isAfter(nodeRepository.clock().instant().minus(downscalingDelay)); } /** The duration of the window we need to consider to make a scaling decision. See also minimumMeasurementsPerNode */ - static Duration scalingWindow(ClusterSpec.Type clusterType) { - if (clusterType.isContent()) return Duration.ofHours(12); + static Duration scalingWindow(ClusterSpec cluster) { + if (cluster.isStateful()) return Duration.ofHours(12); return Duration.ofMinutes(30); } @@ -140,8 +140,8 @@ public class Autoscaler { } /** Measurements are currently taken once a minute. See also scalingWindow */ - static int minimumMeasurementsPerNode(ClusterSpec.Type clusterType) { - if (clusterType.isContent()) return 60; + static int minimumMeasurementsPerNode(ClusterSpec cluster) { + if (cluster.isStateful()) return 60; return 7; } @@ -149,8 +149,8 @@ public class Autoscaler { * We should wait a while before scaling down after a scaling event as a peak in usage * indicates more peaks may arrive in the near future. */ - static Duration downscalingDelay(ClusterSpec.Type clusterType) { - if (clusterType.isContent()) return Duration.ofHours(12); + static Duration downscalingDelay(ClusterSpec cluster) { + if (cluster.isStateful()) return Duration.ofHours(12); return Duration.ofHours(1); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java index 1983162f121..2b4ba3fbbcb 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java @@ -7,7 +7,6 @@ import com.yahoo.vespa.hosted.provision.NodeRepository; import com.yahoo.vespa.hosted.provision.applications.Cluster; import java.time.Instant; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,8 +32,8 @@ public class ClusterTimeseries { public ClusterTimeseries(Cluster cluster, List<Node> clusterNodes, MetricsDb db, NodeRepository nodeRepository) { this.clusterNodes = clusterNodes; - ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); - var timeseries = db.getNodeTimeseries(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterType)), + ClusterSpec clusterSpec = clusterNodes.get(0).allocation().get().membership().cluster(); + var timeseries = db.getNodeTimeseries(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterSpec)), clusterNodes.stream().map(Node::hostname).collect(Collectors.toSet())); Map<String, Instant> startTimePerNode = metricStartTimes(cluster, clusterNodes, timeseries, nodeRepository); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java index 685fa3727a1..bcdcd9054a7 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java @@ -262,19 +262,19 @@ public class MetricsReporter extends NodeRepositoryMaintainer { Metric.Context context = getContext(Map.of("lockPath", lockPath)); LatencyMetrics acquireLatencyMetrics = lockMetrics.getAndResetAcquireLatencyMetrics(); - metric.set("lockAttempt.acquireMaxActiveLatency", acquireLatencyMetrics.maxActiveLatencySeconds(), context); - metric.set("lockAttempt.acquireHz", acquireLatencyMetrics.startHz(), context); - metric.set("lockAttempt.acquireLoad", acquireLatencyMetrics.load(), context); + setNonZero("lockAttempt.acquireMaxActiveLatency", acquireLatencyMetrics.maxActiveLatencySeconds(), context); + setNonZero("lockAttempt.acquireHz", acquireLatencyMetrics.startHz(), context); + setNonZero("lockAttempt.acquireLoad", acquireLatencyMetrics.load(), context); LatencyMetrics lockedLatencyMetrics = lockMetrics.getAndResetLockedLatencyMetrics(); - metric.set("lockAttempt.lockedLatency", lockedLatencyMetrics.maxLatencySeconds(), context); - metric.set("lockAttempt.lockedLoad", lockedLatencyMetrics.load(), context); + setNonZero("lockAttempt.lockedLatency", lockedLatencyMetrics.maxLatencySeconds(), context); + setNonZero("lockAttempt.lockedLoad", lockedLatencyMetrics.load(), context); - metric.set("lockAttempt.acquireTimedOut", lockMetrics.getAndResetAcquireTimedOutCount(), context); - metric.set("lockAttempt.deadlock", lockMetrics.getAndResetDeadlockCount(), context); + setNonZero("lockAttempt.acquireTimedOut", lockMetrics.getAndResetAcquireTimedOutCount(), context); + setNonZero("lockAttempt.deadlock", lockMetrics.getAndResetDeadlockCount(), context); // bucket for various rare errors - to reduce #metrics - metric.set("lockAttempt.errors", + setNonZero("lockAttempt.errors", lockMetrics.getAndResetAcquireFailedCount() + lockMetrics.getAndResetReleaseFailedCount() + lockMetrics.getAndResetNakedReleaseCount() + @@ -284,6 +284,12 @@ public class MetricsReporter extends NodeRepositoryMaintainer { }); } + private void setNonZero(String key, Number value, Metric.Context context) { + if (Double.compare(value.doubleValue(), 0.0) != 0) { + metric.set(key, value, context); + } + } + private void updateDockerMetrics(NodeList nodes) { NodeResources totalCapacity = getCapacityTotal(nodes); metric.set("hostedVespa.docker.totalCapacityCpu", totalCapacity.vcpu(), null); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java index 68e11c4c995..bc164dc37e0 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java @@ -203,7 +203,7 @@ class NodeAllocation { * Such nodes will be marked retired during finalization of the list of accepted nodes. * The conditions for this are: * - * This is a content or combined node. These must always be retired before being removed to allow the cluster to + * This is a stateful node. These must always be retired before being removed to allow the cluster to * migrate away data. * * This is a container node and it is not desired due to having the wrong flavor. In this case this @@ -218,7 +218,7 @@ class NodeAllocation { if (candidate.allocation().get().membership().retired()) return true; // don't second-guess if already retired if (! requestedNodes.considerRetiring()) return false; - return cluster.type().isContent() || + return cluster.isStateful() || (cluster.type() == ClusterSpec.Type.container && !hasCompatibleFlavor(candidate)); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java index ede6f4ef250..a6d68243160 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java @@ -29,7 +29,6 @@ import com.yahoo.vespa.hosted.provision.autoscale.AllocatableClusterResources; import com.yahoo.vespa.hosted.provision.autoscale.AllocationOptimizer; import com.yahoo.vespa.hosted.provision.autoscale.Limits; import com.yahoo.vespa.hosted.provision.autoscale.ResourceTarget; -import com.yahoo.vespa.hosted.provision.node.Agent; import com.yahoo.vespa.hosted.provision.node.Allocation; import com.yahoo.vespa.hosted.provision.node.filter.ApplicationFilter; import com.yahoo.vespa.hosted.provision.node.filter.NodeHostFilter; @@ -168,7 +167,7 @@ public class NodeRepositoryProvisioner implements Provisioner { boolean firstDeployment = nodes.isEmpty(); AllocatableClusterResources currentResources = firstDeployment // start at min, preserve current resources otherwise - ? new AllocatableClusterResources(requested.minResources(), clusterSpec.type(), clusterSpec.isExclusive(), nodeRepository) + ? new AllocatableClusterResources(requested.minResources(), clusterSpec, nodeRepository) : new AllocatableClusterResources(nodes, nodeRepository, clusterSpec.isExclusive()); return within(Limits.of(requested), clusterSpec.isExclusive(), currentResources, firstDeployment); } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java index 86427fe30ae..3945c518a77 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java @@ -485,7 +485,7 @@ public class NodesV2ApiTest { "{\"message\":\"Moved host2.yahoo.com to parked\"}"); tester.assertResponse(new Request("http://localhost:8080/nodes/v2/state/ready/host2.yahoo.com", new byte[0], Request.Method.PUT), - 400, "{\"error-code\":\"BAD_REQUEST\",\"message\":\"Cannot make parked host host2.yahoo.com allocated to tenant2.application2.instance2 as 'content/id2/0/0' available for new allocation as it is not in state [dirty]\"}"); + 400, "{\"error-code\":\"BAD_REQUEST\",\"message\":\"Cannot make parked host host2.yahoo.com allocated to tenant2.application2.instance2 as 'content/id2/0/0/stateful' available for new allocation as it is not in state [dirty]\"}"); // (... while dirty then ready works (the ready move will be initiated by node maintenance)) assertResponse(new Request("http://localhost:8080/nodes/v2/state/dirty/host2.yahoo.com", new byte[0], Request.Method.PUT), @@ -502,7 +502,7 @@ public class NodesV2ApiTest { // Attempt to DELETE allocated node tester.assertResponse(new Request("http://localhost:8080/nodes/v2/node/host4.yahoo.com", new byte[0], Request.Method.DELETE), - 400, "{\"error-code\":\"BAD_REQUEST\",\"message\":\"active child node host4.yahoo.com allocated to tenant3.application3.instance3 as 'content/id3/0/0' is currently allocated and cannot be removed\"}"); + 400, "{\"error-code\":\"BAD_REQUEST\",\"message\":\"active child node host4.yahoo.com allocated to tenant3.application3.instance3 as 'content/id3/0/0/stateful' is currently allocated and cannot be removed\"}"); // PUT current restart generation with string instead of long tester.assertResponse(new Request("http://localhost:8080/nodes/v2/node/host4.yahoo.com", diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json index 7104957ba98..c3857e9c8ee 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json @@ -4,7 +4,7 @@ "failedTenantParent": "dockerhost1.yahoo.com", "failedTenant": "host4.yahoo.com", "failedTenantResources": "[vcpu: 1.0, memory: 4.0 Gb, disk 100.0 Gb, bandwidth: 1.0 Gbps, storage type: local]", - "failedTenantAllocation": "allocated to tenant3.application3.instance3 as 'content/id3/0/0'", + "failedTenantAllocation": "allocated to tenant3.application3.instance3 as 'content/id3/0/0/stateful'", "hostCandidateRejectionReasons": { "singularReasonFailures": { "insufficientVcpu": 0, diff --git a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp index 8c7e6f13c18..8b6203c78a2 100644 --- a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp +++ b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp @@ -3,7 +3,8 @@ #include <vespa/slobrok/sbmirror.h> #include <vespa/config/common/configsystem.h> #include <vespa/config/common/exceptions.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> #include <vespa/vespalib/util/host_name.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/time.h> @@ -28,10 +29,12 @@ private: FRT_RPCRequest *_req; public: - App() : _frt(), - _target(nullptr), - _req(nullptr) {} - virtual ~App() + App() + : _frt(), + _target(nullptr), + _req(nullptr) + {} + ~App() override { assert(!_frt); assert(_target == nullptr); diff --git a/searchlib/src/test/java/com/yahoo/searchlib/tensor/TensorConformanceTest.java b/searchlib/src/test/java/com/yahoo/searchlib/tensor/TensorConformanceTest.java deleted file mode 100644 index 61aec069c72..00000000000 --- a/searchlib/src/test/java/com/yahoo/searchlib/tensor/TensorConformanceTest.java +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.searchlib.tensor; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.yahoo.io.GrowableByteBuffer; -import com.yahoo.searchlib.rankingexpression.RankingExpression; -import com.yahoo.searchlib.rankingexpression.evaluation.BooleanValue; -import com.yahoo.searchlib.rankingexpression.evaluation.DoubleCompatibleValue; -import com.yahoo.searchlib.rankingexpression.evaluation.MapContext; -import com.yahoo.searchlib.rankingexpression.evaluation.StringValue; -import com.yahoo.searchlib.rankingexpression.evaluation.TensorValue; -import com.yahoo.searchlib.rankingexpression.evaluation.Value; -import com.yahoo.searchlib.rankingexpression.parser.ParseException; -import com.yahoo.tensor.Tensor; -import com.yahoo.tensor.serialization.TypedBinaryFormat; -import org.junit.Assert; -import org.junit.Test; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; - -import static org.junit.Assert.assertEquals; - -public class TensorConformanceTest { - - private static String testPath = "eval/src/apps/tensor_conformance/test_spec.json"; - - @Test - public void testConformance() throws IOException { - File testSpec = new File(testPath); - if (!testSpec.exists()) { - testSpec = new File("../" + testPath); - } - int count = 0; - List<Integer> failList = new ArrayList<>(); - - try(BufferedReader br = new BufferedReader(new FileReader(testSpec))) { - String test = br.readLine(); - while (test != null) { - boolean success = testCase(test, count); - if (!success) { - failList.add(count); - } - test = br.readLine(); - count++; - } - } - assertEquals(failList.size() + " conformance test fails: " + failList, 0, failList.size()); - } - - private boolean testCase(String test, int count) { - try { - ObjectMapper mapper = new ObjectMapper(); - JsonNode node = mapper.readTree(test); - - if (node.has("num_tests")) { - Assert.assertEquals(node.get("num_tests").asInt(), count); - return true; - } - if (!node.has("expression")) { - return true; // ignore - } - - String expression = node.get("expression").asText(); - MapContext context = getInput(node.get("inputs")); - Tensor expect = getTensor(node.get("result").get("expect").asText()); - Tensor result = evaluate(expression, context); - boolean equals = Tensor.equals(result, expect); - if (!equals) { - System.out.println(count + " : Tensors not equal. Result: " + result.toString() + " Expected: " + expect.toString() + " -> expression \"" + expression + "\""); - } else if (! result.type().valueType().equals(expect.type().valueType())) { - System.out.println(count + " : Tensor cell value types not equal. Result: " + result.type() + " Expected: " + expect.type() + " -> expression \"" + expression + "\""); - equals = false; - } - return equals; - - } catch (Exception e) { - System.out.println(count + " : " + e.toString()); - } - return false; - } - - private Tensor evaluate(String expression, MapContext context) throws ParseException { - Value value = new RankingExpression(expression).evaluate(context); - if (!(value instanceof TensorValue)) { - throw new IllegalArgumentException("Result is not a tensor"); - } - return ((TensorValue)value).asTensor(); - } - - private MapContext getInput(JsonNode inputs) { - MapContext context = new MapContext(); - for (Iterator<String> i = inputs.fieldNames(); i.hasNext(); ) { - String name = i.next(); - String value = inputs.get(name).asText(); - Tensor tensor = getTensor(value); - context.put(name, new TensorValue(tensor)); - } - return context; - } - - private Tensor getTensor(String binaryRepresentation) { - byte[] bin = getBytes(binaryRepresentation); - return TypedBinaryFormat.decode(Optional.empty(), GrowableByteBuffer.wrap(bin)); - } - - private byte[] getBytes(String binaryRepresentation) { - return parseHexValue(binaryRepresentation.substring(2)); - } - - private byte[] parseHexValue(String s) { - final int len = s.length(); - byte[] bytes = new byte[len/2]; - for (int i = 0; i < len; i += 2) { - int c1 = hexValue(s.charAt(i)) << 4; - int c2 = hexValue(s.charAt(i + 1)); - bytes[i/2] = (byte)(c1 + c2); - } - return bytes; - } - - private int hexValue(Character c) { - if (c >= 'a' && c <= 'f') { - return c - 'a' + 10; - } else if (c >= 'A' && c <= 'F') { - return c - 'A' + 10; - } else if (c >= '0' && c <= '9') { - return c - '0'; - } - throw new IllegalArgumentException("Hex contains illegal characters"); - } - -} - diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp index daa85c91b2c..bf46a2cc7d0 100644 --- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp +++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp @@ -16,7 +16,7 @@ #include <vespa/searchlib/tensor/nearest_neighbor_index.h> #include <vespa/searchlib/tensor/nearest_neighbor_index_factory.h> #include <vespa/searchlib/tensor/nearest_neighbor_index_saver.h> -#include <vespa/searchlib/tensor/serialized_tensor_attribute.h> +#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h> #include <vespa/searchlib/tensor/tensor_attribute.h> #include <vespa/searchlib/test/directory_handler.h> #include <vespa/searchlib/util/fileutil.h> @@ -40,7 +40,7 @@ using search::tensor::DefaultNearestNeighborIndexFactory; using search::tensor::DenseTensorAttribute; using search::tensor::DirectTensorAttribute; using search::tensor::DocVectorAccess; -using search::tensor::SerializedTensorAttribute; +using search::tensor::SerializedFastValueAttribute; using search::tensor::HnswIndex; using search::tensor::HnswNode; using search::tensor::NearestNeighborIndex; @@ -344,7 +344,7 @@ struct Fixture { } else if (_traits.use_direct_tensor_attribute) { return std::make_shared<DirectTensorAttribute>(_name, _cfg); } else { - return std::make_shared<SerializedTensorAttribute>(_name, _cfg); + return std::make_shared<SerializedFastValueAttribute>(_name, _cfg); } } diff --git a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp index 3dbe0d00881..1cd91329912 100644 --- a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp +++ b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp @@ -6,7 +6,9 @@ #include <vespa/searchlib/engine/searchapi.h> #include <vespa/searchlib/engine/docsumapi.h> #include <vespa/searchlib/engine/monitorapi.h> -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/data/slime/binary_format.h> #include <thread> @@ -83,7 +85,7 @@ struct ProtoRpcAdapterTest : ::testing::Test { FRT_Target *connect() { return server.supervisor().GetTarget(server.supervisor().GetListenPort()); } - ~ProtoRpcAdapterTest() = default; + ~ProtoRpcAdapterTest() override = default; }; //----------------------------------------------------------------------------- diff --git a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp index 148d18f79ff..29033944d4b 100644 --- a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp +++ b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp @@ -7,8 +7,10 @@ #include "singlenumericattribute.hpp" #include "singlestringattribute.h" #include "singleboolattribute.h" +#include <vespa/eval/eval/engine_or_factory.h> #include <vespa/searchlib/tensor/dense_tensor_attribute.h> #include <vespa/searchlib/tensor/serialized_tensor_attribute.h> +#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h> namespace search { @@ -45,6 +47,8 @@ AttributeFactory::createSingleStd(stringref name, const Config & info) case BasicType::TENSOR: if (info.tensorType().is_dense()) { return std::make_shared<tensor::DenseTensorAttribute>(name, info); + } else if (vespalib::eval::EngineOrFactory::get().is_factory()) { + return std::make_shared<tensor::SerializedFastValueAttribute>(name, info); } else { return std::make_shared<tensor::SerializedTensorAttribute>(name, info); } diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt index fac6d015a5f..79b18a57a34 100644 --- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt @@ -25,5 +25,8 @@ vespa_add_library(searchlib_tensor OBJECT tensor_attribute.cpp tensor_deserialize.cpp tensor_store.cpp + serialized_fast_value_attribute.cpp + streamed_value_saver.cpp + streamed_value_store.cpp DEPENDS ) diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp new file mode 100644 index 00000000000..6e1fb1a0a2f --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp @@ -0,0 +1,234 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "serialized_fast_value_attribute.h" +#include "streamed_value_saver.h" +#include <vespa/eval/eval/value.h> +#include <vespa/eval/eval/fast_value.hpp> +#include <vespa/eval/streamed/streamed_value_utils.h> +#include <vespa/fastlib/io/bufferedfile.h> +#include <vespa/searchlib/attribute/readerbase.h> +#include <vespa/searchlib/util/fileutil.h> +#include <vespa/vespalib/util/rcuvector.hpp> +#include <vespa/log/log.h> + +LOG_SETUP(".searchlib.tensor.serialized_fast_value_attribute"); + +#include "blob_sequence_reader.h" +#include "tensor_attribute.hpp" + +using namespace vespalib; +using namespace vespalib::eval; + +namespace search::tensor { + +namespace { + +struct ValueBlock : LabelBlock { + TypedCells cells; +}; + +class ValueBlockStream { +private: + const StreamedValueStore::DataFromType &_from_type; + LabelBlockStream _label_block_stream; + const char *_cells_ptr; + + size_t dsss() const { return _from_type.dense_subspace_size; } + auto cell_type() const { return _from_type.cell_type; } +public: + ValueBlock next_block() { + auto labels = _label_block_stream.next_block(); + if (labels) { + TypedCells subspace_cells(_cells_ptr, cell_type(), dsss()); + _cells_ptr += CellTypeUtils::mem_size(cell_type(), dsss()); + return ValueBlock{labels, subspace_cells}; + } else { + TypedCells none(nullptr, cell_type(), 0); + return ValueBlock{labels, none}; + } + } + + ValueBlockStream(const StreamedValueStore::DataFromType &from_type, + const StreamedValueStore::StreamedValueData &from_store) + : _from_type(from_type), + _label_block_stream(from_store.num_subspaces, + from_store.labels_buffer, + from_type.num_mapped_dimensions), + _cells_ptr((const char *)from_store.cells_ref.data) + { + _label_block_stream.reset(); + } + + ~ValueBlockStream(); +}; + +ValueBlockStream::~ValueBlockStream() = default; + +void report_problematic_subspace(size_t idx, + const StreamedValueStore::DataFromType &from_type, + const StreamedValueStore::StreamedValueData &from_store) +{ + LOG(error, "PROBLEM: add_mapping returned same index=%zu twice", idx); + FastValueIndex temp_index(from_type.num_mapped_dimensions, + from_store.num_subspaces); + auto from_start = ValueBlockStream(from_type, from_store); + while (auto redo_block = from_start.next_block()) { + if (idx == temp_index.map.add_mapping(redo_block.address)) { + vespalib::string msg = "Block with address[ "; + for (vespalib::stringref ref : redo_block.address) { + msg.append("'").append(ref).append("' "); + } + msg.append("]"); + LOG(error, "%s maps to subspace %zu", msg.c_str(), idx); + } + } +} + +/** + * This Value implementation is almost exactly like FastValue, but + * instead of owning its type and cells it just has a reference to + * data stored elsewhere. + * XXX: we should find a better name for this, and move it + * (together with the helper classes above) to its own file, + * and add associated unit tests. + **/ +class OnlyFastValueIndex : public Value { +private: + const ValueType &_type; + TypedCells _cells; + FastValueIndex my_index; +public: + OnlyFastValueIndex(const ValueType &type, + const StreamedValueStore::DataFromType &from_type, + const StreamedValueStore::StreamedValueData &from_store) + : _type(type), + _cells(from_store.cells_ref), + my_index(from_type.num_mapped_dimensions, + from_store.num_subspaces) + { + assert(_type.cell_type() == _cells.type); + std::vector<vespalib::stringref> address(from_type.num_mapped_dimensions); + auto block_stream = ValueBlockStream(from_type, from_store); + size_t ss = 0; + while (auto block = block_stream.next_block()) { + size_t idx = my_index.map.add_mapping(block.address); + if (idx != ss) { + report_problematic_subspace(idx, from_type, from_store); + } + ++ss; + } + assert(ss == from_store.num_subspaces); + } + + + ~OnlyFastValueIndex(); + + const ValueType &type() const final override { return _type; } + TypedCells cells() const final override { return _cells; } + const Index &index() const final override { return my_index; } + vespalib::MemoryUsage get_memory_usage() const final override { + auto usage = self_memory_usage<OnlyFastValueIndex>(); + usage.merge(my_index.map.estimate_extra_memory_usage()); + return usage; + } +}; + +OnlyFastValueIndex::~OnlyFastValueIndex() = default; + +} + +SerializedFastValueAttribute::SerializedFastValueAttribute(stringref name, const Config &cfg) + : TensorAttribute(name, cfg, _streamedValueStore), + _tensor_type(cfg.tensorType()), + _streamedValueStore(_tensor_type), + _data_from_type(_tensor_type) +{ +} + + +SerializedFastValueAttribute::~SerializedFastValueAttribute() +{ + getGenerationHolder().clearHoldLists(); + _tensorStore.clearHoldLists(); +} + +void +SerializedFastValueAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor) +{ + checkTensorType(tensor); + EntryRef ref = _streamedValueStore.store_tensor(tensor); + assert(ref.valid()); + setTensorRef(docId, ref); +} + +std::unique_ptr<Value> +SerializedFastValueAttribute::getTensor(DocId docId) const +{ + EntryRef ref; + if (docId < getCommittedDocIdLimit()) { + ref = _refVector[docId]; + } + if (!ref.valid()) { + return {}; + } + if (auto data_from_store = _streamedValueStore.get_tensor_data(ref)) { + return std::make_unique<OnlyFastValueIndex>(_tensor_type, + _data_from_type, + data_from_store); + } + return {}; +} + +bool +SerializedFastValueAttribute::onLoad() +{ + BlobSequenceReader tensorReader(*this); + if (!tensorReader.hasData()) { + return false; + } + setCreateSerialNum(tensorReader.getCreateSerialNum()); + assert(tensorReader.getVersion() == getVersion()); + uint32_t numDocs(tensorReader.getDocIdLimit()); + _refVector.reset(); + _refVector.unsafe_reserve(numDocs); + vespalib::Array<char> buffer(1024); + for (uint32_t lid = 0; lid < numDocs; ++lid) { + uint32_t tensorSize = tensorReader.getNextSize(); + if (tensorSize != 0) { + if (tensorSize > buffer.size()) { + buffer.resize(tensorSize + 1024); + } + tensorReader.readBlob(&buffer[0], tensorSize); + vespalib::nbostream source(&buffer[0], tensorSize); + EntryRef ref = _streamedValueStore.store_encoded_tensor(source); + _refVector.push_back(ref); + } else { + EntryRef invalid; + _refVector.push_back(invalid); + } + } + setNumDocs(numDocs); + setCommittedDocIdLimit(numDocs); + return true; +} + + +std::unique_ptr<AttributeSaver> +SerializedFastValueAttribute::onInitSave(vespalib::stringref fileName) +{ + vespalib::GenerationHandler::Guard guard(getGenerationHandler(). + takeGuard()); + return std::make_unique<StreamedValueSaver> + (std::move(guard), + this->createAttributeHeader(fileName), + getRefCopy(), + _streamedValueStore); +} + +void +SerializedFastValueAttribute::compactWorst() +{ + doCompactWorst<StreamedValueStore::RefType>(); +} + +} diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h new file mode 100644 index 00000000000..a8c1df4913a --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "tensor_attribute.h" +#include "streamed_value_store.h" + +namespace search::tensor { + +/** + * Attribute vector class storing serialized tensors for all documents in memory. + * + * When fetching a tensor with getTensor(docId) the returned Value + * will have a FastValueIndex (constructed on the fly) for its sparse + * mapping, but refer to a common type, while cells() will refer to + * memory in the serialized store without copying. + * + */ +class SerializedFastValueAttribute : public TensorAttribute { + vespalib::eval::ValueType _tensor_type; + StreamedValueStore _streamedValueStore; // data store for serialized tensors + const StreamedValueStore::DataFromType _data_from_type; +public: + SerializedFastValueAttribute(vespalib::stringref baseFileName, const Config &cfg); + virtual ~SerializedFastValueAttribute(); + virtual void setTensor(DocId docId, const vespalib::eval::Value &tensor) override; + virtual std::unique_ptr<vespalib::eval::Value> getTensor(DocId docId) const override; + virtual bool onLoad() override; + virtual std::unique_ptr<AttributeSaver> onInitSave(vespalib::stringref fileName) override; + virtual void compactWorst() override; +}; + +} diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp new file mode 100644 index 00000000000..d4fd681f2cb --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp @@ -0,0 +1,48 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "streamed_value_saver.h" +#include "streamed_value_store.h" + +#include <vespa/searchlib/attribute/iattributesavetarget.h> +#include <vespa/searchlib/util/bufferwriter.h> +#include <vespa/vespalib/objects/nbostream.h> + +using vespalib::GenerationHandler; + +namespace search::tensor { + +StreamedValueSaver:: +StreamedValueSaver(GenerationHandler::Guard &&guard, + const attribute::AttributeHeader &header, + RefCopyVector &&refs, + const StreamedValueStore &tensorStore) + : AttributeSaver(std::move(guard), header), + _refs(std::move(refs)), + _tensorStore(tensorStore) +{ +} + +StreamedValueSaver::~StreamedValueSaver() = default; + +bool +StreamedValueSaver::onSave(IAttributeSaveTarget &saveTarget) +{ + auto datWriter = saveTarget.datWriter().allocBufferWriter(); + const uint32_t docIdLimit(_refs.size()); + vespalib::nbostream stream; + for (uint32_t lid = 0; lid < docIdLimit; ++lid) { + if (_tensorStore.encode_tensor(_refs[lid], stream)) { + uint32_t sz = stream.size(); + datWriter->write(&sz, sizeof(sz)); + datWriter->write(stream.peek(), stream.size()); + stream.clear(); + } else { + uint32_t sz = 0; + datWriter->write(&sz, sizeof(sz)); + } + } + datWriter->flush(); + return true; +} + +} // namespace search::tensor diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h new file mode 100644 index 00000000000..71d56539679 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h @@ -0,0 +1,35 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchlib/attribute/attributesaver.h> +#include "tensor_attribute.h" + +namespace search::tensor { + +class StreamedValueStore; + +/* + * Class for saving a tensor attribute. + */ +class StreamedValueSaver : public AttributeSaver +{ +public: + using RefCopyVector = TensorAttribute::RefCopyVector; +private: + using GenerationHandler = vespalib::GenerationHandler; + + RefCopyVector _refs; + const StreamedValueStore &_tensorStore; + + bool onSave(IAttributeSaveTarget &saveTarget) override; +public: + StreamedValueSaver(GenerationHandler::Guard &&guard, + const attribute::AttributeHeader &header, + RefCopyVector &&refs, + const StreamedValueStore &tensorStore); + + virtual ~StreamedValueSaver(); +}; + +} // namespace search::tensor diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp new file mode 100644 index 00000000000..ae2e0e7ed10 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp @@ -0,0 +1,228 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "streamed_value_store.h" +#include "tensor_deserialize.h" +#include <vespa/eval/eval/value.h> +#include <vespa/eval/eval/value_codec.h> +#include <vespa/eval/streamed/streamed_value_builder_factory.h> +#include <vespa/eval/streamed/streamed_value_view.h> +#include <vespa/vespalib/datastore/datastore.hpp> +#include <vespa/vespalib/objects/nbostream.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/log/log.h> + +LOG_SETUP(".searchlib.tensor.streamed_value_store"); + +using vespalib::datastore::Handle; +using namespace vespalib::eval; + +namespace search::tensor { + +namespace { + +constexpr size_t MIN_BUFFER_ARRAYS = 1024; + +struct CellsMemBlock { + uint32_t num; + uint32_t total_sz; + const char *ptr; + CellsMemBlock(TypedCells cells) + : num(cells.size), + total_sz(CellTypeUtils::mem_size(cells.type, num)), + ptr((const char *)cells.data) + {} +}; + +template<typename T> +T *fix_alignment(T *ptr, size_t align) +{ + static_assert(sizeof(T) == 1); + assert((align & (align-1)) == 0); // must be 2^N + size_t ptr_val = (size_t)ptr; + size_t unalign = ptr_val & (align - 1); + if (unalign == 0) { + return ptr; + } else { + return ptr + (align - unalign); + } +} + +} // namespace <unnamed> + +StreamedValueStore::StreamedValueStore(const ValueType &tensor_type) + : TensorStore(_concreteStore), + _concreteStore(), + _bufferType(RefType::align(1), + MIN_BUFFER_ARRAYS, + RefType::offsetSize() / RefType::align(1)), + _tensor_type(tensor_type), + _data_from_type(_tensor_type) +{ + _store.addType(&_bufferType); + _store.initActiveBuffers(); +} + +StreamedValueStore::~StreamedValueStore() +{ + _store.dropBuffers(); +} + +std::pair<const char *, uint32_t> +StreamedValueStore::getRawBuffer(RefType ref) const +{ + if (!ref.valid()) { + return std::make_pair(nullptr, 0u); + } + const char *buf = _store.getEntry<char>(ref); + uint32_t len = *reinterpret_cast<const uint32_t *>(buf); + return std::make_pair(buf + sizeof(uint32_t), len); +} + +Handle<char> +StreamedValueStore::allocRawBuffer(uint32_t size) +{ + if (size == 0) { + return Handle<char>(); + } + size_t extSize = size + sizeof(uint32_t); + size_t bufSize = RefType::align(extSize); + auto result = _concreteStore.rawAllocator<char>(_typeId).alloc(bufSize); + *reinterpret_cast<uint32_t *>(result.data) = size; + char *padWritePtr = result.data + extSize; + for (size_t i = extSize; i < bufSize; ++i) { + *padWritePtr++ = 0; + } + // Hide length of buffer (first 4 bytes) from users of the buffer. + return Handle<char>(result.ref, result.data + sizeof(uint32_t)); +} + +void +StreamedValueStore::holdTensor(EntryRef ref) +{ + if (!ref.valid()) { + return; + } + RefType iRef(ref); + const char *buf = _store.getEntry<char>(iRef); + uint32_t len = *reinterpret_cast<const uint32_t *>(buf); + _concreteStore.holdElem(ref, len + sizeof(uint32_t)); +} + +TensorStore::EntryRef +StreamedValueStore::move(EntryRef ref) +{ + if (!ref.valid()) { + return RefType(); + } + auto oldraw = getRawBuffer(ref); + auto newraw = allocRawBuffer(oldraw.second); + memcpy(newraw.data, oldraw.first, oldraw.second); + _concreteStore.holdElem(ref, oldraw.second + sizeof(uint32_t)); + return newraw.ref; +} + +StreamedValueStore::StreamedValueData +StreamedValueStore::get_tensor_data(EntryRef ref) const +{ + StreamedValueData retval; + retval.valid = false; + auto raw = getRawBuffer(ref); + if (raw.second == 0u) { + return retval; + } + vespalib::nbostream source(raw.first, raw.second); + uint32_t num_cells = source.readValue<uint32_t>(); + { + uint32_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type); + const char *aligned_ptr = fix_alignment(source.peek(), alignment); + size_t adjustment = aligned_ptr - source.peek(); + source.adjustReadPos(adjustment); + } + retval.cells_ref = TypedCells(source.peek(), _data_from_type.cell_type, num_cells); + source.adjustReadPos(CellTypeUtils::mem_size(_data_from_type.cell_type, num_cells)); + retval.num_subspaces = source.readValue<uint32_t>(); + retval.labels_buffer = vespalib::ConstArrayRef<char>(source.peek(), source.size()); + assert(retval.num_subspaces * _data_from_type.dense_subspace_size == num_cells); + retval.valid = true; + return retval; +} + +bool +StreamedValueStore::encode_tensor(EntryRef ref, vespalib::nbostream &target) const +{ + if (auto data = get_tensor_data(ref)) { + StreamedValueView value( + _tensor_type, _data_from_type.num_mapped_dimensions, + data.cells_ref, data.num_subspaces, data.labels_buffer); + vespalib::eval::encode_value(value, target); + return true; + } else { + return false; + } +} + +void +StreamedValueStore::serialize_labels(const Value::Index &index, + vespalib::nbostream &target) const +{ + uint32_t num_subspaces = index.size(); + target << num_subspaces; + uint32_t num_mapped_dims = _data_from_type.num_mapped_dimensions; + std::vector<vespalib::stringref> labels(num_mapped_dims * num_subspaces); + auto view = index.create_view({}); + view->lookup({}); + std::vector<vespalib::stringref> addr(num_mapped_dims); + std::vector<vespalib::stringref *> addr_refs; + for (auto & label : addr) { + addr_refs.push_back(&label); + } + size_t subspace; + for (size_t ss = 0; ss < num_subspaces; ++ss) { + bool ok = view->next_result(addr_refs, subspace); + assert(ok); + size_t idx = subspace * num_mapped_dims; + for (auto label : addr) { + labels[idx++] = label; + } + } + bool ok = view->next_result(addr_refs, subspace); + assert(!ok); + for (auto label : labels) { + target.writeSmallString(label); + } +} + +TensorStore::EntryRef +StreamedValueStore::store_tensor(const Value &tensor) +{ + assert(tensor.type() == _tensor_type); + CellsMemBlock cells_mem(tensor.cells()); + size_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type); + size_t padding = alignment - 1; + vespalib::nbostream stream; + stream << uint32_t(cells_mem.num); + serialize_labels(tensor.index(), stream); + size_t mem_size = stream.size() + cells_mem.total_sz + padding; + auto raw = allocRawBuffer(mem_size); + char *target = raw.data; + memcpy(target, stream.peek(), sizeof(uint32_t)); + stream.adjustReadPos(sizeof(uint32_t)); + target += sizeof(uint32_t); + target = fix_alignment(target, alignment); + memcpy(target, cells_mem.ptr, cells_mem.total_sz); + target += cells_mem.total_sz; + memcpy(target, stream.peek(), stream.size()); + target += stream.size(); + assert(target <= raw.data + mem_size); + return raw.ref; +} + +TensorStore::EntryRef +StreamedValueStore::store_encoded_tensor(vespalib::nbostream &encoded) +{ + const auto &factory = StreamedValueBuilderFactory::get(); + auto val = vespalib::eval::decode_value(encoded, factory); + return store_tensor(*val); +} + +} diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h new file mode 100644 index 00000000000..4e12296916d --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h @@ -0,0 +1,98 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "tensor_store.h" +#include <vespa/eval/eval/value_type.h> +#include <vespa/eval/eval/value.h> +#include <vespa/vespalib/objects/nbostream.h> +#include <vespa/vespalib/util/typify.h> + +namespace search::tensor { + +/** + * Class for storing tensors in memory, with a special serialization + * format that can be used directly to make a StreamedValueView. + * + * The tensor type is owned by the store itself and will not be + * serialized at all. + * + * The parameters for serialization (see DataFromType) are: + * - number of mapped dimensions [MD] + * - dense subspace size [DS] + * - size of each cell [CS] - currently 4 (float) or 8 (double) + * - alignment for cells [CA] - currently 4 (float) or 8 (double) + * While the tensor value to be serialized has: + * - number of dense subspaces [ND] + * - labels for dense subspaces, ND * MD strings + * - cell values, ND * DS cells (each either float or double) + * The serialization format looks like: + * + * [bytes] : [format] : [description] + * 4 : n.b.o. uint32_ t : num cells = ND * DS + * 1-7 : (none) : padding to cell alignment CA + * CS * ND * DS : native float or double : cells + * 4 : n.b.o. uint32_t : number of subspaces = ND + * (depends) : n.b.o. strings : labels + * + * Here, n.b.o. means network byte order, or more precisely + * it's the format vespalib::nbostream uses for the given data type, + * including strings (where exact format depends on the string length). + * Note that the only unpredictably-sized data (the labels) are kept + * last. + * If we ever make a "hbostream" which uses host byte order, we + * could switch to that instead since these data are only kept in + * memory. + */ +class StreamedValueStore : public TensorStore { +public: + using RefType = vespalib::datastore::AlignedEntryRefT<22, 3>; + using DataStoreType = vespalib::datastore::DataStoreT<RefType>; + + struct StreamedValueData { + bool valid; + vespalib::eval::TypedCells cells_ref; + size_t num_subspaces; + vespalib::ConstArrayRef<char> labels_buffer; + operator bool() const { return valid; } + }; + + struct DataFromType { + uint32_t num_mapped_dimensions; + uint32_t dense_subspace_size; + vespalib::eval::CellType cell_type; + + DataFromType(const vespalib::eval::ValueType& type) + : num_mapped_dimensions(type.count_mapped_dimensions()), + dense_subspace_size(type.dense_subspace_size()), + cell_type(type.cell_type()) + {} + }; + +private: + DataStoreType _concreteStore; + vespalib::datastore::BufferType<char> _bufferType; + vespalib::eval::ValueType _tensor_type; + DataFromType _data_from_type; + + void serialize_labels(const vespalib::eval::Value::Index &index, + vespalib::nbostream &target) const; + + std::pair<const char *, uint32_t> getRawBuffer(RefType ref) const; + vespalib::datastore::Handle<char> allocRawBuffer(uint32_t size); +public: + StreamedValueStore(const vespalib::eval::ValueType &tensor_type); + virtual ~StreamedValueStore(); + + virtual void holdTensor(EntryRef ref) override; + virtual EntryRef move(EntryRef ref) override; + + StreamedValueData get_tensor_data(EntryRef ref) const; + bool encode_tensor(EntryRef ref, vespalib::nbostream &target) const; + + EntryRef store_tensor(const vespalib::eval::Value &tensor); + EntryRef store_encoded_tensor(vespalib::nbostream &encoded); +}; + + +} diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp index 83988a3af11..35be27bc03b 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp +++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp @@ -1,9 +1,9 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "tensor_deserialize.h" #include <vespa/document/util/serializableexceptions.h> #include <vespa/eval/eval/engine_or_factory.h> #include <vespa/eval/eval/value.h> -#include <vespa/vespalib/objects/nbostream.h> using document::DeserializeException; using vespalib::eval::EngineOrFactory; @@ -11,14 +11,19 @@ using vespalib::eval::Value; namespace search::tensor { -std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size) +std::unique_ptr<Value> deserialize_tensor(vespalib::nbostream &buffer) { - vespalib::nbostream wrapStream(data, size); - auto tensor = EngineOrFactory::get().decode(wrapStream); - if (wrapStream.size() != 0) { + auto tensor = EngineOrFactory::get().decode(buffer); + if (buffer.size() != 0) { throw DeserializeException("Leftover bytes deserializing tensor attribute value.", VESPA_STRLOC); } return tensor; } +std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size) +{ + vespalib::nbostream wrapStream(data, size); + return deserialize_tensor(wrapStream); +} + } // namespace diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h index 18e166543d6..6f9521c1355 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h +++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h @@ -1,10 +1,14 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/eval/eval/value.h> +#include <vespa/vespalib/objects/nbostream.h> namespace search::tensor { extern std::unique_ptr<vespalib::eval::Value> deserialize_tensor(const void *data, size_t size); +extern std::unique_ptr<vespalib::eval::Value> +deserialize_tensor(vespalib::nbostream &stream); + } // namespace diff --git a/slobrok/src/apps/slobrok/slobrok.cpp b/slobrok/src/apps/slobrok/slobrok.cpp index b89449e6779..5d650fafc96 100644 --- a/slobrok/src/apps/slobrok/slobrok.cpp +++ b/slobrok/src/apps/slobrok/slobrok.cpp @@ -1,5 +1,4 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/fnet.h> #include <vespa/slobrok/server/sbenv.h> #include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/exceptions.h> diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index 2709ae9d7d7..3aa3d21aa7b 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -64,7 +64,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri, int rpc_server_port, size_t rpc_thread_pool_size) : _thread_pool(std::make_unique<FastOS_ThreadPool>(1024*60)), - _transport(std::make_unique<FNET_Transport>(rpc_thread_pool_size)), + _transport(std::make_unique<FNET_Transport>(TransportConfig(rpc_thread_pool_size).events_before_wakeup(1))), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, slobrok::ConfiguratorFactory(config_uri))), _slobrok_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, slobrok::ConfiguratorFactory(config_uri))), @@ -72,9 +72,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri, _hostname(vespalib::HostName::get()), _rpc_server_port(rpc_server_port), _shutdown(false) -{ - _transport->events_before_wakeup(1); -} +{ } // TODO make sure init/shutdown is safe for aborted init in comm. mgr. diff --git a/storage/src/vespa/storage/tools/storage-cmd.cpp b/storage/src/vespa/storage/tools/storage-cmd.cpp index 8c0fcc83330..00ee2c9c4cf 100644 --- a/storage/src/vespa/storage/tools/storage-cmd.cpp +++ b/storage/src/vespa/storage/tools/storage-cmd.cpp @@ -1,5 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fnet/frt/frt.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> #include <vespa/slobrok/sbmirror.h> #include <vespa/fastos/app.h> #include <vespa/vespalib/locale/c.h> diff --git a/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp b/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp index 9393562bf28..a468dd98698 100644 --- a/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp +++ b/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp @@ -1,8 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/defaults.h> -#include <vespa/fnet/frt/frt.h> #include <vespa/slobrok/sbmirror.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/util/programoptions.h> |