aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xconfig-lib/src/main/java/com/yahoo/config/FileReference.java12
-rw-r--r--config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java8
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java12
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java16
-rw-r--r--config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java42
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java3
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java2
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java9
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/query/Select.java9
-rw-r--r--container-search/src/test/java/com/yahoo/select/SelectTestCase.java8
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java9
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java2
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json3
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json3
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json3
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json3
-rw-r--r--document/src/tests/documentselectparsertest.cpp82
-rw-r--r--document/src/vespa/document/select/CMakeLists.txt1
-rw-r--r--document/src/vespa/document/select/branch.cpp6
-rw-r--r--document/src/vespa/document/select/branch.h9
-rw-r--r--document/src/vespa/document/select/compare.cpp2
-rw-r--r--document/src/vespa/document/select/grammar/lexer.ll7
-rw-r--r--document/src/vespa/document/select/node.h29
-rw-r--r--document/src/vespa/document/select/parser.cpp14
-rw-r--r--document/src/vespa/document/select/parser_limits.cpp13
-rw-r--r--document/src/vespa/document/select/parser_limits.h19
-rw-r--r--document/src/vespa/document/select/valuenode.h26
-rw-r--r--document/src/vespa/document/select/valuenodes.cpp39
-rw-r--r--document/src/vespa/document/select/valuenodes.h10
-rw-r--r--fbench/src/fbench/fbench.cpp54
-rw-r--r--fbench/src/util/clientstatus.cpp6
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java56
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java8
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java23
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java76
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java16
-rw-r--r--jdisc_http_service/abi-spec.json42
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java34
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java2
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java31
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java52
-rw-r--r--jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def12
-rw-r--r--jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def5
-rw-r--r--jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java4
-rw-r--r--messagebus/src/tests/CMakeLists.txt1
-rw-r--r--messagebus/src/tests/error/error.cpp22
-rw-r--r--messagebus/src/tests/loadbalance/.gitignore4
-rw-r--r--messagebus/src/tests/loadbalance/CMakeLists.txt9
-rw-r--r--messagebus/src/tests/loadbalance/loadbalance.cpp88
-rw-r--r--messagebus/src/tests/messagebus/messagebus.cpp181
-rw-r--r--messagebus/src/tests/serviceaddress/serviceaddress.cpp107
-rw-r--r--messagebus/src/tests/servicepool/servicepool.cpp84
-rw-r--r--messagebus/src/tests/shutdown/shutdown.cpp33
-rw-r--r--messagebus/src/tests/targetpool/targetpool.cpp31
-rw-r--r--messagebus/src/vespa/messagebus/callstack.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp86
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h20
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h33
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp25
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcservice.cpp50
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcservice.h18
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcserviceaddress.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcservicepool.cpp52
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcservicepool.h19
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.h5
-rw-r--r--messagebus_test/src/tests/error/cpp-client.cpp7
-rw-r--r--messagebus_test/src/tests/error/error.cpp41
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java11
-rw-r--r--node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java6
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java9
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java1
-rw-r--r--node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java5
-rw-r--r--node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java15
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp19
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_graph.h2
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp41
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index.h1
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h2
-rw-r--r--slobrok/src/tests/configure/configure.cpp12
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp4
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp8
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def6
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp25
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h3
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java279
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java191
-rw-r--r--vespalib/src/vespa/vespalib/util/gencnt.cpp2
-rw-r--r--vespalib/src/vespa/vespalib/util/gencnt.h9
-rw-r--r--vespalib/src/vespa/vespalib/util/signalhandler.cpp35
-rw-r--r--vespalog/abi-spec.json20
-rw-r--r--vespalog/src/main/java/com/yahoo/log/LevelController.java26
-rw-r--r--vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java8
-rw-r--r--vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java14
-rw-r--r--vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java2
-rw-r--r--vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java10
-rw-r--r--vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java65
-rw-r--r--vespalog/src/vespa/log/control-file.cpp2
102 files changed, 1539 insertions, 1073 deletions
diff --git a/config-lib/src/main/java/com/yahoo/config/FileReference.java b/config-lib/src/main/java/com/yahoo/config/FileReference.java
index 3b95c2fbd4c..ee99ebfa2b7 100755
--- a/config-lib/src/main/java/com/yahoo/config/FileReference.java
+++ b/config-lib/src/main/java/com/yahoo/config/FileReference.java
@@ -28,14 +28,16 @@ public final class FileReference {
}
@Override
- public int hashCode() {
- return value.hashCode();
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FileReference that = (FileReference) o;
+ return value.equals(that.value);
}
@Override
- public boolean equals(Object other) {
- return other instanceof FileReference &&
- value.equals(((FileReference)other).value);
+ public int hashCode() {
+ return Objects.hash(value);
}
@Override
diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
index e37b0b07746..896c6ea9a7f 100644
--- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
+++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
@@ -43,7 +43,6 @@ public class TestProperties implements ModelContext.Properties {
private double defaultTermwiseLimit = 1.0;
private Optional<EndpointCertificateSecrets> endpointCertificateSecrets = Optional.empty();
private boolean useNewAthenzFilter = false;
- private boolean useDedicatedNodesWhenUnspecified = false;
private AthenzDomain athenzDomain;
@Override public boolean multitenant() { return multitenant; }
@@ -65,7 +64,7 @@ public class TestProperties implements ModelContext.Properties {
@Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; }
@Override public boolean useBucketSpaceMetric() { return true; }
@Override public boolean useNewAthenzFilter() { return useNewAthenzFilter; }
- @Override public boolean useDedicatedNodesWhenUnspecified() { return useDedicatedNodesWhenUnspecified; }
+ @Override public boolean useDedicatedNodesWhenUnspecified() { return true; }
@Override public Optional<AthenzDomain> athenzDomain() { return Optional.ofNullable(athenzDomain); }
public TestProperties setDefaultTermwiseLimit(double limit) {
@@ -118,11 +117,6 @@ public class TestProperties implements ModelContext.Properties {
return this;
}
- public TestProperties setUseDedicatedNodesWhenUnspecified(boolean useDedicatedNodesWhenUnspecified) {
- this.useDedicatedNodesWhenUnspecified = useDedicatedNodesWhenUnspecified;
- return this;
- }
-
public TestProperties setAthenzDomain(AthenzDomain domain) {
this.athenzDomain = domain;
return this;
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java b/config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java
index f61618c789b..fb8e9dffbbb 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java
@@ -7,6 +7,7 @@ import com.yahoo.jdisc.http.ConnectorConfig.Ssl.ClientAuth;
import com.yahoo.vespa.model.container.component.SimpleComponent;
import com.yahoo.vespa.model.container.http.ConnectorFactory;
+import java.time.Duration;
import java.util.List;
/**
@@ -67,10 +68,13 @@ public class HostedSslConnectorFactory extends ConnectorFactory {
@Override
public void getConfig(ConnectorConfig.Builder connectorBuilder) {
super.getConfig(connectorBuilder);
- connectorBuilder.tlsClientAuthEnforcer(new ConnectorConfig.TlsClientAuthEnforcer.Builder()
- .pathWhitelist(INSECURE_WHITELISTED_PATHS)
- .enable(enforceClientAuth));
- connectorBuilder.proxyProtocol(configureProxyProtocol());
+ connectorBuilder
+ .tlsClientAuthEnforcer(new ConnectorConfig.TlsClientAuthEnforcer.Builder()
+ .pathWhitelist(INSECURE_WHITELISTED_PATHS)
+ .enable(enforceClientAuth))
+ .proxyProtocol(configureProxyProtocol())
+ .idleTimeout(Duration.ofMinutes(3).toSeconds())
+ .maxConnectionLife(Duration.ofMinutes(10).toSeconds());
}
private ConnectorConfig.ProxyProtocol.Builder configureProxyProtocol() {
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java b/config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java
index f5a91297e9e..f93bf6fc872 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java
@@ -241,7 +241,12 @@ public class Tuning extends AbstractConfigProducer implements ProtonConfig.Produ
public Integer level = null;
public void getConfig(ProtonConfig.Summary.Cache.Compression.Builder compression) {
- if (type != null) compression.type(ProtonConfig.Summary.Cache.Compression.Type.Enum.valueOf(type.name));
+ if (type != null) compression.type(ProtonConfig.Summary.Cache.Compression.Type.Enum.valueOf(type.name));
+ if (level != null) compression.level(level);
+ }
+
+ public void getConfig(ProtonConfig.Summary.Log.Compact.Compression.Builder compression) {
+ if (type != null) compression.type(ProtonConfig.Summary.Log.Compact.Compression.Type.Enum.valueOf(type.name));
if (level != null) compression.level(level);
}
@@ -281,6 +286,12 @@ public class Tuning extends AbstractConfigProducer implements ProtonConfig.Produ
}
}
+ public void getConfig(ProtonConfig.Summary.Log.Compact.Builder compact) {
+ if (compression != null) {
+ compression.getConfig(compact.compression);
+ }
+ }
+
public void getConfig(ProtonConfig.Summary.Log.Chunk.Builder chunk) {
if (outputInt) {
if (maxSize!=null) chunk.maxbytes(maxSize.intValue());
@@ -288,7 +299,7 @@ public class Tuning extends AbstractConfigProducer implements ProtonConfig.Produ
throw new IllegalStateException("Fix this, chunk does not have long types");
}
if (compression != null) {
- compression.getConfig(chunk.compression);
+ compression.getConfig(chunk.compression);
}
}
}
@@ -303,6 +314,7 @@ public class Tuning extends AbstractConfigProducer implements ProtonConfig.Produ
if (minFileSizeFactor!=null) log.minfilesizefactor(minFileSizeFactor);
if (chunk != null) {
chunk.getConfig(log.chunk);
+ chunk.getConfig(log.compact);
}
}
}
diff --git a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
index ebafe9dd45b..7208d8c5fc1 100644
--- a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
+++ b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
@@ -1364,10 +1364,11 @@ public class ModelProvisioningTest {
" </http>" +
"</container>";
VespaModelTester tester = new VespaModelTester();
- tester.addHosts(1);
+ tester.addHosts(2);
VespaModel model = tester.createModel(services, true);
- assertEquals(1, model.getHosts().size());
+ assertEquals(2, model.getHosts().size());
assertEquals(1, model.getContainerClusters().size());
+ assertEquals(2, model.getContainerClusters().get("foo").getContainers().size());
}
@Test
@@ -1430,7 +1431,7 @@ public class ModelProvisioningTest {
}
@Test
- public void testNoNodeTagMeans1Node() {
+ public void testNoNodeTagMeansTwoNodes() {
String services =
"<?xml version='1.0' encoding='utf-8' ?>\n" +
"<services>" +
@@ -1445,31 +1446,6 @@ public class ModelProvisioningTest {
" </content>" +
"</services>";
VespaModelTester tester = new VespaModelTester();
- tester.addHosts(1);
- VespaModel model = tester.createModel(services, true);
- assertEquals(1, model.getRoot().hostSystem().getHosts().size());
- assertEquals(1, model.getAdmin().getSlobroks().size());
- assertEquals(1, model.getContainerClusters().get("foo").getContainers().size());
- assertEquals(1, model.getContentClusters().get("bar").getRootGroup().countNodes());
- }
-
- @Test
- public void testNoNodeTagMeansTwoNodesInContainerClusterWithFeatureFlag() {
- String services =
- "<?xml version='1.0' encoding='utf-8' ?>\n" +
- "<services>" +
- " <container id='foo' version='1.0'>" +
- " <search/>" +
- " <document-api/>" +
- " </container>" +
- " <content version='1.0' id='bar'>" +
- " <documents>" +
- " <document type='type1' mode='index'/>" +
- " </documents>" +
- " </content>" +
- "</services>";
- VespaModelTester tester = new VespaModelTester();
- tester.setUseDedicatedNodesWhenUnspecified(true);
tester.addHosts(3);
VespaModel model = tester.createModel(services, true);
assertEquals(3, model.getRoot().hostSystem().getHosts().size());
@@ -1479,7 +1455,7 @@ public class ModelProvisioningTest {
}
@Test
- public void testNoNodeTagMeans1NodeNoContent() {
+ public void testNoNodeTagMeansTwoNodesNoContent() {
String services =
"<?xml version='1.0' encoding='utf-8' ?>\n" +
"<services>" +
@@ -1489,11 +1465,11 @@ public class ModelProvisioningTest {
" </container>" +
"</services>";
VespaModelTester tester = new VespaModelTester();
- tester.addHosts(1);
+ tester.addHosts(2);
VespaModel model = tester.createModel(services, true);
- assertEquals(1, model.getRoot().hostSystem().getHosts().size());
- assertEquals(1, model.getAdmin().getSlobroks().size());
- assertEquals(1, model.getContainerClusters().get("foo").getContainers().size());
+ assertEquals(2, model.getRoot().hostSystem().getHosts().size());
+ assertEquals(2, model.getAdmin().getSlobroks().size());
+ assertEquals(2, model.getContainerClusters().get("foo").getContainers().size());
}
@Test
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java
index 060fff5bf66..73dd1ca3f3b 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java
@@ -4,7 +4,6 @@ package com.yahoo.vespa.model.builder.xml.dom;
import com.yahoo.collections.CollectionUtil;
import com.yahoo.vespa.config.search.core.ProtonConfig;
import com.yahoo.config.model.builder.xml.test.DomBuilderTest;
-import com.yahoo.vespa.model.content.DispatchTuning;
import com.yahoo.vespa.model.search.Tuning;
import org.junit.Test;
import org.w3c.dom.Element;
@@ -228,6 +227,8 @@ public class DomSearchTuningBuilderTest extends DomBuilderTest {
assertEquals(cfg.summary().log().chunk().maxbytes(), 256);
assertEquals(cfg.summary().log().chunk().compression().type(), ProtonConfig.Summary.Log.Chunk.Compression.Type.LZ4);
assertEquals(cfg.summary().log().chunk().compression().level(), 5);
+ assertEquals(cfg.summary().log().compact().compression().type(), ProtonConfig.Summary.Log.Compact.Compression.Type.LZ4);
+ assertEquals(cfg.summary().log().compact().compression().level(), 5);
}
@Test
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java
index 47d85465f3f..dcd1c46e52f 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java
@@ -624,7 +624,7 @@ public class ContainerModelBuilderTest extends ContainerModelBuilderTestBase {
.setMultitenant(true)
.setHostedVespa(true))
.build());
- assertEquals(1, model.hostSystem().getHosts().size());
+ assertEquals(2, model.hostSystem().getHosts().size());
}
@Test(expected = IllegalArgumentException.class)
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java b/config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java
index 17c7b3a308d..cdfd9fab194 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java
@@ -15,7 +15,6 @@ import com.yahoo.config.model.provision.SingleNodeProvisioner;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.Flavor;
import com.yahoo.config.provision.NodeResources;
-import com.yahoo.config.provision.Provisioner;
import com.yahoo.config.provision.Zone;
import com.yahoo.vespa.model.VespaModel;
import com.yahoo.vespa.model.test.utils.ApplicationPackageUtils;
@@ -49,7 +48,6 @@ public class VespaModelTester {
private Map<NodeResources, Collection<Host>> hostsByResources = new HashMap<>();
private ApplicationId applicationId = ApplicationId.defaultId();
private boolean useDedicatedNodeForLogserver = false;
- private boolean useDedicatedNodesWhenUnspecified = false;
public VespaModelTester() {
this(new NullConfigModelRegistry());
@@ -99,10 +97,6 @@ public class VespaModelTester {
this.useDedicatedNodeForLogserver = useDedicatedNodeForLogserver;
}
- public void setUseDedicatedNodesWhenUnspecified(boolean useDedicatedNodesWhenUnspecified) {
- this.useDedicatedNodesWhenUnspecified = useDedicatedNodesWhenUnspecified;
- }
-
/** Creates a model which uses 0 as start index and fails on out of capacity */
public VespaModel createModel(String services, String ... retiredHostNames) {
return createModel(Zone.defaultZone(), services, true, retiredHostNames);
@@ -151,8 +145,7 @@ public class VespaModelTester {
.setMultitenant(true)
.setHostedVespa(hosted)
.setApplicationId(applicationId)
- .setUseDedicatedNodeForLogserver(useDedicatedNodeForLogserver)
- .setUseDedicatedNodesWhenUnspecified(useDedicatedNodesWhenUnspecified);
+ .setUseDedicatedNodeForLogserver(useDedicatedNodeForLogserver);
DeployState deployState = new DeployState.Builder()
.applicationPackage(appPkg)
diff --git a/container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java b/container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java
index 4c3d76436dd..ca11ad387ee 100644
--- a/container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java
+++ b/container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java
@@ -148,10 +148,9 @@ public class BundleLoader {
/**
* Returns the bundles that are not assumed to be retained by the new application generation.
- * and cleans up the map of active file references. Note that at this point we don't yet know
- * the full set of new bundles, because of the potential pre-install directives in the new bundles.
- * However, only "disk bundles" (file:) can be listed in the pre-install directive, so we know
- * about all the obsolete application bundles.
+ * Note that at this point we don't yet know the full set of new bundles, because of the potential
+ * pre-install directives in the new bundles. However, only "disk bundles" (file:) can be listed
+ * in the pre-install directive, so we know about all the obsolete application bundles.
*/
private Set<Bundle> getObsoleteBundles(List<FileReference> newReferences) {
Set<Bundle> bundlesToRemove = new HashSet<>(osgi.getCurrentBundles());
@@ -165,6 +164,9 @@ public class BundleLoader {
return bundlesToRemove;
}
+ /**
+ * Cleans up the map of active file references
+ */
private void removeInactiveFileReferences(List<FileReference> newReferences) {
// Clean up the map of active bundles
Set<FileReference> fileReferencesToRemove = getObsoleteFileReferences(newReferences);
@@ -184,6 +186,7 @@ public class BundleLoader {
// The bundle at index 0 for each file reference always corresponds to the bundle at the file reference location
Set<Bundle> allowedDuplicates = obsoleteReferences.stream()
+ .filter(reference -> ! isDiskBundle(reference))
.map(reference -> reference2Bundles.get(reference).get(0))
.collect(Collectors.toSet());
diff --git a/container-search/src/main/java/com/yahoo/search/query/Select.java b/container-search/src/main/java/com/yahoo/search/query/Select.java
index cb662dcd671..65ffd29efe0 100644
--- a/container-search/src/main/java/com/yahoo/search/query/Select.java
+++ b/container-search/src/main/java/com/yahoo/search/query/Select.java
@@ -57,12 +57,13 @@ public class Select implements Cloneable {
}
public Select(String where, String grouping, Query query) {
- this(where, grouping, query, Collections.emptyList());
+ this(where, grouping, null, query, Collections.emptyList());
}
- private Select(String where, String grouping, Query query, List<GroupingRequest> groupingRequests) {
+ private Select(String where, String grouping, String groupingExpressionString, Query query, List<GroupingRequest> groupingRequests) {
this.where = Objects.requireNonNull(where, "A Select must have a where string (possibly the empty string)");
this.grouping = Objects.requireNonNull(grouping, "A Select must have a select string (possibly the empty string)");
+ this.groupingExpressionString = groupingExpressionString;
this.parent = Objects.requireNonNull(query, "A Select must have a parent query");
this.groupingRequests = deepCopy(groupingRequests, this);
}
@@ -136,11 +137,11 @@ public class Select implements Cloneable {
@Override
public Object clone() {
- return new Select(where, grouping, parent, groupingRequests);
+ return new Select(where, grouping, groupingExpressionString, parent, groupingRequests);
}
public Select cloneFor(Query parent) {
- return new Select(where, grouping, parent, groupingRequests);
+ return new Select(where, grouping, groupingExpressionString, parent, groupingRequests);
}
}
diff --git a/container-search/src/test/java/com/yahoo/select/SelectTestCase.java b/container-search/src/test/java/com/yahoo/select/SelectTestCase.java
index 7b1b4fe6362..1715ed38964 100644
--- a/container-search/src/test/java/com/yahoo/select/SelectTestCase.java
+++ b/container-search/src/test/java/com/yahoo/select/SelectTestCase.java
@@ -724,6 +724,7 @@ public class SelectTestCase {
assertEquals("all(group(time.dayofmonth(a)) each(output(count())))", query.getSelect().getGrouping().get(0).toString());
Query clone = query.clone();
+ assertEquals(clone.getSelect().getGroupingExpressionString(), query.getSelect().getGroupingExpressionString());
assertNotSame(query.getSelect(), clone.getSelect());
assertNotSame(query.getSelect().getGrouping(), clone.getSelect().getGrouping());
assertNotSame(query.getSelect().getGrouping().get(0), clone.getSelect().getGrouping().get(0));
@@ -732,8 +733,15 @@ public class SelectTestCase {
assertEquals(query.getSelect().getGroupingString(), clone.getSelect().getGroupingString());
assertEquals(query.getSelect().getGrouping().get(0).toString(), clone.getSelect().getGrouping().get(0).toString());
assertEquals(query.getSelect().getGrouping().get(1).toString(), clone.getSelect().getGrouping().get(1).toString());
+ }
+ @Test
+ public void testCloneWithGroupingExpressionString() {
+ Query query = new Query();
+ query.getSelect().setGroupingExpressionString("all(group(foo) each(output(count())))");
+ Query clone = query.clone();
+ assertEquals(clone.getSelect().getGroupingExpressionString(), query.getSelect().getGroupingExpressionString());
}
//------------------------------------------------------------------- Assert methods
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
index 08f22ac778e..d7ad96ec5e7 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
@@ -29,7 +29,6 @@ import com.yahoo.slime.Cursor;
import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
import com.yahoo.slime.SlimeUtils;
-import com.yahoo.vespa.athenz.api.AthenzPrincipal;
import com.yahoo.vespa.hosted.controller.Application;
import com.yahoo.vespa.hosted.controller.Controller;
import com.yahoo.vespa.hosted.controller.Instance;
@@ -106,7 +105,6 @@ import java.time.Duration;
import java.time.Instant;
import java.time.YearMonth;
import java.time.format.DateTimeParseException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Comparator;
@@ -1043,12 +1041,8 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
// Add zone endpoints
var endpointArray = response.setArray("endpoints");
- var serviceUrls = new ArrayList<URI>();
for (var endpoint : controller.routing().endpointsOf(deploymentId)) {
toSlime(endpoint, endpoint.name(), endpointArray.addObject());
- if (endpoint.routingMethod() == RoutingMethod.shared) {
- serviceUrls.add(endpoint.url());
- }
}
// Add global endpoints
var globalEndpoints = controller.routing().endpointsOf(application, deploymentId.applicationId().instance())
@@ -1058,9 +1052,6 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
// TODO(mpolden): Pass cluster name. Cluster that a global endpoint points to is not available at this level.
toSlime(endpoint, "", endpointArray.addObject());
}
- // TODO(mpolden): Remove this once all clients stop reading it
- Cursor serviceUrlArray = response.setArray("serviceUrls");
- serviceUrls.forEach(url -> serviceUrlArray.addString(url.toString()));
response.setString("nodes", withPath("/zone/v2/" + deploymentId.zoneId().environment() + "/" + deploymentId.zoneId().region() + "/nodes/v2/node/?&recursive=true&application=" + deploymentId.applicationId().tenant() + "." + deploymentId.applicationId().application() + "." + deploymentId.applicationId().instance(), request.getUri()).toString());
response.setString("yamasUrl", monitoringSystemUri(deploymentId).toString());
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
index 497a0ddf5a0..351f530f623 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
@@ -788,6 +788,7 @@ public class ControllerTest {
@Test
public void testDeployWithRoutingGeneratorEndpoints() {
+ ((InMemoryFlagSource) tester.controller().flagSource()).withBooleanFlag(Flags.DISABLE_ROUTING_GENERATOR.id(), false);
var context = tester.newDeploymentContext();
var applicationPackage = new ApplicationPackageBuilder()
.upgradePolicy("default")
@@ -804,6 +805,7 @@ public class ControllerTest {
List.of(new RoutingEndpoint("http://legacy-endpoint", "hostname",
false, "upstreamName")));
}
+
// Defer load balancer provisioning in all environments so that routing controller uses routing generator
context.deferLoadBalancerProvisioningIn(zones.stream().map(ZoneId::environment).collect(Collectors.toSet()))
.submit(applicationPackage)
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json
index cd47859c7cc..63e6e4b3937 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json
@@ -20,9 +20,6 @@
"routingMethod": "shared"
}
],
- "serviceUrls": [
- "https://instance1--application1--tenant1.us-west-1.vespa.oath.cloud:4443/"
- ],
"nodes": "http://localhost:8080/zone/v2/prod/us-west-1/nodes/v2/node/%3F&recursive=true&application=tenant1.application1.instance1",
"yamasUrl": "http://monitoring-system.test/?environment=prod&region=us-west-1&application=tenant1.application1.instance1",
"version": "(ignore)",
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json
index 726df575028..928525a20d1 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json
@@ -20,9 +20,6 @@
"routingMethod": "shared"
}
],
- "serviceUrls": [
- "https://instance1--application1--tenant1.us-central-1.vespa.oath.cloud:4443/"
- ],
"nodes": "http://localhost:8080/zone/v2/prod/us-central-1/nodes/v2/node/%3F&recursive=true&application=tenant1.application1.instance1",
"yamasUrl": "http://monitoring-system.test/?environment=prod&region=us-central-1&application=tenant1.application1.instance1",
"version": "(ignore)",
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json
index 7c231beb5ed..83fa1983957 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json
@@ -13,9 +13,6 @@
"routingMethod": "shared"
}
],
- "serviceUrls": [
- "https://instance1--application1--tenant1.us-east-1.dev.vespa.oath.cloud:4443/"
- ],
"nodes": "http://localhost:8080/zone/v2/dev/us-east-1/nodes/v2/node/%3F&recursive=true&application=tenant1.application1.instance1",
"yamasUrl": "http://monitoring-system.test/?environment=dev&region=us-east-1&application=tenant1.application1.instance1",
"version": "(ignore)",
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json
index 41f3908f12f..4ffe809297d 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json
@@ -23,9 +23,6 @@
"routingMethod": "shared"
}
],
- "serviceUrls": [
- "https://instance1--application1--tenant1.us-central-1.vespa.oath.cloud:4443/"
- ],
"nodes": "http://localhost:8080/zone/v2/prod/us-central-1/nodes/v2/node/%3F&recursive=true&application=tenant1.application1.instance1",
"yamasUrl": "http://monitoring-system.test/?environment=prod&region=us-central-1&application=tenant1.application1.instance1",
"version": "(ignore)",
diff --git a/document/src/tests/documentselectparsertest.cpp b/document/src/tests/documentselectparsertest.cpp
index 110153954af..9ac402f56ef 100644
--- a/document/src/tests/documentselectparsertest.cpp
+++ b/document/src/tests/documentselectparsertest.cpp
@@ -18,6 +18,7 @@
#include <vespa/document/select/compare.h>
#include <vespa/document/select/operator.h>
#include <vespa/document/select/parse_utils.h>
+#include <vespa/document/select/parser_limits.h>
#include <vespa/vespalib/util/exceptions.h>
#include <limits>
#include <gtest/gtest.h>
@@ -33,6 +34,8 @@ protected:
std::vector<Document::SP > _doc;
std::vector<DocumentUpdate::SP > _update;
+ ~DocumentSelectParserTest();
+
Document::SP createDoc(
const std::string& doctype, const std::string& id, uint32_t hint,
double hfloat, const std::string& hstr, const std::string& cstr,
@@ -64,6 +67,7 @@ protected:
void testDocumentUpdates4();
};
+DocumentSelectParserTest::~DocumentSelectParserTest() = default;
namespace {
std::shared_ptr<const DocumentTypeRepo> _repo;
@@ -1247,17 +1251,17 @@ TEST_F(DocumentSelectParserTest, testThatSimpleFieldValuesHaveCorrectFieldName)
TEST_F(DocumentSelectParserTest, testThatComplexFieldValuesHaveCorrectFieldNames)
{
- EXPECT_EQ(
- vespalib::string("headerval"),
- parseFieldValue("testdoctype1.headerval{test}")->getRealFieldName());
+ EXPECT_EQ(vespalib::string("headerval"),
+ parseFieldValue("testdoctype1.headerval{test}")->getRealFieldName());
- EXPECT_EQ(
- vespalib::string("headerval"),
- parseFieldValue("testdoctype1.headerval[42]")->getRealFieldName());
+ EXPECT_EQ(vespalib::string("headerval"),
+ parseFieldValue("testdoctype1.headerval[42]")->getRealFieldName());
- EXPECT_EQ(
- vespalib::string("headerval"),
- parseFieldValue("testdoctype1.headerval.meow.meow{test}")->getRealFieldName());
+ EXPECT_EQ(vespalib::string("headerval"),
+ parseFieldValue("testdoctype1.headerval.meow.meow{test}")->getRealFieldName());
+
+ EXPECT_EQ(vespalib::string("headerval"),
+ parseFieldValue("testdoctype1.headerval .meow.meow{test}")->getRealFieldName());
}
namespace {
@@ -1603,4 +1607,64 @@ TEST_F(DocumentSelectParserTest, redundant_glob_wildcards_are_collapsed_into_min
EXPECT_EQ(GlobOperator::convertToRegex("*?*?*?*"), "..*..*."); // Don't try this at home, kids!
}
+TEST_F(DocumentSelectParserTest, recursion_depth_is_bounded_for_field_exprs) {
+ createDocs();
+ std::string expr = "testdoctype1";
+ for (size_t i = 0; i < 50000; ++i) {
+ expr += ".foo";
+ }
+ expr += ".hash() != 0";
+ verifyFailedParse(expr, "ParsingFailedException: expression is too deeply nested (max 1024 levels)");
+}
+
+TEST_F(DocumentSelectParserTest, recursion_depth_is_bounded_for_arithmetic_exprs) {
+ createDocs();
+ std::string expr = "1";
+ for (size_t i = 0; i < 50000; ++i) {
+ expr += "+1";
+ }
+ expr += " != 0";
+ verifyFailedParse(expr, "ParsingFailedException: expression is too deeply nested (max 1024 levels)");
+}
+
+TEST_F(DocumentSelectParserTest, recursion_depth_is_bounded_for_binary_logical_exprs) {
+ createDocs();
+ // Also throw in some comparisons to ensure they carry over the max depth.
+ std::string expr = "1 == 2";
+ std::string cmp_subexpr = "3 != 4";
+ for (size_t i = 0; i < 10000; ++i) {
+ expr += (i % 2 == 0 ? " and " : " or ") + cmp_subexpr;
+ }
+ verifyFailedParse(expr, "ParsingFailedException: expression is too deeply nested (max 1024 levels)");
+}
+
+TEST_F(DocumentSelectParserTest, recursion_depth_is_bounded_for_unary_logical_exprs) {
+ createDocs();
+ std::string expr;
+ for (size_t i = 0; i < 10000; ++i) {
+ expr += "not ";
+ }
+ expr += "true";
+ verifyFailedParse(expr, "ParsingFailedException: expression is too deeply nested (max 1024 levels)");
+}
+
+TEST_F(DocumentSelectParserTest, selection_has_upper_limit_on_input_size) {
+ createDocs();
+ std::string expr = ("testdoctype1.a_biii"
+ + std::string(select::ParserLimits::MaxSelectionByteSize, 'i')
+ + "iiig_identifier");
+ verifyFailedParse(expr, "ParsingFailedException: expression is too large to be "
+ "parsed (max 1048576 bytes, got 1048610)");
+}
+
+TEST_F(DocumentSelectParserTest, lexing_does_not_have_superlinear_time_complexity) {
+ createDocs();
+ std::string expr = ("testdoctype1.hstringval == 'a_biii"
+ + std::string(select::ParserLimits::MaxSelectionByteSize - 100, 'i')
+ + "iiig string'");
+ // If the lexer is not compiled with the appropriate options, this will take a long time.
+ // A really, really long time.
+ PARSE(expr, *_doc[0], False);
+}
+
} // document
diff --git a/document/src/vespa/document/select/CMakeLists.txt b/document/src/vespa/document/select/CMakeLists.txt
index 81e5d86675c..f210e8abdd7 100644
--- a/document/src/vespa/document/select/CMakeLists.txt
+++ b/document/src/vespa/document/select/CMakeLists.txt
@@ -36,6 +36,7 @@ vespa_add_library(document_select OBJECT
parser.cpp
parse_utils.cpp
parsing_failed_exception.cpp
+ parser_limits.cpp
${BISON_DocSelParser_OUTPUTS}
${FLEX_DocSelLexer_OUTPUTS}
AFTER
diff --git a/document/src/vespa/document/select/branch.cpp b/document/src/vespa/document/select/branch.cpp
index b3d5f97ccab..9104e2c5544 100644
--- a/document/src/vespa/document/select/branch.cpp
+++ b/document/src/vespa/document/select/branch.cpp
@@ -8,7 +8,7 @@
namespace document::select {
And::And(std::unique_ptr<Node> left, std::unique_ptr<Node> right, const char* name)
- : Branch(name ? name : "and"),
+ : Branch(name ? name : "and", std::max(left->max_depth(), right->max_depth()) + 1),
_left(std::move(left)),
_right(std::move(right))
{
@@ -54,7 +54,7 @@ And::trace(const Context& context, std::ostream& out) const
}
Or::Or(std::unique_ptr<Node> left, std::unique_ptr<Node> right, const char* name)
- : Branch(name ? name : "or"),
+ : Branch(name ? name : "or", std::max(left->max_depth(), right->max_depth()) + 1),
_left(std::move(left)),
_right(std::move(right))
{
@@ -100,7 +100,7 @@ Or::trace(const Context& context, std::ostream& out) const
}
Not::Not(std::unique_ptr<Node> child, const char* name)
- : Branch(name ? name : "not"),
+ : Branch(name ? name : "not", child->max_depth() + 1),
_child(std::move(child))
{
assert(_child.get());
diff --git a/document/src/vespa/document/select/branch.h b/document/src/vespa/document/select/branch.h
index 8637b41de89..77ed74030b5 100644
--- a/document/src/vespa/document/select/branch.h
+++ b/document/src/vespa/document/select/branch.h
@@ -19,7 +19,8 @@ namespace document::select {
class Branch : public Node
{
public:
- Branch(vespalib::stringref name) : Node(name) {}
+ explicit Branch(vespalib::stringref name) : Node(name) {}
+ Branch(vespalib::stringref name, uint32_t max_depth) : Node(name, max_depth) {}
bool isLeafNode() const override { return false; }
};
@@ -30,7 +31,7 @@ class And : public Branch
std::unique_ptr<Node> _right;
public:
And(std::unique_ptr<Node> left, std::unique_ptr<Node> right,
- const char* name = 0);
+ const char* name = nullptr);
ResultList contains(const Context& context) const override {
return (_left->contains(context) && _right->contains(context));
@@ -53,7 +54,7 @@ class Or : public Branch
std::unique_ptr<Node> _right;
public:
Or(std::unique_ptr<Node> left, std::unique_ptr<Node> right,
- const char* name = 0);
+ const char* name = nullptr);
ResultList contains(const Context& context) const override {
return (_left->contains(context) || _right->contains(context));
@@ -74,7 +75,7 @@ class Not : public Branch
{
std::unique_ptr<Node> _child;
public:
- Not(std::unique_ptr<Node> child, const char* name = 0);
+ Not(std::unique_ptr<Node> child, const char* name = nullptr);
ResultList contains(const Context& context) const override { return !_child->contains(context); }
ResultList trace(const Context&, std::ostream& trace) const override;
diff --git a/document/src/vespa/document/select/compare.cpp b/document/src/vespa/document/select/compare.cpp
index 7db40929a64..caef1bdd250 100644
--- a/document/src/vespa/document/select/compare.cpp
+++ b/document/src/vespa/document/select/compare.cpp
@@ -15,7 +15,7 @@ Compare::Compare(std::unique_ptr<ValueNode> left,
const Operator& op,
std::unique_ptr<ValueNode> right,
const BucketIdFactory& bucketIdFactory)
- : Node("Compare"),
+ : Node("Compare", std::max(left->max_depth(), right->max_depth()) + 1),
_left(std::move(left)),
_right(std::move(right)),
_operator(op),
diff --git a/document/src/vespa/document/select/grammar/lexer.ll b/document/src/vespa/document/select/grammar/lexer.ll
index bd011c8ebf6..1222aac02a2 100644
--- a/document/src/vespa/document/select/grammar/lexer.ll
+++ b/document/src/vespa/document/select/grammar/lexer.ll
@@ -7,6 +7,13 @@
%option noyywrap nounput
%option yyclass="document::select::DocSelScanner"
+ /* Flex lexer must be compiled with batch mode (as opposed to interactive mode)
+ * or parsing of large tokens appears to trigger superlinear time complexity.
+ * Also use full, non-compressed lookup tables for maximum performance.
+ */
+%option batch
+%option full
+
/* Used to track source locations, see https://github.com/bingmann/flex-bison-cpp-example/blob/master/src/scanner.ll */
%{
#define YY_USER_ACTION yyloc->columns(yyleng);
diff --git a/document/src/vespa/document/select/node.h b/document/src/vespa/document/select/node.h
index 48a64ae63f5..9a3b687d81c 100644
--- a/document/src/vespa/document/select/node.h
+++ b/document/src/vespa/document/select/node.h
@@ -12,6 +12,7 @@
#include "resultlist.h"
#include "context.h"
+#include "parser_limits.h"
namespace document::select {
@@ -21,19 +22,33 @@ class Node : public Printable
{
protected:
vespalib::string _name;
+ uint32_t _max_depth;
bool _parentheses; // Set to true if parentheses was used around this part
// Set such that we can recreate original query in print.
public:
typedef std::unique_ptr<Node> UP;
typedef std::shared_ptr<Node> SP;
- Node(vespalib::stringref name) : _name(name), _parentheses(false) {}
- ~Node() override {}
+ Node(vespalib::stringref name, uint32_t max_depth)
+ : _name(name), _max_depth(max_depth), _parentheses(false)
+ {
+ throw_parse_error_if_max_depth_exceeded();
+ }
- void setParentheses() { _parentheses = true; }
+ explicit Node(vespalib::stringref name)
+ : _name(name), _max_depth(1), _parentheses(false)
+ {}
+ ~Node() override = default;
- void clearParentheses() { _parentheses = false; }
+ // Depth is explicitly tracked to limit recursion to a sane maximum when building and
+ // processing ASTs, as the Bison framework does not have anything useful for us there.
+ // The AST is built from the leaves up towards the root, so we can cheaply track depth
+ // of subtrees in O(1) time per node by computing a node's own depth based on immediate
+ // children at node construction time.
+ [[nodiscard]] uint32_t max_depth() const noexcept { return _max_depth; }
+ void setParentheses() { _parentheses = true; }
+ void clearParentheses() { _parentheses = false; }
bool hadParentheses() const { return _parentheses; }
virtual ResultList contains(const Context&) const = 0;
@@ -43,6 +58,12 @@ public:
virtual Node::UP clone() const = 0;
protected:
+ void throw_parse_error_if_max_depth_exceeded() const {
+ if (_max_depth > ParserLimits::MaxRecursionDepth) {
+ throw_max_depth_exceeded_exception();
+ }
+ }
+
Node::UP wrapParens(Node* node) const {
Node::UP ret(node);
if (_parentheses) {
diff --git a/document/src/vespa/document/select/parser.cpp b/document/src/vespa/document/select/parser.cpp
index 9f015409011..fadb46e5aa3 100644
--- a/document/src/vespa/document/select/parser.cpp
+++ b/document/src/vespa/document/select/parser.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "parser.h"
+#include "parser_limits.h"
#include "scanner.h"
#include <vespa/document/base/exceptions.h>
#include <vespa/document/util/stringutil.h>
@@ -8,7 +9,20 @@
namespace document::select {
+namespace {
+
+void verify_expression_not_too_large(const std::string& expr) {
+ if (expr.size() > ParserLimits::MaxSelectionByteSize) {
+ throw ParsingFailedException(vespalib::make_string(
+ "expression is too large to be parsed (max %zu bytes, got %zu)",
+ ParserLimits::MaxSelectionByteSize, expr.size()));
+ }
+}
+
+}
+
std::unique_ptr<Node> Parser::parse(const std::string& str) const {
+ verify_expression_not_too_large(str);
try {
std::istringstream ss(str);
DocSelScanner scanner(&ss);
diff --git a/document/src/vespa/document/select/parser_limits.cpp b/document/src/vespa/document/select/parser_limits.cpp
new file mode 100644
index 00000000000..13e494b376f
--- /dev/null
+++ b/document/src/vespa/document/select/parser_limits.cpp
@@ -0,0 +1,13 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "parser_limits.h"
+#include "parsing_failed_exception.h"
+#include <vespa/vespalib/util/stringfmt.h>
+
+namespace document::select {
+
+void throw_max_depth_exceeded_exception() {
+ throw ParsingFailedException(vespalib::make_string(
+ "expression is too deeply nested (max %u levels)", ParserLimits::MaxRecursionDepth));
+}
+
+}
diff --git a/document/src/vespa/document/select/parser_limits.h b/document/src/vespa/document/select/parser_limits.h
new file mode 100644
index 00000000000..24c0a165611
--- /dev/null
+++ b/document/src/vespa/document/select/parser_limits.h
@@ -0,0 +1,19 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstdint>
+#include <cstddef>
+
+namespace document::select {
+
+// Any resource constraints set for parsing document selection expressions
+struct ParserLimits {
+ // Max depth allowed for nodes in the AST tree.
+ constexpr static uint32_t MaxRecursionDepth = 1024;
+ // Max size of entire input document selection string, in bytes.
+ constexpr static size_t MaxSelectionByteSize = 1024*1024;
+};
+
+void __attribute__((noinline)) throw_max_depth_exceeded_exception();
+
+}
diff --git a/document/src/vespa/document/select/valuenode.h b/document/src/vespa/document/select/valuenode.h
index 04ed8178b40..8dd535a736a 100644
--- a/document/src/vespa/document/select/valuenode.h
+++ b/document/src/vespa/document/select/valuenode.h
@@ -5,12 +5,13 @@
*
* @brief Node representing a value in the tree
*
- * @author H�kon Humberset
+ * @author HÃ¥kon Humberset
*/
#pragma once
#include "value.h"
+#include "parser_limits.h"
namespace document::select {
@@ -22,8 +23,19 @@ class ValueNode : public Printable
public:
using UP = std::unique_ptr<ValueNode>;
- ValueNode() : _parentheses(false) {}
- virtual ~ValueNode() {}
+ explicit ValueNode(uint32_t max_depth)
+ : _max_depth(max_depth), _parentheses(false)
+ {
+ throw_parse_error_if_max_depth_exceeded();
+ }
+ ValueNode() : _max_depth(1), _parentheses(false) {}
+ ~ValueNode() override = default;
+
+ // See comments for same function in node.h for a description on how and why
+ // we track this. Since Node and ValueNode live in completely separate type
+ // hierarchies, this particular bit of code duplication is unfortunate but
+ // incurs the least cognitive overhead.
+ [[nodiscard]] uint32_t max_depth() const noexcept { return _max_depth; }
void setParentheses() { _parentheses = true; }
void clearParentheses() { _parentheses = false; }
@@ -34,9 +46,17 @@ public:
virtual ValueNode::UP clone() const = 0;
virtual std::unique_ptr<Value> traceValue(const Context &context, std::ostream &out) const;
private:
+ uint32_t _max_depth;
bool _parentheses; // Set to true if parentheses was used around this part
// Set such that we can recreate original query in print.
+
protected:
+ void throw_parse_error_if_max_depth_exceeded() const {
+ if (_max_depth > ParserLimits::MaxRecursionDepth) {
+ throw_max_depth_exceeded_exception();
+ }
+ }
+
ValueNode::UP wrapParens(ValueNode* node) const {
ValueNode::UP ret(node);
if (_parentheses) {
diff --git a/document/src/vespa/document/select/valuenodes.cpp b/document/src/vespa/document/select/valuenodes.cpp
index 95cf2f4e7e5..026623cf83c 100644
--- a/document/src/vespa/document/select/valuenodes.cpp
+++ b/document/src/vespa/document/select/valuenodes.cpp
@@ -21,10 +21,6 @@ LOG_SETUP(".document.select.valuenode");
namespace document::select {
namespace {
- static const std::regex FIELD_NAME_REGEX("^([_A-Za-z][_A-Za-z0-9]*).*");
-}
-
-namespace {
bool documentTypeEqualsName(const DocumentType& type, vespalib::stringref name)
{
if (type.getName() == name) return true;
@@ -40,7 +36,7 @@ namespace {
InvalidValueNode::InvalidValueNode(vespalib::stringref name)
: _name(name)
-{ }
+{}
void
@@ -194,15 +190,33 @@ FieldValueNode::FieldValueNode(const vespalib::string& doctype,
FieldValueNode::~FieldValueNode() = default;
-vespalib::string
-FieldValueNode::extractFieldName(const std::string & fieldExpression) {
- std::smatch match;
+namespace {
- if (std::regex_match(fieldExpression, match, FIELD_NAME_REGEX) && match[1].matched) {
- return vespalib::string(match[1].first, match[1].second);
+size_t first_ident_length_or_npos(const vespalib::string& expr) {
+ for (size_t i = 0; i < expr.size(); ++i) {
+ switch (expr[i]) {
+ case '.':
+ case '{':
+ case '[':
+ case ' ':
+ case '\n':
+ case '\t':
+ return i;
+ default:
+ continue;
+ }
}
+ return vespalib::string::npos;
+}
- throw ParsingFailedException("Fatal: could not extract field name from field expression '" + fieldExpression + "'");
+}
+
+// TODO remove this pile of fun in favor of actually parsed AST nodes...!
+vespalib::string
+FieldValueNode::extractFieldName(const vespalib::string & fieldExpression) {
+ // When we get here the actual contents of the field expression shall already
+ // have been structurally and syntactically verified by the parser.
+ return fieldExpression.substr(0, first_ident_length_or_npos(fieldExpression));
}
namespace {
@@ -844,7 +858,8 @@ FunctionValueNode::print(std::ostream& out, bool verbose,
ArithmeticValueNode::ArithmeticValueNode(
std::unique_ptr<ValueNode> left, vespalib::stringref op,
std::unique_ptr<ValueNode> right)
- : _operator(),
+ : ValueNode(std::max(left->max_depth(), right->max_depth()) + 1),
+ _operator(),
_left(std::move(left)),
_right(std::move(right))
{
diff --git a/document/src/vespa/document/select/valuenodes.h b/document/src/vespa/document/select/valuenodes.h
index 8009542c364..a7d5fa15f37 100644
--- a/document/src/vespa/document/select/valuenodes.h
+++ b/document/src/vespa/document/select/valuenodes.h
@@ -160,7 +160,7 @@ public:
FieldValueNode & operator = (const FieldValueNode &) = delete;
FieldValueNode(FieldValueNode &&) = default;
FieldValueNode & operator = (FieldValueNode &&) = default;
- ~FieldValueNode();
+ ~FieldValueNode() override;
const vespalib::string& getDocType() const { return _doctype; }
const vespalib::string& getRealFieldName() const { return _fieldName; }
@@ -175,7 +175,7 @@ public:
return wrapParens(new FieldValueNode(_doctype, _fieldExpression));
}
- static vespalib::string extractFieldName(const std::string & fieldExpression);
+ static vespalib::string extractFieldName(const vespalib::string & fieldExpression);
private:
@@ -192,13 +192,15 @@ class FieldExprNode final : public ValueNode {
public:
explicit FieldExprNode(const vespalib::string& doctype) : _left_expr(), _right_expr(doctype) {}
FieldExprNode(std::unique_ptr<FieldExprNode> left_expr, vespalib::stringref right_expr)
- : _left_expr(std::move(left_expr)), _right_expr(right_expr)
+ : ValueNode(left_expr->max_depth() + 1),
+ _left_expr(std::move(left_expr)),
+ _right_expr(right_expr)
{}
FieldExprNode(const FieldExprNode &) = delete;
FieldExprNode & operator = (const FieldExprNode &) = delete;
FieldExprNode(FieldExprNode &&) = default;
FieldExprNode & operator = (FieldExprNode &&) = default;
- ~FieldExprNode();
+ ~FieldExprNode() override;
std::unique_ptr<FieldValueNode> convert_to_field_value() const;
std::unique_ptr<FunctionValueNode> convert_to_function_call() const;
diff --git a/fbench/src/fbench/fbench.cpp b/fbench/src/fbench/fbench.cpp
index c2996eebb5c..88d27a33bd7 100644
--- a/fbench/src/fbench/fbench.cpp
+++ b/fbench/src/fbench/fbench.cpp
@@ -200,6 +200,28 @@ FBench::StopClients()
printf("\nClients Joined.\n");
}
+namespace {
+
+const char *
+approx(double latency, const ClientStatus & status) {
+ return (latency > (status._timetable.size() / status._timetableResolution - 1))
+ ? "ms (approx)"
+ : "ms";
+}
+
+std::string
+fmtPercentile(double percentile) {
+ char buf[32];
+ if (percentile <= 99.0) {
+ snprintf(buf, sizeof(buf), "%2d ", int(percentile));
+ } else {
+ snprintf(buf, sizeof(buf), "%2.1f", percentile);
+ }
+ return buf;
+}
+
+}
+
void
FBench::PrintSummary()
{
@@ -227,13 +249,6 @@ FBench::PrintSummary()
actualRate = (status._realTime > 0) ?
realNumClients * 1000.0 * status._requestCnt / status._realTime : 0;
- double p25 = status.GetPercentile(25);
- double p50 = status.GetPercentile(50);
- double p75 = status.GetPercentile(75);
- double p90 = status.GetPercentile(90);
- double p95 = status.GetPercentile(95);
- double p99 = status.GetPercentile(99);
-
if (_keepAlive) {
printf("*** HTTP keep-alive statistics ***\n");
printf("connection reuse count -- %" PRIu64 "\n", status._reuseCnt);
@@ -250,24 +265,13 @@ FBench::PrintSummary()
printf("minimum response time: %8.2f ms\n", status._minTime);
printf("maximum response time: %8.2f ms\n", status._maxTime);
printf("average response time: %8.2f ms\n", status.GetAverage());
- if (p25 > status._timetable.size() / status._timetableResolution - 1)
- printf("25 percentile: %8.2f ms (approx)\n", p25);
- else printf("25 percentile: %8.2f ms\n", p25);
- if (p50 > status._timetable.size() / status._timetableResolution - 1)
- printf("50 percentile: %8.2f ms (approx)\n", p50);
- else printf("50 percentile: %8.2f ms\n", p50);
- if (p75 > status._timetable.size() / status._timetableResolution - 1)
- printf("75 percentile: %8.2f ms (approx)\n", p75);
- else printf("75 percentile: %8.2f ms\n", p75);
- if (p90 > status._timetable.size() / status._timetableResolution - 1)
- printf("90 percentile: %8.2f ms (approx)\n", p90);
- else printf("90 percentile: %8.2f ms\n", p90);
- if (p95 > status._timetable.size() / status._timetableResolution - 1)
- printf("95 percentile: %8.2f ms (approx)\n", p95);
- else printf("95 percentile: %8.2f ms\n", p95);
- if (p99 > status._timetable.size() / status._timetableResolution - 1)
- printf("99 percentile: %8.2f ms (approx)\n", p99);
- else printf("99 percentile: %8.2f ms\n", p99);
+
+ for (double percentile : {25.0, 50.0, 75.0, 90.0, 95.0, 98.0, 99.0, 99.5, 99.6, 99.7, 99.8, 99.9}) {
+ double latency = status.GetPercentile(percentile);
+ printf("%s percentile: %8.2f %s\n",
+ fmtPercentile(percentile).c_str(), latency, approx(latency, status));
+ }
+
printf("actual query rate: %8.2f Q/s\n", actualRate);
printf("utilization: %8.2f %%\n",
(maxRate > 0) ? 100 * (actualRate / maxRate) : 0);
diff --git a/fbench/src/util/clientstatus.cpp b/fbench/src/util/clientstatus.cpp
index 9e03068e13c..6ef188da201 100644
--- a/fbench/src/util/clientstatus.cpp
+++ b/fbench/src/util/clientstatus.cpp
@@ -1,6 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "clientstatus.h"
-#include <string.h>
+#include <cstring>
#include <cmath>
@@ -24,9 +24,7 @@ ClientStatus::ClientStatus()
{
}
-ClientStatus::~ClientStatus()
-{
-}
+ClientStatus::~ClientStatus() = default;
void
ClientStatus::SetError(const char *errorMsg)
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 24b3fcac3e3..5829ab37b77 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.ConnectionPool;
@@ -13,6 +12,7 @@ import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -57,21 +57,14 @@ public class FileDownloader {
}
}
- private Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
+ Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
Objects.requireNonNull(fileReference, "file reference cannot be null");
- log.log(LogLevel.DEBUG, () -> "Checking if file reference '" + fileReference.value() + "' exists in '" +
- downloadDirectory.getAbsolutePath() + "' ");
- Optional<File> file = getFileFromFileSystem(fileReference, downloadDirectory);
- if (file.isPresent()) {
- SettableFuture<Optional<File>> future = SettableFuture.create();
- future.set(file);
- return future;
- } else {
- log.log(LogLevel.DEBUG, () -> "File reference '" + fileReference.value() + "' not found in " +
- downloadDirectory.getAbsolutePath() + ", starting download");
- return download(fileReferenceDownload);
- }
+
+ Optional<File> file = getFileFromFileSystem(fileReference);
+ return (file.isPresent())
+ ? CompletableFuture.completedFuture(file)
+ : download(fileReferenceDownload);
}
double downloadStatus(FileReference fileReference) {
@@ -86,9 +79,10 @@ public class FileDownloader {
return downloadDirectory;
}
- private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) {
- File[] files = new File(directory, fileReference.value()).listFiles();
- if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
+ // Files are moved atomically, so if file reference exists and is accessible we can use it
+ private Optional<File> getFileFromFileSystem(FileReference fileReference) {
+ File[] files = new File(downloadDirectory, fileReference.value()).listFiles();
+ if (downloadDirectory.exists() && downloadDirectory.isDirectory() && files != null && files.length > 0) {
File file = files[0];
if (!file.exists()) {
throw new RuntimeException("File reference '" + fileReference.value() + "' does not exist");
@@ -105,33 +99,25 @@ public class FileDownloader {
private boolean alreadyDownloaded(FileReference fileReference) {
try {
- return (getFileFromFileSystem(fileReference, downloadDirectory).isPresent());
+ return (getFileFromFileSystem(fileReference).isPresent());
} catch (RuntimeException e) {
return false;
}
}
- public boolean downloadIfNeeded(FileReferenceDownload fileReferenceDownload) {
- if (!alreadyDownloaded(fileReferenceDownload.fileReference())) {
- download(fileReferenceDownload);
- return true;
- } else {
- log.log(LogLevel.DEBUG, () -> "Download not needed, " + fileReferenceDownload.fileReference() + " already downloaded" );
- return false;
- }
+ /** Start a download, don't wait for result */
+ public void downloadIfNeeded(FileReferenceDownload fileReferenceDownload) {
+ FileReference fileReference = fileReferenceDownload.fileReference();
+ if (alreadyDownloaded(fileReference)) return;
+
+ download(fileReferenceDownload);
}
+ /** Download, the future returned will be complete()d by receiving method in {@link FileReceiver} */
private synchronized Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
- Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReferenceDownload));
- if (inProgress != null) {
- log.log(LogLevel.DEBUG, () -> "Already downloading '" + fileReference.value() + "'");
- return inProgress;
- }
-
- Future<Optional<File>> future = queueForDownload(fileReferenceDownload);
- log.log(LogLevel.DEBUG, () -> "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
- return future;
+ FileReferenceDownload inProgress = fileReferenceDownloader.getDownloadInProgress(fileReference);
+ return (inProgress == null) ? queueForDownload(fileReferenceDownload) : inProgress.future();
}
private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) {
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
index 4a9fadf1a61..fe501484faf 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
@@ -2,16 +2,16 @@
package com.yahoo.vespa.filedistribution;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import java.io.File;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
public class FileReferenceDownload {
private final FileReference fileReference;
- private final SettableFuture<Optional<File>> future;
+ private final CompletableFuture<Optional<File>> future;
// If a config server wants to download from another config server (because it does not have the
// file itself) we set this flag to true to avoid an eternal loop
private final boolean downloadFromOtherSourceIfNotFound;
@@ -22,7 +22,7 @@ public class FileReferenceDownload {
public FileReferenceDownload(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound) {
this.fileReference = fileReference;
- this.future = SettableFuture.create();
+ this.future = new CompletableFuture<>();
this.downloadFromOtherSourceIfNotFound = downloadFromOtherSourceIfNotFound;
}
@@ -30,7 +30,7 @@ public class FileReferenceDownload {
return fileReference;
}
- SettableFuture<Optional<File>> future() {
+ CompletableFuture<Optional<File>> future() {
return future;
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 66b86866c3e..c4fe257c991 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -1,8 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.FileReference;
import com.yahoo.jrt.Int32Value;
@@ -18,6 +16,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -37,9 +36,12 @@ public class FileReferenceDownloader {
private final static Duration rpcTimeout = Duration.ofSeconds(10);
private final ExecutorService downloadExecutor =
- Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new DaemonThreadFactory("filereference downloader"));
+ Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()),
+ new DaemonThreadFactory("filereference downloader"));
private final ConnectionPool connectionPool;
+ /* Ongoing downloads */
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
+ /* Status for ongoing and finished downloads */
private final Map<FileReference, Double> downloadStatus = new HashMap<>(); // between 0 and 1
private final Duration downloadTimeout;
private final Duration sleepBetweenRetries;
@@ -70,7 +72,7 @@ public class FileReferenceDownloader {
}
if ( !downloadStarted) {
- fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
+ fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
synchronized (downloads) {
downloads.remove(fileReference);
}
@@ -93,7 +95,7 @@ public class FileReferenceDownloader {
if (download != null) {
downloadStatus.put(fileReference, 1.0);
downloads.remove(fileReference);
- download.future().set(Optional.of(file));
+ download.future().complete(Optional.of(file));
} else {
log.log(LogLevel.DEBUG, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts");
}
@@ -140,15 +142,10 @@ public class FileReferenceDownloader {
}
}
- ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
+ FileReferenceDownload getDownloadInProgress(FileReference fileReference) {
synchronized (downloads) {
- FileReferenceDownload download = downloads.get(fileReference);
- if (download != null) {
- download.future().addListener(runnable, downloadExecutor);
- return download.future();
- }
+ return downloads.get(fileReference);
}
- return null;
}
private void execute(Request request, Connection connection) {
@@ -186,7 +183,7 @@ public class FileReferenceDownloader {
Map<FileReference, Double> downloadStatus() {
synchronized (downloads) {
- return ImmutableMap.copyOf(downloadStatus);
+ return Map.copyOf(downloadStatus);
}
}
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index d1d12cb07b7..52d8507acea 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -27,6 +27,10 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import static com.yahoo.jrt.ErrorCode.CONNECTION;
import static org.junit.Assert.assertEquals;
@@ -35,6 +39,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class FileDownloaderTest {
+ private static final Duration sleepBetweenRetries = Duration.ofMillis(10);
private MockConnection connection;
private FileDownloader fileDownloader;
@@ -47,7 +52,7 @@ public class FileDownloaderTest {
downloadDir = Files.createTempDirectory("filedistribution").toFile();
tempDir = Files.createTempDirectory("download").toFile();
connection = new MockConnection();
- fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(2), Duration.ofMillis(100));
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -108,7 +113,7 @@ public class FileDownloaderTest {
// Receives fileReference, should return and make it available to caller
String filename = "abc.jar";
- receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content");
+ receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content");
Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
assertTrue(downloadedFile.isPresent());
@@ -142,7 +147,7 @@ public class FileDownloaderTest {
File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
byte[] tarredContent = IOUtils.readFileBytes(tarFile);
- receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent);
+ receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.compressed, tarredContent);
Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
assertTrue(downloadedFile.isPresent());
@@ -158,7 +163,7 @@ public class FileDownloaderTest {
@Test
public void getFileWhenConnectionError() throws IOException {
- fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), Duration.ofMillis(100));
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries);
File downloadDir = fileDownloader.downloadDirectory();
int timesToFail = 2;
@@ -175,7 +180,7 @@ public class FileDownloaderTest {
// Receives fileReference, should return and make it available to caller
String filename = "abc.jar";
- receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content");
+ receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content");
Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
assertTrue(downloadedFile.isPresent());
File downloadedFileFullPath = new File(fileReferenceFullPath, filename);
@@ -189,26 +194,68 @@ public class FileDownloaderTest {
}
@Test
+ public void getFileWhenDownloadInProgress() throws IOException, ExecutionException, InterruptedException {
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ String filename = "abc.jar";
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), sleepBetweenRetries);
+ File downloadDir = fileDownloader.downloadDirectory();
+
+ // Delay response so that we can make a second request while downloading the file from the first request
+ connection.setResponseHandler(new MockConnection.WaitResponseHandler(Duration.ofSeconds(1)));
+
+ FileReference fileReference = new FileReference("fileReference");
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference);
+ FileReferenceDownload fileReferenceDownload = new FileReferenceDownload(fileReference);
+
+ Future<Future<Optional<File>>> future1 = executor.submit(() -> fileDownloader.getFutureFile(fileReferenceDownload));
+ do {
+ Thread.sleep(10);
+ } while (! fileDownloader.fileReferenceDownloader().isDownloading(fileReference));
+ assertTrue(fileDownloader.fileReferenceDownloader().isDownloading(fileReference));
+
+ // Request file while download is in progress
+ Future<Future<Optional<File>>> future2 = executor.submit(() -> fileDownloader.getFutureFile(fileReferenceDownload));
+
+ // Receive file, will complete downloading and futures
+ receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content");
+
+ // Check that we got file correctly with first request
+ Optional<File> downloadedFile = future1.get().get();
+ assertTrue(downloadedFile.isPresent());
+ File downloadedFileFullPath = new File(fileReferenceFullPath, filename);
+ assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath());
+ assertEquals("some other content", IOUtils.readFile(downloadedFile.get()));
+
+ // Check that request done while downloading works
+ downloadedFile = future2.get().get();
+ assertTrue(downloadedFile.isPresent());
+ executor.shutdownNow();
+ }
+
+ @Test
public void setFilesToDownload() throws IOException {
Duration timeout = Duration.ofMillis(200);
- Duration sleepBetweenRetries = Duration.ofMillis(200);
MockConnection connectionPool = new MockConnection();
connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000))));
FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, timeout, sleepBetweenRetries);
FileReference foo = new FileReference("foo");
// Should download since we do not have the file on disk
- assertTrue(fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo)));
+ fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo));
+ assertTrue(fileDownloader.fileReferenceDownloader().isDownloading(foo));
+ assertFalse(fileDownloader.getFile(foo).isPresent());
// Receive files to simulate download
receiveFile();
// Should not download, since file has already been downloaded
- assertFalse(fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo)));
+ fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo));
+ // and file should be available
+ assertTrue(fileDownloader.getFile(foo).isPresent());
}
@Test
public void receiveFile() throws IOException {
FileReference foo = new FileReference("foo");
String filename = "foo.jar";
- receiveFile(foo, filename, FileReferenceData.Type.file, "content");
+ receiveFile(fileDownloader, foo, filename, FileReferenceData.Type.file, "content");
File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename);
assertEquals("content", IOUtils.readFile(downloadedFile));
}
@@ -229,16 +276,19 @@ public class FileDownloaderTest {
assertEquals(expectedDownloadStatus, downloadStatus, 0.0001);
}
- private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, String content) {
- receiveFile(fileReference, filename, type, Utf8.toBytes(content));
+ private void receiveFile(FileDownloader fileDownloader, FileReference fileReference, String filename,
+ FileReferenceData.Type type, String content) {
+ receiveFile(fileDownloader, fileReference, filename, type, Utf8.toBytes(content));
}
- private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) {
+ private void receiveFile(FileDownloader fileDownloader, FileReference fileReference, String filename,
+ FileReferenceData.Type type, byte[] content) {
XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
FileReceiver.Session session =
new FileReceiver.Session(downloadDir, tempDir, 1, fileReference, type, filename, content.length);
session.addPart(0, content);
- session.close(hasher.hash(ByteBuffer.wrap(content), 0));
+ File file = session.close(hasher.hash(ByteBuffer.wrap(content), 0));
+ fileDownloader.fileReferenceDownloader().completedDownloading(fileReference, file);
}
private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection {
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index e81bccee593..f392b597e3b 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -109,12 +109,6 @@ public class Flags {
"Takes effect on the next deployment of the application",
APPLICATION_ID);
- public static final UnboundStringFlag TLS_INSECURE_MIXED_MODE = defineStringFlag(
- "tls-insecure-mixed-mode", "tls_client_mixed_server",
- "TLS insecure mixed mode. Allowed values: ['plaintext_client_mixed_server', 'tls_client_mixed_server', 'tls_client_tls_server']",
- "Takes effect on restart of Docker container",
- NODE_TYPE, APPLICATION_ID, HOSTNAME);
-
public static final UnboundStringFlag TLS_INSECURE_AUTHORIZATION_MODE = defineStringFlag(
"tls-insecure-authorization-mode", "log_only",
"TLS insecure authorization mode. Allowed values: ['disable', 'log_only', 'enforce']",
@@ -200,12 +194,6 @@ public class Flags {
"Takes effect on next node agent tick (but does not clear existing failure reports)",
HOSTNAME);
- public static final UnboundBooleanFlag GENERATE_L4_ROUTING_CONFIG = defineFeatureFlag(
- "generate-l4-routing-config", false,
- "Whether routing nodes should generate L4 routing config",
- "Takes effect immediately",
- ZONE_ID, HOSTNAME);
-
public static final UnboundBooleanFlag USE_REFRESHED_ENDPOINT_CERTIFICATE = defineFeatureFlag(
"use-refreshed-endpoint-certificate", false,
"Whether an application should start using a newer certificate/key pair if available",
@@ -258,13 +246,13 @@ public class Flags {
APPLICATION_ID);
public static final UnboundBooleanFlag DISABLE_ROUTING_GENERATOR = defineFeatureFlag(
- "disable-routing-generator", false,
+ "disable-routing-generator", true,
"Whether the controller should stop asking the routing layer for endpoints",
"Takes effect immediately",
APPLICATION_ID);
public static final UnboundBooleanFlag DEDICATED_NODES_WHEN_UNSPECIFIED = defineFeatureFlag(
- "dedicated-nodes-when-unspecified", false,
+ "dedicated-nodes-when-unspecified", true,
"Whether config-server should allocate dedicated container nodes when <nodes/> is not specified in services.xml",
"Takes effect on redeploy",
APPLICATION_ID);
diff --git a/jdisc_http_service/abi-spec.json b/jdisc_http_service/abi-spec.json
index bb6285ab94f..c5a0a676a70 100644
--- a/jdisc_http_service/abi-spec.json
+++ b/jdisc_http_service/abi-spec.json
@@ -42,6 +42,9 @@
"public com.yahoo.jdisc.http.ConnectorConfig$Builder tlsClientAuthEnforcer(com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer$Builder)",
"public com.yahoo.jdisc.http.ConnectorConfig$Builder healthCheckProxy(com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy$Builder)",
"public com.yahoo.jdisc.http.ConnectorConfig$Builder proxyProtocol(com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder)",
+ "public com.yahoo.jdisc.http.ConnectorConfig$Builder secureRedirect(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder)",
+ "public com.yahoo.jdisc.http.ConnectorConfig$Builder maxRequestsPerConnection(int)",
+ "public com.yahoo.jdisc.http.ConnectorConfig$Builder maxConnectionLife(double)",
"public final boolean dispatchGetConfig(com.yahoo.config.ConfigInstance$Producer)",
"public final java.lang.String getDefMd5()",
"public final java.lang.String getDefName()",
@@ -53,7 +56,8 @@
"public com.yahoo.jdisc.http.ConnectorConfig$Ssl$Builder ssl",
"public com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer$Builder tlsClientAuthEnforcer",
"public com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy$Builder healthCheckProxy",
- "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder proxyProtocol"
+ "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder proxyProtocol",
+ "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder secureRedirect"
]
},
"com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy$Builder": {
@@ -133,6 +137,37 @@
],
"fields": []
},
+ "com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder": {
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "com.yahoo.config.ConfigBuilder"
+ ],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>()",
+ "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect)",
+ "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder enabled(boolean)",
+ "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder port(int)",
+ "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect build()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect": {
+ "superClass": "com.yahoo.config.InnerNode",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final"
+ ],
+ "methods": [
+ "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder)",
+ "public boolean enabled()",
+ "public int port()"
+ ],
+ "fields": []
+ },
"com.yahoo.jdisc.http.ConnectorConfig$Ssl$Builder": {
"superClass": "java.lang.Object",
"interfaces": [
@@ -323,7 +358,10 @@
"public com.yahoo.jdisc.http.ConnectorConfig$Ssl ssl()",
"public com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer tlsClientAuthEnforcer()",
"public com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy healthCheckProxy()",
- "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol proxyProtocol()"
+ "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol proxyProtocol()",
+ "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect secureRedirect()",
+ "public int maxRequestsPerConnection()",
+ "public double maxConnectionLife()"
],
"fields": [
"public static final java.lang.String CONFIG_DEF_MD5",
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java
index 71dcb7d0682..b9d686c1d6b 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java
@@ -10,6 +10,7 @@ import com.yahoo.jdisc.handler.BindingNotFoundException;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.OverloadException;
import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.http.ConnectorConfig;
import com.yahoo.jdisc.http.HttpHeaders;
import com.yahoo.jdisc.http.HttpRequest;
import org.eclipse.jetty.io.EofException;
@@ -22,6 +23,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -34,6 +36,7 @@ import java.util.logging.Logger;
import static com.yahoo.jdisc.http.HttpHeaders.Values.APPLICATION_X_WWW_FORM_URLENCODED;
import static com.yahoo.jdisc.http.core.HttpServletRequestUtils.getConnection;
import static com.yahoo.jdisc.http.server.jetty.Exceptions.throwUnchecked;
+import static com.yahoo.jdisc.http.server.jetty.JDiscHttpServlet.getConnector;
/**
* @author Simon Thoresen Hult
@@ -64,14 +67,13 @@ class HttpRequestDispatch {
this.jettyRequest = (Request) servletRequest;
this.metricReporter = new MetricReporter(jDiscContext.metric, metricContext, jettyRequest.getTimeStamp());
- honourMaxKeepAliveRequests();
this.servletResponseController = new ServletResponseController(
servletRequest,
servletResponse,
jDiscContext.janitor,
metricReporter,
jDiscContext.developerMode());
-
+ markConnectionAsNonPersistentIfThresholdReached(servletRequest);
this.async = servletRequest.startAsync();
async.setTimeout(0);
metricReporter.uriLength(jettyRequest.getOriginalURI().length());
@@ -102,15 +104,6 @@ class HttpRequestDispatch {
}
}
- private void honourMaxKeepAliveRequests() {
- if (jDiscContext.serverConfig.maxKeepAliveRequests() > 0) {
- HttpConnection connection = getConnection(jettyRequest);
- if (connection.getMessagesIn() >= jDiscContext.serverConfig.maxKeepAliveRequests()) {
- connection.getGenerator().setPersistent(false);
- }
- }
- }
-
private BiConsumer<Void, Throwable> completeRequestCallback;
{
AtomicBoolean completeRequestCalled = new AtomicBoolean(false);
@@ -151,6 +144,25 @@ class HttpRequestDispatch {
};
}
+ private static void markConnectionAsNonPersistentIfThresholdReached(HttpServletRequest request) {
+ ConnectorConfig connectorConfig = getConnector(request).connectorConfig();
+ int maxRequestsPerConnection = connectorConfig.maxRequestsPerConnection();
+ if (maxRequestsPerConnection > 0) {
+ HttpConnection connection = getConnection(request);
+ if (connection.getMessagesIn() >= maxRequestsPerConnection) {
+ connection.getGenerator().setPersistent(false);
+ }
+ }
+ double maxConnectionLifeInSeconds = connectorConfig.maxConnectionLife();
+ if (maxConnectionLifeInSeconds > 0) {
+ HttpConnection connection = getConnection(request);
+ Instant expireAt = Instant.ofEpochMilli((long)(connection.getCreatedTimeStamp() + maxConnectionLifeInSeconds * 1000));
+ if (Instant.now().isAfter(expireAt)) {
+ connection.getGenerator().setPersistent(false);
+ }
+ }
+ }
+
@SafeVarargs
@SuppressWarnings("varargs")
private static boolean isErrorOfType(Throwable throwable, Class<? extends Throwable>... handledTypes) {
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java
index cf66af31a79..5cbe7320f0e 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java
@@ -100,6 +100,8 @@ class JDiscHttpServlet extends HttpServlet {
}
}
+
+
static JDiscServerConnector getConnector(HttpServletRequest request) {
return (JDiscServerConnector)getConnection(request).getConnector();
}
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java
index 71284e09669..c5f42ff9cc5 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java
@@ -34,9 +34,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.log.JavaUtilLog;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
import javax.management.remote.JMXServiceURL;
import javax.servlet.DispatcherType;
@@ -44,10 +41,8 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.BindException;
import java.net.MalformedURLException;
-import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -246,10 +241,13 @@ public class JettyHttpServer extends AbstractServerProvider {
servletContextHandler.addServlet(jdiscServlet, "/*");
+ List<ConnectorConfig> connectorConfigs = connectors.stream().map(JDiscServerConnector::connectorConfig).collect(toList());
+ var secureRedirectHandler = new SecuredRedirectHandler(connectorConfigs);
+ secureRedirectHandler.setHandler(servletContextHandler);
+
var proxyHandler = new HealthCheckProxyHandler(connectors);
- proxyHandler.setHandler(servletContextHandler);
+ proxyHandler.setHandler(secureRedirectHandler);
- List<ConnectorConfig> connectorConfigs = connectors.stream().map(JDiscServerConnector::connectorConfig).collect(toList());
var authEnforcer = new TlsClientAuthenticationEnforcer(connectorConfigs);
authEnforcer.setHandler(proxyHandler);
@@ -282,25 +280,6 @@ public class JettyHttpServer extends AbstractServerProvider {
return ports.stream().map(Object::toString).collect(Collectors.joining(":"));
}
- private ServerSocketChannel getChannelFromServiceLayer(int listenPort, BundleContext bundleContext) {
- log.log(Level.FINE, "Retrieving channel for port " + listenPort + " from " + bundleContext.getClass().getName());
- Collection<ServiceReference<ServerSocketChannel>> refs;
- final String filter = "(port=" + listenPort + ")";
- try {
- refs = bundleContext.getServiceReferences(ServerSocketChannel.class, filter);
- } catch (InvalidSyntaxException e) {
- throw new IllegalStateException("OSGi framework rejected filter " + filter, e);
- }
- if (refs.isEmpty()) {
- return null;
- }
- if (refs.size() != 1) {
- throw new IllegalStateException("Got more than one service reference for " + ServerSocketChannel.class + " port " + listenPort + ".");
- }
- ServiceReference<ServerSocketChannel> ref = refs.iterator().next();
- return bundleContext.getService(ref);
- }
-
private static ExecutorService newJanitor(ThreadFactory factory) {
int threadPoolSize = Runtime.getRuntime().availableProcessors();
log.info("Creating janitor executor with " + threadPoolSize + " threads");
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java
new file mode 100644
index 00000000000..32c0628186a
--- /dev/null
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java
@@ -0,0 +1,52 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jdisc.http.server.jetty;
+
+import com.yahoo.jdisc.http.ConnectorConfig;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.HandlerWrapper;
+import org.eclipse.jetty.util.URIUtil;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A secure redirect handler inspired by {@link org.eclipse.jetty.server.handler.SecuredRedirectHandler}.
+ *
+ * @author bjorncs
+ */
+class SecuredRedirectHandler extends HandlerWrapper {
+
+ private final Map<Integer, Integer> redirectMap;
+
+ SecuredRedirectHandler(List<ConnectorConfig> connectorConfigs) {
+ this.redirectMap = createRedirectMap(connectorConfigs);
+ }
+
+ @Override
+ public void handle(String target, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException {
+ int localPort = servletRequest.getLocalPort();
+ if (!redirectMap.containsKey(localPort)) {
+ _handler.handle(target, request, servletRequest, servletResponse);
+ return;
+ }
+ servletResponse.setContentLength(0);
+ servletResponse.sendRedirect(
+ URIUtil.newURI("https", request.getServerName(), redirectMap.get(localPort), request.getRequestURI(), request.getQueryString()));
+ request.setHandled(true);
+ }
+
+ private static Map<Integer, Integer> createRedirectMap(List<ConnectorConfig> connectorConfigs) {
+ var redirectMap = new HashMap<Integer, Integer>();
+ for (ConnectorConfig connectorConfig : connectorConfigs) {
+ if (connectorConfig.secureRedirect().enabled()) {
+ redirectMap.put(connectorConfig.listenPort(), connectorConfig.secureRedirect().port());
+ }
+ }
+ return redirectMap;
+ }
+}
diff --git a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def
index 8027525521c..fa7ed6657d9 100644
--- a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def
+++ b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def
@@ -106,3 +106,15 @@ proxyProtocol.enabled bool default=false
# Allow https in parallel with proxy protocol
proxyProtocol.mixedMode bool default=false
+
+# Redirect all requests to https port
+secureRedirect.enabled bool default=false
+
+# Target port for redirect
+secureRedirect.port int default=443
+
+# Maximum number of request per connection before server marks connections as non-persistent. Set to '0' to disable.
+maxRequestsPerConnection int default=0
+
+# Maximum number of seconds a connection can live before it's marked as non-persistent. Set to '0' to disable.
+maxConnectionLife double default=0.0
diff --git a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def
index 0836a080e1f..33f82963243 100644
--- a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def
+++ b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def
@@ -7,13 +7,16 @@ developerMode bool default=false
# The gzip compression level to use, if compression is enabled in a request.
responseCompressionLevel int default=6
-# Whether to enable HTTP keep-alive for requests that support this.
+# DEPRECATED - Ignored, no longer in use.
httpKeepAliveEnabled bool default=true
+# TODO Vespa 8 Remove httpKeepAliveEnabled
# Maximum number of request per http connection before server will hangup.
# Naming taken from apache http server.
# 0 means never hangup.
+# DEPRECATED - Ignored, no longer in use. Use similar parameter in connector config instead.
maxKeepAliveRequests int default=0
+# TODO Vespa 8 Remove maxKeepAliveRequests
# Whether the request body of POSTed forms should be removed (form parameters are available as request parameters).
removeRawPostBodyForWwwUrlEncodedPost bool default=false
diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
index 6ace9699b42..f2f3fb0ef11 100644
--- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
+++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
@@ -484,8 +484,8 @@ public class HttpServerTest {
public void requireThatConnectionIsClosedAfterXRequests() throws Exception {
final int MAX_KEEPALIVE_REQUESTS = 100;
final TestDriver driver = TestDrivers.newConfiguredInstance(new EchoRequestHandler(),
- new ServerConfig.Builder().maxKeepAliveRequests(MAX_KEEPALIVE_REQUESTS),
- new ConnectorConfig.Builder());
+ new ServerConfig.Builder(),
+ new ConnectorConfig.Builder().maxRequestsPerConnection(MAX_KEEPALIVE_REQUESTS));
for (int i = 0; i < MAX_KEEPALIVE_REQUESTS - 1; i++) {
driver.client().get("/status.html")
.expectStatusCode(is(OK))
diff --git a/messagebus/src/tests/CMakeLists.txt b/messagebus/src/tests/CMakeLists.txt
index cb2a403f55d..e05f732d8b4 100644
--- a/messagebus/src/tests/CMakeLists.txt
+++ b/messagebus/src/tests/CMakeLists.txt
@@ -9,7 +9,6 @@ add_subdirectory(context)
add_subdirectory(emptyreply)
add_subdirectory(error)
add_subdirectory(identity)
-add_subdirectory(loadbalance)
add_subdirectory(messagebus)
add_subdirectory(messageordering)
add_subdirectory(messenger)
diff --git a/messagebus/src/tests/error/error.cpp b/messagebus/src/tests/error/error.cpp
index 1d8a489a5ed..244efe0bf99 100644
--- a/messagebus/src/tests/error/error.cpp
+++ b/messagebus/src/tests/error/error.cpp
@@ -18,8 +18,6 @@
using namespace mbus;
-TEST_SETUP(Test);
-
RoutingSpec getRouting() {
return RoutingSpec()
.addTable(RoutingTableSpec("Simple")
@@ -28,10 +26,7 @@ RoutingSpec getRouting() {
.addRoute(RouteSpec("test").addHop("pxy").addHop("dst")));
}
-int
-Test::Main()
-{
- TEST_INIT("error_test");
+TEST("error_test") {
Slobrok slobrok;
TestServer srcNet(Identity("test/src"), getRouting(), slobrok);
@@ -51,30 +46,31 @@ Test::Main()
ASSERT_TRUE(pxyNet.waitSlobrok("test/dst/session"));
for (int i = 0; i < 5; i++) {
- ASSERT_TRUE(ss->send(SimpleMessage::UP(new SimpleMessage("test message")), "test").isAccepted());
+ ASSERT_TRUE(ss->send(std::make_unique<SimpleMessage>("test message"), "test").isAccepted());
Message::UP msg = pxy.getMessage();
- ASSERT_TRUE(msg.get() != 0);
+ ASSERT_TRUE(msg);
is->forward(std::move(msg));
msg = dst.getMessage();
- ASSERT_TRUE(msg.get() != 0);
- Reply::UP reply(new EmptyReply());
+ ASSERT_TRUE(msg);
+ Reply::UP reply = std::make_unique<EmptyReply>();
msg->swapState(*reply);
reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "fatality"));
ds->reply(std::move(reply));
reply = pxy.getReply();
- ASSERT_TRUE(reply.get() != 0);
+ ASSERT_TRUE(reply);
ASSERT_EQUAL(reply->getNumErrors(), 1u);
EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session");
reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "fatality"));
is->forward(std::move(reply));
reply = src.getReply();
- ASSERT_TRUE(reply.get() != 0);
+ ASSERT_TRUE(reply);
ASSERT_EQUAL(reply->getNumErrors(), 2u);
EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session");
EXPECT_EQUAL(reply->getError(1).getService(), "test/pxy/session");
}
- TEST_DONE();
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/tests/loadbalance/.gitignore b/messagebus/src/tests/loadbalance/.gitignore
deleted file mode 100644
index d1cbb5977f1..00000000000
--- a/messagebus/src/tests/loadbalance/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-.depend
-Makefile
-loadbalance_test
-messagebus_loadbalance_test_app
diff --git a/messagebus/src/tests/loadbalance/CMakeLists.txt b/messagebus/src/tests/loadbalance/CMakeLists.txt
deleted file mode 100644
index e249a8284a6..00000000000
--- a/messagebus/src/tests/loadbalance/CMakeLists.txt
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(messagebus_loadbalance_test_app TEST
- SOURCES
- loadbalance.cpp
- DEPENDS
- messagebus_messagebus-test
- messagebus
-)
-vespa_add_test(NAME messagebus_loadbalance_test_app COMMAND messagebus_loadbalance_test_app)
diff --git a/messagebus/src/tests/loadbalance/loadbalance.cpp b/messagebus/src/tests/loadbalance/loadbalance.cpp
deleted file mode 100644
index 05ea6d78871..00000000000
--- a/messagebus/src/tests/loadbalance/loadbalance.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/messagebus/destinationsession.h>
-#include <vespa/messagebus/intermediatesession.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/routablequeue.h>
-#include <vespa/messagebus/sourcesession.h>
-#include <vespa/messagebus/sourcesessionparams.h>
-#include <vespa/messagebus/testlib/receptor.h>
-#include <vespa/messagebus/routing/routingspec.h>
-#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simplereply.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/messagebus/testlib/testserver.h>
-
-using namespace mbus;
-using namespace std::chrono_literals;
-
-struct Handler : public IMessageHandler
-{
- DestinationSession::UP session;
- uint32_t cnt;
-
- Handler(MessageBus &mb) : session(), cnt(0) {
- session = mb.createDestinationSession("session", true, *this);
- }
- ~Handler() {
- session.reset();
- }
- void handleMessage(Message::UP msg) override {
- ++cnt;
- session->acknowledge(std::move(msg));
- }
-};
-
-RoutingSpec getRouting() {
- return RoutingSpec()
- .addTable(RoutingTableSpec("Simple")
- .addHop(HopSpec("dst", "test/*/session"))
- .addRoute(RouteSpec("test").addHop("dst")));
-}
-
-TEST_SETUP(Test);
-
-int
-Test::Main()
-{
- TEST_INIT("loadbalance_test");
-
- Slobrok slobrok;
- TestServer src(Identity(""), getRouting(), slobrok);
- TestServer dst1(Identity("test/dst1"), getRouting(), slobrok);
- TestServer dst2(Identity("test/dst2"), getRouting(), slobrok);
- TestServer dst3(Identity("test/dst3"), getRouting(), slobrok);
-
- Handler h1(dst1.mb);
- Handler h2(dst2.mb);
- Handler h3(dst3.mb);
-
- ASSERT_TRUE(src.waitSlobrok("test/dst1/session"));
- ASSERT_TRUE(src.waitSlobrok("test/dst2/session"));
- ASSERT_TRUE(src.waitSlobrok("test/dst3/session"));
-
- RoutableQueue queue;
- SourceSessionParams params;
- params.setTimeout(30s);
- params.setThrottlePolicy(IThrottlePolicy::SP());
- SourceSession::UP ss = src.mb.createSourceSession(queue, params);
-
- uint32_t msgCnt = 90;
- ASSERT_TRUE(msgCnt % 3 == 0);
- for (uint32_t i = 0; i < msgCnt; ++i) {
- ss->send(Message::UP(new SimpleMessage("test")), "test");
- }
- for (uint32_t i = 0; i < 1000; ++i) {
- if (queue.size() == msgCnt) {
- break;
- }
- std::this_thread::sleep_for(10ms);
- }
- EXPECT_TRUE(queue.size() == msgCnt);
- EXPECT_TRUE(h1.cnt == msgCnt / 3);
- EXPECT_TRUE(h2.cnt == msgCnt / 3);
- EXPECT_TRUE(h3.cnt == msgCnt / 3);
- TEST_DONE();
-}
diff --git a/messagebus/src/tests/messagebus/messagebus.cpp b/messagebus/src/tests/messagebus/messagebus.cpp
index 86c7bf91f2a..367bfc997d0 100644
--- a/messagebus/src/tests/messagebus/messagebus.cpp
+++ b/messagebus/src/tests/messagebus/messagebus.cpp
@@ -112,13 +112,10 @@ public:
Test();
~Test();
int Main() override;
- void testSendToAny();
void testSendToCol();
- void testSendToAnyThenCol();
void testDirectHop();
void testDirectRoute();
void testRoutingPolicyCache();
- void debugTrace();
private:
void setup();
@@ -131,21 +128,18 @@ private:
TEST_APPHOOK(Test);
-Test::Test() {}
-Test::~Test() {}
+Test::Test() = default;
+Test::~Test() = default;
int
Test::Main()
{
TEST_INIT("messagebus_test");
- testSendToAny(); TEST_FLUSH();
testSendToCol(); TEST_FLUSH();
- testSendToAnyThenCol(); TEST_FLUSH();
testDirectHop(); TEST_FLUSH();
testDirectRoute(); TEST_FLUSH();
testRoutingPolicyCache(); TEST_FLUSH();
- debugTrace(); TEST_FLUSH();
TEST_DONE();
}
@@ -206,38 +200,6 @@ void Test::teardown()
}
void
-Test::testSendToAny()
-{
- setup();
- for (uint32_t i = 0; i < 300; ++i) {
- Message::UP msg(new SimpleMessage("test"));
- EXPECT_TRUE(client->session->send(std::move(msg), "DocProc").isAccepted());
- }
- EXPECT_TRUE(dp0->waitQueueSize(100));
- EXPECT_TRUE(dp1->waitQueueSize(100));
- EXPECT_TRUE(dp2->waitQueueSize(100));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP msg = p->queue.dequeue();
- ASSERT_TRUE(msg);
- Reply::UP reply(new EmptyReply());
- msg->swapState(*reply);
- reply->addError(Error(ErrorCode::FATAL_ERROR, ""));
- p->session->forward(std::move(reply));
- }
- }
- EXPECT_TRUE(client->waitQueueSize(300));
- while (client->queue.size() > 0) {
- Routable::UP reply = client->queue.dequeue();
- ASSERT_TRUE(reply);
- ASSERT_TRUE(reply->isReply());
- EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 1);
- }
- teardown();
-}
-
-void
Test::testSendToCol()
{
setup();
@@ -282,83 +244,6 @@ Test::testSendToCol()
}
void
-Test::testSendToAnyThenCol()
-{
- setup();
- ASSERT_TRUE(SimpleMessage("msg").getHash() % 2 == 0);
- for (uint32_t i = 0; i < 150; ++i) {
- Message::UP msg(new SimpleMessage("msg"));
- EXPECT_TRUE(client->session->send(std::move(msg), "Index").isAccepted());
- }
- EXPECT_TRUE(dp0->waitQueueSize(50));
- EXPECT_TRUE(dp1->waitQueueSize(50));
- EXPECT_TRUE(dp2->waitQueueSize(50));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- EXPECT_TRUE(search00->waitQueueSize(150));
- EXPECT_TRUE(search01->waitQueueSize(0));
- EXPECT_TRUE(search10->waitQueueSize(150));
- EXPECT_TRUE(search11->waitQueueSize(0));
- ASSERT_TRUE(SimpleMessage("msh").getHash() % 2 == 1);
- for (uint32_t i = 0; i < 150; ++i) {
- Message::UP msg(new SimpleMessage("msh"));
- ASSERT_TRUE(client->session->send(std::move(msg), "Index").isAccepted());
- }
- EXPECT_TRUE(dp0->waitQueueSize(50));
- EXPECT_TRUE(dp1->waitQueueSize(50));
- EXPECT_TRUE(dp2->waitQueueSize(50));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- EXPECT_TRUE(search00->waitQueueSize(150));
- EXPECT_TRUE(search01->waitQueueSize(150));
- EXPECT_TRUE(search10->waitQueueSize(150));
- EXPECT_TRUE(search11->waitQueueSize(150));
- for (uint32_t i = 0; i < searchVec.size(); ++i) {
- Search *s = searchVec[i];
- while (s->queue.size() > 0) {
- Routable::UP msg = s->queue.dequeue();
- ASSERT_TRUE(msg);
- Reply::UP reply(new EmptyReply());
- msg->swapState(*reply);
- s->session->reply(std::move(reply));
- }
- }
- EXPECT_TRUE(dp0->waitQueueSize(100));
- EXPECT_TRUE(dp1->waitQueueSize(100));
- EXPECT_TRUE(dp2->waitQueueSize(100));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- client->waitQueueSize(300);
- std::this_thread::sleep_for(100ms);
- client->waitQueueSize(300);
- while (client->queue.size() > 0) {
- Routable::UP reply = client->queue.dequeue();
- ASSERT_TRUE(reply);
- ASSERT_TRUE(reply->isReply());
- EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0);
- }
- teardown();
-}
-
-void
Test::testDirectHop()
{
setup();
@@ -468,65 +353,3 @@ Test::testRoutingPolicyCache()
teardown();
}
-
-void
-Test::debugTrace()
-{
- setup();
- ASSERT_TRUE(SimpleMessage("msg").getHash() % 2 == 0);
- for (uint32_t i = 0; i < 3; ++i) {
- Message::UP msg(new SimpleMessage("msg"));
- msg->getTrace().setLevel(4 + i);
- EXPECT_TRUE(client->session->send(std::move(msg), "Index").isAccepted());
- }
- EXPECT_TRUE(dp0->waitQueueSize(1));
- EXPECT_TRUE(dp1->waitQueueSize(1));
- EXPECT_TRUE(dp2->waitQueueSize(1));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- EXPECT_TRUE(search00->waitQueueSize(3));
- EXPECT_TRUE(search01->waitQueueSize(0));
- EXPECT_TRUE(search10->waitQueueSize(3));
- EXPECT_TRUE(search11->waitQueueSize(0));
- for (uint32_t i = 0; i < searchVec.size(); ++i) {
- Search *s = searchVec[i];
- while (s->queue.size() > 0) {
- Routable::UP msg = s->queue.dequeue();
- ASSERT_TRUE(msg);
- Reply::UP reply(new EmptyReply());
- msg->swapState(*reply);
- s->session->reply(std::move(reply));
- }
- }
- EXPECT_TRUE(dp0->waitQueueSize(1));
- EXPECT_TRUE(dp1->waitQueueSize(1));
- EXPECT_TRUE(dp2->waitQueueSize(1));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- client->waitQueueSize(3);
- Routable::UP reply = client->queue.dequeue();
- fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
- reply->getTrace().getLevel(),
- reply->getTrace().toString().c_str());
- reply = client->queue.dequeue();
- fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
- reply->getTrace().getLevel(),
- reply->getTrace().toString().c_str());
- reply = client->queue.dequeue();
- fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
- reply->getTrace().getLevel(),
- reply->getTrace().toString().c_str());
- teardown();
-}
diff --git a/messagebus/src/tests/serviceaddress/serviceaddress.cpp b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
index 441da5a80ac..c5d07bc437a 100644
--- a/messagebus/src/tests/serviceaddress/serviceaddress.cpp
+++ b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
@@ -7,34 +7,49 @@
using namespace mbus;
-class Test : public vespalib::TestApp {
-public:
- int Main() override;
- void testAddrServiceAddress();
- void testNameServiceAddress();
-
-private:
- bool waitSlobrok(RPCNetwork &network, const string &pattern, size_t num);
- bool testAddress(RPCNetwork& network, const string &pattern,
- const string &expectedSpec, const string &expectedSession);
- bool testNullAddress(RPCNetwork &network, const string &pattern);
-};
-
-int
-Test::Main()
+bool
+waitSlobrok(RPCNetwork &network, const string &pattern, size_t num)
{
- TEST_INIT("serviceaddress_test");
-
- testAddrServiceAddress(); TEST_FLUSH();
- testNameServiceAddress(); TEST_FLUSH();
+ for (int i = 0; i < 1000; i++) {
+ slobrok::api::IMirrorAPI::SpecList res = network.getMirror().lookup(pattern);
+ if (res.size() == num) {
+ return true;
+ }
+ std::this_thread::sleep_for(10ms);
+ }
+ return false;
+}
- TEST_DONE();
+bool
+testNullAddress(RPCNetwork &network, const string &pattern)
+{
+ RPCService service(network.getMirror(), pattern);
+ RPCServiceAddress::UP obj = service.make_address();
+ if ( ! EXPECT_FALSE(obj)) {
+ return false;
+ }
+ return true;
}
-TEST_APPHOOK(Test);
+bool
+testAddress(RPCNetwork &network, const string &pattern,
+ const string &expectedSpec, const string &expectedSession)
+{
+ RPCService service(network.getMirror(), pattern);
+ RPCServiceAddress::UP obj = service.make_address();
+ if (!EXPECT_TRUE(obj)) {
+ return false;
+ }
+ if (!EXPECT_EQUAL(expectedSpec, obj->getConnectionSpec())) {
+ return false;
+ }
+ if (!EXPECT_EQUAL(expectedSession, obj->getSessionName())) {
+ return false;
+ }
+ return true;
+}
-void
-Test::testAddrServiceAddress()
+TEST("testAddrServiceAddress")
{
Slobrok slobrok;
RPCNetwork network(RPCNetworkParams(slobrok.config())
@@ -55,8 +70,7 @@ Test::testAddrServiceAddress()
network.shutdown();
}
-void
-Test::testNameServiceAddress()
+TEST("testNameServiceAddress")
{
Slobrok slobrok;
RPCNetwork network(RPCNetworkParams(slobrok.config())
@@ -74,45 +88,4 @@ Test::testNameServiceAddress()
network.shutdown();
}
-bool
-Test::waitSlobrok(RPCNetwork &network, const string &pattern, size_t num)
-{
- for (int i = 0; i < 1000; i++) {
- slobrok::api::IMirrorAPI::SpecList res = network.getMirror().lookup(pattern);
- if (res.size() == num) {
- return true;
- }
- std::this_thread::sleep_for(10ms);
- }
- return false;
-}
-
-bool
-Test::testNullAddress(RPCNetwork &network, const string &pattern)
-{
- RPCService service(network.getMirror(), pattern);
- RPCServiceAddress::UP obj = service.resolve();
- if (!EXPECT_TRUE(obj.get() == NULL)) {
- return false;
- }
- return true;
-}
-
-bool
-Test::testAddress(RPCNetwork &network, const string &pattern,
- const string &expectedSpec, const string &expectedSession)
-{
- RPCService service(network.getMirror(), pattern);
- RPCServiceAddress::UP obj = service.resolve();
- if (!EXPECT_TRUE(obj.get() != NULL)) {
- return false;
- }
- if (!EXPECT_EQUAL(expectedSpec, obj->getConnectionSpec())) {
- return false;
- }
- if (!EXPECT_EQUAL(expectedSession, obj->getSessionName())) {
- return false;
- }
- return true;
-}
-
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/tests/servicepool/servicepool.cpp b/messagebus/src/tests/servicepool/servicepool.cpp
index 1334831c30c..58fc76f6b50 100644
--- a/messagebus/src/tests/servicepool/servicepool.cpp
+++ b/messagebus/src/tests/servicepool/servicepool.cpp
@@ -4,67 +4,59 @@
#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/messagebus/network/rpcservicepool.h>
#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
using namespace mbus;
-class Test : public vespalib::TestApp {
-private:
- void testMaxSize();
-
-public:
- int Main() override {
- TEST_INIT("servicepool_test");
-
- testMaxSize(); TEST_FLUSH();
-
- TEST_DONE();
- }
-};
-
-TEST_APPHOOK(Test);
-
-void
-Test::testMaxSize()
+TEST("testMaxSize")
{
Slobrok slobrok;
- RPCNetwork net(RPCNetworkParams(slobrok.config()));
- RPCServicePool pool(net, 2);
- net.start();
-
- pool.resolve("foo");
+ TestServer me(Identity("me"), RoutingSpec(), slobrok);
+ RPCNetwork & net = me.net;
+ net.registerSession("foo");
+ net.registerSession("bar");
+ net.registerSession("baz");
+ me.waitSlobrok("me/foo");
+ me.waitSlobrok("me/bar");
+ me.waitSlobrok("me/baz");
+ RPCServicePool pool(net.getMirror(), 2);
+
+ RPCServiceAddress::UP addr = pool.resolve("me/foo");
EXPECT_EQUAL(1u, pool.getSize());
- EXPECT_TRUE(pool.hasService("foo"));
- EXPECT_TRUE(!pool.hasService("bar"));
- EXPECT_TRUE(!pool.hasService("baz"));
+ EXPECT_TRUE(pool.hasService("me/foo"));
+ EXPECT_TRUE(!pool.hasService("me/bar"));
+ EXPECT_TRUE(!pool.hasService("me/baz"));
- pool.resolve("foo");
+ addr = pool.resolve("me/foo");
EXPECT_EQUAL(1u, pool.getSize());
- EXPECT_TRUE(pool.hasService("foo"));
- EXPECT_TRUE(!pool.hasService("bar"));
- EXPECT_TRUE(!pool.hasService("baz"));
+ EXPECT_TRUE(pool.hasService("me/foo"));
+ EXPECT_TRUE(!pool.hasService("me/bar"));
+ EXPECT_TRUE(!pool.hasService("me/baz"));
- pool.resolve("bar");
+ addr = pool.resolve("me/bar");
EXPECT_EQUAL(2u, pool.getSize());
- EXPECT_TRUE(pool.hasService("foo"));
- EXPECT_TRUE(pool.hasService("bar"));
- EXPECT_TRUE(!pool.hasService("baz"));
+ EXPECT_TRUE(pool.hasService("me/foo"));
+ EXPECT_TRUE(pool.hasService("me/bar"));
+ EXPECT_TRUE(!pool.hasService("me/baz"));
- pool.resolve("baz");
+ addr = pool.resolve("me/baz");
EXPECT_EQUAL(2u, pool.getSize());
- EXPECT_TRUE(!pool.hasService("foo"));
- EXPECT_TRUE(pool.hasService("bar"));
- EXPECT_TRUE(pool.hasService("baz"));
+ EXPECT_TRUE(!pool.hasService("me/foo"));
+ EXPECT_TRUE(pool.hasService("me/bar"));
+ EXPECT_TRUE(pool.hasService("me/baz"));
- pool.resolve("bar");
+ addr = pool.resolve("me/bar");
EXPECT_EQUAL(2u, pool.getSize());
- EXPECT_TRUE(!pool.hasService("foo"));
- EXPECT_TRUE(pool.hasService("bar"));
- EXPECT_TRUE(pool.hasService("baz"));
+ EXPECT_TRUE(!pool.hasService("me/foo"));
+ EXPECT_TRUE(pool.hasService("me/bar"));
+ EXPECT_TRUE(pool.hasService("me/baz"));
- pool.resolve("foo");
+ addr = pool.resolve("me/foo");
EXPECT_EQUAL(2u, pool.getSize());
- EXPECT_TRUE(pool.hasService("foo"));
- EXPECT_TRUE(pool.hasService("bar"));
- EXPECT_TRUE(!pool.hasService("baz"));
+ EXPECT_TRUE(pool.hasService("me/foo"));
+ EXPECT_TRUE(pool.hasService("me/bar"));
+ EXPECT_TRUE(!pool.hasService("me/baz"));
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp
index e415622707f..07d9f0fae5d 100644
--- a/messagebus/src/tests/shutdown/shutdown.cpp
+++ b/messagebus/src/tests/shutdown/shutdown.cpp
@@ -12,30 +12,9 @@
using namespace mbus;
-class Test : public vespalib::TestApp {
-private:
- void requireThatListenFailedIsExceptionSafe();
- void requireThatShutdownOnSourceWithPendingIsSafe();
- void requireThatShutdownOnIntermediateWithPendingIsSafe();
-
-public:
- int Main() override {
- TEST_INIT("shutdown_test");
-
- requireThatListenFailedIsExceptionSafe(); TEST_FLUSH();
- requireThatShutdownOnSourceWithPendingIsSafe(); TEST_FLUSH();
- requireThatShutdownOnIntermediateWithPendingIsSafe(); TEST_FLUSH();
-
- TEST_DONE();
- }
-};
-
static const duration TIMEOUT = 120s;
-TEST_APPHOOK(Test);
-
-void
-Test::requireThatListenFailedIsExceptionSafe()
+TEST("requireThatListenFailedIsExceptionSafe")
{
fnet::frt::StandaloneFRT orb;
ASSERT_TRUE(orb.supervisor().Listen(0));
@@ -51,8 +30,7 @@ Test::requireThatListenFailedIsExceptionSafe()
}
}
-void
-Test::requireThatShutdownOnSourceWithPendingIsSafe()
+TEST("requireThatShutdownOnSourceWithPendingIsSafe")
{
Slobrok slobrok;
TestServer dstServer(MessageBusParams()
@@ -87,8 +65,7 @@ Test::requireThatShutdownOnSourceWithPendingIsSafe()
}
}
-void
-Test::requireThatShutdownOnIntermediateWithPendingIsSafe()
+TEST("requireThatShutdownOnIntermediateWithPendingIsSafe")
{
Slobrok slobrok;
TestServer dstServer(MessageBusParams()
@@ -114,7 +91,7 @@ Test::requireThatShutdownOnIntermediateWithPendingIsSafe()
ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
for (uint32_t i = 0; i < 10; ++i) {
- Message::UP msg(new SimpleMessage("msg"));
+ Message::UP msg = std::make_unique<SimpleMessage>("msg");
{
TestServer itrServer(MessageBusParams()
.setRetryPolicy(std::make_shared<RetryTransientErrorsPolicy>())
@@ -141,3 +118,5 @@ Test::requireThatShutdownOnIntermediateWithPendingIsSafe()
dstServer.mb.sync();
}
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/tests/targetpool/targetpool.cpp b/messagebus/src/tests/targetpool/targetpool.cpp
index 0e0e566f2be..9259f992d6c 100644
--- a/messagebus/src/tests/targetpool/targetpool.cpp
+++ b/messagebus/src/tests/targetpool/targetpool.cpp
@@ -22,12 +22,7 @@ public:
}
};
-TEST_SETUP(Test);
-
-int
-Test::Main()
-{
- TEST_INIT("targetpool_test");
+TEST("targetpool_test") {
// Necessary setup to be able to resolve targets.
Slobrok slobrok;
@@ -46,9 +41,9 @@ Test::Main()
// Assert that all connections expire.
RPCTarget::SP target;
- ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr1))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset();
EXPECT_EQUAL(3u, pool.size());
for (uint32_t i = 0; i < 10; ++i) {
pool.flushTargets(false);
@@ -59,19 +54,19 @@ Test::Main()
EXPECT_EQUAL(0u, pool.size());
// Assert that only idle connections expire.
- ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr1))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset();
EXPECT_EQUAL(3u, pool.size());
timer.millis += 444;
pool.flushTargets(false);
EXPECT_EQUAL(3u, pool.size());
- ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset();
timer.millis += 444;
pool.flushTargets(false);
EXPECT_EQUAL(2u, pool.size());
- ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset();
timer.millis += 444;
pool.flushTargets(false);
EXPECT_EQUAL(1u, pool.size());
@@ -80,7 +75,7 @@ Test::Main()
EXPECT_EQUAL(0u, pool.size());
// Assert that connections never expire while they are referenced.
- ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL);
+ ASSERT_TRUE((target = pool.getTarget(orb, adr1)));
EXPECT_EQUAL(1u, pool.size());
for (int i = 0; i < 10; ++i) {
timer.millis += 999;
@@ -91,6 +86,6 @@ Test::Main()
timer.millis += 999;
pool.flushTargets(false);
EXPECT_EQUAL(0u, pool.size());
-
- TEST_DONE();
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/vespa/messagebus/callstack.cpp b/messagebus/src/vespa/messagebus/callstack.cpp
index b7179e14cad..ab22f1ace34 100644
--- a/messagebus/src/vespa/messagebus/callstack.cpp
+++ b/messagebus/src/vespa/messagebus/callstack.cpp
@@ -20,7 +20,7 @@ CallStack::discard()
}
}
-CallStack::~CallStack() { }
+CallStack::~CallStack() = default;
IReplyHandler &
CallStack::pop(Reply &reply)
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 5313c4adcbb..36211d8ec38 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -246,13 +246,13 @@ Messenger::start()
void
Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler)
{
- enqueue(std::make_unique<MessageTask>(std::move(msg), handler));
+ handler.handleMessage(std::move(msg));
}
void
Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler)
{
- enqueue(std::make_unique<ReplyTask>(std::move(reply), handler));
+ handler.handleReply(std::move(reply));
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 4b498c4c014..faa67b9bece 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -17,6 +17,8 @@
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -26,6 +28,8 @@ LOG_SETUP(".rpcnetwork");
using vespalib::make_string;
using namespace std::chrono_literals;
+namespace mbus {
+
namespace {
/**
@@ -45,7 +49,9 @@ public:
_gate() {
ScheduleNow();
}
- ~SyncTask() override = default;
+ ~SyncTask() override {
+ Kill();
+ }
void await() {
_gate.await();
@@ -56,9 +62,36 @@ public:
}
};
-} // namespace <unnamed>
+struct TargetPoolTask : public FNET_Task {
+ RPCTargetPool &_pool;
-namespace mbus {
+ TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool)
+ : FNET_Task(&scheduler),
+ _pool(pool)
+ {
+ ScheduleNow();
+ }
+ ~TargetPoolTask() override {
+ Kill();
+ }
+ void PerformTask() override {
+ _pool.flushTargets(false);
+ Schedule(1.0);
+ }
+};
+
+std::unique_ptr<vespalib::SyncableThreadExecutor>
+createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
+ switch (optimizeFor) {
+ case RPCNetworkParams::OptimizeFor::LATENCY:
+ return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
+ case RPCNetworkParams::OptimizeFor::THROUGHPUT:
+ default:
+ return std::make_unique<vespalib::SingleExecutor>(100);
+ }
+}
+
+}
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
const std::vector<RoutingNode*> &recipients)
@@ -87,40 +120,35 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
if (shouldSend) {
- _net.send(*this);
- delete this;
+ if (_net.allowDispatchForEncode()) {
+ auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() {
+ _net.send(*this);
+ delete this;
+ }));
+ assert (!rejected);
+ } else {
+ _net.send(*this);
+ delete this;
+ }
}
}
-RPCNetwork::TargetPoolTask::TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool)
- : FNET_Task(&scheduler),
- _pool(pool)
-{
- ScheduleNow();
-}
-
-void
-RPCNetwork::TargetPoolTask::PerformTask()
-{
- _pool.flushTargets(false);
- Schedule(1.0);
-}
-
RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>()),
+ _transport(std::make_unique<FNET_Transport>(params.getNumThreads())),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
- _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
- _targetPoolTask(_scheduler, *_targetPool),
- _servicePool(std::make_unique<RPCServicePool>(*this, 4096)),
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_requestedPort(params.getListenPort()),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
+ _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
+ _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)),
+ _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)),
+ _singleEncodeExecutor(createExecutor(params.getOptimizeFor())),
+ _singleDecodeExecutor(createExecutor(params.getOptimizeFor())),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
@@ -130,7 +158,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
{
_transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
_transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
- _transport->SetTCPNoDelay(params.getTcpNoDelay());
}
RPCNetwork::~RPCNetwork()
@@ -400,7 +427,8 @@ void
RPCNetwork::sync()
{
SyncTask task(_scheduler);
- _executor->sync();
+ _singleEncodeExecutor->sync();
+ _singleDecodeExecutor->sync();
task.await();
}
@@ -409,8 +437,10 @@ RPCNetwork::shutdown()
{
_transport->ShutDown(true);
_threadPool->Close();
- _executor->shutdown();
- _executor->sync();
+ _singleEncodeExecutor->shutdown();
+ _singleDecodeExecutor->shutdown();
+ _singleEncodeExecutor->sync();
+ _singleDecodeExecutor->sync();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a6c2724929d..a510aae9014 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -50,13 +50,6 @@ private:
void handleVersion(const vespalib::Version *version) override;
};
- struct TargetPoolTask : public FNET_Task {
- RPCTargetPool &_pool;
-
- TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool);
- void PerformTask() override;
- };
-
using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
INetworkOwner *_owner;
@@ -65,14 +58,15 @@ private:
std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _orb;
FNET_Scheduler &_scheduler;
- std::unique_ptr<RPCTargetPool> _targetPool;
- TargetPoolTask _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
int _requestedPort;
- std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ std::unique_ptr<FNET_Task> _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor;
std::unique_ptr<RPCSendAdapter> _sendV1;
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
@@ -80,7 +74,6 @@ private:
bool _allowDispatchForEncode;
bool _allowDispatchForDecode;
-
/**
* Resolves and assigns a service address for the given recipient using the
* given address. This is called by the {@link
@@ -231,7 +224,8 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getExecutor() const { return *_executor; }
+ vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; }
+ vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 5bf277a8ee6..482a46b2564 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _numThreads(4),
- _tcpNoDelay(true),
+ _numThreads(1),
+ _optimizeFor(OptimizeFor::LATENCY),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
_connectionExpireSecs(600),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 140f81c611c..a4b752f46d4 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -12,21 +12,10 @@ namespace mbus {
* held by this class. This class has reasonable default values for each parameter.
*/
class RPCNetworkParams {
-private:
+public:
+ enum class OptimizeFor { LATENCY, THROUGHPUT};
using CompressionConfig = vespalib::compression::CompressionConfig;
- Identity _identity;
- config::ConfigUri _slobrokConfig;
- int _listenPort;
- uint32_t _maxInputBufferSize;
- uint32_t _maxOutputBufferSize;
- uint32_t _numThreads;
- bool _tcpNoDelay;
- bool _dispatchOnEncode;
- bool _dispatchOnDecode;
- double _connectionExpireSecs;
- CompressionConfig _compressionConfig;
-public:
RPCNetworkParams();
RPCNetworkParams(config::ConfigUri configUri);
~RPCNetworkParams();
@@ -107,12 +96,12 @@ public:
uint32_t getNumThreads() const { return _numThreads; }
- RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) {
- _tcpNoDelay = tcpNoDelay;
+ RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) {
+ _optimizeFor = tcpNoDelay;
return *this;
}
- bool getTcpNoDelay() const { return _tcpNoDelay; }
+ OptimizeFor getOptimizeFor() const { return _optimizeFor; }
/**
* Returns the number of seconds before an idle network connection expires.
@@ -198,6 +187,18 @@ public:
}
uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; }
+private:
+ Identity _identity;
+ config::ConfigUri _slobrokConfig;
+ int _listenPort;
+ uint32_t _maxInputBufferSize;
+ uint32_t _maxOutputBufferSize;
+ uint32_t _numThreads;
+ OptimizeFor _optimizeFor;
+ bool _dispatchOnEncode;
+ bool _dispatchOnDecode;
+ double _connectionExpireSecs;
+ CompressionConfig _compressionConfig;
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 2422638dc05..d217c7964d6 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -148,7 +148,14 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
void
RPCSend::RequestDone(FRT_RPCRequest *req)
{
- doRequestDone(req);
+ if ( _net->allowDispatchForDecode()) {
+ auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() {
+ doRequestDone(req);
+ }));
+ assert (!rejected);
+ } else {
+ doRequestDone(req);
+ }
}
void
@@ -221,13 +228,13 @@ void
RPCSend::handleReply(Reply::UP reply)
{
const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
- doHandleReply(protocol, std::move(reply));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
+ if (protocol && _net->allowDispatchForEncode()) {
+ auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
doHandleReply(protocol, std::move(reply));
}));
assert (!rejected);
+ } else {
+ doHandleReply(protocol, std::move(reply));
}
}
@@ -266,13 +273,13 @@ RPCSend::invoke(FRT_RPCRequest *req)
vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
return;
}
- if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
- doRequest(req, protocol, std::move(params));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
+ if (_net->allowDispatchForDecode()) {
+ auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
doRequest(req, protocol, std::move(params));
}));
assert (!rejected);
+ } else {
+ doRequest(req, protocol, std::move(params));
}
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcservice.cpp b/messagebus/src/vespa/messagebus/network/rpcservice.cpp
index fd1b84f545f..ecf40973187 100644
--- a/messagebus/src/vespa/messagebus/network/rpcservice.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcservice.cpp
@@ -6,37 +6,37 @@
namespace mbus {
RPCService::RPCService(const Mirror &mirror, const string &pattern) :
- _mirror(mirror),
- _pattern(pattern),
- _addressIdx(random()),
- _addressGen(0),
- _addressList()
-{ }
-
-RPCService::~RPCService() = default;
-
-RPCServiceAddress::UP
-RPCService::resolve()
+ _serviceName(),
+ _connectionSpec()
{
- if (_pattern.find("tcp/") == 0) {
- size_t pos = _pattern.find_last_of('/');
- if (pos != string::npos && pos < _pattern.size() - 1) {
- auto ret = std::make_unique<RPCServiceAddress>(_pattern, _pattern.substr(0, pos));
- if (!ret->isMalformed()) {
- return ret;
+ if (pattern.find("tcp/") == 0) {
+ size_t pos = pattern.find_last_of('/');
+ if (pos != string::npos && pos < pattern.size() - 1) {
+ RPCServiceAddress test(pattern, pattern.substr(0, pos));
+ if ( ! test.isMalformed()) {
+ _serviceName = pattern;
+ _connectionSpec = pattern.substr(0, pos);
}
}
} else {
- if (_addressGen != _mirror.updates()) {
- _addressGen = _mirror.updates();
- _addressList = _mirror.lookup(_pattern);
- }
- if (!_addressList.empty()) {
- _addressIdx = (_addressIdx + 1) % _addressList.size();
- const AddressList::value_type &entry = _addressList[_addressIdx];
- return std::make_unique<RPCServiceAddress>(entry.first, entry.second);
+ Mirror::SpecList addressList = mirror.lookup(pattern);
+ if (!addressList.empty()) {
+ assert(addressList.size() == 1); //TODO URGENT remove assert after a few factory runs.
+ const auto &entry = addressList.front();
+ _serviceName = entry.first;
+ _connectionSpec = entry.second;
}
}
+}
+
+RPCService::~RPCService() = default;
+
+RPCServiceAddress::UP
+RPCService::make_address()
+{
+ if ( !_serviceName.empty()) {
+ return std::make_unique<RPCServiceAddress>(_serviceName, _connectionSpec);
+ }
return RPCServiceAddress::UP();
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcservice.h b/messagebus/src/vespa/messagebus/network/rpcservice.h
index 18c847b0298..13792163693 100644
--- a/messagebus/src/vespa/messagebus/network/rpcservice.h
+++ b/messagebus/src/vespa/messagebus/network/rpcservice.h
@@ -16,13 +16,9 @@ class RPCNetwork;
class RPCService {
private:
typedef slobrok::api::IMirrorAPI Mirror;
- typedef Mirror::SpecList AddressList;
- const Mirror &_mirror;
- string _pattern;
- uint32_t _addressIdx;
- uint32_t _addressGen;
- AddressList _addressList;
+ string _serviceName;
+ string _connectionSpec;
public:
using UP = std::unique_ptr<RPCService>;
@@ -44,15 +40,9 @@ public:
*
* @return A concrete service address.
*/
- RPCServiceAddress::UP resolve();
+ RPCServiceAddress::UP make_address();
- /**
- * Returns the pattern used when querying for the naming server for
- * addresses. This is given at construtor time.
- *
- * @return The service pattern.
- */
- const string &getPattern() const { return _pattern; }
+ bool isValid() const { return ! _connectionSpec.empty(); }
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
index 99a9f383e75..77d64517c40 100644
--- a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
+++ b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
@@ -33,7 +33,7 @@ public:
* @param connectionSpec The connection specification.
*/
RPCServiceAddress(const string &serviceName, const string &connectionSpec);
- ~RPCServiceAddress();
+ ~RPCServiceAddress() override;
/**
* Returns whether or not this service address is malformed.
diff --git a/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp b/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp
index fb40ccff62b..358698570d2 100644
--- a/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp
@@ -6,11 +6,14 @@
namespace mbus {
-RPCServicePool::RPCServicePool(RPCNetwork &net, uint32_t maxSize) :
- _net(net),
- _lru(maxSize)
+RPCServicePool::RPCServicePool(const slobrok::api::IMirrorAPI & mirror, uint32_t maxSize) :
+ _mirror(mirror),
+ _lock(),
+ _lru(std::make_unique<ServiceCache>(maxSize)),
+ _updateGen(0),
+ _maxSize(maxSize)
{
- _lru.reserve(maxSize);
+ _lru->reserve(maxSize);
assert(maxSize > 0);
}
@@ -19,27 +22,52 @@ RPCServicePool::~RPCServicePool() = default;
RPCServiceAddress::UP
RPCServicePool::resolve(const string &pattern)
{
- std::unique_ptr<RPCService> * found = _lru.findAndRef(pattern);
- if (found) {
- return (*found)->resolve();
+ std::shared_ptr<RPCService> service;
+ {
+ LockGuard guard(_lock);
+ handleMirrorUpdates(guard);
+ std::shared_ptr<RPCService> *found = _lru->findAndRef(pattern);
+ if (found) {
+ service = *found;
+ }
+ }
+
+ if (service) {
+ return service->make_address();
} else {
- auto service = std::make_unique<RPCService>(_net.getMirror(), pattern);
- auto result = service->resolve();
- _lru[pattern] = std::move(service);
+ service = std::make_shared<RPCService>(_mirror, pattern);
+ auto result = service->make_address();
+ if (service->isValid()) {
+ LockGuard guard(_lock);
+ (*_lru)[pattern] = std::move(service);
+ }
return result;
}
+
+}
+
+void
+RPCServicePool::handleMirrorUpdates(const LockGuard &) {
+ uint32_t currentgen = _mirror.updates();
+ if (_updateGen != currentgen) {
+ auto lru = std::make_unique<ServiceCache>(_maxSize);
+ _lru.swap(lru);
+ _updateGen = currentgen;
+ }
}
uint32_t
RPCServicePool::getSize() const
{
- return _lru.size();
+ LockGuard guard(_lock);
+ return _lru->size();
}
bool
RPCServicePool::hasService(const string &pattern) const
{
- return _lru.hasKey(pattern);
+ LockGuard guard(_lock);
+ return _lru->hasKey(pattern);
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcservicepool.h b/messagebus/src/vespa/messagebus/network/rpcservicepool.h
index 2614363838c..212c975a38c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcservicepool.h
+++ b/messagebus/src/vespa/messagebus/network/rpcservicepool.h
@@ -13,12 +13,6 @@ class RPCNetwork;
* the rpc network.
*/
class RPCServicePool {
-private:
- typedef vespalib::lrucache_map< vespalib::LruParam<string, RPCService::UP> > ServiceCache;
-
- RPCNetwork &_net;
- ServiceCache _lru;
-
public:
RPCServicePool(const RPCServicePool &) = delete;
RPCServicePool & operator = (const RPCServicePool &) = delete;
@@ -28,7 +22,7 @@ public:
* @param net The underlying RPC network.
* @param maxSize The max number of services to cache.
*/
- RPCServicePool(RPCNetwork &net, uint32_t maxSize);
+ RPCServicePool(const slobrok::api::IMirrorAPI & mirror, uint32_t maxSize);
/**
* Destructor. Frees any allocated resources.
@@ -61,6 +55,17 @@ public:
* @return True if a corresponding service is in the pool.
*/
bool hasService(const string &pattern) const;
+private:
+ using ServiceCache = vespalib::lrucache_map< vespalib::LruParam<string, std::shared_ptr<RPCService> >>;
+ using LockGuard = std::lock_guard<std::mutex>;
+
+ void handleMirrorUpdates(const LockGuard & guard);
+
+ const slobrok::api::IMirrorAPI & _mirror;
+ mutable std::mutex _lock;
+ std::unique_ptr<ServiceCache> _lru;
+ uint32_t _updateGen;
+ uint32_t _maxSize;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index cc09e44c460..b42ac47e54d 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -32,7 +32,7 @@ void
RPCTargetPool::flushTargets(bool force)
{
uint64_t currentTime = _timer->getMilliTime();
- vespalib::LockGuard guard(_lock);
+ LockGuard guard(_lock);
TargetMap::iterator it = _targets.begin();
while (it != _targets.end()) {
Entry &entry = it->second;
@@ -56,7 +56,7 @@ RPCTargetPool::flushTargets(bool force)
size_t
RPCTargetPool::size()
{
- vespalib::LockGuard guard(_lock);
+ LockGuard guard(_lock);
return _targets.size();
}
@@ -65,7 +65,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address)
{
const string & spec = address.getConnectionSpec();
uint64_t currentTime = _timer->getMilliTime();
- vespalib::LockGuard guard(_lock);
+ LockGuard guard(_lock);
auto it = _targets.find(spec);
if (it != _targets.end()) {
Entry &entry = it->second;
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
index 5f858f66993..d47fd977356 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
@@ -28,9 +28,10 @@ private:
Entry(RPCTarget::SP target, uint64_t lastUse);
};
- typedef std::map<string, Entry> TargetMap;
+ using TargetMap = std::map<string, Entry>;
+ using LockGuard = std::lock_guard<std::mutex>;
- vespalib::Lock _lock;
+ std::mutex _lock;
TargetMap _targets;
ITimer::UP _timer;
uint64_t _expireMillis;
diff --git a/messagebus_test/src/tests/error/cpp-client.cpp b/messagebus_test/src/tests/error/cpp-client.cpp
index abc7967bfe5..6296253d3df 100644
--- a/messagebus_test/src/tests/error/cpp-client.cpp
+++ b/messagebus_test/src/tests/error/cpp-client.cpp
@@ -7,7 +7,6 @@
#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/messagebus/testlib/receptor.h>
-#include <vespa/vespalib/util/time.h>
#include <thread>
#include <vespa/fastos/app.h>
@@ -34,11 +33,11 @@ App::Main()
SourceSession::UP ss = mb.getMessageBus().createSourceSession(src, SourceSessionParams().setTimeout(300s));
for (int i = 0; i < 10; ++i) {
- msg.reset(new SimpleMessage("test"));
+ msg = std::make_unique<SimpleMessage>("test");
msg->getTrace().setLevel(9);
ss->send(std::move(msg), "test");
reply = src.getReply(600s); // 10 minutes timeout
- if (reply.get() == 0) {
+ if ( ! reply) {
fprintf(stderr, "CPP-CLIENT: no reply\n");
} else {
fprintf(stderr, "CPP-CLIENT:\n%s\n",
@@ -49,7 +48,7 @@ App::Main()
}
std::this_thread::sleep_for(1s);
}
- if (reply.get() == 0) {
+ if ( ! reply) {
fprintf(stderr, "CPP-CLIENT: no reply\n");
return 1;
}
diff --git a/messagebus_test/src/tests/error/error.cpp b/messagebus_test/src/tests/error/error.cpp
index e5749db452b..87eb391fc86 100644
--- a/messagebus_test/src/tests/error/error.cpp
+++ b/messagebus_test/src/tests/error/error.cpp
@@ -1,48 +1,47 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/log/log.h>
-LOG_SETUP("error_test");
+
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/log/log.h>
+LOG_SETUP("error_test");
+
using namespace mbus;
using vespalib::make_string;
-TEST_SETUP(Test);
-int
-Test::Main()
-{
- TEST_INIT("error_test");
+TEST("error_test") {
Slobrok slobrok;
const std::string routing_template = TEST_PATH("routing-template.cfg");
const std::string ctl_script = TEST_PATH("ctl.sh");
{ // Make slobrok config
- EXPECT_TRUE(system("echo slobrok[1] > slobrok.cfg") == 0);
- EXPECT_TRUE(system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' "
- ">> slobrok.cfg", slobrok.port()).c_str()) == 0);
+ EXPECT_EQUAL(0, system("echo slobrok[1] > slobrok.cfg"));
+ EXPECT_EQUAL(0, system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' "
+ ">> slobrok.cfg", slobrok.port()).c_str()));
}
{ // CPP SERVER
{ // Make routing config
- EXPECT_TRUE(system(("cat " + routing_template + " | sed 's#session#cpp/session#' > routing.cfg").c_str()) == 0);
+ EXPECT_EQUAL(0, system(("cat " + routing_template + " | sed 's#session#cpp/session#' > routing.cfg").c_str()));
}
fprintf(stderr, "STARTING CPP-SERVER\n");
- EXPECT_TRUE(system((ctl_script + " start server cpp").c_str()) == 0);
- EXPECT_TRUE(system("./messagebus_test_cpp-client-error_app") == 0);
- EXPECT_TRUE(system("../../binref/runjava JavaClient") == 0);
- EXPECT_TRUE(system((ctl_script + " stop server cpp").c_str()) == 0);
+ EXPECT_EQUAL(0, system((ctl_script + " start server cpp").c_str()));
+ EXPECT_EQUAL(0, system("./messagebus_test_cpp-client-error_app"));
+ EXPECT_EQUAL(0, system("../../binref/runjava JavaClient"));
+ EXPECT_EQUAL(0, system((ctl_script + " stop server cpp").c_str()));
}
{ // JAVA SERVER
{ // Make routing config
- EXPECT_TRUE(system(("cat " + routing_template + " | sed 's#session#java/session#' > routing.cfg").c_str()) == 0);
+ EXPECT_EQUAL(0, system(("cat " + routing_template + " | sed 's#session#java/session#' > routing.cfg").c_str()));
}
fprintf(stderr, "STARTING JAVA-SERVER\n");
- EXPECT_TRUE(system((ctl_script + " start server java").c_str()) == 0);
- EXPECT_TRUE(system("./messagebus_test_cpp-client-error_app") == 0);
- EXPECT_TRUE(system("../../binref/runjava JavaClient") == 0);
- EXPECT_TRUE(system((ctl_script + " stop server java").c_str()) == 0);
+ EXPECT_EQUAL(0, system((ctl_script + " start server java").c_str()));
+ EXPECT_EQUAL(0, system("./messagebus_test_cpp-client-error_app"));
+ EXPECT_EQUAL(0, system("../../binref/runjava JavaClient"));
+ EXPECT_EQUAL(0, system((ctl_script + " stop server java").c_str()));
}
- TEST_DONE();
}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java
index 54c8719bceb..3f5c3025850 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java
@@ -72,6 +72,15 @@ public class YumPackageName {
architecture = packageName.architecture;
}
+ /**
+ * Set the epoch of the YUM package.
+ *
+ * <p>WARNING: Should only be invoked if the YUM package actually has an epoch. Typically
+ * YUM packages doesn't have one explicitly set, and in case "0" will be used with
+ * {@link #toVersionLockName()} (otherwise it fails), but it will be absent from an
+ * install with {@link #toName()} (otherwise it fails). This typically means that
+ * you should set this only if the epoch is != "0".</p>
+ */
public Builder setEpoch(String epoch) { this.epoch = Optional.of(epoch); return this; }
public Builder setName(String name) { this.name = name; return this; }
public Builder setVersion(String version) { this.version = Optional.of(version); return this; }
@@ -235,7 +244,7 @@ public class YumPackageName {
*/
public String toVersionLockName() {
return String.format("%s:%s-%s-%s.%s",
- epoch.orElseThrow(() -> new IllegalStateException("Epoch is missing for YUM package " + name)),
+ epoch.orElse("0"),
name,
version.orElseThrow(() -> new IllegalStateException("Version is missing for YUM package " + name)),
release.orElseThrow(() -> new IllegalStateException("Release is missing for YUM package " + name)),
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java
index 01664f5c22b..64e2997d486 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java
@@ -56,7 +56,7 @@ public class YumPackageNameTest {
"1.el7",
null,
"docker-engine-selinux-1.12.6-1.el7",
- null);
+ "0:docker-engine-selinux-1.12.6-1.el7.*");
// name-ver-rel.arch
verifyPackageName("docker-engine-selinux-1.12.6-1.el7.x86_64",
@@ -66,7 +66,7 @@ public class YumPackageNameTest {
"1.el7",
"x86_64",
"docker-engine-selinux-1.12.6-1.el7.x86_64",
- null);
+ "0:docker-engine-selinux-1.12.6-1.el7.*");
// name-epoch:ver-rel.arch
verifyPackageName(
@@ -112,7 +112,7 @@ public class YumPackageNameTest {
yumPackageName.toVersionLockName();
fail();
} catch (IllegalStateException e) {
- assertThat(e.getMessage(), containsStringIgnoringCase("epoch is missing"));
+ assertThat(e.getMessage(), containsStringIgnoringCase("Version is missing "));
}
} else {
assertEquals(toVersionName, yumPackageName.toVersionLockName());
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
index e107abf8fbb..098d706bf05 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
@@ -29,6 +29,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static com.yahoo.config.provision.NodeResources.DiskSpeed.any;
+import static com.yahoo.vespa.hosted.provision.Node.State.active;
/**
* @author oyving
@@ -249,16 +250,16 @@ public class MetricsReporter extends Maintainer {
}
private static NodeResources getCapacityTotal(NodeList nodes) {
- return nodes.nodeType(NodeType.host).asList().stream()
+ return nodes.nodeType(NodeType.host).state(active).asList().stream()
.map(host -> host.flavor().resources())
- .map(resources -> resources.justNumbers())
+ .map(NodeResources::justNumbers)
.reduce(new NodeResources(0, 0, 0, 0, any), NodeResources::add);
}
private static NodeResources getFreeCapacityTotal(NodeList nodes) {
- return nodes.nodeType(NodeType.host).asList().stream()
+ return nodes.nodeType(NodeType.host).state(active).asList().stream()
.map(n -> freeCapacityOf(nodes, n))
- .map(resources -> resources.justNumbers())
+ .map(NodeResources::justNumbers)
.reduce(new NodeResources(0, 0, 0, 0, any), NodeResources::add);
}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java
index f010e6905e1..648bf52f455 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java
@@ -67,7 +67,6 @@ public class CapacityPolicies {
private void ensureSufficientResources(NodeResources resources, ClusterSpec cluster) {
double minMemoryGb = nodeResourceLimits.minMemoryGb(cluster.type());
if (resources.memoryGb() >= minMemoryGb) return;
-
throw new IllegalArgumentException(String.format(Locale.ENGLISH,
"Must specify at least %.2f Gb of memory for %s cluster '%s', was: %.2f Gb",
minMemoryGb, cluster.type().name(), cluster.id().value(), resources.memoryGb()));
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
index aa262bdf751..ca7c33f96bd 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
@@ -11,6 +11,7 @@ import com.yahoo.config.provision.NodeType;
import com.yahoo.config.provision.Zone;
import com.yahoo.jdisc.Metric;
import com.yahoo.test.ManualClock;
+import com.yahoo.transaction.NestedTransaction;
import com.yahoo.vespa.applicationmodel.ApplicationInstance;
import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference;
import com.yahoo.vespa.curator.Curator;
@@ -163,6 +164,10 @@ public class MetricsReporterTest {
container2 = container2.with(allocation(Optional.of("app2"), container2).get());
nodeRepository.addDockerNodes(new LockedNodeList(List.of(container2), nodeRepository.lockUnallocated()));
+ NestedTransaction transaction = new NestedTransaction();
+ nodeRepository.activate(nodeRepository.getNodes(NodeType.host), transaction);
+ transaction.commit();
+
Orchestrator orchestrator = mock(Orchestrator.class);
when(orchestrator.getHostInfo(eq(reference), any())).thenReturn(HostInfo.createNoRemarks());
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java
index 76258e86de9..bd8be5063fd 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java
@@ -403,6 +403,21 @@ public class ProvisioningTest {
prepare(application, 1, 2, 3, 3, defaultResources, tester);
}
+ @Test
+ public void below_resource_limit() {
+ ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build();
+
+ ApplicationId application = tester.makeApplicationId();
+ tester.makeReadyNodes(10, defaultResources);
+ try {
+ prepare(application, 2, 2, 3, 3,
+ new NodeResources(2, 2, 10, 2), tester);
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Must specify at least 4.00 Gb of memory for container cluster 'container0', was: 2.00 Gb", e.getMessage());
+ }
+ }
+
/** Dev always uses the zone default flavor */
@Test
public void dev_deployment_flavor() {
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp
index db8d1067980..6f902a30861 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp
@@ -50,4 +50,23 @@ HnswGraph::set_link_array(uint32_t docid, uint32_t level, const LinkArrayRef& ne
links.remove(old_links_ref);
}
+std::vector<uint32_t>
+HnswGraph::level_histogram() const
+{
+ std::vector<uint32_t> result;
+ size_t num_nodes = node_refs.size();
+ for (size_t i = 0; i < num_nodes; ++i) {
+ uint32_t levels = 0;
+ auto node_ref = node_refs[i].load_acquire();
+ if (node_ref.valid()) {
+ levels = nodes.get(node_ref).size();
+ }
+ while (result.size() <= levels) {
+ result.push_back(0);
+ }
+ ++result[levels];
+ }
+ return result;
+}
+
} // namespace
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h
index 64892d06f09..233b9087af7 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h
@@ -69,6 +69,8 @@ struct HnswGraph {
}
size_t size() const { return node_refs.size(); }
+
+ std::vector<uint32_t> level_histogram() const;
};
}
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
index de6daba650c..b08d862ae6d 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
@@ -381,6 +381,20 @@ HnswIndex::get_state(const vespalib::slime::Inserter& inserter) const
{
auto& object = inserter.insertObject();
StateExplorerUtils::memory_usage_to_slime(memory_usage(), object.setObject("memory_usage"));
+ object.setLong("nodes", _graph.size());
+ auto& histogram_array = object.setArray("level_histogram");
+ auto level_histogram = _graph.level_histogram();
+ for (uint32_t hist_val : level_histogram) {
+ histogram_array.addLong(hist_val);
+ }
+ uint32_t reachable = count_reachable_nodes();
+ uint32_t unreachable = _graph.size() - reachable;
+ if (level_histogram.size() > 0) {
+ unreachable -= level_histogram[0];
+ }
+ object.setLong("unreachable_nodes", unreachable);
+ object.setLong("entry_docid", _graph.entry_docid);
+ object.setLong("entry_level", _graph.entry_level);
}
std::unique_ptr<NearestNeighborIndexSaver>
@@ -498,4 +512,31 @@ HnswIndex::check_link_symmetry() const
return all_sym;
}
+uint32_t
+HnswIndex::count_reachable_nodes() const
+{
+ int search_level = get_entry_level();
+ if (search_level < 0) {
+ return 0;
+ }
+ auto visited = _visited_set_pool.get(_graph.size());
+ uint32_t entry_id = get_entry_docid();
+ LinkArray found_links;
+ found_links.push_back(entry_id);
+ visited.mark(entry_id);
+ while (search_level >= 0) {
+ for (uint32_t idx = 0; idx < found_links.size(); ++idx) {
+ uint32_t docid = found_links[idx];
+ auto neighbors = _graph.get_link_array(docid, search_level);
+ for (uint32_t neighbor : neighbors) {
+ if (visited.is_marked(neighbor)) continue;
+ visited.mark(neighbor);
+ found_links.push_back(neighbor);
+ }
+ }
+ --search_level;
+ }
+ return found_links.size();
+}
+
} // namespace
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
index b5d57c2ebfd..95001853710 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
@@ -147,6 +147,7 @@ public:
HnswNode get_node(uint32_t docid) const;
void set_node(uint32_t docid, const HnswNode &node);
bool check_link_symmetry() const;
+ uint32_t count_reachable_nodes() const;
static search::datastore::ArrayStoreConfig make_default_node_store_config();
static search::datastore::ArrayStoreConfig make_default_link_store_config();
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp
index 75c62e5202f..f02ead86a8d 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp
@@ -19,19 +19,19 @@ HnswIndexLoader::load(const fileutil::LoadedBuffer& buf)
size_t num_readable = buf.size(sizeof(uint32_t));
_ptr = static_cast<const uint32_t *>(buf.buffer());
_end = _ptr + num_readable;
- uint32_t entry_docid = nextVal();
- int32_t entry_level = nextVal();
- uint32_t num_nodes = nextVal();
+ uint32_t entry_docid = next_int();
+ int32_t entry_level = next_int();
+ uint32_t num_nodes = next_int();
std::vector<uint32_t> link_array;
for (uint32_t docid = 0; docid < num_nodes; ++docid) {
- uint32_t num_levels = nextVal();
+ uint32_t num_levels = next_int();
if (num_levels > 0) {
_graph.make_node_for_document(docid, num_levels);
for (uint32_t level = 0; level < num_levels; ++level) {
- uint32_t num_links = nextVal();
+ uint32_t num_links = next_int();
link_array.clear();
while (num_links-- > 0) {
- link_array.push_back(nextVal());
+ link_array.push_back(next_int());
}
_graph.set_link_array(docid, level, link_array);
}
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h
index abc68889a1b..174f66b95ec 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h
@@ -23,7 +23,7 @@ private:
const uint32_t *_ptr;
const uint32_t *_end;
bool _failed;
- uint32_t nextVal() {
+ uint32_t next_int() {
if (__builtin_expect((_ptr == _end), false)) {
_failed = true;
return 0;
diff --git a/slobrok/src/tests/configure/configure.cpp b/slobrok/src/tests/configure/configure.cpp
index 579e468db45..aa9826045ef 100644
--- a/slobrok/src/tests/configure/configure.cpp
+++ b/slobrok/src/tests/configure/configure.cpp
@@ -23,9 +23,6 @@ using slobrok::ConfigShim;
using slobrok::SlobrokServer;
using slobrok::ConfiguratorFactory;
-TEST_SETUP(Test);
-
-
std::string
createSpec(int port)
{
@@ -93,10 +90,7 @@ compare(MirrorAPI &api, const char *pattern, SpecList expect)
return false;
}
-int
-Test::Main()
-{
- TEST_INIT("configure_test");
+TEST("configure_test") {
fnet::frt::StandaloneFRT orb1;
fnet::frt::StandaloneFRT orb2;
@@ -214,10 +208,10 @@ Test::Main()
serverOne.stop();
serverTwo.stop();
- TEST_DONE();
-
orb4.supervisor().GetTransport()->ShutDown(true);
orb3.supervisor().GetTransport()->ShutDown(true);
orb2.supervisor().GetTransport()->ShutDown(true);
orb1.supervisor().GetTransport()->ShutDown(true);
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index 6657a9f1600..8444319b395 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -158,6 +158,7 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
storConfig.getConfigId());
DummyStorageLink *storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
+ storage.open();
// Message dequeing does not start before we invoke `open` on the storage
// link chain, so we enqueue messages in randomized priority order before
@@ -168,7 +169,6 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
for (auto pri : pris) {
storage.enqueue(createDummyCommand(pri));
}
- storage.open();
storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
for (size_t i = 0; i < pris.size(); ++i) {
@@ -191,12 +191,12 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) {
storConfig.getConfigId());
DummyStorageLink *storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
+ storage.open();
std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128};
for (auto pri : pris) {
storage.enqueue(createDummyCommand(pri)->makeReply());
}
- storage.open();
storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
// Want FIFO order for replies, not priority-sorted order.
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index 431c90b27f2..a593cc913a8 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -123,9 +123,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
default:
LOG(error, "Link %s trying to send %s down while in state %s",
toString().c_str(), msg->toString().c_str(), stateToString(getState()));
- assert(false);
+ return;
}
- assert(msg.get());
+ assert(msg);
LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str());
if (isBottom()) {
LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str());
@@ -165,9 +165,9 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
default:
LOG(error, "Link %s trying to send %s up while in state %s",
toString().c_str(), msg->toString(true).c_str(), stateToString(getState()));
- assert(false);
+ return;
}
- assert(msg.get());
+ assert(msg);
if (isTop()) {
ostringstream ost;
ost << "Unhandled message at top of chain " << *msg << ".";
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 8f5b22aa7fa..4536ea97855 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -29,9 +29,9 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4
## TTL for rpc target cache
mbus.rpctargetcache.ttl double default = 600
-## Number of threads for mbus threadpool
+## Number of threads for network.
## Any value below 1 will be 1.
-mbus.num_threads int default=4
+mbus.num_threads int default=1
mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY
@@ -42,4 +42,4 @@ mbus.dispatch_on_encode bool default=true
## Enable to use above thread pool for decoding replies
## False will use network(fnet) thread
## Todo: Change default once verified in large scale deployment.
-mbus.dispatch_on_decode bool default=false
+mbus.dispatch_on_decode bool default=true
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index fa2b0cda018..aff2b0f624f 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -14,17 +14,16 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/stringfmt.h>
-
-#include <vespa/log/bufferedlogger.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/documentapi/messagebus/messages/getdocumentreply.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".communication.manager");
using vespalib::make_string;
using document::FixedBucketSpaces;
+using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
namespace storage {
@@ -281,6 +280,17 @@ struct PlaceHolderBucketResolver : public BucketResolver {
}
};
+mbus::RPCNetworkParams::OptimizeFor
+convert(CommunicationManagerConfig::Mbus::OptimizeFor optimizeFor) {
+ switch (optimizeFor) {
+ case CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY:
+ return mbus::RPCNetworkParams::OptimizeFor::LATENCY;
+ case CommunicationManagerConfig::Mbus::OptimizeFor::THROUGHPUT:
+ default:
+ return mbus::RPCNetworkParams::OptimizeFor::THROUGHPUT;
+ }
+}
+
}
CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
@@ -290,7 +300,6 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
_listener(),
_eventQueue(),
_mbus(),
- _count(0),
_configUri(configUri),
_closed(false),
_docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>())
@@ -415,7 +424,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
params.setNumThreads(std::max(1, config->mbus.numThreads));
params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
- params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY);
+ params.setOptimizeFor(convert(config->mbus.optimizeFor));
params.setIdentity(mbus::Identity(_component.getIdentity()));
if (config->mbusport != -1) {
@@ -480,8 +489,8 @@ void
CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg)
{
assert(msg);
- LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- _eventQueue.enqueue(std::move(msg));
+ LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
+ process(msg);
}
bool
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index c08ad214768..8983dbdf057 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -116,7 +116,7 @@ private:
void process(const std::shared_ptr<api::StorageMessage>& msg);
- using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig;
+ using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
void configureMessageBusLimits(const CommunicationManagerConfig& cfg);
@@ -133,7 +133,6 @@ private:
std::unique_ptr<mbus::RPCMessageBus> _mbus;
std::unique_ptr<mbus::DestinationSession> _messageBusSession;
std::unique_ptr<mbus::SourceSession> _sourceSession;
- uint32_t _count;
vespalib::Lock _messageBusSentLock;
std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent;
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
index 32cdbf9af5c..94176bbb658 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -6,18 +6,21 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigWarning;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.joda.time.DateTime;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.*;
@@ -68,16 +71,45 @@ public class VespaDocumentOperation extends EvalFunc<String> {
private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields";
private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields";
private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
+ private static final String REMOVE_TENSOR_FIELDS = "remove-tensor-fields";
+ private static final String UPDATE_TENSOR_FIELDS = "update-tensor-fields";
+ private static final String REMOVE_MAP_FIELDS = "remove-map-fields";
+ private static final String UPDATE_MAP_FIELDS = "update-map-fields";
private static final String EXCLUDE_FIELDS = "exclude-fields";
private static final String TESTSET_CONDITION = "condition";
-
private static final String PARTIAL_UPDATE_ASSIGN = "assign";
+ private static final String PARTIAL_UPDATE_ADD = "add";
+ private static final String PARTIAL_UPDATE_REMOVE = "remove";
+
+ private static Map<String, String> mapPartialOperationMap;
+
+ static {
+ mapPartialOperationMap = new HashMap<>();
+ mapPartialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
+ mapPartialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN);
+ }
+
+ private static Map<String, String> partialOperationMap;
+
+ static {
+ partialOperationMap = new HashMap<>();
+ partialOperationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE);
+ partialOperationMap.put(UPDATE_TENSOR_FIELDS, PARTIAL_UPDATE_ADD);
+ partialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
+ partialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN);
+ }
private final String template;
private final Operation operation;
private final Properties properties;
+ private PigStatusReporter statusReporter;
public VespaDocumentOperation(String... params) {
+ statusReporter = PigStatusReporter.getInstance();
+ if (statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 0);
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 0);
+ }
properties = VespaConfiguration.loadProperties(params);
template = properties.getProperty(PROPERTY_ID_TEMPLATE);
operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
@@ -86,14 +118,20 @@ public class VespaDocumentOperation extends EvalFunc<String> {
@Override
public String exec(Tuple tuple) throws IOException {
if (tuple == null || tuple.size() == 0) {
+ if (statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
+ }
return null;
}
if (template == null || template.length() == 0) {
- warn("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1);
+ if (statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
+ }
+ warnLog("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1);
return null;
}
if (operation == null) {
- warn("No valid operation found. Skipping.", PigWarning.UDF_WARNING_1);
+ warnLog("No valid operation found. Skipping.", PigWarning.UDF_WARNING_2);
return null;
}
@@ -107,25 +145,29 @@ public class VespaDocumentOperation extends EvalFunc<String> {
Schema inputSchema = getInputSchema();
Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
String docId = TupleTools.toString(fields, template);
-
// create json
json = create(operation, docId, fields, properties, inputSchema);
if (json == null || json.length() == 0) {
- warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1);
+ warnLog("No valid document operation could be created.", PigWarning.UDF_WARNING_3);
return null;
}
} catch (Exception e) {
+ if (statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
+ }
StringBuilder sb = new StringBuilder();
sb.append("Caught exception processing input row: \n");
sb.append(tuple.toString());
sb.append("\nException: ");
- sb.append(ExceptionUtils.getStackTrace(e));
- warn(sb.toString(), PigWarning.UDF_WARNING_1);
+ sb.append(getStackTraceAsString(e));
+ warnLog(sb.toString(), PigWarning.UDF_WARNING_4);
return null;
}
-
+ if (statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 1);
+ }
return json;
}
@@ -134,14 +176,14 @@ public class VespaDocumentOperation extends EvalFunc<String> {
* Create a JSON Vespa document operation given the supplied fields,
* operation and document id template.
*
- * @param op Operation (put, remove, update)
- * @param docId Document id
- * @param fields Fields to put in document operation
- * @return A valid JSON Vespa document operation
+ * @param op Operation (put, remove, update)
+ * @param docId Document id
+ * @param fields Fields to put in document operation
+ * @return A valid JSON Vespa document operation
* @throws IOException ...
*/
public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties,
- Schema schema) throws IOException {
+ Schema schema) throws IOException {
if (op == null) {
return null;
}
@@ -178,15 +220,72 @@ public class VespaDocumentOperation extends EvalFunc<String> {
return out.toString();
}
+ private static String getPartialOperation(Map<String, String> operationMap, String name, Properties properties) {
+ // This function checks if the property of the name falls into the map provided
+ // if yes, return the desired operation. if no, return null
+ // for example, input:
+ // operationMap map{"update-map-fields":"assign","remove-map-fields":"remove"}
+ // name date
+ // properties "update-map-fields":"date,month"
+ // output: assign
+ for (String label : operationMap.keySet()) {
+ if (properties.getProperty(label) != null) {
+ String[] p = properties.getProperty(label).split(",");
+ if (Arrays.asList(p).contains(name)) {
+ return operationMap.get(label);
+ }
+ }
+ }
+ return null;
+ }
@SuppressWarnings("unchecked")
private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException {
if (shouldWriteField(name, properties, depth)) {
- g.writeFieldName(name);
- if (shouldWritePartialUpdate(op, depth)) {
- writePartialUpdate(value, type, g, name, properties, schema, op, depth);
+ String operation = getPartialOperation(mapPartialOperationMap, name, properties);
+ // check if the name has the property update-map-fields/remove-map-fields
+ // if yes, we need special treatments here as we need to loop through the tuple
+ // be aware the the operation here is not vespa operation such as "put" and "update"
+ // operation here are the field name we wish use to such as "assign" and "remove"
+ if (operation != null) {
+ writePartialUpdateAndRemoveMap(name, value, g, properties, schema, op, depth, operation);
} else {
- writeValue(value, type, g, name, properties, schema, op, depth);
+ g.writeFieldName(name);
+ if (shouldWritePartialUpdate(op, depth)) {
+ writePartialUpdate(value, type, g, name, properties, schema, op, depth);
+ } else {
+ writeValue(value, type, g, name, properties, schema, op, depth);
+ }
+ }
+
+ }
+ }
+
+ private static void writePartialUpdateAndRemoveMap(String name, Object value, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth, String operation) throws IOException {
+ schema = (schema != null) ? schema.getField(0).schema : null;
+ // extract the key of map and keys in map for writing json when partial updating maps
+ Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
+ // data format { ( key; id, value: (abc,123,(123234,bbaa))) }
+ // the first element of each tuple in the bag will be the map to update
+ // the second element of each tuple in the bag will be the new value of the map
+ DataBag bag = (DataBag) value;
+ for (Tuple element : bag) {
+ if (element.size() != 2) {
+ continue;
+ }
+ String k = (String) element.get(0);
+ Object v = element.get(1);
+ Byte t = DataType.findType(v);
+ if (t == DataType.TUPLE) {
+ g.writeFieldName(name + "{" + k + "}");
+ if (operation.equals(PARTIAL_UPDATE_REMOVE)) {
+ g.writeStartObject();
+ g.writeFieldName(PARTIAL_UPDATE_REMOVE);
+ g.writeNumber(0);
+ g.writeEndObject();
+ } else {
+ writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth);
+ }
}
}
}
@@ -235,14 +334,18 @@ public class VespaDocumentOperation extends EvalFunc<String> {
g.writeStartObject();
Map<Object, Object> map = (Map<Object, Object>) value;
if (shouldCreateTensor(map, name, properties)) {
- writeTensor(map, g);
+ if (isRemoveTensor(name, properties)) {
+ writeRemoveTensor(map, g);
+ } else {
+ writeTensor(map, g);
+ }
} else {
for (Map.Entry<Object, Object> entry : map.entrySet()) {
String k = entry.getKey().toString();
Object v = entry.getValue();
- Byte t = DataType.findType(v);
+ Byte t = DataType.findType(v);
Schema fieldSchema = (schema != null) ? schema.getField(k).schema : null;
- writeField(k, v, t, g, properties, fieldSchema, op, depth+1);
+ writeField(k, v, t, g, properties, fieldSchema, op, depth + 1);
}
}
g.writeEndObject();
@@ -269,7 +372,6 @@ public class VespaDocumentOperation extends EvalFunc<String> {
DataBag bag = (DataBag) value;
// get the schema of the tuple in bag
schema = (schema != null) ? schema.getField(0).schema : null;
-
if (shouldWriteBagAsMap(name, properties)) {
// when treating bag as map, the schema of bag should be {(key, val)....}
// the size of tuple in bag should be 2. 1st one is key. 2nd one is val.
@@ -285,9 +387,9 @@ public class VespaDocumentOperation extends EvalFunc<String> {
Byte t = DataType.findType(v);
if (t == DataType.TUPLE) {
Map<String, Object> fields = TupleTools.tupleMap(valueSchema, (Tuple) v);
- writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth);
+ writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth + 1);
} else {
- writeField(k, v, t, g, properties, valueSchema, op, depth);
+ writeField(k, v, t, g, properties, valueSchema, op, depth + 1);
}
}
g.writeEndObject();
@@ -309,7 +411,15 @@ public class VespaDocumentOperation extends EvalFunc<String> {
private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
g.writeStartObject();
- g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation
+ // here we check if the operation falls into the four partial operations we do on map/tensor structure
+ // if no, we assume it's a update on the whole document and we write assign here
+ // if yes, we write the desired operation here
+ String operation = getPartialOperation(partialOperationMap, name, properties);
+ if (operation != null) {
+ g.writeFieldName(operation);
+ } else {
+ g.writeFieldName(PARTIAL_UPDATE_ASSIGN);
+ }
writeValue(value, type, g, name, properties, schema, op, depth);
g.writeEndObject();
}
@@ -335,21 +445,38 @@ public class VespaDocumentOperation extends EvalFunc<String> {
}
private static boolean shouldWriteTupleAsMap(String name, Properties properties) {
+ // include UPDATE_MAP_FIELDS here because when updating the map
+ // the second element in each tuple should be written as a map
if (properties == null) {
return false;
}
+ String addBagAsMapFields = properties.getProperty(UPDATE_MAP_FIELDS);
String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS);
- if (simpleObjectFields == null) {
+ if (simpleObjectFields == null && addBagAsMapFields == null) {
return false;
}
- if (simpleObjectFields.equals("*")) {
- return true;
+ if (addBagAsMapFields != null) {
+ if (addBagAsMapFields.equals("*")) {
+ return true;
+ }
+ String[] fields = addBagAsMapFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+
}
- String[] fields = simpleObjectFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
+ if (simpleObjectFields != null) {
+ if (simpleObjectFields.equals("*")) {
return true;
}
+ String[] fields = simpleObjectFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
}
return false;
}
@@ -378,11 +505,50 @@ public class VespaDocumentOperation extends EvalFunc<String> {
if (properties == null) {
return false;
}
- String tensorFields = properties.getProperty(CREATE_TENSOR_FIELDS);
- if (tensorFields == null) {
+ String createTensorFields = properties.getProperty(CREATE_TENSOR_FIELDS);
+ String addTensorFields = properties.getProperty(UPDATE_TENSOR_FIELDS);
+ String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS);
+
+ if (createTensorFields == null && addTensorFields == null && removeTensorFields == null) {
return false;
}
- String[] fields = tensorFields.split(",");
+ String[] fields;
+ if (createTensorFields != null) {
+ fields = createTensorFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ }
+ if (addTensorFields != null) {
+ fields = addTensorFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ }
+ if (removeTensorFields != null) {
+ fields = removeTensorFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private static boolean isRemoveTensor(String name, Properties properties) {
+ if (properties == null) {
+ return false;
+ }
+ String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS);
+ if (removeTensorFields == null) {
+ return false;
+ }
+ String[] fields = removeTensorFields.split(",");
for (String field : fields) {
if (field.trim().equalsIgnoreCase(name)) {
return true;
@@ -449,4 +615,49 @@ public class VespaDocumentOperation extends EvalFunc<String> {
g.writeEndArray();
}
+ private static void writeRemoveTensor(Map<Object, Object> map, JsonGenerator g) throws IOException {
+ g.writeFieldName("addresses");
+ g.writeStartArray();
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ String k = entry.getKey().toString();
+ String[] dimensions = k.split(",");
+ for (String dimension : dimensions) {
+ g.writeStartObject();
+ if (dimension == null || dimension.isEmpty()) {
+ continue;
+ }
+ String[] address = dimension.split(":");
+ if (address.length != 2) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ String dim = address[0];
+ String label = address[1];
+ if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ g.writeFieldName(dim.trim());
+ g.writeString(label.trim());
+ g.writeEndObject();
+ // Write address
+ }
+ }
+ g.writeEndArray();
+ }
+
+ // copied from vespajlib for reducing dependency and building with JDK 8
+ private static String getStackTraceAsString(Throwable throwable) {
+ try (StringWriter stringWriter = new StringWriter();
+ PrintWriter printWriter = new PrintWriter(stringWriter, true)) {
+ throwable.printStackTrace(printWriter);
+ return stringWriter.getBuffer().toString();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ // wrapper to emit logs
+ private void warnLog(String msg, PigWarning warning) {
+ warn(msg, warning);
+ System.err.println(msg);
+ }
}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
index 3c6805019b8..67003273cac 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -10,6 +10,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -18,7 +19,6 @@ import static org.junit.Assert.assertNull;
@SuppressWarnings("serial")
public class VespaDocumentOperationTest {
-
@Test
public void requireThatUDFReturnsCorrectJson() throws Exception {
String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>");
@@ -85,6 +85,189 @@ public class VespaDocumentOperationTest {
@Test
+ public void requireThatUDFCorrectlyGeneratesRemoveBagAsMapOperation() throws Exception {
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+
+ Schema innerObjectSchema = new Schema();
+ Tuple innerObjectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple);
+ addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple);
+
+ Schema objectSchema = new Schema();
+ Tuple objectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("key", DataType.CHARARRAY, "234566", objectSchema, objectTuple);
+ addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple);
+
+ Schema bagSchema = new Schema();
+ addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag);
+
+ innerObjectSchema = new Schema();
+ innerObjectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple);
+ addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple);
+
+ objectSchema = new Schema();
+ objectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
+ addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple);
+
+ addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag);
+
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+ addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple);
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-map-fields=bag","operation=update");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ assertEquals("{\"remove\":0}", fields.get("bag{123456}").toString());
+ assertEquals("{\"remove\":0}", fields.get("bag{234566}").toString());
+
+ }
+
+ @Test
+ public void requireThatUDFCorrectlyGeneratesAddBagAsMapOperation() throws Exception {
+
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+
+ Schema innerObjectSchema = new Schema();
+ Tuple innerObjectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple);
+ addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple);
+
+ Schema objectSchema = new Schema();
+ Tuple objectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
+ addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple);
+
+ Schema bagSchema = new Schema();
+ addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag);
+
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+ addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple);
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "update-map-fields=bag","operation=update");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+
+ JsonNode fields = root.get("fields");
+ JsonNode value = fields.get("bag{123456}");
+ JsonNode assign = value.get("assign");
+ assertEquals("2020", assign.get("year").getTextValue());
+ assertEquals(3, assign.get("month").getIntValue());
+ }
+
+ @Test
+ public void requireThatUDFCorrectlyGeneratesAddTensorOperation() throws Exception {
+
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ // Please refer to the tensor format documentation
+
+ Map<String, Double> tensor = new HashMap<String, Double>() {{
+ put("x:label1,y:label2,z:label4", 2.0);
+ put("x:label3", 3.0);
+ }};
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "update-tensor-fields=tensor","operation=update");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode tensorValue = fields.get("tensor");
+ JsonNode add = tensorValue.get("add");
+ JsonNode cells = add.get("cells");
+ Iterator<JsonNode> cellsIterator = cells.getElements();
+
+ JsonNode element = cellsIterator.next();
+ assertEquals("label1", element.get("address").get("x").getTextValue());
+ assertEquals("label2", element.get("address").get("y").getTextValue());
+ assertEquals("label4", element.get("address").get("z").getTextValue());
+ assertEquals("2.0", element.get("value").toString());
+
+ element = cellsIterator.next();
+ assertEquals("label3", element.get("address").get("x").getTextValue());
+ assertEquals("3.0", element.get("value").toString());
+ }
+
+ @Test
+ public void requireThatUDFCorrectlyGeneratesRemoveTensorOperation() throws Exception {
+
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ // Please refer to the tensor format documentation
+
+ Map<String, Double> tensor = new HashMap<String, Double>() {{
+ put("x:label1,y:label2,z:label4", 2.0);
+ put("x:label3", 3.0);
+ }};
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "remove-tensor-fields=tensor","operation=update");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode tensorValue = fields.get("tensor");
+ JsonNode remove = tensorValue.get("remove");
+ JsonNode address = remove.get("addresses");
+
+ Iterator<JsonNode> addressIterator = address.getElements();
+
+ JsonNode element = addressIterator.next();
+ assertEquals("label1", element.get("x").getTextValue());
+
+ element = addressIterator.next();
+ assertEquals("label2", element.get("y").getTextValue());
+
+ element = addressIterator.next();
+ assertEquals("label4", element.get("z").getTextValue());
+
+ element = addressIterator.next();
+ assertEquals("label3", element.get("x").getTextValue());
+ }
+
+ @Test
+ public void requireThatUDFReturnsNullWhenExceptionHappens() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ // broken DELTA format that would throw internally
+ Map<String, Double> tensor = new HashMap<String, Double>() {{
+ put("xlabel1", 2.0); // missing : between 'x' and 'label1'
+ }};
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ assertNull(json);
+ }
+
+ @Test
public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception {
String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>");
ObjectMapper m = new ObjectMapper();
@@ -368,4 +551,10 @@ public class VespaDocumentOperationTest {
schema.add(new Schema.FieldSchema(alias, schemaInField, type));
tuple.append(value);
}
+
+ private void addToBagWithSchema(String alias, byte type, Tuple value, Schema schemaInField, Schema schema,DataBag bag)
+ throws FrontendException {
+ schema.add(new Schema.FieldSchema(alias, schemaInField, type));
+ bag.add(value);
+ }
}
diff --git a/vespalib/src/vespa/vespalib/util/gencnt.cpp b/vespalib/src/vespa/vespalib/util/gencnt.cpp
index 5adbf14a757..ad82cf2e67c 100644
--- a/vespalib/src/vespa/vespalib/util/gencnt.cpp
+++ b/vespalib/src/vespa/vespalib/util/gencnt.cpp
@@ -58,7 +58,7 @@ GenCnt::distance(const GenCnt &other) const
GenCnt &
GenCnt::operator=(const GenCnt &src)
{
- _val = src._val;
+ _val = src.getAsInt();
return *this;
}
diff --git a/vespalib/src/vespa/vespalib/util/gencnt.h b/vespalib/src/vespa/vespalib/util/gencnt.h
index 7bfc5a7e49b..cac868a8adb 100644
--- a/vespalib/src/vespa/vespalib/util/gencnt.h
+++ b/vespalib/src/vespa/vespalib/util/gencnt.h
@@ -2,6 +2,7 @@
#pragma once
#include <cstdint>
+#include <atomic>
namespace vespalib {
@@ -16,7 +17,7 @@ namespace vespalib {
class GenCnt
{
private:
- uint32_t _val;
+ std::atomic<uint32_t> _val;
public:
/**
@@ -31,12 +32,12 @@ public:
**/
GenCnt(uint32_t val) : _val(val) {}
- GenCnt(const GenCnt &rhs) = default;
+ GenCnt(const GenCnt &rhs) : _val(rhs.getAsInt()) {}
/**
* @brief empty destructor
**/
- ~GenCnt() {}
+ ~GenCnt() = default;
/**
* @brief Increase the generation count held by this object
@@ -95,7 +96,7 @@ public:
*
* @return generation counter
**/
- uint32_t getAsInt() const { return _val; }
+ uint32_t getAsInt() const { return _val.load(std::memory_order_relaxed); }
/**
* @brief Set the generation counter from an integer
diff --git a/vespalib/src/vespa/vespalib/util/signalhandler.cpp b/vespalib/src/vespa/vespalib/util/signalhandler.cpp
index 21543ef10d8..c4fb7cfa517 100644
--- a/vespalib/src/vespa/vespalib/util/signalhandler.cpp
+++ b/vespalib/src/vespa/vespalib/util/signalhandler.cpp
@@ -2,6 +2,11 @@
#include "signalhandler.h"
#include <cassert>
+#include <atomic>
+#include <chrono>
+#include <thread>
+
+using namespace std::chrono_literals;
namespace vespalib {
@@ -9,6 +14,9 @@ std::vector<SignalHandler*> SignalHandler::_handlers;
namespace {
+// 31 bit concurrency counter, 1 (lsb) bit indicating shutdown
+std::atomic<int> signal_counter;
+
class Shutdown
{
public:
@@ -19,9 +27,6 @@ public:
}
-// Clear SignalHandler::_handlers in a slightly less unsafe manner.
-Shutdown shutdown;
-
SignalHandler SignalHandler::HUP(SIGHUP);
SignalHandler SignalHandler::INT(SIGINT);
SignalHandler SignalHandler::TERM(SIGTERM);
@@ -36,12 +41,19 @@ SignalHandler SignalHandler::FPE(SIGFPE);
SignalHandler SignalHandler::QUIT(SIGQUIT);
SignalHandler SignalHandler::USR1(SIGUSR1);
+// Clear SignalHandler::_handlers in a slightly less unsafe manner.
+Shutdown shutdown;
+
void
SignalHandler::handleSignal(int signal)
{
- if ((((size_t)signal) < _handlers.size()) && (_handlers[signal] != 0)) {
- _handlers[signal]->gotSignal();
+ static_assert(std::atomic<int>::is_always_lock_free, "signal_counter must be lock free");
+ if ((signal_counter.fetch_add(2) & 1) == 0) {
+ if ((((size_t)signal) < _handlers.size()) && (_handlers[signal] != 0)) {
+ _handlers[signal]->gotSignal();
+ }
}
+ signal_counter.fetch_sub(2);
}
void
@@ -108,12 +120,21 @@ SignalHandler::unhook()
void
SignalHandler::shutdown()
{
+ while ((signal_counter.fetch_or(1) & ~1) != 0) {
+ std::this_thread::sleep_for(10ms);
+ }
for (std::vector<SignalHandler*>::iterator
it = _handlers.begin(), ite = _handlers.end();
it != ite;
++it) {
- if (*it != nullptr)
- (*it)->unhook();
+ if (*it != nullptr) {
+ // Ignore SIGTERM at shutdown in case valgrind is used.
+ if ((*it)->_signal == SIGTERM) {
+ (*it)->ignore();
+ } else {
+ (*it)->unhook();
+ }
+ }
}
std::vector<SignalHandler *>().swap(_handlers);
}
diff --git a/vespalog/abi-spec.json b/vespalog/abi-spec.json
index 09ac3fa75d3..996cc0259a0 100644
--- a/vespalog/abi-spec.json
+++ b/vespalog/abi-spec.json
@@ -186,19 +186,6 @@
],
"fields": []
},
- "com.yahoo.log.MappedLevelControllerRepo": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>(java.nio.MappedByteBuffer, int, int, java.lang.String)",
- "public com.yahoo.log.LevelController getLevelController(java.lang.String)",
- "public void checkBack()"
- ],
- "fields": []
- },
"com.yahoo.log.RejectFilter": {
"superClass": "java.lang.Object",
"interfaces": [],
@@ -300,15 +287,10 @@
"public"
],
"methods": [
- "public void <init>(java.lang.String, java.lang.String, java.lang.String)",
- "public com.yahoo.log.LevelController getLevelControl(java.lang.String)",
"public com.yahoo.log.LevelController getLevelController(java.lang.String)",
"public void close()"
],
- "fields": [
- "public static final int controlFileHeaderLength",
- "public static final int numLevels"
- ]
+ "fields": []
},
"com.yahoo.log.event.Collection": {
"superClass": "com.yahoo.log.event.Event",
diff --git a/vespalog/src/main/java/com/yahoo/log/LevelController.java b/vespalog/src/main/java/com/yahoo/log/LevelController.java
index ccd18f126d6..0efe0d4e7c1 100644
--- a/vespalog/src/main/java/com/yahoo/log/LevelController.java
+++ b/vespalog/src/main/java/com/yahoo/log/LevelController.java
@@ -1,4 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.log;
+
+import java.util.logging.Level;
+
/**
* This is the interface for controlling the log level of a
* component logger. This hides the actual controlling
@@ -7,32 +11,24 @@
* @author arnej27959
*
*/
-
-/**
- * @author arnej27959
- **/
-package com.yahoo.log;
-
-import java.util.logging.Level;
-
public interface LevelController {
/**
* should we actually publish a log message with the given Level now?
- **/
- public boolean shouldLog(Level level);
+ */
+ boolean shouldLog(Level level);
/**
* return a string suitable for printing in a logctl file.
* the string must be be 4 * 8 characters, where each group
* of 4 characters is either " ON" or " OFF".
- **/
- public String getOnOffString();
+ */
+ String getOnOffString();
/**
* check the current state of logging and reflect it into the
* associated Logger instance, if available.
- **/
- public void checkBack();
- public Level getLevelLimit();
+ */
+ void checkBack();
+ Level getLevelLimit();
}
diff --git a/vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java b/vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java
index f02d8793b23..53f4de4f264 100644
--- a/vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java
+++ b/vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java
@@ -13,14 +13,14 @@ import java.util.Map;
* @author Ulf Lilleengen
* @since 5.1
*/
-public class MappedLevelControllerRepo {
+class MappedLevelControllerRepo {
private final Map<String, LevelController> levelControllerMap = new HashMap<>();
private final MappedByteBuffer mapBuf;
private final int controlFileHeaderLength;
private final int numLevels;
private final String logControlFilename;
- public MappedLevelControllerRepo(MappedByteBuffer mapBuf, int controlFileHeaderLength, int numLevels, String logControlFilename) {
+ MappedLevelControllerRepo(MappedByteBuffer mapBuf, int controlFileHeaderLength, int numLevels, String logControlFilename) {
this.mapBuf = mapBuf;
this.controlFileHeaderLength = controlFileHeaderLength;
this.numLevels = numLevels;
@@ -101,12 +101,12 @@ public class MappedLevelControllerRepo {
return MappedLevelController.checkOnOff(mapBuf, levstart);
}
- public LevelController getLevelController(String suffix) {
+ LevelController getLevelController(String suffix) {
return levelControllerMap.get(suffix);
}
- public void checkBack() {
+ void checkBack() {
for (LevelController ctrl : levelControllerMap.values()) {
ctrl.checkBack();
}
diff --git a/vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java b/vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java
index 85d92075827..86eba1c019e 100644
--- a/vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java
+++ b/vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java
@@ -30,12 +30,12 @@ public class VespaLevelControllerRepo implements LevelControllerRepo {
/**
* length of fixed header content of a control file, constant:
**/
- public static final int controlFileHeaderLength;
+ static final int controlFileHeaderLength;
/**
* number of distinctly controlled levels (in logctl files),
* must be compatible with C++ Vespa logging
**/
- public static final int numLevels = 8;
+ static final int numLevels = 8;
static {
controlFileHeaderLength = CFHEADER.length()
@@ -50,7 +50,7 @@ public class VespaLevelControllerRepo implements LevelControllerRepo {
**/
private LevelController defaultLevelCtrl;
- public VespaLevelControllerRepo(String logCtlFn, String logLevel, String applicationPrefix) {
+ VespaLevelControllerRepo(String logCtlFn, String logLevel, String applicationPrefix) {
this.logControlFilename = logCtlFn;
this.appPrefix = applicationPrefix;
defaultLevelCtrl = new DefaultLevelController(logLevel);
@@ -116,17 +116,17 @@ public class VespaLevelControllerRepo implements LevelControllerRepo {
ctlFile.writeBytes(appPrefix);
}
ctlFile.writeBytes("\n");
- for (int i = appLen; i < maxPrefix; i++) {
+ for (int i = appLen; i < maxPrefix + 2; i++) {
byte space = ' ';
ctlFile.write(space);
}
ctlFile.writeBytes("\n");
ctlFile.setLength(ctlFile.getFilePointer());
- if (ctlFile.getFilePointer() != controlFileHeaderLength) {
+ if (ctlFile.getFilePointer() != (controlFileHeaderLength + 2)) {
System.err.println("internal error, bad header length: "
+ ctlFile.getFilePointer()
+ " (should have been: "
- + controlFileHeaderLength
+ + (controlFileHeaderLength + 2)
+ ")");
}
}
@@ -142,7 +142,7 @@ public class VespaLevelControllerRepo implements LevelControllerRepo {
levelControllerRepo = new MappedLevelControllerRepo(mapBuf, controlFileHeaderLength, numLevels, logControlFilename);
}
- public LevelController getLevelControl(String suffix) {
+ private LevelController getLevelControl(String suffix) {
LevelController ctrl = null;
if (levelControllerRepo != null) {
if (suffix == null || suffix.equals("default")) {
diff --git a/vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java b/vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java
index 331780f226b..32b1003c20c 100644
--- a/vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java
+++ b/vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java
@@ -44,7 +44,7 @@ class VespaLogHandler extends StreamHandler {
/**
* Publish a log record into the Vespa log target.
*/
- public synchronized void publish (LogRecord record) {
+ public synchronized void publish(LogRecord record) {
Level level = record.getLevel();
String component = record.getLoggerName();
diff --git a/vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java b/vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java
index 3bd4de03f4e..9d04e079f55 100644
--- a/vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java
+++ b/vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java
@@ -62,12 +62,14 @@ public class VespaLevelControllerRepoTest {
RandomAccessFile lcfile = new RandomAccessFile(lcf, "rw");
- lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength);
+ lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength+1);
+ assertEquals(lcfile.readByte(), '\n');
+ lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength+2);
assertEquals(lcfile.readByte(), 'd');
- lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength + 7);
+ lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength+2 + 7);
assertEquals(lcfile.readByte(), ':');
- assertEquals(0, (VespaLevelControllerRepo.controlFileHeaderLength+9) % 4);
- lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength + 9);
+ assertEquals(0, (VespaLevelControllerRepo.controlFileHeaderLength+13) % 4);
+ lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength + 13);
assertEquals(0x20204f4e, lcfile.readInt());
int off = findControlString(lcfile, "com.yahoo.log.test");
diff --git a/vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java b/vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java
index c0dd856b634..220e5e9271e 100644
--- a/vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java
+++ b/vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java
@@ -13,6 +13,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
@@ -32,20 +33,20 @@ import static org.junit.Assert.fail;
* @author Bjorn Borud
*/
public class VespaLogHandlerTestCase {
- protected static String hostname;
- protected static String pid;
+ private static String hostname;
+ private static String pid;
- protected static LogRecord record1;
- protected static String record1String;
+ static LogRecord record1;
+ static String record1String;
- protected static LogRecord record2;
- protected static String record2String;
+ static LogRecord record2;
+ private static String record2String;
- protected static LogRecord record3;
- protected static String record3String;
+ private static LogRecord record3;
+ private static String record3String;
- protected static LogRecord record4;
- protected static String record4String;
+ private static LogRecord record4;
+ private static String record4String;
static {
hostname = Util.getHostName();
@@ -139,7 +140,7 @@ public class VespaLogHandlerTestCase {
}
@Test
- public void testFallback() throws FileNotFoundException {
+ public void testFallback() {
File file = new File("mydir2");
file.delete();
assertTrue(file.mkdir());
@@ -157,7 +158,7 @@ public class VespaLogHandlerTestCase {
* Perform simple test
*/
@Test
- public void testLogCtl () throws InterruptedException, FileNotFoundException {
+ public void testLogCtl () {
MockLevelController ctl = new MockLevelController();
MockLevelControllerRepo ctlRepo = new MockLevelControllerRepo(ctl);
MockLogTarget target = new MockLogTarget();
@@ -203,7 +204,7 @@ public class VespaLogHandlerTestCase {
@Test
public void testRotate () throws IOException {
// Doesn't work in Windows. TODO: Fix the logging stuff
- if (System.getProperty("os.name").toLowerCase().indexOf("win")>=0)
+ if (System.getProperty("os.name").toLowerCase().contains("win"))
return;
try {
VespaLogHandler h
@@ -269,10 +270,8 @@ public class VespaLogHandlerTestCase {
);
class LogRacer implements Runnable {
- private int n;
- public LogRacer (int n) {
- this.n = n;
+ private LogRacer() {
}
public void run () {
@@ -285,7 +284,7 @@ public class VespaLogHandlerTestCase {
}
}
- public void logLikeCrazy () {
+ void logLikeCrazy() {
for (int j = 0; j < numLogEntries; j++) {
try {
h.publish(record1);
@@ -299,7 +298,7 @@ public class VespaLogHandlerTestCase {
}
for (int i = 0; i < numThreads; i++) {
- t[i] = new Thread(new LogRacer(i));
+ t[i] = new Thread(new LogRacer());
t[i].start();
}
@@ -361,35 +360,23 @@ public class VespaLogHandlerTestCase {
*
*/
protected static String[] readFile (String fileName) {
- BufferedReader br = null;
- List<String> lines = new LinkedList<String>();
- try {
- br = new BufferedReader(
- new InputStreamReader(new FileInputStream(new File(fileName)), "UTF-8"));
+ List<String> lines = new LinkedList<>();
+ try (BufferedReader br = new BufferedReader(
+ new InputStreamReader(new FileInputStream(new File(fileName)), StandardCharsets.UTF_8))) {
for (String line = br.readLine();
line != null;
- line = br.readLine())
- {
+ line = br.readLine()) {
lines.add(line);
}
- return lines.toArray(new String[lines.size()]);
- }
- catch (Throwable e) {
+ return lines.toArray(new String[0]);
+ } catch (Throwable e) {
return new String[0];
}
- finally {
- if (br != null) {
- try {
- br.close();
- }
- catch (IOException e) {}
- }
- }
}
private static class MockLevelControllerRepo implements LevelControllerRepo {
private LevelController levelController;
- public MockLevelControllerRepo(LevelController controller) {
+ MockLevelControllerRepo(LevelController controller) {
this.levelController = controller;
}
@@ -411,7 +398,7 @@ public class VespaLogHandlerTestCase {
return (level.equals(logLevel));
}
- public void setShouldLog(Level level) {
+ void setShouldLog(Level level) {
this.logLevel = level;
}
@@ -431,7 +418,7 @@ public class VespaLogHandlerTestCase {
private static class MockLogTarget implements LogTarget {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- public String[] getLines() {
+ String[] getLines() {
return baos.toString().split("\n");
}
@Override
diff --git a/vespalog/src/vespa/log/control-file.cpp b/vespalog/src/vespa/log/control-file.cpp
index 4a4fd36e0ac..a4fe9e98b8d 100644
--- a/vespalog/src/vespa/log/control-file.cpp
+++ b/vespalog/src/vespa/log/control-file.cpp
@@ -69,7 +69,7 @@ ControlFile::ensureHeader()
perror("log::ControlFile write(A) failed");
}
- char spaces[_maxPrefix + 1];
+ char spaces[_maxPrefix + 3];
memset(spaces, ' ', sizeof spaces);
spaces[sizeof(spaces) - 1] = '\0';