diff options
102 files changed, 1539 insertions, 1073 deletions
diff --git a/config-lib/src/main/java/com/yahoo/config/FileReference.java b/config-lib/src/main/java/com/yahoo/config/FileReference.java index 3b95c2fbd4c..ee99ebfa2b7 100755 --- a/config-lib/src/main/java/com/yahoo/config/FileReference.java +++ b/config-lib/src/main/java/com/yahoo/config/FileReference.java @@ -28,14 +28,16 @@ public final class FileReference { } @Override - public int hashCode() { - return value.hashCode(); + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FileReference that = (FileReference) o; + return value.equals(that.value); } @Override - public boolean equals(Object other) { - return other instanceof FileReference && - value.equals(((FileReference)other).value); + public int hashCode() { + return Objects.hash(value); } @Override diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java index e37b0b07746..896c6ea9a7f 100644 --- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java +++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java @@ -43,7 +43,6 @@ public class TestProperties implements ModelContext.Properties { private double defaultTermwiseLimit = 1.0; private Optional<EndpointCertificateSecrets> endpointCertificateSecrets = Optional.empty(); private boolean useNewAthenzFilter = false; - private boolean useDedicatedNodesWhenUnspecified = false; private AthenzDomain athenzDomain; @Override public boolean multitenant() { return multitenant; } @@ -65,7 +64,7 @@ public class TestProperties implements ModelContext.Properties { @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @Override public boolean useBucketSpaceMetric() { return true; } @Override public boolean useNewAthenzFilter() { return useNewAthenzFilter; } - @Override public boolean useDedicatedNodesWhenUnspecified() { return useDedicatedNodesWhenUnspecified; } + @Override public boolean useDedicatedNodesWhenUnspecified() { return true; } @Override public Optional<AthenzDomain> athenzDomain() { return Optional.ofNullable(athenzDomain); } public TestProperties setDefaultTermwiseLimit(double limit) { @@ -118,11 +117,6 @@ public class TestProperties implements ModelContext.Properties { return this; } - public TestProperties setUseDedicatedNodesWhenUnspecified(boolean useDedicatedNodesWhenUnspecified) { - this.useDedicatedNodesWhenUnspecified = useDedicatedNodesWhenUnspecified; - return this; - } - public TestProperties setAthenzDomain(AthenzDomain domain) { this.athenzDomain = domain; return this; diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java b/config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java index f61618c789b..fb8e9dffbbb 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/http/ssl/HostedSslConnectorFactory.java @@ -7,6 +7,7 @@ import com.yahoo.jdisc.http.ConnectorConfig.Ssl.ClientAuth; import com.yahoo.vespa.model.container.component.SimpleComponent; import com.yahoo.vespa.model.container.http.ConnectorFactory; +import java.time.Duration; import java.util.List; /** @@ -67,10 +68,13 @@ public class HostedSslConnectorFactory extends ConnectorFactory { @Override public void getConfig(ConnectorConfig.Builder connectorBuilder) { super.getConfig(connectorBuilder); - connectorBuilder.tlsClientAuthEnforcer(new ConnectorConfig.TlsClientAuthEnforcer.Builder() - .pathWhitelist(INSECURE_WHITELISTED_PATHS) - .enable(enforceClientAuth)); - connectorBuilder.proxyProtocol(configureProxyProtocol()); + connectorBuilder + .tlsClientAuthEnforcer(new ConnectorConfig.TlsClientAuthEnforcer.Builder() + .pathWhitelist(INSECURE_WHITELISTED_PATHS) + .enable(enforceClientAuth)) + .proxyProtocol(configureProxyProtocol()) + .idleTimeout(Duration.ofMinutes(3).toSeconds()) + .maxConnectionLife(Duration.ofMinutes(10).toSeconds()); } private ConnectorConfig.ProxyProtocol.Builder configureProxyProtocol() { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java b/config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java index f5a91297e9e..f93bf6fc872 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/search/Tuning.java @@ -241,7 +241,12 @@ public class Tuning extends AbstractConfigProducer implements ProtonConfig.Produ public Integer level = null; public void getConfig(ProtonConfig.Summary.Cache.Compression.Builder compression) { - if (type != null) compression.type(ProtonConfig.Summary.Cache.Compression.Type.Enum.valueOf(type.name)); + if (type != null) compression.type(ProtonConfig.Summary.Cache.Compression.Type.Enum.valueOf(type.name)); + if (level != null) compression.level(level); + } + + public void getConfig(ProtonConfig.Summary.Log.Compact.Compression.Builder compression) { + if (type != null) compression.type(ProtonConfig.Summary.Log.Compact.Compression.Type.Enum.valueOf(type.name)); if (level != null) compression.level(level); } @@ -281,6 +286,12 @@ public class Tuning extends AbstractConfigProducer implements ProtonConfig.Produ } } + public void getConfig(ProtonConfig.Summary.Log.Compact.Builder compact) { + if (compression != null) { + compression.getConfig(compact.compression); + } + } + public void getConfig(ProtonConfig.Summary.Log.Chunk.Builder chunk) { if (outputInt) { if (maxSize!=null) chunk.maxbytes(maxSize.intValue()); @@ -288,7 +299,7 @@ public class Tuning extends AbstractConfigProducer implements ProtonConfig.Produ throw new IllegalStateException("Fix this, chunk does not have long types"); } if (compression != null) { - compression.getConfig(chunk.compression); + compression.getConfig(chunk.compression); } } } @@ -303,6 +314,7 @@ public class Tuning extends AbstractConfigProducer implements ProtonConfig.Produ if (minFileSizeFactor!=null) log.minfilesizefactor(minFileSizeFactor); if (chunk != null) { chunk.getConfig(log.chunk); + chunk.getConfig(log.compact); } } } diff --git a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java index ebafe9dd45b..7208d8c5fc1 100644 --- a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java +++ b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java @@ -1364,10 +1364,11 @@ public class ModelProvisioningTest { " </http>" + "</container>"; VespaModelTester tester = new VespaModelTester(); - tester.addHosts(1); + tester.addHosts(2); VespaModel model = tester.createModel(services, true); - assertEquals(1, model.getHosts().size()); + assertEquals(2, model.getHosts().size()); assertEquals(1, model.getContainerClusters().size()); + assertEquals(2, model.getContainerClusters().get("foo").getContainers().size()); } @Test @@ -1430,7 +1431,7 @@ public class ModelProvisioningTest { } @Test - public void testNoNodeTagMeans1Node() { + public void testNoNodeTagMeansTwoNodes() { String services = "<?xml version='1.0' encoding='utf-8' ?>\n" + "<services>" + @@ -1445,31 +1446,6 @@ public class ModelProvisioningTest { " </content>" + "</services>"; VespaModelTester tester = new VespaModelTester(); - tester.addHosts(1); - VespaModel model = tester.createModel(services, true); - assertEquals(1, model.getRoot().hostSystem().getHosts().size()); - assertEquals(1, model.getAdmin().getSlobroks().size()); - assertEquals(1, model.getContainerClusters().get("foo").getContainers().size()); - assertEquals(1, model.getContentClusters().get("bar").getRootGroup().countNodes()); - } - - @Test - public void testNoNodeTagMeansTwoNodesInContainerClusterWithFeatureFlag() { - String services = - "<?xml version='1.0' encoding='utf-8' ?>\n" + - "<services>" + - " <container id='foo' version='1.0'>" + - " <search/>" + - " <document-api/>" + - " </container>" + - " <content version='1.0' id='bar'>" + - " <documents>" + - " <document type='type1' mode='index'/>" + - " </documents>" + - " </content>" + - "</services>"; - VespaModelTester tester = new VespaModelTester(); - tester.setUseDedicatedNodesWhenUnspecified(true); tester.addHosts(3); VespaModel model = tester.createModel(services, true); assertEquals(3, model.getRoot().hostSystem().getHosts().size()); @@ -1479,7 +1455,7 @@ public class ModelProvisioningTest { } @Test - public void testNoNodeTagMeans1NodeNoContent() { + public void testNoNodeTagMeansTwoNodesNoContent() { String services = "<?xml version='1.0' encoding='utf-8' ?>\n" + "<services>" + @@ -1489,11 +1465,11 @@ public class ModelProvisioningTest { " </container>" + "</services>"; VespaModelTester tester = new VespaModelTester(); - tester.addHosts(1); + tester.addHosts(2); VespaModel model = tester.createModel(services, true); - assertEquals(1, model.getRoot().hostSystem().getHosts().size()); - assertEquals(1, model.getAdmin().getSlobroks().size()); - assertEquals(1, model.getContainerClusters().get("foo").getContainers().size()); + assertEquals(2, model.getRoot().hostSystem().getHosts().size()); + assertEquals(2, model.getAdmin().getSlobroks().size()); + assertEquals(2, model.getContainerClusters().get("foo").getContainers().size()); } @Test diff --git a/config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java index 060fff5bf66..73dd1ca3f3b 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/builder/xml/dom/DomSearchTuningBuilderTest.java @@ -4,7 +4,6 @@ package com.yahoo.vespa.model.builder.xml.dom; import com.yahoo.collections.CollectionUtil; import com.yahoo.vespa.config.search.core.ProtonConfig; import com.yahoo.config.model.builder.xml.test.DomBuilderTest; -import com.yahoo.vespa.model.content.DispatchTuning; import com.yahoo.vespa.model.search.Tuning; import org.junit.Test; import org.w3c.dom.Element; @@ -228,6 +227,8 @@ public class DomSearchTuningBuilderTest extends DomBuilderTest { assertEquals(cfg.summary().log().chunk().maxbytes(), 256); assertEquals(cfg.summary().log().chunk().compression().type(), ProtonConfig.Summary.Log.Chunk.Compression.Type.LZ4); assertEquals(cfg.summary().log().chunk().compression().level(), 5); + assertEquals(cfg.summary().log().compact().compression().type(), ProtonConfig.Summary.Log.Compact.Compression.Type.LZ4); + assertEquals(cfg.summary().log().compact().compression().level(), 5); } @Test diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java index 47d85465f3f..dcd1c46e52f 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java @@ -624,7 +624,7 @@ public class ContainerModelBuilderTest extends ContainerModelBuilderTestBase { .setMultitenant(true) .setHostedVespa(true)) .build()); - assertEquals(1, model.hostSystem().getHosts().size()); + assertEquals(2, model.hostSystem().getHosts().size()); } @Test(expected = IllegalArgumentException.class) diff --git a/config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java b/config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java index 17c7b3a308d..cdfd9fab194 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/test/VespaModelTester.java @@ -15,7 +15,6 @@ import com.yahoo.config.model.provision.SingleNodeProvisioner; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.Flavor; import com.yahoo.config.provision.NodeResources; -import com.yahoo.config.provision.Provisioner; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.model.VespaModel; import com.yahoo.vespa.model.test.utils.ApplicationPackageUtils; @@ -49,7 +48,6 @@ public class VespaModelTester { private Map<NodeResources, Collection<Host>> hostsByResources = new HashMap<>(); private ApplicationId applicationId = ApplicationId.defaultId(); private boolean useDedicatedNodeForLogserver = false; - private boolean useDedicatedNodesWhenUnspecified = false; public VespaModelTester() { this(new NullConfigModelRegistry()); @@ -99,10 +97,6 @@ public class VespaModelTester { this.useDedicatedNodeForLogserver = useDedicatedNodeForLogserver; } - public void setUseDedicatedNodesWhenUnspecified(boolean useDedicatedNodesWhenUnspecified) { - this.useDedicatedNodesWhenUnspecified = useDedicatedNodesWhenUnspecified; - } - /** Creates a model which uses 0 as start index and fails on out of capacity */ public VespaModel createModel(String services, String ... retiredHostNames) { return createModel(Zone.defaultZone(), services, true, retiredHostNames); @@ -151,8 +145,7 @@ public class VespaModelTester { .setMultitenant(true) .setHostedVespa(hosted) .setApplicationId(applicationId) - .setUseDedicatedNodeForLogserver(useDedicatedNodeForLogserver) - .setUseDedicatedNodesWhenUnspecified(useDedicatedNodesWhenUnspecified); + .setUseDedicatedNodeForLogserver(useDedicatedNodeForLogserver); DeployState deployState = new DeployState.Builder() .applicationPackage(appPkg) diff --git a/container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java b/container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java index 4c3d76436dd..ca11ad387ee 100644 --- a/container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java +++ b/container-core/src/main/java/com/yahoo/container/core/config/BundleLoader.java @@ -148,10 +148,9 @@ public class BundleLoader { /** * Returns the bundles that are not assumed to be retained by the new application generation. - * and cleans up the map of active file references. Note that at this point we don't yet know - * the full set of new bundles, because of the potential pre-install directives in the new bundles. - * However, only "disk bundles" (file:) can be listed in the pre-install directive, so we know - * about all the obsolete application bundles. + * Note that at this point we don't yet know the full set of new bundles, because of the potential + * pre-install directives in the new bundles. However, only "disk bundles" (file:) can be listed + * in the pre-install directive, so we know about all the obsolete application bundles. */ private Set<Bundle> getObsoleteBundles(List<FileReference> newReferences) { Set<Bundle> bundlesToRemove = new HashSet<>(osgi.getCurrentBundles()); @@ -165,6 +164,9 @@ public class BundleLoader { return bundlesToRemove; } + /** + * Cleans up the map of active file references + */ private void removeInactiveFileReferences(List<FileReference> newReferences) { // Clean up the map of active bundles Set<FileReference> fileReferencesToRemove = getObsoleteFileReferences(newReferences); @@ -184,6 +186,7 @@ public class BundleLoader { // The bundle at index 0 for each file reference always corresponds to the bundle at the file reference location Set<Bundle> allowedDuplicates = obsoleteReferences.stream() + .filter(reference -> ! isDiskBundle(reference)) .map(reference -> reference2Bundles.get(reference).get(0)) .collect(Collectors.toSet()); diff --git a/container-search/src/main/java/com/yahoo/search/query/Select.java b/container-search/src/main/java/com/yahoo/search/query/Select.java index cb662dcd671..65ffd29efe0 100644 --- a/container-search/src/main/java/com/yahoo/search/query/Select.java +++ b/container-search/src/main/java/com/yahoo/search/query/Select.java @@ -57,12 +57,13 @@ public class Select implements Cloneable { } public Select(String where, String grouping, Query query) { - this(where, grouping, query, Collections.emptyList()); + this(where, grouping, null, query, Collections.emptyList()); } - private Select(String where, String grouping, Query query, List<GroupingRequest> groupingRequests) { + private Select(String where, String grouping, String groupingExpressionString, Query query, List<GroupingRequest> groupingRequests) { this.where = Objects.requireNonNull(where, "A Select must have a where string (possibly the empty string)"); this.grouping = Objects.requireNonNull(grouping, "A Select must have a select string (possibly the empty string)"); + this.groupingExpressionString = groupingExpressionString; this.parent = Objects.requireNonNull(query, "A Select must have a parent query"); this.groupingRequests = deepCopy(groupingRequests, this); } @@ -136,11 +137,11 @@ public class Select implements Cloneable { @Override public Object clone() { - return new Select(where, grouping, parent, groupingRequests); + return new Select(where, grouping, groupingExpressionString, parent, groupingRequests); } public Select cloneFor(Query parent) { - return new Select(where, grouping, parent, groupingRequests); + return new Select(where, grouping, groupingExpressionString, parent, groupingRequests); } } diff --git a/container-search/src/test/java/com/yahoo/select/SelectTestCase.java b/container-search/src/test/java/com/yahoo/select/SelectTestCase.java index 7b1b4fe6362..1715ed38964 100644 --- a/container-search/src/test/java/com/yahoo/select/SelectTestCase.java +++ b/container-search/src/test/java/com/yahoo/select/SelectTestCase.java @@ -724,6 +724,7 @@ public class SelectTestCase { assertEquals("all(group(time.dayofmonth(a)) each(output(count())))", query.getSelect().getGrouping().get(0).toString()); Query clone = query.clone(); + assertEquals(clone.getSelect().getGroupingExpressionString(), query.getSelect().getGroupingExpressionString()); assertNotSame(query.getSelect(), clone.getSelect()); assertNotSame(query.getSelect().getGrouping(), clone.getSelect().getGrouping()); assertNotSame(query.getSelect().getGrouping().get(0), clone.getSelect().getGrouping().get(0)); @@ -732,8 +733,15 @@ public class SelectTestCase { assertEquals(query.getSelect().getGroupingString(), clone.getSelect().getGroupingString()); assertEquals(query.getSelect().getGrouping().get(0).toString(), clone.getSelect().getGrouping().get(0).toString()); assertEquals(query.getSelect().getGrouping().get(1).toString(), clone.getSelect().getGrouping().get(1).toString()); + } + @Test + public void testCloneWithGroupingExpressionString() { + Query query = new Query(); + query.getSelect().setGroupingExpressionString("all(group(foo) each(output(count())))"); + Query clone = query.clone(); + assertEquals(clone.getSelect().getGroupingExpressionString(), query.getSelect().getGroupingExpressionString()); } //------------------------------------------------------------------- Assert methods diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java index 08f22ac778e..d7ad96ec5e7 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java @@ -29,7 +29,6 @@ import com.yahoo.slime.Cursor; import com.yahoo.slime.Inspector; import com.yahoo.slime.Slime; import com.yahoo.slime.SlimeUtils; -import com.yahoo.vespa.athenz.api.AthenzPrincipal; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.Controller; import com.yahoo.vespa.hosted.controller.Instance; @@ -106,7 +105,6 @@ import java.time.Duration; import java.time.Instant; import java.time.YearMonth; import java.time.format.DateTimeParseException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Comparator; @@ -1043,12 +1041,8 @@ public class ApplicationApiHandler extends LoggingRequestHandler { // Add zone endpoints var endpointArray = response.setArray("endpoints"); - var serviceUrls = new ArrayList<URI>(); for (var endpoint : controller.routing().endpointsOf(deploymentId)) { toSlime(endpoint, endpoint.name(), endpointArray.addObject()); - if (endpoint.routingMethod() == RoutingMethod.shared) { - serviceUrls.add(endpoint.url()); - } } // Add global endpoints var globalEndpoints = controller.routing().endpointsOf(application, deploymentId.applicationId().instance()) @@ -1058,9 +1052,6 @@ public class ApplicationApiHandler extends LoggingRequestHandler { // TODO(mpolden): Pass cluster name. Cluster that a global endpoint points to is not available at this level. toSlime(endpoint, "", endpointArray.addObject()); } - // TODO(mpolden): Remove this once all clients stop reading it - Cursor serviceUrlArray = response.setArray("serviceUrls"); - serviceUrls.forEach(url -> serviceUrlArray.addString(url.toString())); response.setString("nodes", withPath("/zone/v2/" + deploymentId.zoneId().environment() + "/" + deploymentId.zoneId().region() + "/nodes/v2/node/?&recursive=true&application=" + deploymentId.applicationId().tenant() + "." + deploymentId.applicationId().application() + "." + deploymentId.applicationId().instance(), request.getUri()).toString()); response.setString("yamasUrl", monitoringSystemUri(deploymentId).toString()); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java index 497a0ddf5a0..351f530f623 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java @@ -788,6 +788,7 @@ public class ControllerTest { @Test public void testDeployWithRoutingGeneratorEndpoints() { + ((InMemoryFlagSource) tester.controller().flagSource()).withBooleanFlag(Flags.DISABLE_ROUTING_GENERATOR.id(), false); var context = tester.newDeploymentContext(); var applicationPackage = new ApplicationPackageBuilder() .upgradePolicy("default") @@ -804,6 +805,7 @@ public class ControllerTest { List.of(new RoutingEndpoint("http://legacy-endpoint", "hostname", false, "upstreamName"))); } + // Defer load balancer provisioning in all environments so that routing controller uses routing generator context.deferLoadBalancerProvisioningIn(zones.stream().map(ZoneId::environment).collect(Collectors.toSet())) .submit(applicationPackage) diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json index cd47859c7cc..63e6e4b3937 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment-with-routing-policy.json @@ -20,9 +20,6 @@ "routingMethod": "shared" } ], - "serviceUrls": [ - "https://instance1--application1--tenant1.us-west-1.vespa.oath.cloud:4443/" - ], "nodes": "http://localhost:8080/zone/v2/prod/us-west-1/nodes/v2/node/%3F&recursive=true&application=tenant1.application1.instance1", "yamasUrl": "http://monitoring-system.test/?environment=prod®ion=us-west-1&application=tenant1.application1.instance1", "version": "(ignore)", diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json index 726df575028..928525a20d1 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json @@ -20,9 +20,6 @@ "routingMethod": "shared" } ], - "serviceUrls": [ - "https://instance1--application1--tenant1.us-central-1.vespa.oath.cloud:4443/" - ], "nodes": "http://localhost:8080/zone/v2/prod/us-central-1/nodes/v2/node/%3F&recursive=true&application=tenant1.application1.instance1", "yamasUrl": "http://monitoring-system.test/?environment=prod®ion=us-central-1&application=tenant1.application1.instance1", "version": "(ignore)", diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json index 7c231beb5ed..83fa1983957 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/dev-us-east-1.json @@ -13,9 +13,6 @@ "routingMethod": "shared" } ], - "serviceUrls": [ - "https://instance1--application1--tenant1.us-east-1.dev.vespa.oath.cloud:4443/" - ], "nodes": "http://localhost:8080/zone/v2/dev/us-east-1/nodes/v2/node/%3F&recursive=true&application=tenant1.application1.instance1", "yamasUrl": "http://monitoring-system.test/?environment=dev®ion=us-east-1&application=tenant1.application1.instance1", "version": "(ignore)", diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json index 41f3908f12f..4ffe809297d 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/prod-us-central-1.json @@ -23,9 +23,6 @@ "routingMethod": "shared" } ], - "serviceUrls": [ - "https://instance1--application1--tenant1.us-central-1.vespa.oath.cloud:4443/" - ], "nodes": "http://localhost:8080/zone/v2/prod/us-central-1/nodes/v2/node/%3F&recursive=true&application=tenant1.application1.instance1", "yamasUrl": "http://monitoring-system.test/?environment=prod®ion=us-central-1&application=tenant1.application1.instance1", "version": "(ignore)", diff --git a/document/src/tests/documentselectparsertest.cpp b/document/src/tests/documentselectparsertest.cpp index 110153954af..9ac402f56ef 100644 --- a/document/src/tests/documentselectparsertest.cpp +++ b/document/src/tests/documentselectparsertest.cpp @@ -18,6 +18,7 @@ #include <vespa/document/select/compare.h> #include <vespa/document/select/operator.h> #include <vespa/document/select/parse_utils.h> +#include <vespa/document/select/parser_limits.h> #include <vespa/vespalib/util/exceptions.h> #include <limits> #include <gtest/gtest.h> @@ -33,6 +34,8 @@ protected: std::vector<Document::SP > _doc; std::vector<DocumentUpdate::SP > _update; + ~DocumentSelectParserTest(); + Document::SP createDoc( const std::string& doctype, const std::string& id, uint32_t hint, double hfloat, const std::string& hstr, const std::string& cstr, @@ -64,6 +67,7 @@ protected: void testDocumentUpdates4(); }; +DocumentSelectParserTest::~DocumentSelectParserTest() = default; namespace { std::shared_ptr<const DocumentTypeRepo> _repo; @@ -1247,17 +1251,17 @@ TEST_F(DocumentSelectParserTest, testThatSimpleFieldValuesHaveCorrectFieldName) TEST_F(DocumentSelectParserTest, testThatComplexFieldValuesHaveCorrectFieldNames) { - EXPECT_EQ( - vespalib::string("headerval"), - parseFieldValue("testdoctype1.headerval{test}")->getRealFieldName()); + EXPECT_EQ(vespalib::string("headerval"), + parseFieldValue("testdoctype1.headerval{test}")->getRealFieldName()); - EXPECT_EQ( - vespalib::string("headerval"), - parseFieldValue("testdoctype1.headerval[42]")->getRealFieldName()); + EXPECT_EQ(vespalib::string("headerval"), + parseFieldValue("testdoctype1.headerval[42]")->getRealFieldName()); - EXPECT_EQ( - vespalib::string("headerval"), - parseFieldValue("testdoctype1.headerval.meow.meow{test}")->getRealFieldName()); + EXPECT_EQ(vespalib::string("headerval"), + parseFieldValue("testdoctype1.headerval.meow.meow{test}")->getRealFieldName()); + + EXPECT_EQ(vespalib::string("headerval"), + parseFieldValue("testdoctype1.headerval .meow.meow{test}")->getRealFieldName()); } namespace { @@ -1603,4 +1607,64 @@ TEST_F(DocumentSelectParserTest, redundant_glob_wildcards_are_collapsed_into_min EXPECT_EQ(GlobOperator::convertToRegex("*?*?*?*"), "..*..*."); // Don't try this at home, kids! } +TEST_F(DocumentSelectParserTest, recursion_depth_is_bounded_for_field_exprs) { + createDocs(); + std::string expr = "testdoctype1"; + for (size_t i = 0; i < 50000; ++i) { + expr += ".foo"; + } + expr += ".hash() != 0"; + verifyFailedParse(expr, "ParsingFailedException: expression is too deeply nested (max 1024 levels)"); +} + +TEST_F(DocumentSelectParserTest, recursion_depth_is_bounded_for_arithmetic_exprs) { + createDocs(); + std::string expr = "1"; + for (size_t i = 0; i < 50000; ++i) { + expr += "+1"; + } + expr += " != 0"; + verifyFailedParse(expr, "ParsingFailedException: expression is too deeply nested (max 1024 levels)"); +} + +TEST_F(DocumentSelectParserTest, recursion_depth_is_bounded_for_binary_logical_exprs) { + createDocs(); + // Also throw in some comparisons to ensure they carry over the max depth. + std::string expr = "1 == 2"; + std::string cmp_subexpr = "3 != 4"; + for (size_t i = 0; i < 10000; ++i) { + expr += (i % 2 == 0 ? " and " : " or ") + cmp_subexpr; + } + verifyFailedParse(expr, "ParsingFailedException: expression is too deeply nested (max 1024 levels)"); +} + +TEST_F(DocumentSelectParserTest, recursion_depth_is_bounded_for_unary_logical_exprs) { + createDocs(); + std::string expr; + for (size_t i = 0; i < 10000; ++i) { + expr += "not "; + } + expr += "true"; + verifyFailedParse(expr, "ParsingFailedException: expression is too deeply nested (max 1024 levels)"); +} + +TEST_F(DocumentSelectParserTest, selection_has_upper_limit_on_input_size) { + createDocs(); + std::string expr = ("testdoctype1.a_biii" + + std::string(select::ParserLimits::MaxSelectionByteSize, 'i') + + "iiig_identifier"); + verifyFailedParse(expr, "ParsingFailedException: expression is too large to be " + "parsed (max 1048576 bytes, got 1048610)"); +} + +TEST_F(DocumentSelectParserTest, lexing_does_not_have_superlinear_time_complexity) { + createDocs(); + std::string expr = ("testdoctype1.hstringval == 'a_biii" + + std::string(select::ParserLimits::MaxSelectionByteSize - 100, 'i') + + "iiig string'"); + // If the lexer is not compiled with the appropriate options, this will take a long time. + // A really, really long time. + PARSE(expr, *_doc[0], False); +} + } // document diff --git a/document/src/vespa/document/select/CMakeLists.txt b/document/src/vespa/document/select/CMakeLists.txt index 81e5d86675c..f210e8abdd7 100644 --- a/document/src/vespa/document/select/CMakeLists.txt +++ b/document/src/vespa/document/select/CMakeLists.txt @@ -36,6 +36,7 @@ vespa_add_library(document_select OBJECT parser.cpp parse_utils.cpp parsing_failed_exception.cpp + parser_limits.cpp ${BISON_DocSelParser_OUTPUTS} ${FLEX_DocSelLexer_OUTPUTS} AFTER diff --git a/document/src/vespa/document/select/branch.cpp b/document/src/vespa/document/select/branch.cpp index b3d5f97ccab..9104e2c5544 100644 --- a/document/src/vespa/document/select/branch.cpp +++ b/document/src/vespa/document/select/branch.cpp @@ -8,7 +8,7 @@ namespace document::select { And::And(std::unique_ptr<Node> left, std::unique_ptr<Node> right, const char* name) - : Branch(name ? name : "and"), + : Branch(name ? name : "and", std::max(left->max_depth(), right->max_depth()) + 1), _left(std::move(left)), _right(std::move(right)) { @@ -54,7 +54,7 @@ And::trace(const Context& context, std::ostream& out) const } Or::Or(std::unique_ptr<Node> left, std::unique_ptr<Node> right, const char* name) - : Branch(name ? name : "or"), + : Branch(name ? name : "or", std::max(left->max_depth(), right->max_depth()) + 1), _left(std::move(left)), _right(std::move(right)) { @@ -100,7 +100,7 @@ Or::trace(const Context& context, std::ostream& out) const } Not::Not(std::unique_ptr<Node> child, const char* name) - : Branch(name ? name : "not"), + : Branch(name ? name : "not", child->max_depth() + 1), _child(std::move(child)) { assert(_child.get()); diff --git a/document/src/vespa/document/select/branch.h b/document/src/vespa/document/select/branch.h index 8637b41de89..77ed74030b5 100644 --- a/document/src/vespa/document/select/branch.h +++ b/document/src/vespa/document/select/branch.h @@ -19,7 +19,8 @@ namespace document::select { class Branch : public Node { public: - Branch(vespalib::stringref name) : Node(name) {} + explicit Branch(vespalib::stringref name) : Node(name) {} + Branch(vespalib::stringref name, uint32_t max_depth) : Node(name, max_depth) {} bool isLeafNode() const override { return false; } }; @@ -30,7 +31,7 @@ class And : public Branch std::unique_ptr<Node> _right; public: And(std::unique_ptr<Node> left, std::unique_ptr<Node> right, - const char* name = 0); + const char* name = nullptr); ResultList contains(const Context& context) const override { return (_left->contains(context) && _right->contains(context)); @@ -53,7 +54,7 @@ class Or : public Branch std::unique_ptr<Node> _right; public: Or(std::unique_ptr<Node> left, std::unique_ptr<Node> right, - const char* name = 0); + const char* name = nullptr); ResultList contains(const Context& context) const override { return (_left->contains(context) || _right->contains(context)); @@ -74,7 +75,7 @@ class Not : public Branch { std::unique_ptr<Node> _child; public: - Not(std::unique_ptr<Node> child, const char* name = 0); + Not(std::unique_ptr<Node> child, const char* name = nullptr); ResultList contains(const Context& context) const override { return !_child->contains(context); } ResultList trace(const Context&, std::ostream& trace) const override; diff --git a/document/src/vespa/document/select/compare.cpp b/document/src/vespa/document/select/compare.cpp index 7db40929a64..caef1bdd250 100644 --- a/document/src/vespa/document/select/compare.cpp +++ b/document/src/vespa/document/select/compare.cpp @@ -15,7 +15,7 @@ Compare::Compare(std::unique_ptr<ValueNode> left, const Operator& op, std::unique_ptr<ValueNode> right, const BucketIdFactory& bucketIdFactory) - : Node("Compare"), + : Node("Compare", std::max(left->max_depth(), right->max_depth()) + 1), _left(std::move(left)), _right(std::move(right)), _operator(op), diff --git a/document/src/vespa/document/select/grammar/lexer.ll b/document/src/vespa/document/select/grammar/lexer.ll index bd011c8ebf6..1222aac02a2 100644 --- a/document/src/vespa/document/select/grammar/lexer.ll +++ b/document/src/vespa/document/select/grammar/lexer.ll @@ -7,6 +7,13 @@ %option noyywrap nounput %option yyclass="document::select::DocSelScanner" + /* Flex lexer must be compiled with batch mode (as opposed to interactive mode) + * or parsing of large tokens appears to trigger superlinear time complexity. + * Also use full, non-compressed lookup tables for maximum performance. + */ +%option batch +%option full + /* Used to track source locations, see https://github.com/bingmann/flex-bison-cpp-example/blob/master/src/scanner.ll */ %{ #define YY_USER_ACTION yyloc->columns(yyleng); diff --git a/document/src/vespa/document/select/node.h b/document/src/vespa/document/select/node.h index 48a64ae63f5..9a3b687d81c 100644 --- a/document/src/vespa/document/select/node.h +++ b/document/src/vespa/document/select/node.h @@ -12,6 +12,7 @@ #include "resultlist.h" #include "context.h" +#include "parser_limits.h" namespace document::select { @@ -21,19 +22,33 @@ class Node : public Printable { protected: vespalib::string _name; + uint32_t _max_depth; bool _parentheses; // Set to true if parentheses was used around this part // Set such that we can recreate original query in print. public: typedef std::unique_ptr<Node> UP; typedef std::shared_ptr<Node> SP; - Node(vespalib::stringref name) : _name(name), _parentheses(false) {} - ~Node() override {} + Node(vespalib::stringref name, uint32_t max_depth) + : _name(name), _max_depth(max_depth), _parentheses(false) + { + throw_parse_error_if_max_depth_exceeded(); + } - void setParentheses() { _parentheses = true; } + explicit Node(vespalib::stringref name) + : _name(name), _max_depth(1), _parentheses(false) + {} + ~Node() override = default; - void clearParentheses() { _parentheses = false; } + // Depth is explicitly tracked to limit recursion to a sane maximum when building and + // processing ASTs, as the Bison framework does not have anything useful for us there. + // The AST is built from the leaves up towards the root, so we can cheaply track depth + // of subtrees in O(1) time per node by computing a node's own depth based on immediate + // children at node construction time. + [[nodiscard]] uint32_t max_depth() const noexcept { return _max_depth; } + void setParentheses() { _parentheses = true; } + void clearParentheses() { _parentheses = false; } bool hadParentheses() const { return _parentheses; } virtual ResultList contains(const Context&) const = 0; @@ -43,6 +58,12 @@ public: virtual Node::UP clone() const = 0; protected: + void throw_parse_error_if_max_depth_exceeded() const { + if (_max_depth > ParserLimits::MaxRecursionDepth) { + throw_max_depth_exceeded_exception(); + } + } + Node::UP wrapParens(Node* node) const { Node::UP ret(node); if (_parentheses) { diff --git a/document/src/vespa/document/select/parser.cpp b/document/src/vespa/document/select/parser.cpp index 9f015409011..fadb46e5aa3 100644 --- a/document/src/vespa/document/select/parser.cpp +++ b/document/src/vespa/document/select/parser.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "parser.h" +#include "parser_limits.h" #include "scanner.h" #include <vespa/document/base/exceptions.h> #include <vespa/document/util/stringutil.h> @@ -8,7 +9,20 @@ namespace document::select { +namespace { + +void verify_expression_not_too_large(const std::string& expr) { + if (expr.size() > ParserLimits::MaxSelectionByteSize) { + throw ParsingFailedException(vespalib::make_string( + "expression is too large to be parsed (max %zu bytes, got %zu)", + ParserLimits::MaxSelectionByteSize, expr.size())); + } +} + +} + std::unique_ptr<Node> Parser::parse(const std::string& str) const { + verify_expression_not_too_large(str); try { std::istringstream ss(str); DocSelScanner scanner(&ss); diff --git a/document/src/vespa/document/select/parser_limits.cpp b/document/src/vespa/document/select/parser_limits.cpp new file mode 100644 index 00000000000..13e494b376f --- /dev/null +++ b/document/src/vespa/document/select/parser_limits.cpp @@ -0,0 +1,13 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "parser_limits.h" +#include "parsing_failed_exception.h" +#include <vespa/vespalib/util/stringfmt.h> + +namespace document::select { + +void throw_max_depth_exceeded_exception() { + throw ParsingFailedException(vespalib::make_string( + "expression is too deeply nested (max %u levels)", ParserLimits::MaxRecursionDepth)); +} + +} diff --git a/document/src/vespa/document/select/parser_limits.h b/document/src/vespa/document/select/parser_limits.h new file mode 100644 index 00000000000..24c0a165611 --- /dev/null +++ b/document/src/vespa/document/select/parser_limits.h @@ -0,0 +1,19 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <cstdint> +#include <cstddef> + +namespace document::select { + +// Any resource constraints set for parsing document selection expressions +struct ParserLimits { + // Max depth allowed for nodes in the AST tree. + constexpr static uint32_t MaxRecursionDepth = 1024; + // Max size of entire input document selection string, in bytes. + constexpr static size_t MaxSelectionByteSize = 1024*1024; +}; + +void __attribute__((noinline)) throw_max_depth_exceeded_exception(); + +} diff --git a/document/src/vespa/document/select/valuenode.h b/document/src/vespa/document/select/valuenode.h index 04ed8178b40..8dd535a736a 100644 --- a/document/src/vespa/document/select/valuenode.h +++ b/document/src/vespa/document/select/valuenode.h @@ -5,12 +5,13 @@ * * @brief Node representing a value in the tree * - * @author H�kon Humberset + * @author HÃ¥kon Humberset */ #pragma once #include "value.h" +#include "parser_limits.h" namespace document::select { @@ -22,8 +23,19 @@ class ValueNode : public Printable public: using UP = std::unique_ptr<ValueNode>; - ValueNode() : _parentheses(false) {} - virtual ~ValueNode() {} + explicit ValueNode(uint32_t max_depth) + : _max_depth(max_depth), _parentheses(false) + { + throw_parse_error_if_max_depth_exceeded(); + } + ValueNode() : _max_depth(1), _parentheses(false) {} + ~ValueNode() override = default; + + // See comments for same function in node.h for a description on how and why + // we track this. Since Node and ValueNode live in completely separate type + // hierarchies, this particular bit of code duplication is unfortunate but + // incurs the least cognitive overhead. + [[nodiscard]] uint32_t max_depth() const noexcept { return _max_depth; } void setParentheses() { _parentheses = true; } void clearParentheses() { _parentheses = false; } @@ -34,9 +46,17 @@ public: virtual ValueNode::UP clone() const = 0; virtual std::unique_ptr<Value> traceValue(const Context &context, std::ostream &out) const; private: + uint32_t _max_depth; bool _parentheses; // Set to true if parentheses was used around this part // Set such that we can recreate original query in print. + protected: + void throw_parse_error_if_max_depth_exceeded() const { + if (_max_depth > ParserLimits::MaxRecursionDepth) { + throw_max_depth_exceeded_exception(); + } + } + ValueNode::UP wrapParens(ValueNode* node) const { ValueNode::UP ret(node); if (_parentheses) { diff --git a/document/src/vespa/document/select/valuenodes.cpp b/document/src/vespa/document/select/valuenodes.cpp index 95cf2f4e7e5..026623cf83c 100644 --- a/document/src/vespa/document/select/valuenodes.cpp +++ b/document/src/vespa/document/select/valuenodes.cpp @@ -21,10 +21,6 @@ LOG_SETUP(".document.select.valuenode"); namespace document::select { namespace { - static const std::regex FIELD_NAME_REGEX("^([_A-Za-z][_A-Za-z0-9]*).*"); -} - -namespace { bool documentTypeEqualsName(const DocumentType& type, vespalib::stringref name) { if (type.getName() == name) return true; @@ -40,7 +36,7 @@ namespace { InvalidValueNode::InvalidValueNode(vespalib::stringref name) : _name(name) -{ } +{} void @@ -194,15 +190,33 @@ FieldValueNode::FieldValueNode(const vespalib::string& doctype, FieldValueNode::~FieldValueNode() = default; -vespalib::string -FieldValueNode::extractFieldName(const std::string & fieldExpression) { - std::smatch match; +namespace { - if (std::regex_match(fieldExpression, match, FIELD_NAME_REGEX) && match[1].matched) { - return vespalib::string(match[1].first, match[1].second); +size_t first_ident_length_or_npos(const vespalib::string& expr) { + for (size_t i = 0; i < expr.size(); ++i) { + switch (expr[i]) { + case '.': + case '{': + case '[': + case ' ': + case '\n': + case '\t': + return i; + default: + continue; + } } + return vespalib::string::npos; +} - throw ParsingFailedException("Fatal: could not extract field name from field expression '" + fieldExpression + "'"); +} + +// TODO remove this pile of fun in favor of actually parsed AST nodes...! +vespalib::string +FieldValueNode::extractFieldName(const vespalib::string & fieldExpression) { + // When we get here the actual contents of the field expression shall already + // have been structurally and syntactically verified by the parser. + return fieldExpression.substr(0, first_ident_length_or_npos(fieldExpression)); } namespace { @@ -844,7 +858,8 @@ FunctionValueNode::print(std::ostream& out, bool verbose, ArithmeticValueNode::ArithmeticValueNode( std::unique_ptr<ValueNode> left, vespalib::stringref op, std::unique_ptr<ValueNode> right) - : _operator(), + : ValueNode(std::max(left->max_depth(), right->max_depth()) + 1), + _operator(), _left(std::move(left)), _right(std::move(right)) { diff --git a/document/src/vespa/document/select/valuenodes.h b/document/src/vespa/document/select/valuenodes.h index 8009542c364..a7d5fa15f37 100644 --- a/document/src/vespa/document/select/valuenodes.h +++ b/document/src/vespa/document/select/valuenodes.h @@ -160,7 +160,7 @@ public: FieldValueNode & operator = (const FieldValueNode &) = delete; FieldValueNode(FieldValueNode &&) = default; FieldValueNode & operator = (FieldValueNode &&) = default; - ~FieldValueNode(); + ~FieldValueNode() override; const vespalib::string& getDocType() const { return _doctype; } const vespalib::string& getRealFieldName() const { return _fieldName; } @@ -175,7 +175,7 @@ public: return wrapParens(new FieldValueNode(_doctype, _fieldExpression)); } - static vespalib::string extractFieldName(const std::string & fieldExpression); + static vespalib::string extractFieldName(const vespalib::string & fieldExpression); private: @@ -192,13 +192,15 @@ class FieldExprNode final : public ValueNode { public: explicit FieldExprNode(const vespalib::string& doctype) : _left_expr(), _right_expr(doctype) {} FieldExprNode(std::unique_ptr<FieldExprNode> left_expr, vespalib::stringref right_expr) - : _left_expr(std::move(left_expr)), _right_expr(right_expr) + : ValueNode(left_expr->max_depth() + 1), + _left_expr(std::move(left_expr)), + _right_expr(right_expr) {} FieldExprNode(const FieldExprNode &) = delete; FieldExprNode & operator = (const FieldExprNode &) = delete; FieldExprNode(FieldExprNode &&) = default; FieldExprNode & operator = (FieldExprNode &&) = default; - ~FieldExprNode(); + ~FieldExprNode() override; std::unique_ptr<FieldValueNode> convert_to_field_value() const; std::unique_ptr<FunctionValueNode> convert_to_function_call() const; diff --git a/fbench/src/fbench/fbench.cpp b/fbench/src/fbench/fbench.cpp index c2996eebb5c..88d27a33bd7 100644 --- a/fbench/src/fbench/fbench.cpp +++ b/fbench/src/fbench/fbench.cpp @@ -200,6 +200,28 @@ FBench::StopClients() printf("\nClients Joined.\n"); } +namespace { + +const char * +approx(double latency, const ClientStatus & status) { + return (latency > (status._timetable.size() / status._timetableResolution - 1)) + ? "ms (approx)" + : "ms"; +} + +std::string +fmtPercentile(double percentile) { + char buf[32]; + if (percentile <= 99.0) { + snprintf(buf, sizeof(buf), "%2d ", int(percentile)); + } else { + snprintf(buf, sizeof(buf), "%2.1f", percentile); + } + return buf; +} + +} + void FBench::PrintSummary() { @@ -227,13 +249,6 @@ FBench::PrintSummary() actualRate = (status._realTime > 0) ? realNumClients * 1000.0 * status._requestCnt / status._realTime : 0; - double p25 = status.GetPercentile(25); - double p50 = status.GetPercentile(50); - double p75 = status.GetPercentile(75); - double p90 = status.GetPercentile(90); - double p95 = status.GetPercentile(95); - double p99 = status.GetPercentile(99); - if (_keepAlive) { printf("*** HTTP keep-alive statistics ***\n"); printf("connection reuse count -- %" PRIu64 "\n", status._reuseCnt); @@ -250,24 +265,13 @@ FBench::PrintSummary() printf("minimum response time: %8.2f ms\n", status._minTime); printf("maximum response time: %8.2f ms\n", status._maxTime); printf("average response time: %8.2f ms\n", status.GetAverage()); - if (p25 > status._timetable.size() / status._timetableResolution - 1) - printf("25 percentile: %8.2f ms (approx)\n", p25); - else printf("25 percentile: %8.2f ms\n", p25); - if (p50 > status._timetable.size() / status._timetableResolution - 1) - printf("50 percentile: %8.2f ms (approx)\n", p50); - else printf("50 percentile: %8.2f ms\n", p50); - if (p75 > status._timetable.size() / status._timetableResolution - 1) - printf("75 percentile: %8.2f ms (approx)\n", p75); - else printf("75 percentile: %8.2f ms\n", p75); - if (p90 > status._timetable.size() / status._timetableResolution - 1) - printf("90 percentile: %8.2f ms (approx)\n", p90); - else printf("90 percentile: %8.2f ms\n", p90); - if (p95 > status._timetable.size() / status._timetableResolution - 1) - printf("95 percentile: %8.2f ms (approx)\n", p95); - else printf("95 percentile: %8.2f ms\n", p95); - if (p99 > status._timetable.size() / status._timetableResolution - 1) - printf("99 percentile: %8.2f ms (approx)\n", p99); - else printf("99 percentile: %8.2f ms\n", p99); + + for (double percentile : {25.0, 50.0, 75.0, 90.0, 95.0, 98.0, 99.0, 99.5, 99.6, 99.7, 99.8, 99.9}) { + double latency = status.GetPercentile(percentile); + printf("%s percentile: %8.2f %s\n", + fmtPercentile(percentile).c_str(), latency, approx(latency, status)); + } + printf("actual query rate: %8.2f Q/s\n", actualRate); printf("utilization: %8.2f %%\n", (maxRate > 0) ? 100 * (actualRate / maxRate) : 0); diff --git a/fbench/src/util/clientstatus.cpp b/fbench/src/util/clientstatus.cpp index 9e03068e13c..6ef188da201 100644 --- a/fbench/src/util/clientstatus.cpp +++ b/fbench/src/util/clientstatus.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "clientstatus.h" -#include <string.h> +#include <cstring> #include <cmath> @@ -24,9 +24,7 @@ ClientStatus::ClientStatus() { } -ClientStatus::~ClientStatus() -{ -} +ClientStatus::~ClientStatus() = default; void ClientStatus::SetError(const char *errorMsg) diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 24b3fcac3e3..5829ab37b77 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; -import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.ConnectionPool; @@ -13,6 +12,7 @@ import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -57,21 +57,14 @@ public class FileDownloader { } } - private Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) { + Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); Objects.requireNonNull(fileReference, "file reference cannot be null"); - log.log(LogLevel.DEBUG, () -> "Checking if file reference '" + fileReference.value() + "' exists in '" + - downloadDirectory.getAbsolutePath() + "' "); - Optional<File> file = getFileFromFileSystem(fileReference, downloadDirectory); - if (file.isPresent()) { - SettableFuture<Optional<File>> future = SettableFuture.create(); - future.set(file); - return future; - } else { - log.log(LogLevel.DEBUG, () -> "File reference '" + fileReference.value() + "' not found in " + - downloadDirectory.getAbsolutePath() + ", starting download"); - return download(fileReferenceDownload); - } + + Optional<File> file = getFileFromFileSystem(fileReference); + return (file.isPresent()) + ? CompletableFuture.completedFuture(file) + : download(fileReferenceDownload); } double downloadStatus(FileReference fileReference) { @@ -86,9 +79,10 @@ public class FileDownloader { return downloadDirectory; } - private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) { - File[] files = new File(directory, fileReference.value()).listFiles(); - if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { + // Files are moved atomically, so if file reference exists and is accessible we can use it + private Optional<File> getFileFromFileSystem(FileReference fileReference) { + File[] files = new File(downloadDirectory, fileReference.value()).listFiles(); + if (downloadDirectory.exists() && downloadDirectory.isDirectory() && files != null && files.length > 0) { File file = files[0]; if (!file.exists()) { throw new RuntimeException("File reference '" + fileReference.value() + "' does not exist"); @@ -105,33 +99,25 @@ public class FileDownloader { private boolean alreadyDownloaded(FileReference fileReference) { try { - return (getFileFromFileSystem(fileReference, downloadDirectory).isPresent()); + return (getFileFromFileSystem(fileReference).isPresent()); } catch (RuntimeException e) { return false; } } - public boolean downloadIfNeeded(FileReferenceDownload fileReferenceDownload) { - if (!alreadyDownloaded(fileReferenceDownload.fileReference())) { - download(fileReferenceDownload); - return true; - } else { - log.log(LogLevel.DEBUG, () -> "Download not needed, " + fileReferenceDownload.fileReference() + " already downloaded" ); - return false; - } + /** Start a download, don't wait for result */ + public void downloadIfNeeded(FileReferenceDownload fileReferenceDownload) { + FileReference fileReference = fileReferenceDownload.fileReference(); + if (alreadyDownloaded(fileReference)) return; + + download(fileReferenceDownload); } + /** Download, the future returned will be complete()d by receiving method in {@link FileReceiver} */ private synchronized Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); - Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReferenceDownload)); - if (inProgress != null) { - log.log(LogLevel.DEBUG, () -> "Already downloading '" + fileReference.value() + "'"); - return inProgress; - } - - Future<Optional<File>> future = queueForDownload(fileReferenceDownload); - log.log(LogLevel.DEBUG, () -> "Queued '" + fileReference.value() + "' for download with timeout " + timeout); - return future; + FileReferenceDownload inProgress = fileReferenceDownloader.getDownloadInProgress(fileReference); + return (inProgress == null) ? queueForDownload(fileReferenceDownload) : inProgress.future(); } private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) { diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java index 4a9fadf1a61..fe501484faf 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java @@ -2,16 +2,16 @@ package com.yahoo.vespa.filedistribution; -import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; import java.io.File; import java.util.Optional; +import java.util.concurrent.CompletableFuture; public class FileReferenceDownload { private final FileReference fileReference; - private final SettableFuture<Optional<File>> future; + private final CompletableFuture<Optional<File>> future; // If a config server wants to download from another config server (because it does not have the // file itself) we set this flag to true to avoid an eternal loop private final boolean downloadFromOtherSourceIfNotFound; @@ -22,7 +22,7 @@ public class FileReferenceDownload { public FileReferenceDownload(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound) { this.fileReference = fileReference; - this.future = SettableFuture.create(); + this.future = new CompletableFuture<>(); this.downloadFromOtherSourceIfNotFound = downloadFromOtherSourceIfNotFound; } @@ -30,7 +30,7 @@ public class FileReferenceDownload { return fileReference; } - SettableFuture<Optional<File>> future() { + CompletableFuture<Optional<File>> future() { return future; } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 66b86866c3e..c4fe257c991 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.FileReference; import com.yahoo.jrt.Int32Value; @@ -18,6 +16,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -37,9 +36,12 @@ public class FileReferenceDownloader { private final static Duration rpcTimeout = Duration.ofSeconds(10); private final ExecutorService downloadExecutor = - Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new DaemonThreadFactory("filereference downloader")); + Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()), + new DaemonThreadFactory("filereference downloader")); private final ConnectionPool connectionPool; + /* Ongoing downloads */ private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>(); + /* Status for ongoing and finished downloads */ private final Map<FileReference, Double> downloadStatus = new HashMap<>(); // between 0 and 1 private final Duration downloadTimeout; private final Duration sleepBetweenRetries; @@ -70,7 +72,7 @@ public class FileReferenceDownloader { } if ( !downloadStarted) { - fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'")); + fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'")); synchronized (downloads) { downloads.remove(fileReference); } @@ -93,7 +95,7 @@ public class FileReferenceDownloader { if (download != null) { downloadStatus.put(fileReference, 1.0); downloads.remove(fileReference); - download.future().set(Optional.of(file)); + download.future().complete(Optional.of(file)); } else { log.log(LogLevel.DEBUG, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts"); } @@ -140,15 +142,10 @@ public class FileReferenceDownloader { } } - ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { + FileReferenceDownload getDownloadInProgress(FileReference fileReference) { synchronized (downloads) { - FileReferenceDownload download = downloads.get(fileReference); - if (download != null) { - download.future().addListener(runnable, downloadExecutor); - return download.future(); - } + return downloads.get(fileReference); } - return null; } private void execute(Request request, Connection connection) { @@ -186,7 +183,7 @@ public class FileReferenceDownloader { Map<FileReference, Double> downloadStatus() { synchronized (downloads) { - return ImmutableMap.copyOf(downloadStatus); + return Map.copyOf(downloadStatus); } } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index d1d12cb07b7..52d8507acea 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -27,6 +27,10 @@ import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static com.yahoo.jrt.ErrorCode.CONNECTION; import static org.junit.Assert.assertEquals; @@ -35,6 +39,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class FileDownloaderTest { + private static final Duration sleepBetweenRetries = Duration.ofMillis(10); private MockConnection connection; private FileDownloader fileDownloader; @@ -47,7 +52,7 @@ public class FileDownloaderTest { downloadDir = Files.createTempDirectory("filedistribution").toFile(); tempDir = Files.createTempDirectory("download").toFile(); connection = new MockConnection(); - fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(2), Duration.ofMillis(100)); + fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); @@ -108,7 +113,7 @@ public class FileDownloaderTest { // Receives fileReference, should return and make it available to caller String filename = "abc.jar"; - receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content"); + receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content"); Optional<File> downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); @@ -142,7 +147,7 @@ public class FileDownloaderTest { File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename)); byte[] tarredContent = IOUtils.readFileBytes(tarFile); - receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent); + receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.compressed, tarredContent); Optional<File> downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); @@ -158,7 +163,7 @@ public class FileDownloaderTest { @Test public void getFileWhenConnectionError() throws IOException { - fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), Duration.ofMillis(100)); + fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries); File downloadDir = fileDownloader.downloadDirectory(); int timesToFail = 2; @@ -175,7 +180,7 @@ public class FileDownloaderTest { // Receives fileReference, should return and make it available to caller String filename = "abc.jar"; - receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content"); + receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content"); Optional<File> downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); File downloadedFileFullPath = new File(fileReferenceFullPath, filename); @@ -189,26 +194,68 @@ public class FileDownloaderTest { } @Test + public void getFileWhenDownloadInProgress() throws IOException, ExecutionException, InterruptedException { + ExecutorService executor = Executors.newFixedThreadPool(2); + String filename = "abc.jar"; + fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), sleepBetweenRetries); + File downloadDir = fileDownloader.downloadDirectory(); + + // Delay response so that we can make a second request while downloading the file from the first request + connection.setResponseHandler(new MockConnection.WaitResponseHandler(Duration.ofSeconds(1))); + + FileReference fileReference = new FileReference("fileReference"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); + FileReferenceDownload fileReferenceDownload = new FileReferenceDownload(fileReference); + + Future<Future<Optional<File>>> future1 = executor.submit(() -> fileDownloader.getFutureFile(fileReferenceDownload)); + do { + Thread.sleep(10); + } while (! fileDownloader.fileReferenceDownloader().isDownloading(fileReference)); + assertTrue(fileDownloader.fileReferenceDownloader().isDownloading(fileReference)); + + // Request file while download is in progress + Future<Future<Optional<File>>> future2 = executor.submit(() -> fileDownloader.getFutureFile(fileReferenceDownload)); + + // Receive file, will complete downloading and futures + receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content"); + + // Check that we got file correctly with first request + Optional<File> downloadedFile = future1.get().get(); + assertTrue(downloadedFile.isPresent()); + File downloadedFileFullPath = new File(fileReferenceFullPath, filename); + assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath()); + assertEquals("some other content", IOUtils.readFile(downloadedFile.get())); + + // Check that request done while downloading works + downloadedFile = future2.get().get(); + assertTrue(downloadedFile.isPresent()); + executor.shutdownNow(); + } + + @Test public void setFilesToDownload() throws IOException { Duration timeout = Duration.ofMillis(200); - Duration sleepBetweenRetries = Duration.ofMillis(200); MockConnection connectionPool = new MockConnection(); connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000)))); FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, timeout, sleepBetweenRetries); FileReference foo = new FileReference("foo"); // Should download since we do not have the file on disk - assertTrue(fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo))); + fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo)); + assertTrue(fileDownloader.fileReferenceDownloader().isDownloading(foo)); + assertFalse(fileDownloader.getFile(foo).isPresent()); // Receive files to simulate download receiveFile(); // Should not download, since file has already been downloaded - assertFalse(fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo))); + fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo)); + // and file should be available + assertTrue(fileDownloader.getFile(foo).isPresent()); } @Test public void receiveFile() throws IOException { FileReference foo = new FileReference("foo"); String filename = "foo.jar"; - receiveFile(foo, filename, FileReferenceData.Type.file, "content"); + receiveFile(fileDownloader, foo, filename, FileReferenceData.Type.file, "content"); File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename); assertEquals("content", IOUtils.readFile(downloadedFile)); } @@ -229,16 +276,19 @@ public class FileDownloaderTest { assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); } - private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, String content) { - receiveFile(fileReference, filename, type, Utf8.toBytes(content)); + private void receiveFile(FileDownloader fileDownloader, FileReference fileReference, String filename, + FileReferenceData.Type type, String content) { + receiveFile(fileDownloader, fileReference, filename, type, Utf8.toBytes(content)); } - private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) { + private void receiveFile(FileDownloader fileDownloader, FileReference fileReference, String filename, + FileReferenceData.Type type, byte[] content) { XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); FileReceiver.Session session = new FileReceiver.Session(downloadDir, tempDir, 1, fileReference, type, filename, content.length); session.addPart(0, content); - session.close(hasher.hash(ByteBuffer.wrap(content), 0)); + File file = session.close(hasher.hash(ByteBuffer.wrap(content), 0)); + fileDownloader.fileReferenceDownloader().completedDownloading(fileReference, file); } private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index e81bccee593..f392b597e3b 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -109,12 +109,6 @@ public class Flags { "Takes effect on the next deployment of the application", APPLICATION_ID); - public static final UnboundStringFlag TLS_INSECURE_MIXED_MODE = defineStringFlag( - "tls-insecure-mixed-mode", "tls_client_mixed_server", - "TLS insecure mixed mode. Allowed values: ['plaintext_client_mixed_server', 'tls_client_mixed_server', 'tls_client_tls_server']", - "Takes effect on restart of Docker container", - NODE_TYPE, APPLICATION_ID, HOSTNAME); - public static final UnboundStringFlag TLS_INSECURE_AUTHORIZATION_MODE = defineStringFlag( "tls-insecure-authorization-mode", "log_only", "TLS insecure authorization mode. Allowed values: ['disable', 'log_only', 'enforce']", @@ -200,12 +194,6 @@ public class Flags { "Takes effect on next node agent tick (but does not clear existing failure reports)", HOSTNAME); - public static final UnboundBooleanFlag GENERATE_L4_ROUTING_CONFIG = defineFeatureFlag( - "generate-l4-routing-config", false, - "Whether routing nodes should generate L4 routing config", - "Takes effect immediately", - ZONE_ID, HOSTNAME); - public static final UnboundBooleanFlag USE_REFRESHED_ENDPOINT_CERTIFICATE = defineFeatureFlag( "use-refreshed-endpoint-certificate", false, "Whether an application should start using a newer certificate/key pair if available", @@ -258,13 +246,13 @@ public class Flags { APPLICATION_ID); public static final UnboundBooleanFlag DISABLE_ROUTING_GENERATOR = defineFeatureFlag( - "disable-routing-generator", false, + "disable-routing-generator", true, "Whether the controller should stop asking the routing layer for endpoints", "Takes effect immediately", APPLICATION_ID); public static final UnboundBooleanFlag DEDICATED_NODES_WHEN_UNSPECIFIED = defineFeatureFlag( - "dedicated-nodes-when-unspecified", false, + "dedicated-nodes-when-unspecified", true, "Whether config-server should allocate dedicated container nodes when <nodes/> is not specified in services.xml", "Takes effect on redeploy", APPLICATION_ID); diff --git a/jdisc_http_service/abi-spec.json b/jdisc_http_service/abi-spec.json index bb6285ab94f..c5a0a676a70 100644 --- a/jdisc_http_service/abi-spec.json +++ b/jdisc_http_service/abi-spec.json @@ -42,6 +42,9 @@ "public com.yahoo.jdisc.http.ConnectorConfig$Builder tlsClientAuthEnforcer(com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer$Builder)", "public com.yahoo.jdisc.http.ConnectorConfig$Builder healthCheckProxy(com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy$Builder)", "public com.yahoo.jdisc.http.ConnectorConfig$Builder proxyProtocol(com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder)", + "public com.yahoo.jdisc.http.ConnectorConfig$Builder secureRedirect(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder)", + "public com.yahoo.jdisc.http.ConnectorConfig$Builder maxRequestsPerConnection(int)", + "public com.yahoo.jdisc.http.ConnectorConfig$Builder maxConnectionLife(double)", "public final boolean dispatchGetConfig(com.yahoo.config.ConfigInstance$Producer)", "public final java.lang.String getDefMd5()", "public final java.lang.String getDefName()", @@ -53,7 +56,8 @@ "public com.yahoo.jdisc.http.ConnectorConfig$Ssl$Builder ssl", "public com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer$Builder tlsClientAuthEnforcer", "public com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy$Builder healthCheckProxy", - "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder proxyProtocol" + "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder proxyProtocol", + "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder secureRedirect" ] }, "com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy$Builder": { @@ -133,6 +137,37 @@ ], "fields": [] }, + "com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder": { + "superClass": "java.lang.Object", + "interfaces": [ + "com.yahoo.config.ConfigBuilder" + ], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>()", + "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect)", + "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder enabled(boolean)", + "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder port(int)", + "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect build()" + ], + "fields": [] + }, + "com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect": { + "superClass": "com.yahoo.config.InnerNode", + "interfaces": [], + "attributes": [ + "public", + "final" + ], + "methods": [ + "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder)", + "public boolean enabled()", + "public int port()" + ], + "fields": [] + }, "com.yahoo.jdisc.http.ConnectorConfig$Ssl$Builder": { "superClass": "java.lang.Object", "interfaces": [ @@ -323,7 +358,10 @@ "public com.yahoo.jdisc.http.ConnectorConfig$Ssl ssl()", "public com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer tlsClientAuthEnforcer()", "public com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy healthCheckProxy()", - "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol proxyProtocol()" + "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol proxyProtocol()", + "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect secureRedirect()", + "public int maxRequestsPerConnection()", + "public double maxConnectionLife()" ], "fields": [ "public static final java.lang.String CONFIG_DEF_MD5", diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java index 71dcb7d0682..b9d686c1d6b 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java @@ -10,6 +10,7 @@ import com.yahoo.jdisc.handler.BindingNotFoundException; import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.jdisc.handler.OverloadException; import com.yahoo.jdisc.handler.RequestHandler; +import com.yahoo.jdisc.http.ConnectorConfig; import com.yahoo.jdisc.http.HttpHeaders; import com.yahoo.jdisc.http.HttpRequest; import org.eclipse.jetty.io.EofException; @@ -22,6 +23,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -34,6 +36,7 @@ import java.util.logging.Logger; import static com.yahoo.jdisc.http.HttpHeaders.Values.APPLICATION_X_WWW_FORM_URLENCODED; import static com.yahoo.jdisc.http.core.HttpServletRequestUtils.getConnection; import static com.yahoo.jdisc.http.server.jetty.Exceptions.throwUnchecked; +import static com.yahoo.jdisc.http.server.jetty.JDiscHttpServlet.getConnector; /** * @author Simon Thoresen Hult @@ -64,14 +67,13 @@ class HttpRequestDispatch { this.jettyRequest = (Request) servletRequest; this.metricReporter = new MetricReporter(jDiscContext.metric, metricContext, jettyRequest.getTimeStamp()); - honourMaxKeepAliveRequests(); this.servletResponseController = new ServletResponseController( servletRequest, servletResponse, jDiscContext.janitor, metricReporter, jDiscContext.developerMode()); - + markConnectionAsNonPersistentIfThresholdReached(servletRequest); this.async = servletRequest.startAsync(); async.setTimeout(0); metricReporter.uriLength(jettyRequest.getOriginalURI().length()); @@ -102,15 +104,6 @@ class HttpRequestDispatch { } } - private void honourMaxKeepAliveRequests() { - if (jDiscContext.serverConfig.maxKeepAliveRequests() > 0) { - HttpConnection connection = getConnection(jettyRequest); - if (connection.getMessagesIn() >= jDiscContext.serverConfig.maxKeepAliveRequests()) { - connection.getGenerator().setPersistent(false); - } - } - } - private BiConsumer<Void, Throwable> completeRequestCallback; { AtomicBoolean completeRequestCalled = new AtomicBoolean(false); @@ -151,6 +144,25 @@ class HttpRequestDispatch { }; } + private static void markConnectionAsNonPersistentIfThresholdReached(HttpServletRequest request) { + ConnectorConfig connectorConfig = getConnector(request).connectorConfig(); + int maxRequestsPerConnection = connectorConfig.maxRequestsPerConnection(); + if (maxRequestsPerConnection > 0) { + HttpConnection connection = getConnection(request); + if (connection.getMessagesIn() >= maxRequestsPerConnection) { + connection.getGenerator().setPersistent(false); + } + } + double maxConnectionLifeInSeconds = connectorConfig.maxConnectionLife(); + if (maxConnectionLifeInSeconds > 0) { + HttpConnection connection = getConnection(request); + Instant expireAt = Instant.ofEpochMilli((long)(connection.getCreatedTimeStamp() + maxConnectionLifeInSeconds * 1000)); + if (Instant.now().isAfter(expireAt)) { + connection.getGenerator().setPersistent(false); + } + } + } + @SafeVarargs @SuppressWarnings("varargs") private static boolean isErrorOfType(Throwable throwable, Class<? extends Throwable>... handledTypes) { diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java index cf66af31a79..5cbe7320f0e 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java @@ -100,6 +100,8 @@ class JDiscHttpServlet extends HttpServlet { } } + + static JDiscServerConnector getConnector(HttpServletRequest request) { return (JDiscServerConnector)getConnection(request).getConnector(); } diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java index 71284e09669..c5f42ff9cc5 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java @@ -34,9 +34,6 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.log.JavaUtilLog; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.osgi.framework.BundleContext; -import org.osgi.framework.InvalidSyntaxException; -import org.osgi.framework.ServiceReference; import javax.management.remote.JMXServiceURL; import javax.servlet.DispatcherType; @@ -44,10 +41,8 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.BindException; import java.net.MalformedURLException; -import java.nio.channels.ServerSocketChannel; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -246,10 +241,13 @@ public class JettyHttpServer extends AbstractServerProvider { servletContextHandler.addServlet(jdiscServlet, "/*"); + List<ConnectorConfig> connectorConfigs = connectors.stream().map(JDiscServerConnector::connectorConfig).collect(toList()); + var secureRedirectHandler = new SecuredRedirectHandler(connectorConfigs); + secureRedirectHandler.setHandler(servletContextHandler); + var proxyHandler = new HealthCheckProxyHandler(connectors); - proxyHandler.setHandler(servletContextHandler); + proxyHandler.setHandler(secureRedirectHandler); - List<ConnectorConfig> connectorConfigs = connectors.stream().map(JDiscServerConnector::connectorConfig).collect(toList()); var authEnforcer = new TlsClientAuthenticationEnforcer(connectorConfigs); authEnforcer.setHandler(proxyHandler); @@ -282,25 +280,6 @@ public class JettyHttpServer extends AbstractServerProvider { return ports.stream().map(Object::toString).collect(Collectors.joining(":")); } - private ServerSocketChannel getChannelFromServiceLayer(int listenPort, BundleContext bundleContext) { - log.log(Level.FINE, "Retrieving channel for port " + listenPort + " from " + bundleContext.getClass().getName()); - Collection<ServiceReference<ServerSocketChannel>> refs; - final String filter = "(port=" + listenPort + ")"; - try { - refs = bundleContext.getServiceReferences(ServerSocketChannel.class, filter); - } catch (InvalidSyntaxException e) { - throw new IllegalStateException("OSGi framework rejected filter " + filter, e); - } - if (refs.isEmpty()) { - return null; - } - if (refs.size() != 1) { - throw new IllegalStateException("Got more than one service reference for " + ServerSocketChannel.class + " port " + listenPort + "."); - } - ServiceReference<ServerSocketChannel> ref = refs.iterator().next(); - return bundleContext.getService(ref); - } - private static ExecutorService newJanitor(ThreadFactory factory) { int threadPoolSize = Runtime.getRuntime().availableProcessors(); log.info("Creating janitor executor with " + threadPoolSize + " threads"); diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java new file mode 100644 index 00000000000..32c0628186a --- /dev/null +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java @@ -0,0 +1,52 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jdisc.http.server.jetty; + +import com.yahoo.jdisc.http.ConnectorConfig; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.HandlerWrapper; +import org.eclipse.jetty.util.URIUtil; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A secure redirect handler inspired by {@link org.eclipse.jetty.server.handler.SecuredRedirectHandler}. + * + * @author bjorncs + */ +class SecuredRedirectHandler extends HandlerWrapper { + + private final Map<Integer, Integer> redirectMap; + + SecuredRedirectHandler(List<ConnectorConfig> connectorConfigs) { + this.redirectMap = createRedirectMap(connectorConfigs); + } + + @Override + public void handle(String target, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException { + int localPort = servletRequest.getLocalPort(); + if (!redirectMap.containsKey(localPort)) { + _handler.handle(target, request, servletRequest, servletResponse); + return; + } + servletResponse.setContentLength(0); + servletResponse.sendRedirect( + URIUtil.newURI("https", request.getServerName(), redirectMap.get(localPort), request.getRequestURI(), request.getQueryString())); + request.setHandled(true); + } + + private static Map<Integer, Integer> createRedirectMap(List<ConnectorConfig> connectorConfigs) { + var redirectMap = new HashMap<Integer, Integer>(); + for (ConnectorConfig connectorConfig : connectorConfigs) { + if (connectorConfig.secureRedirect().enabled()) { + redirectMap.put(connectorConfig.listenPort(), connectorConfig.secureRedirect().port()); + } + } + return redirectMap; + } +} diff --git a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def index 8027525521c..fa7ed6657d9 100644 --- a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def +++ b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def @@ -106,3 +106,15 @@ proxyProtocol.enabled bool default=false # Allow https in parallel with proxy protocol proxyProtocol.mixedMode bool default=false + +# Redirect all requests to https port +secureRedirect.enabled bool default=false + +# Target port for redirect +secureRedirect.port int default=443 + +# Maximum number of request per connection before server marks connections as non-persistent. Set to '0' to disable. +maxRequestsPerConnection int default=0 + +# Maximum number of seconds a connection can live before it's marked as non-persistent. Set to '0' to disable. +maxConnectionLife double default=0.0 diff --git a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def index 0836a080e1f..33f82963243 100644 --- a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def +++ b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.server.def @@ -7,13 +7,16 @@ developerMode bool default=false # The gzip compression level to use, if compression is enabled in a request. responseCompressionLevel int default=6 -# Whether to enable HTTP keep-alive for requests that support this. +# DEPRECATED - Ignored, no longer in use. httpKeepAliveEnabled bool default=true +# TODO Vespa 8 Remove httpKeepAliveEnabled # Maximum number of request per http connection before server will hangup. # Naming taken from apache http server. # 0 means never hangup. +# DEPRECATED - Ignored, no longer in use. Use similar parameter in connector config instead. maxKeepAliveRequests int default=0 +# TODO Vespa 8 Remove maxKeepAliveRequests # Whether the request body of POSTed forms should be removed (form parameters are available as request parameters). removeRawPostBodyForWwwUrlEncodedPost bool default=false diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java index 6ace9699b42..f2f3fb0ef11 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java @@ -484,8 +484,8 @@ public class HttpServerTest { public void requireThatConnectionIsClosedAfterXRequests() throws Exception { final int MAX_KEEPALIVE_REQUESTS = 100; final TestDriver driver = TestDrivers.newConfiguredInstance(new EchoRequestHandler(), - new ServerConfig.Builder().maxKeepAliveRequests(MAX_KEEPALIVE_REQUESTS), - new ConnectorConfig.Builder()); + new ServerConfig.Builder(), + new ConnectorConfig.Builder().maxRequestsPerConnection(MAX_KEEPALIVE_REQUESTS)); for (int i = 0; i < MAX_KEEPALIVE_REQUESTS - 1; i++) { driver.client().get("/status.html") .expectStatusCode(is(OK)) diff --git a/messagebus/src/tests/CMakeLists.txt b/messagebus/src/tests/CMakeLists.txt index cb2a403f55d..e05f732d8b4 100644 --- a/messagebus/src/tests/CMakeLists.txt +++ b/messagebus/src/tests/CMakeLists.txt @@ -9,7 +9,6 @@ add_subdirectory(context) add_subdirectory(emptyreply) add_subdirectory(error) add_subdirectory(identity) -add_subdirectory(loadbalance) add_subdirectory(messagebus) add_subdirectory(messageordering) add_subdirectory(messenger) diff --git a/messagebus/src/tests/error/error.cpp b/messagebus/src/tests/error/error.cpp index 1d8a489a5ed..244efe0bf99 100644 --- a/messagebus/src/tests/error/error.cpp +++ b/messagebus/src/tests/error/error.cpp @@ -18,8 +18,6 @@ using namespace mbus; -TEST_SETUP(Test); - RoutingSpec getRouting() { return RoutingSpec() .addTable(RoutingTableSpec("Simple") @@ -28,10 +26,7 @@ RoutingSpec getRouting() { .addRoute(RouteSpec("test").addHop("pxy").addHop("dst"))); } -int -Test::Main() -{ - TEST_INIT("error_test"); +TEST("error_test") { Slobrok slobrok; TestServer srcNet(Identity("test/src"), getRouting(), slobrok); @@ -51,30 +46,31 @@ Test::Main() ASSERT_TRUE(pxyNet.waitSlobrok("test/dst/session")); for (int i = 0; i < 5; i++) { - ASSERT_TRUE(ss->send(SimpleMessage::UP(new SimpleMessage("test message")), "test").isAccepted()); + ASSERT_TRUE(ss->send(std::make_unique<SimpleMessage>("test message"), "test").isAccepted()); Message::UP msg = pxy.getMessage(); - ASSERT_TRUE(msg.get() != 0); + ASSERT_TRUE(msg); is->forward(std::move(msg)); msg = dst.getMessage(); - ASSERT_TRUE(msg.get() != 0); - Reply::UP reply(new EmptyReply()); + ASSERT_TRUE(msg); + Reply::UP reply = std::make_unique<EmptyReply>(); msg->swapState(*reply); reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "fatality")); ds->reply(std::move(reply)); reply = pxy.getReply(); - ASSERT_TRUE(reply.get() != 0); + ASSERT_TRUE(reply); ASSERT_EQUAL(reply->getNumErrors(), 1u); EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session"); reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "fatality")); is->forward(std::move(reply)); reply = src.getReply(); - ASSERT_TRUE(reply.get() != 0); + ASSERT_TRUE(reply); ASSERT_EQUAL(reply->getNumErrors(), 2u); EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session"); EXPECT_EQUAL(reply->getError(1).getService(), "test/pxy/session"); } - TEST_DONE(); } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/tests/loadbalance/.gitignore b/messagebus/src/tests/loadbalance/.gitignore deleted file mode 100644 index d1cbb5977f1..00000000000 --- a/messagebus/src/tests/loadbalance/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -.depend -Makefile -loadbalance_test -messagebus_loadbalance_test_app diff --git a/messagebus/src/tests/loadbalance/CMakeLists.txt b/messagebus/src/tests/loadbalance/CMakeLists.txt deleted file mode 100644 index e249a8284a6..00000000000 --- a/messagebus/src/tests/loadbalance/CMakeLists.txt +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(messagebus_loadbalance_test_app TEST - SOURCES - loadbalance.cpp - DEPENDS - messagebus_messagebus-test - messagebus -) -vespa_add_test(NAME messagebus_loadbalance_test_app COMMAND messagebus_loadbalance_test_app) diff --git a/messagebus/src/tests/loadbalance/loadbalance.cpp b/messagebus/src/tests/loadbalance/loadbalance.cpp deleted file mode 100644 index 05ea6d78871..00000000000 --- a/messagebus/src/tests/loadbalance/loadbalance.cpp +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/vespalib/testkit/testapp.h> -#include <vespa/messagebus/destinationsession.h> -#include <vespa/messagebus/intermediatesession.h> -#include <vespa/messagebus/messagebus.h> -#include <vespa/messagebus/routablequeue.h> -#include <vespa/messagebus/sourcesession.h> -#include <vespa/messagebus/sourcesessionparams.h> -#include <vespa/messagebus/testlib/receptor.h> -#include <vespa/messagebus/routing/routingspec.h> -#include <vespa/messagebus/testlib/simplemessage.h> -#include <vespa/messagebus/testlib/simplereply.h> -#include <vespa/messagebus/testlib/simpleprotocol.h> -#include <vespa/messagebus/testlib/slobrok.h> -#include <vespa/messagebus/testlib/testserver.h> - -using namespace mbus; -using namespace std::chrono_literals; - -struct Handler : public IMessageHandler -{ - DestinationSession::UP session; - uint32_t cnt; - - Handler(MessageBus &mb) : session(), cnt(0) { - session = mb.createDestinationSession("session", true, *this); - } - ~Handler() { - session.reset(); - } - void handleMessage(Message::UP msg) override { - ++cnt; - session->acknowledge(std::move(msg)); - } -}; - -RoutingSpec getRouting() { - return RoutingSpec() - .addTable(RoutingTableSpec("Simple") - .addHop(HopSpec("dst", "test/*/session")) - .addRoute(RouteSpec("test").addHop("dst"))); -} - -TEST_SETUP(Test); - -int -Test::Main() -{ - TEST_INIT("loadbalance_test"); - - Slobrok slobrok; - TestServer src(Identity(""), getRouting(), slobrok); - TestServer dst1(Identity("test/dst1"), getRouting(), slobrok); - TestServer dst2(Identity("test/dst2"), getRouting(), slobrok); - TestServer dst3(Identity("test/dst3"), getRouting(), slobrok); - - Handler h1(dst1.mb); - Handler h2(dst2.mb); - Handler h3(dst3.mb); - - ASSERT_TRUE(src.waitSlobrok("test/dst1/session")); - ASSERT_TRUE(src.waitSlobrok("test/dst2/session")); - ASSERT_TRUE(src.waitSlobrok("test/dst3/session")); - - RoutableQueue queue; - SourceSessionParams params; - params.setTimeout(30s); - params.setThrottlePolicy(IThrottlePolicy::SP()); - SourceSession::UP ss = src.mb.createSourceSession(queue, params); - - uint32_t msgCnt = 90; - ASSERT_TRUE(msgCnt % 3 == 0); - for (uint32_t i = 0; i < msgCnt; ++i) { - ss->send(Message::UP(new SimpleMessage("test")), "test"); - } - for (uint32_t i = 0; i < 1000; ++i) { - if (queue.size() == msgCnt) { - break; - } - std::this_thread::sleep_for(10ms); - } - EXPECT_TRUE(queue.size() == msgCnt); - EXPECT_TRUE(h1.cnt == msgCnt / 3); - EXPECT_TRUE(h2.cnt == msgCnt / 3); - EXPECT_TRUE(h3.cnt == msgCnt / 3); - TEST_DONE(); -} diff --git a/messagebus/src/tests/messagebus/messagebus.cpp b/messagebus/src/tests/messagebus/messagebus.cpp index 86c7bf91f2a..367bfc997d0 100644 --- a/messagebus/src/tests/messagebus/messagebus.cpp +++ b/messagebus/src/tests/messagebus/messagebus.cpp @@ -112,13 +112,10 @@ public: Test(); ~Test(); int Main() override; - void testSendToAny(); void testSendToCol(); - void testSendToAnyThenCol(); void testDirectHop(); void testDirectRoute(); void testRoutingPolicyCache(); - void debugTrace(); private: void setup(); @@ -131,21 +128,18 @@ private: TEST_APPHOOK(Test); -Test::Test() {} -Test::~Test() {} +Test::Test() = default; +Test::~Test() = default; int Test::Main() { TEST_INIT("messagebus_test"); - testSendToAny(); TEST_FLUSH(); testSendToCol(); TEST_FLUSH(); - testSendToAnyThenCol(); TEST_FLUSH(); testDirectHop(); TEST_FLUSH(); testDirectRoute(); TEST_FLUSH(); testRoutingPolicyCache(); TEST_FLUSH(); - debugTrace(); TEST_FLUSH(); TEST_DONE(); } @@ -206,38 +200,6 @@ void Test::teardown() } void -Test::testSendToAny() -{ - setup(); - for (uint32_t i = 0; i < 300; ++i) { - Message::UP msg(new SimpleMessage("test")); - EXPECT_TRUE(client->session->send(std::move(msg), "DocProc").isAccepted()); - } - EXPECT_TRUE(dp0->waitQueueSize(100)); - EXPECT_TRUE(dp1->waitQueueSize(100)); - EXPECT_TRUE(dp2->waitQueueSize(100)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP msg = p->queue.dequeue(); - ASSERT_TRUE(msg); - Reply::UP reply(new EmptyReply()); - msg->swapState(*reply); - reply->addError(Error(ErrorCode::FATAL_ERROR, "")); - p->session->forward(std::move(reply)); - } - } - EXPECT_TRUE(client->waitQueueSize(300)); - while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(); - ASSERT_TRUE(reply); - ASSERT_TRUE(reply->isReply()); - EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 1); - } - teardown(); -} - -void Test::testSendToCol() { setup(); @@ -282,83 +244,6 @@ Test::testSendToCol() } void -Test::testSendToAnyThenCol() -{ - setup(); - ASSERT_TRUE(SimpleMessage("msg").getHash() % 2 == 0); - for (uint32_t i = 0; i < 150; ++i) { - Message::UP msg(new SimpleMessage("msg")); - EXPECT_TRUE(client->session->send(std::move(msg), "Index").isAccepted()); - } - EXPECT_TRUE(dp0->waitQueueSize(50)); - EXPECT_TRUE(dp1->waitQueueSize(50)); - EXPECT_TRUE(dp2->waitQueueSize(50)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - EXPECT_TRUE(search00->waitQueueSize(150)); - EXPECT_TRUE(search01->waitQueueSize(0)); - EXPECT_TRUE(search10->waitQueueSize(150)); - EXPECT_TRUE(search11->waitQueueSize(0)); - ASSERT_TRUE(SimpleMessage("msh").getHash() % 2 == 1); - for (uint32_t i = 0; i < 150; ++i) { - Message::UP msg(new SimpleMessage("msh")); - ASSERT_TRUE(client->session->send(std::move(msg), "Index").isAccepted()); - } - EXPECT_TRUE(dp0->waitQueueSize(50)); - EXPECT_TRUE(dp1->waitQueueSize(50)); - EXPECT_TRUE(dp2->waitQueueSize(50)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - EXPECT_TRUE(search00->waitQueueSize(150)); - EXPECT_TRUE(search01->waitQueueSize(150)); - EXPECT_TRUE(search10->waitQueueSize(150)); - EXPECT_TRUE(search11->waitQueueSize(150)); - for (uint32_t i = 0; i < searchVec.size(); ++i) { - Search *s = searchVec[i]; - while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(); - ASSERT_TRUE(msg); - Reply::UP reply(new EmptyReply()); - msg->swapState(*reply); - s->session->reply(std::move(reply)); - } - } - EXPECT_TRUE(dp0->waitQueueSize(100)); - EXPECT_TRUE(dp1->waitQueueSize(100)); - EXPECT_TRUE(dp2->waitQueueSize(100)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - client->waitQueueSize(300); - std::this_thread::sleep_for(100ms); - client->waitQueueSize(300); - while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(); - ASSERT_TRUE(reply); - ASSERT_TRUE(reply->isReply()); - EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0); - } - teardown(); -} - -void Test::testDirectHop() { setup(); @@ -468,65 +353,3 @@ Test::testRoutingPolicyCache() teardown(); } - -void -Test::debugTrace() -{ - setup(); - ASSERT_TRUE(SimpleMessage("msg").getHash() % 2 == 0); - for (uint32_t i = 0; i < 3; ++i) { - Message::UP msg(new SimpleMessage("msg")); - msg->getTrace().setLevel(4 + i); - EXPECT_TRUE(client->session->send(std::move(msg), "Index").isAccepted()); - } - EXPECT_TRUE(dp0->waitQueueSize(1)); - EXPECT_TRUE(dp1->waitQueueSize(1)); - EXPECT_TRUE(dp2->waitQueueSize(1)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - EXPECT_TRUE(search00->waitQueueSize(3)); - EXPECT_TRUE(search01->waitQueueSize(0)); - EXPECT_TRUE(search10->waitQueueSize(3)); - EXPECT_TRUE(search11->waitQueueSize(0)); - for (uint32_t i = 0; i < searchVec.size(); ++i) { - Search *s = searchVec[i]; - while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(); - ASSERT_TRUE(msg); - Reply::UP reply(new EmptyReply()); - msg->swapState(*reply); - s->session->reply(std::move(reply)); - } - } - EXPECT_TRUE(dp0->waitQueueSize(1)); - EXPECT_TRUE(dp1->waitQueueSize(1)); - EXPECT_TRUE(dp2->waitQueueSize(1)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - client->waitQueueSize(3); - Routable::UP reply = client->queue.dequeue(); - fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", - reply->getTrace().getLevel(), - reply->getTrace().toString().c_str()); - reply = client->queue.dequeue(); - fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", - reply->getTrace().getLevel(), - reply->getTrace().toString().c_str()); - reply = client->queue.dequeue(); - fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", - reply->getTrace().getLevel(), - reply->getTrace().toString().c_str()); - teardown(); -} diff --git a/messagebus/src/tests/serviceaddress/serviceaddress.cpp b/messagebus/src/tests/serviceaddress/serviceaddress.cpp index 441da5a80ac..c5d07bc437a 100644 --- a/messagebus/src/tests/serviceaddress/serviceaddress.cpp +++ b/messagebus/src/tests/serviceaddress/serviceaddress.cpp @@ -7,34 +7,49 @@ using namespace mbus; -class Test : public vespalib::TestApp { -public: - int Main() override; - void testAddrServiceAddress(); - void testNameServiceAddress(); - -private: - bool waitSlobrok(RPCNetwork &network, const string &pattern, size_t num); - bool testAddress(RPCNetwork& network, const string &pattern, - const string &expectedSpec, const string &expectedSession); - bool testNullAddress(RPCNetwork &network, const string &pattern); -}; - -int -Test::Main() +bool +waitSlobrok(RPCNetwork &network, const string &pattern, size_t num) { - TEST_INIT("serviceaddress_test"); - - testAddrServiceAddress(); TEST_FLUSH(); - testNameServiceAddress(); TEST_FLUSH(); + for (int i = 0; i < 1000; i++) { + slobrok::api::IMirrorAPI::SpecList res = network.getMirror().lookup(pattern); + if (res.size() == num) { + return true; + } + std::this_thread::sleep_for(10ms); + } + return false; +} - TEST_DONE(); +bool +testNullAddress(RPCNetwork &network, const string &pattern) +{ + RPCService service(network.getMirror(), pattern); + RPCServiceAddress::UP obj = service.make_address(); + if ( ! EXPECT_FALSE(obj)) { + return false; + } + return true; } -TEST_APPHOOK(Test); +bool +testAddress(RPCNetwork &network, const string &pattern, + const string &expectedSpec, const string &expectedSession) +{ + RPCService service(network.getMirror(), pattern); + RPCServiceAddress::UP obj = service.make_address(); + if (!EXPECT_TRUE(obj)) { + return false; + } + if (!EXPECT_EQUAL(expectedSpec, obj->getConnectionSpec())) { + return false; + } + if (!EXPECT_EQUAL(expectedSession, obj->getSessionName())) { + return false; + } + return true; +} -void -Test::testAddrServiceAddress() +TEST("testAddrServiceAddress") { Slobrok slobrok; RPCNetwork network(RPCNetworkParams(slobrok.config()) @@ -55,8 +70,7 @@ Test::testAddrServiceAddress() network.shutdown(); } -void -Test::testNameServiceAddress() +TEST("testNameServiceAddress") { Slobrok slobrok; RPCNetwork network(RPCNetworkParams(slobrok.config()) @@ -74,45 +88,4 @@ Test::testNameServiceAddress() network.shutdown(); } -bool -Test::waitSlobrok(RPCNetwork &network, const string &pattern, size_t num) -{ - for (int i = 0; i < 1000; i++) { - slobrok::api::IMirrorAPI::SpecList res = network.getMirror().lookup(pattern); - if (res.size() == num) { - return true; - } - std::this_thread::sleep_for(10ms); - } - return false; -} - -bool -Test::testNullAddress(RPCNetwork &network, const string &pattern) -{ - RPCService service(network.getMirror(), pattern); - RPCServiceAddress::UP obj = service.resolve(); - if (!EXPECT_TRUE(obj.get() == NULL)) { - return false; - } - return true; -} - -bool -Test::testAddress(RPCNetwork &network, const string &pattern, - const string &expectedSpec, const string &expectedSession) -{ - RPCService service(network.getMirror(), pattern); - RPCServiceAddress::UP obj = service.resolve(); - if (!EXPECT_TRUE(obj.get() != NULL)) { - return false; - } - if (!EXPECT_EQUAL(expectedSpec, obj->getConnectionSpec())) { - return false; - } - if (!EXPECT_EQUAL(expectedSession, obj->getSessionName())) { - return false; - } - return true; -} - +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/tests/servicepool/servicepool.cpp b/messagebus/src/tests/servicepool/servicepool.cpp index 1334831c30c..58fc76f6b50 100644 --- a/messagebus/src/tests/servicepool/servicepool.cpp +++ b/messagebus/src/tests/servicepool/servicepool.cpp @@ -4,67 +4,59 @@ #include <vespa/messagebus/network/rpcnetworkparams.h> #include <vespa/messagebus/network/rpcservicepool.h> #include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/testserver.h> #include <vespa/vespalib/testkit/testapp.h> using namespace mbus; -class Test : public vespalib::TestApp { -private: - void testMaxSize(); - -public: - int Main() override { - TEST_INIT("servicepool_test"); - - testMaxSize(); TEST_FLUSH(); - - TEST_DONE(); - } -}; - -TEST_APPHOOK(Test); - -void -Test::testMaxSize() +TEST("testMaxSize") { Slobrok slobrok; - RPCNetwork net(RPCNetworkParams(slobrok.config())); - RPCServicePool pool(net, 2); - net.start(); - - pool.resolve("foo"); + TestServer me(Identity("me"), RoutingSpec(), slobrok); + RPCNetwork & net = me.net; + net.registerSession("foo"); + net.registerSession("bar"); + net.registerSession("baz"); + me.waitSlobrok("me/foo"); + me.waitSlobrok("me/bar"); + me.waitSlobrok("me/baz"); + RPCServicePool pool(net.getMirror(), 2); + + RPCServiceAddress::UP addr = pool.resolve("me/foo"); EXPECT_EQUAL(1u, pool.getSize()); - EXPECT_TRUE(pool.hasService("foo")); - EXPECT_TRUE(!pool.hasService("bar")); - EXPECT_TRUE(!pool.hasService("baz")); + EXPECT_TRUE(pool.hasService("me/foo")); + EXPECT_TRUE(!pool.hasService("me/bar")); + EXPECT_TRUE(!pool.hasService("me/baz")); - pool.resolve("foo"); + addr = pool.resolve("me/foo"); EXPECT_EQUAL(1u, pool.getSize()); - EXPECT_TRUE(pool.hasService("foo")); - EXPECT_TRUE(!pool.hasService("bar")); - EXPECT_TRUE(!pool.hasService("baz")); + EXPECT_TRUE(pool.hasService("me/foo")); + EXPECT_TRUE(!pool.hasService("me/bar")); + EXPECT_TRUE(!pool.hasService("me/baz")); - pool.resolve("bar"); + addr = pool.resolve("me/bar"); EXPECT_EQUAL(2u, pool.getSize()); - EXPECT_TRUE(pool.hasService("foo")); - EXPECT_TRUE(pool.hasService("bar")); - EXPECT_TRUE(!pool.hasService("baz")); + EXPECT_TRUE(pool.hasService("me/foo")); + EXPECT_TRUE(pool.hasService("me/bar")); + EXPECT_TRUE(!pool.hasService("me/baz")); - pool.resolve("baz"); + addr = pool.resolve("me/baz"); EXPECT_EQUAL(2u, pool.getSize()); - EXPECT_TRUE(!pool.hasService("foo")); - EXPECT_TRUE(pool.hasService("bar")); - EXPECT_TRUE(pool.hasService("baz")); + EXPECT_TRUE(!pool.hasService("me/foo")); + EXPECT_TRUE(pool.hasService("me/bar")); + EXPECT_TRUE(pool.hasService("me/baz")); - pool.resolve("bar"); + addr = pool.resolve("me/bar"); EXPECT_EQUAL(2u, pool.getSize()); - EXPECT_TRUE(!pool.hasService("foo")); - EXPECT_TRUE(pool.hasService("bar")); - EXPECT_TRUE(pool.hasService("baz")); + EXPECT_TRUE(!pool.hasService("me/foo")); + EXPECT_TRUE(pool.hasService("me/bar")); + EXPECT_TRUE(pool.hasService("me/baz")); - pool.resolve("foo"); + addr = pool.resolve("me/foo"); EXPECT_EQUAL(2u, pool.getSize()); - EXPECT_TRUE(pool.hasService("foo")); - EXPECT_TRUE(pool.hasService("bar")); - EXPECT_TRUE(!pool.hasService("baz")); + EXPECT_TRUE(pool.hasService("me/foo")); + EXPECT_TRUE(pool.hasService("me/bar")); + EXPECT_TRUE(!pool.hasService("me/baz")); } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp index e415622707f..07d9f0fae5d 100644 --- a/messagebus/src/tests/shutdown/shutdown.cpp +++ b/messagebus/src/tests/shutdown/shutdown.cpp @@ -12,30 +12,9 @@ using namespace mbus; -class Test : public vespalib::TestApp { -private: - void requireThatListenFailedIsExceptionSafe(); - void requireThatShutdownOnSourceWithPendingIsSafe(); - void requireThatShutdownOnIntermediateWithPendingIsSafe(); - -public: - int Main() override { - TEST_INIT("shutdown_test"); - - requireThatListenFailedIsExceptionSafe(); TEST_FLUSH(); - requireThatShutdownOnSourceWithPendingIsSafe(); TEST_FLUSH(); - requireThatShutdownOnIntermediateWithPendingIsSafe(); TEST_FLUSH(); - - TEST_DONE(); - } -}; - static const duration TIMEOUT = 120s; -TEST_APPHOOK(Test); - -void -Test::requireThatListenFailedIsExceptionSafe() +TEST("requireThatListenFailedIsExceptionSafe") { fnet::frt::StandaloneFRT orb; ASSERT_TRUE(orb.supervisor().Listen(0)); @@ -51,8 +30,7 @@ Test::requireThatListenFailedIsExceptionSafe() } } -void -Test::requireThatShutdownOnSourceWithPendingIsSafe() +TEST("requireThatShutdownOnSourceWithPendingIsSafe") { Slobrok slobrok; TestServer dstServer(MessageBusParams() @@ -87,8 +65,7 @@ Test::requireThatShutdownOnSourceWithPendingIsSafe() } } -void -Test::requireThatShutdownOnIntermediateWithPendingIsSafe() +TEST("requireThatShutdownOnIntermediateWithPendingIsSafe") { Slobrok slobrok; TestServer dstServer(MessageBusParams() @@ -114,7 +91,7 @@ Test::requireThatShutdownOnIntermediateWithPendingIsSafe() ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1)); for (uint32_t i = 0; i < 10; ++i) { - Message::UP msg(new SimpleMessage("msg")); + Message::UP msg = std::make_unique<SimpleMessage>("msg"); { TestServer itrServer(MessageBusParams() .setRetryPolicy(std::make_shared<RetryTransientErrorsPolicy>()) @@ -141,3 +118,5 @@ Test::requireThatShutdownOnIntermediateWithPendingIsSafe() dstServer.mb.sync(); } } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/tests/targetpool/targetpool.cpp b/messagebus/src/tests/targetpool/targetpool.cpp index 0e0e566f2be..9259f992d6c 100644 --- a/messagebus/src/tests/targetpool/targetpool.cpp +++ b/messagebus/src/tests/targetpool/targetpool.cpp @@ -22,12 +22,7 @@ public: } }; -TEST_SETUP(Test); - -int -Test::Main() -{ - TEST_INIT("targetpool_test"); +TEST("targetpool_test") { // Necessary setup to be able to resolve targets. Slobrok slobrok; @@ -46,9 +41,9 @@ Test::Main() // Assert that all connections expire. RPCTarget::SP target; - ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr1))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset(); EXPECT_EQUAL(3u, pool.size()); for (uint32_t i = 0; i < 10; ++i) { pool.flushTargets(false); @@ -59,19 +54,19 @@ Test::Main() EXPECT_EQUAL(0u, pool.size()); // Assert that only idle connections expire. - ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr1))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset(); EXPECT_EQUAL(3u, pool.size()); timer.millis += 444; pool.flushTargets(false); EXPECT_EQUAL(3u, pool.size()); - ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset(); timer.millis += 444; pool.flushTargets(false); EXPECT_EQUAL(2u, pool.size()); - ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset(); timer.millis += 444; pool.flushTargets(false); EXPECT_EQUAL(1u, pool.size()); @@ -80,7 +75,7 @@ Test::Main() EXPECT_EQUAL(0u, pool.size()); // Assert that connections never expire while they are referenced. - ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); + ASSERT_TRUE((target = pool.getTarget(orb, adr1))); EXPECT_EQUAL(1u, pool.size()); for (int i = 0; i < 10; ++i) { timer.millis += 999; @@ -91,6 +86,6 @@ Test::Main() timer.millis += 999; pool.flushTargets(false); EXPECT_EQUAL(0u, pool.size()); - - TEST_DONE(); } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/vespa/messagebus/callstack.cpp b/messagebus/src/vespa/messagebus/callstack.cpp index b7179e14cad..ab22f1ace34 100644 --- a/messagebus/src/vespa/messagebus/callstack.cpp +++ b/messagebus/src/vespa/messagebus/callstack.cpp @@ -20,7 +20,7 @@ CallStack::discard() } } -CallStack::~CallStack() { } +CallStack::~CallStack() = default; IReplyHandler & CallStack::pop(Reply &reply) diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 5313c4adcbb..36211d8ec38 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -246,13 +246,13 @@ Messenger::start() void Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler) { - enqueue(std::make_unique<MessageTask>(std::move(msg), handler)); + handler.handleMessage(std::move(msg)); } void Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler) { - enqueue(std::make_unique<ReplyTask>(std::move(reply), handler)); + handler.handleReply(std::move(reply)); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 4b498c4c014..faa67b9bece 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -17,6 +17,8 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> +#include <vespa/vespalib/util/singleexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/thread.h> #include <thread> @@ -26,6 +28,8 @@ LOG_SETUP(".rpcnetwork"); using vespalib::make_string; using namespace std::chrono_literals; +namespace mbus { + namespace { /** @@ -45,7 +49,9 @@ public: _gate() { ScheduleNow(); } - ~SyncTask() override = default; + ~SyncTask() override { + Kill(); + } void await() { _gate.await(); @@ -56,9 +62,36 @@ public: } }; -} // namespace <unnamed> +struct TargetPoolTask : public FNET_Task { + RPCTargetPool &_pool; -namespace mbus { + TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool) + : FNET_Task(&scheduler), + _pool(pool) + { + ScheduleNow(); + } + ~TargetPoolTask() override { + Kill(); + } + void PerformTask() override { + _pool.flushTargets(false); + Schedule(1.0); + } +}; + +std::unique_ptr<vespalib::SyncableThreadExecutor> +createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) { + switch (optimizeFor) { + case RPCNetworkParams::OptimizeFor::LATENCY: + return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); + case RPCNetworkParams::OptimizeFor::THROUGHPUT: + default: + return std::make_unique<vespalib::SingleExecutor>(100); + } +} + +} RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, const std::vector<RoutingNode*> &recipients) @@ -87,40 +120,35 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - _net.send(*this); - delete this; + if (_net.allowDispatchForEncode()) { + auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() { + _net.send(*this); + delete this; + })); + assert (!rejected); + } else { + _net.send(*this); + delete this; + } } } -RPCNetwork::TargetPoolTask::TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool) - : FNET_Task(&scheduler), - _pool(pool) -{ - ScheduleNow(); -} - -void -RPCNetwork::TargetPoolTask::PerformTask() -{ - _pool.flushTargets(false); - Schedule(1.0); -} - RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), - _transport(std::make_unique<FNET_Transport>()), + _transport(std::make_unique<FNET_Transport>(params.getNumThreads())), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), - _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), - _targetPoolTask(_scheduler, *_targetPool), - _servicePool(std::make_unique<RPCServicePool>(*this, 4096)), _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())), _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), - _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)), + _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), + _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)), + _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)), + _singleEncodeExecutor(createExecutor(params.getOptimizeFor())), + _singleDecodeExecutor(createExecutor(params.getOptimizeFor())), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), @@ -130,7 +158,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : { _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); - _transport->SetTCPNoDelay(params.getTcpNoDelay()); } RPCNetwork::~RPCNetwork() @@ -400,7 +427,8 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); - _executor->sync(); + _singleEncodeExecutor->sync(); + _singleDecodeExecutor->sync(); task.await(); } @@ -409,8 +437,10 @@ RPCNetwork::shutdown() { _transport->ShutDown(true); _threadPool->Close(); - _executor->shutdown(); - _executor->sync(); + _singleEncodeExecutor->shutdown(); + _singleDecodeExecutor->shutdown(); + _singleEncodeExecutor->sync(); + _singleDecodeExecutor->sync(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index a6c2724929d..a510aae9014 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -50,13 +50,6 @@ private: void handleVersion(const vespalib::Version *version) override; }; - struct TargetPoolTask : public FNET_Task { - RPCTargetPool &_pool; - - TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool); - void PerformTask() override; - }; - using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; INetworkOwner *_owner; @@ -65,14 +58,15 @@ private: std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _orb; FNET_Scheduler &_scheduler; - std::unique_ptr<RPCTargetPool> _targetPool; - TargetPoolTask _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; int _requestedPort; - std::unique_ptr<vespalib::ThreadStackExecutor> _executor; + std::unique_ptr<RPCTargetPool> _targetPool; + std::unique_ptr<FNET_Task> _targetPoolTask; + std::unique_ptr<RPCServicePool> _servicePool; + std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor; + std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor; std::unique_ptr<RPCSendAdapter> _sendV1; std::unique_ptr<RPCSendAdapter> _sendV2; SendAdapterMap _sendAdapters; @@ -80,7 +74,6 @@ private: bool _allowDispatchForEncode; bool _allowDispatchForDecode; - /** * Resolves and assigns a service address for the given recipient using the * given address. This is called by the {@link @@ -231,7 +224,8 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getExecutor() const { return *_executor; } + vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; } + vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; } bool allowDispatchForEncode() const { return _allowDispatchForEncode; } bool allowDispatchForDecode() const { return _allowDispatchForDecode; } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 5bf277a8ee6..482a46b2564 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), - _numThreads(4), - _tcpNoDelay(true), + _numThreads(1), + _optimizeFor(OptimizeFor::LATENCY), _dispatchOnEncode(true), _dispatchOnDecode(false), _connectionExpireSecs(600), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index 140f81c611c..a4b752f46d4 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -12,21 +12,10 @@ namespace mbus { * held by this class. This class has reasonable default values for each parameter. */ class RPCNetworkParams { -private: +public: + enum class OptimizeFor { LATENCY, THROUGHPUT}; using CompressionConfig = vespalib::compression::CompressionConfig; - Identity _identity; - config::ConfigUri _slobrokConfig; - int _listenPort; - uint32_t _maxInputBufferSize; - uint32_t _maxOutputBufferSize; - uint32_t _numThreads; - bool _tcpNoDelay; - bool _dispatchOnEncode; - bool _dispatchOnDecode; - double _connectionExpireSecs; - CompressionConfig _compressionConfig; -public: RPCNetworkParams(); RPCNetworkParams(config::ConfigUri configUri); ~RPCNetworkParams(); @@ -107,12 +96,12 @@ public: uint32_t getNumThreads() const { return _numThreads; } - RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) { - _tcpNoDelay = tcpNoDelay; + RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) { + _optimizeFor = tcpNoDelay; return *this; } - bool getTcpNoDelay() const { return _tcpNoDelay; } + OptimizeFor getOptimizeFor() const { return _optimizeFor; } /** * Returns the number of seconds before an idle network connection expires. @@ -198,6 +187,18 @@ public: } uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } +private: + Identity _identity; + config::ConfigUri _slobrokConfig; + int _listenPort; + uint32_t _maxInputBufferSize; + uint32_t _maxOutputBufferSize; + uint32_t _numThreads; + OptimizeFor _optimizeFor; + bool _dispatchOnEncode; + bool _dispatchOnDecode; + double _connectionExpireSecs; + CompressionConfig _compressionConfig; }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 2422638dc05..d217c7964d6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,7 +148,14 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - doRequestDone(req); + if ( _net->allowDispatchForDecode()) { + auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() { + doRequestDone(req); + })); + assert (!rejected); + } else { + doRequestDone(req); + } } void @@ -221,13 +228,13 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { - doHandleReply(protocol, std::move(reply)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { + if (protocol && _net->allowDispatchForEncode()) { + auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { doHandleReply(protocol, std::move(reply)); })); assert (!rejected); + } else { + doHandleReply(protocol, std::move(reply)); } } @@ -266,13 +273,13 @@ RPCSend::invoke(FRT_RPCRequest *req) vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); return; } - if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { - doRequest(req, protocol, std::move(params)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { + if (_net->allowDispatchForDecode()) { + auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { doRequest(req, protocol, std::move(params)); })); assert (!rejected); + } else { + doRequest(req, protocol, std::move(params)); } } diff --git a/messagebus/src/vespa/messagebus/network/rpcservice.cpp b/messagebus/src/vespa/messagebus/network/rpcservice.cpp index fd1b84f545f..ecf40973187 100644 --- a/messagebus/src/vespa/messagebus/network/rpcservice.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcservice.cpp @@ -6,37 +6,37 @@ namespace mbus { RPCService::RPCService(const Mirror &mirror, const string &pattern) : - _mirror(mirror), - _pattern(pattern), - _addressIdx(random()), - _addressGen(0), - _addressList() -{ } - -RPCService::~RPCService() = default; - -RPCServiceAddress::UP -RPCService::resolve() + _serviceName(), + _connectionSpec() { - if (_pattern.find("tcp/") == 0) { - size_t pos = _pattern.find_last_of('/'); - if (pos != string::npos && pos < _pattern.size() - 1) { - auto ret = std::make_unique<RPCServiceAddress>(_pattern, _pattern.substr(0, pos)); - if (!ret->isMalformed()) { - return ret; + if (pattern.find("tcp/") == 0) { + size_t pos = pattern.find_last_of('/'); + if (pos != string::npos && pos < pattern.size() - 1) { + RPCServiceAddress test(pattern, pattern.substr(0, pos)); + if ( ! test.isMalformed()) { + _serviceName = pattern; + _connectionSpec = pattern.substr(0, pos); } } } else { - if (_addressGen != _mirror.updates()) { - _addressGen = _mirror.updates(); - _addressList = _mirror.lookup(_pattern); - } - if (!_addressList.empty()) { - _addressIdx = (_addressIdx + 1) % _addressList.size(); - const AddressList::value_type &entry = _addressList[_addressIdx]; - return std::make_unique<RPCServiceAddress>(entry.first, entry.second); + Mirror::SpecList addressList = mirror.lookup(pattern); + if (!addressList.empty()) { + assert(addressList.size() == 1); //TODO URGENT remove assert after a few factory runs. + const auto &entry = addressList.front(); + _serviceName = entry.first; + _connectionSpec = entry.second; } } +} + +RPCService::~RPCService() = default; + +RPCServiceAddress::UP +RPCService::make_address() +{ + if ( !_serviceName.empty()) { + return std::make_unique<RPCServiceAddress>(_serviceName, _connectionSpec); + } return RPCServiceAddress::UP(); } diff --git a/messagebus/src/vespa/messagebus/network/rpcservice.h b/messagebus/src/vespa/messagebus/network/rpcservice.h index 18c847b0298..13792163693 100644 --- a/messagebus/src/vespa/messagebus/network/rpcservice.h +++ b/messagebus/src/vespa/messagebus/network/rpcservice.h @@ -16,13 +16,9 @@ class RPCNetwork; class RPCService { private: typedef slobrok::api::IMirrorAPI Mirror; - typedef Mirror::SpecList AddressList; - const Mirror &_mirror; - string _pattern; - uint32_t _addressIdx; - uint32_t _addressGen; - AddressList _addressList; + string _serviceName; + string _connectionSpec; public: using UP = std::unique_ptr<RPCService>; @@ -44,15 +40,9 @@ public: * * @return A concrete service address. */ - RPCServiceAddress::UP resolve(); + RPCServiceAddress::UP make_address(); - /** - * Returns the pattern used when querying for the naming server for - * addresses. This is given at construtor time. - * - * @return The service pattern. - */ - const string &getPattern() const { return _pattern; } + bool isValid() const { return ! _connectionSpec.empty(); } }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h index 99a9f383e75..77d64517c40 100644 --- a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h +++ b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h @@ -33,7 +33,7 @@ public: * @param connectionSpec The connection specification. */ RPCServiceAddress(const string &serviceName, const string &connectionSpec); - ~RPCServiceAddress(); + ~RPCServiceAddress() override; /** * Returns whether or not this service address is malformed. diff --git a/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp b/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp index fb40ccff62b..358698570d2 100644 --- a/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp @@ -6,11 +6,14 @@ namespace mbus { -RPCServicePool::RPCServicePool(RPCNetwork &net, uint32_t maxSize) : - _net(net), - _lru(maxSize) +RPCServicePool::RPCServicePool(const slobrok::api::IMirrorAPI & mirror, uint32_t maxSize) : + _mirror(mirror), + _lock(), + _lru(std::make_unique<ServiceCache>(maxSize)), + _updateGen(0), + _maxSize(maxSize) { - _lru.reserve(maxSize); + _lru->reserve(maxSize); assert(maxSize > 0); } @@ -19,27 +22,52 @@ RPCServicePool::~RPCServicePool() = default; RPCServiceAddress::UP RPCServicePool::resolve(const string &pattern) { - std::unique_ptr<RPCService> * found = _lru.findAndRef(pattern); - if (found) { - return (*found)->resolve(); + std::shared_ptr<RPCService> service; + { + LockGuard guard(_lock); + handleMirrorUpdates(guard); + std::shared_ptr<RPCService> *found = _lru->findAndRef(pattern); + if (found) { + service = *found; + } + } + + if (service) { + return service->make_address(); } else { - auto service = std::make_unique<RPCService>(_net.getMirror(), pattern); - auto result = service->resolve(); - _lru[pattern] = std::move(service); + service = std::make_shared<RPCService>(_mirror, pattern); + auto result = service->make_address(); + if (service->isValid()) { + LockGuard guard(_lock); + (*_lru)[pattern] = std::move(service); + } return result; } + +} + +void +RPCServicePool::handleMirrorUpdates(const LockGuard &) { + uint32_t currentgen = _mirror.updates(); + if (_updateGen != currentgen) { + auto lru = std::make_unique<ServiceCache>(_maxSize); + _lru.swap(lru); + _updateGen = currentgen; + } } uint32_t RPCServicePool::getSize() const { - return _lru.size(); + LockGuard guard(_lock); + return _lru->size(); } bool RPCServicePool::hasService(const string &pattern) const { - return _lru.hasKey(pattern); + LockGuard guard(_lock); + return _lru->hasKey(pattern); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcservicepool.h b/messagebus/src/vespa/messagebus/network/rpcservicepool.h index 2614363838c..212c975a38c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcservicepool.h +++ b/messagebus/src/vespa/messagebus/network/rpcservicepool.h @@ -13,12 +13,6 @@ class RPCNetwork; * the rpc network. */ class RPCServicePool { -private: - typedef vespalib::lrucache_map< vespalib::LruParam<string, RPCService::UP> > ServiceCache; - - RPCNetwork &_net; - ServiceCache _lru; - public: RPCServicePool(const RPCServicePool &) = delete; RPCServicePool & operator = (const RPCServicePool &) = delete; @@ -28,7 +22,7 @@ public: * @param net The underlying RPC network. * @param maxSize The max number of services to cache. */ - RPCServicePool(RPCNetwork &net, uint32_t maxSize); + RPCServicePool(const slobrok::api::IMirrorAPI & mirror, uint32_t maxSize); /** * Destructor. Frees any allocated resources. @@ -61,6 +55,17 @@ public: * @return True if a corresponding service is in the pool. */ bool hasService(const string &pattern) const; +private: + using ServiceCache = vespalib::lrucache_map< vespalib::LruParam<string, std::shared_ptr<RPCService> >>; + using LockGuard = std::lock_guard<std::mutex>; + + void handleMirrorUpdates(const LockGuard & guard); + + const slobrok::api::IMirrorAPI & _mirror; + mutable std::mutex _lock; + std::unique_ptr<ServiceCache> _lru; + uint32_t _updateGen; + uint32_t _maxSize; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index cc09e44c460..b42ac47e54d 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -32,7 +32,7 @@ void RPCTargetPool::flushTargets(bool force) { uint64_t currentTime = _timer->getMilliTime(); - vespalib::LockGuard guard(_lock); + LockGuard guard(_lock); TargetMap::iterator it = _targets.begin(); while (it != _targets.end()) { Entry &entry = it->second; @@ -56,7 +56,7 @@ RPCTargetPool::flushTargets(bool force) size_t RPCTargetPool::size() { - vespalib::LockGuard guard(_lock); + LockGuard guard(_lock); return _targets.size(); } @@ -65,7 +65,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address) { const string & spec = address.getConnectionSpec(); uint64_t currentTime = _timer->getMilliTime(); - vespalib::LockGuard guard(_lock); + LockGuard guard(_lock); auto it = _targets.find(spec); if (it != _targets.end()) { Entry &entry = it->second; diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h index 5f858f66993..d47fd977356 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h @@ -28,9 +28,10 @@ private: Entry(RPCTarget::SP target, uint64_t lastUse); }; - typedef std::map<string, Entry> TargetMap; + using TargetMap = std::map<string, Entry>; + using LockGuard = std::lock_guard<std::mutex>; - vespalib::Lock _lock; + std::mutex _lock; TargetMap _targets; ITimer::UP _timer; uint64_t _expireMillis; diff --git a/messagebus_test/src/tests/error/cpp-client.cpp b/messagebus_test/src/tests/error/cpp-client.cpp index abc7967bfe5..6296253d3df 100644 --- a/messagebus_test/src/tests/error/cpp-client.cpp +++ b/messagebus_test/src/tests/error/cpp-client.cpp @@ -7,7 +7,6 @@ #include <vespa/messagebus/rpcmessagebus.h> #include <vespa/messagebus/network/rpcnetworkparams.h> #include <vespa/messagebus/testlib/receptor.h> -#include <vespa/vespalib/util/time.h> #include <thread> #include <vespa/fastos/app.h> @@ -34,11 +33,11 @@ App::Main() SourceSession::UP ss = mb.getMessageBus().createSourceSession(src, SourceSessionParams().setTimeout(300s)); for (int i = 0; i < 10; ++i) { - msg.reset(new SimpleMessage("test")); + msg = std::make_unique<SimpleMessage>("test"); msg->getTrace().setLevel(9); ss->send(std::move(msg), "test"); reply = src.getReply(600s); // 10 minutes timeout - if (reply.get() == 0) { + if ( ! reply) { fprintf(stderr, "CPP-CLIENT: no reply\n"); } else { fprintf(stderr, "CPP-CLIENT:\n%s\n", @@ -49,7 +48,7 @@ App::Main() } std::this_thread::sleep_for(1s); } - if (reply.get() == 0) { + if ( ! reply) { fprintf(stderr, "CPP-CLIENT: no reply\n"); return 1; } diff --git a/messagebus_test/src/tests/error/error.cpp b/messagebus_test/src/tests/error/error.cpp index e5749db452b..87eb391fc86 100644 --- a/messagebus_test/src/tests/error/error.cpp +++ b/messagebus_test/src/tests/error/error.cpp @@ -1,48 +1,47 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/log/log.h> -LOG_SETUP("error_test"); + #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/log/log.h> +LOG_SETUP("error_test"); + using namespace mbus; using vespalib::make_string; -TEST_SETUP(Test); -int -Test::Main() -{ - TEST_INIT("error_test"); +TEST("error_test") { Slobrok slobrok; const std::string routing_template = TEST_PATH("routing-template.cfg"); const std::string ctl_script = TEST_PATH("ctl.sh"); { // Make slobrok config - EXPECT_TRUE(system("echo slobrok[1] > slobrok.cfg") == 0); - EXPECT_TRUE(system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' " - ">> slobrok.cfg", slobrok.port()).c_str()) == 0); + EXPECT_EQUAL(0, system("echo slobrok[1] > slobrok.cfg")); + EXPECT_EQUAL(0, system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' " + ">> slobrok.cfg", slobrok.port()).c_str())); } { // CPP SERVER { // Make routing config - EXPECT_TRUE(system(("cat " + routing_template + " | sed 's#session#cpp/session#' > routing.cfg").c_str()) == 0); + EXPECT_EQUAL(0, system(("cat " + routing_template + " | sed 's#session#cpp/session#' > routing.cfg").c_str())); } fprintf(stderr, "STARTING CPP-SERVER\n"); - EXPECT_TRUE(system((ctl_script + " start server cpp").c_str()) == 0); - EXPECT_TRUE(system("./messagebus_test_cpp-client-error_app") == 0); - EXPECT_TRUE(system("../../binref/runjava JavaClient") == 0); - EXPECT_TRUE(system((ctl_script + " stop server cpp").c_str()) == 0); + EXPECT_EQUAL(0, system((ctl_script + " start server cpp").c_str())); + EXPECT_EQUAL(0, system("./messagebus_test_cpp-client-error_app")); + EXPECT_EQUAL(0, system("../../binref/runjava JavaClient")); + EXPECT_EQUAL(0, system((ctl_script + " stop server cpp").c_str())); } { // JAVA SERVER { // Make routing config - EXPECT_TRUE(system(("cat " + routing_template + " | sed 's#session#java/session#' > routing.cfg").c_str()) == 0); + EXPECT_EQUAL(0, system(("cat " + routing_template + " | sed 's#session#java/session#' > routing.cfg").c_str())); } fprintf(stderr, "STARTING JAVA-SERVER\n"); - EXPECT_TRUE(system((ctl_script + " start server java").c_str()) == 0); - EXPECT_TRUE(system("./messagebus_test_cpp-client-error_app") == 0); - EXPECT_TRUE(system("../../binref/runjava JavaClient") == 0); - EXPECT_TRUE(system((ctl_script + " stop server java").c_str()) == 0); + EXPECT_EQUAL(0, system((ctl_script + " start server java").c_str())); + EXPECT_EQUAL(0, system("./messagebus_test_cpp-client-error_app")); + EXPECT_EQUAL(0, system("../../binref/runjava JavaClient")); + EXPECT_EQUAL(0, system((ctl_script + " stop server java").c_str())); } - TEST_DONE(); } + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java index 54c8719bceb..3f5c3025850 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageName.java @@ -72,6 +72,15 @@ public class YumPackageName { architecture = packageName.architecture; } + /** + * Set the epoch of the YUM package. + * + * <p>WARNING: Should only be invoked if the YUM package actually has an epoch. Typically + * YUM packages doesn't have one explicitly set, and in case "0" will be used with + * {@link #toVersionLockName()} (otherwise it fails), but it will be absent from an + * install with {@link #toName()} (otherwise it fails). This typically means that + * you should set this only if the epoch is != "0".</p> + */ public Builder setEpoch(String epoch) { this.epoch = Optional.of(epoch); return this; } public Builder setName(String name) { this.name = name; return this; } public Builder setVersion(String version) { this.version = Optional.of(version); return this; } @@ -235,7 +244,7 @@ public class YumPackageName { */ public String toVersionLockName() { return String.format("%s:%s-%s-%s.%s", - epoch.orElseThrow(() -> new IllegalStateException("Epoch is missing for YUM package " + name)), + epoch.orElse("0"), name, version.orElseThrow(() -> new IllegalStateException("Version is missing for YUM package " + name)), release.orElseThrow(() -> new IllegalStateException("Release is missing for YUM package " + name)), diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java index 01664f5c22b..64e2997d486 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumPackageNameTest.java @@ -56,7 +56,7 @@ public class YumPackageNameTest { "1.el7", null, "docker-engine-selinux-1.12.6-1.el7", - null); + "0:docker-engine-selinux-1.12.6-1.el7.*"); // name-ver-rel.arch verifyPackageName("docker-engine-selinux-1.12.6-1.el7.x86_64", @@ -66,7 +66,7 @@ public class YumPackageNameTest { "1.el7", "x86_64", "docker-engine-selinux-1.12.6-1.el7.x86_64", - null); + "0:docker-engine-selinux-1.12.6-1.el7.*"); // name-epoch:ver-rel.arch verifyPackageName( @@ -112,7 +112,7 @@ public class YumPackageNameTest { yumPackageName.toVersionLockName(); fail(); } catch (IllegalStateException e) { - assertThat(e.getMessage(), containsStringIgnoringCase("epoch is missing")); + assertThat(e.getMessage(), containsStringIgnoringCase("Version is missing ")); } } else { assertEquals(toVersionName, yumPackageName.toVersionLockName()); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java index e107abf8fbb..098d706bf05 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java @@ -29,6 +29,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import static com.yahoo.config.provision.NodeResources.DiskSpeed.any; +import static com.yahoo.vespa.hosted.provision.Node.State.active; /** * @author oyving @@ -249,16 +250,16 @@ public class MetricsReporter extends Maintainer { } private static NodeResources getCapacityTotal(NodeList nodes) { - return nodes.nodeType(NodeType.host).asList().stream() + return nodes.nodeType(NodeType.host).state(active).asList().stream() .map(host -> host.flavor().resources()) - .map(resources -> resources.justNumbers()) + .map(NodeResources::justNumbers) .reduce(new NodeResources(0, 0, 0, 0, any), NodeResources::add); } private static NodeResources getFreeCapacityTotal(NodeList nodes) { - return nodes.nodeType(NodeType.host).asList().stream() + return nodes.nodeType(NodeType.host).state(active).asList().stream() .map(n -> freeCapacityOf(nodes, n)) - .map(resources -> resources.justNumbers()) + .map(NodeResources::justNumbers) .reduce(new NodeResources(0, 0, 0, 0, any), NodeResources::add); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java index f010e6905e1..648bf52f455 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/CapacityPolicies.java @@ -67,7 +67,6 @@ public class CapacityPolicies { private void ensureSufficientResources(NodeResources resources, ClusterSpec cluster) { double minMemoryGb = nodeResourceLimits.minMemoryGb(cluster.type()); if (resources.memoryGb() >= minMemoryGb) return; - throw new IllegalArgumentException(String.format(Locale.ENGLISH, "Must specify at least %.2f Gb of memory for %s cluster '%s', was: %.2f Gb", minMemoryGb, cluster.type().name(), cluster.id().value(), resources.memoryGb())); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java index aa262bdf751..ca7c33f96bd 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java @@ -11,6 +11,7 @@ import com.yahoo.config.provision.NodeType; import com.yahoo.config.provision.Zone; import com.yahoo.jdisc.Metric; import com.yahoo.test.ManualClock; +import com.yahoo.transaction.NestedTransaction; import com.yahoo.vespa.applicationmodel.ApplicationInstance; import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference; import com.yahoo.vespa.curator.Curator; @@ -163,6 +164,10 @@ public class MetricsReporterTest { container2 = container2.with(allocation(Optional.of("app2"), container2).get()); nodeRepository.addDockerNodes(new LockedNodeList(List.of(container2), nodeRepository.lockUnallocated())); + NestedTransaction transaction = new NestedTransaction(); + nodeRepository.activate(nodeRepository.getNodes(NodeType.host), transaction); + transaction.commit(); + Orchestrator orchestrator = mock(Orchestrator.class); when(orchestrator.getHostInfo(eq(reference), any())).thenReturn(HostInfo.createNoRemarks()); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java index 76258e86de9..bd8be5063fd 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java @@ -403,6 +403,21 @@ public class ProvisioningTest { prepare(application, 1, 2, 3, 3, defaultResources, tester); } + @Test + public void below_resource_limit() { + ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); + + ApplicationId application = tester.makeApplicationId(); + tester.makeReadyNodes(10, defaultResources); + try { + prepare(application, 2, 2, 3, 3, + new NodeResources(2, 2, 10, 2), tester); + } + catch (IllegalArgumentException e) { + assertEquals("Must specify at least 4.00 Gb of memory for container cluster 'container0', was: 2.00 Gb", e.getMessage()); + } + } + /** Dev always uses the zone default flavor */ @Test public void dev_deployment_flavor() { diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp index db8d1067980..6f902a30861 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp @@ -50,4 +50,23 @@ HnswGraph::set_link_array(uint32_t docid, uint32_t level, const LinkArrayRef& ne links.remove(old_links_ref); } +std::vector<uint32_t> +HnswGraph::level_histogram() const +{ + std::vector<uint32_t> result; + size_t num_nodes = node_refs.size(); + for (size_t i = 0; i < num_nodes; ++i) { + uint32_t levels = 0; + auto node_ref = node_refs[i].load_acquire(); + if (node_ref.valid()) { + levels = nodes.get(node_ref).size(); + } + while (result.size() <= levels) { + result.push_back(0); + } + ++result[levels]; + } + return result; +} + } // namespace diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h index 64892d06f09..233b9087af7 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h @@ -69,6 +69,8 @@ struct HnswGraph { } size_t size() const { return node_refs.size(); } + + std::vector<uint32_t> level_histogram() const; }; } diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp index de6daba650c..b08d862ae6d 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp @@ -381,6 +381,20 @@ HnswIndex::get_state(const vespalib::slime::Inserter& inserter) const { auto& object = inserter.insertObject(); StateExplorerUtils::memory_usage_to_slime(memory_usage(), object.setObject("memory_usage")); + object.setLong("nodes", _graph.size()); + auto& histogram_array = object.setArray("level_histogram"); + auto level_histogram = _graph.level_histogram(); + for (uint32_t hist_val : level_histogram) { + histogram_array.addLong(hist_val); + } + uint32_t reachable = count_reachable_nodes(); + uint32_t unreachable = _graph.size() - reachable; + if (level_histogram.size() > 0) { + unreachable -= level_histogram[0]; + } + object.setLong("unreachable_nodes", unreachable); + object.setLong("entry_docid", _graph.entry_docid); + object.setLong("entry_level", _graph.entry_level); } std::unique_ptr<NearestNeighborIndexSaver> @@ -498,4 +512,31 @@ HnswIndex::check_link_symmetry() const return all_sym; } +uint32_t +HnswIndex::count_reachable_nodes() const +{ + int search_level = get_entry_level(); + if (search_level < 0) { + return 0; + } + auto visited = _visited_set_pool.get(_graph.size()); + uint32_t entry_id = get_entry_docid(); + LinkArray found_links; + found_links.push_back(entry_id); + visited.mark(entry_id); + while (search_level >= 0) { + for (uint32_t idx = 0; idx < found_links.size(); ++idx) { + uint32_t docid = found_links[idx]; + auto neighbors = _graph.get_link_array(docid, search_level); + for (uint32_t neighbor : neighbors) { + if (visited.is_marked(neighbor)) continue; + visited.mark(neighbor); + found_links.push_back(neighbor); + } + } + --search_level; + } + return found_links.size(); +} + } // namespace diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h index b5d57c2ebfd..95001853710 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h @@ -147,6 +147,7 @@ public: HnswNode get_node(uint32_t docid) const; void set_node(uint32_t docid, const HnswNode &node); bool check_link_symmetry() const; + uint32_t count_reachable_nodes() const; static search::datastore::ArrayStoreConfig make_default_node_store_config(); static search::datastore::ArrayStoreConfig make_default_link_store_config(); diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp index 75c62e5202f..f02ead86a8d 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp @@ -19,19 +19,19 @@ HnswIndexLoader::load(const fileutil::LoadedBuffer& buf) size_t num_readable = buf.size(sizeof(uint32_t)); _ptr = static_cast<const uint32_t *>(buf.buffer()); _end = _ptr + num_readable; - uint32_t entry_docid = nextVal(); - int32_t entry_level = nextVal(); - uint32_t num_nodes = nextVal(); + uint32_t entry_docid = next_int(); + int32_t entry_level = next_int(); + uint32_t num_nodes = next_int(); std::vector<uint32_t> link_array; for (uint32_t docid = 0; docid < num_nodes; ++docid) { - uint32_t num_levels = nextVal(); + uint32_t num_levels = next_int(); if (num_levels > 0) { _graph.make_node_for_document(docid, num_levels); for (uint32_t level = 0; level < num_levels; ++level) { - uint32_t num_links = nextVal(); + uint32_t num_links = next_int(); link_array.clear(); while (num_links-- > 0) { - link_array.push_back(nextVal()); + link_array.push_back(next_int()); } _graph.set_link_array(docid, level, link_array); } diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h index abc68889a1b..174f66b95ec 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h @@ -23,7 +23,7 @@ private: const uint32_t *_ptr; const uint32_t *_end; bool _failed; - uint32_t nextVal() { + uint32_t next_int() { if (__builtin_expect((_ptr == _end), false)) { _failed = true; return 0; diff --git a/slobrok/src/tests/configure/configure.cpp b/slobrok/src/tests/configure/configure.cpp index 579e468db45..aa9826045ef 100644 --- a/slobrok/src/tests/configure/configure.cpp +++ b/slobrok/src/tests/configure/configure.cpp @@ -23,9 +23,6 @@ using slobrok::ConfigShim; using slobrok::SlobrokServer; using slobrok::ConfiguratorFactory; -TEST_SETUP(Test); - - std::string createSpec(int port) { @@ -93,10 +90,7 @@ compare(MirrorAPI &api, const char *pattern, SpecList expect) return false; } -int -Test::Main() -{ - TEST_INIT("configure_test"); +TEST("configure_test") { fnet::frt::StandaloneFRT orb1; fnet::frt::StandaloneFRT orb2; @@ -214,10 +208,10 @@ Test::Main() serverOne.stop(); serverTwo.stop(); - TEST_DONE(); - orb4.supervisor().GetTransport()->ShutDown(true); orb3.supervisor().GetTransport()->ShutDown(true); orb2.supervisor().GetTransport()->ShutDown(true); orb1.supervisor().GetTransport()->ShutDown(true); } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index 6657a9f1600..8444319b395 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -158,6 +158,7 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { storConfig.getConfigId()); DummyStorageLink *storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); + storage.open(); // Message dequeing does not start before we invoke `open` on the storage // link chain, so we enqueue messages in randomized priority order before @@ -168,7 +169,6 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { for (auto pri : pris) { storage.enqueue(createDummyCommand(pri)); } - storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); for (size_t i = 0; i < pris.size(); ++i) { @@ -191,12 +191,12 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) { storConfig.getConfigId()); DummyStorageLink *storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); + storage.open(); std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128}; for (auto pri : pris) { storage.enqueue(createDummyCommand(pri)->makeReply()); } - storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); // Want FIFO order for replies, not priority-sorted order. diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index 431c90b27f2..a593cc913a8 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -123,9 +123,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) default: LOG(error, "Link %s trying to send %s down while in state %s", toString().c_str(), msg->toString().c_str(), stateToString(getState())); - assert(false); + return; } - assert(msg.get()); + assert(msg); LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str()); if (isBottom()) { LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str()); @@ -165,9 +165,9 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) default: LOG(error, "Link %s trying to send %s up while in state %s", toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); - assert(false); + return; } - assert(msg.get()); + assert(msg); if (isTop()) { ostringstream ost; ost << "Unhandled message at top of chain " << *msg << "."; diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 8f5b22aa7fa..4536ea97855 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -29,9 +29,9 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 ## TTL for rpc target cache mbus.rpctargetcache.ttl double default = 600 -## Number of threads for mbus threadpool +## Number of threads for network. ## Any value below 1 will be 1. -mbus.num_threads int default=4 +mbus.num_threads int default=1 mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY @@ -42,4 +42,4 @@ mbus.dispatch_on_encode bool default=true ## Enable to use above thread pool for decoding replies ## False will use network(fnet) thread ## Todo: Change default once verified in large scale deployment. -mbus.dispatch_on_decode bool default=false +mbus.dispatch_on_decode bool default=true diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index fa2b0cda018..aff2b0f624f 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -14,17 +14,16 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/stringfmt.h> - -#include <vespa/log/bufferedlogger.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/documentapi/messagebus/messages/getdocumentreply.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/log/bufferedlogger.h> LOG_SETUP(".communication.manager"); using vespalib::make_string; using document::FixedBucketSpaces; +using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; namespace storage { @@ -281,6 +280,17 @@ struct PlaceHolderBucketResolver : public BucketResolver { } }; +mbus::RPCNetworkParams::OptimizeFor +convert(CommunicationManagerConfig::Mbus::OptimizeFor optimizeFor) { + switch (optimizeFor) { + case CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY: + return mbus::RPCNetworkParams::OptimizeFor::LATENCY; + case CommunicationManagerConfig::Mbus::OptimizeFor::THROUGHPUT: + default: + return mbus::RPCNetworkParams::OptimizeFor::THROUGHPUT; + } +} + } CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) @@ -290,7 +300,6 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co _listener(), _eventQueue(), _mbus(), - _count(0), _configUri(configUri), _closed(false), _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()) @@ -415,7 +424,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> params.setNumThreads(std::max(1, config->mbus.numThreads)); params.setDispatchOnDecode(config->mbus.dispatchOnDecode); params.setDispatchOnEncode(config->mbus.dispatchOnEncode); - params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY); + params.setOptimizeFor(convert(config->mbus.optimizeFor)); params.setIdentity(mbus::Identity(_component.getIdentity())); if (config->mbusport != -1) { @@ -480,8 +489,8 @@ void CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg) { assert(msg); - LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - _eventQueue.enqueue(std::move(msg)); + LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + process(msg); } bool diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index c08ad214768..8983dbdf057 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -116,7 +116,7 @@ private: void process(const std::shared_ptr<api::StorageMessage>& msg); - using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig; + using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); @@ -133,7 +133,6 @@ private: std::unique_ptr<mbus::RPCMessageBus> _mbus; std::unique_ptr<mbus::DestinationSession> _messageBusSession; std::unique_ptr<mbus::SourceSession> _sourceSession; - uint32_t _count; vespalib::Lock _messageBusSentLock; std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent; diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java index 32cdbf9af5c..94176bbb658 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java @@ -6,18 +6,21 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.pig.EvalFunc; import org.apache.pig.PigWarning; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.tools.pigstats.PigStatusReporter; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.joda.time.DateTime; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.math.BigDecimal; import java.math.BigInteger; import java.util.*; @@ -68,16 +71,45 @@ public class VespaDocumentOperation extends EvalFunc<String> { private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields"; private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields"; private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields"; + private static final String REMOVE_TENSOR_FIELDS = "remove-tensor-fields"; + private static final String UPDATE_TENSOR_FIELDS = "update-tensor-fields"; + private static final String REMOVE_MAP_FIELDS = "remove-map-fields"; + private static final String UPDATE_MAP_FIELDS = "update-map-fields"; private static final String EXCLUDE_FIELDS = "exclude-fields"; private static final String TESTSET_CONDITION = "condition"; - private static final String PARTIAL_UPDATE_ASSIGN = "assign"; + private static final String PARTIAL_UPDATE_ADD = "add"; + private static final String PARTIAL_UPDATE_REMOVE = "remove"; + + private static Map<String, String> mapPartialOperationMap; + + static { + mapPartialOperationMap = new HashMap<>(); + mapPartialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE); + mapPartialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN); + } + + private static Map<String, String> partialOperationMap; + + static { + partialOperationMap = new HashMap<>(); + partialOperationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE); + partialOperationMap.put(UPDATE_TENSOR_FIELDS, PARTIAL_UPDATE_ADD); + partialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE); + partialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN); + } private final String template; private final Operation operation; private final Properties properties; + private PigStatusReporter statusReporter; public VespaDocumentOperation(String... params) { + statusReporter = PigStatusReporter.getInstance(); + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 0); + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 0); + } properties = VespaConfiguration.loadProperties(params); template = properties.getProperty(PROPERTY_ID_TEMPLATE); operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put")); @@ -86,14 +118,20 @@ public class VespaDocumentOperation extends EvalFunc<String> { @Override public String exec(Tuple tuple) throws IOException { if (tuple == null || tuple.size() == 0) { + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } return null; } if (template == null || template.length() == 0) { - warn("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1); + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } + warnLog("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1); return null; } if (operation == null) { - warn("No valid operation found. Skipping.", PigWarning.UDF_WARNING_1); + warnLog("No valid operation found. Skipping.", PigWarning.UDF_WARNING_2); return null; } @@ -107,25 +145,29 @@ public class VespaDocumentOperation extends EvalFunc<String> { Schema inputSchema = getInputSchema(); Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple); String docId = TupleTools.toString(fields, template); - // create json json = create(operation, docId, fields, properties, inputSchema); if (json == null || json.length() == 0) { - warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1); + warnLog("No valid document operation could be created.", PigWarning.UDF_WARNING_3); return null; } } catch (Exception e) { + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } StringBuilder sb = new StringBuilder(); sb.append("Caught exception processing input row: \n"); sb.append(tuple.toString()); sb.append("\nException: "); - sb.append(ExceptionUtils.getStackTrace(e)); - warn(sb.toString(), PigWarning.UDF_WARNING_1); + sb.append(getStackTraceAsString(e)); + warnLog(sb.toString(), PigWarning.UDF_WARNING_4); return null; } - + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 1); + } return json; } @@ -134,14 +176,14 @@ public class VespaDocumentOperation extends EvalFunc<String> { * Create a JSON Vespa document operation given the supplied fields, * operation and document id template. * - * @param op Operation (put, remove, update) - * @param docId Document id - * @param fields Fields to put in document operation - * @return A valid JSON Vespa document operation + * @param op Operation (put, remove, update) + * @param docId Document id + * @param fields Fields to put in document operation + * @return A valid JSON Vespa document operation * @throws IOException ... */ public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties, - Schema schema) throws IOException { + Schema schema) throws IOException { if (op == null) { return null; } @@ -178,15 +220,72 @@ public class VespaDocumentOperation extends EvalFunc<String> { return out.toString(); } + private static String getPartialOperation(Map<String, String> operationMap, String name, Properties properties) { + // This function checks if the property of the name falls into the map provided + // if yes, return the desired operation. if no, return null + // for example, input: + // operationMap map{"update-map-fields":"assign","remove-map-fields":"remove"} + // name date + // properties "update-map-fields":"date,month" + // output: assign + for (String label : operationMap.keySet()) { + if (properties.getProperty(label) != null) { + String[] p = properties.getProperty(label).split(","); + if (Arrays.asList(p).contains(name)) { + return operationMap.get(label); + } + } + } + return null; + } @SuppressWarnings("unchecked") private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException { if (shouldWriteField(name, properties, depth)) { - g.writeFieldName(name); - if (shouldWritePartialUpdate(op, depth)) { - writePartialUpdate(value, type, g, name, properties, schema, op, depth); + String operation = getPartialOperation(mapPartialOperationMap, name, properties); + // check if the name has the property update-map-fields/remove-map-fields + // if yes, we need special treatments here as we need to loop through the tuple + // be aware the the operation here is not vespa operation such as "put" and "update" + // operation here are the field name we wish use to such as "assign" and "remove" + if (operation != null) { + writePartialUpdateAndRemoveMap(name, value, g, properties, schema, op, depth, operation); } else { - writeValue(value, type, g, name, properties, schema, op, depth); + g.writeFieldName(name); + if (shouldWritePartialUpdate(op, depth)) { + writePartialUpdate(value, type, g, name, properties, schema, op, depth); + } else { + writeValue(value, type, g, name, properties, schema, op, depth); + } + } + + } + } + + private static void writePartialUpdateAndRemoveMap(String name, Object value, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth, String operation) throws IOException { + schema = (schema != null) ? schema.getField(0).schema : null; + // extract the key of map and keys in map for writing json when partial updating maps + Schema valueSchema = (schema != null) ? schema.getField(1).schema : null; + // data format { ( key; id, value: (abc,123,(123234,bbaa))) } + // the first element of each tuple in the bag will be the map to update + // the second element of each tuple in the bag will be the new value of the map + DataBag bag = (DataBag) value; + for (Tuple element : bag) { + if (element.size() != 2) { + continue; + } + String k = (String) element.get(0); + Object v = element.get(1); + Byte t = DataType.findType(v); + if (t == DataType.TUPLE) { + g.writeFieldName(name + "{" + k + "}"); + if (operation.equals(PARTIAL_UPDATE_REMOVE)) { + g.writeStartObject(); + g.writeFieldName(PARTIAL_UPDATE_REMOVE); + g.writeNumber(0); + g.writeEndObject(); + } else { + writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth); + } } } } @@ -235,14 +334,18 @@ public class VespaDocumentOperation extends EvalFunc<String> { g.writeStartObject(); Map<Object, Object> map = (Map<Object, Object>) value; if (shouldCreateTensor(map, name, properties)) { - writeTensor(map, g); + if (isRemoveTensor(name, properties)) { + writeRemoveTensor(map, g); + } else { + writeTensor(map, g); + } } else { for (Map.Entry<Object, Object> entry : map.entrySet()) { String k = entry.getKey().toString(); Object v = entry.getValue(); - Byte t = DataType.findType(v); + Byte t = DataType.findType(v); Schema fieldSchema = (schema != null) ? schema.getField(k).schema : null; - writeField(k, v, t, g, properties, fieldSchema, op, depth+1); + writeField(k, v, t, g, properties, fieldSchema, op, depth + 1); } } g.writeEndObject(); @@ -269,7 +372,6 @@ public class VespaDocumentOperation extends EvalFunc<String> { DataBag bag = (DataBag) value; // get the schema of the tuple in bag schema = (schema != null) ? schema.getField(0).schema : null; - if (shouldWriteBagAsMap(name, properties)) { // when treating bag as map, the schema of bag should be {(key, val)....} // the size of tuple in bag should be 2. 1st one is key. 2nd one is val. @@ -285,9 +387,9 @@ public class VespaDocumentOperation extends EvalFunc<String> { Byte t = DataType.findType(v); if (t == DataType.TUPLE) { Map<String, Object> fields = TupleTools.tupleMap(valueSchema, (Tuple) v); - writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth); + writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth + 1); } else { - writeField(k, v, t, g, properties, valueSchema, op, depth); + writeField(k, v, t, g, properties, valueSchema, op, depth + 1); } } g.writeEndObject(); @@ -309,7 +411,15 @@ public class VespaDocumentOperation extends EvalFunc<String> { private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException { g.writeStartObject(); - g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation + // here we check if the operation falls into the four partial operations we do on map/tensor structure + // if no, we assume it's a update on the whole document and we write assign here + // if yes, we write the desired operation here + String operation = getPartialOperation(partialOperationMap, name, properties); + if (operation != null) { + g.writeFieldName(operation); + } else { + g.writeFieldName(PARTIAL_UPDATE_ASSIGN); + } writeValue(value, type, g, name, properties, schema, op, depth); g.writeEndObject(); } @@ -335,21 +445,38 @@ public class VespaDocumentOperation extends EvalFunc<String> { } private static boolean shouldWriteTupleAsMap(String name, Properties properties) { + // include UPDATE_MAP_FIELDS here because when updating the map + // the second element in each tuple should be written as a map if (properties == null) { return false; } + String addBagAsMapFields = properties.getProperty(UPDATE_MAP_FIELDS); String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS); - if (simpleObjectFields == null) { + if (simpleObjectFields == null && addBagAsMapFields == null) { return false; } - if (simpleObjectFields.equals("*")) { - return true; + if (addBagAsMapFields != null) { + if (addBagAsMapFields.equals("*")) { + return true; + } + String[] fields = addBagAsMapFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } - String[] fields = simpleObjectFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { + if (simpleObjectFields != null) { + if (simpleObjectFields.equals("*")) { return true; } + String[] fields = simpleObjectFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } } return false; } @@ -378,11 +505,50 @@ public class VespaDocumentOperation extends EvalFunc<String> { if (properties == null) { return false; } - String tensorFields = properties.getProperty(CREATE_TENSOR_FIELDS); - if (tensorFields == null) { + String createTensorFields = properties.getProperty(CREATE_TENSOR_FIELDS); + String addTensorFields = properties.getProperty(UPDATE_TENSOR_FIELDS); + String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS); + + if (createTensorFields == null && addTensorFields == null && removeTensorFields == null) { return false; } - String[] fields = tensorFields.split(","); + String[] fields; + if (createTensorFields != null) { + fields = createTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + if (addTensorFields != null) { + fields = addTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + if (removeTensorFields != null) { + fields = removeTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + return false; + } + + private static boolean isRemoveTensor(String name, Properties properties) { + if (properties == null) { + return false; + } + String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS); + if (removeTensorFields == null) { + return false; + } + String[] fields = removeTensorFields.split(","); for (String field : fields) { if (field.trim().equalsIgnoreCase(name)) { return true; @@ -449,4 +615,49 @@ public class VespaDocumentOperation extends EvalFunc<String> { g.writeEndArray(); } + private static void writeRemoveTensor(Map<Object, Object> map, JsonGenerator g) throws IOException { + g.writeFieldName("addresses"); + g.writeStartArray(); + for (Map.Entry<Object, Object> entry : map.entrySet()) { + String k = entry.getKey().toString(); + String[] dimensions = k.split(","); + for (String dimension : dimensions) { + g.writeStartObject(); + if (dimension == null || dimension.isEmpty()) { + continue; + } + String[] address = dimension.split(":"); + if (address.length != 2) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + String dim = address[0]; + String label = address[1]; + if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + g.writeFieldName(dim.trim()); + g.writeString(label.trim()); + g.writeEndObject(); + // Write address + } + } + g.writeEndArray(); + } + + // copied from vespajlib for reducing dependency and building with JDK 8 + private static String getStackTraceAsString(Throwable throwable) { + try (StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter, true)) { + throwable.printStackTrace(printWriter); + return stringWriter.getBuffer().toString(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + // wrapper to emit logs + private void warnLog(String msg, PigWarning warning) { + warn(msg, warning); + System.err.println(msg); + } } diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java index 3c6805019b8..67003273cac 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java @@ -10,6 +10,7 @@ import org.junit.Test; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -18,7 +19,6 @@ import static org.junit.Assert.assertNull; @SuppressWarnings("serial") public class VespaDocumentOperationTest { - @Test public void requireThatUDFReturnsCorrectJson() throws Exception { String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>"); @@ -85,6 +85,189 @@ public class VespaDocumentOperationTest { @Test + public void requireThatUDFCorrectlyGeneratesRemoveBagAsMapOperation() throws Exception { + DataBag bag = BagFactory.getInstance().newDefaultBag(); + + Schema innerObjectSchema = new Schema(); + Tuple innerObjectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); + addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); + + Schema objectSchema = new Schema(); + Tuple objectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("key", DataType.CHARARRAY, "234566", objectSchema, objectTuple); + addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); + + Schema bagSchema = new Schema(); + addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); + + innerObjectSchema = new Schema(); + innerObjectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); + addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); + + objectSchema = new Schema(); + objectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple); + addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); + + addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); + + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple); + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-map-fields=bag","operation=update"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + assertEquals("{\"remove\":0}", fields.get("bag{123456}").toString()); + assertEquals("{\"remove\":0}", fields.get("bag{234566}").toString()); + + } + + @Test + public void requireThatUDFCorrectlyGeneratesAddBagAsMapOperation() throws Exception { + + DataBag bag = BagFactory.getInstance().newDefaultBag(); + + Schema innerObjectSchema = new Schema(); + Tuple innerObjectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); + addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); + + Schema objectSchema = new Schema(); + Tuple objectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple); + addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); + + Schema bagSchema = new Schema(); + addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); + + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple); + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "update-map-fields=bag","operation=update"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + + JsonNode fields = root.get("fields"); + JsonNode value = fields.get("bag{123456}"); + JsonNode assign = value.get("assign"); + assertEquals("2020", assign.get("year").getTextValue()); + assertEquals(3, assign.get("month").getIntValue()); + } + + @Test + public void requireThatUDFCorrectlyGeneratesAddTensorOperation() throws Exception { + + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + // Please refer to the tensor format documentation + + Map<String, Double> tensor = new HashMap<String, Double>() {{ + put("x:label1,y:label2,z:label4", 2.0); + put("x:label3", 3.0); + }}; + + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + addToTuple("tensor", DataType.MAP, tensor, schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "update-tensor-fields=tensor","operation=update"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + JsonNode tensorValue = fields.get("tensor"); + JsonNode add = tensorValue.get("add"); + JsonNode cells = add.get("cells"); + Iterator<JsonNode> cellsIterator = cells.getElements(); + + JsonNode element = cellsIterator.next(); + assertEquals("label1", element.get("address").get("x").getTextValue()); + assertEquals("label2", element.get("address").get("y").getTextValue()); + assertEquals("label4", element.get("address").get("z").getTextValue()); + assertEquals("2.0", element.get("value").toString()); + + element = cellsIterator.next(); + assertEquals("label3", element.get("address").get("x").getTextValue()); + assertEquals("3.0", element.get("value").toString()); + } + + @Test + public void requireThatUDFCorrectlyGeneratesRemoveTensorOperation() throws Exception { + + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + // Please refer to the tensor format documentation + + Map<String, Double> tensor = new HashMap<String, Double>() {{ + put("x:label1,y:label2,z:label4", 2.0); + put("x:label3", 3.0); + }}; + + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + addToTuple("tensor", DataType.MAP, tensor, schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "remove-tensor-fields=tensor","operation=update"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + JsonNode tensorValue = fields.get("tensor"); + JsonNode remove = tensorValue.get("remove"); + JsonNode address = remove.get("addresses"); + + Iterator<JsonNode> addressIterator = address.getElements(); + + JsonNode element = addressIterator.next(); + assertEquals("label1", element.get("x").getTextValue()); + + element = addressIterator.next(); + assertEquals("label2", element.get("y").getTextValue()); + + element = addressIterator.next(); + assertEquals("label4", element.get("z").getTextValue()); + + element = addressIterator.next(); + assertEquals("label3", element.get("x").getTextValue()); + } + + @Test + public void requireThatUDFReturnsNullWhenExceptionHappens() throws IOException { + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + // broken DELTA format that would throw internally + Map<String, Double> tensor = new HashMap<String, Double>() {{ + put("xlabel1", 2.0); // missing : between 'x' and 'label1' + }}; + + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + addToTuple("tensor", DataType.MAP, tensor, schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + assertNull(json); + } + + @Test public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception { String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>"); ObjectMapper m = new ObjectMapper(); @@ -368,4 +551,10 @@ public class VespaDocumentOperationTest { schema.add(new Schema.FieldSchema(alias, schemaInField, type)); tuple.append(value); } + + private void addToBagWithSchema(String alias, byte type, Tuple value, Schema schemaInField, Schema schema,DataBag bag) + throws FrontendException { + schema.add(new Schema.FieldSchema(alias, schemaInField, type)); + bag.add(value); + } } diff --git a/vespalib/src/vespa/vespalib/util/gencnt.cpp b/vespalib/src/vespa/vespalib/util/gencnt.cpp index 5adbf14a757..ad82cf2e67c 100644 --- a/vespalib/src/vespa/vespalib/util/gencnt.cpp +++ b/vespalib/src/vespa/vespalib/util/gencnt.cpp @@ -58,7 +58,7 @@ GenCnt::distance(const GenCnt &other) const GenCnt & GenCnt::operator=(const GenCnt &src) { - _val = src._val; + _val = src.getAsInt(); return *this; } diff --git a/vespalib/src/vespa/vespalib/util/gencnt.h b/vespalib/src/vespa/vespalib/util/gencnt.h index 7bfc5a7e49b..cac868a8adb 100644 --- a/vespalib/src/vespa/vespalib/util/gencnt.h +++ b/vespalib/src/vespa/vespalib/util/gencnt.h @@ -2,6 +2,7 @@ #pragma once #include <cstdint> +#include <atomic> namespace vespalib { @@ -16,7 +17,7 @@ namespace vespalib { class GenCnt { private: - uint32_t _val; + std::atomic<uint32_t> _val; public: /** @@ -31,12 +32,12 @@ public: **/ GenCnt(uint32_t val) : _val(val) {} - GenCnt(const GenCnt &rhs) = default; + GenCnt(const GenCnt &rhs) : _val(rhs.getAsInt()) {} /** * @brief empty destructor **/ - ~GenCnt() {} + ~GenCnt() = default; /** * @brief Increase the generation count held by this object @@ -95,7 +96,7 @@ public: * * @return generation counter **/ - uint32_t getAsInt() const { return _val; } + uint32_t getAsInt() const { return _val.load(std::memory_order_relaxed); } /** * @brief Set the generation counter from an integer diff --git a/vespalib/src/vespa/vespalib/util/signalhandler.cpp b/vespalib/src/vespa/vespalib/util/signalhandler.cpp index 21543ef10d8..c4fb7cfa517 100644 --- a/vespalib/src/vespa/vespalib/util/signalhandler.cpp +++ b/vespalib/src/vespa/vespalib/util/signalhandler.cpp @@ -2,6 +2,11 @@ #include "signalhandler.h" #include <cassert> +#include <atomic> +#include <chrono> +#include <thread> + +using namespace std::chrono_literals; namespace vespalib { @@ -9,6 +14,9 @@ std::vector<SignalHandler*> SignalHandler::_handlers; namespace { +// 31 bit concurrency counter, 1 (lsb) bit indicating shutdown +std::atomic<int> signal_counter; + class Shutdown { public: @@ -19,9 +27,6 @@ public: } -// Clear SignalHandler::_handlers in a slightly less unsafe manner. -Shutdown shutdown; - SignalHandler SignalHandler::HUP(SIGHUP); SignalHandler SignalHandler::INT(SIGINT); SignalHandler SignalHandler::TERM(SIGTERM); @@ -36,12 +41,19 @@ SignalHandler SignalHandler::FPE(SIGFPE); SignalHandler SignalHandler::QUIT(SIGQUIT); SignalHandler SignalHandler::USR1(SIGUSR1); +// Clear SignalHandler::_handlers in a slightly less unsafe manner. +Shutdown shutdown; + void SignalHandler::handleSignal(int signal) { - if ((((size_t)signal) < _handlers.size()) && (_handlers[signal] != 0)) { - _handlers[signal]->gotSignal(); + static_assert(std::atomic<int>::is_always_lock_free, "signal_counter must be lock free"); + if ((signal_counter.fetch_add(2) & 1) == 0) { + if ((((size_t)signal) < _handlers.size()) && (_handlers[signal] != 0)) { + _handlers[signal]->gotSignal(); + } } + signal_counter.fetch_sub(2); } void @@ -108,12 +120,21 @@ SignalHandler::unhook() void SignalHandler::shutdown() { + while ((signal_counter.fetch_or(1) & ~1) != 0) { + std::this_thread::sleep_for(10ms); + } for (std::vector<SignalHandler*>::iterator it = _handlers.begin(), ite = _handlers.end(); it != ite; ++it) { - if (*it != nullptr) - (*it)->unhook(); + if (*it != nullptr) { + // Ignore SIGTERM at shutdown in case valgrind is used. + if ((*it)->_signal == SIGTERM) { + (*it)->ignore(); + } else { + (*it)->unhook(); + } + } } std::vector<SignalHandler *>().swap(_handlers); } diff --git a/vespalog/abi-spec.json b/vespalog/abi-spec.json index 09ac3fa75d3..996cc0259a0 100644 --- a/vespalog/abi-spec.json +++ b/vespalog/abi-spec.json @@ -186,19 +186,6 @@ ], "fields": [] }, - "com.yahoo.log.MappedLevelControllerRepo": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>(java.nio.MappedByteBuffer, int, int, java.lang.String)", - "public com.yahoo.log.LevelController getLevelController(java.lang.String)", - "public void checkBack()" - ], - "fields": [] - }, "com.yahoo.log.RejectFilter": { "superClass": "java.lang.Object", "interfaces": [], @@ -300,15 +287,10 @@ "public" ], "methods": [ - "public void <init>(java.lang.String, java.lang.String, java.lang.String)", - "public com.yahoo.log.LevelController getLevelControl(java.lang.String)", "public com.yahoo.log.LevelController getLevelController(java.lang.String)", "public void close()" ], - "fields": [ - "public static final int controlFileHeaderLength", - "public static final int numLevels" - ] + "fields": [] }, "com.yahoo.log.event.Collection": { "superClass": "com.yahoo.log.event.Event", diff --git a/vespalog/src/main/java/com/yahoo/log/LevelController.java b/vespalog/src/main/java/com/yahoo/log/LevelController.java index ccd18f126d6..0efe0d4e7c1 100644 --- a/vespalog/src/main/java/com/yahoo/log/LevelController.java +++ b/vespalog/src/main/java/com/yahoo/log/LevelController.java @@ -1,4 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.log; + +import java.util.logging.Level; + /** * This is the interface for controlling the log level of a * component logger. This hides the actual controlling @@ -7,32 +11,24 @@ * @author arnej27959 * */ - -/** - * @author arnej27959 - **/ -package com.yahoo.log; - -import java.util.logging.Level; - public interface LevelController { /** * should we actually publish a log message with the given Level now? - **/ - public boolean shouldLog(Level level); + */ + boolean shouldLog(Level level); /** * return a string suitable for printing in a logctl file. * the string must be be 4 * 8 characters, where each group * of 4 characters is either " ON" or " OFF". - **/ - public String getOnOffString(); + */ + String getOnOffString(); /** * check the current state of logging and reflect it into the * associated Logger instance, if available. - **/ - public void checkBack(); - public Level getLevelLimit(); + */ + void checkBack(); + Level getLevelLimit(); } diff --git a/vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java b/vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java index f02d8793b23..53f4de4f264 100644 --- a/vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java +++ b/vespalog/src/main/java/com/yahoo/log/MappedLevelControllerRepo.java @@ -13,14 +13,14 @@ import java.util.Map; * @author Ulf Lilleengen * @since 5.1 */ -public class MappedLevelControllerRepo { +class MappedLevelControllerRepo { private final Map<String, LevelController> levelControllerMap = new HashMap<>(); private final MappedByteBuffer mapBuf; private final int controlFileHeaderLength; private final int numLevels; private final String logControlFilename; - public MappedLevelControllerRepo(MappedByteBuffer mapBuf, int controlFileHeaderLength, int numLevels, String logControlFilename) { + MappedLevelControllerRepo(MappedByteBuffer mapBuf, int controlFileHeaderLength, int numLevels, String logControlFilename) { this.mapBuf = mapBuf; this.controlFileHeaderLength = controlFileHeaderLength; this.numLevels = numLevels; @@ -101,12 +101,12 @@ public class MappedLevelControllerRepo { return MappedLevelController.checkOnOff(mapBuf, levstart); } - public LevelController getLevelController(String suffix) { + LevelController getLevelController(String suffix) { return levelControllerMap.get(suffix); } - public void checkBack() { + void checkBack() { for (LevelController ctrl : levelControllerMap.values()) { ctrl.checkBack(); } diff --git a/vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java b/vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java index 85d92075827..86eba1c019e 100644 --- a/vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java +++ b/vespalog/src/main/java/com/yahoo/log/VespaLevelControllerRepo.java @@ -30,12 +30,12 @@ public class VespaLevelControllerRepo implements LevelControllerRepo { /** * length of fixed header content of a control file, constant: **/ - public static final int controlFileHeaderLength; + static final int controlFileHeaderLength; /** * number of distinctly controlled levels (in logctl files), * must be compatible with C++ Vespa logging **/ - public static final int numLevels = 8; + static final int numLevels = 8; static { controlFileHeaderLength = CFHEADER.length() @@ -50,7 +50,7 @@ public class VespaLevelControllerRepo implements LevelControllerRepo { **/ private LevelController defaultLevelCtrl; - public VespaLevelControllerRepo(String logCtlFn, String logLevel, String applicationPrefix) { + VespaLevelControllerRepo(String logCtlFn, String logLevel, String applicationPrefix) { this.logControlFilename = logCtlFn; this.appPrefix = applicationPrefix; defaultLevelCtrl = new DefaultLevelController(logLevel); @@ -116,17 +116,17 @@ public class VespaLevelControllerRepo implements LevelControllerRepo { ctlFile.writeBytes(appPrefix); } ctlFile.writeBytes("\n"); - for (int i = appLen; i < maxPrefix; i++) { + for (int i = appLen; i < maxPrefix + 2; i++) { byte space = ' '; ctlFile.write(space); } ctlFile.writeBytes("\n"); ctlFile.setLength(ctlFile.getFilePointer()); - if (ctlFile.getFilePointer() != controlFileHeaderLength) { + if (ctlFile.getFilePointer() != (controlFileHeaderLength + 2)) { System.err.println("internal error, bad header length: " + ctlFile.getFilePointer() + " (should have been: " - + controlFileHeaderLength + + (controlFileHeaderLength + 2) + ")"); } } @@ -142,7 +142,7 @@ public class VespaLevelControllerRepo implements LevelControllerRepo { levelControllerRepo = new MappedLevelControllerRepo(mapBuf, controlFileHeaderLength, numLevels, logControlFilename); } - public LevelController getLevelControl(String suffix) { + private LevelController getLevelControl(String suffix) { LevelController ctrl = null; if (levelControllerRepo != null) { if (suffix == null || suffix.equals("default")) { diff --git a/vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java b/vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java index 331780f226b..32b1003c20c 100644 --- a/vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java +++ b/vespalog/src/main/java/com/yahoo/log/VespaLogHandler.java @@ -44,7 +44,7 @@ class VespaLogHandler extends StreamHandler { /** * Publish a log record into the Vespa log target. */ - public synchronized void publish (LogRecord record) { + public synchronized void publish(LogRecord record) { Level level = record.getLevel(); String component = record.getLoggerName(); diff --git a/vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java b/vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java index 3bd4de03f4e..9d04e079f55 100644 --- a/vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java +++ b/vespalog/src/test/java/com/yahoo/log/VespaLevelControllerRepoTest.java @@ -62,12 +62,14 @@ public class VespaLevelControllerRepoTest { RandomAccessFile lcfile = new RandomAccessFile(lcf, "rw"); - lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength); + lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength+1); + assertEquals(lcfile.readByte(), '\n'); + lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength+2); assertEquals(lcfile.readByte(), 'd'); - lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength + 7); + lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength+2 + 7); assertEquals(lcfile.readByte(), ':'); - assertEquals(0, (VespaLevelControllerRepo.controlFileHeaderLength+9) % 4); - lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength + 9); + assertEquals(0, (VespaLevelControllerRepo.controlFileHeaderLength+13) % 4); + lcfile.seek(VespaLevelControllerRepo.controlFileHeaderLength + 13); assertEquals(0x20204f4e, lcfile.readInt()); int off = findControlString(lcfile, "com.yahoo.log.test"); diff --git a/vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java b/vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java index c0dd856b634..220e5e9271e 100644 --- a/vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java +++ b/vespalog/src/test/java/com/yahoo/log/VespaLogHandlerTestCase.java @@ -13,6 +13,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BrokenBarrierException; @@ -32,20 +33,20 @@ import static org.junit.Assert.fail; * @author Bjorn Borud */ public class VespaLogHandlerTestCase { - protected static String hostname; - protected static String pid; + private static String hostname; + private static String pid; - protected static LogRecord record1; - protected static String record1String; + static LogRecord record1; + static String record1String; - protected static LogRecord record2; - protected static String record2String; + static LogRecord record2; + private static String record2String; - protected static LogRecord record3; - protected static String record3String; + private static LogRecord record3; + private static String record3String; - protected static LogRecord record4; - protected static String record4String; + private static LogRecord record4; + private static String record4String; static { hostname = Util.getHostName(); @@ -139,7 +140,7 @@ public class VespaLogHandlerTestCase { } @Test - public void testFallback() throws FileNotFoundException { + public void testFallback() { File file = new File("mydir2"); file.delete(); assertTrue(file.mkdir()); @@ -157,7 +158,7 @@ public class VespaLogHandlerTestCase { * Perform simple test */ @Test - public void testLogCtl () throws InterruptedException, FileNotFoundException { + public void testLogCtl () { MockLevelController ctl = new MockLevelController(); MockLevelControllerRepo ctlRepo = new MockLevelControllerRepo(ctl); MockLogTarget target = new MockLogTarget(); @@ -203,7 +204,7 @@ public class VespaLogHandlerTestCase { @Test public void testRotate () throws IOException { // Doesn't work in Windows. TODO: Fix the logging stuff - if (System.getProperty("os.name").toLowerCase().indexOf("win")>=0) + if (System.getProperty("os.name").toLowerCase().contains("win")) return; try { VespaLogHandler h @@ -269,10 +270,8 @@ public class VespaLogHandlerTestCase { ); class LogRacer implements Runnable { - private int n; - public LogRacer (int n) { - this.n = n; + private LogRacer() { } public void run () { @@ -285,7 +284,7 @@ public class VespaLogHandlerTestCase { } } - public void logLikeCrazy () { + void logLikeCrazy() { for (int j = 0; j < numLogEntries; j++) { try { h.publish(record1); @@ -299,7 +298,7 @@ public class VespaLogHandlerTestCase { } for (int i = 0; i < numThreads; i++) { - t[i] = new Thread(new LogRacer(i)); + t[i] = new Thread(new LogRacer()); t[i].start(); } @@ -361,35 +360,23 @@ public class VespaLogHandlerTestCase { * */ protected static String[] readFile (String fileName) { - BufferedReader br = null; - List<String> lines = new LinkedList<String>(); - try { - br = new BufferedReader( - new InputStreamReader(new FileInputStream(new File(fileName)), "UTF-8")); + List<String> lines = new LinkedList<>(); + try (BufferedReader br = new BufferedReader( + new InputStreamReader(new FileInputStream(new File(fileName)), StandardCharsets.UTF_8))) { for (String line = br.readLine(); line != null; - line = br.readLine()) - { + line = br.readLine()) { lines.add(line); } - return lines.toArray(new String[lines.size()]); - } - catch (Throwable e) { + return lines.toArray(new String[0]); + } catch (Throwable e) { return new String[0]; } - finally { - if (br != null) { - try { - br.close(); - } - catch (IOException e) {} - } - } } private static class MockLevelControllerRepo implements LevelControllerRepo { private LevelController levelController; - public MockLevelControllerRepo(LevelController controller) { + MockLevelControllerRepo(LevelController controller) { this.levelController = controller; } @@ -411,7 +398,7 @@ public class VespaLogHandlerTestCase { return (level.equals(logLevel)); } - public void setShouldLog(Level level) { + void setShouldLog(Level level) { this.logLevel = level; } @@ -431,7 +418,7 @@ public class VespaLogHandlerTestCase { private static class MockLogTarget implements LogTarget { private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - public String[] getLines() { + String[] getLines() { return baos.toString().split("\n"); } @Override diff --git a/vespalog/src/vespa/log/control-file.cpp b/vespalog/src/vespa/log/control-file.cpp index 4a4fd36e0ac..a4fe9e98b8d 100644 --- a/vespalog/src/vespa/log/control-file.cpp +++ b/vespalog/src/vespa/log/control-file.cpp @@ -69,7 +69,7 @@ ControlFile::ensureHeader() perror("log::ControlFile write(A) failed"); } - char spaces[_maxPrefix + 1]; + char spaces[_maxPrefix + 3]; memset(spaces, ' ', sizeof spaces); spaces[sizeof(spaces) - 1] = '\0'; |