-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java | 8
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java | 8
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java | 3
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java | 17
-rw-r--r--  config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java | 49
-rw-r--r--  config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java | 6
-rw-r--r--  configd/src/apps/sentinel/connectivity.cpp | 35
-rw-r--r--  configd/src/apps/sentinel/connectivity.h | 13
-rw-r--r--  configdefinitions/src/vespa/sentinel.def | 8
-rw-r--r--  configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java | 1
-rw-r--r--  container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java | 3
-rw-r--r--  container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java | 15
-rw-r--r--  controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java | 1
-rw-r--r--  controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java | 24
-rw-r--r--  controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java | 22
-rw-r--r--  controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java | 2
-rw-r--r--  eval/CMakeLists.txt | 1
-rw-r--r--  eval/src/apps/analyze_onnx_model/.gitignore | 1
-rw-r--r--  eval/src/apps/analyze_onnx_model/CMakeLists.txt | 8
-rw-r--r--  eval/src/apps/analyze_onnx_model/analyze_onnx_model.cpp | 59
-rw-r--r--  jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java | 79
-rw-r--r--  jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java | 13
-rw-r--r--  jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java | 4
-rw-r--r--  metrics/src/tests/summetrictest.cpp | 41
-rw-r--r--  metrics/src/vespa/metrics/metric.cpp | 7
-rw-r--r--  metrics/src/vespa/metrics/metric.h | 2
-rw-r--r--  metrics/src/vespa/metrics/summetric.h | 1
-rw-r--r--  metrics/src/vespa/metrics/summetric.hpp | 13
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java | 21
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java | 4
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java (renamed from node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java) | 30
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java | 6
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java | 9
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirer.java | 63
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java | 1
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java | 2
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java (renamed from node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java) | 10
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java | 24
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirerTest.java | 70
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json | 7
-rw-r--r--  searchlib/src/tests/predicate/predicate_index_test.cpp | 20
-rw-r--r--  searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp | 56
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp | 2
-rw-r--r--  searchlib/src/vespa/searchlib/common/bitvectorcache.h | 1
-rw-r--r--  searchlib/src/vespa/searchlib/predicate/predicate_index.h | 5
-rw-r--r--  searchlib/src/vespa/searchlib/predicate/simple_index.h | 2
-rw-r--r--  searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h | 31
-rw-r--r--  storage/src/tests/distributor/distributor_host_info_reporter_test.cpp | 68
-rw-r--r--  storage/src/tests/distributor/simplemaintenancescannertest.cpp | 85
-rw-r--r--  storage/src/vespa/storage/distributor/CMakeLists.txt | 3
-rw-r--r--  storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp | 40
-rw-r--r--  storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h | 21
-rw-r--r--  storage/src/vespa/storage/distributor/distributor.cpp | 46
-rw-r--r--  storage/src/vespa/storage/distributor/distributor.h | 6
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_stripe.cpp | 2
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_total_metrics.cpp | 40
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_total_metrics.h | 27
-rw-r--r--  storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp | 33
-rw-r--r--  storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h | 7
-rw-r--r--  storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp | 22
-rw-r--r--  storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h | 5
-rw-r--r--  storage/src/vespa/storage/distributor/min_replica_provider.cpp | 19
-rw-r--r--  storage/src/vespa/storage/distributor/min_replica_provider.h | 10
-rw-r--r--  vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java | 11
-rw-r--r--  vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java | 4
-rw-r--r--  vespa-feed-client/pom.xml | 12
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java | 18
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java | 80
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java | 55
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java | 2
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java | 123
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java | 157
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java | 13
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java | 30
-rw-r--r--  vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java | 91
-rw-r--r--  vespa-hadoop/pom.xml | 11
-rw-r--r--  vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java | 18
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java | 235
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java | 6
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java | 288
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java | 2
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java | 34
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java | 18
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java | 10
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java | 4
86 files changed, 1920 insertions(+), 570 deletions(-)
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java b/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java
index 800bf73cdbb..d05913143e4 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java
@@ -86,11 +86,11 @@ public class ConfigSentinel extends AbstractService implements SentinelConfig.Pr
private SentinelConfig.Connectivity.Builder getConnectivityConfig(boolean enable) {
var builder = new SentinelConfig.Connectivity.Builder();
if (enable) {
- builder.maxBadOutPercent(60);
- builder.maxBadReverseCount(3);
+ builder.minOkPercent(40);
+ builder.maxBadCount(3);
} else {
- builder.maxBadOutPercent(100);
- builder.maxBadReverseCount(Integer.MAX_VALUE);
+ builder.minOkPercent(0);
+ builder.maxBadCount(Integer.MAX_VALUE);
}
return builder;
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java b/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java
index 8db656a5f2c..638864d85bb 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterResourceLimits.java
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.model.content;
+import com.yahoo.config.application.api.DeployLogger;
import com.yahoo.vespa.model.builder.xml.dom.ModelElement;
import com.yahoo.vespa.model.content.cluster.DomResourceLimitsBuilder;
@@ -37,13 +38,16 @@ public class ClusterResourceLimits {
private final boolean enableFeedBlockInDistributor;
private final boolean hostedVespa;
private final boolean throwIfSpecified;
+ private final DeployLogger deployLogger;
+
private ResourceLimits.Builder ctrlBuilder = new ResourceLimits.Builder();
private ResourceLimits.Builder nodeBuilder = new ResourceLimits.Builder();
- public Builder(boolean enableFeedBlockInDistributor, boolean hostedVespa, boolean throwIfSpecified) {
+ public Builder(boolean enableFeedBlockInDistributor, boolean hostedVespa, boolean throwIfSpecified, DeployLogger deployLogger) {
this.enableFeedBlockInDistributor = enableFeedBlockInDistributor;
this.hostedVespa = hostedVespa;
this.throwIfSpecified = throwIfSpecified;
+ this.deployLogger = deployLogger;
}
public ClusterResourceLimits build(ModelElement clusterElem) {
@@ -57,7 +61,7 @@ public class ClusterResourceLimits {
private ResourceLimits.Builder createBuilder(ModelElement element) {
return element == null
? new ResourceLimits.Builder()
- : DomResourceLimitsBuilder.createBuilder(element, hostedVespa, throwIfSpecified);
+ : DomResourceLimitsBuilder.createBuilder(element, hostedVespa, throwIfSpecified, deployLogger);
}
public void setClusterControllerBuilder(ResourceLimits.Builder builder) {
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java
index e9264a6d9fc..c298b7f5f5a 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/ContentCluster.java
@@ -123,7 +123,8 @@ public class ContentCluster extends AbstractConfigProducer<AbstractConfigProduce
boolean enableFeedBlockInDistributor = deployState.getProperties().featureFlags().enableFeedBlockInDistributor();
var resourceLimits = new ClusterResourceLimits.Builder(enableFeedBlockInDistributor,
stateIsHosted(deployState),
- deployState.featureFlags().throwIfResourceLimitsSpecified())
+ deployState.featureFlags().throwIfResourceLimitsSpecified(),
+ deployState.getDeployLogger())
.build(contentElement);
c.clusterControllerConfig = new ClusterControllerConfig.Builder(getClusterId(contentElement),
contentElement,
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java
index bab991efe51..37adb73bc15 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/cluster/DomResourceLimitsBuilder.java
@@ -1,9 +1,12 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.model.content.cluster;
+import com.yahoo.config.application.api.DeployLogger;
import com.yahoo.vespa.model.builder.xml.dom.ModelElement;
import com.yahoo.vespa.model.content.ResourceLimits;
+import java.util.logging.Level;
+
/**
* Builder for feed block resource limits.
*
@@ -11,13 +14,21 @@ import com.yahoo.vespa.model.content.ResourceLimits;
*/
public class DomResourceLimitsBuilder {
- public static ResourceLimits.Builder createBuilder(ModelElement contentXml, boolean hostedVespa, boolean throwIfSpecified) {
+ public static ResourceLimits.Builder createBuilder(ModelElement contentXml,
+ boolean hostedVespa,
+ boolean throwIfSpecified,
+ DeployLogger deployLogger) {
ResourceLimits.Builder builder = new ResourceLimits.Builder();
ModelElement resourceLimits = contentXml.child("resource-limits");
if (resourceLimits == null) { return builder; }
- if (hostedVespa && throwIfSpecified)
- throw new IllegalArgumentException("Element '" + resourceLimits + "' is not allowed to be set");
+ if (hostedVespa) {
+ String message = "Element '" + resourceLimits + "' is not allowed to be set";
+ if (throwIfSpecified)
+ throw new IllegalArgumentException(message);
+ else
+ deployLogger.logApplicationPackage(Level.WARNING, message);
+ }
if (resourceLimits.child("disk") != null) {
builder.setDiskLimit(resourceLimits.childAsDouble("disk"));
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java
index ad1f5331a91..4324f257922 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/ClusterResourceLimitsTest.java
@@ -1,6 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.model.content;
+import com.yahoo.config.application.api.DeployLogger;
+import com.yahoo.config.model.application.provider.BaseDeployLogger;
+import com.yahoo.searchdefinition.derived.TestableDeployLogger;
import com.yahoo.text.XML;
import com.yahoo.vespa.model.builder.xml.dom.ModelElement;
import org.junit.Rule;
@@ -49,7 +52,10 @@ public class ClusterResourceLimitsTest {
return this;
}
public ClusterResourceLimits build() {
- var builder = new ClusterResourceLimits.Builder(enableFeedBlockInDistributor, false, false);
+ var builder = new ClusterResourceLimits.Builder(enableFeedBlockInDistributor,
+ false,
+ false,
+ new BaseDeployLogger());
builder.setClusterControllerBuilder(ctrlBuilder);
builder.setContentNodeBuilder(nodeBuilder);
return builder.build();
@@ -125,24 +131,39 @@ public class ClusterResourceLimitsTest {
@Test
public void exception_is_thrown_when_resource_limits_are_specified() {
- final boolean hosted = true;
+ TestableDeployLogger logger = new TestableDeployLogger();
- Document clusterXml = XML.getDocument("<cluster id=\"test\">" +
- " <tuning>\n" +
- " <resource-limits>\n" +
- " <memory>0.92</memory>\n" +
- " </resource-limits>\n" +
- " </tuning>\n" +
- "</cluster>");
+ buildClusterResourceLimitsAndLogIfSpecified(logger);
+ assertEquals(1, logger.warnings.size());
+ assertEquals("Element 'resource-limits' is not allowed to be set", logger.warnings.get(0));
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(containsString("Element 'resource-limits' is not allowed to be set"));
- ClusterResourceLimits.Builder builder = new ClusterResourceLimits.Builder(true, hosted, true);
- builder.build(new ModelElement(clusterXml.getDocumentElement()));
+ buildClusterResourceLimitsAndThrowIfSpecified(logger);
+ }
+
+ private void buildClusterResourceLimitsAndThrowIfSpecified(DeployLogger deployLogger) {
+ buildClusterResourceLimits(true, deployLogger);
+ }
- expectedException = ExpectedException.none();
- ClusterResourceLimits.Builder builder2 = new ClusterResourceLimits.Builder(true, hosted, false);
- builder2.build(new ModelElement(clusterXml.getDocumentElement()));
+ private void buildClusterResourceLimitsAndLogIfSpecified(DeployLogger deployLogger) {
+ buildClusterResourceLimits(false, deployLogger);
+ }
+
+ private void buildClusterResourceLimits(boolean throwIfSpecified, DeployLogger deployLogger) {
+ Document clusterXml = XML.getDocument("<cluster id=\"test\">" +
+ " <tuning>\n" +
+ " <resource-limits>\n" +
+ " <memory>0.92</memory>\n" +
+ " </resource-limits>\n" +
+ " </tuning>\n" +
+ "</cluster>");
+
+ ClusterResourceLimits.Builder builder = new ClusterResourceLimits.Builder(true,
+ true,
+ throwIfSpecified,
+ deployLogger);
+ builder.build(new ModelElement(clusterXml.getDocumentElement()));
}
private void assertLimits(Double expCtrlDisk, Double expCtrlMemory, Double expNodeDisk, Double expNodeMemory, Fixture f) {
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java
index 2eecfa9440e..10bb00168bb 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.model.content;
+import com.yahoo.config.model.application.provider.BaseDeployLogger;
import com.yahoo.config.model.deploy.DeployState;
import com.yahoo.config.model.deploy.TestProperties;
import com.yahoo.config.model.test.MockRoot;
@@ -23,7 +24,10 @@ public class FleetControllerClusterTest {
var clusterElement = new ModelElement(doc.getDocumentElement());
return new ClusterControllerConfig.Builder("storage",
clusterElement,
- new ClusterResourceLimits.Builder(enableFeedBlockInDistributor, false, false)
+ new ClusterResourceLimits.Builder(enableFeedBlockInDistributor,
+ false,
+ false,
+ new BaseDeployLogger())
.build(clusterElement).getClusterControllerLimits())
.build(root.getDeployState(), root, clusterElement.getXml());
}
diff --git a/configd/src/apps/sentinel/connectivity.cpp b/configd/src/apps/sentinel/connectivity.cpp
index 8314a090616..c1c49e3068a 100644
--- a/configd/src/apps/sentinel/connectivity.cpp
+++ b/configd/src/apps/sentinel/connectivity.cpp
@@ -120,8 +120,8 @@ void Connectivity::configure(const SentinelConfig::Connectivity &config,
const ModelConfig &model)
{
_config = config;
- LOG(config, "connectivity.maxBadReverseCount = %d", _config.maxBadReverseCount);
- LOG(config, "connectivity.maxBadOutPercent = %d", _config.maxBadOutPercent);
+ LOG(config, "connectivity.maxBadCount = %d", _config.maxBadCount);
+ LOG(config, "connectivity.minOkPercent = %d", _config.minOkPercent);
_checkSpecs = specsFrom(model);
}
@@ -143,7 +143,7 @@ Connectivity::checkConnectivity(RpcServer &rpcServer) {
}
checkContext.latch.await();
classifyConnFails(connectivityMap, _checkSpecs, rpcServer);
- Accumulated accumulated;
+ Accumulator accumulated;
for (const auto & [hostname, check] : connectivityMap) {
std::string detail = toString(check.result());
std::string prev = _detailsPerHost[hostname];
@@ -152,42 +152,43 @@ Connectivity::checkConnectivity(RpcServer &rpcServer) {
}
_detailsPerHost[hostname] = detail;
LOG_ASSERT(check.result() != CcResult::UNKNOWN);
- accumulate(accumulated, check.result());
+ accumulated.handleResult(check.result());
}
- return enoughOk(accumulated, clusterSize);
+ return accumulated.enoughOk(_config);
}
-void Connectivity::accumulate(Accumulated &target, CcResult value) {
+void Connectivity::Accumulator::handleResult(CcResult value) {
+ ++_numHandled;
switch (value) {
case CcResult::UNKNOWN:
case CcResult::UNREACHABLE_UP:
case CcResult::INDIRECT_PING_FAIL:
- ++target.numSeriousIssues;
- ++target.numIssues;
+ ++_numBad;
break;
case CcResult::CONN_FAIL:
- ++target.numIssues;
+ // not OK, but not a serious issue either
break;
case CcResult::INDIRECT_PING_UNAVAIL:
case CcResult::ALL_OK:
+ ++_numOk;
break;
}
}
-bool Connectivity::enoughOk(const Accumulated &results, size_t clusterSize) {
+bool Connectivity::Accumulator::enoughOk(const SentinelConfig::Connectivity &config) const {
bool enough = true;
- if (results.numSeriousIssues > size_t(_config.maxBadReverseCount)) {
+ if (_numBad > size_t(config.maxBadCount)) {
LOG(warning, "%zu of %zu nodes up but with network connectivity problems (max is %d)",
- results.numSeriousIssues, clusterSize, _config.maxBadReverseCount);
+ _numBad, _numHandled, config.maxBadCount);
enough = false;
}
- if (results.numIssues * 100.0 > _config.maxBadOutPercent * clusterSize) {
- double pct = results.numIssues * 100.0 / clusterSize;
- LOG(warning, "Problems with connection to %zu of %zu nodes, %.1f%% (max is %d%%)",
- results.numIssues, clusterSize, pct, _config.maxBadOutPercent);
+ if (_numOk * 100.0 < config.minOkPercent * _numHandled) {
+ double pct = _numOk * 100.0 / _numHandled;
+ LOG(warning, "Only %zu of %zu nodes are up and OK, %.1f%% (min is %d%%)",
+ _numOk, _numHandled, pct, config.minOkPercent);
enough = false;
}
- if (results.numIssues == 0) {
+ if (_numOk == _numHandled) {
LOG(info, "All connectivity checks OK, proceeding with service startup");
} else if (enough) {
LOG(info, "Enough connectivity checks OK, proceeding with service startup");
diff --git a/configd/src/apps/sentinel/connectivity.h b/configd/src/apps/sentinel/connectivity.h
index 2ba17f5c07c..8d923387ffa 100644
--- a/configd/src/apps/sentinel/connectivity.h
+++ b/configd/src/apps/sentinel/connectivity.h
@@ -29,12 +29,15 @@ public:
bool checkConnectivity(RpcServer &rpcServer);
static SpecMap specsFrom(const ModelConfig &model);
private:
- struct Accumulated {
- size_t numIssues = 0;
- size_t numSeriousIssues = 0;
+ class Accumulator {
+ private:
+ size_t _numOk = 0;
+ size_t _numBad = 0;
+ size_t _numHandled = 0;
+ public:
+ void handleResult(CcResult value);
+ bool enoughOk(const SentinelConfig::Connectivity &config) const;
};
- void accumulate(Accumulated &target, CcResult value);
- bool enoughOk(const Accumulated &results, size_t clusterSize);
SentinelConfig::Connectivity _config;
SpecMap _checkSpecs;
std::map<std::string, std::string> _detailsPerHost;
diff --git a/configdefinitions/src/vespa/sentinel.def b/configdefinitions/src/vespa/sentinel.def
index 45ef9b21cfd..cf19e701717 100644
--- a/configdefinitions/src/vespa/sentinel.def
+++ b/configdefinitions/src/vespa/sentinel.def
@@ -22,11 +22,11 @@ application.region string default="default"
# those that can connect back to us. We delay starting services
# if we have more problems than the following limits allow:
-## Percentage we fail to talk to, maximum
-connectivity.maxBadOutPercent int default=100
+## Percentage of nodes that must be up and fully OK, minimum
+connectivity.minOkPercent int default=0
-## Absolute number of nodes that fail to talk back to us, maximum
-connectivity.maxBadReverseCount int default=999999999
+## Absolute number of nodes with confirmed network connectivity problems, maximum
+connectivity.maxBadCount int default=999999999
## The command to run. This will be run by sh -c, and the following
## environment variables are defined: $ROOT, $VESPA_SERVICE_NAME,
diff --git a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
index 956bc90380f..90fd6203a21 100644
--- a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
+++ b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
@@ -92,6 +92,7 @@ class HttpConfigServerClientTest {
assertEquals("GET http://localhost:" + server.port() + "/ failed with status 409 and body 'hi'", thrown.getMessage());
server.verify(1, getRequestedFor(urlEqualTo("/")));
server.verify(1, anyRequestedFor(anyUrl()));
+ server.resetRequests();
}
}
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java
index ad682273d0d..92d2cc5d1cd 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java
@@ -8,6 +8,7 @@ import com.yahoo.jdisc.http.ssl.SslContextFactoryProvider;
import com.yahoo.security.tls.MixedMode;
import com.yahoo.security.tls.TransportSecurityUtils;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
+import org.eclipse.jetty.http2.parser.RateControl;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.DetectorConnectionFactory;
@@ -164,6 +165,8 @@ public class ConnectorFactory {
HTTP2ServerConnectionFactory factory = new HTTP2ServerConnectionFactory(newHttpConfiguration());
factory.setStreamIdleTimeout(toMillis(connectorConfig.http2().streamIdleTimeout()));
factory.setMaxConcurrentStreams(connectorConfig.http2().maxConcurrentStreams());
+ factory.setInitialSessionRecvWindow(1 << 24);
+ factory.setInitialStreamRecvWindow(1 << 20);
return factory;
}
diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
index 0dec711f4c0..0f625b5c3df 100644
--- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
+++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
@@ -41,6 +41,7 @@ import org.apache.hc.client5.http.entity.mime.FormBodyPart;
import org.apache.hc.client5.http.entity.mime.FormBodyPartBuilder;
import org.apache.hc.client5.http.entity.mime.StringBody;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
@@ -969,14 +970,12 @@ public class HttpServerTest {
private static CloseableHttpAsyncClient createHttp2Client(JettyTestDriver driver) {
TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create()
- .setSslContext(driver.sslContext())
- .build();
- var client = HttpAsyncClientBuilder.create()
- .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
- .disableConnectionState()
- .disableAutomaticRetries()
- .setConnectionManager(PoolingAsyncClientConnectionManagerBuilder.create().setTlsStrategy(tlsStrategy).build())
- .build();
+ .setSslContext(driver.sslContext())
+ .build();
+ var client = H2AsyncClientBuilder.create()
+ .disableAutomaticRetries()
+ .setTlsStrategy(tlsStrategy)
+ .build();
client.start();
return client;
}
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java
index bd6c2607fc2..c9f6f67df6c 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java
@@ -1,7 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.controller.api.integration.horizon;
-import com.yahoo.slime.Slime;
import java.io.InputStream;
/**
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java
index 624e4c61662..cf40ac00d64 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/noderepository/NodeHistory.java
@@ -17,7 +17,7 @@ public class NodeHistory {
@JsonProperty("at")
public Long at;
@JsonProperty("agent")
- public Agent agent;
+ public String agent;
@JsonProperty("event")
public String event;
@@ -25,7 +25,7 @@ public class NodeHistory {
return at;
}
- public Agent getAgent() {
+ public String getAgent() {
return agent;
}
@@ -33,24 +33,4 @@ public class NodeHistory {
return event;
}
- public enum Agent {
- operator,
- application,
- system,
- DirtyExpirer,
- DynamicProvisioningMaintainer,
- FailedExpirer,
- InactiveExpirer,
- NodeFailer,
- NodeHealthTracker,
- ProvisionedExpirer,
- Rebalancer,
- ReservationExpirer,
- RetiringUpgrader,
- RebuildingOsUpgrader,
- SpareCapacityMaintainer,
- SwitchRebalancer,
- HostEncrypter,
- }
-
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java
index 422b8f22000..be8613f5eff 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java
@@ -16,7 +16,9 @@ import com.yahoo.yolean.Exceptions;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.util.Optional;
+import java.util.function.Supplier;
import java.util.logging.Level;
/**
@@ -57,10 +59,10 @@ public class HorizonApiHandler extends LoggingRequestHandler {
private HttpResponse get(HttpRequest request) {
Path path = new Path(request.getUri());
- if (path.matches("/horizon/v1/config/dashboard/topFolders")) return new JsonInputStreamResponse(client.getTopFolders());
- if (path.matches("/horizon/v1/config/dashboard/file/{id}")) return new JsonInputStreamResponse(client.getDashboard(path.get("id")));
- if (path.matches("/horizon/v1/config/dashboard/favorite")) return new JsonInputStreamResponse(client.getFavorite(request.getProperty("user")));
- if (path.matches("/horizon/v1/config/dashboard/recent")) return new JsonInputStreamResponse(client.getRecent(request.getProperty("user")));
+ if (path.matches("/horizon/v1/config/dashboard/topFolders")) return jsonInputStreamResponse(client::getTopFolders);
+ if (path.matches("/horizon/v1/config/dashboard/file/{id}")) return jsonInputStreamResponse(() -> client.getDashboard(path.get("id")));
+ if (path.matches("/horizon/v1/config/dashboard/favorite")) return jsonInputStreamResponse(() -> client.getFavorite(request.getProperty("user")));
+ if (path.matches("/horizon/v1/config/dashboard/recent")) return jsonInputStreamResponse(() -> client.getRecent(request.getProperty("user")));
return ErrorResponse.notFoundError("Nothing at " + path);
}
@@ -73,7 +75,7 @@ public class HorizonApiHandler extends LoggingRequestHandler {
private HttpResponse put(HttpRequest request) {
Path path = new Path(request.getUri());
- if (path.matches("/horizon/v1/config/user")) return new JsonInputStreamResponse(client.getUser());
+ if (path.matches("/horizon/v1/config/user")) return jsonInputStreamResponse(client::getUser);
return ErrorResponse.notFoundError("Nothing at " + path);
}
@@ -81,7 +83,7 @@ public class HorizonApiHandler extends LoggingRequestHandler {
SecurityContext securityContext = getAttribute(request, SecurityContext.ATTRIBUTE_NAME, SecurityContext.class);
try {
byte[] data = TsdbQueryRewriter.rewrite(request.getData().readAllBytes(), securityContext.roles(), systemName);
- return new JsonInputStreamResponse(client.getMetrics(data));
+ return jsonInputStreamResponse(() -> client.getMetrics(data));
} catch (TsdbQueryRewriter.UnauthorizedException e) {
return ErrorResponse.forbidden("Access denied");
} catch (IOException e) {
@@ -96,6 +98,14 @@ public class HorizonApiHandler extends LoggingRequestHandler {
.orElseThrow(() -> new IllegalArgumentException("Attribute '" + attributeName + "' was not set on request"));
}
+ private JsonInputStreamResponse jsonInputStreamResponse(Supplier<InputStream> supplier) {
+ try (InputStream inputStream = supplier.get()) {
+ return new JsonInputStreamResponse(inputStream);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
private static class JsonInputStreamResponse extends HttpResponse {
private final InputStream jsonInputStream;
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java
index 232521c9609..ffc82f90ad4 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentExpirerTest.java
@@ -70,7 +70,7 @@ public class DeploymentExpirerTest {
assertEquals(1, permanentDeployments(prodApp.instance()));
// Dev application expires when enough time has passed since most recent attempt
- tester.clock().advance(Duration.ofDays(12));
+ tester.clock().advance(Duration.ofDays(12).plus(Duration.ofSeconds(1)));
expirer.maintain();
assertEquals(0, permanentDeployments(devApp.instance()));
assertEquals(1, permanentDeployments(prodApp.instance()));
diff --git a/eval/CMakeLists.txt b/eval/CMakeLists.txt
index 302b6768cea..16c9c72d8a5 100644
--- a/eval/CMakeLists.txt
+++ b/eval/CMakeLists.txt
@@ -5,6 +5,7 @@ vespa_define_module(
staging_vespalib
APPS
+ src/apps/analyze_onnx_model
src/apps/eval_expr
src/apps/make_tensor_binary_format_test_spec
src/apps/tensor_conformance
diff --git a/eval/src/apps/analyze_onnx_model/.gitignore b/eval/src/apps/analyze_onnx_model/.gitignore
new file mode 100644
index 00000000000..12ce20b03ba
--- /dev/null
+++ b/eval/src/apps/analyze_onnx_model/.gitignore
@@ -0,0 +1 @@
+/vespa-analyze-onnx-model
diff --git a/eval/src/apps/analyze_onnx_model/CMakeLists.txt b/eval/src/apps/analyze_onnx_model/CMakeLists.txt
new file mode 100644
index 00000000000..47cbb6504f4
--- /dev/null
+++ b/eval/src/apps/analyze_onnx_model/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespa-analyze-onnx-model
+ SOURCES
+ analyze_onnx_model.cpp
+ INSTALL bin
+ DEPENDS
+ vespaeval
+)
diff --git a/eval/src/apps/analyze_onnx_model/analyze_onnx_model.cpp b/eval/src/apps/analyze_onnx_model/analyze_onnx_model.cpp
new file mode 100644
index 00000000000..f1cc3b28751
--- /dev/null
+++ b/eval/src/apps/analyze_onnx_model/analyze_onnx_model.cpp
@@ -0,0 +1,59 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/eval/onnx/onnx_wrapper.h>
+#include <vespa/vespalib/util/guard.h>
+
+using vespalib::FilePointer;
+using namespace vespalib::eval;
+
+bool read_line(FilePointer &file, vespalib::string &line) {
+ char line_buffer[1024];
+ char *res = fgets(line_buffer, sizeof(line_buffer), file.fp());
+ if (res == nullptr) {
+ line.clear();
+ return false;
+ }
+ line = line_buffer;
+ while (!line.empty() && isspace(line[line.size() - 1])) {
+ line.pop_back();
+ }
+ return true;
+}
+
+void extract(const vespalib::string &str, const vespalib::string &prefix, vespalib::string &dst) {
+ if (starts_with(str, prefix)) {
+ size_t pos = prefix.size();
+ while ((str.size() > pos) && isspace(str[pos])) {
+ ++pos;
+ }
+ dst = str.substr(pos);
+ }
+}
+
+void report_memory_usage(const vespalib::string &desc) {
+ vespalib::string vm_size = "unknown";
+ vespalib::string vm_rss = "unknown";
+ vespalib::string line;
+ FilePointer file(fopen("/proc/self/status", "r"));
+ while (read_line(file, line)) {
+ extract(line, "VmSize:", vm_size);
+ extract(line, "VmRSS:", vm_rss);
+ }
+ fprintf(stderr, "vm_size: %s, vm_rss: %s (%s)\n", vm_size.c_str(), vm_rss.c_str(), desc.c_str());
+}
+
+int usage(const char *self) {
+ fprintf(stderr, "usage: %s <onnx-model>\n", self);
+ fprintf(stderr, " load onnx model and report memory usage\n");
+ return 1;
+}
+
+int main(int argc, char **argv) {
+ if (argc != 2) {
+ return usage(argv[0]);
+ }
+ report_memory_usage("before loading model");
+ Onnx onnx(argv[1], Onnx::Optimize::ENABLE);
+ report_memory_usage("after loading model");
+ return 0;
+}
diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java
index dd4b62ee494..6136bcdfd3a 100644
--- a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java
+++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java
@@ -3,12 +3,14 @@ package com.yahoo.jdisc.http.filter.security.athenz;
import com.google.inject.Inject;
import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.http.HttpRequest;
import com.yahoo.jdisc.http.filter.DiscFilterRequest;
import com.yahoo.jdisc.http.filter.security.athenz.RequestResourceMapper.ResourceNameAndAction;
import com.yahoo.jdisc.http.filter.security.base.JsonSecurityRequestFilterBase;
import com.yahoo.vespa.athenz.api.AthenzAccessToken;
import com.yahoo.vespa.athenz.api.AthenzIdentity;
import com.yahoo.vespa.athenz.api.AthenzPrincipal;
+import com.yahoo.vespa.athenz.api.AthenzRole;
import com.yahoo.vespa.athenz.api.ZToken;
import com.yahoo.vespa.athenz.tls.AthenzX509CertificateUtils;
import com.yahoo.vespa.athenz.utils.AthenzIdentities;
@@ -20,6 +22,7 @@ import java.security.cert.X509Certificate;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
@@ -56,16 +59,20 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
private final RequestResourceMapper requestResourceMapper;
private final Metric metric;
private final Set<AthenzIdentity> allowedProxyIdentities;
+ private final Optional<AthenzRole> readRole;
+ private final Optional<AthenzRole> writeRole;
@Inject
public AthenzAuthorizationFilter(AthenzAuthorizationFilterConfig config, RequestResourceMapper resourceMapper, Metric metric) {
- this(config, resourceMapper, new DefaultZpe(), metric);
+ this(config, resourceMapper, new DefaultZpe(), metric, null, null);
}
public AthenzAuthorizationFilter(AthenzAuthorizationFilterConfig config,
RequestResourceMapper resourceMapper,
Zpe zpe,
- Metric metric) {
+ Metric metric,
+ AthenzRole readRole,
+ AthenzRole writeRole) {
this.roleTokenHeaderName = config.roleTokenHeaderName();
List<EnabledCredentials.Enum> enabledCredentials = config.enabledCredentials();
this.enabledCredentials = enabledCredentials.isEmpty()
@@ -77,6 +84,8 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
this.allowedProxyIdentities = config.allowedProxyIdentities().stream()
.map(AthenzIdentities::from)
.collect(Collectors.toSet());
+ this.readRole = Optional.ofNullable(readRole);
+ this.writeRole = Optional.ofNullable(writeRole);
}
@Override
@@ -86,7 +95,7 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
requestResourceMapper.getResourceNameAndAction(request);
log.log(Level.FINE, () -> String.format("Resource mapping for '%s': %s", request, resourceMapping));
if (resourceMapping.isEmpty()) {
- incrementAcceptedMetrics(request, false);
+ incrementAcceptedMetrics(request, false, Optional.empty());
return Optional.empty();
}
Result result = checkAccessAllowed(request, resourceMapping.get());
@@ -94,15 +103,15 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
setAttribute(request, RESULT_ATTRIBUTE, resultType.name());
if (resultType == AuthorizationResult.Type.ALLOW) {
populateRequestWithResult(request, result);
- incrementAcceptedMetrics(request, true);
+ incrementAcceptedMetrics(request, true, Optional.of(result));
return Optional.empty();
}
log.log(Level.FINE, () -> String.format("Forbidden (403) for '%s': %s", request, resultType.name()));
- incrementRejectedMetrics(request, FORBIDDEN, resultType.name());
+ incrementRejectedMetrics(request, FORBIDDEN, resultType.name(), Optional.of(result));
return Optional.of(new ErrorResponse(FORBIDDEN, "Access forbidden: " + resultType.getDescription()));
} catch (IllegalArgumentException e) {
log.log(Level.FINE, () -> String.format("Unauthorized (401) for '%s': %s", request, e.getMessage()));
- incrementRejectedMetrics(request, UNAUTHORIZED, "Unauthorized");
+ incrementRejectedMetrics(request, UNAUTHORIZED, "Unauthorized", Optional.empty());
return Optional.of(new ErrorResponse(UNAUTHORIZED, e.getMessage()));
}
}
@@ -130,33 +139,53 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
X509Certificate identityCertificate = getClientCertificate(request).get();
AthenzIdentity peerIdentity = AthenzIdentities.from(identityCertificate);
if (allowedProxyIdentities.contains(peerIdentity)) {
- return checkAccessWithProxiedAccessToken(resourceAndAction, accessToken, identityCertificate);
+ return checkAccessWithProxiedAccessToken(request, resourceAndAction, accessToken, identityCertificate);
} else {
var zpeResult = zpe.checkAccessAllowed(
accessToken, identityCertificate, resourceAndAction.resourceName(), resourceAndAction.action());
- return new Result(ACCESS_TOKEN, peerIdentity, zpeResult);
+ return getResult(ACCESS_TOKEN, peerIdentity, zpeResult, request, resourceAndAction, mapToRequestPrivileges(accessToken.roles()));
}
}
- private Result checkAccessWithProxiedAccessToken(ResourceNameAndAction resourceAndAction, AthenzAccessToken accessToken, X509Certificate identityCertificate) {
+ private Result getResult(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult, DiscFilterRequest request, ResourceNameAndAction resourceAndAction, List<String> privileges) {
+ String currentAction = resourceAndAction.action();
+ String futureAction = resourceAndAction.futureAction();
+ return new Result(credentialType, identity, zpeResult, privileges, currentAction, futureAction);
+ }
+
+ private List<String> mapToRequestPrivileges(List<AthenzRole> roles) {
+ return roles.stream()
+ .map(this::rolePrivilege)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ private String rolePrivilege(AthenzRole role) {
+ if (readRole.stream().anyMatch(role::equals)) return "read";
+ if (writeRole.stream().anyMatch(role::equals)) return "write";
+ return null;
+ }
+
+ private Result checkAccessWithProxiedAccessToken(DiscFilterRequest request, ResourceNameAndAction resourceAndAction, AthenzAccessToken accessToken, X509Certificate identityCertificate) {
AthenzIdentity proxyIdentity = AthenzIdentities.from(identityCertificate);
log.log(Level.FINE,
() -> String.format("Checking proxied access token. Proxy identity: '%s'. Allowed identities: %s", proxyIdentity, allowedProxyIdentities));
var zpeResult = zpe.checkAccessAllowed(accessToken, resourceAndAction.resourceName(), resourceAndAction.action());
- return new Result(ACCESS_TOKEN, AthenzIdentities.from(identityCertificate), zpeResult);
+ return getResult(ACCESS_TOKEN, AthenzIdentities.from(identityCertificate), zpeResult, request, resourceAndAction, mapToRequestPrivileges(accessToken.roles()));
}
private Result checkAccessWithRoleCertificate(DiscFilterRequest request, ResourceNameAndAction resourceAndAction) {
X509Certificate roleCertificate = getClientCertificate(request).get();
var zpeResult = zpe.checkAccessAllowed(roleCertificate, resourceAndAction.resourceName(), resourceAndAction.action());
AthenzIdentity identity = AthenzX509CertificateUtils.getIdentityFromRoleCertificate(roleCertificate);
- return new Result(ROLE_CERTIFICATE, identity, zpeResult);
+ AthenzX509CertificateUtils.getRolesFromRoleCertificate(roleCertificate).roleName();
+ return getResult(ROLE_CERTIFICATE, identity, zpeResult, request, resourceAndAction, mapToRequestPrivileges(List.of(AthenzX509CertificateUtils.getRolesFromRoleCertificate(roleCertificate))));
}
private Result checkAccessWithRoleToken(DiscFilterRequest request, ResourceNameAndAction resourceAndAction) {
ZToken roleToken = getRoleToken(request);
var zpeResult = zpe.checkAccessAllowed(roleToken, resourceAndAction.resourceName(), resourceAndAction.action());
- return new Result(ROLE_TOKEN, roleToken.getIdentity(), zpeResult);
+ return getResult(ROLE_TOKEN, roleToken.getIdentity(), zpeResult, request, resourceAndAction, mapToRequestPrivileges(roleToken.getRoles()));
}
private static boolean isAccessTokenPresent(DiscFilterRequest request) {
@@ -246,20 +275,30 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
request.setAttribute(name, value);
}
- private void incrementAcceptedMetrics(DiscFilterRequest request, boolean authzRequired) {
+ private void incrementAcceptedMetrics(DiscFilterRequest request, boolean authzRequired, Optional<Result> result) {
String hostHeader = request.getHeader("Host");
Metric.Context context = metric.createContext(Map.of(
"endpoint", hostHeader != null ? hostHeader : "",
- "authz-required", Boolean.toString(authzRequired)));
+ "authz-required", Boolean.toString(authzRequired),
+ "httpMethod", HttpRequest.Method.valueOf(request.getMethod()).name(),
+ "requestPrivileges", result.map(r -> String.join(",", r.requestPrivileges)).orElse(""),
+ "currentRequestMapping", result.map(r -> r.currentAction).orElse(""),
+ "futureRequestMapping", result.map(r -> r.futureAction).orElse("")
+ ));
metric.add(ACCEPTED_METRIC_NAME, 1L, context);
}
- private void incrementRejectedMetrics(DiscFilterRequest request, int statusCode, String zpeCode) {
+ private void incrementRejectedMetrics(DiscFilterRequest request, int statusCode, String zpeCode, Optional<Result> result) {
String hostHeader = request.getHeader("Host");
Metric.Context context = metric.createContext(Map.of(
"endpoint", hostHeader != null ? hostHeader : "",
"status-code", Integer.toString(statusCode),
- "zpe-status", zpeCode));
+ "zpe-status", zpeCode,
+ "httpMethod", HttpRequest.Method.valueOf(request.getMethod()),
+ "requestPrivileges", result.map(r -> String.join(",", r.requestPrivileges)).orElse(""),
+ "currentRequestMapping", result.map(r -> r.currentAction).orElse(""),
+ "futureRequestMapping", result.map(r -> r.futureAction).orElse("")
+ ));
metric.add(REJECTED_METRIC_NAME, 1L, context);
}
@@ -267,11 +306,17 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
final EnabledCredentials.Enum credentialType;
final AthenzIdentity identity;
final AuthorizationResult zpeResult;
+ final List<String> requestPrivileges;
+ final String currentAction;
+ final String futureAction;
- Result(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult) {
+ public Result(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult, List<String> requestPrivileges, String currentAction, String futureAction) {
this.credentialType = credentialType;
this.identity = identity;
this.zpeResult = zpeResult;
+ this.requestPrivileges = requestPrivileges;
+ this.currentAction = currentAction;
+ this.futureAction = futureAction;
}
}
}
diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java
index 56c52bd71c4..c962e973959 100644
--- a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java
+++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java
@@ -28,10 +28,15 @@ public interface RequestResourceMapper {
class ResourceNameAndAction {
private final AthenzResourceName resourceName;
private final String action;
+ private final String futureAction;
public ResourceNameAndAction(AthenzResourceName resourceName, String action) {
+ this(resourceName, action, action);
+ }
+ public ResourceNameAndAction(AthenzResourceName resourceName, String action, String futureAction) {
this.resourceName = resourceName;
this.action = action;
+ this.futureAction = futureAction;
}
public AthenzResourceName resourceName() {
@@ -42,6 +47,14 @@ public interface RequestResourceMapper {
return action;
}
+ public ResourceNameAndAction withFutureAction(String futureAction) {
+ return new ResourceNameAndAction(resourceName, action, futureAction);
+ }
+
+ public String futureAction() {
+ return futureAction;
+ }
+
@Override
public String toString() {
return "ResourceNameAndAction{" +
diff --git a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java
index bfe02d1f279..137e4653670 100644
--- a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java
+++ b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java
@@ -296,7 +296,9 @@ public class AthenzAuthorizationFilterTest {
.allowedProxyIdentities(allowedProxyIdentities)),
new StaticRequestResourceMapper(RESOURCE_NAME, ACTION),
zpe,
- metric);
+ metric,
+ new AthenzRole("domain","reader"),
+ new AthenzRole("domain", "writer"));
}
private static void assertAuthorizationResult(DiscFilterRequest request, Type expectedResult) {
diff --git a/metrics/src/tests/summetrictest.cpp b/metrics/src/tests/summetrictest.cpp
index e3d58659daf..d0380a630f1 100644
--- a/metrics/src/tests/summetrictest.cpp
+++ b/metrics/src/tests/summetrictest.cpp
@@ -125,4 +125,45 @@ TEST(SumMetricTest, test_start_value)
EXPECT_EQ(int64_t(60), sum.getLongValue("value"));
}
+namespace {
+
+struct MetricSetWithSum : public MetricSet
+{
+ LongValueMetric _v1;
+ LongValueMetric _v2;
+ SumMetric<LongValueMetric> _sum;
+ MetricSetWithSum();
+ ~MetricSetWithSum() override;
+};
+
+MetricSetWithSum::MetricSetWithSum()
+ : MetricSet("MetricSetWithSum", {}, ""),
+ _v1("v1", {}, "", this),
+ _v2("v2", {}, "", this),
+ _sum("sum", {}, "", this)
+{
+ _sum.addMetricToSum(_v1);
+ _sum.addMetricToSum(_v2);
+}
+
+MetricSetWithSum::~MetricSetWithSum() = default;
+
+}
+
+TEST(SumMetricTest, test_nested_sum)
+{
+ MetricSetWithSum w1;
+ MetricSetWithSum w2;
+ MetricSetWithSum sum;
+ w1._v1.addValue(10);
+ w1._v2.addValue(13);
+ w2._v1.addValue(27);
+ w2._v2.addValue(29);
+ w1.addToPart(sum);
+ w2.addToPart(sum);
+ EXPECT_EQ(int64_t(37), sum._v1.getLongValue("value"));
+ EXPECT_EQ(int64_t(42), sum._v2.getLongValue("value"));
+ EXPECT_EQ(int64_t(79), sum._sum.getLongValue("value"));
+}
+
}
diff --git a/metrics/src/vespa/metrics/metric.cpp b/metrics/src/vespa/metrics/metric.cpp
index a8d8194b26d..50fc36c62cb 100644
--- a/metrics/src/vespa/metrics/metric.cpp
+++ b/metrics/src/vespa/metrics/metric.cpp
@@ -232,4 +232,11 @@ Metric::assignValues(const Metric& m) {
assert(ownerList.empty());
return this;
}
+
+bool
+Metric::is_sum_metric() const
+{
+ return false;
+}
+
} // metrics
diff --git a/metrics/src/vespa/metrics/metric.h b/metrics/src/vespa/metrics/metric.h
index 10b74a2da22..c8fb3031278 100644
--- a/metrics/src/vespa/metrics/metric.h
+++ b/metrics/src/vespa/metrics/metric.h
@@ -247,6 +247,8 @@ public:
virtual bool isMetricSet() const { return false; }
+ virtual bool is_sum_metric() const;
+
private:
/**
diff --git a/metrics/src/vespa/metrics/summetric.h b/metrics/src/vespa/metrics/summetric.h
index f04c1696638..7b60c968e5b 100644
--- a/metrics/src/vespa/metrics/summetric.h
+++ b/metrics/src/vespa/metrics/summetric.h
@@ -69,6 +69,7 @@ public:
void printDebug(std::ostream&, const std::string& indent="") const override;
void addToPart(Metric&) const override;
void addToSnapshot(Metric&, std::vector<Metric::UP> &) const override;
+ bool is_sum_metric() const override;
private:
friend struct MetricManagerTest;
diff --git a/metrics/src/vespa/metrics/summetric.hpp b/metrics/src/vespa/metrics/summetric.hpp
index 9520456a974..e067b9643c2 100644
--- a/metrics/src/vespa/metrics/summetric.hpp
+++ b/metrics/src/vespa/metrics/summetric.hpp
@@ -142,8 +142,17 @@ template<typename AddendMetric>
void
SumMetric<AddendMetric>::addToPart(Metric& m) const
{
- std::pair<std::vector<Metric::UP>, Metric::UP> sum(generateSum());
- sum.second->addToPart(m);
+ if (!m.is_sum_metric()) {
+ std::pair<std::vector<Metric::UP>, Metric::UP> sum(generateSum());
+ sum.second->addToPart(m);
+ }
+}
+
+template<typename AddendMetric>
+bool
+SumMetric<AddendMetric>::is_sum_metric() const
+{
+ return true;
}
template<typename AddendMetric>
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java
index ef9520969af..5d7ab48753f 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java
@@ -15,11 +15,13 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Administers a host (for now only docker hosts) and its nodes (docker containers nodes).
@@ -130,7 +132,7 @@ public class NodeAdminImpl implements NodeAdmin {
}
// Use filter with count instead of allMatch() because allMatch() will short circuit on first non-match
- boolean allNodeAgentsConverged = nodeAgentWithSchedulerByHostname.values().parallelStream()
+ boolean allNodeAgentsConverged = parallelStreamOfNodeAgentWithScheduler()
.filter(nodeAgentScheduler -> !nodeAgentScheduler.setFrozen(wantFrozen, freezeTimeout))
.count() == 0;
@@ -158,9 +160,7 @@ public class NodeAdminImpl implements NodeAdmin {
@Override
public void stopNodeAgentServices() {
// Each container may spend 1-1:30 minutes stopping
- nodeAgentWithSchedulerByHostname.values()
- .parallelStream()
- .forEach(NodeAgentWithScheduler::stopForHostSuspension);
+ parallelStreamOfNodeAgentWithScheduler().forEach(NodeAgentWithScheduler::stopForHostSuspension);
}
@Override
@@ -171,7 +171,18 @@ public class NodeAdminImpl implements NodeAdmin {
@Override
public void stop() {
// Stop all node-agents in parallel, will block until the last NodeAgent is stopped
- nodeAgentWithSchedulerByHostname.values().parallelStream().forEach(NodeAgentWithScheduler::stopForRemoval);
+ parallelStreamOfNodeAgentWithScheduler().forEach(NodeAgentWithScheduler::stopForRemoval);
+ }
+
+ /**
+ * Returns a parallel stream of NodeAgentWithScheduler.
+ *
+ * <p>Why not just call nodeAgentWithSchedulerByHostname.values().parallelStream()? Experiments
+ * with Java 11 have shown that with 10 nodes and forEach(), there are a maximum of 3 concurrent
+ * threads. With HashMap it produces 5. With List it produces 10 concurrent threads.</p>
+ */
+ private Stream<NodeAgentWithScheduler> parallelStreamOfNodeAgentWithScheduler() {
+ return List.copyOf(nodeAgentWithSchedulerByHostname.values()).parallelStream();
}
// Set-difference. Returns minuend minus subtrahend.
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
index df3f075e8d9..05c765c9d78 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
@@ -1,4 +1,4 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.node.admin.nodeagent;
import com.yahoo.config.provision.ApplicationId;
@@ -357,7 +357,7 @@ public class NodeAgentImpl implements NodeAgent {
}
try {
- if (context.node().state() != NodeState.dirty) {
+ if (context.node().state() == NodeState.active) {
suspend(context);
}
stopServices(context);
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java
index 47337518a65..39183688340 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java
@@ -20,19 +20,22 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
- * The application maintainer detects manual operator changes to nodes and redeploys affected applications.
- * The purpose of this is to redeploy affected applications faster than achieved by the regular application
- * maintenance to reduce the time period where the node repository and the application model is out of sync.
+ * This maintainer detects changes to nodes that must be expedited, and redeploys affected applications.
+ *
+ * The purpose of this is to redeploy affected applications faster than achieved by
+ * {@link PeriodicApplicationMaintainer}, to reduce the time period where the node repository and the application model
+ * are out of sync.
*
* Why can't the manual change directly make the application redeployment?
- * Because the redeployment must run at the right config server, while the node state change may be running
- * at any config server.
+ *
+ * Because we want to queue redeployments to avoid overloading config servers.
*
* @author bratseth
+ * @author mpolden
*/
-public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer {
+public class ExpeditedChangeApplicationMaintainer extends ApplicationMaintainer {
- OperatorChangeApplicationMaintainer(Deployer deployer, Metric metric, NodeRepository nodeRepository, Duration interval) {
+ ExpeditedChangeApplicationMaintainer(Deployer deployer, Metric metric, NodeRepository nodeRepository, Duration interval) {
super(deployer, metric, nodeRepository, interval);
}
@@ -57,7 +60,7 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer {
boolean deployed = deployWithLock(application);
if (deployed)
log.info("Redeployed application " + application.toShortString() +
- " as a manual change was made to its nodes");
+ " as an expedited change was made to its nodes");
}
private boolean hasNodesWithChanges(ApplicationId applicationId, NodeList nodes) {
@@ -66,7 +69,7 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer {
return nodes.stream()
.flatMap(node -> node.history().events().stream())
- .filter(event -> event.agent() == Agent.operator)
+ .filter(event -> expediteChangeBy(event.agent()))
.map(History.Event::at)
.anyMatch(e -> lastDeployTime.get().isBefore(e));
}
@@ -84,5 +87,14 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer {
.groupingBy(node -> node.allocation().get().owner());
}
+ /** Returns whether to expedite changes performed by agent */
+ private boolean expediteChangeBy(Agent agent) {
+ switch (agent) {
+ case operator:
+ case RebuildingOsUpgrader:
+ case HostEncrypter: return true;
+ }
+ return false;
+ }
}
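
The expediteChangeBy(Agent) helper added above whitelists the agents whose node changes warrant an expedited redeployment. Purely as an illustration of the same predicate in a more declarative form (this is not the code in the change, and the nested Agent enum is a stand-in for com.yahoo.vespa.hosted.provision.node.Agent), an EnumSet could be used:

import java.util.EnumSet;
import java.util.Set;

// Illustrative sketch only; the project keeps the switch shown in the diff above.
class ExpeditedAgents {

    enum Agent { system, application, operator, RebuildingOsUpgrader, HostEncrypter }

    private static final Set<Agent> EXPEDITED =
            EnumSet.of(Agent.operator, Agent.RebuildingOsUpgrader, Agent.HostEncrypter);

    /** Returns whether a change made by the given agent should trigger an expedited redeployment. */
    static boolean expediteChangeBy(Agent agent) {
        return EXPEDITED.contains(agent);
    }
}
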
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
index 4537d99d107..9f84322fe0f 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
@@ -200,8 +200,10 @@ public class MetricsReporter extends NodeRepositoryMaintainer {
metric.set("wantToDeprovision", node.status().wantToDeprovision() ? 1 : 0, context);
metric.set("failReport", NodeFailer.reasonsToFailParentHost(node).isEmpty() ? 0 : 1, context);
- metric.set("wantToEncrypt", node.reports().getReport("wantToEncrypt").isPresent() ? 1 : 0, context);
- metric.set("diskEncrypted", node.reports().getReport("diskEncrypted").isPresent() ? 1 : 0, context);
+ if (node.type().isHost()) {
+ metric.set("wantToEncrypt", node.reports().getReport("wantToEncrypt").isPresent() ? 1 : 0, context);
+ metric.set("diskEncrypted", node.reports().getReport("diskEncrypted").isPresent() ? 1 : 0, context);
+ }
HostName hostname = new HostName(node.hostname());
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java
index 44ee8b5a8b3..cdb5202603a 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java
@@ -48,7 +48,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
maintainers.add(new NodeFailer(deployer, nodeRepository, defaults.failGrace, defaults.nodeFailerInterval, orchestrator, defaults.throttlePolicy, metric));
maintainers.add(new NodeHealthTracker(hostLivenessTracker, serviceMonitor, nodeRepository, defaults.nodeFailureStatusUpdateInterval, metric));
- maintainers.add(new OperatorChangeApplicationMaintainer(deployer, metric, nodeRepository, defaults.operatorChangeRedeployInterval));
+ maintainers.add(new ExpeditedChangeApplicationMaintainer(deployer, metric, nodeRepository, defaults.expeditedChangeRedeployInterval));
maintainers.add(new ReservationExpirer(nodeRepository, defaults.reservationExpiry, metric));
maintainers.add(new RetiredExpirer(nodeRepository, orchestrator, deployer, metric, defaults.retiredInterval, defaults.retiredExpiry));
maintainers.add(new InactiveExpirer(nodeRepository, defaults.inactiveExpiry, Map.of(NodeType.config, defaults.inactiveConfigServerExpiry,
@@ -67,6 +67,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
maintainers.add(new ScalingSuggestionsMaintainer(nodeRepository, defaults.scalingSuggestionsInterval, metric));
maintainers.add(new SwitchRebalancer(nodeRepository, defaults.switchRebalancerInterval, metric, deployer));
maintainers.add(new HostEncrypter(nodeRepository, defaults.hostEncrypterInterval, metric));
+ maintainers.add(new ParkedExpirer(nodeRepository, defaults.parkedExpirerInterval, metric));
provisionServiceProvider.getLoadBalancerService(nodeRepository)
.map(lbService -> new LoadBalancerExpirer(nodeRepository, defaults.loadBalancerExpirerInterval, lbService, metric))
@@ -91,7 +92,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
/** Time between each run of maintainer that does periodic redeployment */
private final Duration redeployMaintainerInterval;
/** Applications are redeployed after manual operator changes within this time period */
- private final Duration operatorChangeRedeployInterval;
+ private final Duration expeditedChangeRedeployInterval;
/** The time a node must be continuously unresponsive before it is failed */
private final Duration failGrace;
@@ -119,6 +120,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
private final Duration scalingSuggestionsInterval;
private final Duration switchRebalancerInterval;
private final Duration hostEncrypterInterval;
+ private final Duration parkedExpirerInterval;
private final NodeFailer.ThrottlePolicy throttlePolicy;
@@ -133,7 +135,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
nodeFailerInterval = Duration.ofMinutes(15);
nodeFailureStatusUpdateInterval = Duration.ofMinutes(2);
nodeMetricsCollectionInterval = Duration.ofMinutes(1);
- operatorChangeRedeployInterval = Duration.ofMinutes(3);
+ expeditedChangeRedeployInterval = Duration.ofMinutes(3);
// Vespa upgrade frequency is higher in CD so (de)activate OS upgrades more frequently as well
osUpgradeActivatorInterval = zone.system().isCd() ? Duration.ofSeconds(30) : Duration.ofMinutes(5);
periodicRedeployInterval = Duration.ofMinutes(60);
@@ -149,6 +151,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
throttlePolicy = NodeFailer.ThrottlePolicy.hosted;
inactiveConfigServerExpiry = Duration.ofMinutes(5);
inactiveControllerExpiry = Duration.ofMinutes(5);
+ parkedExpirerInterval = Duration.ofMinutes(30);
if (zone.environment().isProduction() && ! zone.system().isCd()) {
inactiveExpiry = Duration.ofHours(4); // enough time for the application owner to discover and redeploy
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirer.java
new file mode 100644
index 00000000000..c5259ee84c4
--- /dev/null
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirer.java
@@ -0,0 +1,63 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hosted.provision.maintenance;
+
+import com.yahoo.config.provision.NodeType;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.vespa.hosted.provision.Node;
+import com.yahoo.vespa.hosted.provision.NodeRepository;
+import com.yahoo.vespa.hosted.provision.node.Agent;
+import com.yahoo.vespa.hosted.provision.node.History;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.logging.Logger;
+
+/**
+ *
+ * Expires parked nodes in dynamically provisioned zones.
+ * If the number of parked hosts exceeds MAX_ALLOWED_PARKED_HOSTS, the oldest ones are recycled in queue (FIFO) order.
+ *
+ * @author olaa
+ */
+public class ParkedExpirer extends NodeRepositoryMaintainer {
+
+ private static final int MAX_ALLOWED_PARKED_HOSTS = 20;
+ private static final Logger log = Logger.getLogger(ParkedExpirer.class.getName());
+
+
+ private final NodeRepository nodeRepository;
+
+ ParkedExpirer(NodeRepository nodeRepository, Duration interval, Metric metric) {
+ super(nodeRepository, interval, metric);
+ this.nodeRepository = nodeRepository;
+ }
+
+ @Override
+ protected double maintain() {
+ if (!nodeRepository.zone().getCloud().dynamicProvisioning())
+ return 1.0;
+
+ var parkedHosts = new ArrayList<>(nodeRepository.nodes().list(Node.State.parked)
+ .nodeType(NodeType.host)
+ .asList());
+
+ int hostsToExpire = Math.max(0, parkedHosts.size() - MAX_ALLOWED_PARKED_HOSTS);
+ nodeRepository.nodes().list(Node.State.parked).nodeType(NodeType.host)
+ .sortedBy(Comparator.comparing(this::getParkedTime))
+ .first(hostsToExpire)
+ .forEach(host -> {
+ log.info("Allowed number of parked nodes exceeded. Recycling " + host.hostname());
+ nodeRepository.nodes().deallocate(host, Agent.ParkedExpirer, "Expired by ParkedExpirer");
+ });
+
+ return 1.0;
+ }
+
+ private Instant getParkedTime(Node node) {
+ return node.history().event(History.Event.Type.parked)
+ .map(History.Event::at)
+ .orElse(Instant.EPOCH); // Should not happen
+ }
+}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java
index d1c3f00ddca..ed82470fa42 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Agent.java
@@ -21,6 +21,7 @@ public enum Agent {
InactiveExpirer,
ProvisionedExpirer,
ReservationExpirer,
+ ParkedExpirer,
DynamicProvisioningMaintainer,
RetiringUpgrader,
RebuildingOsUpgrader,
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java
index d83f21e5fec..dff4a66bd42 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/NodeSerializer.java
@@ -482,6 +482,7 @@ public class NodeSerializer {
case "SpareCapacityMaintainer": return Agent.SpareCapacityMaintainer;
case "SwitchRebalancer": return Agent.SwitchRebalancer;
case "HostEncrypter": return Agent.HostEncrypter;
+ case "ParkedExpirer": return Agent.ParkedExpirer;
}
throw new IllegalArgumentException("Unknown node event agent '" + eventAgentField.asString() + "'");
}
@@ -504,6 +505,7 @@ public class NodeSerializer {
case SpareCapacityMaintainer: return "SpareCapacityMaintainer";
case SwitchRebalancer: return "SwitchRebalancer";
case HostEncrypter: return "HostEncrypter";
+ case ParkedExpirer: return "ParkedExpirer";
}
throw new IllegalArgumentException("Serialized form of '" + agent + "' not defined");
}
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java
index db6aebacddc..62d09c99f16 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals;
/**
* @author bratseth
*/
-public class OperatorChangeApplicationMaintainerTest {
+public class ExpeditedChangeApplicationMaintainerTest {
@Test
public void test_application_maintenance() {
@@ -42,10 +42,10 @@ public class OperatorChangeApplicationMaintainerTest {
// Create applications
fixture.activate();
assertEquals("Initial applications are deployed", 3, fixture.deployer.redeployments);
- OperatorChangeApplicationMaintainer maintainer = new OperatorChangeApplicationMaintainer(fixture.deployer,
- new TestMetric(),
- nodeRepository,
- Duration.ofMinutes(1));
+ ExpeditedChangeApplicationMaintainer maintainer = new ExpeditedChangeApplicationMaintainer(fixture.deployer,
+ new TestMetric(),
+ nodeRepository,
+ Duration.ofMinutes(1));
clock.advance(Duration.ofMinutes(2));
maintainer.maintain();
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
index 4251d1276f8..7fa6810c1ba 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
@@ -43,6 +43,7 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -113,8 +114,6 @@ public class MetricsReporterTest {
expectedMetrics.put("wantToDeprovision", 0);
expectedMetrics.put("failReport", 0);
- expectedMetrics.put("wantToEncrypt", 0);
- expectedMetrics.put("diskEncrypted", 0);
expectedMetrics.put("allowedToBeDown", 1);
expectedMetrics.put("suspended", 1);
@@ -150,6 +149,27 @@ public class MetricsReporterTest {
assertEquals(expectedMetrics, new TreeMap<>(metric.values));
}
+ @Test
+ public void test_registered_metrics_for_host() {
+ NodeFlavors nodeFlavors = FlavorConfigBuilder.createDummies("default");
+ Orchestrator orchestrator = mock(Orchestrator.class);
+ when(orchestrator.getHostInfo(eq(reference), any())).thenReturn(
+ HostInfo.createSuspended(HostStatus.ALLOWED_TO_BE_DOWN, Instant.ofEpochSecond(1)));
+ ProvisioningTester tester = new ProvisioningTester.Builder().flavors(nodeFlavors.getFlavors()).orchestrator(orchestrator).build();
+ tester.makeProvisionedNodes(1, "default", NodeType.host, 0);
+
+ tester.clock().setInstant(Instant.ofEpochSecond(124));
+
+ TestMetric metric = new TestMetric();
+ MetricsReporter metricsReporter = metricsReporter(metric, tester);
+ metricsReporter.maintain();
+
+ // Only verify metrics that are set for hosts
+ TreeMap<String, Number> metrics = new TreeMap<>(metric.values);
+ assertTrue(metrics.containsKey("wantToEncrypt"));
+ assertTrue(metrics.containsKey("diskEncrypted"));
+ }
+
private void verifyAndRemoveIntegerMetricSum(TestMetric metric, String key, int expected) {
assertEquals(expected, (int) metric.sumNumberValues(key));
metric.remove(key);
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirerTest.java
new file mode 100644
index 00000000000..7fb22f33453
--- /dev/null
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ParkedExpirerTest.java
@@ -0,0 +1,70 @@
+package com.yahoo.vespa.hosted.provision.maintenance;
+
+import com.yahoo.config.provision.Cloud;
+import com.yahoo.config.provision.Environment;
+import com.yahoo.config.provision.Flavor;
+import com.yahoo.config.provision.NodeResources;
+import com.yahoo.config.provision.NodeType;
+import com.yahoo.config.provision.RegionName;
+import com.yahoo.config.provision.SystemName;
+import com.yahoo.config.provision.Zone;
+import com.yahoo.vespa.hosted.provision.Node;
+import com.yahoo.vespa.hosted.provision.node.Agent;
+import com.yahoo.vespa.hosted.provision.provisioning.ProvisioningTester;
+import com.yahoo.vespa.hosted.provision.testutils.MockHostProvisioner;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author olaa
+ */
+public class ParkedExpirerTest {
+
+ private ProvisioningTester tester;
+
+ @Test
+ public void noop_if_not_dynamic_provisioning() {
+ tester = getTester(false);
+ populateNodeRepo();
+
+ var expirer = new ParkedExpirer(tester.nodeRepository(), Duration.ofMinutes(4), new TestMetric());
+ expirer.maintain();
+
+ assertEquals(0, tester.nodeRepository().nodes().list(Node.State.dirty).size());
+ assertEquals(25, tester.nodeRepository().nodes().list(Node.State.parked).size());
+ }
+
+ @Test
+ public void recycles_correct_subset_of_parked_hosts() {
+ tester = getTester(true);
+ populateNodeRepo();
+
+ var expirer = new ParkedExpirer(tester.nodeRepository(), Duration.ofMinutes(4), new TestMetric());
+ expirer.maintain();
+
+ assertEquals(5, tester.nodeRepository().nodes().list(Node.State.dirty).size());
+ assertEquals(20, tester.nodeRepository().nodes().list(Node.State.parked).size());
+
+ }
+
+ private ProvisioningTester getTester(boolean dynamicProvisioning) {
+ var zone = new Zone(Cloud.builder().dynamicProvisioning(dynamicProvisioning).build(), SystemName.main, Environment.prod, RegionName.from("us-east"));
+ return new ProvisioningTester.Builder().zone(zone)
+ .hostProvisioner(dynamicProvisioning ? new MockHostProvisioner(List.of()) : null)
+ .build();
+ }
+
+ private void populateNodeRepo() {
+ var nodes = IntStream.range(0, 25)
+ .mapToObj(i -> Node.create("id-" + i, "host-" + i, new Flavor(NodeResources.unspecified()), Node.State.parked, NodeType.host).build())
+ .collect(Collectors.toList());
+ tester.nodeRepository().database().addNodesInState(nodes, Node.State.parked, Agent.system);
+ }
+
+}
\ No newline at end of file
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json
index 2dcf2d0b838..26d711945c6 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json
@@ -7,6 +7,9 @@
"name": "DirtyExpirer"
},
{
+ "name": "ExpeditedChangeApplicationMaintainer"
+ },
+ {
"name": "FailedExpirer"
},
{
@@ -37,10 +40,10 @@
"name": "NodeRebooter"
},
{
- "name": "OperatorChangeApplicationMaintainer"
+ "name": "OsUpgradeActivator"
},
{
- "name": "OsUpgradeActivator"
+ "name": "ParkedExpirer"
},
{
"name": "PeriodicApplicationMaintainer"
diff --git a/searchlib/src/tests/predicate/predicate_index_test.cpp b/searchlib/src/tests/predicate/predicate_index_test.cpp
index facf0054c4a..19ad0301b5c 100644
--- a/searchlib/src/tests/predicate/predicate_index_test.cpp
+++ b/searchlib/src/tests/predicate/predicate_index_test.cpp
@@ -113,7 +113,6 @@ TEST("require that PredicateIndex can index document") {
EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid());
indexFeature(index, doc_id, min_feature, {{hash, interval}}, {});
index.commit();
-
auto posting_it = lookupPosting(index, hash);
EXPECT_EQUAL(doc_id, posting_it.getKey());
uint32_t size;
@@ -123,6 +122,25 @@ TEST("require that PredicateIndex can index document") {
EXPECT_EQUAL(interval, interval_list[0]);
}
+TEST("require that bit vector cache is initialized correctly") {
+ BitVectorCache::KeyAndCountSet keySet;
+ keySet.emplace_back(hash, dummy_provider.getDocIdLimit()/2);
+ PredicateIndex index(generation_holder, dummy_provider, simple_index_config, 10);
+ EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid());
+ indexFeature(index, doc_id, min_feature, {{hash, interval}}, {});
+ index.requireCachePopulation();
+ index.populateIfNeeded(dummy_provider.getDocIdLimit());
+ EXPECT_TRUE(index.lookupCachedSet(keySet).empty());
+ index.commit();
+ EXPECT_TRUE(index.getIntervalIndex().lookup(hash).valid());
+ EXPECT_TRUE(index.lookupCachedSet(keySet).empty());
+
+ index.requireCachePopulation();
+ index.populateIfNeeded(dummy_provider.getDocIdLimit());
+ EXPECT_FALSE(index.lookupCachedSet(keySet).empty());
+}
+
+
TEST("require that PredicateIndex can index document with bounds") {
PredicateIndex index(generation_holder, dummy_provider, simple_index_config, 10);
EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid());
diff --git a/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp b/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp
index 3dd2ec26dea..5b8d5f5b9ce 100644
--- a/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp
+++ b/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp
@@ -86,8 +86,7 @@ TEST_F("require that blueprint with empty index estimates empty.", Fixture) {
EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits);
}
-TEST_F("require that blueprint with zero-constraint doc estimates non-empty.",
- Fixture) {
+TEST_F("require that blueprint with zero-constraint doc estimates non-empty.", Fixture) {
f.indexEmptyDocument(42);
PredicateBlueprint blueprint(f.field, f.guard(), f.query);
EXPECT_FALSE(blueprint.getState().estimate().empty);
@@ -98,11 +97,9 @@ const int min_feature = 1;
const uint32_t doc_id = 2;
const uint32_t interval = 0x0001ffff;
-TEST_F("require that blueprint with posting list entry estimates non-empty.",
- Fixture) {
+TEST_F("require that blueprint with posting list entry estimates non-empty.", Fixture) {
PredicateTreeAnnotations annotations(min_feature);
- annotations.interval_map[PredicateHash::hash64("key=value")] =
- std::vector<Interval>{{interval}};
+ annotations.interval_map[PredicateHash::hash64("key=value")] = std::vector<Interval>{{interval}};
f.indexDocument(doc_id, annotations);
PredicateBlueprint blueprint(f.field, f.guard(), f.query);
@@ -110,8 +107,7 @@ TEST_F("require that blueprint with posting list entry estimates non-empty.",
EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits);
}
-TEST_F("require that blueprint with 'bounds' posting list entry estimates "
- "non-empty.", Fixture) {
+TEST_F("require that blueprint with 'bounds' posting list entry estimates non-empty.", Fixture) {
PredicateTreeAnnotations annotations(min_feature);
annotations.bounds_map[PredicateHash::hash64("range_key=40")] =
std::vector<IntervalWithBounds>{{interval, 0x80000003}};
@@ -122,34 +118,50 @@ TEST_F("require that blueprint with 'bounds' posting list entry estimates "
EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits);
}
-TEST_F("require that blueprint with zstar-compressed estimates non-empty.",
- Fixture) {
+TEST_F("require that blueprint with zstar-compressed estimates non-empty.", Fixture) {
PredicateTreeAnnotations annotations(1);
- annotations.interval_map[Constants::z_star_compressed_hash] =std::vector<Interval>{{0xfffe0000}};
+ annotations.interval_map[Constants::z_star_compressed_hash] = std::vector<Interval>{{0xfffe0000}};
f.indexDocument(doc_id, annotations);
PredicateBlueprint blueprint(f.field, f.guard(), f.query);
EXPECT_FALSE(blueprint.getState().estimate().empty);
EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits);
}
-TEST_F("require that blueprint can create search", Fixture) {
- PredicateTreeAnnotations annotations(1);
- annotations.interval_map[PredicateHash::hash64("key=value")] =std::vector<Interval>{{interval}};
- f.indexDocument(doc_id, annotations);
-
+void
+runQuery(Fixture & f, std::vector<uint32_t> expected, uint32_t expectCachedSize, uint32_t expectedKV) {
PredicateBlueprint blueprint(f.field, f.guard(), f.query);
blueprint.fetchPostings(ExecuteInfo::TRUE);
+ EXPECT_EQUAL(expectCachedSize, blueprint.getCachedFeatures().size());
+ for (uint32_t docId : expected) {
+ EXPECT_EQUAL(expectedKV, uint32_t(blueprint.getKV()[docId]));
+ }
TermFieldMatchDataArray tfmda;
SearchIterator::UP it = blueprint.createLeafSearch(tfmda, true);
ASSERT_TRUE(it.get());
it->initFullRange();
EXPECT_EQUAL(SearchIterator::beginId(), it->getDocId());
- EXPECT_FALSE(it->seek(doc_id - 1));
- EXPECT_EQUAL(doc_id, it->getDocId());
- EXPECT_TRUE(it->seek(doc_id));
- EXPECT_EQUAL(doc_id, it->getDocId());
- EXPECT_FALSE(it->seek(doc_id + 1));
- EXPECT_TRUE(it->isAtEnd());
+ std::vector<uint32_t> actual;
+ for (it->seek(1); ! it->isAtEnd(); it->seek(it->getDocId()+1)) {
+ actual.push_back(it->getDocId());
+ }
+ EXPECT_EQUAL(expected.size(), actual.size());
+ for (size_t i(0); i < expected.size(); i++) {
+ EXPECT_EQUAL(expected[i], actual[i]);
+ }
+}
+
+TEST_F("require that blueprint can create search", Fixture) {
+ PredicateTreeAnnotations annotations(1);
+ annotations.interval_map[PredicateHash::hash64("key=value")] = std::vector<Interval>{{interval}};
+ for (size_t i(0); i < 9; i++) {
+ f.indexDocument(doc_id + i, annotations);
+ }
+ runQuery(f, {2,3,4,5,6,7,8,9,10}, 0, 1);
+ f.indexDocument(doc_id+9, annotations);
+ runQuery(f, {2, 3,4,5,6,7,8,9,10,11}, 0, 1);
+ f.index().requireCachePopulation();
+ f.indexDocument(doc_id+10, annotations);
+ runQuery(f, {2,3,4,5,6,7,8,9,10,11,12}, 1, 1);
}
TEST_F("require that blueprint can create more advanced search", Fixture) {
diff --git a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp
index c4a0e036a01..555117126a9 100644
--- a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp
@@ -100,8 +100,8 @@ PredicateAttribute::getValueCount(DocId) const
void
PredicateAttribute::onCommit()
{
- populateIfNeeded();
_index->commit();
+ populateIfNeeded();
incGeneration();
}
diff --git a/searchlib/src/vespa/searchlib/common/bitvectorcache.h b/searchlib/src/vespa/searchlib/common/bitvectorcache.h
index f81fd0163d8..a642d66f42f 100644
--- a/searchlib/src/vespa/searchlib/common/bitvectorcache.h
+++ b/searchlib/src/vespa/searchlib/common/bitvectorcache.h
@@ -41,6 +41,7 @@ public:
void adjustDocIdLimit(uint32_t docId);
void populate(uint32_t count, const PopulateInterface &);
bool needPopulation() const { return _needPopulation; }
+ void requirePopulation() { _needPopulation = true; }
private:
class KeyMeta {
public:
diff --git a/searchlib/src/vespa/searchlib/predicate/predicate_index.h b/searchlib/src/vespa/searchlib/predicate/predicate_index.h
index f4c89a2b369..49bf77f2fcc 100644
--- a/searchlib/src/vespa/searchlib/predicate/predicate_index.h
+++ b/searchlib/src/vespa/searchlib/predicate/predicate_index.h
@@ -54,8 +54,6 @@ private:
template <typename IntervalT>
void indexDocumentFeatures(uint32_t doc_id, const FeatureMap<IntervalT> &interval_map);
- PopulateInterface::Iterator::UP lookup(uint64_t key) const override;
-
public:
PredicateIndex(GenerationHolder &genHolder,
const DocIdLimitProvider &limit_provider,
@@ -105,6 +103,9 @@ public:
* Adjust size of structures to have space for docId.
*/
void adjustDocIdLimit(uint32_t docId);
+ PopulateInterface::Iterator::UP lookup(uint64_t key) const override;
+ // Exposed for testing
+ void requireCachePopulation() const { _cache.requirePopulation(); }
};
extern template class SimpleIndex<vespalib::datastore::EntryRef>;
diff --git a/searchlib/src/vespa/searchlib/predicate/simple_index.h b/searchlib/src/vespa/searchlib/predicate/simple_index.h
index 1398bb0817c..75dc540f787 100644
--- a/searchlib/src/vespa/searchlib/predicate/simple_index.h
+++ b/searchlib/src/vespa/searchlib/predicate/simple_index.h
@@ -168,8 +168,6 @@ private:
}
public:
- SimpleIndex(GenerationHolder &generation_holder, const DocIdLimitProvider &provider) :
- SimpleIndex(generation_holder, provider, SimpleIndexConfig()) {}
SimpleIndex(GenerationHolder &generation_holder,
const DocIdLimitProvider &provider, const SimpleIndexConfig &config)
: _generation_holder(generation_holder), _config(config), _limit_provider(provider) {}
diff --git a/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h b/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h
index 9609cd4f6c9..ef225e86c50 100644
--- a/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h
+++ b/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h
@@ -50,8 +50,11 @@ public:
void fetchPostings(const ExecuteInfo &execInfo) override;
SearchIterator::UP
- createLeafSearch(const fef::TermFieldMatchDataArray &tfmda,
- bool strict) const override;
+ createLeafSearch(const fef::TermFieldMatchDataArray &tfmda, bool strict) const override;
+
+ // Exposed for testing
+ const BitVectorCache::CountVector & getKV() const { return _kV; }
+ const BitVectorCache::KeySet & getCachedFeatures() const { return _cachedFeatures; }
private:
using BTreeIterator = predicate::SimpleIndex<vespalib::datastore::EntryRef>::BTreeIterator;
using VectorIterator = predicate::SimpleIndex<vespalib::datastore::EntryRef>::VectorIterator;
@@ -70,24 +73,24 @@ private:
void addZeroConstraintToK();
std::vector<predicate::PredicatePostingList::UP> createPostingLists() const;
- const PredicateAttribute & _attribute;
+ const PredicateAttribute & _attribute;
const predicate::PredicateIndex &_index;
- Alloc _kVBacking;
- BitVectorCache::CountVector _kV;
- BitVectorCache::KeySet _cachedFeatures;
+ Alloc _kVBacking;
+ BitVectorCache::CountVector _kV;
+ BitVectorCache::KeySet _cachedFeatures;
- std::vector<IntervalEntry> _interval_dict_entries;
- std::vector<BoundsEntry> _bounds_dict_entries;
- vespalib::datastore::EntryRef _zstar_dict_entry;
+ std::vector<IntervalEntry> _interval_dict_entries;
+ std::vector<BoundsEntry> _bounds_dict_entries;
+ vespalib::datastore::EntryRef _zstar_dict_entry;
- std::vector<IntervalIteratorEntry<BTreeIterator>> _interval_btree_iterators;
+ std::vector<IntervalIteratorEntry<BTreeIterator>> _interval_btree_iterators;
std::vector<IntervalIteratorEntry<VectorIterator>> _interval_vector_iterators;
- std::vector<BoundsIteratorEntry<BTreeIterator>> _bounds_btree_iterators;
- std::vector<BoundsIteratorEntry<VectorIterator>> _bounds_vector_iterators;
+ std::vector<BoundsIteratorEntry<BTreeIterator>> _bounds_btree_iterators;
+ std::vector<BoundsIteratorEntry<VectorIterator>> _bounds_vector_iterators;
// The zstar iterator is either a vector or a btree iterator.
- optional<BTreeIterator> _zstar_btree_iterator;
+ optional<BTreeIterator> _zstar_btree_iterator;
optional<VectorIterator> _zstar_vector_iterator;
- bool _fetch_postings_done;
+ bool _fetch_postings_done;
};
}
diff --git a/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp b/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp
index e1010285dba..934ecc7456b 100644
--- a/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp
+++ b/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp
@@ -11,10 +11,12 @@
namespace storage::distributor {
-using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats;
using End = vespalib::JsonStream::End;
using File = vespalib::File;
+using MinReplicaStats = std::unordered_map<uint16_t, uint32_t>;
using Object = vespalib::JsonStream::Object;
+using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats;
+using BucketSpacesStats = BucketSpacesStatsProvider::BucketSpacesStats;
using namespace ::testing;
struct DistributorHostInfoReporterTest : Test {
@@ -35,7 +37,7 @@ namespace {
// My kingdom for GoogleMock!
struct MockedMinReplicaProvider : MinReplicaProvider
{
- std::unordered_map<uint16_t, uint32_t> minReplica;
+ MinReplicaStats minReplica;
std::unordered_map<uint16_t, uint32_t> getMinReplica() const override {
return minReplica;
@@ -121,7 +123,7 @@ struct Fixture {
TEST_F(DistributorHostInfoReporterTest, min_replica_stats_are_reported) {
Fixture f;
- std::unordered_map<uint16_t, uint32_t> minReplica;
+ MinReplicaStats minReplica;
minReplica[0] = 2;
minReplica[5] = 9;
f.minReplicaProvider.minReplica = minReplica;
@@ -133,10 +135,30 @@ TEST_F(DistributorHostInfoReporterTest, min_replica_stats_are_reported) {
EXPECT_EQ(9, getMinReplica(root, 5));
}
+TEST_F(DistributorHostInfoReporterTest, merge_min_replica_stats) {
+
+ MinReplicaStats min_replica_a;
+ min_replica_a[3] = 2;
+ min_replica_a[5] = 4;
+
+ MinReplicaStats min_replica_b;
+ min_replica_b[5] = 6;
+ min_replica_b[7] = 8;
+
+ MinReplicaStats result;
+ merge_min_replica_stats(result, min_replica_a);
+ merge_min_replica_stats(result, min_replica_b);
+
+ EXPECT_EQ(3, result.size());
+ EXPECT_EQ(2, result[3]);
+ EXPECT_EQ(4, result[5]);
+ EXPECT_EQ(8, result[7]);
+}
+
TEST_F(DistributorHostInfoReporterTest, generate_example_json) {
Fixture f;
- std::unordered_map<uint16_t, uint32_t> minReplica;
+ MinReplicaStats minReplica;
minReplica[0] = 2;
minReplica[5] = 9;
f.minReplicaProvider.minReplica = minReplica;
@@ -175,7 +197,7 @@ TEST_F(DistributorHostInfoReporterTest, no_report_generated_if_disabled) {
Fixture f;
f.reporter.enableReporting(false);
- std::unordered_map<uint16_t, uint32_t> minReplica;
+ MinReplicaStats minReplica;
minReplica[0] = 2;
minReplica[5] = 9;
f.minReplicaProvider.minReplica = minReplica;
@@ -210,5 +232,41 @@ TEST_F(DistributorHostInfoReporterTest, bucket_spaces_stats_are_reported) {
}
}
+TEST_F(DistributorHostInfoReporterTest, merge_per_node_bucket_spaces_stats) {
+
+ PerNodeBucketSpacesStats stats_a;
+ stats_a[3]["default"] = BucketSpaceStats(3, 2);
+ stats_a[3]["global"] = BucketSpaceStats(5, 4);
+ stats_a[5]["default"] = BucketSpaceStats(7, 6);
+ stats_a[5]["global"] = BucketSpaceStats(9, 8);
+
+ PerNodeBucketSpacesStats stats_b;
+ stats_b[5]["default"] = BucketSpaceStats(11, 10);
+ stats_b[5]["global"] = BucketSpaceStats(13, 12);
+ stats_b[7]["default"] = BucketSpaceStats(15, 14);
+
+ PerNodeBucketSpacesStats result;
+ merge_per_node_bucket_spaces_stats(result, stats_a);
+ merge_per_node_bucket_spaces_stats(result, stats_b);
+
+ PerNodeBucketSpacesStats exp;
+ exp[3]["default"] = BucketSpaceStats(3, 2);
+ exp[3]["global"] = BucketSpaceStats(5, 4);
+ exp[5]["default"] = BucketSpaceStats(7+11, 6+10);
+ exp[5]["global"] = BucketSpaceStats(9+13, 8+12);
+ exp[7]["default"] = BucketSpaceStats(15, 14);
+
+ EXPECT_EQ(exp, result);
}
+TEST_F(DistributorHostInfoReporterTest, merge_bucket_space_stats_maintains_valid_flag) {
+ BucketSpaceStats stats_a(5, 3);
+ BucketSpaceStats stats_b;
+
+ stats_a.merge(stats_b);
+ EXPECT_FALSE(stats_a.valid());
+ EXPECT_EQ(5, stats_a.bucketsTotal());
+ EXPECT_EQ(3, stats_a.bucketsPending());
+}
+
+}
diff --git a/storage/src/tests/distributor/simplemaintenancescannertest.cpp b/storage/src/tests/distributor/simplemaintenancescannertest.cpp
index 58dc2430041..1bf3809b135 100644
--- a/storage/src/tests/distributor/simplemaintenancescannertest.cpp
+++ b/storage/src/tests/distributor/simplemaintenancescannertest.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/distributor/maintenancemocks.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
@@ -209,4 +210,88 @@ TEST_F(SimpleMaintenanceScannerTest, per_node_maintenance_stats_are_tracked) {
}
}
+TEST_F(SimpleMaintenanceScannerTest, merge_node_maintenance_stats) {
+
+ NodeMaintenanceStats stats_a;
+ stats_a.movingOut = 1;
+ stats_a.syncing = 2;
+ stats_a.copyingIn = 3;
+ stats_a.copyingOut = 4;
+ stats_a.total = 5;
+
+ NodeMaintenanceStats stats_b;
+ stats_b.movingOut = 10;
+ stats_b.syncing = 20;
+ stats_b.copyingIn = 30;
+ stats_b.copyingOut = 40;
+ stats_b.total = 50;
+
+ NodeMaintenanceStats result;
+ result.merge(stats_a);
+ result.merge(stats_b);
+
+ NodeMaintenanceStats exp;
+ exp.movingOut = 11;
+ exp.syncing = 22;
+ exp.copyingIn = 33;
+ exp.copyingOut = 44;
+ exp.total = 55;
+ EXPECT_EQ(exp, result);
+}
+
+TEST_F(SimpleMaintenanceScannerTest, merge_pending_maintenance_stats) {
+ auto default_space = document::FixedBucketSpaces::default_space();
+ auto global_space = document::FixedBucketSpaces::global_space();
+
+ PendingStats stats_a;
+ stats_a.global.pending[MaintenanceOperation::DELETE_BUCKET] = 1;
+ stats_a.global.pending[MaintenanceOperation::MERGE_BUCKET] = 2;
+ stats_a.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 3;
+ stats_a.global.pending[MaintenanceOperation::JOIN_BUCKET] = 4;
+ stats_a.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 5;
+ stats_a.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 6;
+ stats_a.perNodeStats.incMovingOut(3, default_space);
+ stats_a.perNodeStats.incSyncing(3, global_space);
+ stats_a.perNodeStats.incCopyingIn(5, default_space);
+ stats_a.perNodeStats.incCopyingOut(5, global_space);
+ stats_a.perNodeStats.incTotal(5, default_space);
+
+ PendingStats stats_b;
+ stats_b.global.pending[MaintenanceOperation::DELETE_BUCKET] = 10;
+ stats_b.global.pending[MaintenanceOperation::MERGE_BUCKET] = 20;
+ stats_b.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 30;
+ stats_b.global.pending[MaintenanceOperation::JOIN_BUCKET] = 40;
+ stats_b.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 50;
+ stats_b.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 60;
+ stats_b.perNodeStats.incMovingOut(7, default_space);
+ stats_b.perNodeStats.incSyncing(7, global_space);
+ stats_b.perNodeStats.incCopyingIn(5, default_space);
+ stats_b.perNodeStats.incCopyingOut(5, global_space);
+ stats_b.perNodeStats.incTotal(5, default_space);
+
+ PendingStats result;
+ result.merge(stats_a);
+ result.merge(stats_b);
+
+ PendingStats exp;
+ exp.global.pending[MaintenanceOperation::DELETE_BUCKET] = 11;
+ exp.global.pending[MaintenanceOperation::MERGE_BUCKET] = 22;
+ exp.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 33;
+ exp.global.pending[MaintenanceOperation::JOIN_BUCKET] = 44;
+ exp.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 55;
+ exp.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 66;
+ exp.perNodeStats.incMovingOut(3, default_space);
+ exp.perNodeStats.incSyncing(3, global_space);
+ exp.perNodeStats.incCopyingIn(5, default_space);
+ exp.perNodeStats.incCopyingIn(5, default_space);
+ exp.perNodeStats.incCopyingOut(5, global_space);
+ exp.perNodeStats.incCopyingOut(5, global_space);
+ exp.perNodeStats.incTotal(5, default_space);
+ exp.perNodeStats.incTotal(5, default_space);
+ exp.perNodeStats.incMovingOut(7, default_space);
+ exp.perNodeStats.incSyncing(7, global_space);
+ EXPECT_EQ(exp.global, result.global);
+ EXPECT_EQ(exp.perNodeStats, result.perNodeStats);
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 7b048e9f109..231361d72d6 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_library(storage_distributor
bucket_space_distribution_configs.cpp
bucket_space_distribution_context.cpp
bucket_space_state_map.cpp
+ bucket_spaces_stats_provider.cpp
bucketdbupdater.cpp
bucketgctimecalculator.cpp
bucketlistmerger.cpp
@@ -22,6 +23,7 @@ vespa_add_library(storage_distributor
distributor_stripe_component.cpp
distributor_stripe_pool.cpp
distributor_stripe_thread.cpp
+ distributor_total_metrics.cpp
distributormessagesender.cpp
distributormetricsset.cpp
externaloperationhandler.cpp
@@ -29,6 +31,7 @@ vespa_add_library(storage_distributor
idealstatemanager.cpp
idealstatemetricsset.cpp
messagetracker.cpp
+ min_replica_provider.cpp
multi_threaded_stripe_access_guard.cpp
nodeinfo.cpp
operation_routing_snapshot.cpp
diff --git a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp
new file mode 100644
index 00000000000..2b12d437aaa
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp
@@ -0,0 +1,40 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucket_spaces_stats_provider.h"
+
+namespace storage::distributor {
+
+std::ostream&
+operator<<(std::ostream& out, const BucketSpaceStats& stats)
+{
+ out << "{valid=" << stats.valid() << ", bucketsTotal=" << stats.bucketsTotal() << ", bucketsPending=" << stats.bucketsPending() << "}";
+ return out;
+}
+
+void
+merge_bucket_spaces_stats(BucketSpacesStatsProvider::BucketSpacesStats& dest,
+ const BucketSpacesStatsProvider::BucketSpacesStats& src)
+{
+ for (const auto& entry : src) {
+ const auto& bucket_space_name = entry.first;
+ auto itr = dest.find(bucket_space_name);
+ if (itr != dest.end()) {
+ itr->second.merge(entry.second);
+ } else {
+ // Handle this case explicitly: operator[] would default-construct an invalid BucketSpaceStats, and merging into it would make the merged result invalid.
+ dest[bucket_space_name] = entry.second;
+ }
+ }
+}
+
+void
+merge_per_node_bucket_spaces_stats(BucketSpacesStatsProvider::PerNodeBucketSpacesStats& dest,
+ const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& src)
+{
+ for (const auto& entry : src) {
+ auto node_index = entry.first;
+ merge_bucket_spaces_stats(dest[node_index], entry.second);
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h
index 3d7b60f4471..c8ba04ed1ab 100644
--- a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h
+++ b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h
@@ -3,6 +3,7 @@
#include <vespa/vespalib/stllike/string.h>
#include <map>
+#include <ostream>
#include <unordered_map>
namespace storage::distributor {
@@ -32,8 +33,22 @@ public:
bool valid() const noexcept { return _valid; }
size_t bucketsTotal() const noexcept { return _bucketsTotal; }
size_t bucketsPending() const noexcept { return _bucketsPending; }
+
+ bool operator==(const BucketSpaceStats& rhs) const {
+ return (_valid == rhs._valid) &&
+ (_bucketsTotal == rhs._bucketsTotal) &&
+ (_bucketsPending == rhs._bucketsPending);
+ }
+
+ void merge(const BucketSpaceStats& rhs) {
+ _valid = _valid && rhs._valid;
+ _bucketsTotal += rhs._bucketsTotal;
+ _bucketsPending += rhs._bucketsPending;
+ }
};
+std::ostream& operator<<(std::ostream& out, const BucketSpaceStats& stats);
+
/**
* Interface that provides snapshots of bucket spaces statistics per content node.
*/
@@ -48,4 +63,10 @@ public:
virtual PerNodeBucketSpacesStats getBucketSpacesStats() const = 0;
};
+void merge_bucket_spaces_stats(BucketSpacesStatsProvider::BucketSpacesStats& dest,
+ const BucketSpacesStatsProvider::BucketSpacesStats& src);
+
+void merge_per_node_bucket_spaces_stats(BucketSpacesStatsProvider::PerNodeBucketSpacesStats& dest,
+ const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& src);
+
}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 7ac15b3929c..be9ad1179fb 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -9,7 +9,7 @@
#include "distributor_stripe.h"
#include "distributor_stripe_pool.h"
#include "distributor_stripe_thread.h"
-#include "distributormetricsset.h"
+#include "distributor_total_metrics.h"
#include "idealstatemetricsset.h"
#include "multi_threaded_stripe_access_guard.h"
#include "operation_sequencer.h"
@@ -59,11 +59,12 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
: StorageLink("distributor"),
framework::StatusReporter("distributor", "Distributor"),
_comp_reg(compReg),
+ _use_legacy_mode(num_distributor_stripes == 0),
_metrics(std::make_shared<DistributorMetricSet>()),
+ _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)),
_messageSender(messageSender),
- _use_legacy_mode(num_distributor_stripes == 0),
_n_stripe_bits(0),
- _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
+ _stripe(std::make_unique<DistributorStripe>(compReg, _use_legacy_mode ? *_metrics : _total_metrics->stripe(0), node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode)),
_stripe_pool(stripe_pool),
_stripes(),
@@ -91,7 +92,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_next_distribution(),
_current_internal_config_generation(_component.internal_config_generation())
{
- _component.registerMetric(*_metrics);
+ _component.registerMetric(_use_legacy_mode ? *_metrics : *_total_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (!_use_legacy_mode) {
assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes));
@@ -105,7 +106,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
*_stripe_accessor);
_stripes.emplace_back(std::move(_stripe));
for (size_t i = 1; i < num_distributor_stripes; ++i) {
- _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
+ _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, _total_metrics->stripe(i), node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode, i));
}
_stripe_scan_stats.resize(num_distributor_stripes);
@@ -124,6 +125,12 @@ Distributor::~Distributor()
closeNextLink();
}
+DistributorMetricSet&
+Distributor::getMetrics()
+{
+ return _use_legacy_mode ? *_metrics : _total_metrics->bucket_db_updater_metrics();
+}
+
// TODO STRIPE remove
DistributorStripe&
Distributor::first_stripe() noexcept {
@@ -322,6 +329,7 @@ namespace {
bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& msg) noexcept {
switch (msg.getType().getId()) {
case api::MessageType::SETSYSTEMSTATE_ID:
+ case api::MessageType::GETNODESTATE_ID:
case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
return true;
case api::MessageType::REQUESTBUCKETINFO_REPLY_ID:
@@ -521,44 +529,54 @@ Distributor::propagateDefaultDistribution(
std::unordered_map<uint16_t, uint32_t>
Distributor::getMinReplica() const
{
- // TODO STRIPE merged snapshot from all stripes
if (_use_legacy_mode) {
return _stripe->getMinReplica();
} else {
- return first_stripe().getMinReplica();
+ std::unordered_map<uint16_t, uint32_t> result;
+ for (const auto& stripe : _stripes) {
+ merge_min_replica_stats(result, stripe->getMinReplica());
+ }
+ return result;
}
}
BucketSpacesStatsProvider::PerNodeBucketSpacesStats
Distributor::getBucketSpacesStats() const
{
- // TODO STRIPE merged snapshot from all stripes
if (_use_legacy_mode) {
return _stripe->getBucketSpacesStats();
} else {
- return first_stripe().getBucketSpacesStats();
+ BucketSpacesStatsProvider::PerNodeBucketSpacesStats result;
+ for (const auto& stripe : _stripes) {
+ merge_per_node_bucket_spaces_stats(result, stripe->getBucketSpacesStats());
+ }
+ return result;
}
}
SimpleMaintenanceScanner::PendingMaintenanceStats
Distributor::pending_maintenance_stats() const {
- // TODO STRIPE merged snapshot from all stripes
if (_use_legacy_mode) {
return _stripe->pending_maintenance_stats();
} else {
- return first_stripe().pending_maintenance_stats();
+ SimpleMaintenanceScanner::PendingMaintenanceStats result;
+ for (const auto& stripe : _stripes) {
+ result.merge(stripe->pending_maintenance_stats());
+ }
+ return result;
}
}
void
Distributor::propagateInternalScanMetricsToExternal()
{
- // TODO STRIPE propagate to all stripes
- // TODO STRIPE reconsider metric wiring...
if (_use_legacy_mode) {
_stripe->propagateInternalScanMetricsToExternal();
} else {
- first_stripe().propagateInternalScanMetricsToExternal();
+ for (auto &stripe : _stripes) {
+ stripe->propagateInternalScanMetricsToExternal();
+ }
+ _total_metrics->aggregate();
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 6f9654b78b8..48ad061859f 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -44,6 +44,7 @@ class DistributorBucketSpaceRepo;
class DistributorStatus;
class DistributorStripe;
class DistributorStripePool;
+class DistributorTotalMetrics;
class StripeAccessor;
class OperationSequencer;
class OwnershipTransferSafeTimePointCalculator;
@@ -78,7 +79,7 @@ public:
void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
void sendDown(const std::shared_ptr<api::StorageMessage>&) override;
- DistributorMetricSet& getMetrics() { return *_metrics; }
+ DistributorMetricSet& getMetrics();
// Implements DistributorInterface and DistributorMessageSender.
DistributorMetricSet& metrics() override { return getMetrics(); }
@@ -201,9 +202,10 @@ private:
using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
DistributorComponentRegister& _comp_reg;
+ const bool _use_legacy_mode;
std::shared_ptr<DistributorMetricSet> _metrics;
+ std::shared_ptr<DistributorTotalMetrics> _total_metrics;
ChainedMessageSender* _messageSender;
- const bool _use_legacy_mode;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
uint8_t _n_stripe_bits;
std::unique_ptr<DistributorStripe> _stripe;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 4f6e2d5016b..c94c1a415b0 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -748,7 +748,7 @@ void DistributorStripe::send_updated_host_info_if_required() {
if (_use_legacy_mode) {
_component.getStateUpdater().immediately_send_get_node_state_replies();
} else {
- _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(0); // TODO STRIPE correct stripe index!
+ _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(_stripe_index);
}
_must_send_updated_host_info = false;
}
diff --git a/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp b/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp
new file mode 100644
index 00000000000..543712cc4d2
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp
@@ -0,0 +1,40 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "distributor_total_metrics.h"
+
+namespace storage::distributor {
+
+DistributorTotalMetrics::DistributorTotalMetrics(uint32_t num_distributor_stripes)
+ : DistributorMetricSet(),
+ _stripes_metrics(),
+ _bucket_db_updater_metrics()
+{
+ _stripes_metrics.reserve(num_distributor_stripes);
+ for (uint32_t i = 0; i < num_distributor_stripes; ++i) {
+ _stripes_metrics.emplace_back(std::make_shared<DistributorMetricSet>());
+ }
+}
+
+DistributorTotalMetrics::~DistributorTotalMetrics() = default;
+
+void
+DistributorTotalMetrics::aggregate()
+{
+ DistributorMetricSet::reset();
+ _bucket_db_updater_metrics.addToPart(*this);
+ for (auto &stripe_metrics : _stripes_metrics) {
+ stripe_metrics->addToPart(*this);
+ }
+}
+
+void
+DistributorTotalMetrics::reset()
+{
+ DistributorMetricSet::reset();
+ _bucket_db_updater_metrics.reset();
+ for (auto &stripe_metrics : _stripes_metrics) {
+ stripe_metrics->reset();
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_total_metrics.h b/storage/src/vespa/storage/distributor/distributor_total_metrics.h
new file mode 100644
index 00000000000..14116af3d3b
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_total_metrics.h
@@ -0,0 +1,27 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "distributormetricsset.h"
+
+namespace storage::distributor {
+
+/*
+ * Class presenting total metrics (as a DistributorMetricSet) to the
+ * metric framework, while managing a DistributorMetricSet for each
+ * stripe and an extra one for the top level bucket db updater.
+ */
+class DistributorTotalMetrics : public DistributorMetricSet
+{
+ std::vector<std::shared_ptr<DistributorMetricSet>> _stripes_metrics;
+ DistributorMetricSet _bucket_db_updater_metrics;
+public:
+ explicit DistributorTotalMetrics(uint32_t num_distributor_stripes);
+ ~DistributorTotalMetrics() override;
+ void aggregate();
+ void reset() override;
+ DistributorMetricSet& stripe(uint32_t stripe_index) { return *_stripes_metrics[stripe_index]; }
+ DistributorMetricSet& bucket_db_updater_metrics() { return _bucket_db_updater_metrics; }
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp
index b954ef93c76..4e7f7d9d89d 100644
--- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp
+++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp
@@ -7,6 +7,39 @@ namespace storage::distributor {
const NodeMaintenanceStats NodeMaintenanceStatsTracker::_emptyNodeMaintenanceStats;
+void
+NodeMaintenanceStats::merge(const NodeMaintenanceStats& rhs)
+{
+ movingOut += rhs.movingOut;
+ syncing += rhs.syncing;
+ copyingIn += rhs.copyingIn;
+ copyingOut += rhs.copyingOut;
+ total += rhs.total;
+}
+
+namespace {
+
+void
+merge_bucket_spaces_stats(NodeMaintenanceStatsTracker::BucketSpacesStats& dest,
+ const NodeMaintenanceStatsTracker::BucketSpacesStats& src)
+{
+ for (const auto& entry : src) {
+ auto bucket_space = entry.first;
+ dest[bucket_space].merge(entry.second);
+ }
+}
+
+}
+
+void
+NodeMaintenanceStatsTracker::merge(const NodeMaintenanceStatsTracker& rhs)
+{
+ for (const auto& entry : rhs._stats) {
+ auto node_index = entry.first;
+ merge_bucket_spaces_stats(_stats[node_index], entry.second);
+ }
+}
+
std::ostream&
operator<<(std::ostream& os, const NodeMaintenanceStats& stats)
{
diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h
index faf253fc84c..6399e53089b 100644
--- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h
+++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h
@@ -37,6 +37,8 @@ struct NodeMaintenanceStats
bool operator!=(const NodeMaintenanceStats& other) const noexcept {
return !(*this == other);
}
+
+ void merge(const NodeMaintenanceStats& rhs);
};
std::ostream& operator<<(std::ostream&, const NodeMaintenanceStats&);
@@ -93,6 +95,11 @@ public:
const PerNodeStats& perNodeStats() const {
return _stats;
}
+
+ bool operator==(const NodeMaintenanceStatsTracker& rhs) const {
+ return _stats == rhs._stats;
+ }
+ void merge(const NodeMaintenanceStatsTracker& rhs);
};
} // storage::distributor
diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp
index 15a57c1e7ee..2bfce9569cc 100644
--- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp
+++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp
@@ -19,6 +19,28 @@ SimpleMaintenanceScanner::SimpleMaintenanceScanner(BucketPriorityDatabase& bucke
SimpleMaintenanceScanner::~SimpleMaintenanceScanner() = default;
+bool
+SimpleMaintenanceScanner::GlobalMaintenanceStats::operator==(const GlobalMaintenanceStats& rhs) const
+{
+ return pending == rhs.pending;
+}
+
+void
+SimpleMaintenanceScanner::GlobalMaintenanceStats::merge(const GlobalMaintenanceStats& rhs)
+{
+ assert(pending.size() == rhs.pending.size());
+ for (size_t i = 0; i < pending.size(); ++i) {
+ pending[i] += rhs.pending[i];
+ }
+}
+
+void
+SimpleMaintenanceScanner::PendingMaintenanceStats::merge(const PendingMaintenanceStats& rhs)
+{
+ global.merge(rhs.global);
+ perNodeStats.merge(rhs.perNodeStats);
+}
+
SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats() = default;
SimpleMaintenanceScanner::PendingMaintenanceStats::~PendingMaintenanceStats() = default;
SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(const PendingMaintenanceStats &) = default;
diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h
index 254b3244171..69e63fd4c65 100644
--- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h
+++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h
@@ -18,6 +18,9 @@ public:
GlobalMaintenanceStats()
: pending(MaintenanceOperation::OPERATION_COUNT)
{ }
+
+ bool operator==(const GlobalMaintenanceStats& rhs) const;
+ void merge(const GlobalMaintenanceStats& rhs);
};
struct PendingMaintenanceStats {
PendingMaintenanceStats();
@@ -26,6 +29,8 @@ public:
~PendingMaintenanceStats();
GlobalMaintenanceStats global;
NodeMaintenanceStatsTracker perNodeStats;
+
+ void merge(const PendingMaintenanceStats& rhs);
};
private:
BucketPriorityDatabase& _bucketPriorityDb;
diff --git a/storage/src/vespa/storage/distributor/min_replica_provider.cpp b/storage/src/vespa/storage/distributor/min_replica_provider.cpp
new file mode 100644
index 00000000000..c9929940560
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/min_replica_provider.cpp
@@ -0,0 +1,19 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "min_replica_provider.h"
+
+namespace storage::distributor {
+
+void
+merge_min_replica_stats(std::unordered_map<uint16_t, uint32_t>& dest,
+ const std::unordered_map<uint16_t, uint32_t>& src)
+{
+ for (const auto& entry : src) {
+ auto node_index = entry.first;
+ auto itr = dest.find(node_index);
+ auto new_min_replica = (itr != dest.end()) ? std::min(itr->second, entry.second) : entry.second;
+ dest[node_index] = new_min_replica;
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/min_replica_provider.h b/storage/src/vespa/storage/distributor/min_replica_provider.h
index 6d644f4e9d4..ba946cd5a7f 100644
--- a/storage/src/vespa/storage/distributor/min_replica_provider.h
+++ b/storage/src/vespa/storage/distributor/min_replica_provider.h
@@ -4,8 +4,7 @@
#include <stdint.h>
#include <unordered_map>
-namespace storage {
-namespace distributor {
+namespace storage::distributor {
class MinReplicaProvider
{
@@ -21,5 +20,8 @@ public:
virtual std::unordered_map<uint16_t, uint32_t> getMinReplica() const = 0;
};
-} // distributor
-} // storage
+void merge_min_replica_stats(std::unordered_map<uint16_t, uint32_t>& dest,
+ const std::unordered_map<uint16_t, uint32_t>& src);
+
+}
+
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java
index c085be7c205..561b20a9c8a 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java
@@ -6,7 +6,10 @@ import com.auth0.jwt.interfaces.DecodedJWT;
import com.yahoo.vespa.athenz.utils.AthenzIdentities;
import java.time.Instant;
+import java.util.List;
import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Represents an Athenz Access Token
@@ -18,6 +21,8 @@ public class AthenzAccessToken {
public static final String HTTP_HEADER_NAME = "Authorization";
private static final String BEARER_TOKEN_PREFIX = "Bearer ";
+ private static final String SCOPE_CLAIM = "scp";
+ private static final String AUDIENCE_CLAIM = "aud";
private final String value;
private volatile DecodedJWT jwt;
@@ -43,6 +48,12 @@ public class AthenzAccessToken {
return jwt().getExpiresAt().toInstant();
}
public AthenzIdentity getAthenzIdentity() { return AthenzIdentities.from(jwt().getClaim("client_id").asString()); }
+ public List<AthenzRole> roles() {
+ String domain = Optional.ofNullable(jwt().getClaim(AUDIENCE_CLAIM).asString()).orElse("");
+ return Optional.ofNullable(jwt().getClaim(SCOPE_CLAIM).asList(String.class)).orElse(List.of()).stream()
+ .map(role -> new AthenzRole(domain, role))
+ .collect(Collectors.toList());
+ }
private DecodedJWT jwt() {
if (jwt == null) {
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index 9d4f3525c32..83abe0bb872 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -84,7 +84,7 @@ public class CliClient {
builder.setHostnameVerifier(AcceptAllHostnameVerifier.INSTANCE);
}
cliArgs.certificateAndKey().ifPresent(c -> builder.setCertificate(c.certificateFile, c.privateKeyFile));
- cliArgs.caCertificates().ifPresent(builder::setCaCertificates);
+ cliArgs.caCertificates().ifPresent(builder::setCaCertificatesFile);
cliArgs.headers().forEach(builder::addRequestHeader);
return builder.build();
}
@@ -127,7 +127,7 @@ public class CliClient {
JsonFactory factory = new JsonFactory();
long okCount = stats.successes();
long errorCount = stats.requests() - okCount;
- double throughput = okCount * 1e6D / Math.max(1, durationNanos);
+ double throughput = okCount * 1e9 / Math.max(1, durationNanos);
try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
generator.writeStartObject();
generator.writeNumberField("feeder.runtime", durationNanos / 1_000_000);
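Note on the 1e6D -> 1e9 change above: durationNanos is elapsed time in nanoseconds, so documents per second is okCount multiplied by 1e9 (nanoseconds per second) and divided by the elapsed nanoseconds; the old factor under-reported throughput by a factor of 1000. A minimal, self-contained sanity check (class name and values are illustrative, not part of the patch):

    public class ThroughputCheck {
        public static void main(String[] args) {
            long okCount = 2_000;                 // successful feed operations
            long durationNanos = 4_000_000_000L;  // 4 seconds of feeding
            // documents/second = successes * nanoseconds-per-second / elapsed nanoseconds
            double throughput = okCount * 1e9 / Math.max(1, durationNanos);
            System.out.println(throughput);       // prints 500.0
        }
    }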
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index 7759e9d2308..02d4a0128ea 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -25,6 +25,18 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty.http2</groupId>
+ <artifactId>http2-http-client-transport</artifactId>
+ <version>${jetty.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-client</artifactId>
+ <version>${jetty.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<scope>compile</scope>
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
index 672f5f080b5..e5d45a2f211 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
@@ -34,7 +34,7 @@ class ApacheCluster implements Cluster {
private final List<Endpoint> endpoints = new ArrayList<>();
- public ApacheCluster(FeedClientBuilder builder) throws IOException {
+ ApacheCluster(FeedClientBuilder builder) throws IOException {
for (URI endpoint : builder.endpoints)
for (int i = 0; i < builder.connectionsPerEndpoint; i++)
endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
@@ -127,7 +127,7 @@ class ApacheCluster implements Cluster {
.setInitialWindowSize(Integer.MAX_VALUE)
.build());
- SSLContext sslContext = constructSslContext(builder);
+ SSLContext sslContext = builder.constructSslContext();
String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites()));
if (allowedCiphers.length == 0)
throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM");
@@ -142,19 +142,6 @@ class ApacheCluster implements Cluster {
.build();
}
- private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException {
- if (builder.sslContext != null) return builder.sslContext;
- SslContextBuilder sslContextBuilder = new SslContextBuilder();
- if (builder.certificate != null && builder.privateKey != null) {
- sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey);
- }
- if (builder.caCertificates != null) {
- sslContextBuilder.withCaCertificates(builder.caCertificates);
- }
- return sslContextBuilder.build();
- }
-
-
private static class ApacheHttpResponse implements HttpResponse {
private final SimpleHttpResponse wrapped;
@@ -163,7 +150,6 @@ class ApacheCluster implements Cluster {
this.wrapped = wrapped;
}
-
@Override
public int code() {
return wrapped.getCode();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
index d0a221ed358..df1f3bcd54c 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -7,8 +7,11 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Path;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -35,9 +38,12 @@ public class FeedClientBuilder {
int maxStreamsPerConnection = 128;
FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy;
FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Duration.ofSeconds(1), Duration.ofMinutes(10));
- Path certificate;
- Path privateKey;
- Path caCertificates;
+ Path certificateFile;
+ Path privateKeyFile;
+ Path caCertificatesFile;
+ Collection<X509Certificate> certificate;
+ PrivateKey privateKey;
+ Collection<X509Certificate> caCertificates;
public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); }
@@ -81,9 +87,6 @@ public class FeedClientBuilder {
}
public FeedClientBuilder setSslContext(SSLContext context) {
- if (certificate != null || caCertificates != null || privateKey != null) {
- throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates");
- }
this.sslContext = requireNonNull(context);
return this;
}
@@ -113,24 +116,77 @@ public class FeedClientBuilder {
}
public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) {
- if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and certificate");
- this.certificate = certificatePemFile;
- this.privateKey = privateKeyPemFile;
+ this.certificateFile = certificatePemFile;
+ this.privateKeyFile = privateKeyPemFile;
+ return this;
+ }
+
+ public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) {
+ this.certificate = certificate;
+ this.privateKey = privateKey;
+ return this;
+ }
+
+ public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) {
+ return setCertificate(Collections.singletonList(certificate), privateKey);
+ }
+
+ public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) {
+ this.caCertificatesFile = caCertificatesFile;
return this;
}
- public FeedClientBuilder setCaCertificates(Path caCertificatesFile) {
- if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and CA certificate");
- this.caCertificates = caCertificatesFile;
+ public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) {
+ this.caCertificates = caCertificates;
return this;
}
public FeedClient build() {
try {
+ validateConfiguration();
return new HttpFeedClient(this);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
+ SSLContext constructSslContext() throws IOException {
+ if (sslContext != null) return sslContext;
+ SslContextBuilder sslContextBuilder = new SslContextBuilder();
+ if (certificateFile != null && privateKeyFile != null) {
+ sslContextBuilder.withCertificateAndKey(certificateFile, privateKeyFile);
+ } else if (certificate != null && privateKey != null) {
+ sslContextBuilder.withCertificateAndKey(certificate, privateKey);
+ }
+ if (caCertificatesFile != null) {
+ sslContextBuilder.withCaCertificates(caCertificatesFile);
+ } else if (caCertificates != null) {
+ sslContextBuilder.withCaCertificates(caCertificates);
+ }
+ return sslContextBuilder.build();
+ }
+
+ private void validateConfiguration() {
+ if (sslContext != null && (
+ certificateFile != null || caCertificatesFile != null || privateKeyFile != null ||
+ certificate != null || caCertificates != null || privateKey != null)) {
+ throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates");
+ }
+ if (certificate != null && certificateFile != null) {
+ throw new IllegalArgumentException("Cannot set both certificate directly and as file");
+ }
+ if (privateKey != null && privateKeyFile != null) {
+ throw new IllegalArgumentException("Cannot set both private key directly and as file");
+ }
+ if (caCertificates != null && caCertificatesFile != null) {
+ throw new IllegalArgumentException("Cannot set both CA certificates directly and as file");
+ }
+ if (certificate != null && certificate.isEmpty()) {
+ throw new IllegalArgumentException("Certificate cannot be empty");
+ }
+ if (caCertificates != null && caCertificates.isEmpty()) {
+ throw new IllegalArgumentException("CA certificates cannot be empty");
+ }
+ }
+
}
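The builder now accepts the client certificate and CA bundle either as PEM files or as in-memory X509Certificate/PrivateKey objects, and moves the mutual-exclusion checks from the setters into build(). A hedged usage sketch, shown as a fragment; the endpoint URI and file paths are placeholders, and certificateChain, privateKey and caCerts are assumed to be loaded elsewhere:

    // Variant 1: PEM files on disk.
    FeedClient fromFiles = FeedClientBuilder.create(URI.create("https://vespa.example.com:443/"))
            .setCertificate(Paths.get("client-cert.pem"), Paths.get("client-key.pem"))
            .setCaCertificatesFile(Paths.get("ca-certs.pem"))
            .build();

    // Variant 2: key material already loaded in memory.
    FeedClient fromObjects = FeedClientBuilder.create(URI.create("https://vespa.example.com:443/"))
            .setCertificate(certificateChain, privateKey)   // Collection<X509Certificate>, PrivateKey
            .setCaCertificates(caCerts)                      // Collection<X509Certificate>
            .build();

    // Mixing setSslContext(...) with any certificate setter, or mixing the file-based and
    // in-memory variants, now fails with IllegalArgumentException from build().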
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index c572f84db54..256d3ae535c 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -4,18 +4,14 @@ package ai.vespa.feed.client;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.net.URIBuilder;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@@ -86,8 +82,10 @@ class HttpFeedClient implements FeedClient {
private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) {
ensureOpen();
- String path = operationPath(documentId, params).toString();
- HttpRequest request = new HttpRequest(method, path, requestHeaders, operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way?
+ HttpRequest request = new HttpRequest(method,
+ getPath(documentId) + getQuery(params),
+ requestHeaders,
+ operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way?
return requestStrategy.enqueue(documentId, request)
.thenApply(response -> toResult(request, response, documentId));
@@ -134,44 +132,45 @@ class HttpFeedClient implements FeedClient {
return new Result(type, documentId, message, trace);
}
- static List<String> toPath(DocumentId documentId) {
- List<String> path = new ArrayList<>();
+ static String getPath(DocumentId documentId) {
+ StringJoiner path = new StringJoiner("/", "/", "");
path.add("document");
path.add("v1");
- path.add(documentId.namespace());
- path.add(documentId.documentType());
+ path.add(encode(documentId.namespace()));
+ path.add(encode(documentId.documentType()));
if (documentId.number().isPresent()) {
path.add("number");
path.add(Long.toUnsignedString(documentId.number().getAsLong()));
}
else if (documentId.group().isPresent()) {
path.add("group");
- path.add(documentId.group().get());
+ path.add(encode(documentId.group().get()));
}
else {
path.add("docid");
}
- path.add(documentId.userSpecific());
+ path.add(encode(documentId.userSpecific()));
- return path;
+ return path.toString();
}
- static URI operationPath(DocumentId documentId, OperationParameters params) {
- URIBuilder url = new URIBuilder();
- url.setPathSegments(toPath(documentId));
-
- if (params.createIfNonExistent()) url.addParameter("create", "true");
- params.testAndSetCondition().ifPresent(condition -> url.addParameter("condition", condition));
- params.timeout().ifPresent(timeout -> url.addParameter("timeout", timeout.toMillis() + "ms"));
- params.route().ifPresent(route -> url.addParameter("route", route));
- params.tracelevel().ifPresent(tracelevel -> url.addParameter("tracelevel", Integer.toString(tracelevel)));
-
+ static String encode(String raw) {
try {
- return url.build();
+ return URLEncoder.encode(raw, UTF_8.name());
}
- catch (URISyntaxException e) {
+ catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
}
+ static String getQuery(OperationParameters params) {
+ StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue("");
+ if (params.createIfNonExistent()) query.add("create=true");
+ params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition)));
+ params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms"));
+ params.route().ifPresent(route -> query.add("route=" + encode(route)));
+ params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel));
+ return query.toString();
+ }
+
}
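The hand-rolled getPath/getQuery above replace URIBuilder while keeping the /document/v1 URL shape and adding URL-encoding of the id components. For reference, a hedged example of the strings these helpers are expected to produce; the document ids and parameters are illustrative, and since the helpers are package-private the values are shown as comments rather than a call site:

    // DocumentId.of("id:music:album:g=classical:best-of")
    //   getPath(id)      -> "/document/v1/music/album/group/classical/best-of"
    // DocumentId.of("id:music:album::best-of-2021")
    //   getPath(id)      -> "/document/v1/music/album/docid/best-of-2021"
    // With create-if-nonexistent and a 5 s timeout set on the OperationParameters:
    //   getQuery(params) -> "?create=true&timeout=5000ms"
    // With no parameters set, getQuery(params) returns the empty string.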
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 5646d37cde3..f9fc7544501 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -59,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy {
});
HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
- this(builder, new BenchmarkingCluster(new ApacheCluster(builder)));
+ this(builder, new BenchmarkingCluster(new JettyCluster(builder)));
}
HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
new file mode 100644
index 00000000000..dc889d29d36
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
@@ -0,0 +1,123 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpClientTransport;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http2.client.HTTP2Client;
+import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author jonmv
+ */
+class JettyCluster implements Cluster {
+
+ private final List<Endpoint> endpoints = new ArrayList<>();
+
+ JettyCluster(FeedClientBuilder builder) {
+ for (URI endpoint : builder.endpoints)
+ endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint));
+ }
+
+ private static HttpClient createJettyHttpClient(FeedClientBuilder builder) {
+ try {
+ SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client();
+ clientSslCtxFactory.setSslContext(builder.constructSslContext());
+ clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier);
+
+ HTTP2Client wrapped = new HTTP2Client();
+ wrapped.setSelectors(8);
+ wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection);
+ HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped);
+ HttpClient client = new HttpClient(transport, clientSslCtxFactory);
+ client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION)));
+ client.setDefaultRequestContentType("application/json");
+ client.setFollowRedirects(false);
+ client.setMaxRequestsQueuedPerDestination(builder.connectionsPerEndpoint * builder.maxStreamsPerConnection);
+ client.setIdleTimeout(10000);
+ client.setMaxConnectionsPerDestination(builder.connectionsPerEndpoint);
+ client.setRequestBufferSize(1 << 16);
+
+ client.start();
+ return client;
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ int index = 0;
+ int min = Integer.MAX_VALUE;
+ for (int i = 0; i < endpoints.size(); i++)
+ if (endpoints.get(i).inflight.get() < min) {
+ index = i;
+ min = endpoints.get(i).inflight.get();
+ }
+
+ Endpoint endpoint = endpoints.get(index);
+ endpoint.inflight.incrementAndGet();
+ try {
+ Request jettyRequest = endpoint.client.newRequest(endpoint.uri.resolve(request.path()))
+ .method(request.method())
+ .timeout(5, TimeUnit.MINUTES)
+ .content(request.body() == null ? null : new BytesContentProvider("application/json", request.body()));
+ request.headers().forEach((name, value) -> jettyRequest.header(name, value.get()));
+ jettyRequest.send(new BufferingResponseListener() {
+ @Override public void onComplete(Result result) {
+ if (result.isSucceeded())
+ vessel.complete(HttpResponse.of(result.getResponse().getStatus(),
+ getContent()));
+ else
+ vessel.completeExceptionally(result.getFailure());
+ }
+ });
+ }
+ catch (Throwable thrown) {
+ vessel.completeExceptionally(thrown);
+ }
+ vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ }
+
+ @Override
+ public void close() {
+ Throwable thrown = null;
+ for (Endpoint endpoint : endpoints)
+ try {
+ endpoint.client.stop();
+ }
+ catch (Throwable t) {
+ if (thrown == null) thrown = t;
+ else thrown.addSuppressed(t);
+ }
+ if (thrown != null) throw new RuntimeException(thrown);
+ }
+
+
+ private static class Endpoint {
+
+ private final HttpClient client;
+ private final AtomicInteger inflight = new AtomicInteger(0);
+ private final URI uri;
+
+ private Endpoint(HttpClient client, URI uri) {
+ this.client = client;
+ this.uri = uri;
+ }
+
+ }
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index de32e7abdf5..b3a7aca1808 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -74,6 +74,31 @@ public class JsonFeeder implements Closeable {
public static Builder builder(FeedClient client) { return new Builder(client); }
+ /** Feeds a single JSON feed operation of the form
+ * <pre>
+ * {
+ * "id": "id:ns:type::boo",
+ * "fields": { ... document fields ... }
+ * }
+ * </pre>
+ */
+ public CompletableFuture<Result> feedSingle(String json) {
+ CompletableFuture<Result> result = new CompletableFuture<>();
+ try {
+ SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8));
+ parser.next().whenCompleteAsync((operationResult, error) -> {
+ if (error != null) {
+ result.completeExceptionally(error);
+ } else {
+ result.complete(operationResult);
+ }
+ }, resultExecutor);
+ } catch (Exception e) {
+ resultExecutor.execute(() -> result.completeExceptionally(e));
+ }
+ return result;
+ }
+
/** Feeds a stream containing a JSON array of feed operations of the form
* <pre>
* [
@@ -288,87 +313,111 @@ public class JsonFeeder implements Closeable {
}
}
+ private class SingleOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ private final byte[] json;
+
+ SingleOperationParserAndExecutor(byte[] json) throws IOException {
+ super(factory.createParser(json), false);
+ this.json = json;
+ }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ return new String(json, (int) start, (int) (end - start), UTF_8);
+ }
+ }
+
private abstract class OperationParserAndExecutor {
private final JsonParser parser;
private final boolean multipleOperations;
+ private boolean arrayPrefixParsed;
protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException {
this.parser = parser;
this.multipleOperations = multipleOperations;
- if (multipleOperations) expect(START_ARRAY);
}
abstract String getDocumentJson(long start, long end);
CompletableFuture<Result> next() throws IOException {
- JsonToken token = parser.nextToken();
- if (token == END_ARRAY && multipleOperations) return null;
- else if (token == null && !multipleOperations) return null;
- else if (token == START_OBJECT);
- else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset());
- long start = 0, end = -1;
- OperationType type = null;
- DocumentId id = null;
- OperationParameters parameters = protoParameters;
- loop: while (true) {
- switch (parser.nextToken()) {
- case FIELD_NAME:
- switch (parser.getText()) {
- case "id":
- case "put": type = PUT; id = readId(); break;
- case "update": type = UPDATE; id = readId(); break;
- case "remove": type = REMOVE; id = readId(); break;
- case "condition": parameters = parameters.testAndSetCondition(readString()); break;
- case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
- case "fields": {
- expect(START_OBJECT);
- start = parser.getTokenLocation().getByteOffset();
- int depth = 1;
- while (depth > 0) switch (parser.nextToken()) {
- case START_OBJECT: ++depth; break;
- case END_OBJECT: --depth; break;
+ try {
+ if (multipleOperations && !arrayPrefixParsed){
+ expect(START_ARRAY);
+ arrayPrefixParsed = true;
+ }
+
+ JsonToken token = parser.nextToken();
+ if (token == END_ARRAY && multipleOperations) return null;
+ else if (token == null && !multipleOperations) return null;
+ else if (token == START_OBJECT);
+ else throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset());
+ long start = 0, end = -1;
+ OperationType type = null;
+ DocumentId id = null;
+ OperationParameters parameters = protoParameters;
+ loop: while (true) {
+ switch (parser.nextToken()) {
+ case FIELD_NAME:
+ switch (parser.getText()) {
+ case "id":
+ case "put": type = PUT; id = readId(); break;
+ case "update": type = UPDATE; id = readId(); break;
+ case "remove": type = REMOVE; id = readId(); break;
+ case "condition": parameters = parameters.testAndSetCondition(readString()); break;
+ case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
+ case "fields": {
+ expect(START_OBJECT);
+ start = parser.getTokenLocation().getByteOffset();
+ int depth = 1;
+ while (depth > 0) switch (parser.nextToken()) {
+ case START_OBJECT: ++depth; break;
+ case END_OBJECT: --depth; break;
+ }
+ end = parser.getTokenLocation().getByteOffset() + 1;
+ break;
}
- end = parser.getTokenLocation().getByteOffset() + 1;
- break;
+ default: throw new JsonParseException("Unexpected field name '" + parser.getText() + "' at offset " +
+ parser.getTokenLocation().getByteOffset());
}
- default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
- }
- break;
+ break;
- case END_OBJECT:
- break loop;
+ case END_OBJECT:
+ break loop;
- default:
- throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
+ default:
+ throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " +
+ parser.getTokenLocation().getByteOffset());
+ }
}
- }
- if (id == null)
- throw new IllegalArgumentException("No document id for document at offset " + start);
-
- if (end < start)
- throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
- String payload = getDocumentJson(start, end);
- switch (type) {
- case PUT: return client.put (id, payload, parameters);
- case UPDATE: return client.update(id, payload, parameters);
- case REMOVE: return client.remove(id, parameters);
- default: throw new IllegalStateException("Unexpected operation type '" + type + "'");
+ if (id == null)
+ throw new JsonParseException("No document id for document at offset " + start);
+
+ if (end < start)
+ throw new JsonParseException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
+ String payload = getDocumentJson(start, end);
+ switch (type) {
+ case PUT: return client.put (id, payload, parameters);
+ case UPDATE: return client.update(id, payload, parameters);
+ case REMOVE: return client.remove(id, parameters);
+ default: throw new JsonParseException("Unexpected operation type '" + type + "'");
+ }
+ } catch (com.fasterxml.jackson.core.JacksonException e) {
+ throw new JsonParseException("Failed to parse JSON", e);
}
}
- void expect(JsonToken token) throws IOException {
+ private void expect(JsonToken token) throws IOException {
if (parser.nextToken() != token)
- throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ throw new JsonParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
}
private String readString() throws IOException {
String value = parser.nextTextValue();
if (value == null)
- throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ throw new JsonParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
return value;
@@ -377,7 +426,7 @@ public class JsonFeeder implements Closeable {
private boolean readBoolean() throws IOException {
Boolean value = parser.nextBooleanValue();
if (value == null)
- throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ throw new JsonParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
return value;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java
new file mode 100644
index 00000000000..785ffd8eb4c
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java
@@ -0,0 +1,13 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+/**
+ * @author bjorncs
+ */
+public class JsonParseException extends FeedException {
+
+ public JsonParseException(String message) { super(message); }
+
+ public JsonParseException(String message, Throwable cause) { super(message, cause); }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
index 7200d5fd943..9114e22f4a6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
@@ -20,11 +20,14 @@ import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyStore;
+import java.security.KeyStoreException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
/**
@@ -39,6 +42,9 @@ class SslContextBuilder {
private Path certificateFile;
private Path privateKeyFile;
private Path caCertificatesFile;
+ private Collection<X509Certificate> certificate;
+ private PrivateKey privateKey;
+ private Collection<X509Certificate> caCertificates;
SslContextBuilder withCertificateAndKey(Path certificate, Path privateKey) {
this.certificateFile = certificate;
@@ -46,20 +52,35 @@ class SslContextBuilder {
return this;
}
+ SslContextBuilder withCertificateAndKey(Collection<X509Certificate> certificate, PrivateKey privateKey) {
+ this.certificate = certificate;
+ this.privateKey = privateKey;
+ return this;
+ }
+
SslContextBuilder withCaCertificates(Path caCertificates) {
this.caCertificatesFile = caCertificates;
return this;
}
+ SslContextBuilder withCaCertificates(Collection<X509Certificate> caCertificates) {
+ this.caCertificates = caCertificates;
+ return this;
+ }
+
SSLContext build() throws IOException {
try {
KeyStore keystore = KeyStore.getInstance("PKCS12");
keystore.load(null);
if (certificateFile != null && privateKeyFile != null) {
keystore.setKeyEntry("cert", privateKey(privateKeyFile), new char[0], certificates(certificateFile));
+ } else if (certificate != null && privateKey != null) {
+ keystore.setKeyEntry("cert", privateKey, new char[0], certificate.toArray(new Certificate[0]));
}
if (caCertificatesFile != null) {
- keystore.setCertificateEntry("ca-cert", certificates(caCertificatesFile)[0]);
+ addCaCertificates(keystore, Arrays.asList(certificates(caCertificatesFile)));
+ } else if (caCertificates != null) {
+ addCaCertificates(keystore, caCertificates);
}
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keystore, new char[0]);
@@ -73,6 +94,13 @@ class SslContextBuilder {
}
}
+ private static void addCaCertificates(KeyStore keystore, Collection<? extends Certificate> certificates) throws KeyStoreException {
+ int i = 0;
+ for (Certificate cert : certificates) {
+ keystore.setCertificateEntry("ca-cert-" + ++i, cert);
+ }
+ }
+
private static Certificate[] certificates(Path file) throws IOException, GeneralSecurityException {
try (PEMParser parser = new PEMParser(Files.newBufferedReader(file))) {
List<X509Certificate> result = new ArrayList<>();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
index e2ae5bc7155..03194e23d47 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -11,6 +11,7 @@ import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -51,52 +52,72 @@ class JsonFeederTest {
try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) {
AtomicInteger resultsReceived = new AtomicInteger();
AtomicBoolean completedSuccessfully = new AtomicBoolean();
- Set<String> ids = new HashSet<>();
long startNanos = System.nanoTime();
- JsonFeeder.builder(new FeedClient() {
+ SimpleClient feedClient = new SimpleClient();
+ JsonFeeder.builder(feedClient).build()
+ .feedMany(in, 1 << 7,
+ new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document
+ @Override
+ public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); }
- @Override
- public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
- ids.add(documentId.userSpecific());
- return createSuccessResult(documentId);
- }
+ @Override
+ public void onError(Throwable error) { exceptionThrow.set(error); }
- @Override
- public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
- return createSuccessResult(documentId);
- }
+ @Override
+ public void onComplete() { completedSuccessfully.set(true); }
+ })
+ .join();
- @Override
- public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
- return createSuccessResult(documentId);
- }
+ System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+ assertEquals(docs + 1, feedClient.ids.size());
+ assertEquals(docs + 1, resultsReceived.get());
+ assertTrue(completedSuccessfully.get());
+ assertNull(exceptionThrow.get());
+ }
+ }
- @Override
- public OperationStats stats() { return null; }
+ @Test
+ public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ try (JsonFeeder feeder = JsonFeeder.builder(new SimpleClient()).build()) {
+ String json = "{\"put\": \"id:ns:type::abc1\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " }\n";
+ Result result = feeder.feedSingle(json).get();
+ assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId());
+ assertEquals(Result.Type.success, result.type());
+ assertEquals("success", result.resultMessage().get());
+ }
+ }
- @Override
- public void close(boolean graceful) { }
+ private static class SimpleClient implements FeedClient {
+ final Set<String> ids = new HashSet<>();
- private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
- return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null));
- }
+ @Override
+ public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+ ids.add(documentId.userSpecific());
+ return createSuccessResult(documentId);
+ }
- }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document
- @Override
- public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); }
+ @Override
+ public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+ return createSuccessResult(documentId);
+ }
- @Override
- public void onError(Throwable error) { exceptionThrow.set(error); }
+ @Override
+ public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+ return createSuccessResult(documentId);
+ }
- @Override
- public void onComplete() { completedSuccessfully.set(true); }
- }).join();
+ @Override
+ public OperationStats stats() { return null; }
- System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
- assertEquals(docs + 1, ids.size());
- assertEquals(docs + 1, resultsReceived.get());
- assertTrue(completedSuccessfully.get());
- assertNull(exceptionThrow.get());
+ @Override
+ public void close(boolean graceful) { }
+
+ private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
+ return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null));
}
}
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml
index 382c28dc884..39f10d84f9b 100644
--- a/vespa-hadoop/pom.xml
+++ b/vespa-hadoop/pom.xml
@@ -101,17 +101,22 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
-
<!-- Vespa feeding dependencies -->
<dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>vespa-http-client</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>vespa-feed-client</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
diff --git a/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java
new file mode 100644
index 00000000000..5974a8df271
--- /dev/null
+++ b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java
@@ -0,0 +1,18 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.Result.Type;
+
+/**
+ * Workaround for package-private {@link Result} constructor.
+ *
+ * @author bjorncs
+ */
+public class DryrunResult {
+
+ private DryrunResult() {}
+
+ public static Result create(Type type, DocumentId documentId, String resultMessage, String traceMessage) {
+ return new Result(type, documentId, resultMessage, traceMessage);
+ }
+}
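Because Result's constructor is package-private, this helper lets the Hadoop module fabricate results for dry-run feeds. A hedged usage sketch (the id and message are illustrative):

    Result dryrun = DryrunResult.create(Result.Type.success,
                                        DocumentId.of("id:ns:type::doc1"),
                                        "ok (dryrun)",
                                        null);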
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
new file mode 100644
index 00000000000..b716c55beb5
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
@@ -0,0 +1,235 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedClientFactory;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+import java.io.IOException;
+import java.io.StringReader;
+import java.time.Duration;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+/**
+ * {@link LegacyVespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-http-client.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class LegacyVespaRecordWriter extends RecordWriter {
+
+ private final static Logger log = Logger.getLogger(LegacyVespaRecordWriter.class.getCanonicalName());
+
+ private boolean initialized = false;
+ private FeedClient feedClient;
+ private final VespaCounters counters;
+ private final int progressInterval;
+
+ final VespaConfiguration configuration;
+
+ LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
+ this.counters = counters;
+ this.configuration = configuration;
+ this.progressInterval = configuration.progressInterval();
+ }
+
+
+ @Override
+ public void write(Object key, Object data) throws IOException, InterruptedException {
+ if (!initialized) {
+ initialize();
+ }
+
+ String doc = data.toString().trim();
+
+ // Parse data to find document id - if none found, skip this write
+ String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
+ : findDocIdFromXml(doc);
+ if (docId != null && docId.length() > 0) {
+ feedClient.stream(docId, doc);
+ counters.incrementDocumentsSent(1);
+ } else {
+ counters.incrementDocumentsSkipped(1);
+ }
+
+ if (counters.getDocumentsSent() % progressInterval == 0) {
+ String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
+ counters.getDocumentsSent(),
+ counters.getDocumentsOk(),
+ counters.getDocumentsFailed(),
+ counters.getDocumentsSkipped());
+ log.info(progress);
+ }
+
+ }
+
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ if (feedClient != null) {
+ feedClient.close();
+ }
+ }
+
+ protected ConnectionParams.Builder configureConnectionParams() {
+ ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
+ connParamsBuilder.setDryRun(configuration.dryrun());
+ connParamsBuilder.setUseCompression(configuration.useCompression());
+ connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
+ connParamsBuilder.setMaxRetries(configuration.numRetries());
+ if (configuration.proxyHost() != null) {
+ connParamsBuilder.setProxyHost(configuration.proxyHost());
+ }
+ if (configuration.proxyPort() >= 0) {
+ connParamsBuilder.setProxyPort(configuration.proxyPort());
+ }
+ return connParamsBuilder;
+ }
+
+ protected FeedParams.Builder configureFeedParams() {
+ FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
+ feedParamsBuilder.setDataFormat(configuration.dataFormat());
+ feedParamsBuilder.setRoute(configuration.route());
+ feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
+ feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
+ feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis());
+ return feedParamsBuilder;
+ }
+
+ protected SessionParams.Builder configureSessionParams() {
+ SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder();
+ sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize());
+ sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2);
+ return sessionParamsBuilder;
+ }
+
+ private void initialize() {
+ if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) {
+ int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs());
+ log.info("VespaStorage: Delaying startup by " + delay + " ms");
+ try {
+ Thread.sleep(delay);
+ } catch (Exception e) {}
+ }
+
+ ConnectionParams.Builder connParamsBuilder = configureConnectionParams();
+ FeedParams.Builder feedParamsBuilder = configureFeedParams();
+ SessionParams.Builder sessionParams = configureSessionParams();
+
+ sessionParams.setConnectionParams(connParamsBuilder.build());
+ sessionParams.setFeedParams(feedParamsBuilder.build());
+
+ String endpoints = configuration.endpoint();
+ StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String endpoint = tokenizer.nextToken().trim();
+ sessionParams.addCluster(new Cluster.Builder().addEndpoint(
+ Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL())
+ ).build());
+ }
+
+ ResultCallback resultCallback = new ResultCallback(counters);
+ feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback);
+
+ initialized = true;
+ log.info("VespaStorage configuration:\n" + configuration.toString());
+ log.info(feedClient.getStatsAsJson());
+ }
+
+ private String findDocIdFromXml(String xml) {
+ try {
+ XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
+ while (eventReader.hasNext()) {
+ XMLEvent event = eventReader.nextEvent();
+ if (event.getEventType() == XMLEvent.START_ELEMENT) {
+ StartElement element = event.asStartElement();
+ String elementName = element.getName().getLocalPart();
+ if (VespaDocumentOperation.Operation.valid(elementName)) {
+ return element.getAttributeByName(QName.valueOf("documentid")).getValue();
+ }
+ }
+ }
+ } catch (XMLStreamException | FactoryConfigurationError e) {
+ // same as the JSON variant: skip documents that cannot be parsed
+ return null;
+ }
+ return null;
+ }
+
+ private String findDocId(String json) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ try(JsonParser parser = factory.createParser(json)) {
+ if (parser.nextToken() != JsonToken.START_OBJECT) {
+ return null;
+ }
+ while (parser.nextToken() != JsonToken.END_OBJECT) {
+ String fieldName = parser.getCurrentName();
+ parser.nextToken();
+ if (VespaDocumentOperation.Operation.valid(fieldName)) {
+ String docId = parser.getText();
+ return docId;
+ } else {
+ parser.skipChildren();
+ }
+ }
+ } catch (JsonParseException ex) {
+ return null;
+ }
+ return null;
+ }
+
+
+ static class ResultCallback implements FeedClient.ResultCallback {
+ final VespaCounters counters;
+
+ public ResultCallback(VespaCounters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ if (!documentResult.isSuccess()) {
+ counters.incrementDocumentsFailed(1);
+ StringBuilder sb = new StringBuilder();
+ sb.append("Problems with docid ");
+ sb.append(docId);
+ sb.append(": ");
+ List<Result.Detail> details = documentResult.getDetails();
+ for (Result.Detail detail : details) {
+ sb.append(detail.toString());
+ sb.append(" ");
+ }
+ log.warning(sb.toString());
+ return;
+ }
+ counters.incrementDocumentsOk(1);
+ }
+
+ }
+
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
index bef51e9ae08..97bc7dc838e 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
@@ -10,7 +10,7 @@ import java.util.Properties;
/**
* An output specification for writing to Vespa instances in a Map-Reduce job.
- * Mainly returns an instance of a {@link VespaRecordWriter} that does the
+ * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the
* actual feeding to Vespa.
*
* @author lesters
@@ -35,7 +35,9 @@ public class VespaOutputFormat extends OutputFormat {
public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
VespaCounters counters = VespaCounters.get(context);
VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride);
- return new VespaRecordWriter(configuration, counters);
+ return configuration.useLegacyClient()
+ ? new LegacyVespaRecordWriter(configuration, counters)
+ : new VespaRecordWriter(configuration, counters);
}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
index 4cc93bfd538..73e10c39419 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -1,83 +1,71 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.mapreduce;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.DryrunResult;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.JsonFeeder;
+import ai.vespa.feed.client.JsonParseException;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.OperationStats;
+import ai.vespa.feed.client.Result;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.FeedClientFactory;
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
-import com.yahoo.vespa.http.client.config.SessionParams;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import javax.xml.namespace.QName;
-import javax.xml.stream.FactoryConfigurationError;
-import javax.xml.stream.XMLEventReader;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.events.StartElement;
-import javax.xml.stream.events.XMLEvent;
import java.io.IOException;
-import java.io.StringReader;
+import java.net.URI;
import java.time.Duration;
+import java.util.Arrays;
import java.util.List;
-import java.util.StringTokenizer;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;
+import static java.util.stream.Collectors.toList;
+
/**
- * VespaRecordWriter sends the output &lt;key, value&gt; to one or more Vespa endpoints.
+ * {@link VespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-feed-client.
*
- * @author lesters
+ * @author bjorncs
*/
-@SuppressWarnings("rawtypes")
-public class VespaRecordWriter extends RecordWriter {
+public class VespaRecordWriter extends RecordWriter<Object, Object> {
private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
- private boolean initialized = false;
- private FeedClient feedClient;
private final VespaCounters counters;
- private final int progressInterval;
+ private final VespaConfiguration config;
- final VespaConfiguration configuration;
+ private boolean initialized = false;
+ private JsonFeeder feeder;
- VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
+ protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) {
this.counters = counters;
- this.configuration = configuration;
- this.progressInterval = configuration.progressInterval();
+ this.config = config;
}
-
@Override
- public void write(Object key, Object data) throws IOException, InterruptedException {
- if (!initialized) {
- initialize();
- }
-
- String doc = data.toString().trim();
-
- // Parse data to find document id - if none found, skip this write
- String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
- : findDocIdFromXml(doc);
- if (docId != null && docId.length() >= 0) {
- feedClient.stream(docId, doc);
- counters.incrementDocumentsSent(1);
- } else {
- counters.incrementDocumentsSkipped(1);
- }
-
- if (counters.getDocumentsSent() % progressInterval == 0) {
+ public void write(Object key, Object data) throws IOException {
+ initializeOnFirstWrite();
+ String json = data.toString().trim();
+ feeder.feedSingle(json)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ if (error instanceof JsonParseException) {
+ counters.incrementDocumentsSkipped(1);
+ } else {
+ log.warning("Failed to feed single document: " + error);
+ counters.incrementDocumentsFailed(1);
+ }
+ } else {
+ counters.incrementDocumentsOk(1);
+ }
+ });
+ counters.incrementDocumentsSent(1);
+ if (counters.getDocumentsSent() % config.progressInterval() == 0) {
String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
counters.getDocumentsSent(),
counters.getDocumentsOk(),
@@ -85,151 +73,115 @@ public class VespaRecordWriter extends RecordWriter {
counters.getDocumentsSkipped());
log.info(progress);
}
-
}
-
@Override
- public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- if (feedClient != null) {
- feedClient.close();
+ public void close(TaskAttemptContext context) throws IOException {
+ if (feeder != null) {
+ feeder.close();
+ feeder = null;
+ initialized = false;
}
}
- protected ConnectionParams.Builder configureConnectionParams() {
- ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
- connParamsBuilder.setDryRun(configuration.dryrun());
- connParamsBuilder.setUseCompression(configuration.useCompression());
- connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
- connParamsBuilder.setMaxRetries(configuration.numRetries());
- if (configuration.proxyHost() != null) {
- connParamsBuilder.setProxyHost(configuration.proxyHost());
- }
- if (configuration.proxyPort() >= 0) {
- connParamsBuilder.setProxyPort(configuration.proxyPort());
- }
- return connParamsBuilder;
- }
+ /** Override this method to alter {@link FeedClient} configuration */
+ protected void onFeedClientInitialization(FeedClientBuilder builder) {}
- protected FeedParams.Builder configureFeedParams() {
- FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
- feedParamsBuilder.setDataFormat(configuration.dataFormat());
- feedParamsBuilder.setRoute(configuration.route());
- feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
- feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
- feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis());
- return feedParamsBuilder;
+ private void initializeOnFirstWrite() {
+ if (initialized) return;
+ validateConfig();
+ useRandomizedStartupDelayIfEnabled();
+ feeder = createJsonStreamFeeder();
+ initialized = true;
}
- protected SessionParams.Builder configureSessionParams() {
- SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder();
- sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize());
- sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2);
- return sessionParamsBuilder;
+ private void validateConfig() {
+ if (!config.useSSL()) {
+ throw new IllegalArgumentException("SSL is required for this feed client implementation");
+ }
+ if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) {
+ throw new IllegalArgumentException("Only JSON is support by this feed client implementation");
+ }
+ if (config.proxyHost() != null) {
+ log.warning(String.format("Ignoring proxy config (host='%s', port=%d)", config.proxyHost(), config.proxyPort()));
+ }
}
-
- private void initialize() {
- if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) {
- int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs());
- log.info("VespaStorage: Delaying startup by " + delay + " ms");
+
+ private void useRandomizedStartupDelayIfEnabled() {
+ if (!config.dryrun() && config.randomStartupSleepMs() > 0) {
+ int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs());
+ log.info("Delaying startup by " + delay + " ms");
try {
Thread.sleep(delay);
} catch (Exception e) {}
}
+ }
- ConnectionParams.Builder connParamsBuilder = configureConnectionParams();
- FeedParams.Builder feedParamsBuilder = configureFeedParams();
- SessionParams.Builder sessionParams = configureSessionParams();
-
- sessionParams.setConnectionParams(connParamsBuilder.build());
- sessionParams.setFeedParams(feedParamsBuilder.build());
- String endpoints = configuration.endpoint();
- StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
- while (tokenizer.hasMoreTokens()) {
- String endpoint = tokenizer.nextToken().trim();
- sessionParams.addCluster(new Cluster.Builder().addEndpoint(
- Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL())
- ).build());
+ private JsonFeeder createJsonStreamFeeder() {
+ FeedClient feedClient = createFeedClient();
+ JsonFeeder.Builder builder = JsonFeeder.builder(feedClient)
+ .withTimeout(Duration.ofMinutes(10));
+ if (config.route() != null) {
+ builder.withRoute(config.route());
}
+ return builder.build();
- ResultCallback resultCallback = new ResultCallback(counters);
- feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback);
-
- initialized = true;
- log.info("VespaStorage configuration:\n" + configuration.toString());
- log.info(feedClient.getStatsAsJson());
}
- private String findDocIdFromXml(String xml) {
- try {
- XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
- while (eventReader.hasNext()) {
- XMLEvent event = eventReader.nextEvent();
- if (event.getEventType() == XMLEvent.START_ELEMENT) {
- StartElement element = event.asStartElement();
- String elementName = element.getName().getLocalPart();
- if (VespaDocumentOperation.Operation.valid(elementName)) {
- return element.getAttributeByName(QName.valueOf("documentid")).getValue();
- }
- }
- }
- } catch (XMLStreamException | FactoryConfigurationError e) {
- // as json dude does
- return null;
+ private FeedClient createFeedClient() {
+ if (config.dryrun()) {
+ return new DryrunClient();
+ } else {
+ FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpointUris(config))
+ .setConnectionsPerEndpoint(config.numConnections())
+ .setMaxStreamPerConnection(streamsPerConnection(config))
+ .setRetryStrategy(retryStrategy(config));
+
+ onFeedClientInitialization(feedClientBuilder);
+ return feedClientBuilder.build();
}
- return null;
}
-
- private String findDocId(String json) throws IOException {
- JsonFactory factory = new JsonFactory();
- try(JsonParser parser = factory.createParser(json)) {
- if (parser.nextToken() != JsonToken.START_OBJECT) {
- return null;
- }
- while (parser.nextToken() != JsonToken.END_OBJECT) {
- String fieldName = parser.getCurrentName();
- parser.nextToken();
- if (VespaDocumentOperation.Operation.valid(fieldName)) {
- String docId = parser.getText();
- return docId;
- } else {
- parser.skipChildren();
- }
- }
- } catch (JsonParseException ex) {
- return null;
- }
- return null;
+
+ private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) {
+ int maxRetries = config.numRetries();
+ return new FeedClient.RetryStrategy() {
+ @Override public int retries() { return maxRetries; }
+ };
}
+ private static int streamsPerConnection(VespaConfiguration config) {
+ return Math.min(256, config.maxInFlightRequests() / config.numConnections());
+ }
+
+ private static List<URI> endpointUris(VespaConfiguration config) {
+ return Arrays.stream(config.endpoint().split(","))
+ .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort())))
+ .collect(toList());
+ }
- static class ResultCallback implements FeedClient.ResultCallback {
- final VespaCounters counters;
+ private static class DryrunClient implements FeedClient {
- public ResultCallback(VespaCounters counters) {
- this.counters = counters;
+ @Override
+ public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+ return createSuccessResult(documentId);
}
@Override
- public void onCompletion(String docId, Result documentResult) {
- if (!documentResult.isSuccess()) {
- counters.incrementDocumentsFailed(1);
- StringBuilder sb = new StringBuilder();
- sb.append("Problems with docid ");
- sb.append(docId);
- sb.append(": ");
- List<Result.Detail> details = documentResult.getDetails();
- for (Result.Detail detail : details) {
- sb.append(detail.toString());
- sb.append(" ");
- }
- log.warning(sb.toString());
- return;
- }
- counters.incrementDocumentsOk(1);
+ public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+ return createSuccessResult(documentId);
}
- }
+ @Override
+ public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+ return createSuccessResult(documentId);
+ }
+
+ @Override public OperationStats stats() { return null; }
+ @Override public void close(boolean graceful) {}
+ private static CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
+ return CompletableFuture.completedFuture(DryrunResult.create(Result.Type.success, documentId, "ok", null));
+ }
+ }
}
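
The onFeedClientInitialization(FeedClientBuilder) hook above replaces the legacy configureConnectionParams/configureFeedParams/configureSessionParams extension points. A minimal sketch of a subclass using that hook, assuming only the FeedClientBuilder methods already exercised in this patch (setConnectionsPerEndpoint, setRetryStrategy) and that the subclass lives alongside VespaRecordWriter; the class name and tuning values are illustrative, not part of the change:

    import ai.vespa.feed.client.FeedClient;
    import ai.vespa.feed.client.FeedClientBuilder;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;

    // Hypothetical subclass that tunes the feed client before it is built.
    public class TunedVespaRecordWriter extends VespaRecordWriter {

        protected TunedVespaRecordWriter(VespaConfiguration config, VespaCounters counters) {
            super(config, counters);
        }

        @Override
        protected void onFeedClientInitialization(FeedClientBuilder builder) {
            builder.setConnectionsPerEndpoint(8)                      // more parallel connections per endpoint
                   .setRetryStrategy(new FeedClient.RetryStrategy() {
                       @Override public int retries() { return 5; }   // cap retries per operation
                   });
        }
    }
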
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
index 2a1179dbec6..7219e621486 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
@@ -27,6 +27,7 @@ public class VespaConfiguration {
public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests";
public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms";
public static final String NUM_RETRIES = "vespa.feed.num.retries";
+ public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient";
private final Configuration conf;
private final Properties override;
@@ -130,6 +131,7 @@ public class VespaConfiguration {
return getInt(PROGRESS_REPORT, 1000);
}
+ public boolean useLegacyClient() { return getBoolean(USE_LEGACY_CLIENT, true); }
public String getString(String name) {
if (override != null && override.containsKey(name)) {
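
The new vespa.feed.uselegacyclient property defaults to true, so the rewritten VespaRecordWriter is opt-in and the legacy client remains the default. A sketch of how a job could opt in, mirroring the configuration used by the new VespaStorageTest case further down; SSL must be enabled because the new writer's validateConfig() rejects non-SSL setups:

    // Fragment (as in the new VespaStorageTest case below): opt in to vespa-feed-client.
    Configuration conf = new HdfsConfiguration();
    conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString());             // the new writer requires SSL
    conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString());  // default is true (legacy client)
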
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
index ebb34dbc1b1..fa7965acbc1 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
@@ -4,10 +4,9 @@ package com.yahoo.vespa.hadoop.pig;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -17,22 +16,25 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.test.PathUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.*;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.StringTokenizer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class MapReduceTest {
@@ -44,7 +46,7 @@ public class MapReduceTest {
protected static Path metricsJsonPath;
protected static Path metricsCsvPath;
- @BeforeClass
+ @BeforeAll
public static void setUp() throws IOException {
hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath());
@@ -62,7 +64,7 @@ public class MapReduceTest {
copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data");
}
- @AfterClass
+ @AfterAll
public static void tearDown() throws IOException {
Path testDir = new Path(hdfsBaseDir.getParent());
hdfs.delete(testDir, true);
@@ -82,7 +84,7 @@ public class MapReduceTest {
FileInputFormat.setInputPaths(job, metricsJsonPath);
boolean success = job.waitForCompletion(true);
- assertTrue("Job Failed", success);
+ assertTrue(success, "Job Failed");
VespaCounters counters = VespaCounters.get(job);
assertEquals(10, counters.getDocumentsSent());
@@ -103,7 +105,7 @@ public class MapReduceTest {
FileInputFormat.setInputPaths(job, metricsJsonPath);
boolean success = job.waitForCompletion(true);
- assertTrue("Job Failed", success);
+ assertTrue(success, "Job Failed");
VespaCounters counters = VespaCounters.get(job);
assertEquals(10, counters.getDocumentsSent());
@@ -125,7 +127,7 @@ public class MapReduceTest {
FileInputFormat.setInputPaths(job, metricsCsvPath);
boolean success = job.waitForCompletion(true);
- assertTrue("Job Failed", success);
+ assertTrue(success, "Job Failed");
VespaCounters counters = VespaCounters.get(job);
assertEquals(10, counters.getDocumentsSent());
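
The test changes above are the JUnit 4 to JUnit Jupiter migration applied throughout this patch: lifecycle annotations are renamed and the assertion message argument moves from first to last position. For reference:

    // JUnit 4 (org.junit / org.junit.Assert):
    @BeforeClass public static void setUp() { /* ... */ }
    assertTrue("Job Failed", success);

    // JUnit 5 (org.junit.jupiter.api):
    @BeforeAll public static void setUp() { /* ... */ }
    assertTrue(success, "Job Failed");
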
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
index bafeb593e4f..db2fab9b05e 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -12,9 +12,9 @@ import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -23,22 +23,22 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings("serial")
public class VespaDocumentOperationTest {
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final PrintStream originalOut = System.out;
- @Before
+ @BeforeEach
public void setUpStreams() {
System.setOut(new PrintStream(outContent));
}
- @After
+ @AfterEach
public void restoreStreams() {
System.setOut(originalOut);
}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
index 2d55017b13e..b0e2dd32c04 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
@@ -8,12 +8,16 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class VespaQueryTest {
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
index 7ca401a0cc8..3565db37126 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -1,14 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.pig;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.mapred.Counters;
@@ -18,11 +12,14 @@ import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
public class VespaStorageTest {
@@ -51,6 +48,13 @@ public class VespaStorageTest {
assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
}
+ @Test
+ public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString());
+ conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString());
+ assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf);
+ }
@Test
public void requireThatCreateOperationsFeedSucceeds() throws Exception {
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
index 27080a8b2af..93e6a0abfdd 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
@@ -6,11 +6,11 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class TupleToolsTest {