diff options
86 files changed, 1920 insertions, 570 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java b/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java index 800bf73cdbb..d05913143e4 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java @@ -86,11 +86,11 @@ public class ConfigSentinel extends AbstractService implements SentinelConfig.Pr private SentinelConfig.Connectivity.Builder getConnectivityConfig(boolean enable) { var builder = new SentinelConfig.Connectivity.Builder(); if (enable) { - builder.maxBadOutPercent(60); - builder.maxBadReverseCount(3); + builder.minOkPercent(40); + builder.maxBadCount(3); } else { - builder.maxBadOutPercent(100); - builder.maxBadReverseCount(Integer.MAX_VALUE); + builder.minOkPercent(0); + builder.maxBadCount(Integer.MAX_VALUE); } return builder; } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java b/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java index 8db656a5f2c..638864d85bb 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.model.content; +import com.yahoo.config.application.api.DeployLogger; import com.yahoo.vespa.model.builder.xml.dom.ModelElement; import com.yahoo.vespa.model.content.cluster.DomResourceLimitsBuilder; @@ -37,13 +38,16 @@ public class ClusterResourceLimits { private final boolean enableFeedBlockInDistributor; private final boolean hostedVespa; private final boolean throwIfSpecified; + private final DeployLogger deployLogger; + private ResourceLimits.Builder ctrlBuilder = new ResourceLimits.Builder(); private ResourceLimits.Builder nodeBuilder = new ResourceLimits.Builder(); - public Builder(boolean enableFeedBlockInDistributor, boolean hostedVespa, boolean throwIfSpecified) { + public Builder(boolean enableFeedBlockInDistributor, boolean hostedVespa, boolean throwIfSpecified, DeployLogger deployLogger) { this.enableFeedBlockInDistributor = enableFeedBlockInDistributor; this.hostedVespa = hostedVespa; this.throwIfSpecified = throwIfSpecified; + this.deployLogger = deployLogger; } public ClusterResourceLimits build(ModelElement clusterElem) { @@ -57,7 +61,7 @@ public class ClusterResourceLimits { private ResourceLimits.Builder createBuilder(ModelElement element) { return element == null ? new ResourceLimits.Builder() - : DomResourceLimitsBuilder.createBuilder(element, hostedVespa, throwIfSpecified); + : DomResourceLimitsBuilder.createBuilder(element, hostedVespa, throwIfSpecified, deployLogger); } public void setClusterControllerBuilder(ResourceLimits.Builder builder) { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java index e9264a6d9fc..c298b7f5f5a 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java @@ -123,7 +123,8 @@ public class ContentCluster extends AbstractConfigProducer<AbstractConfigProduce boolean enableFeedBlockInDistributor = deployState.getProperties().featureFlags().enableFeedBlockInDistributor(); var resourceLimits = new ClusterResourceLimits.Builder(enableFeedBlockInDistributor, stateIsHosted(deployState), - deployState.featureFlags().throwIfResourceLimitsSpecified()) + deployState.featureFlags().throwIfResourceLimitsSpecified(), + deployState.getDeployLogger()) .build(contentElement); c.clusterControllerConfig = new ClusterControllerConfig.Builder(getClusterId(contentElement), contentElement, diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java index bab991efe51..37adb73bc15 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java @@ -1,9 +1,12 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.model.content.cluster; +import com.yahoo.config.application.api.DeployLogger; import com.yahoo.vespa.model.builder.xml.dom.ModelElement; import com.yahoo.vespa.model.content.ResourceLimits; +import java.util.logging.Level; + /** * Builder for feed block resource limits. * @@ -11,13 +14,21 @@ import com.yahoo.vespa.model.content.ResourceLimits; */ public class DomResourceLimitsBuilder { - public static ResourceLimits.Builder createBuilder(ModelElement contentXml, boolean hostedVespa, boolean throwIfSpecified) { + public static ResourceLimits.Builder createBuilder(ModelElement contentXml, + boolean hostedVespa, + boolean throwIfSpecified, + DeployLogger deployLogger) { ResourceLimits.Builder builder = new ResourceLimits.Builder(); ModelElement resourceLimits = contentXml.child("resource-limits"); if (resourceLimits == null) { return builder; } - if (hostedVespa && throwIfSpecified) - throw new IllegalArgumentException("Element '" + resourceLimits + "' is not allowed to be set"); + if (hostedVespa) { + String message = "Element '" + resourceLimits + "' is not allowed to be set"; + if (throwIfSpecified) + throw new IllegalArgumentException(message); + else + deployLogger.logApplicationPackage(Level.WARNING, message); + } if (resourceLimits.child("disk") != null) { builder.setDiskLimit(resourceLimits.childAsDouble("disk")); diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java index ad1f5331a91..4324f257922 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java @@ -1,6 +1,9 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.model.content; +import com.yahoo.config.application.api.DeployLogger; +import com.yahoo.config.model.application.provider.BaseDeployLogger; +import com.yahoo.searchdefinition.derived.TestableDeployLogger; import com.yahoo.text.XML; import com.yahoo.vespa.model.builder.xml.dom.ModelElement; import org.junit.Rule; @@ -49,7 +52,10 @@ public class ClusterResourceLimitsTest { return this; } public ClusterResourceLimits build() { - var builder = new ClusterResourceLimits.Builder(enableFeedBlockInDistributor, false, false); + var builder = new ClusterResourceLimits.Builder(enableFeedBlockInDistributor, + false, + false, + new BaseDeployLogger()); builder.setClusterControllerBuilder(ctrlBuilder); builder.setContentNodeBuilder(nodeBuilder); return builder.build(); @@ -125,24 +131,39 @@ public class ClusterResourceLimitsTest { @Test public void exception_is_thrown_when_resource_limits_are_specified() { - final boolean hosted = true; + TestableDeployLogger logger = new TestableDeployLogger(); - Document clusterXml = XML.getDocument("<cluster id=\"test\">" + - " <tuning>\n" + - " <resource-limits>\n" + - " <memory>0.92</memory>\n" + - " </resource-limits>\n" + - " </tuning>\n" + - "</cluster>"); + buildClusterResourceLimitsAndLogIfSpecified(logger); + assertEquals(1, logger.warnings.size()); + assertEquals("Element 'resource-limits' is not allowed to be set", logger.warnings.get(0)); expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage(containsString("Element 'resource-limits' is not allowed to be set")); - ClusterResourceLimits.Builder builder = new ClusterResourceLimits.Builder(true, hosted, true); - builder.build(new ModelElement(clusterXml.getDocumentElement())); + buildClusterResourceLimitsAndThrowIfSpecified(logger); + } + + private void buildClusterResourceLimitsAndThrowIfSpecified(DeployLogger deployLogger) { + buildClusterResourceLimits(true, deployLogger); + } - expectedException = ExpectedException.none(); - ClusterResourceLimits.Builder builder2 = new ClusterResourceLimits.Builder(true, hosted, false); - builder2.build(new ModelElement(clusterXml.getDocumentElement())); + private void buildClusterResourceLimitsAndLogIfSpecified(DeployLogger deployLogger) { + buildClusterResourceLimits(false, deployLogger); + } + + private void buildClusterResourceLimits(boolean throwIfSpecified, DeployLogger deployLogger) { + Document clusterXml = XML.getDocument("<cluster id=\"test\">" + + " <tuning>\n" + + " <resource-limits>\n" + + " <memory>0.92</memory>\n" + + " </resource-limits>\n" + + " </tuning>\n" + + "</cluster>"); + + ClusterResourceLimits.Builder builder = new ClusterResourceLimits.Builder(true, + true, + throwIfSpecified, + deployLogger); + builder.build(new ModelElement(clusterXml.getDocumentElement())); } private void assertLimits(Double expCtrlDisk, Double expCtrlMemory, Double expNodeDisk, Double expNodeMemory, Fixture f) { diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java index 2eecfa9440e..10bb00168bb 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.model.content; +import com.yahoo.config.model.application.provider.BaseDeployLogger; import com.yahoo.config.model.deploy.DeployState; import com.yahoo.config.model.deploy.TestProperties; import com.yahoo.config.model.test.MockRoot; @@ -23,7 +24,10 @@ public class FleetControllerClusterTest { var clusterElement = new ModelElement(doc.getDocumentElement()); return new ClusterControllerConfig.Builder("storage", clusterElement, - new ClusterResourceLimits.Builder(enableFeedBlockInDistributor, false, false) + new ClusterResourceLimits.Builder(enableFeedBlockInDistributor, + false, + false, + new BaseDeployLogger()) .build(clusterElement).getClusterControllerLimits()) .build(root.getDeployState(), root, clusterElement.getXml()); } diff --git a/configd/src/apps/sentinel/connectivity.cpp b/configd/src/apps/sentinel/connectivity.cpp index 8314a090616..c1c49e3068a 100644 --- a/configd/src/apps/sentinel/connectivity.cpp +++ b/configd/src/apps/sentinel/connectivity.cpp @@ -120,8 +120,8 @@ void Connectivity::configure(const SentinelConfig::Connectivity &config, const ModelConfig &model) { _config = config; - LOG(config, "connectivity.maxBadReverseCount = %d", _config.maxBadReverseCount); - LOG(config, "connectivity.maxBadOutPercent = %d", _config.maxBadOutPercent); + LOG(config, "connectivity.maxBadCount = %d", _config.maxBadCount); + LOG(config, "connectivity.minOkPercent = %d", _config.minOkPercent); _checkSpecs = specsFrom(model); } @@ -143,7 +143,7 @@ Connectivity::checkConnectivity(RpcServer &rpcServer) { } checkContext.latch.await(); classifyConnFails(connectivityMap, _checkSpecs, rpcServer); - Accumulated accumulated; + Accumulator accumulated; for (const auto & [hostname, check] : connectivityMap) { std::string detail = toString(check.result()); std::string prev = _detailsPerHost[hostname]; @@ -152,42 +152,43 @@ Connectivity::checkConnectivity(RpcServer &rpcServer) { } _detailsPerHost[hostname] = detail; LOG_ASSERT(check.result() != CcResult::UNKNOWN); - accumulate(accumulated, check.result()); + accumulated.handleResult(check.result()); } - return enoughOk(accumulated, clusterSize); + return accumulated.enoughOk(_config); } -void Connectivity::accumulate(Accumulated &target, CcResult value) { +void Connectivity::Accumulator::handleResult(CcResult value) { + ++_numHandled; switch (value) { case CcResult::UNKNOWN: case CcResult::UNREACHABLE_UP: case CcResult::INDIRECT_PING_FAIL: - ++target.numSeriousIssues; - ++target.numIssues; + ++_numBad; break; case CcResult::CONN_FAIL: - ++target.numIssues; + // not OK, but not a serious issue either break; case CcResult::INDIRECT_PING_UNAVAIL: case CcResult::ALL_OK: + ++_numOk; break; } } -bool Connectivity::enoughOk(const Accumulated &results, size_t clusterSize) { +bool Connectivity::Accumulator::enoughOk(const SentinelConfig::Connectivity &config) const { bool enough = true; - if (results.numSeriousIssues > size_t(_config.maxBadReverseCount)) { + if (_numBad > size_t(config.maxBadCount)) { LOG(warning, "%zu of %zu nodes up but with network connectivity problems (max is %d)", - results.numSeriousIssues, clusterSize, _config.maxBadReverseCount); + _numBad, _numHandled, config.maxBadCount); enough = false; } - if (results.numIssues * 100.0 > _config.maxBadOutPercent * clusterSize) { - double pct = results.numIssues * 100.0 / clusterSize; - LOG(warning, "Problems with connection to %zu of %zu nodes, %.1f%% (max is %d%%)", - results.numIssues, clusterSize, pct, _config.maxBadOutPercent); + if (_numOk * 100.0 < config.minOkPercent * _numHandled) { + double pct = _numOk * 100.0 / _numHandled; + LOG(warning, "Only %zu of %zu nodes are up and OK, %.1f%% (min is %d%%)", + _numOk, _numHandled, pct, config.minOkPercent); enough = false; } - if (results.numIssues == 0) { + if (_numOk == _numHandled) { LOG(info, "All connectivity checks OK, proceeding with service startup"); } else if (enough) { LOG(info, "Enough connectivity checks OK, proceeding with service startup"); diff --git a/configd/src/apps/sentinel/connectivity.h b/configd/src/apps/sentinel/connectivity.h index 2ba17f5c07c..8d923387ffa 100644 --- a/configd/src/apps/sentinel/connectivity.h +++ b/configd/src/apps/sentinel/connectivity.h @@ -29,12 +29,15 @@ public: bool checkConnectivity(RpcServer &rpcServer); static SpecMap specsFrom(const ModelConfig &model); private: - struct Accumulated { - size_t numIssues = 0; - size_t numSeriousIssues = 0; + class Accumulator { + private: + size_t _numOk = 0; + size_t _numBad = 0; + size_t _numHandled = 0; + public: + void handleResult(CcResult value); + bool enoughOk(const SentinelConfig::Connectivity &config) const; }; - void accumulate(Accumulated &target, CcResult value); - bool enoughOk(const Accumulated &results, size_t clusterSize); SentinelConfig::Connectivity _config; SpecMap _checkSpecs; std::map<std::string, std::string> _detailsPerHost; diff --git a/configdefinitions/src/vespa/sentinel.def b/configdefinitions/src/vespa/sentinel.def index 45ef9b21cfd..cf19e701717 100644 --- a/configdefinitions/src/vespa/sentinel.def +++ b/configdefinitions/src/vespa/sentinel.def @@ -22,11 +22,11 @@ application.region string default="default" # those that can connect back to us. We delay starting services # if we have more problems than the following limits allow: -## Percentage we fail to talk to, maximum -connectivity.maxBadOutPercent int default=100 +## Percentage of nodes that must be up and fully OK, minimum +connectivity.minOkPercent int default=0 -## Absolute number of nodes that fail to talk back to us, maximum -connectivity.maxBadReverseCount int default=999999999 +## Absolute number of nodes with confirmed network connectivity problems, maximum +connectivity.maxBadCount int default=999999999 ## The command to run. This will be run by sh -c, and the following ## environment variables are defined: $ROOT, $VESPA_SERVICE_NAME, diff --git a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java index 956bc90380f..90fd6203a21 100644 --- a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java +++ b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java @@ -92,6 +92,7 @@ class HttpConfigServerClientTest { assertEquals("GET http://localhost:" + server.port() + "/ failed with status 409 and body 'hi'", thrown.getMessage()); server.verify(1, getRequestedFor(urlEqualTo("/"))); server.verify(1, anyRequestedFor(anyUrl())); + server.resetRequests(); } } diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java index ad682273d0d..92d2cc5d1cd 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java @@ -8,6 +8,7 @@ import com.yahoo.jdisc.http.ssl.SslContextFactoryProvider; import com.yahoo.security.tls.MixedMode; import com.yahoo.security.tls.TransportSecurityUtils; import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; +import org.eclipse.jetty.http2.parser.RateControl; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.DetectorConnectionFactory; @@ -164,6 +165,8 @@ public class ConnectorFactory { HTTP2ServerConnectionFactory factory = new HTTP2ServerConnectionFactory(newHttpConfiguration()); factory.setStreamIdleTimeout(toMillis(connectorConfig.http2().streamIdleTimeout())); factory.setMaxConcurrentStreams(connectorConfig.http2().maxConcurrentStreams()); + factory.setInitialSessionRecvWindow(1 << 24); + factory.setInitialStreamRecvWindow(1 << 20); return factory; } diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java index 0dec711f4c0..0f625b5c3df 100644 --- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java +++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java @@ -41,6 +41,7 @@ import org.apache.hc.client5.http.entity.mime.FormBodyPart; import org.apache.hc.client5.http.entity.mime.FormBodyPartBuilder; import org.apache.hc.client5.http.entity.mime.StringBody; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; @@ -969,14 +970,12 @@ public class HttpServerTest { private static CloseableHttpAsyncClient createHttp2Client(JettyTestDriver driver) { TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() - .setSslContext(driver.sslContext()) - .build(); - var client = HttpAsyncClientBuilder.create() - .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) - .disableConnectionState() - .disableAutomaticRetries() - .setConnectionManager(PoolingAsyncClientConnectionManagerBuilder.create().setTlsStrategy(tlsStrategy).build()) - .build(); + .setSslContext(driver.sslContext()) + .build(); + var client = H2AsyncClientBuilder.create() + .disableAutomaticRetries() + .setTlsStrategy(tlsStrategy) + .build(); client.start(); return client; } diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java index bd6c2607fc2..c9f6f67df6c 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java @@ -1,7 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.controller.api.integration.horizon; -import com.yahoo.slime.Slime; import java.io.InputStream; /** diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java index 624e4c61662..cf40ac00d64 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java @@ -17,7 +17,7 @@ public class NodeHistory { @JsonProperty("at") public Long at; @JsonProperty("agent") - public Agent agent; + public String agent; @JsonProperty("event") public String event; @@ -25,7 +25,7 @@ public class NodeHistory { return at; } - public Agent getAgent() { + public String getAgent() { return agent; } @@ -33,24 +33,4 @@ public class NodeHistory { return event; } - public enum Agent { - operator, - application, - system, - DirtyExpirer, - DynamicProvisioningMaintainer, - FailedExpirer, - InactiveExpirer, - NodeFailer, - NodeHealthTracker, - ProvisionedExpirer, - Rebalancer, - ReservationExpirer, - RetiringUpgrader, - RebuildingOsUpgrader, - SpareCapacityMaintainer, - SwitchRebalancer, - HostEncrypter, - } - } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java index 422b8f22000..be8613f5eff 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java @@ -16,7 +16,9 @@ import com.yahoo.yolean.Exceptions; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.util.Optional; +import java.util.function.Supplier; import java.util.logging.Level; /** @@ -57,10 +59,10 @@ public class HorizonApiHandler extends LoggingRequestHandler { private HttpResponse get(HttpRequest request) { Path path = new Path(request.getUri()); - if (path.matches("/horizon/v1/config/dashboard/topFolders")) return new JsonInputStreamResponse(client.getTopFolders()); - if (path.matches("/horizon/v1/config/dashboard/file/{id}")) return new JsonInputStreamResponse(client.getDashboard(path.get("id"))); - if (path.matches("/horizon/v1/config/dashboard/favorite")) return new JsonInputStreamResponse(client.getFavorite(request.getProperty("user"))); - if (path.matches("/horizon/v1/config/dashboard/recent")) return new JsonInputStreamResponse(client.getRecent(request.getProperty("user"))); + if (path.matches("/horizon/v1/config/dashboard/topFolders")) return jsonInputStreamResponse(client::getTopFolders); + if (path.matches("/horizon/v1/config/dashboard/file/{id}")) return jsonInputStreamResponse(() -> client.getDashboard(path.get("id"))); + if (path.matches("/horizon/v1/config/dashboard/favorite")) return jsonInputStreamResponse(() -> client.getFavorite(request.getProperty("user"))); + if (path.matches("/horizon/v1/config/dashboard/recent")) return jsonInputStreamResponse(() -> client.getRecent(request.getProperty("user"))); return ErrorResponse.notFoundError("Nothing at " + path); } @@ -73,7 +75,7 @@ public class HorizonApiHandler extends LoggingRequestHandler { private HttpResponse put(HttpRequest request) { Path path = new Path(request.getUri()); - if (path.matches("/horizon/v1/config/user")) return new JsonInputStreamResponse(client.getUser()); + if (path.matches("/horizon/v1/config/user")) return jsonInputStreamResponse(client::getUser); return ErrorResponse.notFoundError("Nothing at " + path); } @@ -81,7 +83,7 @@ public class HorizonApiHandler extends LoggingRequestHandler { SecurityContext securityContext = getAttribute(request, SecurityContext.ATTRIBUTE_NAME, SecurityContext.class); try { byte[] data = TsdbQueryRewriter.rewrite(request.getData().readAllBytes(), securityContext.roles(), systemName); - return new JsonInputStreamResponse(client.getMetrics(data)); + return jsonInputStreamResponse(() -> client.getMetrics(data)); } catch (TsdbQueryRewriter.UnauthorizedException e) { return ErrorResponse.forbidden("Access denied"); } catch (IOException e) { @@ -96,6 +98,14 @@ public class HorizonApiHandler extends LoggingRequestHandler { .orElseThrow(() -> new IllegalArgumentException("Attribute '" + attributeName + "' was not set on request")); } + private JsonInputStreamResponse jsonInputStreamResponse(Supplier<InputStream> supplier) { + try (InputStream inputStream = supplier.get()) { + return new JsonInputStreamResponse(inputStream); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private static class JsonInputStreamResponse extends HttpResponse { private final InputStream jsonInputStream; diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java index 232521c9609..ffc82f90ad4 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java @@ -70,7 +70,7 @@ public class DeploymentExpirerTest { assertEquals(1, permanentDeployments(prodApp.instance())); // Dev application expires when enough time has passed since most recent attempt - tester.clock().advance(Duration.ofDays(12)); + tester.clock().advance(Duration.ofDays(12).plus(Duration.ofSeconds(1))); expirer.maintain(); assertEquals(0, permanentDeployments(devApp.instance())); assertEquals(1, permanentDeployments(prodApp.instance())); diff --git a/eval/CMakeLists.txt b/eval/CMakeLists.txt index 302b6768cea..16c9c72d8a5 100644 --- a/eval/CMakeLists.txt +++ b/eval/CMakeLists.txt @@ -5,6 +5,7 @@ vespa_define_module( staging_vespalib APPS + src/apps/analyze_onnx_model src/apps/eval_expr src/apps/make_tensor_binary_format_test_spec src/apps/tensor_conformance diff --git a/eval/src/apps/analyze_onnx_model/.gitignore b/eval/src/apps/analyze_onnx_model/.gitignore new file mode 100644 index 00000000000..12ce20b03ba --- /dev/null +++ b/eval/src/apps/analyze_onnx_model/.gitignore @@ -0,0 +1 @@ +/vespa-analyze-onnx-model diff --git a/eval/src/apps/analyze_onnx_model/CMakeLists.txt b/eval/src/apps/analyze_onnx_model/CMakeLists.txt new file mode 100644 index 00000000000..47cbb6504f4 --- /dev/null +++ b/eval/src/apps/analyze_onnx_model/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespa-analyze-onnx-model + SOURCES + analyze_onnx_model.cpp + INSTALL bin + DEPENDS + vespaeval +) diff --git a/eval/src/apps/analyze_onnx_model/analyze_onnx_model.cpp b/eval/src/apps/analyze_onnx_model/analyze_onnx_model.cpp new file mode 100644 index 00000000000..f1cc3b28751 --- /dev/null +++ b/eval/src/apps/analyze_onnx_model/analyze_onnx_model.cpp @@ -0,0 +1,59 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/eval/onnx/onnx_wrapper.h> +#include <vespa/vespalib/util/guard.h> + +using vespalib::FilePointer; +using namespace vespalib::eval; + +bool read_line(FilePointer &file, vespalib::string &line) { + char line_buffer[1024]; + char *res = fgets(line_buffer, sizeof(line_buffer), file.fp()); + if (res == nullptr) { + line.clear(); + return false; + } + line = line_buffer; + while (!line.empty() && isspace(line[line.size() - 1])) { + line.pop_back(); + } + return true; +} + +void extract(const vespalib::string &str, const vespalib::string &prefix, vespalib::string &dst) { + if (starts_with(str, prefix)) { + size_t pos = prefix.size(); + while ((str.size() > pos) && isspace(str[pos])) { + ++pos; + } + dst = str.substr(pos); + } +} + +void report_memory_usage(const vespalib::string &desc) { + vespalib::string vm_size = "unknown"; + vespalib::string vm_rss = "unknown"; + vespalib::string line; + FilePointer file(fopen("/proc/self/status", "r")); + while (read_line(file, line)) { + extract(line, "VmSize:", vm_size); + extract(line, "VmRSS:", vm_rss); + } + fprintf(stderr, "vm_size: %s, vm_rss: %s (%s)\n", vm_size.c_str(), vm_rss.c_str(), desc.c_str()); +} + +int usage(const char *self) { + fprintf(stderr, "usage: %s <onnx-model>\n", self); + fprintf(stderr, " load onnx model and report memory usage\n"); + return 1; +} + +int main(int argc, char **argv) { + if (argc != 2) { + return usage(argv[0]); + } + report_memory_usage("before loading model"); + Onnx onnx(argv[1], Onnx::Optimize::ENABLE); + report_memory_usage("after loading model"); + return 0; +} diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java index dd4b62ee494..6136bcdfd3a 100644 --- a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java +++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java @@ -3,12 +3,14 @@ package com.yahoo.jdisc.http.filter.security.athenz; import com.google.inject.Inject; import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.filter.DiscFilterRequest; import com.yahoo.jdisc.http.filter.security.athenz.RequestResourceMapper.ResourceNameAndAction; import com.yahoo.jdisc.http.filter.security.base.JsonSecurityRequestFilterBase; import com.yahoo.vespa.athenz.api.AthenzAccessToken; import com.yahoo.vespa.athenz.api.AthenzIdentity; import com.yahoo.vespa.athenz.api.AthenzPrincipal; +import com.yahoo.vespa.athenz.api.AthenzRole; import com.yahoo.vespa.athenz.api.ZToken; import com.yahoo.vespa.athenz.tls.AthenzX509CertificateUtils; import com.yahoo.vespa.athenz.utils.AthenzIdentities; @@ -20,6 +22,7 @@ import java.security.cert.X509Certificate; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.logging.Level; @@ -56,16 +59,20 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { private final RequestResourceMapper requestResourceMapper; private final Metric metric; private final Set<AthenzIdentity> allowedProxyIdentities; + private final Optional<AthenzRole> readRole; + private final Optional<AthenzRole> writeRole; @Inject public AthenzAuthorizationFilter(AthenzAuthorizationFilterConfig config, RequestResourceMapper resourceMapper, Metric metric) { - this(config, resourceMapper, new DefaultZpe(), metric); + this(config, resourceMapper, new DefaultZpe(), metric, null, null); } public AthenzAuthorizationFilter(AthenzAuthorizationFilterConfig config, RequestResourceMapper resourceMapper, Zpe zpe, - Metric metric) { + Metric metric, + AthenzRole readRole, + AthenzRole writeRole) { this.roleTokenHeaderName = config.roleTokenHeaderName(); List<EnabledCredentials.Enum> enabledCredentials = config.enabledCredentials(); this.enabledCredentials = enabledCredentials.isEmpty() @@ -77,6 +84,8 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { this.allowedProxyIdentities = config.allowedProxyIdentities().stream() .map(AthenzIdentities::from) .collect(Collectors.toSet()); + this.readRole = Optional.ofNullable(readRole); + this.writeRole = Optional.ofNullable(writeRole); } @Override @@ -86,7 +95,7 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { requestResourceMapper.getResourceNameAndAction(request); log.log(Level.FINE, () -> String.format("Resource mapping for '%s': %s", request, resourceMapping)); if (resourceMapping.isEmpty()) { - incrementAcceptedMetrics(request, false); + incrementAcceptedMetrics(request, false, Optional.empty()); return Optional.empty(); } Result result = checkAccessAllowed(request, resourceMapping.get()); @@ -94,15 +103,15 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { setAttribute(request, RESULT_ATTRIBUTE, resultType.name()); if (resultType == AuthorizationResult.Type.ALLOW) { populateRequestWithResult(request, result); - incrementAcceptedMetrics(request, true); + incrementAcceptedMetrics(request, true, Optional.of(result)); return Optional.empty(); } log.log(Level.FINE, () -> String.format("Forbidden (403) for '%s': %s", request, resultType.name())); - incrementRejectedMetrics(request, FORBIDDEN, resultType.name()); + incrementRejectedMetrics(request, FORBIDDEN, resultType.name(), Optional.of(result)); return Optional.of(new ErrorResponse(FORBIDDEN, "Access forbidden: " + resultType.getDescription())); } catch (IllegalArgumentException e) { log.log(Level.FINE, () -> String.format("Unauthorized (401) for '%s': %s", request, e.getMessage())); - incrementRejectedMetrics(request, UNAUTHORIZED, "Unauthorized"); + incrementRejectedMetrics(request, UNAUTHORIZED, "Unauthorized", Optional.empty()); return Optional.of(new ErrorResponse(UNAUTHORIZED, e.getMessage())); } } @@ -130,33 +139,53 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { X509Certificate identityCertificate = getClientCertificate(request).get(); AthenzIdentity peerIdentity = AthenzIdentities.from(identityCertificate); if (allowedProxyIdentities.contains(peerIdentity)) { - return checkAccessWithProxiedAccessToken(resourceAndAction, accessToken, identityCertificate); + return checkAccessWithProxiedAccessToken(request, resourceAndAction, accessToken, identityCertificate); } else { var zpeResult = zpe.checkAccessAllowed( accessToken, identityCertificate, resourceAndAction.resourceName(), resourceAndAction.action()); - return new Result(ACCESS_TOKEN, peerIdentity, zpeResult); + return getResult(ACCESS_TOKEN, peerIdentity, zpeResult, request, resourceAndAction, mapToRequestPrivileges(accessToken.roles())); } } - private Result checkAccessWithProxiedAccessToken(ResourceNameAndAction resourceAndAction, AthenzAccessToken accessToken, X509Certificate identityCertificate) { + private Result getResult(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult, DiscFilterRequest request, ResourceNameAndAction resourceAndAction, List<String> privileges) { + String currentAction = resourceAndAction.action(); + String futureAction = resourceAndAction.futureAction(); + return new Result(credentialType, identity, zpeResult, privileges, currentAction, futureAction); + } + + private List<String> mapToRequestPrivileges(List<AthenzRole> roles) { + return roles.stream() + .map(this::rolePrivilege) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private String rolePrivilege(AthenzRole role) { + if (readRole.stream().anyMatch(role::equals)) return "read"; + if (writeRole.stream().anyMatch(role::equals)) return "write"; + return null; + } + + private Result checkAccessWithProxiedAccessToken(DiscFilterRequest request, ResourceNameAndAction resourceAndAction, AthenzAccessToken accessToken, X509Certificate identityCertificate) { AthenzIdentity proxyIdentity = AthenzIdentities.from(identityCertificate); log.log(Level.FINE, () -> String.format("Checking proxied access token. Proxy identity: '%s'. Allowed identities: %s", proxyIdentity, allowedProxyIdentities)); var zpeResult = zpe.checkAccessAllowed(accessToken, resourceAndAction.resourceName(), resourceAndAction.action()); - return new Result(ACCESS_TOKEN, AthenzIdentities.from(identityCertificate), zpeResult); + return getResult(ACCESS_TOKEN, AthenzIdentities.from(identityCertificate), zpeResult, request, resourceAndAction, mapToRequestPrivileges(accessToken.roles())); } private Result checkAccessWithRoleCertificate(DiscFilterRequest request, ResourceNameAndAction resourceAndAction) { X509Certificate roleCertificate = getClientCertificate(request).get(); var zpeResult = zpe.checkAccessAllowed(roleCertificate, resourceAndAction.resourceName(), resourceAndAction.action()); AthenzIdentity identity = AthenzX509CertificateUtils.getIdentityFromRoleCertificate(roleCertificate); - return new Result(ROLE_CERTIFICATE, identity, zpeResult); + AthenzX509CertificateUtils.getRolesFromRoleCertificate(roleCertificate).roleName(); + return getResult(ROLE_CERTIFICATE, identity, zpeResult, request, resourceAndAction, mapToRequestPrivileges(List.of(AthenzX509CertificateUtils.getRolesFromRoleCertificate(roleCertificate)))); } private Result checkAccessWithRoleToken(DiscFilterRequest request, ResourceNameAndAction resourceAndAction) { ZToken roleToken = getRoleToken(request); var zpeResult = zpe.checkAccessAllowed(roleToken, resourceAndAction.resourceName(), resourceAndAction.action()); - return new Result(ROLE_TOKEN, roleToken.getIdentity(), zpeResult); + return getResult(ROLE_TOKEN, roleToken.getIdentity(), zpeResult, request, resourceAndAction, mapToRequestPrivileges(roleToken.getRoles())); } private static boolean isAccessTokenPresent(DiscFilterRequest request) { @@ -246,20 +275,30 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { request.setAttribute(name, value); } - private void incrementAcceptedMetrics(DiscFilterRequest request, boolean authzRequired) { + private void incrementAcceptedMetrics(DiscFilterRequest request, boolean authzRequired, Optional<Result> result) { String hostHeader = request.getHeader("Host"); Metric.Context context = metric.createContext(Map.of( "endpoint", hostHeader != null ? hostHeader : "", - "authz-required", Boolean.toString(authzRequired))); + "authz-required", Boolean.toString(authzRequired), + "httpMethod", HttpRequest.Method.valueOf(request.getMethod()).name(), + "requestPrivileges", result.map(r -> String.join(",", r.requestPrivileges)).orElse(""), + "currentRequestMapping", result.map(r -> r.currentAction).orElse(""), + "futureRequestMapping", result.map(r -> r.futureAction).orElse("") + )); metric.add(ACCEPTED_METRIC_NAME, 1L, context); } - private void incrementRejectedMetrics(DiscFilterRequest request, int statusCode, String zpeCode) { + private void incrementRejectedMetrics(DiscFilterRequest request, int statusCode, String zpeCode, Optional<Result> result) { String hostHeader = request.getHeader("Host"); Metric.Context context = metric.createContext(Map.of( "endpoint", hostHeader != null ? hostHeader : "", "status-code", Integer.toString(statusCode), - "zpe-status", zpeCode)); + "zpe-status", zpeCode, + "httpMethod", HttpRequest.Method.valueOf(request.getMethod()), + "requestPrivileges", result.map(r -> String.join(",", r.requestPrivileges)).orElse(""), + "currentRequestMapping", result.map(r -> r.currentAction).orElse(""), + "futureRequestMapping", result.map(r -> r.futureAction).orElse("") + )); metric.add(REJECTED_METRIC_NAME, 1L, context); } @@ -267,11 +306,17 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { final EnabledCredentials.Enum credentialType; final AthenzIdentity identity; final AuthorizationResult zpeResult; + final List<String> requestPrivileges; + final String currentAction; + final String futureAction; - Result(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult) { + public Result(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult, List<String> requestPrivileges, String currentAction, String futureAction) { this.credentialType = credentialType; this.identity = identity; this.zpeResult = zpeResult; + this.requestPrivileges = requestPrivileges; + this.currentAction = currentAction; + this.futureAction = futureAction; } } } diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java index 56c52bd71c4..c962e973959 100644 --- a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java +++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java @@ -28,10 +28,15 @@ public interface RequestResourceMapper { class ResourceNameAndAction { private final AthenzResourceName resourceName; private final String action; + private final String futureAction; public ResourceNameAndAction(AthenzResourceName resourceName, String action) { + this(resourceName, action, action); + } + public ResourceNameAndAction(AthenzResourceName resourceName, String action, String futureAction) { this.resourceName = resourceName; this.action = action; + this.futureAction = futureAction; } public AthenzResourceName resourceName() { @@ -42,6 +47,14 @@ public interface RequestResourceMapper { return action; } + public ResourceNameAndAction withFutureAction(String futureAction) { + return new ResourceNameAndAction(resourceName, action, futureAction); + } + + public String futureAction() { + return futureAction; + } + @Override public String toString() { return "ResourceNameAndAction{" + diff --git a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java index bfe02d1f279..137e4653670 100644 --- a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java +++ b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java @@ -296,7 +296,9 @@ public class AthenzAuthorizationFilterTest { .allowedProxyIdentities(allowedProxyIdentities)), new StaticRequestResourceMapper(RESOURCE_NAME, ACTION), zpe, - metric); + metric, + new AthenzRole("domain","reader"), + new AthenzRole("domain", "writer")); } private static void assertAuthorizationResult(DiscFilterRequest request, Type expectedResult) { diff --git a/metrics/src/tests/summetrictest.cpp b/metrics/src/tests/summetrictest.cpp index e3d58659daf..d0380a630f1 100644 --- a/metrics/src/tests/summetrictest.cpp +++ b/metrics/src/tests/summetrictest.cpp @@ -125,4 +125,45 @@ TEST(SumMetricTest, test_start_value) EXPECT_EQ(int64_t(60), sum.getLongValue("value")); } +namespace { + +struct MetricSetWithSum : public MetricSet +{ + LongValueMetric _v1; + LongValueMetric _v2; + SumMetric<LongValueMetric> _sum; + MetricSetWithSum(); + ~MetricSetWithSum() override; +}; + +MetricSetWithSum::MetricSetWithSum() + : MetricSet("MetricSetWithSum", {}, ""), + _v1("v1", {}, "", this), + _v2("v2", {}, "", this), + _sum("sum", {}, "", this) +{ + _sum.addMetricToSum(_v1); + _sum.addMetricToSum(_v2); +} + +MetricSetWithSum::~MetricSetWithSum() = default; + +} + +TEST(SumMetricTest, test_nested_sum) +{ + MetricSetWithSum w1; + MetricSetWithSum w2; + MetricSetWithSum sum; + w1._v1.addValue(10); + w1._v2.addValue(13); + w2._v1.addValue(27); + w2._v2.addValue(29); + w1.addToPart(sum); + w2.addToPart(sum); + EXPECT_EQ(int64_t(37), sum._v1.getLongValue("value")); + EXPECT_EQ(int64_t(42), sum._v2.getLongValue("value")); + EXPECT_EQ(int64_t(79), sum._sum.getLongValue("value")); +} + } diff --git a/metrics/src/vespa/metrics/metric.cpp b/metrics/src/vespa/metrics/metric.cpp index a8d8194b26d..50fc36c62cb 100644 --- a/metrics/src/vespa/metrics/metric.cpp +++ b/metrics/src/vespa/metrics/metric.cpp @@ -232,4 +232,11 @@ Metric::assignValues(const Metric& m) { assert(ownerList.empty()); return this; } + +bool +Metric::is_sum_metric() const +{ + return false; +} + } // metrics diff --git a/metrics/src/vespa/metrics/metric.h b/metrics/src/vespa/metrics/metric.h index 10b74a2da22..c8fb3031278 100644 --- a/metrics/src/vespa/metrics/metric.h +++ b/metrics/src/vespa/metrics/metric.h @@ -247,6 +247,8 @@ public: virtual bool isMetricSet() const { return false; } + virtual bool is_sum_metric() const; + private: /** diff --git a/metrics/src/vespa/metrics/summetric.h b/metrics/src/vespa/metrics/summetric.h index f04c1696638..7b60c968e5b 100644 --- a/metrics/src/vespa/metrics/summetric.h +++ b/metrics/src/vespa/metrics/summetric.h @@ -69,6 +69,7 @@ public: void printDebug(std::ostream&, const std::string& indent="") const override; void addToPart(Metric&) const override; void addToSnapshot(Metric&, std::vector<Metric::UP> &) const override; + bool is_sum_metric() const override; private: friend struct MetricManagerTest; diff --git a/metrics/src/vespa/metrics/summetric.hpp b/metrics/src/vespa/metrics/summetric.hpp index 9520456a974..e067b9643c2 100644 --- a/metrics/src/vespa/metrics/summetric.hpp +++ b/metrics/src/vespa/metrics/summetric.hpp @@ -142,8 +142,17 @@ template<typename AddendMetric> void SumMetric<AddendMetric>::addToPart(Metric& m) const { - std::pair<std::vector<Metric::UP>, Metric::UP> sum(generateSum()); - sum.second->addToPart(m); + if (!m.is_sum_metric()) { + std::pair<std::vector<Metric::UP>, Metric::UP> sum(generateSum()); + sum.second->addToPart(m); + } +} + +template<typename AddendMetric> +bool +SumMetric<AddendMetric>::is_sum_metric() const +{ + return true; } template<typename AddendMetric> diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java index ef9520969af..5d7ab48753f 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java @@ -15,11 +15,13 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Administers a host (for now only docker hosts) and its nodes (docker containers nodes). @@ -130,7 +132,7 @@ public class NodeAdminImpl implements NodeAdmin { } // Use filter with count instead of allMatch() because allMatch() will short circuit on first non-match - boolean allNodeAgentsConverged = nodeAgentWithSchedulerByHostname.values().parallelStream() + boolean allNodeAgentsConverged = parallelStreamOfNodeAgentWithScheduler() .filter(nodeAgentScheduler -> !nodeAgentScheduler.setFrozen(wantFrozen, freezeTimeout)) .count() == 0; @@ -158,9 +160,7 @@ public class NodeAdminImpl implements NodeAdmin { @Override public void stopNodeAgentServices() { // Each container may spend 1-1:30 minutes stopping - nodeAgentWithSchedulerByHostname.values() - .parallelStream() - .forEach(NodeAgentWithScheduler::stopForHostSuspension); + parallelStreamOfNodeAgentWithScheduler().forEach(NodeAgentWithScheduler::stopForHostSuspension); } @Override @@ -171,7 +171,18 @@ public class NodeAdminImpl implements NodeAdmin { @Override public void stop() { // Stop all node-agents in parallel, will block until the last NodeAgent is stopped - nodeAgentWithSchedulerByHostname.values().parallelStream().forEach(NodeAgentWithScheduler::stopForRemoval); + parallelStreamOfNodeAgentWithScheduler().forEach(NodeAgentWithScheduler::stopForRemoval); + } + + /** + * Returns a parallel stream of NodeAgentWithScheduler. + * + * <p>Why not just call nodeAgentWithSchedulerByHostname.values().parallelStream()? Experiments + * with Java 11 have shown that with 10 nodes and forEach(), there are a maximum of 3 concurrent + * threads. With HashMap it produces 5. With List it produces 10 concurrent threads.</p> + */ + private Stream<NodeAgentWithScheduler> parallelStreamOfNodeAgentWithScheduler() { + return List.copyOf(nodeAgentWithSchedulerByHostname.values()).parallelStream(); } // Set-difference. Returns minuend minus subtrahend. diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java index df3f075e8d9..05c765c9d78 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java @@ -1,4 +1,4 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.node.admin.nodeagent; import com.yahoo.config.provision.ApplicationId; @@ -357,7 +357,7 @@ public class NodeAgentImpl implements NodeAgent { } try { - if (context.node().state() != NodeState.dirty) { + if (context.node().state() == NodeState.active) { suspend(context); } stopServices(context); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java index 47337518a65..39183688340 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java @@ -20,19 +20,22 @@ import java.util.Set; import java.util.stream.Collectors; /** - * The application maintainer detects manual operator changes to nodes and redeploys affected applications. - * The purpose of this is to redeploy affected applications faster than achieved by the regular application - * maintenance to reduce the time period where the node repository and the application model is out of sync. + * This maintainer detects changes to nodes that must be expedited, and redeploys affected applications. + * + * The purpose of this is to redeploy affected applications faster than achieved by + * {@link PeriodicApplicationMaintainer}, to reduce the time period where the node repository and the application model + * is out of sync. * * Why can't the manual change directly make the application redeployment? - * Because the redeployment must run at the right config server, while the node state change may be running - * at any config server. + * + * Because we want to queue redeployments to avoid overloading config servers. * * @author bratseth + * @author mpolden */ -public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer { +public class ExpeditedChangeApplicationMaintainer extends ApplicationMaintainer { - OperatorChangeApplicationMaintainer(Deployer deployer, Metric metric, NodeRepository nodeRepository, Duration interval) { + ExpeditedChangeApplicationMaintainer(Deployer deployer, Metric metric, NodeRepository nodeRepository, Duration interval) { super(deployer, metric, nodeRepository, interval); } @@ -57,7 +60,7 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer { boolean deployed = deployWithLock(application); if (deployed) log.info("Redeployed application " + application.toShortString() + - " as a manual change was made to its nodes"); + " as an expedited change was made to its nodes"); } private boolean hasNodesWithChanges(ApplicationId applicationId, NodeList nodes) { @@ -66,7 +69,7 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer { return nodes.stream() .flatMap(node -> node.history().events().stream()) - .filter(event -> event.agent() == Agent.operator) + .filter(event -> expediteChangeBy(event.agent())) .map(History.Event::at) .anyMatch(e -> lastDeployTime.get().isBefore(e)); } @@ -84,5 +87,14 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer { .groupingBy(node -> node.allocation().get().owner()); } + /** Returns whether to expedite changes performed by agent */ + private boolean expediteChangeBy(Agent agent) { + switch (agent) { + case operator: + case RebuildingOsUpgrader: + case HostEncrypter: return true; + } + return false; + } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java index 4537d99d107..9f84322fe0f 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java @@ -200,8 +200,10 @@ public class MetricsReporter extends NodeRepositoryMaintainer { metric.set("wantToDeprovision", node.status().wantToDeprovision() ? 1 : 0, context); metric.set("failReport", NodeFailer.reasonsToFailParentHost(node).isEmpty() ? 0 : 1, context); - metric.set("wantToEncrypt", node.reports().getReport("wantToEncrypt").isPresent() ? 1 : 0, context); - metric.set("diskEncrypted", node.reports().getReport("diskEncrypted").isPresent() ? 1 : 0, context); + if (node.type().isHost()) { + metric.set("wantToEncrypt", node.reports().getReport("wantToEncrypt").isPresent() ? 1 : 0, context); + metric.set("diskEncrypted", node.reports().getReport("diskEncrypted").isPresent() ? 1 : 0, context); + } HostName hostname = new HostName(node.hostname()); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java index 44ee8b5a8b3..cdb5202603a 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java @@ -48,7 +48,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { maintainers.add(new NodeFailer(deployer, nodeRepository, defaults.failGrace, defaults.nodeFailerInterval, orchestrator, defaults.throttlePolicy, metric)); maintainers.add(new NodeHealthTracker(hostLivenessTracker, serviceMonitor, nodeRepository, defaults.nodeFailureStatusUpdateInterval, metric)); - maintainers.add(new OperatorChangeApplicationMaintainer(deployer, metric, nodeRepository, defaults.operatorChangeRedeployInterval)); + maintainers.add(new ExpeditedChangeApplicationMaintainer(deployer, metric, nodeRepository, defaults.expeditedChangeRedeployInterval)); maintainers.add(new ReservationExpirer(nodeRepository, defaults.reservationExpiry, metric)); maintainers.add(new RetiredExpirer(nodeRepository, orchestrator, deployer, metric, defaults.retiredInterval, defaults.retiredExpiry)); maintainers.add(new InactiveExpirer(nodeRepository, defaults.inactiveExpiry, Map.of(NodeType.config, defaults.inactiveConfigServerExpiry, @@ -67,6 +67,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { maintainers.add(new ScalingSuggestionsMaintainer(nodeRepository, defaults.scalingSuggestionsInterval, metric)); maintainers.add(new SwitchRebalancer(nodeRepository, defaults.switchRebalancerInterval, metric, deployer)); maintainers.add(new HostEncrypter(nodeRepository, defaults.hostEncrypterInterval, metric)); + maintainers.add(new ParkedExpirer(nodeRepository, defaults.parkedExpirerInterval, metric)); provisionServiceProvider.getLoadBalancerService(nodeRepository) .map(lbService -> new LoadBalancerExpirer(nodeRepository, defaults.loadBalancerExpirerInterval, lbService, metric)) @@ -91,7 +92,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { /** Time between each run of maintainer that does periodic redeployment */ private final Duration redeployMaintainerInterval; /** Applications are redeployed after manual operator changes within this time period */ - private final Duration operatorChangeRedeployInterval; + private final Duration expeditedChangeRedeployInterval; /** The time a node must be continuously unresponsive before it is failed */ private final Duration failGrace; @@ -119,6 +120,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { private final Duration scalingSuggestionsInterval; private final Duration switchRebalancerInterval; private final Duration hostEncrypterInterval; + private final Duration parkedExpirerInterval; private final NodeFailer.ThrottlePolicy throttlePolicy; @@ -133,7 +135,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { nodeFailerInterval = Duration.ofMinutes(15); nodeFailureStatusUpdateInterval = Duration.ofMinutes(2); nodeMetricsCollectionInterval = Duration.ofMinutes(1); - operatorChangeRedeployInterval = Duration.ofMinutes(3); + expeditedChangeRedeployInterval = Duration.ofMinutes(3); // Vespa upgrade frequency is higher in CD so (de)activate OS upgrades more frequently as well osUpgradeActivatorInterval = zone.system().isCd() ? Duration.ofSeconds(30) : Duration.ofMinutes(5); periodicRedeployInterval = Duration.ofMinutes(60); @@ -149,6 +151,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { throttlePolicy = NodeFailer.ThrottlePolicy.hosted; inactiveConfigServerExpiry = Duration.ofMinutes(5); inactiveControllerExpiry = Duration.ofMinutes(5); + parkedExpirerInterval = Duration.ofMinutes(30); if (zone.environment().isProduction() && ! zone.system().isCd()) { inactiveExpiry = Duration.ofHours(4); // enough time for the application owner to discover and redeploy diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirer.java new file mode 100644 index 00000000000..c5259ee84c4 --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirer.java @@ -0,0 +1,63 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.maintenance; + +import com.yahoo.config.provision.NodeType; +import com.yahoo.jdisc.Metric; +import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.NodeRepository; +import com.yahoo.vespa.hosted.provision.node.Agent; +import com.yahoo.vespa.hosted.provision.node.History; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.logging.Logger; + +/** + * + * Expires parked nodes in dynamically provisioned zones. + * If number of parked hosts exceed MAX_ALLOWED_PARKED_HOSTS, recycle in a queue order + * + * @author olaa + */ +public class ParkedExpirer extends NodeRepositoryMaintainer { + + private static final int MAX_ALLOWED_PARKED_HOSTS = 20; + private static final Logger log = Logger.getLogger(ParkedExpirer.class.getName()); + + + private final NodeRepository nodeRepository; + + ParkedExpirer(NodeRepository nodeRepository, Duration interval, Metric metric) { + super(nodeRepository, interval, metric); + this.nodeRepository = nodeRepository; + } + + @Override + protected double maintain() { + if (!nodeRepository.zone().getCloud().dynamicProvisioning()) + return 1.0; + + var parkedHosts = new ArrayList<>(nodeRepository.nodes().list(Node.State.parked) + .nodeType(NodeType.host) + .asList()); + + int hostsToExpire = Math.max(0, parkedHosts.size() - MAX_ALLOWED_PARKED_HOSTS); + nodeRepository.nodes().list(Node.State.parked).nodeType(NodeType.host) + .sortedBy(Comparator.comparing(this::getParkedTime)) + .first(hostsToExpire) + .forEach(host -> { + log.info("Allowed number of parked nodes exceeded. Recycling " + host.hostname()); + nodeRepository.nodes().deallocate(host, Agent.ParkedExpirer, "Expired by ParkedExpirer"); + }); + + return 1.0; + } + + private Instant getParkedTime(Node node) { + return node.history().event(History.Event.Type.parked) + .map(History.Event::at) + .orElse(Instant.EPOCH); // Should not happen + } +} diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java index d1c3f00ddca..ed82470fa42 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java @@ -21,6 +21,7 @@ public enum Agent { InactiveExpirer, ProvisionedExpirer, ReservationExpirer, + ParkedExpirer, DynamicProvisioningMaintainer, RetiringUpgrader, RebuildingOsUpgrader, diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java index d83f21e5fec..dff4a66bd42 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java @@ -482,6 +482,7 @@ public class NodeSerializer { case "SpareCapacityMaintainer": return Agent.SpareCapacityMaintainer; case "SwitchRebalancer": return Agent.SwitchRebalancer; case "HostEncrypter": return Agent.HostEncrypter; + case "ParkedExpirer": return Agent.ParkedExpirer; } throw new IllegalArgumentException("Unknown node event agent '" + eventAgentField.asString() + "'"); } @@ -504,6 +505,7 @@ public class NodeSerializer { case SpareCapacityMaintainer: return "SpareCapacityMaintainer"; case SwitchRebalancer: return "SwitchRebalancer"; case HostEncrypter: return "HostEncrypter"; + case ParkedExpirer: return "ParkedExpirer"; } throw new IllegalArgumentException("Serialized form of '" + agent + "' not defined"); } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java index db6aebacddc..62d09c99f16 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java @@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals; /** * @author bratseth */ -public class OperatorChangeApplicationMaintainerTest { +public class ExpeditedChangeApplicationMaintainerTest { @Test public void test_application_maintenance() { @@ -42,10 +42,10 @@ public class OperatorChangeApplicationMaintainerTest { // Create applications fixture.activate(); assertEquals("Initial applications are deployed", 3, fixture.deployer.redeployments); - OperatorChangeApplicationMaintainer maintainer = new OperatorChangeApplicationMaintainer(fixture.deployer, - new TestMetric(), - nodeRepository, - Duration.ofMinutes(1)); + ExpeditedChangeApplicationMaintainer maintainer = new ExpeditedChangeApplicationMaintainer(fixture.deployer, + new TestMetric(), + nodeRepository, + Duration.ofMinutes(1)); clock.advance(Duration.ofMinutes(2)); maintainer.maintain(); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java index 4251d1276f8..7fa6810c1ba 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java @@ -43,6 +43,7 @@ import java.util.TreeMap; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -113,8 +114,6 @@ public class MetricsReporterTest { expectedMetrics.put("wantToDeprovision", 0); expectedMetrics.put("failReport", 0); - expectedMetrics.put("wantToEncrypt", 0); - expectedMetrics.put("diskEncrypted", 0); expectedMetrics.put("allowedToBeDown", 1); expectedMetrics.put("suspended", 1); @@ -150,6 +149,27 @@ public class MetricsReporterTest { assertEquals(expectedMetrics, new TreeMap<>(metric.values)); } + @Test + public void test_registered_metrics_for_host() { + NodeFlavors nodeFlavors = FlavorConfigBuilder.createDummies("default"); + Orchestrator orchestrator = mock(Orchestrator.class); + when(orchestrator.getHostInfo(eq(reference), any())).thenReturn( + HostInfo.createSuspended(HostStatus.ALLOWED_TO_BE_DOWN, Instant.ofEpochSecond(1))); + ProvisioningTester tester = new ProvisioningTester.Builder().flavors(nodeFlavors.getFlavors()).orchestrator(orchestrator).build(); + tester.makeProvisionedNodes(1, "default", NodeType.host, 0); + + tester.clock().setInstant(Instant.ofEpochSecond(124)); + + TestMetric metric = new TestMetric(); + MetricsReporter metricsReporter = metricsReporter(metric, tester); + metricsReporter.maintain(); + + // Only verify metrics that are set for hosts + TreeMap<String, Number> metrics = new TreeMap<>(metric.values); + assertTrue(metrics.containsKey("wantToEncrypt")); + assertTrue(metrics.containsKey("diskEncrypted")); + } + private void verifyAndRemoveIntegerMetricSum(TestMetric metric, String key, int expected) { assertEquals(expected, (int) metric.sumNumberValues(key)); metric.remove(key); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirerTest.java new file mode 100644 index 00000000000..7fb22f33453 --- /dev/null +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirerTest.java @@ -0,0 +1,70 @@ +package com.yahoo.vespa.hosted.provision.maintenance; + +import com.yahoo.config.provision.Cloud; +import com.yahoo.config.provision.Environment; +import com.yahoo.config.provision.Flavor; +import com.yahoo.config.provision.NodeResources; +import com.yahoo.config.provision.NodeType; +import com.yahoo.config.provision.RegionName; +import com.yahoo.config.provision.SystemName; +import com.yahoo.config.provision.Zone; +import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.node.Agent; +import com.yahoo.vespa.hosted.provision.provisioning.ProvisioningTester; +import com.yahoo.vespa.hosted.provision.testutils.MockHostProvisioner; +import org.junit.Test; + +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.*; + +/** + * @author olaa + */ +public class ParkedExpirerTest { + + private ProvisioningTester tester; + + @Test + public void noop_if_not_dynamic_provisioning() { + tester = getTester(false); + populateNodeRepo(); + + var expirer = new ParkedExpirer(tester.nodeRepository(), Duration.ofMinutes(4), new TestMetric()); + expirer.maintain(); + + assertEquals(0, tester.nodeRepository().nodes().list(Node.State.dirty).size()); + assertEquals(25, tester.nodeRepository().nodes().list(Node.State.parked).size()); + } + + @Test + public void recycles_correct_subset_of_parked_hosts() { + tester = getTester(true); + populateNodeRepo(); + + var expirer = new ParkedExpirer(tester.nodeRepository(), Duration.ofMinutes(4), new TestMetric()); + expirer.maintain(); + + assertEquals(5, tester.nodeRepository().nodes().list(Node.State.dirty).size()); + assertEquals(20, tester.nodeRepository().nodes().list(Node.State.parked).size()); + + } + + private ProvisioningTester getTester(boolean dynamicProvisioning) { + var zone = new Zone(Cloud.builder().dynamicProvisioning(dynamicProvisioning).build(), SystemName.main, Environment.prod, RegionName.from("us-east")); + return new ProvisioningTester.Builder().zone(zone) + .hostProvisioner(dynamicProvisioning ? new MockHostProvisioner(List.of()) : null) + .build(); + } + + private void populateNodeRepo() { + var nodes = IntStream.range(0, 25) + .mapToObj(i -> Node.create("id-" + i, "host-" + i, new Flavor(NodeResources.unspecified()), Node.State.parked, NodeType.host).build()) + .collect(Collectors.toList()); + tester.nodeRepository().database().addNodesInState(nodes, Node.State.parked, Agent.system); + } + +}
\ No newline at end of file diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json index 2dcf2d0b838..26d711945c6 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json @@ -7,6 +7,9 @@ "name": "DirtyExpirer" }, { + "name": "ExpeditedChangeApplicationMaintainer" + }, + { "name": "FailedExpirer" }, { @@ -37,10 +40,10 @@ "name": "NodeRebooter" }, { - "name": "OperatorChangeApplicationMaintainer" + "name": "OsUpgradeActivator" }, { - "name": "OsUpgradeActivator" + "name": "ParkedExpirer" }, { "name": "PeriodicApplicationMaintainer" diff --git a/searchlib/src/tests/predicate/predicate_index_test.cpp b/searchlib/src/tests/predicate/predicate_index_test.cpp index facf0054c4a..19ad0301b5c 100644 --- a/searchlib/src/tests/predicate/predicate_index_test.cpp +++ b/searchlib/src/tests/predicate/predicate_index_test.cpp @@ -113,7 +113,6 @@ TEST("require that PredicateIndex can index document") { EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid()); indexFeature(index, doc_id, min_feature, {{hash, interval}}, {}); index.commit(); - auto posting_it = lookupPosting(index, hash); EXPECT_EQUAL(doc_id, posting_it.getKey()); uint32_t size; @@ -123,6 +122,25 @@ TEST("require that PredicateIndex can index document") { EXPECT_EQUAL(interval, interval_list[0]); } +TEST("require that bit vector cache is initialized correctly") { + BitVectorCache::KeyAndCountSet keySet; + keySet.emplace_back(hash, dummy_provider.getDocIdLimit()/2); + PredicateIndex index(generation_holder, dummy_provider, simple_index_config, 10); + EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid()); + indexFeature(index, doc_id, min_feature, {{hash, interval}}, {}); + index.requireCachePopulation(); + index.populateIfNeeded(dummy_provider.getDocIdLimit()); + EXPECT_TRUE(index.lookupCachedSet(keySet).empty()); + index.commit(); + EXPECT_TRUE(index.getIntervalIndex().lookup(hash).valid()); + EXPECT_TRUE(index.lookupCachedSet(keySet).empty()); + + index.requireCachePopulation(); + index.populateIfNeeded(dummy_provider.getDocIdLimit()); + EXPECT_FALSE(index.lookupCachedSet(keySet).empty()); +} + + TEST("require that PredicateIndex can index document with bounds") { PredicateIndex index(generation_holder, dummy_provider, simple_index_config, 10); EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid()); diff --git a/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp b/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp index 3dd2ec26dea..5b8d5f5b9ce 100644 --- a/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp +++ b/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp @@ -86,8 +86,7 @@ TEST_F("require that blueprint with empty index estimates empty.", Fixture) { EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits); } -TEST_F("require that blueprint with zero-constraint doc estimates non-empty.", - Fixture) { +TEST_F("require that blueprint with zero-constraint doc estimates non-empty.", Fixture) { f.indexEmptyDocument(42); PredicateBlueprint blueprint(f.field, f.guard(), f.query); EXPECT_FALSE(blueprint.getState().estimate().empty); @@ -98,11 +97,9 @@ const int min_feature = 1; const uint32_t doc_id = 2; const uint32_t interval = 0x0001ffff; -TEST_F("require that blueprint with posting list entry estimates non-empty.", - Fixture) { +TEST_F("require that blueprint with posting list entry estimates non-empty.", Fixture) { PredicateTreeAnnotations annotations(min_feature); - annotations.interval_map[PredicateHash::hash64("key=value")] = - std::vector<Interval>{{interval}}; + annotations.interval_map[PredicateHash::hash64("key=value")] = std::vector<Interval>{{interval}}; f.indexDocument(doc_id, annotations); PredicateBlueprint blueprint(f.field, f.guard(), f.query); @@ -110,8 +107,7 @@ TEST_F("require that blueprint with posting list entry estimates non-empty.", EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits); } -TEST_F("require that blueprint with 'bounds' posting list entry estimates " - "non-empty.", Fixture) { +TEST_F("require that blueprint with 'bounds' posting list entry estimates non-empty.", Fixture) { PredicateTreeAnnotations annotations(min_feature); annotations.bounds_map[PredicateHash::hash64("range_key=40")] = std::vector<IntervalWithBounds>{{interval, 0x80000003}}; @@ -122,34 +118,50 @@ TEST_F("require that blueprint with 'bounds' posting list entry estimates " EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits); } -TEST_F("require that blueprint with zstar-compressed estimates non-empty.", - Fixture) { +TEST_F("require that blueprint with zstar-compressed estimates non-empty.", Fixture) { PredicateTreeAnnotations annotations(1); - annotations.interval_map[Constants::z_star_compressed_hash] =std::vector<Interval>{{0xfffe0000}}; + annotations.interval_map[Constants::z_star_compressed_hash] = std::vector<Interval>{{0xfffe0000}}; f.indexDocument(doc_id, annotations); PredicateBlueprint blueprint(f.field, f.guard(), f.query); EXPECT_FALSE(blueprint.getState().estimate().empty); EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits); } -TEST_F("require that blueprint can create search", Fixture) { - PredicateTreeAnnotations annotations(1); - annotations.interval_map[PredicateHash::hash64("key=value")] =std::vector<Interval>{{interval}}; - f.indexDocument(doc_id, annotations); - +void +runQuery(Fixture & f, std::vector<uint32_t> expected, bool expectCachedSize, uint32_t expectedKV) { PredicateBlueprint blueprint(f.field, f.guard(), f.query); blueprint.fetchPostings(ExecuteInfo::TRUE); + EXPECT_EQUAL(expectCachedSize, blueprint.getCachedFeatures().size()); + for (uint32_t docId : expected) { + EXPECT_EQUAL(expectedKV, uint32_t(blueprint.getKV()[docId])); + } TermFieldMatchDataArray tfmda; SearchIterator::UP it = blueprint.createLeafSearch(tfmda, true); ASSERT_TRUE(it.get()); it->initFullRange(); EXPECT_EQUAL(SearchIterator::beginId(), it->getDocId()); - EXPECT_FALSE(it->seek(doc_id - 1)); - EXPECT_EQUAL(doc_id, it->getDocId()); - EXPECT_TRUE(it->seek(doc_id)); - EXPECT_EQUAL(doc_id, it->getDocId()); - EXPECT_FALSE(it->seek(doc_id + 1)); - EXPECT_TRUE(it->isAtEnd()); + std::vector<uint32_t> actual; + for (it->seek(1); ! it->isAtEnd(); it->seek(it->getDocId()+1)) { + actual.push_back(it->getDocId()); + } + EXPECT_EQUAL(expected.size(), actual.size()); + for (size_t i(0); i < expected.size(); i++) { + EXPECT_EQUAL(expected[i], actual[i]); + } +} + +TEST_F("require that blueprint can create search", Fixture) { + PredicateTreeAnnotations annotations(1); + annotations.interval_map[PredicateHash::hash64("key=value")] = std::vector<Interval>{{interval}}; + for (size_t i(0); i < 9; i++) { + f.indexDocument(doc_id + i, annotations); + } + runQuery(f, {2,3,4,5,6,7,8,9,10}, 0, 1); + f.indexDocument(doc_id+9, annotations); + runQuery(f, {2, 3,4,5,6,7,8,9,10,11}, 0, 1); + f.index().requireCachePopulation(); + f.indexDocument(doc_id+10, annotations); + runQuery(f, {2,3,4,5,6,7,8,9,10,11,12}, 1, 1); } TEST_F("require that blueprint can create more advanced search", Fixture) { diff --git a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp index c4a0e036a01..555117126a9 100644 --- a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp +++ b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp @@ -100,8 +100,8 @@ PredicateAttribute::getValueCount(DocId) const void PredicateAttribute::onCommit() { - populateIfNeeded(); _index->commit(); + populateIfNeeded(); incGeneration(); } diff --git a/searchlib/src/vespa/searchlib/common/bitvectorcache.h b/searchlib/src/vespa/searchlib/common/bitvectorcache.h index f81fd0163d8..a642d66f42f 100644 --- a/searchlib/src/vespa/searchlib/common/bitvectorcache.h +++ b/searchlib/src/vespa/searchlib/common/bitvectorcache.h @@ -41,6 +41,7 @@ public: void adjustDocIdLimit(uint32_t docId); void populate(uint32_t count, const PopulateInterface &); bool needPopulation() const { return _needPopulation; } + void requirePopulation() { _needPopulation = true; } private: class KeyMeta { public: diff --git a/searchlib/src/vespa/searchlib/predicate/predicate_index.h b/searchlib/src/vespa/searchlib/predicate/predicate_index.h index f4c89a2b369..49bf77f2fcc 100644 --- a/searchlib/src/vespa/searchlib/predicate/predicate_index.h +++ b/searchlib/src/vespa/searchlib/predicate/predicate_index.h @@ -54,8 +54,6 @@ private: template <typename IntervalT> void indexDocumentFeatures(uint32_t doc_id, const FeatureMap<IntervalT> &interval_map); - PopulateInterface::Iterator::UP lookup(uint64_t key) const override; - public: PredicateIndex(GenerationHolder &genHolder, const DocIdLimitProvider &limit_provider, @@ -105,6 +103,9 @@ public: * Adjust size of structures to have space for docId. */ void adjustDocIdLimit(uint32_t docId); + PopulateInterface::Iterator::UP lookup(uint64_t key) const override; + // Exposed for testing + void requireCachePopulation() const { _cache.requirePopulation(); } }; extern template class SimpleIndex<vespalib::datastore::EntryRef>; diff --git a/searchlib/src/vespa/searchlib/predicate/simple_index.h b/searchlib/src/vespa/searchlib/predicate/simple_index.h index 1398bb0817c..75dc540f787 100644 --- a/searchlib/src/vespa/searchlib/predicate/simple_index.h +++ b/searchlib/src/vespa/searchlib/predicate/simple_index.h @@ -168,8 +168,6 @@ private: } public: - SimpleIndex(GenerationHolder &generation_holder, const DocIdLimitProvider &provider) : - SimpleIndex(generation_holder, provider, SimpleIndexConfig()) {} SimpleIndex(GenerationHolder &generation_holder, const DocIdLimitProvider &provider, const SimpleIndexConfig &config) : _generation_holder(generation_holder), _config(config), _limit_provider(provider) {} diff --git a/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h b/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h index 9609cd4f6c9..ef225e86c50 100644 --- a/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h +++ b/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h @@ -50,8 +50,11 @@ public: void fetchPostings(const ExecuteInfo &execInfo) override; SearchIterator::UP - createLeafSearch(const fef::TermFieldMatchDataArray &tfmda, - bool strict) const override; + createLeafSearch(const fef::TermFieldMatchDataArray &tfmda, bool strict) const override; + + // Exposed for testing + const BitVectorCache::CountVector & getKV() const { return _kV; } + const BitVectorCache::KeySet & getCachedFeatures() const { return _cachedFeatures; } private: using BTreeIterator = predicate::SimpleIndex<vespalib::datastore::EntryRef>::BTreeIterator; using VectorIterator = predicate::SimpleIndex<vespalib::datastore::EntryRef>::VectorIterator; @@ -70,24 +73,24 @@ private: void addZeroConstraintToK(); std::vector<predicate::PredicatePostingList::UP> createPostingLists() const; - const PredicateAttribute & _attribute; + const PredicateAttribute & _attribute; const predicate::PredicateIndex &_index; - Alloc _kVBacking; - BitVectorCache::CountVector _kV; - BitVectorCache::KeySet _cachedFeatures; + Alloc _kVBacking; + BitVectorCache::CountVector _kV; + BitVectorCache::KeySet _cachedFeatures; - std::vector<IntervalEntry> _interval_dict_entries; - std::vector<BoundsEntry> _bounds_dict_entries; - vespalib::datastore::EntryRef _zstar_dict_entry; + std::vector<IntervalEntry> _interval_dict_entries; + std::vector<BoundsEntry> _bounds_dict_entries; + vespalib::datastore::EntryRef _zstar_dict_entry; - std::vector<IntervalIteratorEntry<BTreeIterator>> _interval_btree_iterators; + std::vector<IntervalIteratorEntry<BTreeIterator>> _interval_btree_iterators; std::vector<IntervalIteratorEntry<VectorIterator>> _interval_vector_iterators; - std::vector<BoundsIteratorEntry<BTreeIterator>> _bounds_btree_iterators; - std::vector<BoundsIteratorEntry<VectorIterator>> _bounds_vector_iterators; + std::vector<BoundsIteratorEntry<BTreeIterator>> _bounds_btree_iterators; + std::vector<BoundsIteratorEntry<VectorIterator>> _bounds_vector_iterators; // The zstar iterator is either a vector or a btree iterator. - optional<BTreeIterator> _zstar_btree_iterator; + optional<BTreeIterator> _zstar_btree_iterator; optional<VectorIterator> _zstar_vector_iterator; - bool _fetch_postings_done; + bool _fetch_postings_done; }; } diff --git a/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp b/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp index e1010285dba..934ecc7456b 100644 --- a/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp +++ b/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp @@ -11,10 +11,12 @@ namespace storage::distributor { -using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats; using End = vespalib::JsonStream::End; using File = vespalib::File; +using MinReplicaStats = std::unordered_map<uint16_t, uint32_t>; using Object = vespalib::JsonStream::Object; +using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats; +using BucketSpacesStats = BucketSpacesStatsProvider::BucketSpacesStats; using namespace ::testing; struct DistributorHostInfoReporterTest : Test { @@ -35,7 +37,7 @@ namespace { // My kingdom for GoogleMock! struct MockedMinReplicaProvider : MinReplicaProvider { - std::unordered_map<uint16_t, uint32_t> minReplica; + MinReplicaStats minReplica; std::unordered_map<uint16_t, uint32_t> getMinReplica() const override { return minReplica; @@ -121,7 +123,7 @@ struct Fixture { TEST_F(DistributorHostInfoReporterTest, min_replica_stats_are_reported) { Fixture f; - std::unordered_map<uint16_t, uint32_t> minReplica; + MinReplicaStats minReplica; minReplica[0] = 2; minReplica[5] = 9; f.minReplicaProvider.minReplica = minReplica; @@ -133,10 +135,30 @@ TEST_F(DistributorHostInfoReporterTest, min_replica_stats_are_reported) { EXPECT_EQ(9, getMinReplica(root, 5)); } +TEST_F(DistributorHostInfoReporterTest, merge_min_replica_stats) { + + MinReplicaStats min_replica_a; + min_replica_a[3] = 2; + min_replica_a[5] = 4; + + MinReplicaStats min_replica_b; + min_replica_b[5] = 6; + min_replica_b[7] = 8; + + MinReplicaStats result; + merge_min_replica_stats(result, min_replica_a); + merge_min_replica_stats(result, min_replica_b); + + EXPECT_EQ(3, result.size()); + EXPECT_EQ(2, result[3]); + EXPECT_EQ(4, result[5]); + EXPECT_EQ(8, result[7]); +} + TEST_F(DistributorHostInfoReporterTest, generate_example_json) { Fixture f; - std::unordered_map<uint16_t, uint32_t> minReplica; + MinReplicaStats minReplica; minReplica[0] = 2; minReplica[5] = 9; f.minReplicaProvider.minReplica = minReplica; @@ -175,7 +197,7 @@ TEST_F(DistributorHostInfoReporterTest, no_report_generated_if_disabled) { Fixture f; f.reporter.enableReporting(false); - std::unordered_map<uint16_t, uint32_t> minReplica; + MinReplicaStats minReplica; minReplica[0] = 2; minReplica[5] = 9; f.minReplicaProvider.minReplica = minReplica; @@ -210,5 +232,41 @@ TEST_F(DistributorHostInfoReporterTest, bucket_spaces_stats_are_reported) { } } +TEST_F(DistributorHostInfoReporterTest, merge_per_node_bucket_spaces_stats) { + + PerNodeBucketSpacesStats stats_a; + stats_a[3]["default"] = BucketSpaceStats(3, 2); + stats_a[3]["global"] = BucketSpaceStats(5, 4); + stats_a[5]["default"] = BucketSpaceStats(7, 6); + stats_a[5]["global"] = BucketSpaceStats(9, 8); + + PerNodeBucketSpacesStats stats_b; + stats_b[5]["default"] = BucketSpaceStats(11, 10); + stats_b[5]["global"] = BucketSpaceStats(13, 12); + stats_b[7]["default"] = BucketSpaceStats(15, 14); + + PerNodeBucketSpacesStats result; + merge_per_node_bucket_spaces_stats(result, stats_a); + merge_per_node_bucket_spaces_stats(result, stats_b); + + PerNodeBucketSpacesStats exp; + exp[3]["default"] = BucketSpaceStats(3, 2); + exp[3]["global"] = BucketSpaceStats(5, 4); + exp[5]["default"] = BucketSpaceStats(7+11, 6+10); + exp[5]["global"] = BucketSpaceStats(9+13, 8+12); + exp[7]["default"] = BucketSpaceStats(15, 14); + + EXPECT_EQ(exp, result); } +TEST_F(DistributorHostInfoReporterTest, merge_bucket_space_stats_maintains_valid_flag) { + BucketSpaceStats stats_a(5, 3); + BucketSpaceStats stats_b; + + stats_a.merge(stats_b); + EXPECT_FALSE(stats_a.valid()); + EXPECT_EQ(5, stats_a.bucketsTotal()); + EXPECT_EQ(3, stats_a.bucketsPending()); +} + +} diff --git a/storage/src/tests/distributor/simplemaintenancescannertest.cpp b/storage/src/tests/distributor/simplemaintenancescannertest.cpp index 58dc2430041..1bf3809b135 100644 --- a/storage/src/tests/distributor/simplemaintenancescannertest.cpp +++ b/storage/src/tests/distributor/simplemaintenancescannertest.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/distributor/maintenancemocks.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributor_bucket_space_repo.h> @@ -209,4 +210,88 @@ TEST_F(SimpleMaintenanceScannerTest, per_node_maintenance_stats_are_tracked) { } } +TEST_F(SimpleMaintenanceScannerTest, merge_node_maintenance_stats) { + + NodeMaintenanceStats stats_a; + stats_a.movingOut = 1; + stats_a.syncing = 2; + stats_a.copyingIn = 3; + stats_a.copyingOut = 4; + stats_a.total = 5; + + NodeMaintenanceStats stats_b; + stats_b.movingOut = 10; + stats_b.syncing = 20; + stats_b.copyingIn = 30; + stats_b.copyingOut = 40; + stats_b.total = 50; + + NodeMaintenanceStats result; + result.merge(stats_a); + result.merge(stats_b); + + NodeMaintenanceStats exp; + exp.movingOut = 11; + exp.syncing = 22; + exp.copyingIn = 33; + exp.copyingOut = 44; + exp.total = 55; + EXPECT_EQ(exp, result); +} + +TEST_F(SimpleMaintenanceScannerTest, merge_pending_maintenance_stats) { + auto default_space = document::FixedBucketSpaces::default_space(); + auto global_space = document::FixedBucketSpaces::global_space(); + + PendingStats stats_a; + stats_a.global.pending[MaintenanceOperation::DELETE_BUCKET] = 1; + stats_a.global.pending[MaintenanceOperation::MERGE_BUCKET] = 2; + stats_a.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 3; + stats_a.global.pending[MaintenanceOperation::JOIN_BUCKET] = 4; + stats_a.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 5; + stats_a.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 6; + stats_a.perNodeStats.incMovingOut(3, default_space); + stats_a.perNodeStats.incSyncing(3, global_space); + stats_a.perNodeStats.incCopyingIn(5, default_space); + stats_a.perNodeStats.incCopyingOut(5, global_space); + stats_a.perNodeStats.incTotal(5, default_space); + + PendingStats stats_b; + stats_b.global.pending[MaintenanceOperation::DELETE_BUCKET] = 10; + stats_b.global.pending[MaintenanceOperation::MERGE_BUCKET] = 20; + stats_b.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 30; + stats_b.global.pending[MaintenanceOperation::JOIN_BUCKET] = 40; + stats_b.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 50; + stats_b.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 60; + stats_b.perNodeStats.incMovingOut(7, default_space); + stats_b.perNodeStats.incSyncing(7, global_space); + stats_b.perNodeStats.incCopyingIn(5, default_space); + stats_b.perNodeStats.incCopyingOut(5, global_space); + stats_b.perNodeStats.incTotal(5, default_space); + + PendingStats result; + result.merge(stats_a); + result.merge(stats_b); + + PendingStats exp; + exp.global.pending[MaintenanceOperation::DELETE_BUCKET] = 11; + exp.global.pending[MaintenanceOperation::MERGE_BUCKET] = 22; + exp.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 33; + exp.global.pending[MaintenanceOperation::JOIN_BUCKET] = 44; + exp.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 55; + exp.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 66; + exp.perNodeStats.incMovingOut(3, default_space); + exp.perNodeStats.incSyncing(3, global_space); + exp.perNodeStats.incCopyingIn(5, default_space); + exp.perNodeStats.incCopyingIn(5, default_space); + exp.perNodeStats.incCopyingOut(5, global_space); + exp.perNodeStats.incCopyingOut(5, global_space); + exp.perNodeStats.incTotal(5, default_space); + exp.perNodeStats.incTotal(5, default_space); + exp.perNodeStats.incMovingOut(7, default_space); + exp.perNodeStats.incSyncing(7, global_space); + EXPECT_EQ(exp.global, result.global); + EXPECT_EQ(exp.perNodeStats, result.perNodeStats); +} + } diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 7b048e9f109..231361d72d6 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_library(storage_distributor bucket_space_distribution_configs.cpp bucket_space_distribution_context.cpp bucket_space_state_map.cpp + bucket_spaces_stats_provider.cpp bucketdbupdater.cpp bucketgctimecalculator.cpp bucketlistmerger.cpp @@ -22,6 +23,7 @@ vespa_add_library(storage_distributor distributor_stripe_component.cpp distributor_stripe_pool.cpp distributor_stripe_thread.cpp + distributor_total_metrics.cpp distributormessagesender.cpp distributormetricsset.cpp externaloperationhandler.cpp @@ -29,6 +31,7 @@ vespa_add_library(storage_distributor idealstatemanager.cpp idealstatemetricsset.cpp messagetracker.cpp + min_replica_provider.cpp multi_threaded_stripe_access_guard.cpp nodeinfo.cpp operation_routing_snapshot.cpp diff --git a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp new file mode 100644 index 00000000000..2b12d437aaa --- /dev/null +++ b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp @@ -0,0 +1,40 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucket_spaces_stats_provider.h" + +namespace storage::distributor { + +std::ostream& +operator<<(std::ostream& out, const BucketSpaceStats& stats) +{ + out << "{valid=" << stats.valid() << ", bucketsTotal=" << stats.bucketsTotal() << ", bucketsPending=" << stats.bucketsPending() << "}"; + return out; +} + +void +merge_bucket_spaces_stats(BucketSpacesStatsProvider::BucketSpacesStats& dest, + const BucketSpacesStatsProvider::BucketSpacesStats& src) +{ + for (const auto& entry : src) { + const auto& bucket_space_name = entry.first; + auto itr = dest.find(bucket_space_name); + if (itr != dest.end()) { + itr->second.merge(entry.second); + } else { + // We need to explicitly handle this case to avoid creating an empty BucketSpaceStats that is not valid. + dest[bucket_space_name] = entry.second; + } + } +} + +void +merge_per_node_bucket_spaces_stats(BucketSpacesStatsProvider::PerNodeBucketSpacesStats& dest, + const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& src) +{ + for (const auto& entry : src) { + auto node_index = entry.first; + merge_bucket_spaces_stats(dest[node_index], entry.second); + } +} + +} diff --git a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h index 3d7b60f4471..c8ba04ed1ab 100644 --- a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h +++ b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h @@ -3,6 +3,7 @@ #include <vespa/vespalib/stllike/string.h> #include <map> +#include <ostream> #include <unordered_map> namespace storage::distributor { @@ -32,8 +33,22 @@ public: bool valid() const noexcept { return _valid; } size_t bucketsTotal() const noexcept { return _bucketsTotal; } size_t bucketsPending() const noexcept { return _bucketsPending; } + + bool operator==(const BucketSpaceStats& rhs) const { + return (_valid == rhs._valid) && + (_bucketsTotal == rhs._bucketsTotal) && + (_bucketsPending == rhs._bucketsPending); + } + + void merge(const BucketSpaceStats& rhs) { + _valid = _valid && rhs._valid; + _bucketsTotal += rhs._bucketsTotal; + _bucketsPending += rhs._bucketsPending; + } }; +std::ostream& operator<<(std::ostream& out, const BucketSpaceStats& stats); + /** * Interface that provides snapshots of bucket spaces statistics per content node. */ @@ -48,4 +63,10 @@ public: virtual PerNodeBucketSpacesStats getBucketSpacesStats() const = 0; }; +void merge_bucket_spaces_stats(BucketSpacesStatsProvider::BucketSpacesStats& dest, + const BucketSpacesStatsProvider::BucketSpacesStats& src); + +void merge_per_node_bucket_spaces_stats(BucketSpacesStatsProvider::PerNodeBucketSpacesStats& dest, + const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& src); + } diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 7ac15b3929c..be9ad1179fb 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -9,7 +9,7 @@ #include "distributor_stripe.h" #include "distributor_stripe_pool.h" #include "distributor_stripe_thread.h" -#include "distributormetricsset.h" +#include "distributor_total_metrics.h" #include "idealstatemetricsset.h" #include "multi_threaded_stripe_access_guard.h" #include "operation_sequencer.h" @@ -59,11 +59,12 @@ Distributor::Distributor(DistributorComponentRegister& compReg, : StorageLink("distributor"), framework::StatusReporter("distributor", "Distributor"), _comp_reg(compReg), + _use_legacy_mode(num_distributor_stripes == 0), _metrics(std::make_shared<DistributorMetricSet>()), + _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)), _messageSender(messageSender), - _use_legacy_mode(num_distributor_stripes == 0), _n_stripe_bits(0), - _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, + _stripe(std::make_unique<DistributorStripe>(compReg, _use_legacy_mode ? *_metrics : _total_metrics->stripe(0), node_identity, threadPool, doneInitHandler, *this, *this, _use_legacy_mode)), _stripe_pool(stripe_pool), _stripes(), @@ -91,7 +92,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _next_distribution(), _current_internal_config_generation(_component.internal_config_generation()) { - _component.registerMetric(*_metrics); + _component.registerMetric(_use_legacy_mode ? *_metrics : *_total_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); if (!_use_legacy_mode) { assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes)); @@ -105,7 +106,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, *_stripe_accessor); _stripes.emplace_back(std::move(_stripe)); for (size_t i = 1; i < num_distributor_stripes; ++i) { - _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, + _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, _total_metrics->stripe(i), node_identity, threadPool, doneInitHandler, *this, *this, _use_legacy_mode, i)); } _stripe_scan_stats.resize(num_distributor_stripes); @@ -124,6 +125,12 @@ Distributor::~Distributor() closeNextLink(); } +DistributorMetricSet& +Distributor::getMetrics() +{ + return _use_legacy_mode ? *_metrics : _total_metrics->bucket_db_updater_metrics(); +} + // TODO STRIPE remove DistributorStripe& Distributor::first_stripe() noexcept { @@ -322,6 +329,7 @@ namespace { bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& msg) noexcept { switch (msg.getType().getId()) { case api::MessageType::SETSYSTEMSTATE_ID: + case api::MessageType::GETNODESTATE_ID: case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: return true; case api::MessageType::REQUESTBUCKETINFO_REPLY_ID: @@ -521,44 +529,54 @@ Distributor::propagateDefaultDistribution( std::unordered_map<uint16_t, uint32_t> Distributor::getMinReplica() const { - // TODO STRIPE merged snapshot from all stripes if (_use_legacy_mode) { return _stripe->getMinReplica(); } else { - return first_stripe().getMinReplica(); + std::unordered_map<uint16_t, uint32_t> result; + for (const auto& stripe : _stripes) { + merge_min_replica_stats(result, stripe->getMinReplica()); + } + return result; } } BucketSpacesStatsProvider::PerNodeBucketSpacesStats Distributor::getBucketSpacesStats() const { - // TODO STRIPE merged snapshot from all stripes if (_use_legacy_mode) { return _stripe->getBucketSpacesStats(); } else { - return first_stripe().getBucketSpacesStats(); + BucketSpacesStatsProvider::PerNodeBucketSpacesStats result; + for (const auto& stripe : _stripes) { + merge_per_node_bucket_spaces_stats(result, stripe->getBucketSpacesStats()); + } + return result; } } SimpleMaintenanceScanner::PendingMaintenanceStats Distributor::pending_maintenance_stats() const { - // TODO STRIPE merged snapshot from all stripes if (_use_legacy_mode) { return _stripe->pending_maintenance_stats(); } else { - return first_stripe().pending_maintenance_stats(); + SimpleMaintenanceScanner::PendingMaintenanceStats result; + for (const auto& stripe : _stripes) { + result.merge(stripe->pending_maintenance_stats()); + } + return result; } } void Distributor::propagateInternalScanMetricsToExternal() { - // TODO STRIPE propagate to all stripes - // TODO STRIPE reconsider metric wiring... if (_use_legacy_mode) { _stripe->propagateInternalScanMetricsToExternal(); } else { - first_stripe().propagateInternalScanMetricsToExternal(); + for (auto &stripe : _stripes) { + stripe->propagateInternalScanMetricsToExternal(); + } + _total_metrics->aggregate(); } } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 6f9654b78b8..48ad061859f 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -44,6 +44,7 @@ class DistributorBucketSpaceRepo; class DistributorStatus; class DistributorStripe; class DistributorStripePool; +class DistributorTotalMetrics; class StripeAccessor; class OperationSequencer; class OwnershipTransferSafeTimePointCalculator; @@ -78,7 +79,7 @@ public: void sendUp(const std::shared_ptr<api::StorageMessage>&) override; void sendDown(const std::shared_ptr<api::StorageMessage>&) override; - DistributorMetricSet& getMetrics() { return *_metrics; } + DistributorMetricSet& getMetrics(); // Implements DistributorInterface and DistributorMessageSender. DistributorMetricSet& metrics() override { return getMetrics(); } @@ -201,9 +202,10 @@ private: using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>; DistributorComponentRegister& _comp_reg; + const bool _use_legacy_mode; std::shared_ptr<DistributorMetricSet> _metrics; + std::shared_ptr<DistributorTotalMetrics> _total_metrics; ChainedMessageSender* _messageSender; - const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. uint8_t _n_stripe_bits; std::unique_ptr<DistributorStripe> _stripe; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 4f6e2d5016b..c94c1a415b0 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -748,7 +748,7 @@ void DistributorStripe::send_updated_host_info_if_required() { if (_use_legacy_mode) { _component.getStateUpdater().immediately_send_get_node_state_replies(); } else { - _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(0); // TODO STRIPE correct stripe index! + _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(_stripe_index); } _must_send_updated_host_info = false; } diff --git a/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp b/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp new file mode 100644 index 00000000000..543712cc4d2 --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp @@ -0,0 +1,40 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "distributor_total_metrics.h" + +namespace storage::distributor { + +DistributorTotalMetrics::DistributorTotalMetrics(uint32_t num_distributor_stripes) + : DistributorMetricSet(), + _stripes_metrics(), + _bucket_db_updater_metrics() +{ + _stripes_metrics.reserve(num_distributor_stripes); + for (uint32_t i = 0; i < num_distributor_stripes; ++i) { + _stripes_metrics.emplace_back(std::make_shared<DistributorMetricSet>()); + } +} + +DistributorTotalMetrics::~DistributorTotalMetrics() = default; + +void +DistributorTotalMetrics::aggregate() +{ + DistributorMetricSet::reset(); + _bucket_db_updater_metrics.addToPart(*this); + for (auto &stripe_metrics : _stripes_metrics) { + stripe_metrics->addToPart(*this); + } +} + +void +DistributorTotalMetrics::reset() +{ + DistributorMetricSet::reset(); + _bucket_db_updater_metrics.reset(); + for (auto &stripe_metrics : _stripes_metrics) { + stripe_metrics->reset(); + } +} + +} diff --git a/storage/src/vespa/storage/distributor/distributor_total_metrics.h b/storage/src/vespa/storage/distributor/distributor_total_metrics.h new file mode 100644 index 00000000000..14116af3d3b --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_total_metrics.h @@ -0,0 +1,27 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "distributormetricsset.h" + +namespace storage::distributor { + +/* + * Class presenting total metrics (as a DistributorMetricSet) to the + * metric framework, while managing a DistributorMetricSet for each + * stripe and an extra one for the top level bucket db updater. + */ +class DistributorTotalMetrics : public DistributorMetricSet +{ + std::vector<std::shared_ptr<DistributorMetricSet>> _stripes_metrics; + DistributorMetricSet _bucket_db_updater_metrics; +public: + explicit DistributorTotalMetrics(uint32_t num_distributor_stripes); + ~DistributorTotalMetrics() override; + void aggregate(); + void reset() override; + DistributorMetricSet& stripe(uint32_t stripe_index) { return *_stripes_metrics[stripe_index]; } + DistributorMetricSet& bucket_db_updater_metrics() { return _bucket_db_updater_metrics; } +}; + +} diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp index b954ef93c76..4e7f7d9d89d 100644 --- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp +++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp @@ -7,6 +7,39 @@ namespace storage::distributor { const NodeMaintenanceStats NodeMaintenanceStatsTracker::_emptyNodeMaintenanceStats; +void +NodeMaintenanceStats::merge(const NodeMaintenanceStats& rhs) +{ + movingOut += rhs.movingOut; + syncing += rhs.syncing; + copyingIn += rhs.copyingIn; + copyingOut += rhs.copyingOut; + total += rhs.total; +} + +namespace { + +void +merge_bucket_spaces_stats(NodeMaintenanceStatsTracker::BucketSpacesStats& dest, + const NodeMaintenanceStatsTracker::BucketSpacesStats& src) +{ + for (const auto& entry : src) { + auto bucket_space = entry.first; + dest[bucket_space].merge(entry.second); + } +} + +} + +void +NodeMaintenanceStatsTracker::merge(const NodeMaintenanceStatsTracker& rhs) +{ + for (const auto& entry : rhs._stats) { + auto node_index = entry.first; + merge_bucket_spaces_stats(_stats[node_index], entry.second); + } +} + std::ostream& operator<<(std::ostream& os, const NodeMaintenanceStats& stats) { diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h index faf253fc84c..6399e53089b 100644 --- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h +++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h @@ -37,6 +37,8 @@ struct NodeMaintenanceStats bool operator!=(const NodeMaintenanceStats& other) const noexcept { return !(*this == other); } + + void merge(const NodeMaintenanceStats& rhs); }; std::ostream& operator<<(std::ostream&, const NodeMaintenanceStats&); @@ -93,6 +95,11 @@ public: const PerNodeStats& perNodeStats() const { return _stats; } + + bool operator==(const NodeMaintenanceStatsTracker& rhs) const { + return _stats == rhs._stats; + } + void merge(const NodeMaintenanceStatsTracker& rhs); }; } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp index 15a57c1e7ee..2bfce9569cc 100644 --- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp +++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp @@ -19,6 +19,28 @@ SimpleMaintenanceScanner::SimpleMaintenanceScanner(BucketPriorityDatabase& bucke SimpleMaintenanceScanner::~SimpleMaintenanceScanner() = default; +bool +SimpleMaintenanceScanner::GlobalMaintenanceStats::operator==(const GlobalMaintenanceStats& rhs) const +{ + return pending == rhs.pending; +} + +void +SimpleMaintenanceScanner::GlobalMaintenanceStats::merge(const GlobalMaintenanceStats& rhs) +{ + assert(pending.size() == rhs.pending.size()); + for (size_t i = 0; i < pending.size(); ++i) { + pending[i] += rhs.pending[i]; + } +} + +void +SimpleMaintenanceScanner::PendingMaintenanceStats::merge(const PendingMaintenanceStats& rhs) +{ + global.merge(rhs.global); + perNodeStats.merge(rhs.perNodeStats); +} + SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats() = default; SimpleMaintenanceScanner::PendingMaintenanceStats::~PendingMaintenanceStats() = default; SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(const PendingMaintenanceStats &) = default; diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h index 254b3244171..69e63fd4c65 100644 --- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h +++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h @@ -18,6 +18,9 @@ public: GlobalMaintenanceStats() : pending(MaintenanceOperation::OPERATION_COUNT) { } + + bool operator==(const GlobalMaintenanceStats& rhs) const; + void merge(const GlobalMaintenanceStats& rhs); }; struct PendingMaintenanceStats { PendingMaintenanceStats(); @@ -26,6 +29,8 @@ public: ~PendingMaintenanceStats(); GlobalMaintenanceStats global; NodeMaintenanceStatsTracker perNodeStats; + + void merge(const PendingMaintenanceStats& rhs); }; private: BucketPriorityDatabase& _bucketPriorityDb; diff --git a/storage/src/vespa/storage/distributor/min_replica_provider.cpp b/storage/src/vespa/storage/distributor/min_replica_provider.cpp new file mode 100644 index 00000000000..c9929940560 --- /dev/null +++ b/storage/src/vespa/storage/distributor/min_replica_provider.cpp @@ -0,0 +1,19 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "min_replica_provider.h" + +namespace storage::distributor { + +void +merge_min_replica_stats(std::unordered_map<uint16_t, uint32_t>& dest, + const std::unordered_map<uint16_t, uint32_t>& src) +{ + for (const auto& entry : src) { + auto node_index = entry.first; + auto itr = dest.find(node_index); + auto new_min_replica = (itr != dest.end()) ? std::min(itr->second, entry.second) : entry.second; + dest[node_index] = new_min_replica; + } +} + +} diff --git a/storage/src/vespa/storage/distributor/min_replica_provider.h b/storage/src/vespa/storage/distributor/min_replica_provider.h index 6d644f4e9d4..ba946cd5a7f 100644 --- a/storage/src/vespa/storage/distributor/min_replica_provider.h +++ b/storage/src/vespa/storage/distributor/min_replica_provider.h @@ -4,8 +4,7 @@ #include <stdint.h> #include <unordered_map> -namespace storage { -namespace distributor { +namespace storage::distributor { class MinReplicaProvider { @@ -21,5 +20,8 @@ public: virtual std::unordered_map<uint16_t, uint32_t> getMinReplica() const = 0; }; -} // distributor -} // storage +void merge_min_replica_stats(std::unordered_map<uint16_t, uint32_t>& dest, + const std::unordered_map<uint16_t, uint32_t>& src); + +} + diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java index c085be7c205..561b20a9c8a 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java @@ -6,7 +6,10 @@ import com.auth0.jwt.interfaces.DecodedJWT; import com.yahoo.vespa.athenz.utils.AthenzIdentities; import java.time.Instant; +import java.util.List; import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; /** * Represents an Athenz Access Token @@ -18,6 +21,8 @@ public class AthenzAccessToken { public static final String HTTP_HEADER_NAME = "Authorization"; private static final String BEARER_TOKEN_PREFIX = "Bearer "; + private static final String SCOPE_CLAIM = "scp"; + private static final String AUDIENCE_CLAIM = "aud"; private final String value; private volatile DecodedJWT jwt; @@ -43,6 +48,12 @@ public class AthenzAccessToken { return jwt().getExpiresAt().toInstant(); } public AthenzIdentity getAthenzIdentity() { return AthenzIdentities.from(jwt().getClaim("client_id").asString()); } + public List<AthenzRole> roles() { + String domain = Optional.ofNullable(jwt().getClaim(AUDIENCE_CLAIM).asString()).orElse(""); + return Optional.ofNullable(jwt().getClaim(SCOPE_CLAIM).asList(String.class)).orElse(List.of()).stream() + .map(role -> new AthenzRole(domain, role)) + .collect(Collectors.toList()); + } private DecodedJWT jwt() { if (jwt == null) { diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java index 9d4f3525c32..83abe0bb872 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java @@ -84,7 +84,7 @@ public class CliClient { builder.setHostnameVerifier(AcceptAllHostnameVerifier.INSTANCE); } cliArgs.certificateAndKey().ifPresent(c -> builder.setCertificate(c.certificateFile, c.privateKeyFile)); - cliArgs.caCertificates().ifPresent(builder::setCaCertificates); + cliArgs.caCertificates().ifPresent(builder::setCaCertificatesFile); cliArgs.headers().forEach(builder::addRequestHeader); return builder.build(); } @@ -127,7 +127,7 @@ public class CliClient { JsonFactory factory = new JsonFactory(); long okCount = stats.successes(); long errorCount = stats.requests() - okCount; - double throughput = okCount * 1e6D / Math.max(1, durationNanos); + double throughput = okCount * 1e9 / Math.max(1, durationNanos); try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) { generator.writeStartObject(); generator.writeNumberField("feeder.runtime", durationNanos / 1_000_000); diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index 7759e9d2308..02d4a0128ea 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -25,6 +25,18 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>org.eclipse.jetty.http2</groupId> + <artifactId>http2-http-client-transport</artifactId> + <version>${jetty.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-client</artifactId> + <version>${jetty.version}</version> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.apache.httpcomponents.client5</groupId> <artifactId>httpclient5</artifactId> <scope>compile</scope> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java index 672f5f080b5..e5d45a2f211 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -34,7 +34,7 @@ class ApacheCluster implements Cluster { private final List<Endpoint> endpoints = new ArrayList<>(); - public ApacheCluster(FeedClientBuilder builder) throws IOException { + ApacheCluster(FeedClientBuilder builder) throws IOException { for (URI endpoint : builder.endpoints) for (int i = 0; i < builder.connectionsPerEndpoint; i++) endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); @@ -127,7 +127,7 @@ class ApacheCluster implements Cluster { .setInitialWindowSize(Integer.MAX_VALUE) .build()); - SSLContext sslContext = constructSslContext(builder); + SSLContext sslContext = builder.constructSslContext(); String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); if (allowedCiphers.length == 0) throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); @@ -142,19 +142,6 @@ class ApacheCluster implements Cluster { .build(); } - private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException { - if (builder.sslContext != null) return builder.sslContext; - SslContextBuilder sslContextBuilder = new SslContextBuilder(); - if (builder.certificate != null && builder.privateKey != null) { - sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey); - } - if (builder.caCertificates != null) { - sslContextBuilder.withCaCertificates(builder.caCertificates); - } - return sslContextBuilder.build(); - } - - private static class ApacheHttpResponse implements HttpResponse { private final SimpleHttpResponse wrapped; @@ -163,7 +150,6 @@ class ApacheCluster implements Cluster { this.wrapped = wrapped; } - @Override public int code() { return wrapped.getCode(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java index d0a221ed358..df1f3bcd54c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -7,8 +7,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.nio.file.Path; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -35,9 +38,12 @@ public class FeedClientBuilder { int maxStreamsPerConnection = 128; FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy; FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Duration.ofSeconds(1), Duration.ofMinutes(10)); - Path certificate; - Path privateKey; - Path caCertificates; + Path certificateFile; + Path privateKeyFile; + Path caCertificatesFile; + Collection<X509Certificate> certificate; + PrivateKey privateKey; + Collection<X509Certificate> caCertificates; public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } @@ -81,9 +87,6 @@ public class FeedClientBuilder { } public FeedClientBuilder setSslContext(SSLContext context) { - if (certificate != null || caCertificates != null || privateKey != null) { - throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates"); - } this.sslContext = requireNonNull(context); return this; } @@ -113,24 +116,77 @@ public class FeedClientBuilder { } public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) { - if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and certificate"); - this.certificate = certificatePemFile; - this.privateKey = privateKeyPemFile; + this.certificateFile = certificatePemFile; + this.privateKeyFile = privateKeyPemFile; + return this; + } + + public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) { + this.certificate = certificate; + this.privateKey = privateKey; + return this; + } + + public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) { + return setCertificate(Collections.singletonList(certificate), privateKey); + } + + public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) { + this.caCertificatesFile = caCertificatesFile; return this; } - public FeedClientBuilder setCaCertificates(Path caCertificatesFile) { - if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and CA certificate"); - this.caCertificates = caCertificatesFile; + public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) { + this.caCertificates = caCertificates; return this; } public FeedClient build() { try { + validateConfiguration(); return new HttpFeedClient(this); } catch (IOException e) { throw new UncheckedIOException(e); } } + SSLContext constructSslContext() throws IOException { + if (sslContext != null) return sslContext; + SslContextBuilder sslContextBuilder = new SslContextBuilder(); + if (certificateFile != null && privateKeyFile != null) { + sslContextBuilder.withCertificateAndKey(certificateFile, privateKeyFile); + } else if (certificate != null && privateKey != null) { + sslContextBuilder.withCertificateAndKey(certificate, privateKey); + } + if (caCertificatesFile != null) { + sslContextBuilder.withCaCertificates(caCertificatesFile); + } else if (caCertificates != null) { + sslContextBuilder.withCaCertificates(caCertificates); + } + return sslContextBuilder.build(); + } + + private void validateConfiguration() { + if (sslContext != null && ( + certificateFile != null || caCertificatesFile != null || privateKeyFile != null || + certificate != null || caCertificates != null || privateKey != null)) { + throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates"); + } + if (certificate != null && certificateFile != null) { + throw new IllegalArgumentException("Cannot set both certificate directly and as file"); + } + if (privateKey != null && privateKeyFile != null) { + throw new IllegalArgumentException("Cannot set both private key directly and as file"); + } + if (caCertificates != null && caCertificatesFile != null) { + throw new IllegalArgumentException("Cannot set both CA certificates directly and as file"); + } + if (certificate != null && certificate.isEmpty()) { + throw new IllegalArgumentException("Certificate cannot be empty"); + } + if (caCertificates != null && caCertificates.isEmpty()) { + throw new IllegalArgumentException("CA certificates cannot be empty"); + } + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index c572f84db54..256d3ae535c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -4,18 +4,14 @@ package ai.vespa.feed.client; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.net.URIBuilder; import java.io.IOException; import java.io.UncheckedIOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -86,8 +82,10 @@ class HttpFeedClient implements FeedClient { private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) { ensureOpen(); - String path = operationPath(documentId, params).toString(); - HttpRequest request = new HttpRequest(method, path, requestHeaders, operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way? + HttpRequest request = new HttpRequest(method, + getPath(documentId) + getQuery(params), + requestHeaders, + operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way? return requestStrategy.enqueue(documentId, request) .thenApply(response -> toResult(request, response, documentId)); @@ -134,44 +132,45 @@ class HttpFeedClient implements FeedClient { return new Result(type, documentId, message, trace); } - static List<String> toPath(DocumentId documentId) { - List<String> path = new ArrayList<>(); + static String getPath(DocumentId documentId) { + StringJoiner path = new StringJoiner("/", "/", ""); path.add("document"); path.add("v1"); - path.add(documentId.namespace()); - path.add(documentId.documentType()); + path.add(encode(documentId.namespace())); + path.add(encode(documentId.documentType())); if (documentId.number().isPresent()) { path.add("number"); path.add(Long.toUnsignedString(documentId.number().getAsLong())); } else if (documentId.group().isPresent()) { path.add("group"); - path.add(documentId.group().get()); + path.add(encode(documentId.group().get())); } else { path.add("docid"); } - path.add(documentId.userSpecific()); + path.add(encode(documentId.userSpecific())); - return path; + return path.toString(); } - static URI operationPath(DocumentId documentId, OperationParameters params) { - URIBuilder url = new URIBuilder(); - url.setPathSegments(toPath(documentId)); - - if (params.createIfNonExistent()) url.addParameter("create", "true"); - params.testAndSetCondition().ifPresent(condition -> url.addParameter("condition", condition)); - params.timeout().ifPresent(timeout -> url.addParameter("timeout", timeout.toMillis() + "ms")); - params.route().ifPresent(route -> url.addParameter("route", route)); - params.tracelevel().ifPresent(tracelevel -> url.addParameter("tracelevel", Integer.toString(tracelevel))); - + static String encode(String raw) { try { - return url.build(); + return URLEncoder.encode(raw, UTF_8.name()); } - catch (URISyntaxException e) { + catch (UnsupportedEncodingException e) { throw new IllegalStateException(e); } } + static String getQuery(OperationParameters params) { + StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue(""); + if (params.createIfNonExistent()) query.add("create=true"); + params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition))); + params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms")); + params.route().ifPresent(route -> query.add("route=" + encode(route))); + params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel)); + return query.toString(); + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 5646d37cde3..f9fc7544501 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -59,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); + this(builder, new BenchmarkingCluster(new JettyCluster(builder))); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java new file mode 100644 index 00000000000..dc889d29d36 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java @@ -0,0 +1,123 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author jonmv + */ +class JettyCluster implements Cluster { + + private final List<Endpoint> endpoints = new ArrayList<>(); + + JettyCluster(FeedClientBuilder builder) { + for (URI endpoint : builder.endpoints) + endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint)); + } + + private static HttpClient createJettyHttpClient(FeedClientBuilder builder) { + try { + SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client(); + clientSslCtxFactory.setSslContext(builder.constructSslContext()); + clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier); + + HTTP2Client wrapped = new HTTP2Client(); + wrapped.setSelectors(8); + wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection); + HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped); + HttpClient client = new HttpClient(transport, clientSslCtxFactory); + client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION))); + client.setDefaultRequestContentType("application/json"); + client.setFollowRedirects(false); + client.setMaxRequestsQueuedPerDestination(builder.connectionsPerEndpoint * builder.maxStreamsPerConnection); + client.setIdleTimeout(10000); + client.setMaxConnectionsPerDestination(builder.connectionsPerEndpoint); + client.setRequestBufferSize(1 << 16); + + client.start(); + return client; + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + int index = 0; + int min = Integer.MAX_VALUE; + for (int i = 0; i < endpoints.size(); i++) + if (endpoints.get(i).inflight.get() < min) { + index = i; + min = endpoints.get(i).inflight.get(); + } + + Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); + try { + Request jettyRequest = endpoint.client.newRequest(endpoint.uri.resolve(request.path())) + .method(request.method()) + .timeout(5, TimeUnit.MINUTES) + .content(request.body() == null ? null : new BytesContentProvider("application/json", request.body())); + request.headers().forEach((name, value) -> jettyRequest.header(name, value.get())); + jettyRequest.send(new BufferingResponseListener() { + @Override public void onComplete(Result result) { + if (result.isSucceeded()) + vessel.complete(HttpResponse.of(result.getResponse().getStatus(), + getContent())); + else + vessel.completeExceptionally(result.getFailure()); + } + }); + } + catch (Throwable thrown) { + vessel.completeExceptionally(thrown); + } + vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + } + + @Override + public void close() { + Throwable thrown = null; + for (Endpoint endpoint : endpoints) + try { + endpoint.client.stop(); + } + catch (Throwable t) { + if (thrown == null) thrown = t; + else thrown.addSuppressed(t); + } + if (thrown != null) throw new RuntimeException(thrown); + } + + + private static class Endpoint { + + private final HttpClient client; + private final AtomicInteger inflight = new AtomicInteger(0); + private final URI uri; + + private Endpoint(HttpClient client, URI uri) { + this.client = client; + this.uri = uri; + } + + } +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index de32e7abdf5..b3a7aca1808 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -74,6 +74,31 @@ public class JsonFeeder implements Closeable { public static Builder builder(FeedClient client) { return new Builder(client); } + /** Feeds single JSON feed operations on the form + * <pre> + * { + * "id": "id:ns:type::boo", + * "fields": { ... document fields ... } + * } + * </pre> + */ + public CompletableFuture<Result> feedSingle(String json) { + CompletableFuture<Result> result = new CompletableFuture<>(); + try { + SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); + parser.next().whenCompleteAsync((operationResult, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + result.complete(operationResult); + } + }, resultExecutor); + } catch (Exception e) { + resultExecutor.execute(() -> result.completeExceptionally(e)); + } + return result; + } + /** Feeds a stream containing a JSON array of feed operations on the form * <pre> * [ @@ -288,87 +313,111 @@ public class JsonFeeder implements Closeable { } } + private class SingleOperationParserAndExecutor extends OperationParserAndExecutor { + + private final byte[] json; + + SingleOperationParserAndExecutor(byte[] json) throws IOException { + super(factory.createParser(json), false); + this.json = json; + } + + @Override + String getDocumentJson(long start, long end) { + return new String(json, (int) start, (int) (end - start), UTF_8); + } + } + private abstract class OperationParserAndExecutor { private final JsonParser parser; private final boolean multipleOperations; + private boolean arrayPrefixParsed; protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException { this.parser = parser; this.multipleOperations = multipleOperations; - if (multipleOperations) expect(START_ARRAY); } abstract String getDocumentJson(long start, long end); CompletableFuture<Result> next() throws IOException { - JsonToken token = parser.nextToken(); - if (token == END_ARRAY && multipleOperations) return null; - else if (token == null && !multipleOperations) return null; - else if (token == START_OBJECT); - else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = PUT; id = readId(); break; - case "update": type = UPDATE; id = readId(); break; - case "remove": type = REMOVE; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; + try { + if (multipleOperations && !arrayPrefixParsed){ + expect(START_ARRAY); + arrayPrefixParsed = true; + } + + JsonToken token = parser.nextToken(); + if (token == END_ARRAY && multipleOperations) return null; + else if (token == null && !multipleOperations) return null; + else if (token == START_OBJECT); + else throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); + long start = 0, end = -1; + OperationType type = null; + DocumentId id = null; + OperationParameters parameters = protoParameters; + loop: while (true) { + switch (parser.nextToken()) { + case FIELD_NAME: + switch (parser.getText()) { + case "id": + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; + case "condition": parameters = parameters.testAndSetCondition(readString()); break; + case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; + case "fields": { + expect(START_OBJECT); + start = parser.getTokenLocation().getByteOffset(); + int depth = 1; + while (depth > 0) switch (parser.nextToken()) { + case START_OBJECT: ++depth; break; + case END_OBJECT: --depth; break; + } + end = parser.getTokenLocation().getByteOffset() + 1; + break; } - end = parser.getTokenLocation().getByteOffset() + 1; - break; + default: throw new JsonParseException("Unexpected field name '" + parser.getText() + "' at offset " + + parser.getTokenLocation().getByteOffset()); } - default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - break; + break; - case END_OBJECT: - break loop; + case END_OBJECT: + break loop; - default: - throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + default: + throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } } - } - if (id == null) - throw new IllegalArgumentException("No document id for document at offset " + start); - - if (end < start) - throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - String payload = getDocumentJson(start, end); - switch (type) { - case PUT: return client.put (id, payload, parameters); - case UPDATE: return client.update(id, payload, parameters); - case REMOVE: return client.remove(id, parameters); - default: throw new IllegalStateException("Unexpected operation type '" + type + "'"); + if (id == null) + throw new JsonParseException("No document id for document at offset " + start); + + if (end < start) + throw new JsonParseException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); + String payload = getDocumentJson(start, end); + switch (type) { + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); + default: throw new JsonParseException("Unexpected operation type '" + type + "'"); + } + } catch (com.fasterxml.jackson.core.JacksonException e) { + throw new JsonParseException("Failed to parse JSON", e); } } - void expect(JsonToken token) throws IOException { + private void expect(JsonToken token) throws IOException { if (parser.nextToken() != token) - throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new JsonParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); } private String readString() throws IOException { String value = parser.nextTextValue(); if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new JsonParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; @@ -377,7 +426,7 @@ public class JsonFeeder implements Closeable { private boolean readBoolean() throws IOException { Boolean value = parser.nextBooleanValue(); if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new JsonParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java new file mode 100644 index 00000000000..785ffd8eb4c --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java @@ -0,0 +1,13 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +/** + * @author bjorncs + */ +public class JsonParseException extends FeedException { + + public JsonParseException(String message) { super(message); } + + public JsonParseException(String message, Throwable cause) { super(message, cause); } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java index 7200d5fd943..9114e22f4a6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java @@ -20,11 +20,14 @@ import java.nio.file.Path; import java.security.GeneralSecurityException; import java.security.KeyFactory; import java.security.KeyStore; +import java.security.KeyStoreException; import java.security.PrivateKey; import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.security.spec.PKCS8EncodedKeySpec; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; /** @@ -39,6 +42,9 @@ class SslContextBuilder { private Path certificateFile; private Path privateKeyFile; private Path caCertificatesFile; + private Collection<X509Certificate> certificate; + private PrivateKey privateKey; + private Collection<X509Certificate> caCertificates; SslContextBuilder withCertificateAndKey(Path certificate, Path privateKey) { this.certificateFile = certificate; @@ -46,20 +52,35 @@ class SslContextBuilder { return this; } + SslContextBuilder withCertificateAndKey(Collection<X509Certificate> certificate, PrivateKey privateKey) { + this.certificate = certificate; + this.privateKey = privateKey; + return this; + } + SslContextBuilder withCaCertificates(Path caCertificates) { this.caCertificatesFile = caCertificates; return this; } + SslContextBuilder withCaCertificates(Collection<X509Certificate> caCertificates) { + this.caCertificates = caCertificates; + return this; + } + SSLContext build() throws IOException { try { KeyStore keystore = KeyStore.getInstance("PKCS12"); keystore.load(null); if (certificateFile != null && privateKeyFile != null) { keystore.setKeyEntry("cert", privateKey(privateKeyFile), new char[0], certificates(certificateFile)); + } else if (certificate != null && privateKey != null) { + keystore.setKeyEntry("cert", privateKey, new char[0], certificate.toArray(new Certificate[0])); } if (caCertificatesFile != null) { - keystore.setCertificateEntry("ca-cert", certificates(caCertificatesFile)[0]); + addCaCertificates(keystore, Arrays.asList(certificates(caCertificatesFile))); + } else if (caCertificates != null) { + addCaCertificates(keystore, caCertificates); } KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(keystore, new char[0]); @@ -73,6 +94,13 @@ class SslContextBuilder { } } + private static void addCaCertificates(KeyStore keystore, Collection<? extends Certificate> certificates) throws KeyStoreException { + int i = 0; + for (Certificate cert : certificates) { + keystore.setCertificateEntry("ca-cert-" + ++i, cert); + } + } + private static Certificate[] certificates(Path file) throws IOException, GeneralSecurityException { try (PEMParser parser = new PEMParser(Files.newBufferedReader(file))) { List<X509Certificate> result = new ArrayList<>(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index e2ae5bc7155..03194e23d47 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -11,6 +11,7 @@ import java.nio.file.StandardOpenOption; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -51,52 +52,72 @@ class JsonFeederTest { try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) { AtomicInteger resultsReceived = new AtomicInteger(); AtomicBoolean completedSuccessfully = new AtomicBoolean(); - Set<String> ids = new HashSet<>(); long startNanos = System.nanoTime(); - JsonFeeder.builder(new FeedClient() { + SimpleClient feedClient = new SimpleClient(); + JsonFeeder.builder(feedClient).build() + .feedMany(in, 1 << 7, + new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document + @Override + public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } - @Override - public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { - ids.add(documentId.userSpecific()); - return createSuccessResult(documentId); - } + @Override + public void onError(Throwable error) { exceptionThrow.set(error); } - @Override - public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { - return createSuccessResult(documentId); - } + @Override + public void onComplete() { completedSuccessfully.set(true); } + }) + .join(); - @Override - public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { - return createSuccessResult(documentId); - } + System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + assertEquals(docs + 1, feedClient.ids.size()); + assertEquals(docs + 1, resultsReceived.get()); + assertTrue(completedSuccessfully.get()); + assertNull(exceptionThrow.get()); + } + } - @Override - public OperationStats stats() { return null; } + @Test + public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { + try (JsonFeeder feeder = JsonFeeder.builder(new SimpleClient()).build()) { + String json = "{\"put\": \"id:ns:type::abc1\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " }\n"; + Result result = feeder.feedSingle(json).get(); + assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId()); + assertEquals(Result.Type.success, result.type()); + assertEquals("success", result.resultMessage().get()); + } + } - @Override - public void close(boolean graceful) { } + private static class SimpleClient implements FeedClient { + final Set<String> ids = new HashSet<>(); - private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { - return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); - } + @Override + public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { + ids.add(documentId.userSpecific()); + return createSuccessResult(documentId); + } - }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document - @Override - public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } + @Override + public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { + return createSuccessResult(documentId); + } - @Override - public void onError(Throwable error) { exceptionThrow.set(error); } + @Override + public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { + return createSuccessResult(documentId); + } - @Override - public void onComplete() { completedSuccessfully.set(true); } - }).join(); + @Override + public OperationStats stats() { return null; } - System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); - assertEquals(docs + 1, ids.size()); - assertEquals(docs + 1, resultsReceived.get()); - assertTrue(completedSuccessfully.get()); - assertNull(exceptionThrow.get()); + @Override + public void close(boolean graceful) { } + + private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); } } diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml index 382c28dc884..39f10d84f9b 100644 --- a/vespa-hadoop/pom.xml +++ b/vespa-hadoop/pom.xml @@ -101,17 +101,22 @@ <scope>test</scope> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> <scope>test</scope> </dependency> - <!-- Vespa feeding dependencies --> <dependency> <groupId>com.yahoo.vespa</groupId> <artifactId>vespa-http-client</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>vespa-feed-client</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> </dependencies> diff --git a/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java new file mode 100644 index 00000000000..5974a8df271 --- /dev/null +++ b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java @@ -0,0 +1,18 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.Result.Type; + +/** + * Workaround for package-private {@link Result} constructor. + * + * @author bjorncs + */ +public class DryrunResult { + + private DryrunResult() {} + + public static Result create(Type type, DocumentId documentId, String resultMessage, String traceMessage) { + return new Result(type, documentId, resultMessage, traceMessage); + } +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java new file mode 100644 index 00000000000..b716c55beb5 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java @@ -0,0 +1,235 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hadoop.mapreduce; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; +import com.yahoo.vespa.http.client.FeedClient; +import com.yahoo.vespa.http.client.FeedClientFactory; +import com.yahoo.vespa.http.client.Result; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.FeedParams; +import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; +import com.yahoo.vespa.http.client.config.SessionParams; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import javax.xml.namespace.QName; +import javax.xml.stream.FactoryConfigurationError; +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; +import java.io.IOException; +import java.io.StringReader; +import java.time.Duration; +import java.util.List; +import java.util.StringTokenizer; +import java.util.concurrent.ThreadLocalRandom; +import java.util.logging.Logger; + +/** + * {@link LegacyVespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-http-client. + * + * @author lesters + */ +@SuppressWarnings("rawtypes") +public class LegacyVespaRecordWriter extends RecordWriter { + + private final static Logger log = Logger.getLogger(LegacyVespaRecordWriter.class.getCanonicalName()); + + private boolean initialized = false; + private FeedClient feedClient; + private final VespaCounters counters; + private final int progressInterval; + + final VespaConfiguration configuration; + + LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { + this.counters = counters; + this.configuration = configuration; + this.progressInterval = configuration.progressInterval(); + } + + + @Override + public void write(Object key, Object data) throws IOException, InterruptedException { + if (!initialized) { + initialize(); + } + + String doc = data.toString().trim(); + + // Parse data to find document id - if none found, skip this write + String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) + : findDocIdFromXml(doc); + if (docId != null && docId.length() >= 0) { + feedClient.stream(docId, doc); + counters.incrementDocumentsSent(1); + } else { + counters.incrementDocumentsSkipped(1); + } + + if (counters.getDocumentsSent() % progressInterval == 0) { + String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", + counters.getDocumentsSent(), + counters.getDocumentsOk(), + counters.getDocumentsFailed(), + counters.getDocumentsSkipped()); + log.info(progress); + } + + } + + + @Override + public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + if (feedClient != null) { + feedClient.close(); + } + } + + protected ConnectionParams.Builder configureConnectionParams() { + ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); + connParamsBuilder.setDryRun(configuration.dryrun()); + connParamsBuilder.setUseCompression(configuration.useCompression()); + connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); + connParamsBuilder.setMaxRetries(configuration.numRetries()); + if (configuration.proxyHost() != null) { + connParamsBuilder.setProxyHost(configuration.proxyHost()); + } + if (configuration.proxyPort() >= 0) { + connParamsBuilder.setProxyPort(configuration.proxyPort()); + } + return connParamsBuilder; + } + + protected FeedParams.Builder configureFeedParams() { + FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); + feedParamsBuilder.setDataFormat(configuration.dataFormat()); + feedParamsBuilder.setRoute(configuration.route()); + feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); + feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); + feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis()); + return feedParamsBuilder; + } + + protected SessionParams.Builder configureSessionParams() { + SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder(); + sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize()); + sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2); + return sessionParamsBuilder; + } + + private void initialize() { + if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) { + int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs()); + log.info("VespaStorage: Delaying startup by " + delay + " ms"); + try { + Thread.sleep(delay); + } catch (Exception e) {} + } + + ConnectionParams.Builder connParamsBuilder = configureConnectionParams(); + FeedParams.Builder feedParamsBuilder = configureFeedParams(); + SessionParams.Builder sessionParams = configureSessionParams(); + + sessionParams.setConnectionParams(connParamsBuilder.build()); + sessionParams.setFeedParams(feedParamsBuilder.build()); + + String endpoints = configuration.endpoint(); + StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); + while (tokenizer.hasMoreTokens()) { + String endpoint = tokenizer.nextToken().trim(); + sessionParams.addCluster(new Cluster.Builder().addEndpoint( + Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL()) + ).build()); + } + + ResultCallback resultCallback = new ResultCallback(counters); + feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback); + + initialized = true; + log.info("VespaStorage configuration:\n" + configuration.toString()); + log.info(feedClient.getStatsAsJson()); + } + + private String findDocIdFromXml(String xml) { + try { + XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); + while (eventReader.hasNext()) { + XMLEvent event = eventReader.nextEvent(); + if (event.getEventType() == XMLEvent.START_ELEMENT) { + StartElement element = event.asStartElement(); + String elementName = element.getName().getLocalPart(); + if (VespaDocumentOperation.Operation.valid(elementName)) { + return element.getAttributeByName(QName.valueOf("documentid")).getValue(); + } + } + } + } catch (XMLStreamException | FactoryConfigurationError e) { + // as json dude does + return null; + } + return null; + } + + private String findDocId(String json) throws IOException { + JsonFactory factory = new JsonFactory(); + try(JsonParser parser = factory.createParser(json)) { + if (parser.nextToken() != JsonToken.START_OBJECT) { + return null; + } + while (parser.nextToken() != JsonToken.END_OBJECT) { + String fieldName = parser.getCurrentName(); + parser.nextToken(); + if (VespaDocumentOperation.Operation.valid(fieldName)) { + String docId = parser.getText(); + return docId; + } else { + parser.skipChildren(); + } + } + } catch (JsonParseException ex) { + return null; + } + return null; + } + + + static class ResultCallback implements FeedClient.ResultCallback { + final VespaCounters counters; + + public ResultCallback(VespaCounters counters) { + this.counters = counters; + } + + @Override + public void onCompletion(String docId, Result documentResult) { + if (!documentResult.isSuccess()) { + counters.incrementDocumentsFailed(1); + StringBuilder sb = new StringBuilder(); + sb.append("Problems with docid "); + sb.append(docId); + sb.append(": "); + List<Result.Detail> details = documentResult.getDetails(); + for (Result.Detail detail : details) { + sb.append(detail.toString()); + sb.append(" "); + } + log.warning(sb.toString()); + return; + } + counters.incrementDocumentsOk(1); + } + + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java index bef51e9ae08..97bc7dc838e 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java @@ -10,7 +10,7 @@ import java.util.Properties; /** * An output specification for writing to Vespa instances in a Map-Reduce job. - * Mainly returns an instance of a {@link VespaRecordWriter} that does the + * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the * actual feeding to Vespa. * * @author lesters @@ -35,7 +35,9 @@ public class VespaOutputFormat extends OutputFormat { public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { VespaCounters counters = VespaCounters.get(context); VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride); - return new VespaRecordWriter(configuration, counters); + return configuration.useLegacyClient() + ? new LegacyVespaRecordWriter(configuration, counters) + : new VespaRecordWriter(configuration, counters); } diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 4cc93bfd538..73e10c39419 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -1,83 +1,71 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.DryrunResult; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.JsonFeeder; +import ai.vespa.feed.client.JsonParseException; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.Result; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.FeedClientFactory; -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; -import com.yahoo.vespa.http.client.config.SessionParams; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import javax.xml.namespace.QName; -import javax.xml.stream.FactoryConfigurationError; -import javax.xml.stream.XMLEventReader; -import javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.events.StartElement; -import javax.xml.stream.events.XMLEvent; import java.io.IOException; -import java.io.StringReader; +import java.net.URI; import java.time.Duration; +import java.util.Arrays; import java.util.List; -import java.util.StringTokenizer; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; +import static java.util.stream.Collectors.toList; + /** - * VespaRecordWriter sends the output <key, value> to one or more Vespa endpoints. + * {@link VespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-feed-client. * - * @author lesters + * @author bjorncs */ -@SuppressWarnings("rawtypes") -public class VespaRecordWriter extends RecordWriter { +public class VespaRecordWriter extends RecordWriter<Object, Object> { private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName()); - private boolean initialized = false; - private FeedClient feedClient; private final VespaCounters counters; - private final int progressInterval; + private final VespaConfiguration config; - final VespaConfiguration configuration; + private boolean initialized = false; + private JsonFeeder feeder; - VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { + protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) { this.counters = counters; - this.configuration = configuration; - this.progressInterval = configuration.progressInterval(); + this.config = config; } - @Override - public void write(Object key, Object data) throws IOException, InterruptedException { - if (!initialized) { - initialize(); - } - - String doc = data.toString().trim(); - - // Parse data to find document id - if none found, skip this write - String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) - : findDocIdFromXml(doc); - if (docId != null && docId.length() >= 0) { - feedClient.stream(docId, doc); - counters.incrementDocumentsSent(1); - } else { - counters.incrementDocumentsSkipped(1); - } - - if (counters.getDocumentsSent() % progressInterval == 0) { + public void write(Object key, Object data) throws IOException { + initializeOnFirstWrite(); + String json = data.toString().trim(); + feeder.feedSingle(json) + .whenComplete((result, error) -> { + if (error != null) { + if (error instanceof JsonParseException) { + counters.incrementDocumentsSkipped(1); + } else { + log.warning("Failed to feed single document: " + error); + counters.incrementDocumentsFailed(1); + } + } else { + counters.incrementDocumentsOk(1); + } + }); + counters.incrementDocumentsSent(1); + if (counters.getDocumentsSent() % config.progressInterval() == 0) { String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", counters.getDocumentsSent(), counters.getDocumentsOk(), @@ -85,151 +73,115 @@ public class VespaRecordWriter extends RecordWriter { counters.getDocumentsSkipped()); log.info(progress); } - } - @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - if (feedClient != null) { - feedClient.close(); + public void close(TaskAttemptContext context) throws IOException { + if (feeder != null) { + feeder.close(); + feeder = null; + initialized = false; } } - protected ConnectionParams.Builder configureConnectionParams() { - ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); - connParamsBuilder.setDryRun(configuration.dryrun()); - connParamsBuilder.setUseCompression(configuration.useCompression()); - connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); - connParamsBuilder.setMaxRetries(configuration.numRetries()); - if (configuration.proxyHost() != null) { - connParamsBuilder.setProxyHost(configuration.proxyHost()); - } - if (configuration.proxyPort() >= 0) { - connParamsBuilder.setProxyPort(configuration.proxyPort()); - } - return connParamsBuilder; - } + /** Override method to alter {@link FeedClient} configuration */ + protected void onFeedClientInitialization(FeedClientBuilder builder) {} - protected FeedParams.Builder configureFeedParams() { - FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); - feedParamsBuilder.setDataFormat(configuration.dataFormat()); - feedParamsBuilder.setRoute(configuration.route()); - feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); - feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); - feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis()); - return feedParamsBuilder; + private void initializeOnFirstWrite() { + if (initialized) return; + validateConfig(); + useRandomizedStartupDelayIfEnabled(); + feeder = createJsonStreamFeeder(); + initialized = true; } - protected SessionParams.Builder configureSessionParams() { - SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder(); - sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize()); - sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2); - return sessionParamsBuilder; + private void validateConfig() { + if (!config.useSSL()) { + throw new IllegalArgumentException("SSL is required for this feed client implementation"); + } + if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) { + throw new IllegalArgumentException("Only JSON is support by this feed client implementation"); + } + if (config.proxyHost() != null) { + log.warning(String.format("Ignoring proxy config (host='%s', port=%d)", config.proxyHost(), config.proxyPort())); + } } - - private void initialize() { - if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) { - int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs()); - log.info("VespaStorage: Delaying startup by " + delay + " ms"); + + private void useRandomizedStartupDelayIfEnabled() { + if (!config.dryrun() && config.randomStartupSleepMs() > 0) { + int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); + log.info("Delaying startup by " + delay + " ms"); try { Thread.sleep(delay); } catch (Exception e) {} } + } - ConnectionParams.Builder connParamsBuilder = configureConnectionParams(); - FeedParams.Builder feedParamsBuilder = configureFeedParams(); - SessionParams.Builder sessionParams = configureSessionParams(); - - sessionParams.setConnectionParams(connParamsBuilder.build()); - sessionParams.setFeedParams(feedParamsBuilder.build()); - String endpoints = configuration.endpoint(); - StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); - while (tokenizer.hasMoreTokens()) { - String endpoint = tokenizer.nextToken().trim(); - sessionParams.addCluster(new Cluster.Builder().addEndpoint( - Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL()) - ).build()); + private JsonFeeder createJsonStreamFeeder() { + FeedClient feedClient = createFeedClient(); + JsonFeeder.Builder builder = JsonFeeder.builder(feedClient) + .withTimeout(Duration.ofMinutes(10)); + if (config.route() != null) { + builder.withRoute(config.route()); } + return builder.build(); - ResultCallback resultCallback = new ResultCallback(counters); - feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback); - - initialized = true; - log.info("VespaStorage configuration:\n" + configuration.toString()); - log.info(feedClient.getStatsAsJson()); } - private String findDocIdFromXml(String xml) { - try { - XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); - while (eventReader.hasNext()) { - XMLEvent event = eventReader.nextEvent(); - if (event.getEventType() == XMLEvent.START_ELEMENT) { - StartElement element = event.asStartElement(); - String elementName = element.getName().getLocalPart(); - if (VespaDocumentOperation.Operation.valid(elementName)) { - return element.getAttributeByName(QName.valueOf("documentid")).getValue(); - } - } - } - } catch (XMLStreamException | FactoryConfigurationError e) { - // as json dude does - return null; + private FeedClient createFeedClient() { + if (config.dryrun()) { + return new DryrunClient(); + } else { + FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpointUris(config)) + .setConnectionsPerEndpoint(config.numConnections()) + .setMaxStreamPerConnection(streamsPerConnection(config)) + .setRetryStrategy(retryStrategy(config)); + + onFeedClientInitialization(feedClientBuilder); + return feedClientBuilder.build(); } - return null; } - - private String findDocId(String json) throws IOException { - JsonFactory factory = new JsonFactory(); - try(JsonParser parser = factory.createParser(json)) { - if (parser.nextToken() != JsonToken.START_OBJECT) { - return null; - } - while (parser.nextToken() != JsonToken.END_OBJECT) { - String fieldName = parser.getCurrentName(); - parser.nextToken(); - if (VespaDocumentOperation.Operation.valid(fieldName)) { - String docId = parser.getText(); - return docId; - } else { - parser.skipChildren(); - } - } - } catch (JsonParseException ex) { - return null; - } - return null; + + private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) { + int maxRetries = config.numRetries(); + return new FeedClient.RetryStrategy() { + @Override public int retries() { return maxRetries; } + }; } + private static int streamsPerConnection(VespaConfiguration config) { + return Math.min(256, config.maxInFlightRequests() / config.numConnections()); + } + + private static List<URI> endpointUris(VespaConfiguration config) { + return Arrays.stream(config.endpoint().split(",")) + .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort()))) + .collect(toList()); + } - static class ResultCallback implements FeedClient.ResultCallback { - final VespaCounters counters; + private static class DryrunClient implements FeedClient { - public ResultCallback(VespaCounters counters) { - this.counters = counters; + @Override + public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { + return createSuccessResult(documentId); } @Override - public void onCompletion(String docId, Result documentResult) { - if (!documentResult.isSuccess()) { - counters.incrementDocumentsFailed(1); - StringBuilder sb = new StringBuilder(); - sb.append("Problems with docid "); - sb.append(docId); - sb.append(": "); - List<Result.Detail> details = documentResult.getDetails(); - for (Result.Detail detail : details) { - sb.append(detail.toString()); - sb.append(" "); - } - log.warning(sb.toString()); - return; - } - counters.incrementDocumentsOk(1); + public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { + return createSuccessResult(documentId); } - } + @Override + public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { + return createSuccessResult(documentId); + } + + @Override public OperationStats stats() { return null; } + @Override public void close(boolean graceful) {} + private static CompletableFuture<Result> createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(DryrunResult.create(Result.Type.success, documentId, "ok", null)); + } + } } diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java index 2a1179dbec6..7219e621486 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java @@ -27,6 +27,7 @@ public class VespaConfiguration { public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; public static final String NUM_RETRIES = "vespa.feed.num.retries"; + public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient"; private final Configuration conf; private final Properties override; @@ -130,6 +131,7 @@ public class VespaConfiguration { return getInt(PROGRESS_REPORT, 1000); } + public boolean useLegacyClient() { return getBoolean(USE_LEGACY_CLIENT, true); } public String getString(String name) { if (override != null && override.containsKey(name)) { diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java index ebb34dbc1b1..fa7965acbc1 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java @@ -4,10 +4,9 @@ package com.yahoo.vespa.hadoop.pig; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -17,22 +16,25 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.test.PathUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.*; -import java.util.*; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.StringTokenizer; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class MapReduceTest { @@ -44,7 +46,7 @@ public class MapReduceTest { protected static Path metricsJsonPath; protected static Path metricsCsvPath; - @BeforeClass + @BeforeAll public static void setUp() throws IOException { hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath()); @@ -62,7 +64,7 @@ public class MapReduceTest { copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data"); } - @AfterClass + @AfterAll public static void tearDown() throws IOException { Path testDir = new Path(hdfsBaseDir.getParent()); hdfs.delete(testDir, true); @@ -82,7 +84,7 @@ public class MapReduceTest { FileInputFormat.setInputPaths(job, metricsJsonPath); boolean success = job.waitForCompletion(true); - assertTrue("Job Failed", success); + assertTrue(success, "Job Failed"); VespaCounters counters = VespaCounters.get(job); assertEquals(10, counters.getDocumentsSent()); @@ -103,7 +105,7 @@ public class MapReduceTest { FileInputFormat.setInputPaths(job, metricsJsonPath); boolean success = job.waitForCompletion(true); - assertTrue("Job Failed", success); + assertTrue(success, "Job Failed"); VespaCounters counters = VespaCounters.get(job); assertEquals(10, counters.getDocumentsSent()); @@ -125,7 +127,7 @@ public class MapReduceTest { FileInputFormat.setInputPaths(job, metricsCsvPath); boolean success = job.waitForCompletion(true); - assertTrue("Job Failed", success); + assertTrue(success, "Job Failed"); VespaCounters counters = VespaCounters.get(job); assertEquals(10, counters.getDocumentsSent()); diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java index bafeb593e4f..db2fab9b05e 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java @@ -12,9 +12,9 @@ import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -23,22 +23,22 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @SuppressWarnings("serial") public class VespaDocumentOperationTest { private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); private final PrintStream originalOut = System.out; - @Before + @BeforeEach public void setUpStreams() { System.setOut(new PrintStream(outContent)); } - @After + @AfterEach public void restoreStreams() { System.setOut(originalOut); } diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java index 2d55017b13e..b0e2dd32c04 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java @@ -8,12 +8,16 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.data.Tuple; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.net.InetSocketAddress; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; public class VespaQueryTest { diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java index 7ca401a0cc8..3565db37126 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -1,14 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.pig; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; - +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.mapred.Counters; @@ -18,11 +12,14 @@ import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class VespaStorageTest { @@ -51,6 +48,13 @@ public class VespaStorageTest { assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig"); } + @Test + public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString()); + conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString()); + assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf); + } @Test public void requireThatCreateOperationsFeedSucceeds() throws Exception { diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java index 27080a8b2af..93e6a0abfdd 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java @@ -6,11 +6,11 @@ import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; public class TupleToolsTest { |