aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java4
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java14
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java26
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java6
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java2
-rw-r--r--config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java38
-rw-r--r--config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java3
-rw-r--r--config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java12
-rw-r--r--config/src/apps/vespa-get-config/getconfig.cpp6
-rw-r--r--config/src/apps/vespa-ping-configproxy/pingproxy.cpp11
-rw-r--r--config/src/tests/failover/failover.cpp4
-rw-r--r--config/src/tests/file_acquirer/file_acquirer_test.cpp3
-rw-r--r--config/src/tests/frt/frt.cpp3
-rw-r--r--config/src/vespa/config/frt/frtconfigresponsev3.cpp2
-rw-r--r--config/src/vespa/config/frt/slimeconfigrequest.cpp2
-rw-r--r--config/src/vespa/config/frt/slimeconfigresponse.cpp2
-rw-r--r--configd/src/apps/sentinel/cmdq.cpp2
-rw-r--r--configd/src/apps/sentinel/rpchooks.cpp3
-rw-r--r--configutil/src/lib/configstatus.cpp1
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp9
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp4
-rw-r--r--eval/src/vespa/eval/streamed/streamed_value_index.cpp4
-rw-r--r--eval/src/vespa/eval/streamed/streamed_value_utils.h4
-rw-r--r--fnet/src/examples/frt/rpc/echo_client.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_client.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_server.cpp6
-rw-r--r--fnet/src/examples/frt/rpc/rpc_client.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_info.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_invoke.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_proxy.cpp25
-rw-r--r--fnet/src/examples/frt/rpc/rpc_server.cpp5
-rw-r--r--fnet/src/examples/ping/packets.cpp2
-rw-r--r--fnet/src/examples/ping/pingclient.cpp6
-rw-r--r--fnet/src/examples/ping/pingserver.cpp7
-rw-r--r--fnet/src/examples/proxy/proxy.cpp12
-rw-r--r--fnet/src/examples/timeout/timeout.cpp6
-rw-r--r--fnet/src/tests/connect/connect_test.cpp13
-rw-r--r--fnet/src/tests/connection_spread/connection_spread_test.cpp8
-rw-r--r--fnet/src/tests/frt/method_pt/method_pt.cpp5
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp17
-rw-r--r--fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp4
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp5
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp13
-rw-r--r--fnet/src/tests/frt/rpc/session.cpp4
-rw-r--r--fnet/src/tests/frt/rpc/sharedblob.cpp4
-rw-r--r--fnet/src/tests/info/info.cpp6
-rw-r--r--fnet/src/tests/locking/castspeed.cpp1
-rw-r--r--fnet/src/tests/locking/drainpackets.cpp3
-rw-r--r--fnet/src/tests/locking/lockspeed.cpp1
-rw-r--r--fnet/src/tests/scheduling/schedule.cpp3
-rw-r--r--fnet/src/tests/scheduling/sloweventloop.cpp3
-rw-r--r--fnet/src/tests/thread_selection/thread_selection_test.cpp2
-rw-r--r--fnet/src/tests/time/timespeed.cpp1
-rw-r--r--fnet/src/vespa/fnet/config.cpp3
-rw-r--r--fnet/src/vespa/fnet/config.h1
-rw-r--r--fnet/src/vespa/fnet/connection.cpp4
-rw-r--r--fnet/src/vespa/fnet/fnet.h68
-rw-r--r--fnet/src/vespa/fnet/frt/frt.h35
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp2
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp6
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h2
-rw-r--r--fnet/src/vespa/fnet/transport.cpp62
-rw-r--r--fnet/src/vespa/fnet/transport.h116
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp13
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h59
-rw-r--r--jrt_test/src/jrt-test/simpleserver/simpleserver.cpp4
-rw-r--r--jrt_test/src/tests/echo/echo-client.cpp5
-rw-r--r--jrt_test/src/tests/mandatory-methods/extract-reflection.cpp4
-rw-r--r--jrt_test/src/tests/mockup-invoke/mockup-server.cpp10
-rw-r--r--jrt_test/src/tests/rpc-error/test-errors.cpp4
-rw-r--r--logd/src/logd/rpc_forwarder.cpp4
-rw-r--r--logd/src/logd/rpc_forwarder.h4
-rw-r--r--logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp13
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java33
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java4
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java18
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java5
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java22
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java4
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java3
-rw-r--r--node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java4
-rw-r--r--node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json2
-rw-r--r--searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp13
-rw-r--r--searchlib/src/test/java/com/yahoo/searchlib/tensor/TensorConformanceTest.java142
-rw-r--r--searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp6
-rw-r--r--searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp6
-rw-r--r--searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp4
-rw-r--r--searchlib/src/vespa/searchlib/tensor/CMakeLists.txt3
-rw-r--r--searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp234
-rw-r--r--searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h33
-rw-r--r--searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp48
-rw-r--r--searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h35
-rw-r--r--searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp228
-rw-r--r--searchlib/src/vespa/searchlib/tensor/streamed_value_store.h98
-rw-r--r--searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp15
-rw-r--r--searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h4
-rw-r--r--slobrok/src/apps/slobrok/slobrok.cpp1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp6
-rw-r--r--storage/src/vespa/storage/tools/storage-cmd.cpp4
-rw-r--r--vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp4
101 files changed, 1138 insertions, 611 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java
index 12d5e8d32ed..a28475c94f3 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminV4Builder.java
@@ -118,7 +118,9 @@ public class DomAdminV4Builder extends DomAdminBuilderBase {
return nodesSpecification.provision(hostSystem,
ClusterSpec.Type.admin,
ClusterSpec.Id.from(clusterId),
- context.getDeployLogger()).keySet();
+ context.getDeployLogger(),
+ false)
+ .keySet();
}
/**
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java
index 526e88749fc..aace6818ba6 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java
@@ -202,15 +202,17 @@ public class NodesSpecification {
public Map<HostResource, ClusterMembership> provision(HostSystem hostSystem,
ClusterSpec.Type clusterType,
ClusterSpec.Id clusterId,
- DeployLogger logger) {
+ DeployLogger logger,
+ boolean stateful) {
if (combinedId.isPresent())
clusterType = ClusterSpec.Type.combined;
ClusterSpec cluster = ClusterSpec.request(clusterType, clusterId)
- .vespaVersion(version)
- .exclusive(exclusive)
- .combinedId(combinedId.map(ClusterSpec.Id::from))
- .dockerImageRepository(dockerImageRepo)
- .build();
+ .vespaVersion(version)
+ .exclusive(exclusive)
+ .combinedId(combinedId.map(ClusterSpec.Id::from))
+ .dockerImageRepository(dockerImageRepo)
+ .stateful(stateful)
+ .build();
return hostSystem.allocateHosts(cluster, Capacity.from(min, max, required, canFail), logger);
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
index 63426979649..193d12c428c 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
@@ -207,8 +207,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
}
private void addZooKeeper(ApplicationContainerCluster cluster, Element spec) {
- Element zkElement = XML.getChild(spec, "zookeeper");
- if (zkElement == null) return;
+ if (!hasZooKeeper(spec)) return;
Element nodesElement = XML.getChild(spec, "nodes");
boolean isCombined = nodesElement != null && nodesElement.hasAttribute("of");
if (isCombined) {
@@ -589,7 +588,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
Element nodesElement, ConfigModelContext context) {
applyNodesTagJvmArgs(nodes, getJvmOptions(cluster, nodesElement, context.getDeployLogger()));
- if (!cluster.getJvmGCOptions().isPresent()) {
+ if (cluster.getJvmGCOptions().isEmpty()) {
String jvmGCOptions = extractAttribute(nodesElement, VespaDomBuilder.JVM_GC_OPTIONS);
cluster.setJvmGCOptions(buildJvmGCOptions(context.getDeployState(), jvmGCOptions));
}
@@ -617,7 +616,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
if (nodesElement == null) {
cluster.addContainers(allocateWithoutNodesTag(cluster, context));
} else {
- List<ApplicationContainer> nodes = createNodes(cluster, nodesElement, context);
+ List<ApplicationContainer> nodes = createNodes(cluster, containerElement, nodesElement, context);
Element jvmElement = XML.getChild(nodesElement, "jvm");
if (jvmElement == null) {
@@ -628,7 +627,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
applyRoutingAliasProperties(nodes, cluster);
applyDefaultPreload(nodes, nodesElement);
String environmentVars = getEnvironmentVariables(XML.getChild(nodesElement, ENVIRONMENT_VARIABLES_ELEMENT));
- if (environmentVars != null && !environmentVars.isEmpty()) {
+ if (!environmentVars.isEmpty()) {
cluster.setEnvironmentVars(environmentVars);
}
if (useCpuSocketAffinity(nodesElement))
@@ -648,15 +647,15 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
return sb.toString();
}
- private List<ApplicationContainer> createNodes(ApplicationContainerCluster cluster, Element nodesElement, ConfigModelContext context) {
+ private List<ApplicationContainer> createNodes(ApplicationContainerCluster cluster, Element containerElement, Element nodesElement, ConfigModelContext context) {
if (nodesElement.hasAttribute("type")) // internal use for hosted system infrastructure nodes
return createNodesFromNodeType(cluster, nodesElement, context);
else if (nodesElement.hasAttribute("of")) // hosted node spec referencing a content cluster
return createNodesFromContentServiceReference(cluster, nodesElement, context);
else if (nodesElement.hasAttribute("count")) // regular, hosted node spec
- return createNodesFromNodeCount(cluster, nodesElement, context);
+ return createNodesFromNodeCount(cluster, containerElement, nodesElement, context);
else if (cluster.isHostedVespa() && cluster.getZone().environment().isManuallyDeployed()) // default to 1 in manual zones
- return createNodesFromNodeCount(cluster, nodesElement, context);
+ return createNodesFromNodeCount(cluster, containerElement, nodesElement, context);
else // the non-hosted option
return createNodesFromNodeList(context.getDeployState(), cluster, nodesElement);
}
@@ -720,12 +719,13 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
return List.of(node);
}
- private List<ApplicationContainer> createNodesFromNodeCount(ApplicationContainerCluster cluster, Element nodesElement, ConfigModelContext context) {
+ private List<ApplicationContainer> createNodesFromNodeCount(ApplicationContainerCluster cluster, Element containerElement, Element nodesElement, ConfigModelContext context) {
NodesSpecification nodesSpecification = NodesSpecification.from(new ModelElement(nodesElement), context);
Map<HostResource, ClusterMembership> hosts = nodesSpecification.provision(cluster.getRoot().hostSystem(),
ClusterSpec.Type.container,
ClusterSpec.Id.from(cluster.getName()),
- log);
+ log,
+ hasZooKeeper(containerElement));
return createNodesFromHosts(context.getDeployLogger(), hosts, cluster);
}
@@ -862,7 +862,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
private void addIncludes(Element parentElement) {
List<Element> includes = XML.getChildren(parentElement, IncludeDirs.INCLUDE);
- if (includes == null || includes.isEmpty()) {
+ if (includes.isEmpty()) {
return;
}
if (app == null) {
@@ -940,6 +940,10 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
));
}
+ private static boolean hasZooKeeper(Element spec) {
+ return XML.getChild(spec, "zookeeper") != null;
+ }
+
/** Disallow renderers named "XmlRenderer" or "JsonRenderer" */
private static void validateRendererElement(Element element) {
String idAttr = element.getAttribute("id");
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java b/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java
index 894022d936d..3c0f8996c22 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java
@@ -1,12 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.model.content;
+import com.yahoo.config.application.api.DeployLogger;
import com.yahoo.config.model.ConfigModelContext;
import com.yahoo.config.model.deploy.DeployState;
import com.yahoo.config.provision.ClusterMembership;
import com.yahoo.config.provision.ClusterSpec;
-import com.yahoo.config.application.api.DeployLogger;
-import java.util.logging.Level;
import com.yahoo.vespa.config.content.StorDistributionConfig;
import com.yahoo.vespa.model.HostResource;
import com.yahoo.vespa.model.HostSystem;
@@ -25,6 +24,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.logging.Level;
/**
* A group of storage nodes/distributors.
@@ -189,7 +189,7 @@ public class StorageGroup {
HostSystem hostSystem,
DeployLogger logger) {
ClusterSpec.Id clusterId = ClusterSpec.Id.from(clusterIdString);
- return nodesSpecification.provision(hostSystem, ClusterSpec.Type.content, clusterId, logger);
+ return nodesSpecification.provision(hostSystem, ClusterSpec.Type.content, clusterId, logger, true);
}
public static class Builder {
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java
index 8ae895b0695..93367f1a286 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java
@@ -354,7 +354,7 @@ public class ContentCluster extends AbstractConfigProducer implements
}
private Collection<HostResource> getControllerHosts(NodesSpecification nodesSpecification, Admin admin, String clusterName, ConfigModelContext context) {
- return nodesSpecification.provision(admin.hostSystem(), ClusterSpec.Type.admin, ClusterSpec.Id.from(clusterName), context.getDeployLogger()).keySet();
+ return nodesSpecification.provision(admin.hostSystem(), ClusterSpec.Type.admin, ClusterSpec.Id.from(clusterName), context.getDeployLogger(), false).keySet();
}
private List<HostResource> drawControllerHosts(int count, StorageGroup rootGroup, Collection<ContainerModel> containers) {
diff --git a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
index 4ee05b56bca..f8695a76b11 100644
--- a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
+++ b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
@@ -1815,6 +1815,44 @@ public class ModelProvisioningTest {
assertEquals(1, controller.getContainers().size());
}
+ @Test
+ public void testStatefulProperty() {
+ String servicesXml =
+ "<?xml version='1.0' encoding='utf-8' ?>" +
+ "<services>" +
+ " <container version='1.0' id='qrs'>" +
+ " <nodes count='1'/>" +
+ " </container>" +
+ " <container version='1.0' id='zk'>" +
+ " <zookeeper/>" +
+ " <nodes count='3'/>" +
+ " </container>" +
+ " <content version='1.0' id='content'>" +
+ " <redundancy>2</redundancy>" +
+ " <documents>" +
+ " <document type='type1' mode='index'/>" +
+ " </documents>" +
+ " <nodes count='2'/>" +
+ " </content>" +
+ "</services>";
+ VespaModelTester tester = new VespaModelTester();
+ tester.addHosts(6);
+ VespaModel model = tester.createModel(servicesXml, true);
+
+ Map<String, Boolean> tests = Map.of("qrs", false,
+ "zk", true,
+ "content", true);
+ Map<String, List<HostResource>> hostsByCluster = model.hostSystem().getHosts().stream()
+ .collect(Collectors.groupingBy(h -> h.spec().membership().get().cluster().id().value()));
+ tests.forEach((clusterId, stateful) -> {
+ List<HostResource> hosts = hostsByCluster.getOrDefault(clusterId, List.of());
+ assertFalse("Hosts are provisioned for '" + clusterId + "'", hosts.isEmpty());
+ assertEquals("Hosts in cluster '" + clusterId + "' are " + (stateful ? "" : "not ") + "stateful",
+ stateful,
+ hosts.stream().allMatch(h -> h.spec().membership().get().cluster().isStateful()));
+ });
+ }
+
private VespaModel createNonProvisionedMultitenantModel(String services) {
return createNonProvisionedModel(true, null, services);
}
diff --git a/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java b/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java
index 9fb1689782f..031fce88354 100644
--- a/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java
+++ b/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterMembership.java
@@ -72,8 +72,7 @@ public class ClusterMembership {
"/" + index +
( cluster.isExclusive() ? "/exclusive" : "") +
( retired ? "/retired" : "") +
- // TODO(mpolden): Write stateful tag once all nodes can read it
- // ( cluster.isStateful() ? "/stateful" : "") +
+ ( cluster.isStateful() ? "/stateful" : "") +
( cluster.combinedId().isPresent() ? "/" + cluster.combinedId().get().value() : "");
}
diff --git a/config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java b/config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java
index 1f170cca9a0..fc1cdd74169 100644
--- a/config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java
+++ b/config-provisioning/src/test/java/com/yahoo/config/provision/ClusterMembershipTest.java
@@ -48,10 +48,10 @@ public class ClusterMembershipTest {
assertEquals(dockerImageRepo.get(), instance.cluster().dockerImageRepo().get());
}
{
- ClusterMembership instance = ClusterMembership.from("container/id1/4/37", Vtag.currentVersion, Optional.empty());
+ ClusterMembership instance = ClusterMembership.from("container/id1/4/37/stateful", Vtag.currentVersion, Optional.empty());
ClusterMembership serialized = ClusterMembership.from(instance.stringValue(), Vtag.currentVersion, Optional.empty());
assertEquals(instance, serialized);
- assertFalse("Skips serialization of stateful property", instance.cluster().isStateful());
+ assertTrue(instance.cluster().isStateful());
}
}
@@ -117,7 +117,7 @@ public class ClusterMembershipTest {
assertFalse(instance.cluster().group().isPresent());
assertEquals(37, instance.index());
assertFalse(instance.retired());
- assertEquals("content/id1/37", instance.stringValue());
+ assertEquals("content/id1/37/stateful", instance.stringValue());
}
private void assertContentServiceWithGroup(ClusterMembership instance) {
@@ -126,7 +126,7 @@ public class ClusterMembershipTest {
assertEquals(4, instance.cluster().group().get().index());
assertEquals(37, instance.index());
assertFalse(instance.retired());
- assertEquals("content/id1/4/37", instance.stringValue());
+ assertEquals("content/id1/4/37/stateful", instance.stringValue());
}
/** Serializing a spec without a group assigned works, but not deserialization */
@@ -135,7 +135,7 @@ public class ClusterMembershipTest {
assertEquals("id1", instance.cluster().id().value());
assertEquals(37, instance.index());
assertTrue(instance.retired());
- assertEquals("content/id1/37/retired", instance.stringValue());
+ assertEquals("content/id1/37/retired/stateful", instance.stringValue());
}
private void assertContentServiceWithGroupAndRetire(ClusterMembership instance) {
@@ -144,7 +144,7 @@ public class ClusterMembershipTest {
assertEquals(4, instance.cluster().group().get().index());
assertEquals(37, instance.index());
assertTrue(instance.retired());
- assertEquals("content/id1/4/37/retired", instance.stringValue());
+ assertEquals("content/id1/4/37/retired/stateful", instance.stringValue());
}
}
diff --git a/config/src/apps/vespa-get-config/getconfig.cpp b/config/src/apps/vespa-get-config/getconfig.cpp
index dc12d2bbf0e..a5e400bd354 100644
--- a/config/src/apps/vespa-get-config/getconfig.cpp
+++ b/config/src/apps/vespa-get-config/getconfig.cpp
@@ -1,13 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
#include <vespa/config/config.h>
#include <vespa/config/frt/frtconfigrequestfactory.h>
#include <vespa/config/frt/frtconnection.h>
#include <vespa/config/common/payload_converter.h>
#include <vespa/fastos/app.h>
-
#include <string>
#include <sstream>
#include <fstream>
@@ -28,7 +28,7 @@ private:
public:
GetConfig() : _server(), _target(nullptr) {}
- virtual ~GetConfig();
+ ~GetConfig() override;
int usage();
void initRPC(const char *spec);
void finiRPC();
diff --git a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
index 208f9312ada..a47fd25f9af 100644
--- a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
+++ b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fastos/app.h>
#include <sstream>
@@ -15,12 +17,11 @@ private:
std::unique_ptr<fnet::frt::StandaloneFRT> _server;
FRT_Target *_target;
- PingProxy(const PingProxy &);
- PingProxy &operator=(const PingProxy &);
-
public:
+ PingProxy(const PingProxy &) = delete;
+ PingProxy &operator=(const PingProxy &) = delete;
PingProxy() : _server(), _target(nullptr) {}
- virtual ~PingProxy();
+ ~PingProxy() override ;
int usage();
void initRPC(const char *spec);
void finiRPC();
diff --git a/config/src/tests/failover/failover.cpp b/config/src/tests/failover/failover.cpp
index 99b6967c929..2e039081716 100644
--- a/config/src/tests/failover/failover.cpp
+++ b/config/src/tests/failover/failover.cpp
@@ -5,7 +5,9 @@
#include <vespa/config/frt/protocol.h>
#include <vespa/config/config.h>
#include <vespa/config/common/configcontext.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+
#include "config-my.h"
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/data/simple_buffer.h>
diff --git a/config/src/tests/file_acquirer/file_acquirer_test.cpp b/config/src/tests/file_acquirer/file_acquirer_test.cpp
index 33bb8f47e09..da4bd71b82b 100644
--- a/config/src/tests/file_acquirer/file_acquirer_test.cpp
+++ b/config/src/tests/file_acquirer/file_acquirer_test.cpp
@@ -1,7 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/config/file_acquirer/file_acquirer.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/vespalib/util/stringfmt.h>
using namespace config;
diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp
index 85b9789821d..0d70605fa62 100644
--- a/config/src/tests/frt/frt.cpp
+++ b/config/src/tests/frt/frt.cpp
@@ -11,9 +11,8 @@
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/data/slime/json_format.h>
#include <vespa/vespalib/data/simple_buffer.h>
-#include <vespa/fnet/fnet.h>
-#include <vespa/fnet/frt/frt.h>
#include <vespa/fnet/frt/error.h>
+#include <vespa/fnet/frt/supervisor.h>
#include <vespa/config/frt/protocol.h>
#include <lz4.h>
#include "config-my.h"
diff --git a/config/src/vespa/config/frt/frtconfigresponsev3.cpp b/config/src/vespa/config/frt/frtconfigresponsev3.cpp
index 9635b4c811c..80cdf88a79a 100644
--- a/config/src/vespa/config/frt/frtconfigresponsev3.cpp
+++ b/config/src/vespa/config/frt/frtconfigresponsev3.cpp
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "frtconfigresponsev3.h"
#include "compressioninfo.h"
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/values.h>
#include <vespa/vespalib/data/simple_buffer.h>
#include <vespa/log/log.h>
diff --git a/config/src/vespa/config/frt/slimeconfigrequest.cpp b/config/src/vespa/config/frt/slimeconfigrequest.cpp
index 696789f74c1..8a6706974f6 100644
--- a/config/src/vespa/config/frt/slimeconfigrequest.cpp
+++ b/config/src/vespa/config/frt/slimeconfigrequest.cpp
@@ -1,12 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "slimeconfigrequest.h"
#include "connection.h"
-#include <vespa/fnet/frt/frt.h>
#include <vespa/config/common/configkey.h>
#include <vespa/config/common/configstate.h>
#include <vespa/config/common/configdefinition.h>
#include <vespa/config/common/trace.h>
#include <vespa/config/common/vespa_version.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/vespalib/data/simple_buffer.h>
diff --git a/config/src/vespa/config/frt/slimeconfigresponse.cpp b/config/src/vespa/config/frt/slimeconfigresponse.cpp
index 181ab58b184..155515d2ed6 100644
--- a/config/src/vespa/config/frt/slimeconfigresponse.cpp
+++ b/config/src/vespa/config/frt/slimeconfigresponse.cpp
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "slimeconfigresponse.h"
#include <vespa/config/common/misc.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/values.h>
#include <vespa/vespalib/stllike/string.h>
#include <vespa/log/log.h>
LOG_SETUP(".config.frt.slimeconfigresponse");
diff --git a/configd/src/apps/sentinel/cmdq.cpp b/configd/src/apps/sentinel/cmdq.cpp
index 8fa3726c7f6..489ae97228f 100644
--- a/configd/src/apps/sentinel/cmdq.cpp
+++ b/configd/src/apps/sentinel/cmdq.cpp
@@ -1,7 +1,7 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "cmdq.h"
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/rpcrequest.h>
namespace config::sentinel {
diff --git a/configd/src/apps/sentinel/rpchooks.cpp b/configd/src/apps/sentinel/rpchooks.cpp
index 99bcd404402..aef58b8a1dc 100644
--- a/configd/src/apps/sentinel/rpchooks.cpp
+++ b/configd/src/apps/sentinel/rpchooks.cpp
@@ -2,7 +2,8 @@
#include "rpchooks.h"
#include "cmdq.h"
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/log/log.h>
LOG_SETUP(".rpchooks");
diff --git a/configutil/src/lib/configstatus.cpp b/configutil/src/lib/configstatus.cpp
index 254fa94a8ec..e56da67fade 100644
--- a/configutil/src/lib/configstatus.cpp
+++ b/configutil/src/lib/configstatus.cpp
@@ -2,7 +2,6 @@
#include "configstatus.h"
#include "tags.h"
-#include <vespa/fnet/frt/frt.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <vbench/http/http_result_handler.h>
#include <vbench/http/server_spec.h>
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
index 25112a00b99..312cd2d89cb 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
@@ -2,10 +2,11 @@
#include "externpolicy.h"
#include <boost/tokenizer.hpp>
#include <vespa/documentapi/messagebus/documentprotocol.h>
-#include <vespa/messagebus/emptyreply.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/slobrok/sbmirror.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fastos/thread.h>
#include <vespa/log/log.h>
LOG_SETUP(".externpolicy");
@@ -52,13 +53,13 @@ ExternPolicy::ExternPolicy(const string &param) :
spec.push_back(*it);
}
- if (spec.size() == 0) {
+ if (spec.empty()) {
_error = vespalib::make_string("Extern policy needs at least one slobrok: Slobrok list '%s' resolved to no slobroks", lst.c_str());
return;
}
slobrok::ConfiguratorFactory config(spec);
- _mirror.reset(new MirrorAPI(*_orb, config));
+ _mirror = std::make_unique<MirrorAPI>(*_orb, config);
_started = _transport->Start(_threadPool.get());
if (!_started) {
_error = "Failed to start FNET supervisor.";
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
index 800aa8c4520..9eb28432234 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
@@ -4,10 +4,10 @@
#include <vespa/messagebus/routing/routingcontext.h>
#include <vespa/config/common/configcontext.h>
#include <vespa/vespalib/text/stringtokenizer.h>
-#include <vespa/vespalib/util/time.h>
-#include <vespa/fnet/frt/frt.h>
#include <vespa/slobrok/sbmirror.h>
+#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/transport.h>
+#include <vespa/fastos/thread.h>
#include <thread>
using slobrok::api::IMirrorAPI;
diff --git a/eval/src/vespa/eval/streamed/streamed_value_index.cpp b/eval/src/vespa/eval/streamed/streamed_value_index.cpp
index 38b57e9c660..17cf7316554 100644
--- a/eval/src/vespa/eval/streamed/streamed_value_index.cpp
+++ b/eval/src/vespa/eval/streamed/streamed_value_index.cpp
@@ -39,7 +39,7 @@ struct StreamedFilterView : Value::Index::View
bool next_result(ConstArrayRef<vespalib::stringref*> addr_out, size_t &idx_out) override {
while (const auto block = label_blocks.next_block()) {
- idx_out = block.ss_idx;
+ idx_out = block.subspace_index;
bool matches = true;
size_t out_idx = 0;
size_t vdm_idx = 0;
@@ -73,7 +73,7 @@ struct StreamedIterationView : Value::Index::View
bool next_result(ConstArrayRef<vespalib::stringref*> addr_out, size_t &idx_out) override {
if (auto block = label_blocks.next_block()) {
- idx_out = block.ss_idx;
+ idx_out = block.subspace_index;
size_t i = 0;
assert(addr_out.size() == block.address.size());
for (auto ptr : addr_out) {
diff --git a/eval/src/vespa/eval/streamed/streamed_value_utils.h b/eval/src/vespa/eval/streamed/streamed_value_utils.h
index 3e3da82dd22..b88d4df8581 100644
--- a/eval/src/vespa/eval/streamed/streamed_value_utils.h
+++ b/eval/src/vespa/eval/streamed/streamed_value_utils.h
@@ -29,9 +29,9 @@ struct LabelStream {
**/
struct LabelBlock {
static constexpr size_t npos = -1;
- size_t ss_idx;
+ size_t subspace_index;
ConstArrayRef<vespalib::stringref> address;
- operator bool() const { return ss_idx != npos; }
+ operator bool() const { return subspace_index != npos; }
};
/**
diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp
index 06f4ef0ee5b..bb2ef66c6fa 100644
--- a/fnet/src/examples/frt/rpc/echo_client.cpp
+++ b/fnet/src/examples/frt/rpc/echo_client.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fastos/app.h>
#include <vespa/log/log.h>
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
index 726a500cc55..c63352d8f24 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fastos/app.h>
#include <vespa/log/log.h>
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
index 872894b190d..9832a59abad 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
@@ -1,6 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/signalshutdown.h>
+#include <vespa/fnet/transport.h>
+
#include <vespa/fastos/app.h>
#include <thread>
diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp
index fc1d54d3440..1c634f4b704 100644
--- a/fnet/src/examples/frt/rpc/rpc_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_client.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fastos/app.h>
#include <vespa/log/log.h>
diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp
index d90d22d1986..0f8b8422241 100644
--- a/fnet/src/examples/frt/rpc/rpc_info.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_info.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fastos/app.h>
#include <vespa/log/log.h>
diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
index fb82622a537..d1f35429352 100644
--- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fastos/app.h>
#include <vespa/vespalib/locale/c.h>
diff --git a/fnet/src/examples/frt/rpc/rpc_proxy.cpp b/fnet/src/examples/frt/rpc/rpc_proxy.cpp
index a61e2d37197..93076344ce2 100644
--- a/fnet/src/examples/frt/rpc/rpc_proxy.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_proxy.cpp
@@ -1,6 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/invoker.h>
+#include <vespa/fnet/channel.h>
+#include <vespa/fnet/transport_thread.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/signalshutdown.h>
+
#include <vespa/fastos/app.h>
#include <chrono>
@@ -15,12 +23,10 @@ struct Session
uint32_t id;
uint32_t finiCnt;
- Session(uint32_t xid) : client(nullptr), server(nullptr), id(xid), finiCnt(0) {}
+ explicit Session(uint32_t xid) : client(nullptr), server(nullptr), id(xid), finiCnt(0) {}
~Session() { assert(client == nullptr && server == nullptr && finiCnt == 2); }
-
-private:
- Session(const Session &);
- Session &operator=(const Session &);
+ Session(const Session &) = delete;
+ Session &operator=(const Session &) = delete;
};
//-----------------------------------------------------------------------------
@@ -34,10 +40,9 @@ private:
uint32_t _currID;
char _prefixStr[256];
- RPCProxy(const RPCProxy &);
- RPCProxy &operator=(const RPCProxy &);
-
public:
+ RPCProxy(const RPCProxy &) = delete;
+ RPCProxy &operator=(const RPCProxy &) = delete;
RPCProxy(FRT_Supervisor &supervisor,
const char *spec,
bool verbose)
@@ -69,7 +74,7 @@ private:
RPCProxy &_proxy;
public:
- ReqDone(RPCProxy &proxy) : _proxy(proxy) {}
+ explicit ReqDone(RPCProxy &proxy) : _proxy(proxy) {}
void RequestDone(FRT_RPCRequest *req) override;
};
diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp
index aa521080538..4333f182cc0 100644
--- a/fnet/src/examples/frt/rpc/rpc_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_server.cpp
@@ -1,6 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/signalshutdown.h>
+#include <vespa/fnet/transport.h>
#include <vespa/fastos/app.h>
#include <vespa/log/log.h>
diff --git a/fnet/src/examples/ping/packets.cpp b/fnet/src/examples/ping/packets.cpp
index e5e9f645c9a..6aa54838c8b 100644
--- a/fnet/src/examples/ping/packets.cpp
+++ b/fnet/src/examples/ping/packets.cpp
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/fnet.h>
#include "packets.h"
+#include <vespa/fnet/databuffer.h>
uint32_t
PingRequest::GetPCODE()
diff --git a/fnet/src/examples/ping/pingclient.cpp b/fnet/src/examples/ping/pingclient.cpp
index 1d65ef3b69c..6a7bd21e715 100644
--- a/fnet/src/examples/ping/pingclient.cpp
+++ b/fnet/src/examples/ping/pingclient.cpp
@@ -1,8 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/simplepacketstreamer.h>
+#include <vespa/fnet/channel.h>
+#include <vespa/fnet/connection.h>
#include <examples/ping/packets.h>
#include <vespa/fastos/app.h>
+#include <vespa/fastos/thread.h>
#include <vespa/log/log.h>
LOG_SETUP("pingclient");
diff --git a/fnet/src/examples/ping/pingserver.cpp b/fnet/src/examples/ping/pingserver.cpp
index c4543134912..cb0ab02aa0d 100644
--- a/fnet/src/examples/ping/pingserver.cpp
+++ b/fnet/src/examples/ping/pingserver.cpp
@@ -1,6 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/signalshutdown.h>
+#include <vespa/fnet/simplepacketstreamer.h>
+#include <vespa/fnet/channel.h>
+#include <vespa/fnet/iserveradapter.h>
+#include <vespa/fnet/connector.h>
#include <examples/ping/packets.h>
#include <vespa/fastos/app.h>
diff --git a/fnet/src/examples/proxy/proxy.cpp b/fnet/src/examples/proxy/proxy.cpp
index a01a16ead9c..062a0d52627 100644
--- a/fnet/src/examples/proxy/proxy.cpp
+++ b/fnet/src/examples/proxy/proxy.cpp
@@ -1,6 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/transport_thread.h>
+#include <vespa/fnet/connection.h>
+#include <vespa/fnet/signalshutdown.h>
+#include <vespa/fnet/packet.h>
+#include <vespa/fnet/iserveradapter.h>
+#include <vespa/fnet/ipacketstreamer.h>
+#include <vespa/fnet/channel.h>
+#include <vespa/fnet/connector.h>
#include <vespa/fastos/app.h>
#include <vespa/log/log.h>
@@ -140,7 +148,7 @@ private:
public:
Proxy() : _transport() {}
- ~Proxy() { }
+ ~Proxy() override { }
bool GetPacketInfo(FNET_DataBuffer *src, uint32_t *plen, uint32_t *pcode, uint32_t *chid, bool *) override;
FNET_Packet *Decode(FNET_DataBuffer *src, uint32_t plen, uint32_t pcode, FNET_Context) override;
void Encode(FNET_Packet *packet, uint32_t chid, FNET_DataBuffer *dst) override;
diff --git a/fnet/src/examples/timeout/timeout.cpp b/fnet/src/examples/timeout/timeout.cpp
index 23dfbeb9070..9f363ccd864 100644
--- a/fnet/src/examples/timeout/timeout.cpp
+++ b/fnet/src/examples/timeout/timeout.cpp
@@ -1,7 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/signalshutdown.h>
+#include <vespa/fnet/packetqueue.h>
+#include <vespa/fnet/controlpacket.h>
#include <vespa/fastos/app.h>
+#include <vespa/fastos/thread.h>
#include <vespa/vespalib/util/time.h>
#include <thread>
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp
index 3fe7b5b7614..2b4a2bbe9f0 100644
--- a/fnet/src/tests/connect/connect_test.cpp
+++ b/fnet/src/tests/connect/connect_test.cpp
@@ -1,7 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/transport_thread.h>
+#include <vespa/fnet/simplepacketstreamer.h>
+#include <vespa/fnet/ipackethandler.h>
+#include <vespa/fnet/connection.h>
+#include <vespa/fnet/controlpacket.h>
#include <vespa/vespalib/net/server_socket.h>
#include <vespa/vespalib/net/crypto_engine.h>
#include <vespa/vespalib/util/stringfmt.h>
@@ -90,13 +95,13 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
transport.Start(&pool);
}
TransportFixture(AsyncResolver::HostResolver::SP host_resolver)
- : streamer(nullptr), pool(128 * 1024), transport(make_resolver(std::move(host_resolver)), 1),
+ : streamer(nullptr), pool(128 * 1024), transport(TransportConfig().resolver(make_resolver(std::move(host_resolver)))),
conn_lost(), conn_deleted()
{
transport.Start(&pool);
}
TransportFixture(CryptoEngine::SP crypto)
- : streamer(nullptr), pool(128 * 1024), transport(crypto, 1),
+ : streamer(nullptr), pool(128 * 1024), transport(TransportConfig().crypto(std::move(crypto))),
conn_lost(), conn_deleted()
{
transport.Start(&pool);
@@ -114,7 +119,7 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
conn->SetCleanupHandler(this);
return conn;
}
- ~TransportFixture() {
+ ~TransportFixture() override {
transport.ShutDown(true);
pool.Close();
}
diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp
index 11120ebc3dc..caeb4211ab2 100644
--- a/fnet/src/tests/connection_spread/connection_spread_test.cpp
+++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp
@@ -1,6 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/transport_thread.h>
+#include <vespa/fnet/iserveradapter.h>
+#include <vespa/fnet/ipacketstreamer.h>
+#include <vespa/fnet/connector.h>
+#include <vespa/fnet/connection.h>
+#include <vespa/fastos/thread.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <thread>
#include <chrono>
diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp
index 53960d73466..450731fe1aa 100644
--- a/fnet/src/tests/frt/method_pt/method_pt.cpp
+++ b/fnet/src/tests/frt/method_pt/method_pt.cpp
@@ -1,8 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fnet/frt/frt.h>
-
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
class Test;
class SimpleHandler;
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index ed4911175a0..2577b4e6155 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -1,7 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fastos/thread.h>
#include <vespa/vespalib/util/benchmark_timer.h>
#include <vespa/vespalib/net/crypto_engine.h>
#include <vespa/vespalib/net/tls/tls_crypto_engine.h>
@@ -15,7 +18,7 @@ struct Rpc : FRT_Invokable {
FNET_Transport transport;
FRT_Supervisor orb;
Rpc(CryptoEngine::SP crypto, size_t num_threads)
- : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport) {}
+ : thread_pool(128 * 1024), transport(TransportConfig(num_threads).crypto(std::move(crypto))), orb(&transport) {}
void start() {
ASSERT_TRUE(transport.Start(&thread_pool));
}
@@ -26,7 +29,7 @@ struct Rpc : FRT_Invokable {
FRT_Target *connect(uint32_t port) {
return orb.GetTarget(port);
}
- ~Rpc() {
+ ~Rpc() override {
transport.ShutDown(true);
thread_pool.Close();
}
@@ -34,7 +37,7 @@ struct Rpc : FRT_Invokable {
struct Server : Rpc {
uint32_t port;
- Server(CryptoEngine::SP crypto, size_t num_threads) : Rpc(crypto, num_threads), port(listen()) {
+ Server(CryptoEngine::SP crypto, size_t num_threads) : Rpc(std::move(crypto), num_threads), port(listen()) {
init_rpc();
start();
}
@@ -54,7 +57,7 @@ struct Server : Rpc {
struct Client : Rpc {
uint32_t port;
- Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server) : Rpc(crypto, num_threads), port(server.port) {
+ Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server) : Rpc(std::move(crypto), num_threads), port(server.port) {
start();
}
FRT_Target *connect() { return Rpc::connect(port); }
@@ -62,7 +65,7 @@ struct Client : Rpc {
struct Result {
std::vector<double> req_per_sec;
- Result(size_t num_threads) : req_per_sec(num_threads, 0.0) {}
+ explicit Result(size_t num_threads) : req_per_sec(num_threads, 0.0) {}
double throughput() const {
double sum = 0.0;
for (double sample: req_per_sec) {
diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
index cdb2636a8c1..8a954db26e0 100644
--- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
@@ -6,7 +6,9 @@
#include <vespa/vespalib/net/tls/tls_crypto_engine.h>
#include <vespa/vespalib/test/make_tls_options_for_testing.h>
#include <vespa/vespalib/test/time_tracer.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <thread>
#include <chrono>
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index 95dbe672909..68bd58d2f82 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -1,6 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/invoker.h>
#include <vespa/vespalib/util/stringfmt.h>
struct Receptor : public FRT_IRequestWait
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index 410a60fa08a..bd2f6fa9e1d 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -3,7 +3,10 @@
#include <vespa/vespalib/net/socket_spec.h>
#include <vespa/vespalib/util/benchmark_timer.h>
#include <vespa/vespalib/util/latch.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/invoker.h>
#include <mutex>
#include <condition_variable>
@@ -25,7 +28,7 @@ private:
vespalib::Latch<FRT_RPCRequest*> _latch;
public:
RequestLatch() : _latch() {}
- ~RequestLatch() { ASSERT_TRUE(!has_req()); }
+ ~RequestLatch() override { ASSERT_TRUE(!has_req()); }
bool has_req() { return _latch.has_value(); }
FRT_RPCRequest *read() { return _latch.read(); }
void write(FRT_RPCRequest *req) { _latch.write(req); }
@@ -38,8 +41,8 @@ class MyReq {
private:
FRT_RPCRequest *_req;
public:
- MyReq(FRT_RPCRequest *req) : _req(req) {}
- MyReq(const char *method_name)
+ explicit MyReq(FRT_RPCRequest *req) : _req(req) {}
+ explicit MyReq(const char *method_name)
: _req(new FRT_RPCRequest())
{
_req->SetMethodName(method_name);
@@ -270,8 +273,6 @@ public:
_testRPC(&_server.supervisor()),
_echoTest(&_server.supervisor())
{
- _client.supervisor().GetTransport()->SetTCPNoDelay(true);
- _server.supervisor().GetTransport()->SetTCPNoDelay(true);
ASSERT_TRUE(_server.supervisor().Listen("tcp/0"));
_peerSpec = SocketSpec::from_host_port("localhost", _server.supervisor().GetListenPort()).spec();
_target = _client.supervisor().GetTarget(_peerSpec.c_str());
diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp
index 24cbedb3ff7..b96c881ba27 100644
--- a/fnet/src/tests/frt/rpc/session.cpp
+++ b/fnet/src/tests/frt/rpc/session.cpp
@@ -1,7 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <mutex>
//-------------------------------------------------------------
diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp
index 1c0503454c7..09297bbf1c3 100644
--- a/fnet/src/tests/frt/rpc/sharedblob.cpp
+++ b/fnet/src/tests/frt/rpc/sharedblob.cpp
@@ -1,7 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vector>
constexpr size_t ALLOC_LIMIT=1024;
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index 1d26f2b0fa1..3422efe1da6 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -1,6 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/channel.h>
+#include <vespa/fnet/info.h>
#include <mutex>
#include <condition_variable>
diff --git a/fnet/src/tests/locking/castspeed.cpp b/fnet/src/tests/locking/castspeed.cpp
index dc82073ea57..2f784e625be 100644
--- a/fnet/src/tests/locking/castspeed.cpp
+++ b/fnet/src/tests/locking/castspeed.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
#include <chrono>
class B;
diff --git a/fnet/src/tests/locking/drainpackets.cpp b/fnet/src/tests/locking/drainpackets.cpp
index 9db43a5eb52..2d0ab6e2808 100644
--- a/fnet/src/tests/locking/drainpackets.cpp
+++ b/fnet/src/tests/locking/drainpackets.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/packetqueue.h>
+#include <vespa/fnet/packet.h>
#include <mutex>
#include <chrono>
diff --git a/fnet/src/tests/locking/lockspeed.cpp b/fnet/src/tests/locking/lockspeed.cpp
index ae6b983d724..a3ab48edd22 100644
--- a/fnet/src/tests/locking/lockspeed.cpp
+++ b/fnet/src/tests/locking/lockspeed.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
#include "dummy.h"
#include <chrono>
diff --git a/fnet/src/tests/scheduling/schedule.cpp b/fnet/src/tests/scheduling/schedule.cpp
index a1b02a688bf..fc5b5687c87 100644
--- a/fnet/src/tests/scheduling/schedule.cpp
+++ b/fnet/src/tests/scheduling/schedule.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/scheduler.h>
+#include <vespa/fnet/task.h>
using my_clock = FNET_Scheduler::clock;
using time_point = my_clock::time_point;
diff --git a/fnet/src/tests/scheduling/sloweventloop.cpp b/fnet/src/tests/scheduling/sloweventloop.cpp
index 5ddc9b1502c..a36afbc9d92 100644
--- a/fnet/src/tests/scheduling/sloweventloop.cpp
+++ b/fnet/src/tests/scheduling/sloweventloop.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/scheduler.h>
+#include <vespa/fnet/task.h>
class MyTask : public FNET_Task
{
diff --git a/fnet/src/tests/thread_selection/thread_selection_test.cpp b/fnet/src/tests/thread_selection/thread_selection_test.cpp
index ecfe0c57d5c..35df07221a2 100644
--- a/fnet/src/tests/thread_selection/thread_selection_test.cpp
+++ b/fnet/src/tests/thread_selection/thread_selection_test.cpp
@@ -1,6 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
+#include <vespa/fnet/transport.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <thread>
#include <chrono>
diff --git a/fnet/src/tests/time/timespeed.cpp b/fnet/src/tests/time/timespeed.cpp
index e6d2af5a278..eda367f1990 100644
--- a/fnet/src/tests/time/timespeed.cpp
+++ b/fnet/src/tests/time/timespeed.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/fnet.h>
#include <vespa/vespalib/util/benchmark_timer.h>
#include <chrono>
diff --git a/fnet/src/vespa/fnet/config.cpp b/fnet/src/vespa/fnet/config.cpp
index a546d38f78b..aee9ac6f2c1 100644
--- a/fnet/src/vespa/fnet/config.cpp
+++ b/fnet/src/vespa/fnet/config.cpp
@@ -3,7 +3,8 @@
#include "config.h"
FNET_Config::FNET_Config()
- : _iocTimeOut(0),
+ : _events_before_wakeup(1),
+ _iocTimeOut(0),
_maxInputBufferSize(0x10000),
_maxOutputBufferSize(0x10000),
_tcpNoDelay(true)
diff --git a/fnet/src/vespa/fnet/config.h b/fnet/src/vespa/fnet/config.h
index 3f34c1511b6..f512d7cde75 100644
--- a/fnet/src/vespa/fnet/config.h
+++ b/fnet/src/vespa/fnet/config.h
@@ -11,6 +11,7 @@
class FNET_Config
{
public:
+ uint32_t _events_before_wakeup;
uint32_t _iocTimeOut;
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index c5afd627a5a..a2e6fe25edc 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -347,7 +347,7 @@ done_read:
}
UpdateTimeOut();
- uint32_t maxSize = GetConfig()->_maxInputBufferSize;
+ uint32_t maxSize = getConfig()._maxInputBufferSize;
if (maxSize > 0 && _input.GetBufSize() > maxSize)
{
if (!_flags._gotheader || _packetLength < maxSize) {
@@ -430,7 +430,7 @@ FNET_Connection::Write()
}
}
- uint32_t maxSize = GetConfig()->_maxOutputBufferSize;
+ uint32_t maxSize = getConfig()._maxOutputBufferSize;
if (maxSize > 0 && _output.GetBufSize() > maxSize) {
_output.Shrink(maxSize);
}
diff --git a/fnet/src/vespa/fnet/fnet.h b/fnet/src/vespa/fnet/fnet.h
deleted file mode 100644
index c7570e025ec..00000000000
--- a/fnet/src/vespa/fnet/fnet.h
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/vespalib/component/vtag.h>
-
-// DEPRECATED
-
-#define DEPRECATED __attribute__((deprecated))
-
-// FORWARD DECLARATIONS
-
-class FNET_IPacketFactory;
-class FNET_IPacketHandler;
-class FNET_IPacketStreamer;
-class FNET_IServerAdapter;
-class FNET_IExecutable;
-
-class FNET_Channel;
-class FNET_ChannelLookup;
-class FNET_ChannelPool;
-class FNET_Config;
-class FNET_Connection;
-class FNET_Connector;
-class FNET_Context;
-class FNET_ControlPacket;
-class FNET_DataBuffer;
-class FNET_DummyPacket;
-class FNET_Info;
-class FNET_IOComponent;
-class FNET_Packet;
-class FNET_PacketQueue;
-class FNET_Scheduler;
-class FNET_SimplePacketStreamer;
-class FNET_Task;
-class FNET_Transport;
-class FNET_TransportThread;
-
-// CONTEXT CLASS (union of types)
-#include "context.h"
-
-// INTERFACES
-#include "ipacketfactory.h"
-#include "ipackethandler.h"
-#include "ipacketstreamer.h"
-#include "iserveradapter.h"
-#include "iexecutable.h"
-
-// CLASSES
-#include "task.h"
-#include "scheduler.h"
-#include "config.h"
-#include "databuffer.h"
-#include "packet.h"
-#include "dummypacket.h"
-#include "controlpacket.h"
-#include "packetqueue.h"
-#include "channel.h"
-#include "channellookup.h"
-#include "simplepacketstreamer.h"
-#include "transport_thread.h"
-#include "iocomponent.h"
-#include "transport.h"
-#include "connection.h"
-#include "connector.h"
-#include "info.h"
-#include "signalshutdown.h"
-
diff --git a/fnet/src/vespa/fnet/frt/frt.h b/fnet/src/vespa/fnet/frt/frt.h
deleted file mode 100644
index 490b9b0a2b1..00000000000
--- a/fnet/src/vespa/fnet/frt/frt.h
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-class FRT_Invokable;
-class FRT_IAbortHandler;
-class FRT_IReturnHandler;
-class FRT_ICleanupHandler;
-class FRT_ISharedBlob;
-
-class FRT_Method;
-class FRT_PacketFactory;
-class FRT_ReflectionBuilder;
-class FRT_ReflectionManager;
-class FRT_RPCErrorPacket;
-class FRT_RPCInvoker;
-class FRT_RPCReplyPacket;
-class FRT_RPCRequest;
-class FRT_RPCRequestPacket;
-class FRT_Supervisor;
-class FRT_Target;
-class FRT_Values;
-
-#include <vespa/fnet/fnet.h>
-#include "error.h"
-#include "isharedblob.h"
-#include "invokable.h"
-#include "values.h"
-#include "reflection.h"
-#include "rpcrequest.h"
-#include "packets.h"
-#include "invoker.h"
-#include "supervisor.h"
-#include "target.h"
-
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index 7e92855583e..388d754ece4 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -419,7 +419,7 @@ StandaloneFRT::StandaloneFRT()
StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto)
: _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)),
- _transport(std::make_unique<FNET_Transport>(std::move(crypto), 1)),
+ _transport(std::make_unique<FNET_Transport>(TransportConfig().crypto(std::move(crypto)))),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
{
_transport->Start(_threadPool.get());
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index 81d1bf47567..7afab188b66 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -32,9 +32,9 @@ FNET_IOComponent::~FNET_IOComponent()
assert(_ioc_selector == nullptr);
}
-FNET_Config *
-FNET_IOComponent::GetConfig() {
- return _ioc_owner->GetConfig();
+const FNET_Config &
+FNET_IOComponent::getConfig() const {
+ return _ioc_owner->getConfig();
}
void
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index 84e3c8bd412..5fd5079a8af 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -138,7 +138,7 @@ public:
*
* @return config object.
**/
- FNET_Config *GetConfig();
+ const FNET_Config & getConfig() const;
/**
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index e59186069ce..ce5f44efb7c 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -25,15 +25,33 @@ VESPA_THREAD_STACK_TAG(fnet_work_pool);
} // namespace <unnamed>
-FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads)
- : _async_resolver(std::move(resolver)),
- _crypto_engine(std::move(crypto)),
+TransportConfig::TransportConfig(int num_threads)
+ : _config(),
+ _resolver(),
+ _crypto(),
+ _num_threads(num_threads)
+{}
+
+TransportConfig::~TransportConfig() = default;
+
+vespalib::AsyncResolver::SP
+TransportConfig::resolver() const {
+ return _resolver ? _resolver : vespalib::AsyncResolver::get_shared();
+}
+vespalib::CryptoEngine::SP
+TransportConfig::crypto() const {
+ return _crypto ? _crypto : vespalib::CryptoEngine::get_default();
+}
+
+FNET_Transport::FNET_Transport(TransportConfig cfg)
+ : _async_resolver(cfg.resolver()),
+ _crypto_engine(cfg.crypto()),
_work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, fnet_work_pool, 1024)),
_threads(),
- _events_before_wakeup(1)
+ _config(cfg.config())
{
- assert(num_threads >= 1);
- for (size_t i = 0; i < num_threads; ++i) {
+ assert(cfg.num_threads() >= 1);
+ for (size_t i = 0; i < cfg.num_threads(); ++i) {
_threads.emplace_back(std::make_unique<FNET_TransportThread>(*this));
}
}
@@ -104,38 +122,6 @@ FNET_Transport::GetNumIOComponents()
}
void
-FNET_Transport::SetIOCTimeOut(uint32_t ms)
-{
- for (const auto &thread: _threads) {
- thread->SetIOCTimeOut(ms);
- }
-}
-
-void
-FNET_Transport::SetMaxInputBufferSize(uint32_t bytes)
-{
- for (const auto &thread: _threads) {
- thread->SetMaxInputBufferSize(bytes);
- }
-}
-
-void
-FNET_Transport::SetMaxOutputBufferSize(uint32_t bytes)
-{
- for (const auto &thread: _threads) {
- thread->SetMaxOutputBufferSize(bytes);
- }
-}
-
-void
-FNET_Transport::SetTCPNoDelay(bool noDelay)
-{
- for (const auto &thread: _threads) {
- thread->SetTCPNoDelay(noDelay);
- }
-}
-
-void
FNET_Transport::sync()
{
for (const auto &thread: _threads) {
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 1126f68e69f..766eaa3ccaa 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -3,6 +3,7 @@
#pragma once
#include "context.h"
+#include "config.h"
#include <memory>
#include <vector>
#include <vespa/vespalib/net/async_resolver.h>
@@ -16,6 +17,48 @@ class FNET_IServerAdapter;
class FNET_IPacketHandler;
class FNET_Scheduler;
+class TransportConfig {
+public:
+ TransportConfig() : TransportConfig(1) {}
+ explicit TransportConfig(int num_threads);
+ ~TransportConfig();
+ vespalib::AsyncResolver::SP resolver() const;
+ vespalib::CryptoEngine::SP crypto() const;
+ TransportConfig & resolver(vespalib::AsyncResolver::SP resolver_in) {
+ _resolver = std::move(resolver_in);
+ return *this;
+ }
+ TransportConfig & crypto(vespalib::CryptoEngine::SP crypto_in) {
+ _crypto = std::move(crypto_in);
+ return *this;
+ }
+ const FNET_Config & config() const { return _config; }
+ uint32_t num_threads() const { return _num_threads; }
+
+ TransportConfig & events_before_wakeup(uint32_t v) {
+ if (v > 1) {
+ _config._events_before_wakeup = v;
+ }
+ return *this;
+ }
+ TransportConfig & maxInputBufferSize(uint32_t v) {
+ _config._maxInputBufferSize = v;
+ return *this;
+ }
+ TransportConfig & maxOutputBufferSize(uint32_t v) {
+ _config._maxOutputBufferSize = v;
+ return *this;
+ }
+ TransportConfig & tcpNoDelay(bool v) {
+ _config._tcpNoDelay = v;
+ return *this;
+ }
+private:
+ FNET_Config _config;
+ vespalib::AsyncResolver::SP _resolver;
+ vespalib::CryptoEngine::SP _crypto;
+ uint32_t _num_threads;
+};
/**
* This class represents the transport layer and handles a collection
* of transport threads. Note: remember to shut down your transport
@@ -28,12 +71,14 @@ private:
using Threads = std::vector<Thread>;
vespalib::AsyncResolver::SP _async_resolver;
- vespalib::CryptoEngine::SP _crypto_engine;
+ vespalib::CryptoEngine::SP _crypto_engine;
std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool;
- Threads _threads;
- size_t _events_before_wakeup;
+ Threads _threads;
+ const FNET_Config _config;
public:
+ FNET_Transport(const FNET_Transport &) = delete;
+ FNET_Transport & operator = (const FNET_Transport &) = delete;
/**
* Construct a transport layer. To activate your newly created
* transport object you need to call either the Start method to
@@ -41,22 +86,15 @@ public:
* the current thread become the transport thread. Main may only
* be called for single-threaded transports.
**/
- FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads);
+ explicit FNET_Transport(TransportConfig config);
- FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads)
- : FNET_Transport(std::move(resolver), vespalib::CryptoEngine::get_default(), num_threads) {}
- FNET_Transport(vespalib::CryptoEngine::SP crypto, size_t num_threads)
- : FNET_Transport(vespalib::AsyncResolver::get_shared(), std::move(crypto), num_threads) {}
- FNET_Transport(size_t num_threads)
- : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), num_threads) {}
+ explicit FNET_Transport(uint32_t num_threads)
+ : FNET_Transport(TransportConfig(num_threads)) {}
FNET_Transport()
- : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), 1) {}
+ : FNET_Transport(TransportConfig()) {}
~FNET_Transport();
- size_t events_before_wakeup() const { return _events_before_wakeup; }
- void events_before_wakeup(size_t events_before_wakeup_in) {
- _events_before_wakeup = events_before_wakeup_in;
- }
+ const FNET_Config & getConfig() const { return _config; }
/**
* Try to execute the given task on the internal work pool
@@ -173,54 +211,6 @@ public:
uint32_t GetNumIOComponents();
/**
- * Set the I/O Component timeout. Idle I/O Components with timeout
- * enabled (determined by calling the ShouldTimeOut method) will
- * time out if idle for the given number of milliseconds. An I/O
- * component reports its un-idle-ness by calling the UpdateTimeOut
- * method in the owning transport object. Calling this method with 0
- * as parameter will disable I/O Component timeouts. Note that newly
- * created transport objects begin their lives with I/O Component
- * timeouts disabled. An I/O Component timeout has the same effect
- * as calling the Close method in the transport object with the
- * target I/O Component as parameter.
- *
- * @param ms number of milliseconds before IOC idle timeout occurs.
- **/
- void SetIOCTimeOut(uint32_t ms);
-
- /**
- * Set maximum input buffer size. This value will only affect
- * connections that use a common input buffer when decoding
- * incoming packets. Note that this value is not an absolute
- * max. The buffer will still grow larger than this value if
- * needed to decode big packets. However, when the buffer becomes
- * larger than this value, it will be shrunk back when possible.
- *
- * @param bytes buffer size in bytes. 0 means unlimited.
- **/
- void SetMaxInputBufferSize(uint32_t bytes);
-
- /**
- * Set maximum output buffer size. This value will only affect
- * connections that use a common output buffer when encoding
- * outgoing packets. Note that this value is not an absolute
- * max. The buffer will still grow larger than this value if needed
- * to encode big packets. However, when the buffer becomes larger
- * than this value, it will be shrunk back when possible.
- *
- * @param bytes buffer size in bytes. 0 means unlimited.
- **/
- void SetMaxOutputBufferSize(uint32_t bytes);
-
- /**
- * Enable or disable use of the TCP_NODELAY flag with sockets
- * created by this transport object.
- *
- * @param noDelay true if TCP_NODELAY flag should be used.
- **/
- void SetTCPNoDelay(bool noDelay);
-
- /**
* Synchronize with all transport threads. This method will block
* until all events posted before this method was invoked has been
* processed. If a transport thread has been shut down (or is in
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index d61eaffa24f..6b66b1ebe4b 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -125,7 +125,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
_queue.QueuePacket_NoLock(cpacket, context);
qLen = _queue.GetPacketCnt_NoLock();
}
- if (qLen == _owner.events_before_wakeup()) {
+ if (qLen == getConfig()._events_before_wakeup) {
_selector.wakeup();
}
return true;
@@ -209,7 +209,6 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
: _owner(owner_in),
_now(clock::now()),
_scheduler(&_now),
- _config(),
_componentsHead(nullptr),
_timeOutHead(nullptr),
_componentsTail(nullptr),
@@ -242,13 +241,17 @@ FNET_TransportThread::~FNET_TransportThread()
}
}
+const FNET_Config &
+FNET_TransportThread::getConfig() const {
+ return _owner.getConfig();
+}
bool
FNET_TransportThread::tune(SocketHandle &handle) const
{
handle.set_keepalive(true);
handle.set_linger(true, 0);
- handle.set_nodelay(_config._tcpNoDelay);
+ handle.set_nodelay(getConfig()._tcpNoDelay);
return handle.set_blocking(false);
}
@@ -486,8 +489,8 @@ FNET_TransportThread::EventLoopIteration()
_selector.dispatch(*this);
// handle IOC time-outs
- if (_config._iocTimeOut > 0) {
- time_point oldest = (_now - std::chrono::milliseconds(_config._iocTimeOut));
+ if (getConfig()._iocTimeOut > 0) {
+ time_point oldest = (_now - std::chrono::milliseconds(getConfig()._iocTimeOut));
while (_timeOutHead != nullptr &&
oldest > _timeOutHead->_ioc_timestamp) {
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index b4319d4e2bc..c649a677cb4 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -37,7 +37,6 @@ private:
FNET_Transport &_owner; // owning transport layer
time_point _now; // current time sampler
FNET_Scheduler _scheduler; // transport thread scheduler
- FNET_Config _config; // FNET configuration [static]
FNET_IOComponent *_componentsHead; // I/O component list head
FNET_IOComponent *_timeOutHead; // first IOC in list to time out
FNET_IOComponent *_componentsTail; // I/O component list tail
@@ -140,7 +139,7 @@ private:
*
* @return config object.
**/
- FNET_Config *GetConfig() { return &_config; }
+ const FNET_Config & getConfig() const;
void handle_add_cmd(FNET_IOComponent *ioc);
@@ -185,7 +184,7 @@ public:
*
* @param owner owning transport layer
**/
- FNET_TransportThread(FNET_Transport &owner_in);
+ explicit FNET_TransportThread(FNET_Transport &owner_in);
/**
@@ -269,60 +268,6 @@ public:
**/
uint32_t GetNumIOComponents() { return _componentCnt; }
-
- /**
- * Set the I/O Component timeout. Idle I/O Components with timeout
- * enabled (determined by calling the ShouldTimeOut method) will
- * time out if idle for the given number of milliseconds. An I/O
- * component reports its un-idle-ness by calling the UpdateTimeOut
- * method in the owning transport object. Calling this method with 0
- * as parameter will disable I/O Component timeouts. Note that newly
- * created transport objects begin their lives with I/O Component
- * timeouts disabled. An I/O Component timeout has the same effect
- * as calling the Close method in the transport object with the
- * target I/O Component as parameter.
- *
- * @param ms number of milliseconds before IOC idle timeout occurs.
- **/
- void SetIOCTimeOut(uint32_t ms) { _config._iocTimeOut = ms; }
-
-
- /**
- * Set maximum input buffer size. This value will only affect
- * connections that use a common input buffer when decoding
- * incoming packets. Note that this value is not an absolute
- * max. The buffer will still grow larger than this value if
- * needed to decode big packets. However, when the buffer becomes
- * larger than this value, it will be shrunk back when possible.
- *
- * @param bytes buffer size in bytes. 0 means unlimited.
- **/
- void SetMaxInputBufferSize(uint32_t bytes)
- { _config._maxInputBufferSize = bytes; }
-
-
- /**
- * Set maximum output buffer size. This value will only affect
- * connections that use a common output buffer when encoding
- * outgoing packets. Note that this value is not an absolute
- * max. The buffer will still grow larger than this value if needed
- * to encode big packets. However, when the buffer becomes larger
- * than this value, it will be shrunk back when possible.
- *
- * @param bytes buffer size in bytes. 0 means unlimited.
- **/
- void SetMaxOutputBufferSize(uint32_t bytes)
- { _config._maxOutputBufferSize = bytes; }
-
- /**
- * Enable or disable use of the TCP_NODELAY flag with sockets
- * created by this transport object.
- *
- * @param noDelay true if TCP_NODELAY flag should be used.
- **/
- void SetTCPNoDelay(bool noDelay) { _config._tcpNoDelay = noDelay; }
-
-
/**
* Add an I/O component to the working set of this transport
* object. Note that the actual work is performed by the transport
diff --git a/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp b/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp
index ff6c84b5b18..cf23a027905 100644
--- a/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp
+++ b/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/transport.h>
#include <vespa/fastos/app.h>
class Server : public FRT_Invokable
diff --git a/jrt_test/src/tests/echo/echo-client.cpp b/jrt_test/src/tests/echo/echo-client.cpp
index 3a87e38e6e0..4bc9ac743e4 100644
--- a/jrt_test/src/tests/echo/echo-client.cpp
+++ b/jrt_test/src/tests/echo/echo-client.cpp
@@ -1,6 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+
#include <vespa/fastos/app.h>
class EchoClient : public FastOS_Application
diff --git a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
index cd1ad7e6eed..af9f60f84d3 100644
--- a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
+++ b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
@@ -1,7 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/fastos/app.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/vespalib/util/time.h>
#include <thread>
diff --git a/jrt_test/src/tests/mockup-invoke/mockup-server.cpp b/jrt_test/src/tests/mockup-invoke/mockup-server.cpp
index 4101bb8e9aa..1abe27407b3 100644
--- a/jrt_test/src/tests/mockup-invoke/mockup-server.cpp
+++ b/jrt_test/src/tests/mockup-invoke/mockup-server.cpp
@@ -1,15 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/transport.h>
#include <vespa/fastos/app.h>
class MockupServer : public FRT_Invokable
{
-private:
- MockupServer(const MockupServer &);
- MockupServer &operator=(const MockupServer &);
-
public:
+ MockupServer(const MockupServer &) = delete;
+ MockupServer &operator=(const MockupServer &) = delete;
MockupServer(FRT_Supervisor *s)
{
FRT_ReflectionBuilder rb(s);
diff --git a/jrt_test/src/tests/rpc-error/test-errors.cpp b/jrt_test/src/tests/rpc-error/test-errors.cpp
index c30af8ea579..1c0c057e433 100644
--- a/jrt_test/src/tests/rpc-error/test-errors.cpp
+++ b/jrt_test/src/tests/rpc-error/test-errors.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
class TestErrors : public vespalib::TestApp
{
diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp
index ffc43bce0bb..90adf7c0b66 100644
--- a/logd/src/logd/rpc_forwarder.cpp
+++ b/logd/src/logd/rpc_forwarder.cpp
@@ -7,6 +7,10 @@
#include <vespa/log/exceptions.h>
#include <vespa/vespalib/util/buffer.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/supervisor.h>
+
+
#include <vespa/log/log.h>
LOG_SETUP(".logd.rpc_forwarder");
diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h
index 37729db088f..fbc3ecbc09a 100644
--- a/logd/src/logd/rpc_forwarder.h
+++ b/logd/src/logd/rpc_forwarder.h
@@ -5,10 +5,12 @@
#include "forwarder.h"
#include "proto_converter.h"
#include <vespa/log/log_message.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/target.h>
#include <memory>
#include <vector>
+class FRT_Supervisor;
+
namespace logdemon {
struct Metrics;
diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
index d39a9ade0a8..370ca7f458e 100644
--- a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
+++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
@@ -5,6 +5,10 @@
#include <logd/rpc_forwarder.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/metrics/dummy_metrics_manager.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+
+
using namespace logdemon;
using vespalib::metrics::DummyMetricsManager;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 893b8d1d2ca..9ecc57fd9de 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -79,6 +79,14 @@ struct TargetPoolTask : public FNET_Task {
}
};
+TransportConfig
+toFNETConfig(const RPCNetworkParams & params) {
+ return TransportConfig()
+ .maxInputBufferSize(params.getMaxInputBufferSize())
+ .maxOutputBufferSize(params.getMaxOutputBufferSize())
+ .tcpNoDelay(params.getTcpNoDelay());
+}
+
}
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
@@ -117,7 +125,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>()),
+ _transport(std::make_unique<FNET_Transport>(toFNETConfig(params))),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
@@ -135,9 +143,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_allowDispatchForEncode(params.getDispatchOnEncode()),
_allowDispatchForDecode(params.getDispatchOnDecode())
{
- _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
- _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
- _transport->SetTCPNoDelay(params.getTcpNoDelay());
}
RPCNetwork::~RPCNetwork()
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java
index b1213b2da41..9eb4b796970 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocatableClusterResources.java
@@ -26,20 +26,19 @@ public class AllocatableClusterResources {
private final NodeResources realResources;
private final NodeResources advertisedResources;
- private final ClusterSpec.Type clusterType;
+ private final ClusterSpec clusterSpec;
private final double fulfilment;
/** Fake allocatable resources from requested capacity */
public AllocatableClusterResources(ClusterResources requested,
- ClusterSpec.Type clusterType,
- boolean exclusive,
+ ClusterSpec clusterSpec,
NodeRepository nodeRepository) {
this.nodes = requested.nodes();
this.groups = requested.groups();
- this.realResources = nodeRepository.resourcesCalculator().requestToReal(requested.nodeResources(), exclusive);
+ this.realResources = nodeRepository.resourcesCalculator().requestToReal(requested.nodeResources(), clusterSpec.isExclusive());
this.advertisedResources = requested.nodeResources();
- this.clusterType = clusterType;
+ this.clusterSpec = clusterSpec;
this.fulfilment = 1;
}
@@ -48,19 +47,19 @@ public class AllocatableClusterResources {
this.groups = (int)nodes.stream().map(node -> node.allocation().get().membership().cluster().group()).distinct().count();
this.realResources = averageRealResourcesOf(nodes, nodeRepository, exclusive); // Average since we average metrics over nodes
this.advertisedResources = nodes.get(0).resources();
- this.clusterType = nodes.get(0).allocation().get().membership().cluster().type();
+ this.clusterSpec = nodes.get(0).allocation().get().membership().cluster();
this.fulfilment = 1;
}
public AllocatableClusterResources(ClusterResources realResources,
NodeResources advertisedResources,
NodeResources idealResources,
- ClusterSpec.Type clusterType) {
+ ClusterSpec clusterSpec) {
this.nodes = realResources.nodes();
this.groups = realResources.groups();
this.realResources = realResources.nodeResources();
this.advertisedResources = advertisedResources;
- this.clusterType = clusterType;
+ this.clusterSpec = clusterSpec;
this.fulfilment = fulfilment(realResources.nodeResources(), idealResources);
}
@@ -88,7 +87,7 @@ public class AllocatableClusterResources {
return (int)Math.ceil((double)nodes / groups);
}
- public ClusterSpec.Type clusterType() { return clusterType; }
+ public ClusterSpec clusterSpec() { return clusterSpec; }
public double cost() { return nodes * advertisedResources.cost(); }
@@ -133,23 +132,23 @@ public class AllocatableClusterResources {
}
public static Optional<AllocatableClusterResources> from(ClusterResources wantedResources,
- boolean exclusive,
- ClusterSpec.Type clusterType,
+ ClusterSpec clusterSpec,
Limits applicationLimits,
NodeRepository nodeRepository) {
var systemLimits = new NodeResourceLimits(nodeRepository);
- if ( !exclusive && !nodeRepository.zone().getCloud().dynamicProvisioning()) {
+ boolean exclusive = clusterSpec.isExclusive();
+ if ( !clusterSpec.isExclusive() && !nodeRepository.zone().getCloud().dynamicProvisioning()) {
// We decide resources: Add overhead to what we'll request (advertised) to make sure real becomes (at least) cappedNodeResources
NodeResources advertisedResources = nodeRepository.resourcesCalculator().realToRequest(wantedResources.nodeResources(), exclusive);
- advertisedResources = systemLimits.enlargeToLegal(advertisedResources, clusterType, exclusive); // Attempt to ask for something legal
+ advertisedResources = systemLimits.enlargeToLegal(advertisedResources, clusterSpec.type(), exclusive); // Attempt to ask for something legal
advertisedResources = applicationLimits.cap(advertisedResources); // Overrides other conditions, even if it will then fail
NodeResources realResources = nodeRepository.resourcesCalculator().requestToReal(advertisedResources, exclusive); // ... thus, what we really get may change
- if ( ! systemLimits.isWithinRealLimits(realResources, clusterType)) return Optional.empty();
+ if ( ! systemLimits.isWithinRealLimits(realResources, clusterSpec.type())) return Optional.empty();
if (matchesAny(nodeRepository.flavors().getFlavors(), advertisedResources))
return Optional.of(new AllocatableClusterResources(wantedResources.with(realResources),
advertisedResources,
wantedResources.nodeResources(),
- clusterType));
+ clusterSpec));
else
return Optional.empty();
}
@@ -172,11 +171,11 @@ public class AllocatableClusterResources {
}
if ( ! between(applicationLimits.min().nodeResources(), applicationLimits.max().nodeResources(), advertisedResources)) continue;
- if ( ! systemLimits.isWithinRealLimits(realResources, clusterType)) continue;
+ if ( ! systemLimits.isWithinRealLimits(realResources, clusterSpec.type())) continue;
var candidate = new AllocatableClusterResources(wantedResources.with(realResources),
advertisedResources,
wantedResources.nodeResources(),
- clusterType);
+ clusterSpec);
if (best.isEmpty() || candidate.preferableTo(best.get()))
best = Optional.of(candidate);
}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java
index e57011b0e4a..fb97e803a35 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/AllocationOptimizer.java
@@ -58,7 +58,7 @@ public class AllocationOptimizer {
groups,
nodeResourcesWith(nodesAdjustedForRedundancy, groupsAdjustedForRedundancy, limits, current, target));
- var allocatableResources = AllocatableClusterResources.from(next, exclusive, current.clusterType(), limits, nodeRepository);
+ var allocatableResources = AllocatableClusterResources.from(next, current.clusterSpec(), limits, nodeRepository);
if (allocatableResources.isEmpty()) continue;
if (bestAllocation.isEmpty() || allocatableResources.get().preferableTo(bestAllocation.get()))
bestAllocation = allocatableResources;
@@ -79,7 +79,7 @@ public class AllocationOptimizer {
int groupSize = nodes / groups;
- if (current.clusterType().isContent()) { // load scales with node share of content
+ if (current.clusterSpec().isStateful()) { // load scales with node share of content
// The fixed cost portion of cpu does not scale with changes to the node count
// TODO: Only for the portion of cpu consumed by queries
double cpuPerGroup = fixedCpuCostFraction * target.nodeCpu() +
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java
index 023eb5860ee..8fcba452d26 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java
@@ -59,7 +59,7 @@ public class Autoscaler {
}
private Advice autoscale(Cluster cluster, List<Node> clusterNodes, Limits limits, boolean exclusive) {
- ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type();
+ ClusterSpec clusterSpec = clusterNodes.get(0).allocation().get().membership().cluster();
if ( ! stable(clusterNodes, nodeRepository))
return Advice.none("Cluster change in progress");
@@ -69,7 +69,7 @@ public class Autoscaler {
ClusterTimeseries clusterTimeseries = new ClusterTimeseries(cluster, clusterNodes, metricsDb, nodeRepository);
int measurementsPerNode = clusterTimeseries.measurementsPerNode();
- if (measurementsPerNode < minimumMeasurementsPerNode(clusterType))
+ if (measurementsPerNode < minimumMeasurementsPerNode(clusterSpec))
return Advice.none("Collecting more data before making new scaling decisions" +
": Has " + measurementsPerNode + " data points per node" +
"(all: " + clusterTimeseries.measurementCount +
@@ -124,14 +124,14 @@ public class Autoscaler {
}
private boolean recentlyScaled(Cluster cluster, List<Node> clusterNodes) {
- Duration downscalingDelay = downscalingDelay(clusterNodes.get(0).allocation().get().membership().cluster().type());
+ Duration downscalingDelay = downscalingDelay(clusterNodes.get(0).allocation().get().membership().cluster());
return cluster.lastScalingEvent().map(event -> event.at()).orElse(Instant.MIN)
.isAfter(nodeRepository.clock().instant().minus(downscalingDelay));
}
/** The duration of the window we need to consider to make a scaling decision. See also minimumMeasurementsPerNode */
- static Duration scalingWindow(ClusterSpec.Type clusterType) {
- if (clusterType.isContent()) return Duration.ofHours(12);
+ static Duration scalingWindow(ClusterSpec cluster) {
+ if (cluster.isStateful()) return Duration.ofHours(12);
return Duration.ofMinutes(30);
}
@@ -140,8 +140,8 @@ public class Autoscaler {
}
/** Measurements are currently taken once a minute. See also scalingWindow */
- static int minimumMeasurementsPerNode(ClusterSpec.Type clusterType) {
- if (clusterType.isContent()) return 60;
+ static int minimumMeasurementsPerNode(ClusterSpec cluster) {
+ if (cluster.isStateful()) return 60;
return 7;
}
@@ -149,8 +149,8 @@ public class Autoscaler {
* We should wait a while before scaling down after a scaling event as a peak in usage
* indicates more peaks may arrive in the near future.
*/
- static Duration downscalingDelay(ClusterSpec.Type clusterType) {
- if (clusterType.isContent()) return Duration.ofHours(12);
+ static Duration downscalingDelay(ClusterSpec cluster) {
+ if (cluster.isStateful()) return Duration.ofHours(12);
return Duration.ofHours(1);
}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java
index 1983162f121..2b4ba3fbbcb 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java
@@ -7,7 +7,6 @@ import com.yahoo.vespa.hosted.provision.NodeRepository;
import com.yahoo.vespa.hosted.provision.applications.Cluster;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,8 +32,8 @@ public class ClusterTimeseries {
public ClusterTimeseries(Cluster cluster, List<Node> clusterNodes, MetricsDb db, NodeRepository nodeRepository) {
this.clusterNodes = clusterNodes;
- ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type();
- var timeseries = db.getNodeTimeseries(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterType)),
+ ClusterSpec clusterSpec = clusterNodes.get(0).allocation().get().membership().cluster();
+ var timeseries = db.getNodeTimeseries(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterSpec)),
clusterNodes.stream().map(Node::hostname).collect(Collectors.toSet()));
Map<String, Instant> startTimePerNode = metricStartTimes(cluster, clusterNodes, timeseries, nodeRepository);
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
index 685fa3727a1..bcdcd9054a7 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
@@ -262,19 +262,19 @@ public class MetricsReporter extends NodeRepositoryMaintainer {
Metric.Context context = getContext(Map.of("lockPath", lockPath));
LatencyMetrics acquireLatencyMetrics = lockMetrics.getAndResetAcquireLatencyMetrics();
- metric.set("lockAttempt.acquireMaxActiveLatency", acquireLatencyMetrics.maxActiveLatencySeconds(), context);
- metric.set("lockAttempt.acquireHz", acquireLatencyMetrics.startHz(), context);
- metric.set("lockAttempt.acquireLoad", acquireLatencyMetrics.load(), context);
+ setNonZero("lockAttempt.acquireMaxActiveLatency", acquireLatencyMetrics.maxActiveLatencySeconds(), context);
+ setNonZero("lockAttempt.acquireHz", acquireLatencyMetrics.startHz(), context);
+ setNonZero("lockAttempt.acquireLoad", acquireLatencyMetrics.load(), context);
LatencyMetrics lockedLatencyMetrics = lockMetrics.getAndResetLockedLatencyMetrics();
- metric.set("lockAttempt.lockedLatency", lockedLatencyMetrics.maxLatencySeconds(), context);
- metric.set("lockAttempt.lockedLoad", lockedLatencyMetrics.load(), context);
+ setNonZero("lockAttempt.lockedLatency", lockedLatencyMetrics.maxLatencySeconds(), context);
+ setNonZero("lockAttempt.lockedLoad", lockedLatencyMetrics.load(), context);
- metric.set("lockAttempt.acquireTimedOut", lockMetrics.getAndResetAcquireTimedOutCount(), context);
- metric.set("lockAttempt.deadlock", lockMetrics.getAndResetDeadlockCount(), context);
+ setNonZero("lockAttempt.acquireTimedOut", lockMetrics.getAndResetAcquireTimedOutCount(), context);
+ setNonZero("lockAttempt.deadlock", lockMetrics.getAndResetDeadlockCount(), context);
// bucket for various rare errors - to reduce #metrics
- metric.set("lockAttempt.errors",
+ setNonZero("lockAttempt.errors",
lockMetrics.getAndResetAcquireFailedCount() +
lockMetrics.getAndResetReleaseFailedCount() +
lockMetrics.getAndResetNakedReleaseCount() +
@@ -284,6 +284,12 @@ public class MetricsReporter extends NodeRepositoryMaintainer {
});
}
+ private void setNonZero(String key, Number value, Metric.Context context) {
+ if (Double.compare(value.doubleValue(), 0.0) != 0) {
+ metric.set(key, value, context);
+ }
+ }
+
private void updateDockerMetrics(NodeList nodes) {
NodeResources totalCapacity = getCapacityTotal(nodes);
metric.set("hostedVespa.docker.totalCapacityCpu", totalCapacity.vcpu(), null);
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java
index 68e11c4c995..bc164dc37e0 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java
@@ -203,7 +203,7 @@ class NodeAllocation {
* Such nodes will be marked retired during finalization of the list of accepted nodes.
* The conditions for this are:
*
- * This is a content or combined node. These must always be retired before being removed to allow the cluster to
+ * This is a stateful node. These must always be retired before being removed to allow the cluster to
* migrate away data.
*
* This is a container node and it is not desired due to having the wrong flavor. In this case this
@@ -218,7 +218,7 @@ class NodeAllocation {
if (candidate.allocation().get().membership().retired()) return true; // don't second-guess if already retired
if (! requestedNodes.considerRetiring()) return false;
- return cluster.type().isContent() ||
+ return cluster.isStateful() ||
(cluster.type() == ClusterSpec.Type.container && !hasCompatibleFlavor(candidate));
}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java
index ede6f4ef250..a6d68243160 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java
@@ -29,7 +29,6 @@ import com.yahoo.vespa.hosted.provision.autoscale.AllocatableClusterResources;
import com.yahoo.vespa.hosted.provision.autoscale.AllocationOptimizer;
import com.yahoo.vespa.hosted.provision.autoscale.Limits;
import com.yahoo.vespa.hosted.provision.autoscale.ResourceTarget;
-import com.yahoo.vespa.hosted.provision.node.Agent;
import com.yahoo.vespa.hosted.provision.node.Allocation;
import com.yahoo.vespa.hosted.provision.node.filter.ApplicationFilter;
import com.yahoo.vespa.hosted.provision.node.filter.NodeHostFilter;
@@ -168,7 +167,7 @@ public class NodeRepositoryProvisioner implements Provisioner {
boolean firstDeployment = nodes.isEmpty();
AllocatableClusterResources currentResources =
firstDeployment // start at min, preserve current resources otherwise
- ? new AllocatableClusterResources(requested.minResources(), clusterSpec.type(), clusterSpec.isExclusive(), nodeRepository)
+ ? new AllocatableClusterResources(requested.minResources(), clusterSpec, nodeRepository)
: new AllocatableClusterResources(nodes, nodeRepository, clusterSpec.isExclusive());
return within(Limits.of(requested), clusterSpec.isExclusive(), currentResources, firstDeployment);
}
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java
index 86427fe30ae..3945c518a77 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiTest.java
@@ -485,7 +485,7 @@ public class NodesV2ApiTest {
"{\"message\":\"Moved host2.yahoo.com to parked\"}");
tester.assertResponse(new Request("http://localhost:8080/nodes/v2/state/ready/host2.yahoo.com",
new byte[0], Request.Method.PUT),
- 400, "{\"error-code\":\"BAD_REQUEST\",\"message\":\"Cannot make parked host host2.yahoo.com allocated to tenant2.application2.instance2 as 'content/id2/0/0' available for new allocation as it is not in state [dirty]\"}");
+ 400, "{\"error-code\":\"BAD_REQUEST\",\"message\":\"Cannot make parked host host2.yahoo.com allocated to tenant2.application2.instance2 as 'content/id2/0/0/stateful' available for new allocation as it is not in state [dirty]\"}");
// (... while dirty then ready works (the ready move will be initiated by node maintenance))
assertResponse(new Request("http://localhost:8080/nodes/v2/state/dirty/host2.yahoo.com",
new byte[0], Request.Method.PUT),
@@ -502,7 +502,7 @@ public class NodesV2ApiTest {
// Attempt to DELETE allocated node
tester.assertResponse(new Request("http://localhost:8080/nodes/v2/node/host4.yahoo.com",
new byte[0], Request.Method.DELETE),
- 400, "{\"error-code\":\"BAD_REQUEST\",\"message\":\"active child node host4.yahoo.com allocated to tenant3.application3.instance3 as 'content/id3/0/0' is currently allocated and cannot be removed\"}");
+ 400, "{\"error-code\":\"BAD_REQUEST\",\"message\":\"active child node host4.yahoo.com allocated to tenant3.application3.instance3 as 'content/id3/0/0/stateful' is currently allocated and cannot be removed\"}");
// PUT current restart generation with string instead of long
tester.assertResponse(new Request("http://localhost:8080/nodes/v2/node/host4.yahoo.com",
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json
index 7104957ba98..c3857e9c8ee 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/capacity-zone.json
@@ -4,7 +4,7 @@
"failedTenantParent": "dockerhost1.yahoo.com",
"failedTenant": "host4.yahoo.com",
"failedTenantResources": "[vcpu: 1.0, memory: 4.0 Gb, disk 100.0 Gb, bandwidth: 1.0 Gbps, storage type: local]",
- "failedTenantAllocation": "allocated to tenant3.application3.instance3 as 'content/id3/0/0'",
+ "failedTenantAllocation": "allocated to tenant3.application3.instance3 as 'content/id3/0/0/stateful'",
"hostCandidateRejectionReasons": {
"singularReasonFailures": {
"insufficientVcpu": 0,
diff --git a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
index 8c7e6f13c18..8b6203c78a2 100644
--- a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
+++ b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
@@ -3,7 +3,8 @@
#include <vespa/slobrok/sbmirror.h>
#include <vespa/config/common/configsystem.h>
#include <vespa/config/common/exceptions.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
#include <vespa/vespalib/util/host_name.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/time.h>
@@ -28,10 +29,12 @@ private:
FRT_RPCRequest *_req;
public:
- App() : _frt(),
- _target(nullptr),
- _req(nullptr) {}
- virtual ~App()
+ App()
+ : _frt(),
+ _target(nullptr),
+ _req(nullptr)
+ {}
+ ~App() override
{
assert(!_frt);
assert(_target == nullptr);
diff --git a/searchlib/src/test/java/com/yahoo/searchlib/tensor/TensorConformanceTest.java b/searchlib/src/test/java/com/yahoo/searchlib/tensor/TensorConformanceTest.java
deleted file mode 100644
index 61aec069c72..00000000000
--- a/searchlib/src/test/java/com/yahoo/searchlib/tensor/TensorConformanceTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.searchlib.tensor;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.yahoo.io.GrowableByteBuffer;
-import com.yahoo.searchlib.rankingexpression.RankingExpression;
-import com.yahoo.searchlib.rankingexpression.evaluation.BooleanValue;
-import com.yahoo.searchlib.rankingexpression.evaluation.DoubleCompatibleValue;
-import com.yahoo.searchlib.rankingexpression.evaluation.MapContext;
-import com.yahoo.searchlib.rankingexpression.evaluation.StringValue;
-import com.yahoo.searchlib.rankingexpression.evaluation.TensorValue;
-import com.yahoo.searchlib.rankingexpression.evaluation.Value;
-import com.yahoo.searchlib.rankingexpression.parser.ParseException;
-import com.yahoo.tensor.Tensor;
-import com.yahoo.tensor.serialization.TypedBinaryFormat;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
-
-import static org.junit.Assert.assertEquals;
-
-public class TensorConformanceTest {
-
- private static String testPath = "eval/src/apps/tensor_conformance/test_spec.json";
-
- @Test
- public void testConformance() throws IOException {
- File testSpec = new File(testPath);
- if (!testSpec.exists()) {
- testSpec = new File("../" + testPath);
- }
- int count = 0;
- List<Integer> failList = new ArrayList<>();
-
- try(BufferedReader br = new BufferedReader(new FileReader(testSpec))) {
- String test = br.readLine();
- while (test != null) {
- boolean success = testCase(test, count);
- if (!success) {
- failList.add(count);
- }
- test = br.readLine();
- count++;
- }
- }
- assertEquals(failList.size() + " conformance test fails: " + failList, 0, failList.size());
- }
-
- private boolean testCase(String test, int count) {
- try {
- ObjectMapper mapper = new ObjectMapper();
- JsonNode node = mapper.readTree(test);
-
- if (node.has("num_tests")) {
- Assert.assertEquals(node.get("num_tests").asInt(), count);
- return true;
- }
- if (!node.has("expression")) {
- return true; // ignore
- }
-
- String expression = node.get("expression").asText();
- MapContext context = getInput(node.get("inputs"));
- Tensor expect = getTensor(node.get("result").get("expect").asText());
- Tensor result = evaluate(expression, context);
- boolean equals = Tensor.equals(result, expect);
- if (!equals) {
- System.out.println(count + " : Tensors not equal. Result: " + result.toString() + " Expected: " + expect.toString() + " -> expression \"" + expression + "\"");
- } else if (! result.type().valueType().equals(expect.type().valueType())) {
- System.out.println(count + " : Tensor cell value types not equal. Result: " + result.type() + " Expected: " + expect.type() + " -> expression \"" + expression + "\"");
- equals = false;
- }
- return equals;
-
- } catch (Exception e) {
- System.out.println(count + " : " + e.toString());
- }
- return false;
- }
-
- private Tensor evaluate(String expression, MapContext context) throws ParseException {
- Value value = new RankingExpression(expression).evaluate(context);
- if (!(value instanceof TensorValue)) {
- throw new IllegalArgumentException("Result is not a tensor");
- }
- return ((TensorValue)value).asTensor();
- }
-
- private MapContext getInput(JsonNode inputs) {
- MapContext context = new MapContext();
- for (Iterator<String> i = inputs.fieldNames(); i.hasNext(); ) {
- String name = i.next();
- String value = inputs.get(name).asText();
- Tensor tensor = getTensor(value);
- context.put(name, new TensorValue(tensor));
- }
- return context;
- }
-
- private Tensor getTensor(String binaryRepresentation) {
- byte[] bin = getBytes(binaryRepresentation);
- return TypedBinaryFormat.decode(Optional.empty(), GrowableByteBuffer.wrap(bin));
- }
-
- private byte[] getBytes(String binaryRepresentation) {
- return parseHexValue(binaryRepresentation.substring(2));
- }
-
- private byte[] parseHexValue(String s) {
- final int len = s.length();
- byte[] bytes = new byte[len/2];
- for (int i = 0; i < len; i += 2) {
- int c1 = hexValue(s.charAt(i)) << 4;
- int c2 = hexValue(s.charAt(i + 1));
- bytes[i/2] = (byte)(c1 + c2);
- }
- return bytes;
- }
-
- private int hexValue(Character c) {
- if (c >= 'a' && c <= 'f') {
- return c - 'a' + 10;
- } else if (c >= 'A' && c <= 'F') {
- return c - 'A' + 10;
- } else if (c >= '0' && c <= '9') {
- return c - '0';
- }
- throw new IllegalArgumentException("Hex contains illegal characters");
- }
-
-}
-
diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
index daa85c91b2c..bf46a2cc7d0 100644
--- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
+++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
@@ -16,7 +16,7 @@
#include <vespa/searchlib/tensor/nearest_neighbor_index.h>
#include <vespa/searchlib/tensor/nearest_neighbor_index_factory.h>
#include <vespa/searchlib/tensor/nearest_neighbor_index_saver.h>
-#include <vespa/searchlib/tensor/serialized_tensor_attribute.h>
+#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h>
#include <vespa/searchlib/tensor/tensor_attribute.h>
#include <vespa/searchlib/test/directory_handler.h>
#include <vespa/searchlib/util/fileutil.h>
@@ -40,7 +40,7 @@ using search::tensor::DefaultNearestNeighborIndexFactory;
using search::tensor::DenseTensorAttribute;
using search::tensor::DirectTensorAttribute;
using search::tensor::DocVectorAccess;
-using search::tensor::SerializedTensorAttribute;
+using search::tensor::SerializedFastValueAttribute;
using search::tensor::HnswIndex;
using search::tensor::HnswNode;
using search::tensor::NearestNeighborIndex;
@@ -344,7 +344,7 @@ struct Fixture {
} else if (_traits.use_direct_tensor_attribute) {
return std::make_shared<DirectTensorAttribute>(_name, _cfg);
} else {
- return std::make_shared<SerializedTensorAttribute>(_name, _cfg);
+ return std::make_shared<SerializedFastValueAttribute>(_name, _cfg);
}
}
diff --git a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
index 3dbe0d00881..1cd91329912 100644
--- a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
+++ b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
@@ -6,7 +6,9 @@
#include <vespa/searchlib/engine/searchapi.h>
#include <vespa/searchlib/engine/docsumapi.h>
#include <vespa/searchlib/engine/monitorapi.h>
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/data/slime/binary_format.h>
#include <thread>
@@ -83,7 +85,7 @@ struct ProtoRpcAdapterTest : ::testing::Test {
FRT_Target *connect() {
return server.supervisor().GetTarget(server.supervisor().GetListenPort());
}
- ~ProtoRpcAdapterTest() = default;
+ ~ProtoRpcAdapterTest() override = default;
};
//-----------------------------------------------------------------------------
diff --git a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
index 148d18f79ff..29033944d4b 100644
--- a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
@@ -7,8 +7,10 @@
#include "singlenumericattribute.hpp"
#include "singlestringattribute.h"
#include "singleboolattribute.h"
+#include <vespa/eval/eval/engine_or_factory.h>
#include <vespa/searchlib/tensor/dense_tensor_attribute.h>
#include <vespa/searchlib/tensor/serialized_tensor_attribute.h>
+#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h>
namespace search {
@@ -45,6 +47,8 @@ AttributeFactory::createSingleStd(stringref name, const Config & info)
case BasicType::TENSOR:
if (info.tensorType().is_dense()) {
return std::make_shared<tensor::DenseTensorAttribute>(name, info);
+ } else if (vespalib::eval::EngineOrFactory::get().is_factory()) {
+ return std::make_shared<tensor::SerializedFastValueAttribute>(name, info);
} else {
return std::make_shared<tensor::SerializedTensorAttribute>(name, info);
}
diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
index fac6d015a5f..79b18a57a34 100644
--- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
@@ -25,5 +25,8 @@ vespa_add_library(searchlib_tensor OBJECT
tensor_attribute.cpp
tensor_deserialize.cpp
tensor_store.cpp
+ serialized_fast_value_attribute.cpp
+ streamed_value_saver.cpp
+ streamed_value_store.cpp
DEPENDS
)
diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
new file mode 100644
index 00000000000..6e1fb1a0a2f
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
@@ -0,0 +1,234 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "serialized_fast_value_attribute.h"
+#include "streamed_value_saver.h"
+#include <vespa/eval/eval/value.h>
+#include <vespa/eval/eval/fast_value.hpp>
+#include <vespa/eval/streamed/streamed_value_utils.h>
+#include <vespa/fastlib/io/bufferedfile.h>
+#include <vespa/searchlib/attribute/readerbase.h>
+#include <vespa/searchlib/util/fileutil.h>
+#include <vespa/vespalib/util/rcuvector.hpp>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".searchlib.tensor.serialized_fast_value_attribute");
+
+#include "blob_sequence_reader.h"
+#include "tensor_attribute.hpp"
+
+using namespace vespalib;
+using namespace vespalib::eval;
+
+namespace search::tensor {
+
+namespace {
+
+struct ValueBlock : LabelBlock {
+ TypedCells cells;
+};
+
+class ValueBlockStream {
+private:
+ const StreamedValueStore::DataFromType &_from_type;
+ LabelBlockStream _label_block_stream;
+ const char *_cells_ptr;
+
+ size_t dsss() const { return _from_type.dense_subspace_size; }
+ auto cell_type() const { return _from_type.cell_type; }
+public:
+ ValueBlock next_block() {
+ auto labels = _label_block_stream.next_block();
+ if (labels) {
+ TypedCells subspace_cells(_cells_ptr, cell_type(), dsss());
+ _cells_ptr += CellTypeUtils::mem_size(cell_type(), dsss());
+ return ValueBlock{labels, subspace_cells};
+ } else {
+ TypedCells none(nullptr, cell_type(), 0);
+ return ValueBlock{labels, none};
+ }
+ }
+
+ ValueBlockStream(const StreamedValueStore::DataFromType &from_type,
+ const StreamedValueStore::StreamedValueData &from_store)
+ : _from_type(from_type),
+ _label_block_stream(from_store.num_subspaces,
+ from_store.labels_buffer,
+ from_type.num_mapped_dimensions),
+ _cells_ptr((const char *)from_store.cells_ref.data)
+ {
+ _label_block_stream.reset();
+ }
+
+ ~ValueBlockStream();
+};
+
+ValueBlockStream::~ValueBlockStream() = default;
+
+void report_problematic_subspace(size_t idx,
+ const StreamedValueStore::DataFromType &from_type,
+ const StreamedValueStore::StreamedValueData &from_store)
+{
+ LOG(error, "PROBLEM: add_mapping returned same index=%zu twice", idx);
+ FastValueIndex temp_index(from_type.num_mapped_dimensions,
+ from_store.num_subspaces);
+ auto from_start = ValueBlockStream(from_type, from_store);
+ while (auto redo_block = from_start.next_block()) {
+ if (idx == temp_index.map.add_mapping(redo_block.address)) {
+ vespalib::string msg = "Block with address[ ";
+ for (vespalib::stringref ref : redo_block.address) {
+ msg.append("'").append(ref).append("' ");
+ }
+ msg.append("]");
+ LOG(error, "%s maps to subspace %zu", msg.c_str(), idx);
+ }
+ }
+}
+
+/**
+ * This Value implementation is almost exactly like FastValue, but
+ * instead of owning its type and cells it just has a reference to
+ * data stored elsewhere.
+ * XXX: we should find a better name for this, and move it
+ * (together with the helper classes above) to its own file,
+ * and add associated unit tests.
+ **/
+class OnlyFastValueIndex : public Value {
+private:
+ const ValueType &_type;
+ TypedCells _cells;
+ FastValueIndex my_index;
+public:
+ OnlyFastValueIndex(const ValueType &type,
+ const StreamedValueStore::DataFromType &from_type,
+ const StreamedValueStore::StreamedValueData &from_store)
+ : _type(type),
+ _cells(from_store.cells_ref),
+ my_index(from_type.num_mapped_dimensions,
+ from_store.num_subspaces)
+ {
+ assert(_type.cell_type() == _cells.type);
+ std::vector<vespalib::stringref> address(from_type.num_mapped_dimensions);
+ auto block_stream = ValueBlockStream(from_type, from_store);
+ size_t ss = 0;
+ while (auto block = block_stream.next_block()) {
+ size_t idx = my_index.map.add_mapping(block.address);
+ if (idx != ss) {
+ report_problematic_subspace(idx, from_type, from_store);
+ }
+ ++ss;
+ }
+ assert(ss == from_store.num_subspaces);
+ }
+
+
+ ~OnlyFastValueIndex();
+
+ const ValueType &type() const final override { return _type; }
+ TypedCells cells() const final override { return _cells; }
+ const Index &index() const final override { return my_index; }
+ vespalib::MemoryUsage get_memory_usage() const final override {
+ auto usage = self_memory_usage<OnlyFastValueIndex>();
+ usage.merge(my_index.map.estimate_extra_memory_usage());
+ return usage;
+ }
+};
+
+OnlyFastValueIndex::~OnlyFastValueIndex() = default;
+
+}
+
+SerializedFastValueAttribute::SerializedFastValueAttribute(stringref name, const Config &cfg)
+ : TensorAttribute(name, cfg, _streamedValueStore),
+ _tensor_type(cfg.tensorType()),
+ _streamedValueStore(_tensor_type),
+ _data_from_type(_tensor_type)
+{
+}
+
+
+SerializedFastValueAttribute::~SerializedFastValueAttribute()
+{
+ getGenerationHolder().clearHoldLists();
+ _tensorStore.clearHoldLists();
+}
+
+void
+SerializedFastValueAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor)
+{
+ checkTensorType(tensor);
+ EntryRef ref = _streamedValueStore.store_tensor(tensor);
+ assert(ref.valid());
+ setTensorRef(docId, ref);
+}
+
+std::unique_ptr<Value>
+SerializedFastValueAttribute::getTensor(DocId docId) const
+{
+ EntryRef ref;
+ if (docId < getCommittedDocIdLimit()) {
+ ref = _refVector[docId];
+ }
+ if (!ref.valid()) {
+ return {};
+ }
+ if (auto data_from_store = _streamedValueStore.get_tensor_data(ref)) {
+ return std::make_unique<OnlyFastValueIndex>(_tensor_type,
+ _data_from_type,
+ data_from_store);
+ }
+ return {};
+}
+
+bool
+SerializedFastValueAttribute::onLoad()
+{
+ BlobSequenceReader tensorReader(*this);
+ if (!tensorReader.hasData()) {
+ return false;
+ }
+ setCreateSerialNum(tensorReader.getCreateSerialNum());
+ assert(tensorReader.getVersion() == getVersion());
+ uint32_t numDocs(tensorReader.getDocIdLimit());
+ _refVector.reset();
+ _refVector.unsafe_reserve(numDocs);
+ vespalib::Array<char> buffer(1024);
+ for (uint32_t lid = 0; lid < numDocs; ++lid) {
+ uint32_t tensorSize = tensorReader.getNextSize();
+ if (tensorSize != 0) {
+ if (tensorSize > buffer.size()) {
+ buffer.resize(tensorSize + 1024);
+ }
+ tensorReader.readBlob(&buffer[0], tensorSize);
+ vespalib::nbostream source(&buffer[0], tensorSize);
+ EntryRef ref = _streamedValueStore.store_encoded_tensor(source);
+ _refVector.push_back(ref);
+ } else {
+ EntryRef invalid;
+ _refVector.push_back(invalid);
+ }
+ }
+ setNumDocs(numDocs);
+ setCommittedDocIdLimit(numDocs);
+ return true;
+}
+
+
+std::unique_ptr<AttributeSaver>
+SerializedFastValueAttribute::onInitSave(vespalib::stringref fileName)
+{
+ vespalib::GenerationHandler::Guard guard(getGenerationHandler().
+ takeGuard());
+ return std::make_unique<StreamedValueSaver>
+ (std::move(guard),
+ this->createAttributeHeader(fileName),
+ getRefCopy(),
+ _streamedValueStore);
+}
+
+void
+SerializedFastValueAttribute::compactWorst()
+{
+ doCompactWorst<StreamedValueStore::RefType>();
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h
new file mode 100644
index 00000000000..a8c1df4913a
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h
@@ -0,0 +1,33 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "tensor_attribute.h"
+#include "streamed_value_store.h"
+
+namespace search::tensor {
+
+/**
+ * Attribute vector class storing serialized tensors for all documents in memory.
+ *
+ * When fetching a tensor with getTensor(docId) the returned Value
+ * will have a FastValueIndex (constructed on the fly) for its sparse
+ * mapping, but refer to a common type, while cells() will refer to
+ * memory in the serialized store without copying.
+ *
+ */
+class SerializedFastValueAttribute : public TensorAttribute {
+ vespalib::eval::ValueType _tensor_type;
+ StreamedValueStore _streamedValueStore; // data store for serialized tensors
+ const StreamedValueStore::DataFromType _data_from_type;
+public:
+ SerializedFastValueAttribute(vespalib::stringref baseFileName, const Config &cfg);
+ virtual ~SerializedFastValueAttribute();
+ virtual void setTensor(DocId docId, const vespalib::eval::Value &tensor) override;
+ virtual std::unique_ptr<vespalib::eval::Value> getTensor(DocId docId) const override;
+ virtual bool onLoad() override;
+ virtual std::unique_ptr<AttributeSaver> onInitSave(vespalib::stringref fileName) override;
+ virtual void compactWorst() override;
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp
new file mode 100644
index 00000000000..d4fd681f2cb
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp
@@ -0,0 +1,48 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "streamed_value_saver.h"
+#include "streamed_value_store.h"
+
+#include <vespa/searchlib/attribute/iattributesavetarget.h>
+#include <vespa/searchlib/util/bufferwriter.h>
+#include <vespa/vespalib/objects/nbostream.h>
+
+using vespalib::GenerationHandler;
+
+namespace search::tensor {
+
+StreamedValueSaver::
+StreamedValueSaver(GenerationHandler::Guard &&guard,
+ const attribute::AttributeHeader &header,
+ RefCopyVector &&refs,
+ const StreamedValueStore &tensorStore)
+ : AttributeSaver(std::move(guard), header),
+ _refs(std::move(refs)),
+ _tensorStore(tensorStore)
+{
+}
+
+StreamedValueSaver::~StreamedValueSaver() = default;
+
+bool
+StreamedValueSaver::onSave(IAttributeSaveTarget &saveTarget)
+{
+ auto datWriter = saveTarget.datWriter().allocBufferWriter();
+ const uint32_t docIdLimit(_refs.size());
+ vespalib::nbostream stream;
+ for (uint32_t lid = 0; lid < docIdLimit; ++lid) {
+ if (_tensorStore.encode_tensor(_refs[lid], stream)) {
+ uint32_t sz = stream.size();
+ datWriter->write(&sz, sizeof(sz));
+ datWriter->write(stream.peek(), stream.size());
+ stream.clear();
+ } else {
+ uint32_t sz = 0;
+ datWriter->write(&sz, sizeof(sz));
+ }
+ }
+ datWriter->flush();
+ return true;
+}
+
+} // namespace search::tensor
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h
new file mode 100644
index 00000000000..71d56539679
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h
@@ -0,0 +1,35 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchlib/attribute/attributesaver.h>
+#include "tensor_attribute.h"
+
+namespace search::tensor {
+
+class StreamedValueStore;
+
+/*
+ * Class for saving a tensor attribute.
+ */
+class StreamedValueSaver : public AttributeSaver
+{
+public:
+ using RefCopyVector = TensorAttribute::RefCopyVector;
+private:
+ using GenerationHandler = vespalib::GenerationHandler;
+
+ RefCopyVector _refs;
+ const StreamedValueStore &_tensorStore;
+
+ bool onSave(IAttributeSaveTarget &saveTarget) override;
+public:
+ StreamedValueSaver(GenerationHandler::Guard &&guard,
+ const attribute::AttributeHeader &header,
+ RefCopyVector &&refs,
+ const StreamedValueStore &tensorStore);
+
+ virtual ~StreamedValueSaver();
+};
+
+} // namespace search::tensor
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp
new file mode 100644
index 00000000000..ae2e0e7ed10
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp
@@ -0,0 +1,228 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "streamed_value_store.h"
+#include "tensor_deserialize.h"
+#include <vespa/eval/eval/value.h>
+#include <vespa/eval/eval/value_codec.h>
+#include <vespa/eval/streamed/streamed_value_builder_factory.h>
+#include <vespa/eval/streamed/streamed_value_view.h>
+#include <vespa/vespalib/datastore/datastore.hpp>
+#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".searchlib.tensor.streamed_value_store");
+
+using vespalib::datastore::Handle;
+using namespace vespalib::eval;
+
+namespace search::tensor {
+
+namespace {
+
+constexpr size_t MIN_BUFFER_ARRAYS = 1024;
+
+struct CellsMemBlock {
+ uint32_t num;
+ uint32_t total_sz;
+ const char *ptr;
+ CellsMemBlock(TypedCells cells)
+ : num(cells.size),
+ total_sz(CellTypeUtils::mem_size(cells.type, num)),
+ ptr((const char *)cells.data)
+ {}
+};
+
+template<typename T>
+T *fix_alignment(T *ptr, size_t align)
+{
+ static_assert(sizeof(T) == 1);
+ assert((align & (align-1)) == 0); // must be 2^N
+ size_t ptr_val = (size_t)ptr;
+ size_t unalign = ptr_val & (align - 1);
+ if (unalign == 0) {
+ return ptr;
+ } else {
+ return ptr + (align - unalign);
+ }
+}
+
+} // namespace <unnamed>
+
+StreamedValueStore::StreamedValueStore(const ValueType &tensor_type)
+ : TensorStore(_concreteStore),
+ _concreteStore(),
+ _bufferType(RefType::align(1),
+ MIN_BUFFER_ARRAYS,
+ RefType::offsetSize() / RefType::align(1)),
+ _tensor_type(tensor_type),
+ _data_from_type(_tensor_type)
+{
+ _store.addType(&_bufferType);
+ _store.initActiveBuffers();
+}
+
+StreamedValueStore::~StreamedValueStore()
+{
+ _store.dropBuffers();
+}
+
+std::pair<const char *, uint32_t>
+StreamedValueStore::getRawBuffer(RefType ref) const
+{
+ if (!ref.valid()) {
+ return std::make_pair(nullptr, 0u);
+ }
+ const char *buf = _store.getEntry<char>(ref);
+ uint32_t len = *reinterpret_cast<const uint32_t *>(buf);
+ return std::make_pair(buf + sizeof(uint32_t), len);
+}
+
+Handle<char>
+StreamedValueStore::allocRawBuffer(uint32_t size)
+{
+ if (size == 0) {
+ return Handle<char>();
+ }
+ size_t extSize = size + sizeof(uint32_t);
+ size_t bufSize = RefType::align(extSize);
+ auto result = _concreteStore.rawAllocator<char>(_typeId).alloc(bufSize);
+ *reinterpret_cast<uint32_t *>(result.data) = size;
+ char *padWritePtr = result.data + extSize;
+ for (size_t i = extSize; i < bufSize; ++i) {
+ *padWritePtr++ = 0;
+ }
+ // Hide length of buffer (first 4 bytes) from users of the buffer.
+ return Handle<char>(result.ref, result.data + sizeof(uint32_t));
+}
+
+void
+StreamedValueStore::holdTensor(EntryRef ref)
+{
+ if (!ref.valid()) {
+ return;
+ }
+ RefType iRef(ref);
+ const char *buf = _store.getEntry<char>(iRef);
+ uint32_t len = *reinterpret_cast<const uint32_t *>(buf);
+ _concreteStore.holdElem(ref, len + sizeof(uint32_t));
+}
+
+TensorStore::EntryRef
+StreamedValueStore::move(EntryRef ref)
+{
+ if (!ref.valid()) {
+ return RefType();
+ }
+ auto oldraw = getRawBuffer(ref);
+ auto newraw = allocRawBuffer(oldraw.second);
+ memcpy(newraw.data, oldraw.first, oldraw.second);
+ _concreteStore.holdElem(ref, oldraw.second + sizeof(uint32_t));
+ return newraw.ref;
+}
+
+StreamedValueStore::StreamedValueData
+StreamedValueStore::get_tensor_data(EntryRef ref) const
+{
+ StreamedValueData retval;
+ retval.valid = false;
+ auto raw = getRawBuffer(ref);
+ if (raw.second == 0u) {
+ return retval;
+ }
+ vespalib::nbostream source(raw.first, raw.second);
+ uint32_t num_cells = source.readValue<uint32_t>();
+ {
+ uint32_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type);
+ const char *aligned_ptr = fix_alignment(source.peek(), alignment);
+ size_t adjustment = aligned_ptr - source.peek();
+ source.adjustReadPos(adjustment);
+ }
+ retval.cells_ref = TypedCells(source.peek(), _data_from_type.cell_type, num_cells);
+ source.adjustReadPos(CellTypeUtils::mem_size(_data_from_type.cell_type, num_cells));
+ retval.num_subspaces = source.readValue<uint32_t>();
+ retval.labels_buffer = vespalib::ConstArrayRef<char>(source.peek(), source.size());
+ assert(retval.num_subspaces * _data_from_type.dense_subspace_size == num_cells);
+ retval.valid = true;
+ return retval;
+}
+
+bool
+StreamedValueStore::encode_tensor(EntryRef ref, vespalib::nbostream &target) const
+{
+ if (auto data = get_tensor_data(ref)) {
+ StreamedValueView value(
+ _tensor_type, _data_from_type.num_mapped_dimensions,
+ data.cells_ref, data.num_subspaces, data.labels_buffer);
+ vespalib::eval::encode_value(value, target);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void
+StreamedValueStore::serialize_labels(const Value::Index &index,
+ vespalib::nbostream &target) const
+{
+ uint32_t num_subspaces = index.size();
+ target << num_subspaces;
+ uint32_t num_mapped_dims = _data_from_type.num_mapped_dimensions;
+ std::vector<vespalib::stringref> labels(num_mapped_dims * num_subspaces);
+ auto view = index.create_view({});
+ view->lookup({});
+ std::vector<vespalib::stringref> addr(num_mapped_dims);
+ std::vector<vespalib::stringref *> addr_refs;
+ for (auto & label : addr) {
+ addr_refs.push_back(&label);
+ }
+ size_t subspace;
+ for (size_t ss = 0; ss < num_subspaces; ++ss) {
+ bool ok = view->next_result(addr_refs, subspace);
+ assert(ok);
+ size_t idx = subspace * num_mapped_dims;
+ for (auto label : addr) {
+ labels[idx++] = label;
+ }
+ }
+ bool ok = view->next_result(addr_refs, subspace);
+ assert(!ok);
+ for (auto label : labels) {
+ target.writeSmallString(label);
+ }
+}
+
+TensorStore::EntryRef
+StreamedValueStore::store_tensor(const Value &tensor)
+{
+ assert(tensor.type() == _tensor_type);
+ CellsMemBlock cells_mem(tensor.cells());
+ size_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type);
+ size_t padding = alignment - 1;
+ vespalib::nbostream stream;
+ stream << uint32_t(cells_mem.num);
+ serialize_labels(tensor.index(), stream);
+ size_t mem_size = stream.size() + cells_mem.total_sz + padding;
+ auto raw = allocRawBuffer(mem_size);
+ char *target = raw.data;
+ memcpy(target, stream.peek(), sizeof(uint32_t));
+ stream.adjustReadPos(sizeof(uint32_t));
+ target += sizeof(uint32_t);
+ target = fix_alignment(target, alignment);
+ memcpy(target, cells_mem.ptr, cells_mem.total_sz);
+ target += cells_mem.total_sz;
+ memcpy(target, stream.peek(), stream.size());
+ target += stream.size();
+ assert(target <= raw.data + mem_size);
+ return raw.ref;
+}
+
+TensorStore::EntryRef
+StreamedValueStore::store_encoded_tensor(vespalib::nbostream &encoded)
+{
+ const auto &factory = StreamedValueBuilderFactory::get();
+ auto val = vespalib::eval::decode_value(encoded, factory);
+ return store_tensor(*val);
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h
new file mode 100644
index 00000000000..4e12296916d
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h
@@ -0,0 +1,98 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "tensor_store.h"
+#include <vespa/eval/eval/value_type.h>
+#include <vespa/eval/eval/value.h>
+#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/util/typify.h>
+
+namespace search::tensor {
+
+/**
+ * Class for storing tensors in memory, with a special serialization
+ * format that can be used directly to make a StreamedValueView.
+ *
+ * The tensor type is owned by the store itself and will not be
+ * serialized at all.
+ *
+ * The parameters for serialization (see DataFromType) are:
+ * - number of mapped dimensions [MD]
+ * - dense subspace size [DS]
+ * - size of each cell [CS] - currently 4 (float) or 8 (double)
+ * - alignment for cells [CA] - currently 4 (float) or 8 (double)
+ * While the tensor value to be serialized has:
+ * - number of dense subspaces [ND]
+ * - labels for dense subspaces, ND * MD strings
+ * - cell values, ND * DS cells (each either float or double)
+ * The serialization format looks like:
+ *
+ * [bytes] : [format] : [description]
+ * 4 : n.b.o. uint32_ t : num cells = ND * DS
+ * 1-7 : (none) : padding to cell alignment CA
+ * CS * ND * DS : native float or double : cells
+ * 4 : n.b.o. uint32_t : number of subspaces = ND
+ * (depends) : n.b.o. strings : labels
+ *
+ * Here, n.b.o. means network byte order, or more precisely
+ * it's the format vespalib::nbostream uses for the given data type,
+ * including strings (where exact format depends on the string length).
+ * Note that the only unpredictably-sized data (the labels) are kept
+ * last.
+ * If we ever make a "hbostream" which uses host byte order, we
+ * could switch to that instead since these data are only kept in
+ * memory.
+ */
+class StreamedValueStore : public TensorStore {
+public:
+ using RefType = vespalib::datastore::AlignedEntryRefT<22, 3>;
+ using DataStoreType = vespalib::datastore::DataStoreT<RefType>;
+
+ struct StreamedValueData {
+ bool valid;
+ vespalib::eval::TypedCells cells_ref;
+ size_t num_subspaces;
+ vespalib::ConstArrayRef<char> labels_buffer;
+ operator bool() const { return valid; }
+ };
+
+ struct DataFromType {
+ uint32_t num_mapped_dimensions;
+ uint32_t dense_subspace_size;
+ vespalib::eval::CellType cell_type;
+
+ DataFromType(const vespalib::eval::ValueType& type)
+ : num_mapped_dimensions(type.count_mapped_dimensions()),
+ dense_subspace_size(type.dense_subspace_size()),
+ cell_type(type.cell_type())
+ {}
+ };
+
+private:
+ DataStoreType _concreteStore;
+ vespalib::datastore::BufferType<char> _bufferType;
+ vespalib::eval::ValueType _tensor_type;
+ DataFromType _data_from_type;
+
+ void serialize_labels(const vespalib::eval::Value::Index &index,
+ vespalib::nbostream &target) const;
+
+ std::pair<const char *, uint32_t> getRawBuffer(RefType ref) const;
+ vespalib::datastore::Handle<char> allocRawBuffer(uint32_t size);
+public:
+ StreamedValueStore(const vespalib::eval::ValueType &tensor_type);
+ virtual ~StreamedValueStore();
+
+ virtual void holdTensor(EntryRef ref) override;
+ virtual EntryRef move(EntryRef ref) override;
+
+ StreamedValueData get_tensor_data(EntryRef ref) const;
+ bool encode_tensor(EntryRef ref, vespalib::nbostream &target) const;
+
+ EntryRef store_tensor(const vespalib::eval::Value &tensor);
+ EntryRef store_encoded_tensor(vespalib::nbostream &encoded);
+};
+
+
+}
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
index 83988a3af11..35be27bc03b 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
@@ -1,9 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "tensor_deserialize.h"
#include <vespa/document/util/serializableexceptions.h>
#include <vespa/eval/eval/engine_or_factory.h>
#include <vespa/eval/eval/value.h>
-#include <vespa/vespalib/objects/nbostream.h>
using document::DeserializeException;
using vespalib::eval::EngineOrFactory;
@@ -11,14 +11,19 @@ using vespalib::eval::Value;
namespace search::tensor {
-std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size)
+std::unique_ptr<Value> deserialize_tensor(vespalib::nbostream &buffer)
{
- vespalib::nbostream wrapStream(data, size);
- auto tensor = EngineOrFactory::get().decode(wrapStream);
- if (wrapStream.size() != 0) {
+ auto tensor = EngineOrFactory::get().decode(buffer);
+ if (buffer.size() != 0) {
throw DeserializeException("Leftover bytes deserializing tensor attribute value.", VESPA_STRLOC);
}
return tensor;
}
+std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size)
+{
+ vespalib::nbostream wrapStream(data, size);
+ return deserialize_tensor(wrapStream);
+}
+
} // namespace
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
index 18e166543d6..6f9521c1355 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
@@ -1,10 +1,14 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/eval/eval/value.h>
+#include <vespa/vespalib/objects/nbostream.h>
namespace search::tensor {
extern std::unique_ptr<vespalib::eval::Value>
deserialize_tensor(const void *data, size_t size);
+extern std::unique_ptr<vespalib::eval::Value>
+deserialize_tensor(vespalib::nbostream &stream);
+
} // namespace
diff --git a/slobrok/src/apps/slobrok/slobrok.cpp b/slobrok/src/apps/slobrok/slobrok.cpp
index b89449e6779..5d650fafc96 100644
--- a/slobrok/src/apps/slobrok/slobrok.cpp
+++ b/slobrok/src/apps/slobrok/slobrok.cpp
@@ -1,5 +1,4 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/fnet.h>
#include <vespa/slobrok/server/sbenv.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/util/exceptions.h>
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index 2709ae9d7d7..3aa3d21aa7b 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -64,7 +64,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
int rpc_server_port,
size_t rpc_thread_pool_size)
: _thread_pool(std::make_unique<FastOS_ThreadPool>(1024*60)),
- _transport(std::make_unique<FNET_Transport>(rpc_thread_pool_size)),
+ _transport(std::make_unique<FNET_Transport>(TransportConfig(rpc_thread_pool_size).events_before_wakeup(1))),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, slobrok::ConfiguratorFactory(config_uri))),
_slobrok_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, slobrok::ConfiguratorFactory(config_uri))),
@@ -72,9 +72,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
_hostname(vespalib::HostName::get()),
_rpc_server_port(rpc_server_port),
_shutdown(false)
-{
- _transport->events_before_wakeup(1);
-}
+{ }
// TODO make sure init/shutdown is safe for aborted init in comm. mgr.
diff --git a/storage/src/vespa/storage/tools/storage-cmd.cpp b/storage/src/vespa/storage/tools/storage-cmd.cpp
index 8c0fcc83330..00ee2c9c4cf 100644
--- a/storage/src/vespa/storage/tools/storage-cmd.cpp
+++ b/storage/src/vespa/storage/tools/storage-cmd.cpp
@@ -1,5 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fnet/frt/frt.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/fastos/app.h>
#include <vespa/vespalib/locale/c.h>
diff --git a/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp b/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp
index 9393562bf28..a468dd98698 100644
--- a/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp
+++ b/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp
@@ -1,8 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/defaults.h>
-#include <vespa/fnet/frt/frt.h>
#include <vespa/slobrok/sbmirror.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/util/programoptions.h>