-rw-r--r--  CMakeLists.txt | 1
-rw-r--r--  config-model-api/abi-spec.json | 2
-rw-r--r--  config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java | 4
-rw-r--r--  config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java | 13
-rw-r--r--  config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java | 47
-rw-r--r--  config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java | 41
-rw-r--r--  configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java | 6
-rw-r--r--  configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java | 13
-rw-r--r--  configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java | 3
-rw-r--r--  configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java | 3
-rw-r--r--  container-core/abi-spec.json | 1
-rw-r--r--  container-core/src/main/java/com/yahoo/container/handler/LogHandler.java | 74
-rw-r--r--  container-core/src/main/java/com/yahoo/container/handler/LogReader.java | 12
-rw-r--r--  container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java | 20
-rw-r--r--  container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java | 2
-rw-r--r--  eval/CMakeLists.txt | 1
-rw-r--r--  eval/src/tests/instruction/sparse_full_overlap_join_function/CMakeLists.txt | 9
-rw-r--r--  eval/src/tests/instruction/sparse_full_overlap_join_function/sparse_full_overlap_join_function_test.cpp | 92
-rw-r--r--  eval/src/vespa/eval/eval/fast_value.hpp | 33
-rw-r--r--  eval/src/vespa/eval/eval/optimize_tensor_function.cpp | 2
-rw-r--r--  eval/src/vespa/eval/instruction/CMakeLists.txt | 1
-rw-r--r--  eval/src/vespa/eval/instruction/detect_type.h | 76
-rw-r--r--  eval/src/vespa/eval/instruction/generic_join.cpp | 44
-rw-r--r--  eval/src/vespa/eval/instruction/generic_merge.cpp | 2
-rw-r--r--  eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.cpp | 134
-rw-r--r--  eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.h | 22
-rw-r--r--  jdisc-security-filters/pom.xml | 17
-rw-r--r--  jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilter.java | 118
-rw-r--r--  jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilterTest.java | 174
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java | 57
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java | 66
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java | 1
-rw-r--r--  node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java | 256
-rw-r--r--  node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java | 75
-rw-r--r--  searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt | 12
-rw-r--r--  searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h | 2
-rw-r--r--  searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp | 2
-rw-r--r--  searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp | 578
-rw-r--r--  searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp | 6
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt | 1
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp | 7
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp | 368
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h | 119
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp | 116
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h | 76
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp | 52
-rw-r--r--  vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java | 2
-rw-r--r--  zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java | 7
-rw-r--r--  zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java | 25
-rw-r--r--  zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java | 18
50 files changed, 2239 insertions(+), 574 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index eae4cb338eb..db673c9184c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -113,6 +113,7 @@ add_subdirectory(orchestrator)
add_subdirectory(persistence)
add_subdirectory(persistencetypes)
add_subdirectory(predicate-search)
+add_subdirectory(processing)
add_subdirectory(searchcommon)
add_subdirectory(searchcore)
add_subdirectory(searchcorespi)
diff --git a/config-model-api/abi-spec.json b/config-model-api/abi-spec.json
index fd229b35778..ca736dd90d8 100644
--- a/config-model-api/abi-spec.json
+++ b/config-model-api/abi-spec.json
@@ -198,7 +198,6 @@
"public java.util.Optional globalServiceId()",
"public boolean canUpgradeAt(java.time.Instant)",
"public boolean canChangeRevisionAt(java.time.Instant)",
- "public java.util.Optional athenzDomain()",
"public java.util.Optional athenzService(com.yahoo.config.provision.Environment, com.yahoo.config.provision.RegionName)",
"public com.yahoo.config.application.api.Notifications notifications()",
"public java.util.List endpoints()",
@@ -304,7 +303,6 @@
"methods": [
"public void <init>()",
"public final boolean concerns(com.yahoo.config.provision.Environment)",
- "public boolean deploysTo(com.yahoo.config.provision.Environment, java.util.Optional)",
"public abstract boolean concerns(com.yahoo.config.provision.Environment, java.util.Optional)",
"public java.util.List zones()",
"public java.time.Duration delay()",
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
index 9c6013a127d..8813eaf9c8c 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
@@ -154,10 +154,6 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
.noneMatch(block -> block.window().includes(instant));
}
- /** Returns the athenz domain if configured */
- // TODO jonmv: Remove when 7.162 is older than the oldest deployed version.
- public Optional<AthenzDomain> athenzDomain() { return Optional.empty(); }
-
/** Returns the athenz service for environment/region if configured, defaulting to that of the instance */
public Optional<AthenzService> athenzService(Environment environment, RegionName region) {
return zones().stream()
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
index a8fcf5e1315..24d83e2b7b1 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
@@ -254,13 +254,10 @@ public class DeploymentSpec {
return concerns(environment, Optional.empty());
}
- /** Returns whether this step specifies the given environment, and, optionally, region. */
- // TODO jonmv: Remove when 7.147 is the oldest version.
- public boolean deploysTo(Environment environment, Optional<RegionName> region) {
- return concerns(environment, region);
- }
-
- /** Returns whether this step specifies the given environment, and, optionally, region. */
+ /**
+ * Returns whether this step specifies the given environment, and, optionally,
+ * if this step specifies a region, whether this is also the given region.
+ */
public abstract boolean concerns(Environment environment, Optional<RegionName> region);
/** Returns the zones deployed to in this step. */
@@ -348,7 +345,7 @@ public class DeploymentSpec {
@Override
public boolean concerns(Environment environment, Optional<RegionName> region) {
if (environment != this.environment) return false;
- if (region.isPresent() && ! region.equals(this.region)) return false;
+ if (region.isPresent() && this.region.isPresent() && ! region.equals(this.region)) return false;
return true;
}
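
The hunk above removes the deprecated deploysTo() alias in favor of concerns(), and relaxes region matching: a step that declares no region (such as <test/>) now matches any queried region, because regions are only compared when both sides name one. A minimal standalone sketch of the new rule, with environments and regions simplified to plain strings (hypothetical names, not the Vespa API):

    import java.util.Optional;

    class ZoneMatchSketch {
        // Mirrors DeclaredZone.concerns above: compare regions only when BOTH
        // the declaration and the query name one.
        static boolean concerns(String declaredEnv, Optional<String> declaredRegion,
                                String env, Optional<String> region) {
            if (!env.equals(declaredEnv)) return false;
            return !(region.isPresent() && declaredRegion.isPresent()
                     && !region.equals(declaredRegion));
        }

        public static void main(String[] args) {
            // A region-less test step concerns any region (the new behavior):
            System.out.println(concerns("test", Optional.empty(),
                                        "test", Optional.of("region1"))); // true
            // Declared regions must still agree:
            System.out.println(concerns("prod", Optional.of("us-east1"),
                                        "prod", Optional.of("us-west1"))); // false
        }
    }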
diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
index 06db4fe44be..5561ebdef63 100644
--- a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
+++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
@@ -44,10 +44,10 @@ public class DeploymentSpecTest {
assertEquals(1, spec.requireInstance("default").steps().size());
assertFalse(spec.majorVersion().isPresent());
assertTrue(spec.requireInstance("default").steps().get(0).concerns(Environment.test));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.of(RegionName.from("region1"))));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.of(RegionName.from("region1")))); // test steps specify no region
+ assertFalse(spec.requireInstance("default").concerns(Environment.staging, Optional.empty()));
+ assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.empty()));
assertFalse(spec.requireInstance("default").globalServiceId().isPresent());
}
@@ -81,9 +81,9 @@ public class DeploymentSpecTest {
assertEquals(1, spec.steps().size());
assertEquals(1, spec.requireInstance("default").steps().size());
assertTrue(spec.requireInstance("default").steps().get(0).concerns(Environment.staging));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty()));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.empty()));
+ assertFalse(spec.requireInstance("default").concerns(Environment.test, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.staging, Optional.empty()));
+ assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.empty()));
assertFalse(spec.requireInstance("default").globalServiceId().isPresent());
}
@@ -110,11 +110,11 @@ public class DeploymentSpecTest {
assertTrue(spec.requireInstance("default").steps().get(1).concerns(Environment.prod, Optional.of(RegionName.from("us-west1"))));
assertTrue(((DeploymentSpec.DeclaredZone)spec.requireInstance("default").steps().get(1)).active());
- assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty()));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-east1"))));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-west1"))));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("no-such-region"))));
+ assertFalse(spec.requireInstance("default").concerns(Environment.test, Optional.empty()));
+ assertFalse(spec.requireInstance("default").concerns(Environment.staging, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-east1"))));
+ assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-west1"))));
+ assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("no-such-region"))));
assertFalse(spec.requireInstance("default").globalServiceId().isPresent());
assertEquals(DeploymentSpec.UpgradePolicy.defaultPolicy, spec.requireInstance("default").upgradePolicy());
@@ -289,12 +289,12 @@ public class DeploymentSpecTest {
assertTrue(instance.steps().get(4).concerns(Environment.prod, Optional.of(RegionName.from("us-west1"))));
assertTrue(((DeploymentSpec.DeclaredZone)instance.steps().get(4)).active());
- assertTrue(instance.deploysTo(Environment.test, Optional.empty()));
- assertFalse(instance.deploysTo(Environment.test, Optional.of(RegionName.from("region1"))));
- assertTrue(instance.deploysTo(Environment.staging, Optional.empty()));
- assertTrue(instance.deploysTo(Environment.prod, Optional.of(RegionName.from("us-east1"))));
- assertTrue(instance.deploysTo(Environment.prod, Optional.of(RegionName.from("us-west1"))));
- assertFalse(instance.deploysTo(Environment.prod, Optional.of(RegionName.from("no-such-region"))));
+ assertTrue(instance.concerns(Environment.test, Optional.empty()));
+ assertTrue(instance.concerns(Environment.test, Optional.of(RegionName.from("region1")))); // test steps specify no region
+ assertTrue(instance.concerns(Environment.staging, Optional.empty()));
+ assertTrue(instance.concerns(Environment.prod, Optional.of(RegionName.from("us-east1"))));
+ assertTrue(instance.concerns(Environment.prod, Optional.of(RegionName.from("us-west1"))));
+ assertFalse(instance.concerns(Environment.prod, Optional.of(RegionName.from("no-such-region"))));
assertFalse(instance.globalServiceId().isPresent());
}
@@ -909,9 +909,10 @@ public class DeploymentSpecTest {
@Test
public void athenz_service_is_overridden_from_environment() {
StringReader r = new StringReader(
- "<deployment athenz-domain='domain' athenz-service='service'>" +
+ "<deployment athenz-domain='domain' athenz-service='unused-service'>" +
" <instance id='default' athenz-service='service'>" +
- " <test/>" +
+ " <test />" +
+ " <staging athenz-service='staging-service' />" +
" <prod athenz-service='prod-service'>" +
" <region active='true'>us-west-1</region>" +
" </prod>" +
@@ -919,6 +920,12 @@ public class DeploymentSpecTest {
"</deployment>"
);
DeploymentSpec spec = DeploymentSpec.fromXml(r);
+ assertEquals("service",
+ spec.requireInstance("default").athenzService(Environment.test,
+ RegionName.from("us-east-1")).get().value());
+ assertEquals("staging-service",
+ spec.requireInstance("default").athenzService(Environment.staging,
+ RegionName.from("us-north-1")).get().value());
assertEquals("prod-service",
spec.requireInstance("default").athenzService(Environment.prod,
RegionName.from("us-west-1")).get().value());
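
The updated test pins down the athenz-service resolution order: a service declared on a matching environment element (here <staging athenz-service=...> and <prod athenz-service=...>) wins, otherwise the instance-level service applies, and the deployment-level value goes unused once the instance overrides it. A hedged sketch of that lookup order (hypothetical helper, not the Vespa implementation):

    import java.util.Map;
    import java.util.Optional;

    class AthenzServiceLookupSketch {
        // Environment-level override first, then the instance-level default.
        static Optional<String> athenzService(Map<String, String> perEnvironment,
                                              Optional<String> instanceDefault,
                                              String environment) {
            String override = perEnvironment.get(environment);
            return override != null ? Optional.of(override) : instanceDefault;
        }

        public static void main(String[] args) {
            Map<String, String> overrides = Map.of("staging", "staging-service",
                                                   "prod", "prod-service");
            Optional<String> dflt = Optional.of("service");
            System.out.println(athenzService(overrides, dflt, "test"));    // Optional[service]
            System.out.println(athenzService(overrides, dflt, "staging")); // Optional[staging-service]
            System.out.println(athenzService(overrides, dflt, "prod"));    // Optional[prod-service]
        }
    }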
diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java
index 450be3c9b06..77ce5c2175d 100644
--- a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java
+++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java
@@ -43,10 +43,10 @@ public class DeploymentSpecWithoutInstanceTest {
assertEquals(1, spec.steps().size());
assertFalse(spec.majorVersion().isPresent());
assertTrue(spec.steps().get(0).concerns(Environment.test));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.of(RegionName.from("region1"))));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.of(RegionName.from("region1")))); // test steps specify no region
+ assertFalse(spec.requireInstance("default").concerns(Environment.staging, Optional.empty()));
+ assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.empty()));
assertFalse(spec.requireInstance("default").globalServiceId().isPresent());
}
@@ -76,9 +76,9 @@ public class DeploymentSpecWithoutInstanceTest {
assertEquals(1, spec.steps().size());
assertEquals(1, spec.requireInstance("default").steps().size());
assertTrue(spec.requireInstance("default").steps().get(0).concerns(Environment.staging));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty()));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.empty()));
+ assertFalse(spec.requireInstance("default").concerns(Environment.test, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.staging, Optional.empty()));
+ assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.empty()));
assertFalse(spec.requireInstance("default").globalServiceId().isPresent());
}
@@ -103,11 +103,11 @@ public class DeploymentSpecWithoutInstanceTest {
assertTrue(spec.requireInstance("default").steps().get(1).concerns(Environment.prod, Optional.of(RegionName.from("us-west1"))));
assertTrue(((DeploymentSpec.DeclaredZone)spec.requireInstance("default").steps().get(1)).active());
- assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty()));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-east1"))));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-west1"))));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("no-such-region"))));
+ assertFalse(spec.requireInstance("default").concerns(Environment.test, Optional.empty()));
+ assertFalse(spec.requireInstance("default").concerns(Environment.staging, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-east1"))));
+ assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-west1"))));
+ assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("no-such-region"))));
assertFalse(spec.requireInstance("default").globalServiceId().isPresent());
assertEquals(DeploymentSpec.UpgradePolicy.defaultPolicy, spec.requireInstance("default").upgradePolicy());
@@ -144,12 +144,12 @@ public class DeploymentSpecWithoutInstanceTest {
assertTrue(spec.requireInstance("default").steps().get(4).concerns(Environment.prod, Optional.of(RegionName.from("us-west1"))));
assertTrue(((DeploymentSpec.DeclaredZone)spec.requireInstance("default").steps().get(4)).active());
- assertTrue(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty()));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.of(RegionName.from("region1"))));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty()));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-east1"))));
- assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-west1"))));
- assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("no-such-region"))));
+ assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.of(RegionName.from("region1")))); // test steps specify no region
+ assertTrue(spec.requireInstance("default").concerns(Environment.staging, Optional.empty()));
+ assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-east1"))));
+ assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-west1"))));
+ assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("no-such-region"))));
assertFalse(spec.requireInstance("default").globalServiceId().isPresent());
}
@@ -552,7 +552,8 @@ public class DeploymentSpecWithoutInstanceTest {
public void athenz_service_is_overridden_from_environment() {
StringReader r = new StringReader(
"<deployment athenz-domain='domain' athenz-service='service'>\n" +
- " <test/>\n" +
+ " <test />\n" +
+ " <staging athenz-service='staging-service' />\n" +
" <prod athenz-service='prod-service'>\n" +
" <region active='true'>us-west-1</region>\n" +
" </prod>\n" +
@@ -561,6 +562,8 @@ public class DeploymentSpecWithoutInstanceTest {
DeploymentSpec spec = DeploymentSpec.fromXml(r);
assertEquals("service", spec.athenzService().get().value());
assertEquals(spec.athenzDomain().get().value(), "domain");
+ assertEquals(spec.requireInstance("default").athenzService(Environment.test, RegionName.from("us-east-1")).get().value(), "service");
+ assertEquals(spec.requireInstance("default").athenzService(Environment.staging, RegionName.from("us-north-1")).get().value(), "staging-service");
assertEquals(spec.requireInstance("default").athenzService(Environment.prod, RegionName.from("us-west-1")).get().value(), "prod-service");
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index 81e0574d373..76fd57a51cf 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -124,7 +124,7 @@ public class SessionRepository {
public SessionRepository(TenantName tenantName,
TenantApplications applicationRepo,
SessionPreparer sessionPreparer,
- Curator curator,
+ ConfigCurator configCurator,
Metrics metrics,
StripedExecutor<TenantName> zkWatcherExecutor,
PermanentApplicationPackage permanentApplicationPackage,
@@ -140,11 +140,11 @@ public class SessionRepository {
ConfigDefinitionRepo configDefinitionRepo,
TenantListener tenantListener) {
this.tenantName = tenantName;
- this.configCurator = ConfigCurator.create(curator);
+ this.configCurator = configCurator;
sessionCounter = new SessionCounter(configCurator, tenantName);
this.sessionsPath = TenantRepository.getSessionsPath(tenantName);
this.clock = clock;
- this.curator = curator;
+ this.curator = configCurator.curator();
this.sessionLifetime = Duration.ofSeconds(configserverConfig.sessionLifetime());
this.zkWatcherExecutor = command -> zkWatcherExecutor.execute(tenantName, command);
this.permanentApplicationPackage = permanentApplicationPackage;
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
index 1b8e1f2c7e1..babb7e4a596 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
@@ -30,6 +30,7 @@ import com.yahoo.vespa.config.server.monitoring.Metrics;
import com.yahoo.vespa.config.server.provision.HostProvisionerProvider;
import com.yahoo.vespa.config.server.session.SessionPreparer;
import com.yahoo.vespa.config.server.session.SessionRepository;
+import com.yahoo.vespa.config.server.zookeeper.ConfigCurator;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.transaction.CuratorOperations;
import com.yahoo.vespa.curator.transaction.CuratorTransaction;
@@ -96,6 +97,7 @@ public class TenantRepository {
private final Locks<TenantName> tenantLocks = new Locks<>(1, TimeUnit.MINUTES);
private final HostRegistry hostRegistry;
private final TenantListener tenantListener;
+ private final ConfigCurator configCurator;
private final Curator curator;
private final Metrics metrics;
private final MetricUpdater metricUpdater;
@@ -123,7 +125,7 @@ public class TenantRepository {
*/
@Inject
public TenantRepository(HostRegistry hostRegistry,
- Curator curator,
+ ConfigCurator configCurator,
Metrics metrics,
FlagSource flagSource,
SecretStore secretStore,
@@ -136,7 +138,7 @@ public class TenantRepository {
ReloadListener reloadListener,
TenantListener tenantListener) {
this(hostRegistry,
- curator,
+ configCurator,
metrics,
new StripedExecutor<>(),
new FileDistributionFactory(configserverConfig),
@@ -155,7 +157,7 @@ public class TenantRepository {
}
public TenantRepository(HostRegistry hostRegistry,
- Curator curator,
+ ConfigCurator configCurator,
Metrics metrics,
StripedExecutor<TenantName> zkWatcherExecutor,
FileDistributionFactory fileDistributionFactory,
@@ -175,7 +177,7 @@ public class TenantRepository {
this.configserverConfig = configserverConfig;
this.bootstrapExecutor = Executors.newFixedThreadPool(configserverConfig.numParallelTenantLoaders(),
new DaemonThreadFactory("bootstrap-tenant-"));
- this.curator = curator;
+ this.curator = configCurator.curator();
this.metrics = metrics;
metricUpdater = metrics.getOrCreateMetricUpdater(Collections.emptyMap());
this.zkCacheExecutor = zkCacheExecutor;
@@ -191,6 +193,7 @@ public class TenantRepository {
this.configDefinitionRepo = configDefinitionRepo;
this.reloadListener = reloadListener;
this.tenantListener = tenantListener;
+ this.configCurator = configCurator;
curator.framework().getConnectionStateListenable().addListener(this::stateChanged);
@@ -332,7 +335,7 @@ public class TenantRepository {
SessionRepository sessionRepository = new SessionRepository(tenantName,
applicationRepo,
sessionPreparer,
- curator,
+ configCurator,
metrics,
zkWatcherExecutor,
permanentApplicationPackage,
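
The configserver hunks above are one refactoring: a single shared ConfigCurator is injected into TenantRepository and handed down to each SessionRepository, instead of every SessionRepository wrapping the raw Curator itself; the raw client is derived via configCurator.curator() where still needed. A stripped-down sketch of the wiring, assuming only the pieces visible in this diff (the real classes carry many more collaborators):

    // Hypothetical minimal types for illustration.
    class Curator {}

    class ConfigCurator {
        private final Curator curator;
        private ConfigCurator(Curator curator) { this.curator = curator; }
        static ConfigCurator create(Curator curator) { return new ConfigCurator(curator); }
        Curator curator() { return curator; } // raw client still reachable
    }

    class SessionRepository {
        private final ConfigCurator configCurator;
        private final Curator curator;
        SessionRepository(ConfigCurator configCurator) {
            this.configCurator = configCurator;     // was: ConfigCurator.create(curator)
            this.curator = configCurator.curator(); // derived, no longer injected separately
        }
    }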
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java
index 70bbe2031aa..b3d1495eb6b 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java
@@ -27,6 +27,7 @@ import com.yahoo.vespa.config.server.modelfactory.ModelFactoryRegistry;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import com.yahoo.vespa.config.server.monitoring.Metrics;
import com.yahoo.vespa.config.server.provision.HostProvisionerProvider;
+import com.yahoo.vespa.config.server.zookeeper.ConfigCurator;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.mock.MockCurator;
import com.yahoo.vespa.flags.InMemoryFlagSource;
@@ -208,7 +209,7 @@ public class TenantRepositoryTest {
public FailingDuringBootstrapTenantRepository(ConfigserverConfig configserverConfig) {
super(new HostRegistry(),
- new MockCurator(),
+ ConfigCurator.create(new MockCurator()),
Metrics.createTestMetrics(),
new StripedExecutor<>(new InThreadExecutorService()),
new FileDistributionFactory(new ConfigserverConfig.Builder().build()),
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java
index ad20abaeaf5..8279bf4df5e 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java
@@ -17,6 +17,7 @@ import com.yahoo.vespa.config.server.host.HostRegistry;
import com.yahoo.vespa.config.server.modelfactory.ModelFactoryRegistry;
import com.yahoo.vespa.config.server.monitoring.Metrics;
import com.yahoo.vespa.config.server.provision.HostProvisionerProvider;
+import com.yahoo.vespa.config.server.zookeeper.ConfigCurator;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.mock.MockCurator;
import com.yahoo.vespa.flags.FlagSource;
@@ -46,7 +47,7 @@ public class TestTenantRepository extends TenantRepository {
ReloadListener reloadListener,
TenantListener tenantListener) {
super(hostRegistry,
- curator,
+ ConfigCurator.create(curator),
metrics,
new StripedExecutor<>(new InThreadExecutorService()),
fileDistributionFactory,
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json
index a2a9c48d61b..46b7a852afd 100644
--- a/container-core/abi-spec.json
+++ b/container-core/abi-spec.json
@@ -376,6 +376,7 @@
"public void nonCopyingWrite(byte[], int, int)",
"public void nonCopyingWrite(byte[])",
"public void send(java.nio.ByteBuffer)",
+ "protected void send(java.nio.ByteBuffer, com.yahoo.jdisc.handler.CompletionHandler)",
"public long written()"
],
"fields": []
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
index 4b23eafaa9c..991cd83ffa8 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
@@ -15,13 +15,15 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
public class LogHandler extends ThreadedHttpRequestHandler {
private final LogReader logReader;
+ private static final long MB = 1024*1024;
@Inject
public LogHandler(Executor executor, LogHandlerConfig config) {
@@ -45,7 +47,7 @@ public class LogHandler extends ThreadedHttpRequestHandler {
@Override
public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) {
try {
- OutputStream blockingOutput = new BlockingFlushContentChannelOutputStream(networkChannel);
+ OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB);
logReader.writeLogs(blockingOutput, from, to, hostname);
blockingOutput.close();
}
@@ -60,29 +62,71 @@ public class LogHandler extends ThreadedHttpRequestHandler {
}
- private static class BlockingFlushContentChannelOutputStream extends ContentChannelOutputStream {
+ private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
+ private final long maxPending;
+ private final AtomicLong sent = new AtomicLong(0);
+ private final AtomicLong acked = new AtomicLong(0);
- private final ContentChannel channel;
-
- public BlockingFlushContentChannelOutputStream(ContentChannel endpoint) {
+ public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
super(endpoint);
- this.channel = endpoint;
+ this.maxPending = maxPending;
+ }
+
+ private long pendingBytes() {
+ return sent.get() - acked.get();
+ }
+
+ private class TrackCompletion implements CompletionHandler {
+ private final long written;
+ private final AtomicBoolean replied = new AtomicBoolean(false);
+ TrackCompletion(long written) {
+ this.written = written;
+ sent.addAndGet(written);
+ }
+ @Override
+ public void completed() {
+ if (!replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+
+ @Override
+ public void failed(Throwable t) {
+ if (!replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+ }
+ @Override
+ public void send(ByteBuffer src) throws IOException {
+ try {
+ stallWhilePendingAbove(maxPending);
+ } catch (InterruptedException ignored) {
+ throw new IOException("Interrupted waiting for IO");
+ }
+ CompletionHandler pendingTracker = new TrackCompletion(src.remaining());
+ try {
+ send(src, pendingTracker);
+ } catch (Throwable throwable) {
+ pendingTracker.failed(throwable);
+ throw throwable;
+ }
+ }
+
+ private void stallWhilePendingAbove(long pending) throws InterruptedException {
+ while (pendingBytes() > pending) {
+ Thread.sleep(1);
+ }
}
@Override
public void flush() throws IOException {
super.flush();
- CountDownLatch latch = new CountDownLatch(1);
- channel.write(ByteBuffer.allocate(0), // :'(
- new CompletionHandler() {
- @Override public void completed() { latch.countDown(); }
- @Override public void failed(Throwable t) { latch.countDown(); }
- });
try {
- latch.await();
+ stallWhilePendingAbove(0);
}
catch (InterruptedException e) {
- throw new RuntimeException("Interrupted waiting for underlying IO to complete", e);
+ throw new IOException("Interrupted waiting for IO");
}
}
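
The LogHandler change replaces per-flush blocking with bounded in-flight buffering: every send() is asynchronous, a completion callback credits back acknowledged bytes, writes stall once more than 1 MB is pending, and flush() simply waits until nothing is pending. A self-contained sketch of that backpressure pattern under assumed simplifications (a Runnable ack stands in for the jdisc CompletionHandler):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.BiConsumer;

    class BoundedPendingWriterSketch {
        private final long maxPending;
        private final AtomicLong sent = new AtomicLong();
        private final AtomicLong acked = new AtomicLong();

        BoundedPendingWriterSketch(long maxPending) { this.maxPending = maxPending; }

        // The transport must invoke the ack exactly once, on completion OR failure.
        void write(byte[] data, BiConsumer<byte[], Runnable> transport) throws IOException {
            stallWhilePendingAbove(maxPending);
            sent.addAndGet(data.length);
            transport.accept(data, () -> acked.addAndGet(data.length));
        }

        void flush() throws IOException { stallWhilePendingAbove(0); }

        private void stallWhilePendingAbove(long limit) throws IOException {
            try {
                while (sent.get() - acked.get() > limit) Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted waiting for IO");
            }
        }
    }

This trades the old stop-and-wait flush (one latch per empty-buffer write, zero overlap between producing and sending) for a sliding window of at most maxPending unacknowledged bytes, so the log producer and the network sink can run concurrently.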
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java
index ea2f431f5a6..5b6a87d02df 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java
@@ -43,13 +43,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* @author jonmv
*/
class LogReader {
-
static final Pattern logArchivePathPattern = Pattern.compile("(\\d{4})/(\\d{2})/(\\d{2})/(\\d{2})-\\d+(.gz)?");
static final Pattern vespaLogPathPattern = Pattern.compile("vespa\\.log(?:-(\\d{4})-(\\d{2})-(\\d{2})\\.(\\d{2})-(\\d{2})-(\\d{2})(?:.gz)?)?");
private final Path logDirectory;
private final Pattern logFilePattern;
+
LogReader(String logDirectory, String logFilePattern) {
this(Paths.get(Defaults.getDefaults().underVespaHome(logDirectory)), Pattern.compile(logFilePattern));
}
@@ -72,12 +72,10 @@ class LogReader {
Iterator<LineWithTimestamp> lines = Iterators.mergeSorted(logLineIterators,
Comparator.comparingDouble(LineWithTimestamp::timestamp));
- long linesWritten = 0;
while (lines.hasNext()) {
- writer.write(lines.next().line());
+ String line = lines.next().line();
+ writer.write(line);
writer.newLine();
- if ((++linesWritten & ((1 << 16) - 1)) == 0)
- writer.flush();
}
}
catch (IOException e) {
@@ -188,9 +186,11 @@ class LogReader {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
- if ( logFilePattern.matcher(file.getFileName().toString()).matches()
+ if (logFilePattern.matcher(file.getFileName().toString()).matches()
&& ! attrs.lastModifiedTime().toInstant().isBefore(from))
+ {
paths.add(file);
+ }
return FileVisitResult.CONTINUE;
}
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java
index 1d4c20efe5e..270da0c4ab0 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java
@@ -12,7 +12,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -125,9 +124,13 @@ public class ContentChannelOutputStream extends OutputStream implements Writable
@Override
public void send(ByteBuffer src) throws IOException {
// Don't do a buffer.flush() from here, this method is used by the buffer itself
+ send(src, null);
+ }
+
+ protected void send(ByteBuffer src, CompletionHandler completionHandler) throws IOException {
try {
byteBufferData += src.remaining();
- endpoint.write(src, new LoggingCompletionHandler());
+ endpoint.write(src, new LoggingCompletionHandler(completionHandler));
} catch (RuntimeException e) {
throw new IOException(Exceptions.toMessageString(e), e);
}
@@ -138,10 +141,16 @@ public class ContentChannelOutputStream extends OutputStream implements Writable
return buffer.appended() + byteBufferData;
}
- class LoggingCompletionHandler implements CompletionHandler {
-
+ private class LoggingCompletionHandler implements CompletionHandler {
+ private final CompletionHandler nested;
+ LoggingCompletionHandler(CompletionHandler nested) {
+ this.nested = nested;
+ }
@Override
public void completed() {
+ if (nested != null) {
+ nested.completed();
+ }
}
@Override
@@ -158,6 +167,9 @@ public class ContentChannelOutputStream extends OutputStream implements Writable
if (log.isLoggable(logLevel)) {
log.log(logLevel, "Got exception when writing to client: " + Exceptions.toMessageString(t));
}
+ if (nested != null) {
+ nested.failed(t);
+ }
}
}
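
The ContentChannelOutputStream change is the other half of that mechanism: send() gains a protected overload taking a CompletionHandler, and LoggingCompletionHandler now forwards both completed() and failed() to an optional nested handler, which is how the pending-bytes tracker above receives its acknowledgements. A minimal sketch of the forwarding, with the interface reduced to what this diff shows:

    interface Completion {
        void completed();
        void failed(Throwable t);
    }

    class ForwardingCompletionSketch implements Completion {
        private final Completion nested; // may be null when nobody tracks completion

        ForwardingCompletionSketch(Completion nested) { this.nested = nested; }

        public void completed() {
            if (nested != null) nested.completed();
        }

        public void failed(Throwable t) {
            System.err.println("Got exception when writing to client: " + t.getMessage());
            if (nested != null) nested.failed(t);
        }
    }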
diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java
index 22e55eb5291..bdaf6f7919f 100644
--- a/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java
+++ b/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java
@@ -96,7 +96,7 @@ public class LoggingTestCase {
}
@Test
- public final void testFailed() throws IOException, InterruptedException {
+ public final void testFailed() throws IOException {
stream.send(createData());
stream.send(createData());
stream.send(createData());
diff --git a/eval/CMakeLists.txt b/eval/CMakeLists.txt
index 22750a27f20..66270bb323b 100644
--- a/eval/CMakeLists.txt
+++ b/eval/CMakeLists.txt
@@ -66,6 +66,7 @@ vespa_define_module(
src/tests/instruction/pow_as_map_optimizer
src/tests/instruction/remove_trivial_dimension_optimizer
src/tests/instruction/sparse_dot_product_function
+ src/tests/instruction/sparse_full_overlap_join_function
src/tests/instruction/sparse_merge_function
src/tests/instruction/sparse_no_overlap_join_function
src/tests/instruction/sum_max_dot_product_function
diff --git a/eval/src/tests/instruction/sparse_full_overlap_join_function/CMakeLists.txt b/eval/src/tests/instruction/sparse_full_overlap_join_function/CMakeLists.txt
new file mode 100644
index 00000000000..54841140278
--- /dev/null
+++ b/eval/src/tests/instruction/sparse_full_overlap_join_function/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(eval_sparse_full_overlap_join_function_test_app TEST
+ SOURCES
+ sparse_full_overlap_join_function_test.cpp
+ DEPENDS
+ vespaeval
+ GTest::GTest
+)
+vespa_add_test(NAME eval_sparse_full_overlap_join_function_test_app COMMAND eval_sparse_full_overlap_join_function_test_app)
diff --git a/eval/src/tests/instruction/sparse_full_overlap_join_function/sparse_full_overlap_join_function_test.cpp b/eval/src/tests/instruction/sparse_full_overlap_join_function/sparse_full_overlap_join_function_test.cpp
new file mode 100644
index 00000000000..e3001b17602
--- /dev/null
+++ b/eval/src/tests/instruction/sparse_full_overlap_join_function/sparse_full_overlap_join_function_test.cpp
@@ -0,0 +1,92 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/eval/eval/fast_value.h>
+#include <vespa/eval/eval/simple_value.h>
+#include <vespa/eval/instruction/sparse_full_overlap_join_function.h>
+#include <vespa/eval/eval/test/eval_fixture.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace vespalib::eval;
+using namespace vespalib::eval::test;
+
+const ValueBuilderFactory &prod_factory = FastValueBuilderFactory::get();
+const ValueBuilderFactory &test_factory = SimpleValueBuilderFactory::get();
+
+//-----------------------------------------------------------------------------
+
+EvalFixture::ParamRepo make_params() {
+ return EvalFixture::ParamRepo()
+ .add_variants("v1_a", GenSpec(3.0).map("a", 8, 1))
+ .add_variants("v2_a", GenSpec(7.0).map("a", 4, 2))
+ .add_variants("v2_a_trivial", GenSpec(7.0).map("a", 4, 2).idx("b", 1).idx("c", 1))
+ .add_variants("v3_b", GenSpec(5.0).map("b", 4, 2))
+ .add("m1_ab", GenSpec(3.0).map("a", 8, 1).map("b", 8, 1))
+ .add("m2_ab", GenSpec(17.0).map("a", 4, 2).map("b", 4, 2))
+ .add("m3_bc", GenSpec(11.0).map("b", 4, 2).map("c", 4, 2))
+ .add("scalar", GenSpec(1.0))
+ .add("dense_a", GenSpec().idx("a", 5))
+ .add("mixed_ab", GenSpec().map("a", 5, 1).idx("b", 5));
+}
+EvalFixture::ParamRepo param_repo = make_params();
+
+void assert_optimized(const vespalib::string &expr) {
+ EvalFixture fast_fixture(prod_factory, expr, param_repo, true);
+ EvalFixture test_fixture(test_factory, expr, param_repo, true);
+ EvalFixture slow_fixture(prod_factory, expr, param_repo, false);
+ EXPECT_EQ(fast_fixture.result(), EvalFixture::ref(expr, param_repo));
+ EXPECT_EQ(test_fixture.result(), EvalFixture::ref(expr, param_repo));
+ EXPECT_EQ(slow_fixture.result(), EvalFixture::ref(expr, param_repo));
+ EXPECT_EQ(fast_fixture.find_all<SparseFullOverlapJoinFunction>().size(), 1u);
+ EXPECT_EQ(test_fixture.find_all<SparseFullOverlapJoinFunction>().size(), 1u);
+ EXPECT_EQ(slow_fixture.find_all<SparseFullOverlapJoinFunction>().size(), 0u);
+}
+
+void assert_not_optimized(const vespalib::string &expr) {
+ EvalFixture fast_fixture(prod_factory, expr, param_repo, true);
+ EXPECT_EQ(fast_fixture.result(), EvalFixture::ref(expr, param_repo));
+ EXPECT_EQ(fast_fixture.find_all<SparseFullOverlapJoinFunction>().size(), 0u);
+}
+
+//-----------------------------------------------------------------------------
+
+TEST(SparseFullOverlapJoin, expression_can_be_optimized)
+{
+ assert_optimized("v1_a-v2_a");
+ assert_optimized("v2_a-v1_a");
+ assert_optimized("join(v1_a,v2_a,f(x,y)(max(x,y)))");
+}
+
+TEST(SparseFullOverlapJoin, multi_dimensional_expression_can_be_optimized)
+{
+ assert_optimized("m1_ab-m2_ab");
+ assert_optimized("m2_ab-m1_ab");
+ assert_optimized("join(m1_ab,m2_ab,f(x,y)(max(x,y)))");
+}
+
+TEST(SparseFullOverlapJoin, trivial_dimensions_are_ignored)
+{
+ assert_optimized("v1_a*v2_a_trivial");
+ assert_optimized("v2_a_trivial*v1_a");
+}
+
+TEST(SparseFullOverlapJoin, inappropriate_shapes_are_not_optimized)
+{
+ assert_not_optimized("v1_a*scalar");
+ assert_not_optimized("v1_a*mixed_ab");
+ assert_not_optimized("v1_a*v3_b");
+ assert_not_optimized("v1_a*m1_ab");
+ assert_not_optimized("m1_ab*m3_bc");
+ assert_not_optimized("scalar*scalar");
+ assert_not_optimized("dense_a*dense_a");
+ assert_not_optimized("mixed_ab*mixed_ab");
+}
+
+TEST(SparseFullOverlapJoin, mixed_cell_types_are_not_optimized)
+{
+ assert_not_optimized("v1_a*v2_a_f");
+ assert_not_optimized("v1_a_f*v2_a");
+}
+
+//-----------------------------------------------------------------------------
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/eval/src/vespa/eval/eval/fast_value.hpp b/eval/src/vespa/eval/eval/fast_value.hpp
index d5cfc9c6368..33624fb920e 100644
--- a/eval/src/vespa/eval/eval/fast_value.hpp
+++ b/eval/src/vespa/eval/eval/fast_value.hpp
@@ -47,7 +47,7 @@ struct FastFilterView : public Value::Index::View {
const FastAddrMap &map;
std::vector<size_t> match_dims;
std::vector<size_t> extract_dims;
- std::vector<string_id> query;
+ std::vector<string_id> query;
size_t pos;
bool is_match(ConstArrayRef<string_id> addr) const {
@@ -141,12 +141,6 @@ struct FastValueIndex final : Value::Index {
FastAddrMap map;
FastValueIndex(size_t num_mapped_dims_in, const std::vector<string_id> &labels, size_t expected_subspaces_in)
: map(num_mapped_dims_in, labels, expected_subspaces_in) {}
-
- template <typename LCT, typename RCT, typename OCT, typename Fun>
- static const Value &sparse_full_overlap_join(const ValueType &res_type, const Fun &fun,
- const FastValueIndex &lhs, const FastValueIndex &rhs,
- ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash);
-
size_t size() const override { return map.size(); }
std::unique_ptr<View> create_view(const std::vector<size_t> &dims) const override;
};
@@ -267,6 +261,10 @@ struct FastValue final : Value, ValueBuilder<T> {
}
my_index.map.add_mapping(hash);
}
+ void add_singledim_mapping(string_id label) {
+ my_handles.push_back(label);
+ my_index.map.add_mapping(FastAddrMap::hash_label(label));
+ }
ArrayRef<T> add_subspace(ConstArrayRef<vespalib::stringref> addr) override {
add_mapping(addr);
return my_cells.add_cells(my_subspace_size);
@@ -344,25 +342,4 @@ struct FastScalarBuilder final : ValueBuilder<T> {
//-----------------------------------------------------------------------------
-template <typename LCT, typename RCT, typename OCT, typename Fun>
-const Value &
-FastValueIndex::sparse_full_overlap_join(const ValueType &res_type, const Fun &fun,
- const FastValueIndex &lhs, const FastValueIndex &rhs,
- ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash)
-{
- auto &result = stash.create<FastValue<OCT,true>>(res_type, lhs.map.addr_size(), 1, lhs.map.size());
- lhs.map.each_map_entry([&](auto lhs_subspace, auto hash) {
- auto lhs_addr = lhs.map.get_addr(lhs_subspace);
- auto rhs_subspace = rhs.map.lookup(lhs_addr, hash);
- if (rhs_subspace != FastAddrMap::npos()) {
- result.add_mapping(lhs_addr, hash);
- auto cell_value = fun(lhs_cells[lhs_subspace], rhs_cells[rhs_subspace]);
- result.my_cells.push_back_fast(cell_value);
- }
- });
- return result;
-}
-
-//-----------------------------------------------------------------------------
-
}
diff --git a/eval/src/vespa/eval/eval/optimize_tensor_function.cpp b/eval/src/vespa/eval/eval/optimize_tensor_function.cpp
index aef49a2c75b..f1ce293b18c 100644
--- a/eval/src/vespa/eval/eval/optimize_tensor_function.cpp
+++ b/eval/src/vespa/eval/eval/optimize_tensor_function.cpp
@@ -8,6 +8,7 @@
#include <vespa/eval/instruction/sparse_dot_product_function.h>
#include <vespa/eval/instruction/sparse_merge_function.h>
#include <vespa/eval/instruction/sparse_no_overlap_join_function.h>
+#include <vespa/eval/instruction/sparse_full_overlap_join_function.h>
#include <vespa/eval/instruction/mixed_inner_product_function.h>
#include <vespa/eval/instruction/sum_max_dot_product_function.h>
#include <vespa/eval/instruction/dense_xw_product_function.h>
@@ -76,6 +77,7 @@ const TensorFunction &optimize_for_factory(const ValueBuilderFactory &, const Te
child.set(DenseSingleReduceFunction::optimize(child.get(), stash));
child.set(SparseMergeFunction::optimize(child.get(), stash));
child.set(SparseNoOverlapJoinFunction::optimize(child.get(), stash));
+ child.set(SparseFullOverlapJoinFunction::optimize(child.get(), stash));
nodes.pop_back();
}
}
diff --git a/eval/src/vespa/eval/instruction/CMakeLists.txt b/eval/src/vespa/eval/instruction/CMakeLists.txt
index 50f7dbe7005..97838f2adb9 100644
--- a/eval/src/vespa/eval/instruction/CMakeLists.txt
+++ b/eval/src/vespa/eval/instruction/CMakeLists.txt
@@ -33,6 +33,7 @@ vespa_add_library(eval_instruction OBJECT
remove_trivial_dimension_optimizer.cpp
replace_type_function.cpp
sparse_dot_product_function.cpp
+ sparse_full_overlap_join_function.cpp
sparse_merge_function.cpp
sparse_no_overlap_join_function.cpp
sum_max_dot_product_function.cpp
diff --git a/eval/src/vespa/eval/instruction/detect_type.h b/eval/src/vespa/eval/instruction/detect_type.h
deleted file mode 100644
index f1769fa15cc..00000000000
--- a/eval/src/vespa/eval/instruction/detect_type.h
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <typeindex>
-#include <array>
-#include <cstddef>
-
-#pragma once
-
-namespace vespalib::eval::instruction {
-
-/*
- * Utilities for detecting implementation class by comparing
- * typeindex(typeid(T)); for now these are local to this
- * namespace, but we can consider moving them to a more
- * common place (probably vespalib) if we see more use-cases.
- */
-
-/**
- * Recognize a (const) instance of type T. This is cheaper than
- * dynamic_cast, but requires the object to be exactly of class T.
- * Returns a pointer to the object as T if recognized, nullptr
- * otherwise.
- **/
-template<typename T, typename U>
-const T *
-recognize_by_type_index(const U & object)
-{
- if (std::type_index(typeid(object)) == std::type_index(typeid(T))) {
- return static_cast<const T *>(&object);
- }
- return nullptr;
-}
-
-/**
- * Packs N recognized values into one object, used as return value
- * from detect_type<T>.
- *
- * Use all_converted() or the equivalent bool cast operator to check
- * if all objects were recognized. After this check is successful use
- * get<0>(), get<1>() etc to get a reference to the objects.
- **/
-template<typename T, size_t N>
-class RecognizedValues
-{
-private:
- std::array<const T *, N> _pointers;
-public:
- RecognizedValues(std::array<const T *, N> && pointers)
- : _pointers(std::move(pointers))
- {}
- bool all_converted() const {
- for (auto p : _pointers) {
- if (p == nullptr) return false;
- }
- return true;
- }
- operator bool() const { return all_converted(); }
- template<size_t idx> const T& get() const {
- static_assert(idx < N);
- return *_pointers[idx];
- }
-};
-
-/**
- * For all arguments, detect if they have typeid(T), convert to T if
- * possible, and return a RecognizedValues packing the converted
- * values.
- **/
-template<typename T, typename... Args>
-RecognizedValues<T, sizeof...(Args)>
-detect_type(const Args &... args)
-{
- return RecognizedValues<T, sizeof...(Args)>({(recognize_by_type_index<T>(args))...});
-}
-
-} // namespace
diff --git a/eval/src/vespa/eval/instruction/generic_join.cpp b/eval/src/vespa/eval/instruction/generic_join.cpp
index 4b3755509c7..fb714bcf16e 100644
--- a/eval/src/vespa/eval/instruction/generic_join.cpp
+++ b/eval/src/vespa/eval/instruction/generic_join.cpp
@@ -1,9 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "generic_join.h"
-#include "detect_type.h"
#include <vespa/eval/eval/inline_operation.h>
-#include <vespa/eval/eval/fast_value.hpp>
#include <vespa/eval/eval/wrap_param.h>
#include <vespa/vespalib/util/overload.h>
#include <vespa/vespalib/util/stash.h>
@@ -69,43 +67,6 @@ void my_mixed_join_op(State &state, uint64_t param_in) {
//-----------------------------------------------------------------------------
-template <typename LCT, typename RCT, typename OCT, typename Fun>
-void my_sparse_full_overlap_join_op(State &state, uint64_t param_in) {
- const auto &param = unwrap_param<JoinParam>(param_in);
- const Value &lhs = state.peek(1);
- const Value &rhs = state.peek(0);
- auto lhs_cells = lhs.cells().typify<LCT>();
- auto rhs_cells = rhs.cells().typify<RCT>();
- const Value::Index &lhs_index = lhs.index();
- const Value::Index &rhs_index = rhs.index();
- if (auto indexes = detect_type<FastValueIndex>(lhs_index, rhs_index)) {
- const auto &lhs_fast = indexes.get<0>();
- const auto &rhs_fast = indexes.get<1>();
- return (rhs_fast.map.size() < lhs_fast.map.size())
- ? state.pop_pop_push(FastValueIndex::sparse_full_overlap_join<RCT,LCT,OCT,SwapArgs2<Fun>>
- (param.res_type, SwapArgs2<Fun>(param.function), rhs_fast, lhs_fast, rhs_cells, lhs_cells, state.stash))
- : state.pop_pop_push(FastValueIndex::sparse_full_overlap_join<LCT,RCT,OCT,Fun>
- (param.res_type, Fun(param.function), lhs_fast, rhs_fast, lhs_cells, rhs_cells, state.stash));
- }
- Fun fun(param.function);
- SparseJoinState sparse(param.sparse_plan, lhs_index, rhs_index);
- auto builder = param.factory.create_transient_value_builder<OCT>(param.res_type, param.sparse_plan.sources.size(), param.dense_plan.out_size, sparse.first_index.size());
- auto outer = sparse.first_index.create_view({});
- auto inner = sparse.second_index.create_view(sparse.second_view_dims);
- outer->lookup({});
- while (outer->next_result(sparse.first_address, sparse.first_subspace)) {
- inner->lookup(sparse.address_overlap);
- if (inner->next_result(sparse.second_only_address, sparse.second_subspace)) {
- builder->add_subspace(sparse.full_address)[0] = fun(lhs_cells[sparse.lhs_subspace], rhs_cells[sparse.rhs_subspace]);
- }
- }
- auto &result = state.stash.create<std::unique_ptr<Value>>(builder->build(std::move(builder)));
- const Value &result_ref = *(result.get());
- state.pop_pop_push(result_ref);
-};
-
-//-----------------------------------------------------------------------------
-
template <typename LCT, typename RCT, typename OCT, typename Fun, bool forward_lhs>
void my_mixed_dense_join_op(State &state, uint64_t param_in) {
const auto &param = unwrap_param<JoinParam>(param_in);
@@ -175,11 +136,6 @@ struct SelectGenericJoinOp {
if (param.sparse_plan.should_forward_rhs_index()) {
return my_mixed_dense_join_op<LCT,RCT,OCT,Fun,false>;
}
- if ((param.dense_plan.out_size == 1) &&
- (param.sparse_plan.sources.size() == param.sparse_plan.lhs_overlap.size()))
- {
- return my_sparse_full_overlap_join_op<LCT,RCT,OCT,Fun>;
- }
return my_mixed_join_op<LCT,RCT,OCT,Fun>;
}
};
diff --git a/eval/src/vespa/eval/instruction/generic_merge.cpp b/eval/src/vespa/eval/instruction/generic_merge.cpp
index 9b098db7763..434ab308c3c 100644
--- a/eval/src/vespa/eval/instruction/generic_merge.cpp
+++ b/eval/src/vespa/eval/instruction/generic_merge.cpp
@@ -1,9 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "detect_type.h"
#include "generic_merge.h"
#include <vespa/eval/eval/inline_operation.h>
-#include <vespa/eval/eval/fast_value.hpp>
#include <vespa/eval/eval/wrap_param.h>
#include <vespa/vespalib/util/stash.h>
#include <vespa/vespalib/util/typify.h>
diff --git a/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.cpp b/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.cpp
new file mode 100644
index 00000000000..480af3315b1
--- /dev/null
+++ b/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.cpp
@@ -0,0 +1,134 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "sparse_full_overlap_join_function.h"
+#include "generic_join.h"
+#include <vespa/eval/eval/fast_value.hpp>
+#include <vespa/vespalib/util/typify.h>
+
+namespace vespalib::eval {
+
+using namespace tensor_function;
+using namespace operation;
+using namespace instruction;
+
+namespace {
+
+template <typename CT, typename Fun, bool single_dim>
+const Value &my_fast_sparse_full_overlap_join(const FastAddrMap &lhs_map, const FastAddrMap &rhs_map,
+ const CT *lhs_cells, const CT *rhs_cells,
+ const JoinParam &param, Stash &stash)
+{
+ Fun fun(param.function);
+ auto &result = stash.create<FastValue<CT,true>>(param.res_type, lhs_map.addr_size(), 1, lhs_map.size());
+ if constexpr (single_dim) {
+ const auto &labels = lhs_map.labels();
+ for (size_t i = 0; i < labels.size(); ++i) {
+ auto rhs_subspace = rhs_map.lookup_singledim(labels[i]);
+ if (rhs_subspace != FastAddrMap::npos()) {
+ result.add_singledim_mapping(labels[i]);
+ auto cell_value = fun(lhs_cells[i], rhs_cells[rhs_subspace]);
+ result.my_cells.push_back_fast(cell_value);
+ }
+ }
+ } else {
+ lhs_map.each_map_entry([&](auto lhs_subspace, auto hash) {
+ auto lhs_addr = lhs_map.get_addr(lhs_subspace);
+ auto rhs_subspace = rhs_map.lookup(lhs_addr, hash);
+ if (rhs_subspace != FastAddrMap::npos()) {
+ result.add_mapping(lhs_addr, hash);
+ auto cell_value = fun(lhs_cells[lhs_subspace], rhs_cells[rhs_subspace]);
+ result.my_cells.push_back_fast(cell_value);
+ }
+ });
+ }
+ return result;
+}
+
+template <typename CT, typename Fun, bool single_dim>
+const Value &my_fast_sparse_full_overlap_join_dispatch(const FastAddrMap &lhs_map, const FastAddrMap &rhs_map,
+ const CT *lhs_cells, const CT *rhs_cells,
+ const JoinParam &param, Stash &stash)
+{
+ return (rhs_map.size() < lhs_map.size())
+ ? my_fast_sparse_full_overlap_join<CT,SwapArgs2<Fun>,single_dim>(rhs_map, lhs_map, rhs_cells, lhs_cells, param, stash)
+ : my_fast_sparse_full_overlap_join<CT,Fun,single_dim>(lhs_map, rhs_map, lhs_cells, rhs_cells, param, stash);
+}
+
+template <typename CT, typename Fun, bool single_dim>
+void my_sparse_full_overlap_join_op(InterpretedFunction::State &state, uint64_t param_in) {
+ const auto &param = unwrap_param<JoinParam>(param_in);
+ const Value &lhs = state.peek(1);
+ const Value &rhs = state.peek(0);
+ const auto &lhs_idx = lhs.index();
+ const auto &rhs_idx = rhs.index();
+ if (__builtin_expect(are_fast(lhs_idx, rhs_idx), true)) {
+ const Value &res = my_fast_sparse_full_overlap_join_dispatch<CT,Fun,single_dim>(as_fast(lhs_idx).map, as_fast(rhs_idx).map,
+ lhs.cells().typify<CT>().cbegin(), rhs.cells().typify<CT>().cbegin(), param, state.stash);
+ state.pop_pop_push(res);
+ } else {
+ auto res = generic_mixed_join<CT,CT,CT,Fun>(lhs, rhs, param);
+ state.pop_pop_push(*state.stash.create<std::unique_ptr<Value>>(std::move(res)));
+ }
+}
+
+struct SelectSparseFullOverlapJoinOp {
+ template <typename CT, typename Fun, typename SINGLE_DIM>
+ static auto invoke() { return my_sparse_full_overlap_join_op<CT,Fun,SINGLE_DIM::value>; }
+};
+
+using MyTypify = TypifyValue<TypifyCellType,operation::TypifyOp2,TypifyBool>;
+
+bool is_sparse_like(const ValueType &type) {
+ return ((type.count_mapped_dimensions() > 0) && (type.dense_subspace_size() == 1));
+}
+
+} // namespace <unnamed>
+
+SparseFullOverlapJoinFunction::SparseFullOverlapJoinFunction(const tensor_function::Join &original)
+ : tensor_function::Join(original.result_type(),
+ original.lhs(),
+ original.rhs(),
+ original.function())
+{
+ assert(compatible_types(result_type(), lhs().result_type(), rhs().result_type()));
+}
+
+InterpretedFunction::Instruction
+SparseFullOverlapJoinFunction::compile_self(const ValueBuilderFactory &factory, Stash &stash) const
+{
+ const auto &param = stash.create<JoinParam>(lhs().result_type(), rhs().result_type(), function(), factory);
+ assert(param.res_type == result_type());
+ bool single_dim = (result_type().count_mapped_dimensions() == 1);
+ auto op = typify_invoke<3,MyTypify,SelectSparseFullOverlapJoinOp>(result_type().cell_type(), function(), single_dim);
+ return InterpretedFunction::Instruction(op, wrap_param<JoinParam>(param));
+}
+
+bool
+SparseFullOverlapJoinFunction::compatible_types(const ValueType &res, const ValueType &lhs, const ValueType &rhs)
+{
+ if ((lhs.cell_type() == rhs.cell_type()) &&
+ is_sparse_like(lhs) && is_sparse_like(rhs) &&
+ (res.count_mapped_dimensions() == lhs.count_mapped_dimensions()) &&
+ (res.count_mapped_dimensions() == rhs.count_mapped_dimensions()))
+ {
+ assert(is_sparse_like(res));
+ assert(res.cell_type() == lhs.cell_type());
+ return true;
+ }
+ return false;
+}
+
+const TensorFunction &
+SparseFullOverlapJoinFunction::optimize(const TensorFunction &expr, Stash &stash)
+{
+ if (auto join = as<Join>(expr)) {
+ const TensorFunction &lhs = join->lhs();
+ const TensorFunction &rhs = join->rhs();
+ if (compatible_types(expr.result_type(), lhs.result_type(), rhs.result_type())) {
+ return stash.create<SparseFullOverlapJoinFunction>(*join);
+ }
+ }
+ return expr;
+}
+
+} // namespace
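Note: the fast path above walks one tensor's sparse address map and keeps only the addresses also present in the other map, applying the join function per matching cell; the dispatch step swaps the operands so that the smaller map is the one iterated. A minimal Java sketch of that hash-join idea, under the assumption of string-keyed sparse addresses (SparseJoinSketch and sparseJoin are illustrative names, not Vespa API):

import java.util.HashMap;
import java.util.Map;
import java.util.function.DoubleBinaryOperator;

// Illustrative sketch of a full-overlap sparse join: both inputs map sparse
// addresses to cells, and only addresses present in both contribute to the
// result (mirrors my_fast_sparse_full_overlap_join above).
final class SparseJoinSketch {
    static Map<String, Double> sparseJoin(Map<String, Double> lhs,
                                          Map<String, Double> rhs,
                                          DoubleBinaryOperator fun) {
        // Iterate the smaller map, like the dispatch step that swaps
        // arguments (and the operand order of fun) when rhs is smaller.
        if (rhs.size() < lhs.size()) {
            return sparseJoin(rhs, lhs, (a, b) -> fun.applyAsDouble(b, a));
        }
        Map<String, Double> result = new HashMap<>();
        lhs.forEach((addr, lhsCell) -> {
            Double rhsCell = rhs.get(addr);
            if (rhsCell != null) { // full overlap: keep only matching addresses
                result.put(addr, fun.applyAsDouble(lhsCell, rhsCell));
            }
        });
        return result;
    }
}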
diff --git a/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.h b/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.h
new file mode 100644
index 00000000000..13d35065997
--- /dev/null
+++ b/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.h
@@ -0,0 +1,22 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/eval/eval/tensor_function.h>
+
+namespace vespalib::eval {
+
+/**
+ * Tensor function for joining tensors with full sparse overlap.
+ */
+class SparseFullOverlapJoinFunction : public tensor_function::Join
+{
+public:
+ SparseFullOverlapJoinFunction(const tensor_function::Join &original);
+ InterpretedFunction::Instruction compile_self(const ValueBuilderFactory &factory, Stash &stash) const override;
+ bool result_is_mutable() const override { return true; }
+ static bool compatible_types(const ValueType &res, const ValueType &lhs, const ValueType &rhs);
+ static const TensorFunction &optimize(const TensorFunction &expr, Stash &stash);
+};
+
+} // namespace
diff --git a/jdisc-security-filters/pom.xml b/jdisc-security-filters/pom.xml
index 49f77cd60e7..5f6189c5cae 100644
--- a/jdisc-security-filters/pom.xml
+++ b/jdisc-security-filters/pom.xml
@@ -41,7 +41,22 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>testutil</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilter.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilter.java
new file mode 100644
index 00000000000..71f1965c764
--- /dev/null
+++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilter.java
@@ -0,0 +1,118 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jdisc.http.filter.security.rule;
+
+import com.google.inject.Inject;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.http.filter.DiscFilterRequest;
+import com.yahoo.jdisc.http.filter.security.base.JsonSecurityRequestFilterBase;
+import com.yahoo.jdisc.http.filter.security.rule.RuleBasedFilterConfig.Rule.Action;
+import com.yahoo.restapi.Path;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * Security request filter that allows or blocks requests based on hostname, HTTP method and URI path.
+ *
+ * @author bjorncs
+ */
+public class RuleBasedRequestFilter extends JsonSecurityRequestFilterBase {
+
+ private static final Logger log = Logger.getLogger(RuleBasedRequestFilter.class.getName());
+
+ private final Metric metric;
+ private final boolean dryrun;
+ private final List<Rule> rules;
+ private final ErrorResponse defaultResponse;
+
+ @Inject
+ public RuleBasedRequestFilter(Metric metric, RuleBasedFilterConfig config) {
+ this.metric = metric;
+ this.dryrun = config.dryrun();
+ this.rules = Rule.fromConfig(config.rule());
+ this.defaultResponse = createDefaultResponse(config.defaultRule());
+ }
+
+ @Override
+ protected Optional<ErrorResponse> filter(DiscFilterRequest request) {
+ String method = request.getMethod();
+ URI uri = request.getUri();
+ for (Rule rule : rules) {
+ if (rule.matches(method, uri)) {
+ log.log(Level.FINE, () ->
+ String.format("Request '%h' with method '%s' and uri '%s' matched rule '%s'", request, method, uri, rule.name));
+ return responseFor(request, rule.name, rule.response);
+ }
+ }
+ return responseFor(request, "default", defaultResponse);
+ }
+
+ private static ErrorResponse createDefaultResponse(RuleBasedFilterConfig.DefaultRule defaultRule) {
+ switch (defaultRule.action()) {
+ case ALLOW: return null;
+ case BLOCK: return new ErrorResponse(defaultRule.blockResponseCode(), defaultRule.blockResponseMessage());
+ default: throw new IllegalArgumentException(defaultRule.action().name());
+ }
+ }
+
+ private Optional<ErrorResponse> responseFor(DiscFilterRequest request, String ruleName, ErrorResponse response) {
+ int statusCode = response != null ? response.getResponse().getStatus() : 0;
+ Metric.Context metricContext = metric.createContext(Map.of(
+ "rule", ruleName,
+ "dryrun", Boolean.toString(dryrun),
+ "statusCode", Integer.toString(statusCode)));
+ if (response != null) {
+ metric.add("jdisc.http.filter.rule.blocked_requests", 1L, metricContext);
+ log.log(Level.FINE, () -> String.format(
+ "Blocking request '%h' with status code '%d' using rule '%s' (dryrun=%b)", request, statusCode, ruleName, dryrun));
+ return dryrun ? Optional.empty() : Optional.of(response);
+ } else {
+ metric.add("jdisc.http.filter.rule.allowed_requests", 1L, metricContext);
+ log.log(Level.FINE, () -> String.format("Allowing request '%h' using rule '%s' (dryrun=%b)", request, ruleName, dryrun));
+ return Optional.empty();
+ }
+ }
+
+ private static class Rule {
+
+ final String name;
+ final Set<String> hostnames;
+ final Set<String> methods;
+ final Set<String> pathGlobExpressions;
+ final ErrorResponse response;
+
+ static List<Rule> fromConfig(List<RuleBasedFilterConfig.Rule> config) {
+ return config.stream()
+ .map(Rule::new)
+ .collect(Collectors.toList());
+ }
+
+ Rule(RuleBasedFilterConfig.Rule config) {
+ this.name = config.name();
+ this.hostnames = Set.copyOf(config.hostNames());
+ this.methods = config.methods().stream()
+ .map(m -> m.name().toUpperCase())
+ .collect(Collectors.toSet());
+ this.pathGlobExpressions = Set.copyOf(config.pathExpressions());
+ this.response = config.action() == Action.Enum.BLOCK
+ ? new ErrorResponse(config.blockResponseCode(), config.blockResponseMessage())
+ : null;
+ }
+
+ boolean matches(String method, URI uri) {
+ boolean methodMatches = methods.isEmpty() || methods.contains(method.toUpperCase());
+ String host = uri.getHost();
+ boolean hostnameMatches = hostnames.isEmpty() || (host != null && hostnames.contains(host));
+ Path pathMatcher = new Path(uri);
+ boolean pathMatches = pathGlobExpressions.isEmpty() || pathGlobExpressions.stream().anyMatch(pathMatcher::matches);
+ return methodMatches && hostnameMatches && pathMatches;
+ }
+
+ }
+}
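Note: the filter's semantics are first-match-wins, with empty criterion sets acting as wildcards, and the configured default rule applying when nothing matches. A self-contained Java sketch of just that matching logic, using hypothetical names (SimpleRule, firstMatch) rather than the real config classes:

import java.util.List;
import java.util.Optional;
import java.util.Set;

final class RuleMatchSketch {
    record SimpleRule(String name, Set<String> methods, Set<String> paths) {
        boolean matches(String method, String path) {
            // An empty criterion set acts as a wildcard, as in Rule.matches above.
            return (methods.isEmpty() || methods.contains(method))
                    && (paths.isEmpty() || paths.contains(path));
        }
    }

    // The first matching rule wins; callers fall back to the default rule otherwise.
    static Optional<String> firstMatch(List<SimpleRule> rules, String method, String path) {
        return rules.stream().filter(r -> r.matches(method, path)).findFirst().map(SimpleRule::name);
    }

    public static void main(String[] args) {
        List<SimpleRule> rules = List.of(
                new SimpleRule("first", Set.of("DELETE"), Set.of("/path-to-resource")),
                new SimpleRule("second", Set.of("GET"), Set.of("/path-to-resource")));
        // "GET /path-to-resource" skips "first" and matches "second",
        // as in performs_action_on_first_matching_rule below.
        System.out.println(firstMatch(rules, "GET", "/path-to-resource").orElse("default"));
    }
}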
diff --git a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilterTest.java b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilterTest.java
new file mode 100644
index 00000000000..c67d3b430c8
--- /dev/null
+++ b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilterTest.java
@@ -0,0 +1,174 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jdisc.http.filter.security.rule;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.yahoo.container.jdisc.RequestHandlerTestDriver.MockResponseHandler;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.http.filter.DiscFilterRequest;
+import com.yahoo.jdisc.http.filter.security.rule.RuleBasedFilterConfig.DefaultRule;
+import com.yahoo.jdisc.http.filter.security.rule.RuleBasedFilterConfig.Rule;
+import com.yahoo.test.json.JsonTestHelper;
+import com.yahoo.vespa.jdk8compat.List;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author bjorncs
+ */
+class RuleBasedRequestFilterTest {
+
+ private static final ObjectMapper jsonMapper = new ObjectMapper();
+
+ @Test
+ void matches_rule_that_allows_all_methods_and_paths() {
+ RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder()
+ .dryrun(false)
+ .defaultRule(new DefaultRule.Builder()
+ .action(DefaultRule.Action.Enum.BLOCK))
+ .rule(new Rule.Builder()
+ .name("first")
+ .hostNames("myserver")
+ .pathExpressions(List.of())
+ .methods(List.of())
+ .action(Rule.Action.Enum.ALLOW))
+ .build();
+
+ Metric metric = mock(Metric.class);
+ RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config);
+ MockResponseHandler responseHandler = new MockResponseHandler();
+ filter.filter(request("PATCH", "http://myserver:80/path-to-resource"), responseHandler);
+
+ assertAllowed(responseHandler, metric);
+
+ }
+
+ @Test
+ void performs_action_on_first_matching_rule() throws IOException {
+ RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder()
+ .dryrun(false)
+ .defaultRule(new DefaultRule.Builder()
+ .action(DefaultRule.Action.Enum.ALLOW))
+ .rule(new Rule.Builder()
+ .name("first")
+ .pathExpressions("/path-to-resource")
+ .methods(Rule.Methods.Enum.DELETE)
+ .action(Rule.Action.Enum.BLOCK)
+ .blockResponseCode(403))
+ .rule(new Rule.Builder()
+ .name("second")
+ .pathExpressions("/path-to-resource")
+ .methods(Rule.Methods.Enum.GET)
+ .action(Rule.Action.Enum.BLOCK)
+ .blockResponseCode(404))
+ .build();
+
+ Metric metric = mock(Metric.class);
+ RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config);
+ MockResponseHandler responseHandler = new MockResponseHandler();
+ filter.filter(request("GET", "http://myserver:80/path-to-resource"), responseHandler);
+
+ assertBlocked(responseHandler, metric, 404, "");
+ }
+
+ @Test
+ void performs_default_action_if_no_rule_matches() throws IOException {
+ RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder()
+ .dryrun(false)
+ .defaultRule(new DefaultRule.Builder()
+ .action(DefaultRule.Action.Enum.BLOCK)
+ .blockResponseCode(403)
+ .blockResponseMessage("my custom message"))
+ .rule(new Rule.Builder()
+ .name("rule")
+ .pathExpressions("/path-to-resource")
+ .methods(Rule.Methods.Enum.GET)
+ .action(Rule.Action.Enum.ALLOW))
+ .build();
+
+ Metric metric = mock(Metric.class);
+ RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config);
+ MockResponseHandler responseHandler = new MockResponseHandler();
+ filter.filter(request("POST", "http://myserver:80/"), responseHandler);
+
+ assertBlocked(responseHandler, metric, 403, "my custom message");
+ }
+
+ @Test
+ void matches_rule_with_multiple_alternatives_for_host_path_and_method() throws IOException {
+ RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder()
+ .dryrun(false)
+ .defaultRule(new DefaultRule.Builder()
+ .action(DefaultRule.Action.Enum.ALLOW))
+ .rule(new Rule.Builder()
+ .name("rule")
+ .hostNames(Set.of("server1", "server2", "server3"))
+ .pathExpressions(Set.of("/path-to-resource/{*}", "/another-path"))
+ .methods(Set.of(Rule.Methods.Enum.GET, Rule.Methods.POST, Rule.Methods.DELETE))
+ .action(Rule.Action.Enum.BLOCK)
+ .blockResponseCode(404)
+ .blockResponseMessage("not found"))
+ .build();
+
+ Metric metric = mock(Metric.class);
+ RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config);
+ MockResponseHandler responseHandler = new MockResponseHandler();
+ filter.filter(request("POST", "https://server1:443/path-to-resource/id/1/subid/2"), responseHandler);
+
+ assertBlocked(responseHandler, metric, 404, "not found");
+ }
+
+ @Test
+ void no_filtering_if_request_is_allowed() {
+ RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder()
+ .dryrun(false)
+ .defaultRule(new DefaultRule.Builder()
+ .action(DefaultRule.Action.Enum.ALLOW))
+ .build();
+
+ Metric metric = mock(Metric.class);
+ RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config);
+ MockResponseHandler responseHandler = new MockResponseHandler();
+ filter.filter(request("DELETE", "http://myserver:80/"), responseHandler);
+
+ assertAllowed(responseHandler, metric);
+ }
+
+ private static DiscFilterRequest request(String method, String uri) {
+ DiscFilterRequest request = mock(DiscFilterRequest.class);
+ when(request.getMethod()).thenReturn(method);
+ when(request.getUri()).thenReturn(URI.create(uri));
+ return request;
+ }
+
+ private static void assertAllowed(MockResponseHandler handler, Metric metric) {
+ verify(metric).add(eq("jdisc.http.filter.rule.allowed_requests"), eq(1L), any());
+ assertNull(handler.getResponse());
+ }
+
+ private static void assertBlocked(MockResponseHandler handler, Metric metric, int expectedCode, String expectedMessage) throws IOException {
+ verify(metric).add(eq("jdisc.http.filter.rule.blocked_requests"), eq(1L), any());
+ Response response = handler.getResponse();
+ assertNotNull(response);
+ assertEquals(expectedCode, response.getStatus());
+ ObjectNode expectedJson = jsonMapper.createObjectNode();
+ expectedJson.put("message", expectedMessage).put("code", expectedCode);
+ JsonNode actualJson = jsonMapper.readTree(handler.readAll().getBytes());
+ JsonTestHelper.assertJsonEquals(expectedJson, actualJson);
+ }
+
+}
\ No newline at end of file
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java
index a5efec1dcb7..c30add928ba 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java
@@ -3,8 +3,13 @@ package com.yahoo.vespa.hosted.node.admin.maintenance;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.DockerImage;
import com.yahoo.config.provision.NodeType;
+import com.yahoo.vespa.flags.FetchVector;
+import com.yahoo.vespa.flags.FlagSource;
+import com.yahoo.vespa.flags.Flags;
+import com.yahoo.vespa.flags.StringFlag;
import com.yahoo.vespa.hosted.dockerapi.Container;
import com.yahoo.vespa.hosted.dockerapi.ContainerName;
import com.yahoo.vespa.hosted.node.admin.component.TaskContext;
@@ -13,6 +18,8 @@ import com.yahoo.vespa.hosted.node.admin.maintenance.disk.CoredumpCleanupRule;
import com.yahoo.vespa.hosted.node.admin.maintenance.disk.DiskCleanup;
import com.yahoo.vespa.hosted.node.admin.maintenance.disk.DiskCleanupRule;
import com.yahoo.vespa.hosted.node.admin.maintenance.disk.LinearCleanupRule;
+import com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncClient;
+import com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncFileInfo;
import com.yahoo.vespa.hosted.node.admin.nodeadmin.ConvergenceException;
import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext;
import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentTask;
@@ -31,6 +38,7 @@ import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,6 +47,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import static com.yahoo.vespa.hosted.node.admin.maintenance.disk.DiskCleanupRule.Priority;
import static com.yahoo.yolean.Exceptions.uncheck;
@@ -53,9 +62,11 @@ public class StorageMaintainer {
private final Terminal terminal;
private final CoredumpHandler coredumpHandler;
- private final Path archiveContainerStoragePath;
private final DiskCleanup diskCleanup;
+ private final SyncClient syncClient;
private final Clock clock;
+ private final Path archiveContainerStoragePath;
+ private final StringFlag syncBucketNameFlag;
// We cache disk usage to avoid doing expensive disk operations so often
private final Cache<ContainerName, DiskSize> diskUsage = CacheBuilder.newBuilder()
@@ -63,16 +74,34 @@ public class StorageMaintainer {
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();
- public StorageMaintainer(Terminal terminal, CoredumpHandler coredumpHandler, Path archiveContainerStoragePath) {
- this(terminal, coredumpHandler, archiveContainerStoragePath, new DiskCleanup(), Clock.systemUTC());
- }
-
- public StorageMaintainer(Terminal terminal, CoredumpHandler coredumpHandler, Path archiveContainerStoragePath, DiskCleanup diskCleanup, Clock clock) {
+ public StorageMaintainer(Terminal terminal, CoredumpHandler coredumpHandler, DiskCleanup diskCleanup,
+ SyncClient syncClient, Clock clock, Path archiveContainerStoragePath, FlagSource flagSource) {
this.terminal = terminal;
this.coredumpHandler = coredumpHandler;
- this.archiveContainerStoragePath = archiveContainerStoragePath;
this.diskCleanup = diskCleanup;
+ this.syncClient = syncClient;
this.clock = clock;
+ this.archiveContainerStoragePath = archiveContainerStoragePath;
+ this.syncBucketNameFlag = Flags.SYNC_HOST_LOGS_TO_S3_BUCKET.bindTo(flagSource);
+ }
+
+ public boolean syncLogs(NodeAgentContext context) {
+ Optional<ApplicationId> app = context.node().owner();
+ if (app.isEmpty()) return false;
+ String bucketName = syncBucketNameFlag
+ .with(FetchVector.Dimension.NODE_TYPE, NodeType.tenant.name())
+ .with(FetchVector.Dimension.APPLICATION_ID, app.get().serializedForm())
+ .value();
+ if (bucketName.isBlank()) return false;
+
+ List<SyncFileInfo> syncFileInfos = FileFinder.files(pathOnHostUnderContainerVespaHome(context, "logs/vespa"))
+ .maxDepth(2)
+ .stream()
+ .sorted(Comparator.comparing(FileFinder.FileAttributes::lastModifiedTime))
+ .flatMap(fa -> SyncFileInfo.tenantLog(bucketName, app.get(), context.hostname(), fa.path()).stream())
+ .collect(Collectors.toList());
+
+ return syncClient.sync(context, syncFileInfos, 1);
}
public Optional<DiskSize> diskUsageFor(NodeAgentContext context) {
@@ -127,22 +156,20 @@ public class StorageMaintainer {
Instant start = clock.instant();
double oneMonthSeconds = Duration.ofDays(30).getSeconds();
Function<Instant, Double> monthNormalizer = instant -> Duration.between(instant, start).getSeconds() / oneMonthSeconds;
- Function<String, Path> pathOnHostUnderContainerVespaHome = path ->
- context.pathOnHostFromPathInNode(context.pathInNodeUnderVespaHome(path));
List<DiskCleanupRule> rules = new ArrayList<>();
- rules.add(CoredumpCleanupRule.forContainer(pathOnHostUnderContainerVespaHome.apply("var/crash")));
+ rules.add(CoredumpCleanupRule.forContainer(pathOnHostUnderContainerVespaHome(context, "var/crash")));
if (context.node().membership().map(m -> m.type().isContainer()).orElse(false))
- rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome.apply("logs/vespa/qrs")).list(),
+ rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome(context, "logs/vespa/qrs")).list(),
fa -> monthNormalizer.apply(fa.lastModifiedTime()), Priority.LOWEST, Priority.HIGHEST));
if (context.nodeType() == NodeType.tenant && context.node().membership().map(m -> m.type().isAdmin()).orElse(false))
- rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome.apply("logs/vespa/logarchive")).list(),
+ rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome(context, "logs/vespa/logarchive")).list(),
fa -> monthNormalizer.apply(fa.lastModifiedTime()), Priority.LOWEST, Priority.HIGHEST));
if (context.nodeType() == NodeType.proxy)
- rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome.apply("logs/nginx")).list(),
+ rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome(context, "logs/nginx")).list(),
fa -> monthNormalizer.apply(fa.lastModifiedTime()), Priority.LOWEST, Priority.MEDIUM));
return rules;
@@ -212,4 +239,8 @@ public class StorageMaintainer {
.orElse("<none>")
);
}
+
+ private static Path pathOnHostUnderContainerVespaHome(NodeAgentContext context, String path) {
+ return context.pathOnHostFromPathInNode(context.pathInNodeUnderVespaHome(path));
+ }
}
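Note: syncLogs is doubly gated: nodes without an owning application are skipped, and so are nodes where the SYNC_HOST_LOGS_TO_S3_BUCKET flag resolves to a blank bucket name; eligible files are then synced oldest-first. A hedged sketch of that gating, with illustrative names (SyncGateSketch, LogFile) standing in for the real types:

import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class SyncGateSketch {
    record LogFile(String path, long lastModifiedMillis) {}

    static List<LogFile> filesToSync(Optional<String> owner, String bucketFlag, List<LogFile> candidates) {
        if (owner.isEmpty()) return List.of();      // unowned node: nothing to sync
        if (bucketFlag.isBlank()) return List.of(); // flag unset: feature disabled
        return candidates.stream()
                .sorted(Comparator.comparingLong(LogFile::lastModifiedMillis)) // oldest first
                .collect(Collectors.toList());
    }
}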
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java
index 3b88d28613c..19ec8c0e283 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java
@@ -4,11 +4,9 @@ package com.yahoo.vespa.hosted.node.admin.maintenance.sync;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.HostName;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Optional;
/**
* @author freva
@@ -18,55 +16,64 @@ public class SyncFileInfo {
private final String bucketName;
private final Path srcPath;
private final Path destPath;
- private final boolean compressWithZstd;
+ private final Compression uploadCompression;
- private SyncFileInfo(String bucketName, Path srcPath, Path destPath, boolean compressWithZstd) {
+ private SyncFileInfo(String bucketName, Path srcPath, Path destPath, Compression uploadCompression) {
this.bucketName = bucketName;
this.srcPath = srcPath;
this.destPath = destPath;
- this.compressWithZstd = compressWithZstd;
+ this.uploadCompression = uploadCompression;
}
public String bucketName() {
return bucketName;
}
+ /** Source path of the file to sync */
public Path srcPath() {
return srcPath;
}
+ /** Remote path to store the file at */
public Path destPath() {
return destPath;
}
- public InputStream inputStream() throws IOException {
- InputStream is = Files.newInputStream(srcPath);
- if (compressWithZstd) return new ZstdCompressingInputStream(is, 4 << 20);
- return is;
+ /** Compression algorithm to use when uploading the file */
+ public Compression uploadCompression() {
+ return uploadCompression;
}
+ public static Optional<SyncFileInfo> tenantLog(String bucketName, ApplicationId applicationId, HostName hostName, Path logFile) {
+ String filename = logFile.getFileName().toString();
+ Compression compression = Compression.NONE;
+ String dir = null;
+
+ if (filename.startsWith("vespa.log-")) {
+ dir = "logs/vespa";
+ compression = Compression.ZSTD;
+ } else if (filename.endsWith(".zst")) {
+ if (filename.startsWith("JsonAccessLog.") || filename.startsWith("access"))
+ dir = "logs/access";
+ else if (filename.startsWith("ConnectionLog."))
+ dir = "logs/connection";
+ }
- public static SyncFileInfo tenantVespaLog(String bucketName, ApplicationId applicationId, HostName hostName, Path vespaLogFile) {
- return new SyncFileInfo(bucketName, vespaLogFile, destination(applicationId, hostName, "logs/vespa", vespaLogFile, ".zst"), true);
- }
-
- public static SyncFileInfo tenantAccessLog(String bucketName, ApplicationId applicationId, HostName hostName, Path accessLogFile) {
- return new SyncFileInfo(bucketName, accessLogFile, destination(applicationId, hostName, "logs/access", accessLogFile, null), false);
+ if (dir == null) return Optional.empty();
+ return Optional.of(new SyncFileInfo(
+ bucketName, logFile, destination(applicationId, hostName, dir, logFile, compression), compression));
}
public static SyncFileInfo infrastructureVespaLog(String bucketName, HostName hostName, Path vespaLogFile) {
- return new SyncFileInfo(bucketName, vespaLogFile, destination(null, hostName, "logs/vespa", vespaLogFile, ".zst"), true);
- }
-
- public static SyncFileInfo infrastructureAccessLog(String bucketName, HostName hostName, Path accessLogFile) {
- return new SyncFileInfo(bucketName, accessLogFile, destination(null, hostName, "logs/access", accessLogFile, null), false);
+ Compression compression = Compression.ZSTD;
+ return new SyncFileInfo(bucketName, vespaLogFile, destination(null, hostName, "logs/vespa", vespaLogFile, compression), compression);
}
- private static Path destination(ApplicationId app, HostName hostName, String dir, Path filename, String extension) {
- StringBuilder sb = new StringBuilder(100).append('/');
+ private static Path destination(ApplicationId app, HostName hostName, String dir, Path filename, Compression uploadCompression) {
+ StringBuilder sb = new StringBuilder(100);
if (app == null) sb.append("infrastructure");
- else sb.append(app.tenant().value()).append('.').append(app.application().value()).append('.').append(app.instance().value());
+ else sb.append(app.tenant().value()).append('/').append(app.application().value()).append('/').append(app.instance().value());
sb.append('/');
for (char c: hostName.value().toCharArray()) {
@@ -76,8 +83,17 @@ public class SyncFileInfo {
sb.append('/').append(dir).append('/').append(filename.getFileName().toString());
- if (extension != null) sb.append(extension);
+ if (uploadCompression.extension != null) sb.append(uploadCompression.extension);
return Paths.get(sb.toString());
}
+
+ public enum Compression {
+ NONE(null), ZSTD(".zst");
+
+ private final String extension;
+ Compression(String extension) {
+ this.extension = extension;
+ }
+ }
}
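Note: the new destination layout is tenant/application/instance (or "infrastructure" for unowned hosts), then the short hostname, the log directory, and the filename with the compression extension appended. A small sketch reproducing the layout from the tests; the assumption that the short hostname is the part before the first '.' is inferred from the test expectations, not from the elided character loop:

final class DestPathSketch {
    static String destination(String appPath, String hostname, String dir, String file, String extension) {
        String shortHost = hostname.split("\\.", 2)[0]; // assumption based on test expectations
        return appPath + "/" + shortHost + "/" + dir + "/" + file + (extension == null ? "" : extension);
    }

    public static void main(String[] args) {
        // Prints ten/app/ins/h12352a/logs/vespa/vespa.log-2021-02-12.zst,
        // matching SyncFileInfoTest.tenant_log below.
        System.out.println(destination("ten/app/ins",
                "h12352a.env.region-1.vespa.domain.example",
                "logs/vespa", "vespa.log-2021-02-12", ".zst"));
    }
}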
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
index 24928b4c8a6..4bc6fae4e56 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
@@ -499,6 +499,7 @@ public class NodeAgentImpl implements NodeAgent {
orchestrator.resume(context.hostname().value());
suspendedInOrchestrator = false;
}
+ storageMaintainer.syncLogs(context);
break;
case provisioned:
nodeRepository.setNodeState(context.hostname().value(), NodeState.dirty);
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java
index cb615a94615..a17ffadcb45 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java
@@ -3,8 +3,11 @@ package com.yahoo.vespa.hosted.node.admin.maintenance;
import com.yahoo.config.provision.NodeResources;
import com.yahoo.test.ManualClock;
+import com.yahoo.vespa.flags.InMemoryFlagSource;
import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec;
+import com.yahoo.vespa.hosted.node.admin.maintenance.coredump.CoredumpHandler;
import com.yahoo.vespa.hosted.node.admin.maintenance.disk.DiskCleanup;
+import com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncClient;
import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext;
import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContextImpl;
import com.yahoo.vespa.hosted.node.admin.task.util.file.FileFinder;
@@ -13,8 +16,6 @@ import com.yahoo.vespa.hosted.node.admin.task.util.process.TestTerminal;
import com.yahoo.vespa.test.file.TestFileSystem;
import org.junit.After;
import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.runner.RunWith;
import java.io.IOException;
import java.nio.file.FileSystem;
@@ -38,150 +39,141 @@ import static org.mockito.Mockito.verifyNoInteractions;
/**
* @author dybis
*/
-@RunWith(Enclosed.class)
public class StorageMaintainerTest {
- public static class DiskUsageTests {
-
- private final TestTerminal terminal = new TestTerminal();
-
- @Test
- public void testDiskUsed() throws IOException {
- StorageMaintainer storageMaintainer = new StorageMaintainer(terminal, null, null);
- FileSystem fileSystem = TestFileSystem.create();
- NodeAgentContext context = new NodeAgentContextImpl.Builder("host-1.domain.tld").fileSystem(fileSystem).build();
- Files.createDirectories(context.pathOnHostFromPathInNode("/"));
+ private final TestTerminal terminal = new TestTerminal();
+ private final CoredumpHandler coredumpHandler = mock(CoredumpHandler.class);
+ private final DiskCleanup diskCleanup = mock(DiskCleanup.class);
+ private final SyncClient syncClient = mock(SyncClient.class);
+ private final ManualClock clock = new ManualClock(Instant.ofEpochSecond(1234567890));
+ private final InMemoryFlagSource flagSource = new InMemoryFlagSource();
+ private final FileSystem fileSystem = TestFileSystem.create();
+ private final StorageMaintainer storageMaintainer = new StorageMaintainer(terminal, coredumpHandler, diskCleanup, syncClient, clock,
+ fileSystem.getPath("/home/docker/container-storage/container-archive"), flagSource);
+
+ @Test
+ public void testDiskUsed() throws IOException {
+ NodeAgentContext context = new NodeAgentContextImpl.Builder("host-1.domain.tld").fileSystem(fileSystem).build();
+ Files.createDirectories(context.pathOnHostFromPathInNode("/"));
+
+ terminal.expectCommand("du -xsk /home/docker/container-storage/host-1 2>&1", 0, "321\t/home/docker/container-storage/host-1/");
+ assertEquals(Optional.of(DiskSize.of(328_704)), storageMaintainer.diskUsageFor(context));
+
+ // Value should still be cached, no new execution against the terminal
+ assertEquals(Optional.of(DiskSize.of(328_704)), storageMaintainer.diskUsageFor(context));
+ }
- terminal.expectCommand("du -xsk /home/docker/container-storage/host-1 2>&1", 0, "321\t/home/docker/container-storage/host-1/");
- assertEquals(Optional.of(DiskSize.of(328_704)), storageMaintainer.diskUsageFor(context));
+ @Test
+ public void testNonExistingDiskUsed() {
+ DiskSize size = storageMaintainer.getDiskUsed(null, Paths.get("/fake/path"));
+ assertEquals(DiskSize.ZERO, size);
+ }
- // Value should still be cached, no new execution against the terminal
- assertEquals(Optional.of(DiskSize.of(328_704)), storageMaintainer.diskUsageFor(context));
- }
+ @Test
+ public void archive_container_data_test() throws IOException {
+ // Create some files in containers
+ NodeAgentContext context1 = createNodeAgentContextAndContainerStorage(fileSystem, "container-1");
+ createNodeAgentContextAndContainerStorage(fileSystem, "container-2");
+
+ Path pathToArchiveDir = fileSystem.getPath("/home/docker/container-storage/container-archive");
+ Files.createDirectories(pathToArchiveDir);
+
+ Path containerStorageRoot = context1.pathOnHostFromPathInNode("/").getParent();
+ Set<String> containerStorageRootContentsBeforeArchive = FileFinder.from(containerStorageRoot)
+ .maxDepth(1)
+ .stream()
+ .map(FileFinder.FileAttributes::filename)
+ .collect(Collectors.toSet());
+ assertEquals(Set.of("container-archive", "container-1", "container-2"), containerStorageRootContentsBeforeArchive);
+
+
+ // Archive container-1
+ storageMaintainer.archiveNodeStorage(context1);
+
+ clock.advance(Duration.ofSeconds(3));
+ storageMaintainer.archiveNodeStorage(context1);
+
+ // container-1 should be gone from container-storage
+ Set<String> containerStorageRootContentsAfterArchive = FileFinder.from(containerStorageRoot)
+ .maxDepth(1)
+ .stream()
+ .map(FileFinder.FileAttributes::filename)
+ .collect(Collectors.toSet());
+ assertEquals(Set.of("container-archive", "container-2"), containerStorageRootContentsAfterArchive);
+
+ // container archive directory should contain exactly 1 directory - the one we just archived
+ List<FileFinder.FileAttributes> containerArchiveContentsAfterArchive = FileFinder.from(pathToArchiveDir).maxDepth(1).list();
+ assertEquals(1, containerArchiveContentsAfterArchive.size());
+ Path archivedContainerStoragePath = containerArchiveContentsAfterArchive.get(0).path();
+ assertEquals("container-1_20090213233130", archivedContainerStoragePath.getFileName().toString());
+ Set<String> archivedContainerStorageContents = FileFinder.files(archivedContainerStoragePath)
+ .stream()
+ .map(fileAttributes -> archivedContainerStoragePath.relativize(fileAttributes.path()).toString())
+ .collect(Collectors.toSet());
+ assertEquals(Set.of("opt/vespa/logs/vespa/vespa.log", "opt/vespa/logs/vespa/zookeeper.log"), archivedContainerStorageContents);
+ }
- @Test
- public void testNonExistingDiskUsed() {
- StorageMaintainer storageMaintainer = new StorageMaintainer(terminal, null, null);
- DiskSize size = storageMaintainer.getDiskUsed(null, Paths.get("/fake/path"));
- assertEquals(DiskSize.ZERO, size);
- }
+ private static NodeAgentContext createNodeAgentContextAndContainerStorage(FileSystem fileSystem, String containerName) throws IOException {
+ NodeAgentContext context = new NodeAgentContextImpl.Builder(containerName + ".domain.tld")
+ .fileSystem(fileSystem).build();
- @After
- public void after() {
- terminal.verifyAllCommandsExecuted();
- }
+ Path containerVespaHomeOnHost = context.pathOnHostFromPathInNode(context.pathInNodeUnderVespaHome(""));
+ Files.createDirectories(context.pathOnHostFromPathInNode("/etc/something"));
+ Files.createFile(context.pathOnHostFromPathInNode("/etc/something/conf"));
+
+ Files.createDirectories(containerVespaHomeOnHost.resolve("logs/vespa"));
+ Files.createFile(containerVespaHomeOnHost.resolve("logs/vespa/vespa.log"));
+ Files.createFile(containerVespaHomeOnHost.resolve("logs/vespa/zookeeper.log"));
+
+ Files.createDirectories(containerVespaHomeOnHost.resolve("var/db"));
+ Files.createFile(containerVespaHomeOnHost.resolve("var/db/some-file"));
+
+ Path containerRootOnHost = context.pathOnHostFromPathInNode("/");
+ Set<String> actualContents = FileFinder.files(containerRootOnHost)
+ .stream()
+ .map(fileAttributes -> containerRootOnHost.relativize(fileAttributes.path()).toString())
+ .collect(Collectors.toSet());
+ Set<String> expectedContents = Set.of(
+ "etc/something/conf",
+ "opt/vespa/logs/vespa/vespa.log",
+ "opt/vespa/logs/vespa/zookeeper.log",
+ "opt/vespa/var/db/some-file");
+ assertEquals(expectedContents, actualContents);
+ return context;
}
- public static class ArchiveContainerDataTests {
- @Test
- public void archive_container_data_test() throws IOException {
- // Create some files in containers
- FileSystem fileSystem = TestFileSystem.create();
- NodeAgentContext context1 = createNodeAgentContextAndContainerStorage(fileSystem, "container-1");
- createNodeAgentContextAndContainerStorage(fileSystem, "container-2");
-
- Path pathToArchiveDir = fileSystem.getPath("/home/docker/container-storage/container-archive");
- Files.createDirectories(pathToArchiveDir);
-
- Path containerStorageRoot = context1.pathOnHostFromPathInNode("/").getParent();
- Set<String> containerStorageRootContentsBeforeArchive = FileFinder.from(containerStorageRoot)
- .maxDepth(1)
- .stream()
- .map(FileFinder.FileAttributes::filename)
- .collect(Collectors.toSet());
- assertEquals(Set.of("container-archive", "container-1", "container-2"), containerStorageRootContentsBeforeArchive);
-
-
- // Archive container-1
- ManualClock clock = new ManualClock(Instant.ofEpochSecond(1234567890));
- StorageMaintainer storageMaintainer = new StorageMaintainer(null, null, pathToArchiveDir, null, clock);
- storageMaintainer.archiveNodeStorage(context1);
-
- clock.advance(Duration.ofSeconds(3));
- storageMaintainer.archiveNodeStorage(context1);
-
- // container-1 should be gone from container-storage
- Set<String> containerStorageRootContentsAfterArchive = FileFinder.from(containerStorageRoot)
- .maxDepth(1)
- .stream()
- .map(FileFinder.FileAttributes::filename)
- .collect(Collectors.toSet());
- assertEquals(Set.of("container-archive", "container-2"), containerStorageRootContentsAfterArchive);
-
- // container archive directory should contain exactly 1 directory - the one we just archived
- List<FileFinder.FileAttributes> containerArchiveContentsAfterArchive = FileFinder.from(pathToArchiveDir).maxDepth(1).list();
- assertEquals(1, containerArchiveContentsAfterArchive.size());
- Path archivedContainerStoragePath = containerArchiveContentsAfterArchive.get(0).path();
- assertEquals("container-1_20090213233130", archivedContainerStoragePath.getFileName().toString());
- Set<String> archivedContainerStorageContents = FileFinder.files(archivedContainerStoragePath)
- .stream()
- .map(fileAttributes -> archivedContainerStoragePath.relativize(fileAttributes.path()).toString())
- .collect(Collectors.toSet());
- assertEquals(Set.of("opt/vespa/logs/vespa/vespa.log", "opt/vespa/logs/vespa/zookeeper.log"), archivedContainerStorageContents);
- }
-
- private NodeAgentContext createNodeAgentContextAndContainerStorage(FileSystem fileSystem, String containerName) throws IOException {
- NodeAgentContext context = new NodeAgentContextImpl.Builder(containerName + ".domain.tld")
- .fileSystem(fileSystem).build();
-
- Path containerVespaHomeOnHost = context.pathOnHostFromPathInNode(context.pathInNodeUnderVespaHome(""));
- Files.createDirectories(context.pathOnHostFromPathInNode("/etc/something"));
- Files.createFile(context.pathOnHostFromPathInNode("/etc/something/conf"));
-
- Files.createDirectories(containerVespaHomeOnHost.resolve("logs/vespa"));
- Files.createFile(containerVespaHomeOnHost.resolve("logs/vespa/vespa.log"));
- Files.createFile(containerVespaHomeOnHost.resolve("logs/vespa/zookeeper.log"));
-
- Files.createDirectories(containerVespaHomeOnHost.resolve("var/db"));
- Files.createFile(containerVespaHomeOnHost.resolve("var/db/some-file"));
-
- Path containerRootOnHost = context.pathOnHostFromPathInNode("/");
- Set<String> actualContents = FileFinder.files(containerRootOnHost)
- .stream()
- .map(fileAttributes -> containerRootOnHost.relativize(fileAttributes.path()).toString())
- .collect(Collectors.toSet());
- Set<String> expectedContents = Set.of(
- "etc/something/conf",
- "opt/vespa/logs/vespa/vespa.log",
- "opt/vespa/logs/vespa/zookeeper.log",
- "opt/vespa/var/db/some-file");
- assertEquals(expectedContents, actualContents);
- return context;
- }
- }
+ @Test
+ public void not_run_if_not_enough_used() throws IOException {
+ NodeAgentContext context = new NodeAgentContextImpl.Builder(
+ NodeSpec.Builder.testSpec("h123a.domain.tld").resources(new NodeResources(1, 1, 1, 1)).build())
+ .fileSystem(fileSystem).build();
+ Files.createDirectories(context.pathOnHostFromPathInNode("/"));
+ mockDiskUsage(500L);
- public static class CleanupTests {
+ storageMaintainer.cleanDiskIfFull(context);
+ verifyNoInteractions(diskCleanup);
+ }
- private final TestTerminal terminal = new TestTerminal();
- private final DiskCleanup diskCleanup = mock(DiskCleanup.class);
- private final ManualClock clock = new ManualClock();
- private final StorageMaintainer storageMaintainer = new StorageMaintainer(terminal, null, null, diskCleanup, clock);
- private final FileSystem fileSystem = TestFileSystem.create();
- private final NodeAgentContext context = new NodeAgentContextImpl
- .Builder(NodeSpec.Builder.testSpec("h123a.domain.tld").resources(new NodeResources(1, 1, 1, 1)).build())
+ @Test
+ public void deletes_correct_amount() throws IOException {
+ NodeAgentContext context = new NodeAgentContextImpl.Builder(
+ NodeSpec.Builder.testSpec("h123a.domain.tld").resources(new NodeResources(1, 1, 1, 1)).build())
.fileSystem(fileSystem).build();
- @Test
- public void not_run_if_not_enough_used() throws IOException {
- Files.createDirectories(context.pathOnHostFromPathInNode("/"));
- mockDiskUsage(500L);
+ Files.createDirectories(context.pathOnHostFromPathInNode("/"));
+ mockDiskUsage(950_000L);
- storageMaintainer.cleanDiskIfFull(context);
- verifyNoInteractions(diskCleanup);
- }
-
- @Test
- public void deletes_correct_amount() throws IOException {
- Files.createDirectories(context.pathOnHostFromPathInNode("/"));
- mockDiskUsage(950_000L);
+ storageMaintainer.cleanDiskIfFull(context);
+ // Allocated size: 1 GB, usage: 950_000 kiB (972.8 MB). Wanted usage: 70% => 700 MB
+ verify(diskCleanup).cleanup(eq(context), any(), eq(272_800_000L));
+ }
- storageMaintainer.cleanDiskIfFull(context);
- // Allocated size: 1 GB, usage: 950_000 kiB (972.8 MB). Wanted usage: 70% => 700 MB
- verify(diskCleanup).cleanup(eq(context), any(), eq(272_800_000L));
- }
+ @After
+ public void after() {
+ terminal.verifyAllCommandsExecuted();
+ }
- private void mockDiskUsage(long kBytes) {
- terminal.expectCommand("du -xsk /home/docker/container-storage/h123a 2>&1", 0, kBytes + "\t/path");
- }
+ private void mockDiskUsage(long kBytes) {
+ terminal.expectCommand("du -xsk /home/docker/container-storage/h123a 2>&1", 0, kBytes + "\t/path");
}
}
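Note: the expected cleanup amount in deletes_correct_amount follows directly from the numbers in the test comment: du reports KiB, the allocated disk is 1 GB (decimal), and the wanted usage is 70% of it. The arithmetic, spelled out:

final class CleanupTargetSketch {
    public static void main(String[] args) {
        long usedBytes = 950_000L * 1024;                 // du reports KiB: 972_800_000 bytes
        long allocatedBytes = 1_000_000_000L;             // 1 GB from NodeResources
        long targetBytes = (long) (allocatedBytes * 0.7); // wanted usage: 700_000_000 bytes
        System.out.println(usedBytes - targetBytes);      // 272_800_000, as verified above
    }
}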
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java
index 0d596a46d77..e32fc8e5355 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java
@@ -4,19 +4,15 @@ package com.yahoo.vespa.hosted.node.admin.maintenance.sync;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.HostName;
import com.yahoo.vespa.test.file.TestFileSystem;
-import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UncheckedIOException;
import java.nio.file.FileSystem;
-import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.util.Optional;
+import static com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncFileInfo.Compression.NONE;
+import static com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncFileInfo.Compression.ZSTD;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
/**
* @author freva
@@ -26,54 +22,41 @@ public class SyncFileInfoTest {
private static final FileSystem fileSystem = TestFileSystem.create();
private static final String bucket = "logs-region-acdf21";
- private static final ApplicationId application = ApplicationId.from("tenant", "application", "instance");
+ private static final ApplicationId application = ApplicationId.from("ten", "app", "ins");
private static final HostName hostname = HostName.from("h12352a.env.region-1.vespa.domain.example");
- private static final Path accessLogPath = fileSystem.getPath("/opt/vespa/logs/qrs/access.json-20210212.zst");
- private static final Path vespaLogPath = fileSystem.getPath("/opt/vespa/logs/vespa.log-2021-02-12");
+ private static final Path accessLogPath1 = fileSystem.getPath("/opt/vespa/logs/qrs/access.log.20210211");
+ private static final Path accessLogPath2 = fileSystem.getPath("/opt/vespa/logs/qrs/access.log.20210212.zst");
+ private static final Path accessLogPath3 = fileSystem.getPath("/opt/vespa/logs/qrs/access-json.log.20210213.zst");
+ private static final Path accessLogPath4 = fileSystem.getPath("/opt/vespa/logs/qrs/JsonAccessLog.default.20210214.zst");
+ private static final Path connectionLogPath1 = fileSystem.getPath("/opt/vespa/logs/qrs/ConnectionLog.default.20210210");
+ private static final Path connectionLogPath2 = fileSystem.getPath("/opt/vespa/logs/qrs/ConnectionLog.default.20210212.zst");
+ private static final Path vespaLogPath1 = fileSystem.getPath("/opt/vespa/logs/vespa.log");
+ private static final Path vespaLogPath2 = fileSystem.getPath("/opt/vespa/logs/vespa.log-2021-02-12");
@Test
- public void tenant_access_log() {
- SyncFileInfo sfi = SyncFileInfo.tenantAccessLog(bucket, application, hostname, accessLogPath);
- assertEquals(Paths.get("/tenant.application.instance/h12352a/logs/access/access.json-20210212.zst"), sfi.destPath());
- assertEquals(bucket, sfi.bucketName());
- assertNotEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi));
- }
+ public void tenant_log() {
+ assertTenantSyncFileInfo(accessLogPath1, null, null);
+ assertTenantSyncFileInfo(accessLogPath2, "ten/app/ins/h12352a/logs/access/access.log.20210212.zst", NONE);
+ assertTenantSyncFileInfo(accessLogPath3, "ten/app/ins/h12352a/logs/access/access-json.log.20210213.zst", NONE);
+ assertTenantSyncFileInfo(accessLogPath4, "ten/app/ins/h12352a/logs/access/JsonAccessLog.default.20210214.zst", NONE);
- @Test
- public void tenant_vespa_log() {
- SyncFileInfo sfi = SyncFileInfo.tenantVespaLog(bucket, application, hostname, vespaLogPath);
- assertEquals(Paths.get("/tenant.application.instance/h12352a/logs/vespa/vespa.log-2021-02-12.zst"), sfi.destPath());
- assertEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi));
- }
+ assertTenantSyncFileInfo(connectionLogPath1, null, null);
+ assertTenantSyncFileInfo(connectionLogPath2, "ten/app/ins/h12352a/logs/connection/ConnectionLog.default.20210212.zst", NONE);
- @Test
- public void infra_access_log() {
- SyncFileInfo sfi = SyncFileInfo.infrastructureAccessLog(bucket, hostname, accessLogPath);
- assertEquals(Paths.get("/infrastructure/h12352a/logs/access/access.json-20210212.zst"), sfi.destPath());
- assertNotEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi));
+ assertTenantSyncFileInfo(vespaLogPath1, null, null);
+ assertTenantSyncFileInfo(vespaLogPath2, "ten/app/ins/h12352a/logs/vespa/vespa.log-2021-02-12.zst", ZSTD);
}
@Test
public void infra_vespa_log() {
- SyncFileInfo sfi = SyncFileInfo.infrastructureVespaLog(bucket, hostname, vespaLogPath);
- assertEquals(Paths.get("/infrastructure/h12352a/logs/vespa/vespa.log-2021-02-12.zst"), sfi.destPath());
- assertEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi));
+ SyncFileInfo sfi = SyncFileInfo.infrastructureVespaLog(bucket, hostname, vespaLogPath2);
+ assertEquals("infrastructure/h12352a/logs/vespa/vespa.log-2021-02-12.zst", sfi.destPath().toString());
+ assertEquals(ZSTD, sfi.uploadCompression());
}
- @BeforeClass
- public static void setup() throws IOException {
- Files.createDirectories(vespaLogPath.getParent());
- Files.createFile(vespaLogPath);
- Files.createDirectories(accessLogPath.getParent());
- Files.createFile(accessLogPath);
+ private static void assertTenantSyncFileInfo(Path srcPath, String destPath, SyncFileInfo.Compression compression) {
+ Optional<SyncFileInfo> sfi = SyncFileInfo.tenantLog(bucket, application, hostname, srcPath);
+ assertEquals(destPath, sfi.map(SyncFileInfo::destPath).map(Path::toString).orElse(null));
+ assertEquals(compression, sfi.map(SyncFileInfo::uploadCompression).orElse(null));
}
-
- private static Class<? extends InputStream> getInputStreamType(SyncFileInfo syncFileInfo) {
- try (InputStream inputStream = syncFileInfo.inputStream()) {
- return inputStream.getClass();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
-}
\ No newline at end of file
+}
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt
index 1aa0b1c585d..d11da09b737 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt
@@ -17,6 +17,18 @@ vespa_add_executable(searchcore_documentbucketmover_test_app TEST
)
vespa_add_test(NAME searchcore_documentbucketmover_test_app COMMAND searchcore_documentbucketmover_test_app)
+vespa_add_executable(searchcore_documentbucketmover_v2_test_app TEST
+ SOURCES
+ documentbucketmover_v2_test.cpp
+ DEPENDS
+ searchcore_bucketmover_test
+ searchcore_test
+ searchcore_server
+ searchcore_feedoperation
+ GTest::GTest
+)
+vespa_add_test(NAME searchcore_documentbucketmover_v2_test_app COMMAND searchcore_documentbucketmover_v2_test_app)
+
vespa_add_executable(searchcore_scaniterator_test_app TEST
SOURCES
scaniterator_test.cpp
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
index 65e206d7327..e2cd5e268d7 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
@@ -82,7 +82,7 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest {
struct MyBucketModifiedHandler : public IBucketModifiedHandler {
using BucketId = document::BucketId;
- BucketId::List _modified;
+ std::vector<BucketId> _modified;
void notifyBucketModified(const BucketId &bucket) override;
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
index 502639d1dca..5556ed0d475 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
@@ -112,7 +112,7 @@ struct ControllerFixtureBase : public ::testing::Test
const MoveOperationVector &docsMoved() const {
return _moveHandler._moves;
}
- const BucketId::List &bucketsModified() const {
+ const std::vector<BucketId> &bucketsModified() const {
return _modifiedHandler._modified;
}
const BucketId::List &calcAsked() const {
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
new file mode 100644
index 00000000000..e7dc9b9e873
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
@@ -0,0 +1,578 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucketmover_common.h"
+#include <vespa/searchcore/proton/server/bucketmovejobv2.h>
+#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP("document_bucket_mover_test");
+
+using namespace proton;
+using namespace proton::move::test;
+using document::BucketId;
+using document::test::makeBucketSpace;
+using proton::bucketdb::BucketCreateNotifier;
+using storage::spi::BucketInfo;
+using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
+using MoveOperationVector = std::vector<MoveOperation>;
+using storage::spi::dummy::DummyBucketExecutor;
+using vespalib::ThreadStackExecutor;
+
+struct ControllerFixtureBase : public ::testing::Test
+{
+ test::UserDocumentsBuilder _builder;
+ test::BucketStateCalculator::SP _calc;
+ test::ClusterStateHandler _clusterStateHandler;
+ test::BucketHandler _bucketHandler;
+ MyBucketModifiedHandler _modifiedHandler;
+ std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB;
+ MySubDb _ready;
+ MySubDb _notReady;
+ BucketCreateNotifier _bucketCreateNotifier;
+ test::DiskMemUsageNotifier _diskMemUsageNotifier;
+ ThreadStackExecutor _singleExecutor;
+ ExecutorThreadService _master;
+ DummyBucketExecutor _bucketExecutor;
+ MyMoveHandler _moveHandler;
+ BucketMoveJobV2 _bmj;
+ MyCountJobRunner _runner;
+ ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts);
+ ~ControllerFixtureBase();
+ ControllerFixtureBase &addReady(const BucketId &bucket) {
+ _calc->addReady(bucket);
+ return *this;
+ }
+ ControllerFixtureBase &remReady(const BucketId &bucket) {
+ _calc->remReady(bucket);
+ return *this;
+ }
+ ControllerFixtureBase &changeCalc() {
+ _calc->resetAsked();
+ _moveHandler.reset();
+ _modifiedHandler.reset();
+ _clusterStateHandler.notifyClusterStateChanged(_calc);
+ return *this;
+ }
+ ControllerFixtureBase &activateBucket(const BucketId &bucket) {
+ _ready.setBucketState(bucket, true);
+ _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::ACTIVE);
+ return *this;
+ }
+ ControllerFixtureBase &deactivateBucket(const BucketId &bucket) {
+ _ready.setBucketState(bucket, false);
+ _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE);
+ return *this;
+ }
+ const MoveOperationVector &docsMoved() const {
+ return _moveHandler._moves;
+ }
+ const std::vector<BucketId> &bucketsModified() const {
+ return _modifiedHandler._modified;
+ }
+ const BucketId::List &calcAsked() const {
+ return _calc->asked();
+ }
+ void runLoop() {
+ while (!_bmj.isBlocked() && !_bmj.run()) {
+ }
+ }
+ void sync() {
+ _bucketExecutor.sync();
+ _master.sync();
+        _master.sync(); // Sync twice: tasks running on the master thread may schedule more work onto it
+ }
+};
+
+ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts)
+ : _builder(),
+ _calc(std::make_shared<test::BucketStateCalculator>()),
+ _bucketHandler(),
+ _modifiedHandler(),
+ _bucketDB(std::make_shared<bucketdb::BucketDBOwner>()),
+ _ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY),
+ _notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY),
+ _bucketCreateNotifier(),
+ _diskMemUsageNotifier(),
+ _singleExecutor(1, 0x10000),
+ _master(_singleExecutor),
+ _bucketExecutor(4),
+ _moveHandler(*_bucketDB, storeMoveDoneContexts),
+ _bmj(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb,
+ _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler,
+ _diskMemUsageNotifier, blockableConfig,
+ "test", makeBucketSpace()),
+ _runner(_bmj)
+{
+}
+
+ControllerFixtureBase::~ControllerFixtureBase() = default;
+constexpr double RESOURCE_LIMIT_FACTOR = 1.0;
+constexpr uint32_t MAX_OUTSTANDING_OPS = 10;
+const BlockableMaintenanceJobConfig BLOCKABLE_CONFIG(RESOURCE_LIMIT_FACTOR, MAX_OUTSTANDING_OPS);
+
+struct ControllerFixture : public ControllerFixtureBase
+{
+ ControllerFixture(const BlockableMaintenanceJobConfig &blockableConfig = BLOCKABLE_CONFIG)
+ : ControllerFixtureBase(blockableConfig, blockableConfig.getMaxOutstandingMoveOps() != MAX_OUTSTANDING_OPS)
+ {
+ _builder.createDocs(1, 1, 4); // 3 docs
+ _builder.createDocs(2, 4, 6); // 2 docs
+ _ready.insertDocs(_builder.getDocs());
+ _builder.clearDocs();
+ _builder.createDocs(3, 1, 3); // 2 docs
+ _builder.createDocs(4, 3, 6); // 3 docs
+ _notReady.insertDocs(_builder.getDocs());
+ }
+};
+
+struct OnlyReadyControllerFixture : public ControllerFixtureBase
+{
+ OnlyReadyControllerFixture() : ControllerFixtureBase(BLOCKABLE_CONFIG, false)
+ {
+ _builder.createDocs(1, 1, 2); // 1 doc
+ _builder.createDocs(2, 2, 4); // 2 docs
+ _builder.createDocs(3, 4, 7); // 3 docs
+ _builder.createDocs(4, 7, 11); // 4 docs
+ _ready.insertDocs(_builder.getDocs());
+ }
+};
+
+TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so)
+{
+ EXPECT_TRUE(_bmj.done());
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(docsMoved().empty());
+ EXPECT_TRUE(bucketsModified().empty());
+}
+
+TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_bucket_state_says_so)
+{
+ // bucket 4 should be moved
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ addReady(_notReady.bucket(4));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(3u, docsMoved().size());
+ assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]);
+ assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]);
+ assertEqual(_notReady.bucket(4), _notReady.docs(4)[2], 2, 1, docsMoved()[2]);
+ ASSERT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_notReady.bucket(4), bucketsModified()[0]);
+}
+
+TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_bucket_state_says_so)
+{
+ // bucket 2 should be moved
+ addReady(_ready.bucket(1));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+ assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]);
+ assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]);
+ EXPECT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+}
+
+
+TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
+{
+ // bucket 2, 3, and 4 should be moved
+ addReady(_ready.bucket(1));
+ addReady(_notReady.bucket(3));
+ addReady(_notReady.bucket(4));
+
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+
+ EXPECT_FALSE(_bmj.scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+
+ EXPECT_FALSE(_bmj.scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(4u, docsMoved().size());
+
+ EXPECT_FALSE(_bmj.scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(6u, docsMoved().size());
+
+ // move bucket 4, docs 3
+ EXPECT_TRUE(_bmj.scanAndMove(1,2));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(7u, docsMoved().size());
+ EXPECT_EQ(3u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+ EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]);
+ EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]);
+}
+
+TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_done)
+{
+ // bucket 4 should be moved
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ addReady(_notReady.bucket(4));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+
+ EXPECT_FALSE(_bmj.scanAndMove(1, 1));
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_TRUE(_bmj.scanAndMove(1, 2));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(3u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+}
+
+
+TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_not_ready_until_being_not_active)
+{
+ // bucket 1 should be moved but is active
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ activateBucket(_ready.bucket(1));
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // scan all, delay active bucket 1
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+
+ deactivateBucket(_ready.bucket(1));
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // move delayed and de-activated bucket 1
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(3u, docsMoved().size());
+ EXPECT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]);
+}
+
+TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_we_change_calculator)
+{
+ // bucket 1 should be moved
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ _bmj.scanAndMove(1, 1);
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ changeCalc(); // Not cancelled, bucket 1 still moving to notReady
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
+ _calc->resetAsked();
+ _bmj.scanAndMove(1, 1);
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(0u, calcAsked().size());
+ addReady(_ready.bucket(1));
+ changeCalc(); // cancelled, bucket 1 no longer moving to notReady
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
+ _calc->resetAsked();
+ remReady(_ready.bucket(1));
+ _calc->resetAsked();
+ changeCalc(); // not cancelled. No active bucket move
+ EXPECT_EQ(4u, calcAsked().size());
+ _bmj.scanAndMove(1, 1);
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_ready.bucket(2), calcAsked()[1]);
+ EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]);
+ _bmj.scanAndMove(2, 3);
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(3u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_notReady.bucket(4), calcAsked()[3]);
+ EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
+}
+
+TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_calculator_does_not_say_so)
+{
+ // bucket 1 should be moved
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ activateBucket(_ready.bucket(1));
+ _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+
+ deactivateBucket(_ready.bucket(1));
+ addReady(_ready.bucket(1));
+ changeCalc();
+ _bmj.scanAndMove(4, 3); // consider delayed bucket 1
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
+}
+
+TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_as_retired)
+{
+ _calc->setNodeRetired(true);
+ // Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired.
+ addReady(_ready.bucket(1));
+ _bmj.recompute();
+ _bmj.scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+}
+
+// Technically this should never happen since a retired node is never in the ideal state,
+// but we test this case for the sake of completeness.
+TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_is_marked_as_retired)
+{
+ _calc->setNodeRetired(true);
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ addReady(_notReady.bucket(3));
+ _bmj.recompute();
+ _bmj.scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+}
+
+TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_ready_even_if_node_is_marked_as_retired)
+{
+ _calc->setNodeRetired(true);
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ addReady(_notReady.bucket(3));
+ _bmj.recompute();
+ activateBucket(_notReady.bucket(3));
+ _bmj.scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ ASSERT_EQ(2u, docsMoved().size());
+ assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[0]);
+ assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[1]);
+ ASSERT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]);
+}
+
+
+TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job)
+{
+ EXPECT_TRUE(_bmj.done());
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ runLoop();
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_TRUE(docsMoved().empty());
+ EXPECT_TRUE(bucketsModified().empty());
+ addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify
+ EXPECT_TRUE(_bmj.done()); // move job still believes work done
+ sync();
+ EXPECT_TRUE(bucketsModified().empty());
+ _bmj.notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(bucketsModified().empty());
+ sync();
+ EXPECT_TRUE(bucketsModified().empty());
+ runLoop();
+ EXPECT_TRUE(_bmj.done());
+ sync();
+
+ EXPECT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(2u, docsMoved().size());
+}
+
+
+struct ResourceLimitControllerFixture : public ControllerFixture
+{
+ ResourceLimitControllerFixture(double resourceLimitFactor = RESOURCE_LIMIT_FACTOR) :
+ ControllerFixture(BlockableMaintenanceJobConfig(resourceLimitFactor, MAX_OUTSTANDING_OPS))
+ {}
+
+ void testJobStopping(DiskMemUsageState blockingUsageState) {
+ // Bucket 1 should be moved
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ // Note: This depends on _bmj.run() moving at most 1 document
+ EXPECT_FALSE(_bmj.run());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ // Notify that we're over the limit
+ _diskMemUsageNotifier.notify(blockingUsageState);
+ EXPECT_TRUE(_bmj.run());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ // Notify that we're under the limit again
+ _diskMemUsageNotifier.notify(DiskMemUsageState());
+ EXPECT_FALSE(_bmj.run());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ }
+
+ void testJobNotStopping(DiskMemUsageState blockingUsageState) {
+ // Bucket 1 should be moved
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ // Note: This depends on _bmj.run() moving at most 1 document
+ EXPECT_FALSE(_bmj.run());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ // Notify that we're over the limit, but not over the adjusted limit
+ _diskMemUsageNotifier.notify(blockingUsageState);
+ EXPECT_FALSE(_bmj.run());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ }
+};
+
+struct ResourceLimitControllerFixture_1_2 : public ResourceLimitControllerFixture {
+ ResourceLimitControllerFixture_1_2() : ResourceLimitControllerFixture(1.2) {}
+};
+
+TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_disk_limit_is_reached)
+{
+ testJobStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState()));
+}
+
+TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_memory_limit_is_reached)
+{
+ testJobStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8)));
+}
+
+TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_disk_resource_limit)
+{
+ testJobNotStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState()));
+}
+
+TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_memory_resource_limit)
+{
+ testJobNotStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8)));
+}
+
+
+struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase
+{
+ MaxOutstandingMoveOpsFixture(uint32_t maxOutstandingOps)
+ : ControllerFixtureBase(BlockableMaintenanceJobConfig(RESOURCE_LIMIT_FACTOR, maxOutstandingOps), true)
+ {
+ _builder.createDocs(1, 1, 2);
+ _builder.createDocs(2, 2, 3);
+ _builder.createDocs(3, 3, 4);
+ _builder.createDocs(4, 4, 5);
+ _ready.insertDocs(_builder.getDocs());
+ _builder.clearDocs();
+ _builder.createDocs(11, 1, 2);
+ _builder.createDocs(12, 2, 3);
+ _builder.createDocs(13, 3, 4);
+ _builder.createDocs(14, 4, 5);
+ _notReady.insertDocs(_builder.getDocs());
+ addReady(_ready.bucket(3));
+ _bmj.recompute();
+ }
+
+ void assertRunToBlocked() {
+ EXPECT_TRUE(_bmj.run()); // job becomes blocked as max outstanding limit is reached
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.isBlocked());
+ EXPECT_TRUE(_bmj.isBlocked(BlockedReason::OUTSTANDING_OPS));
+ }
+ void assertRunToNotBlocked() {
+ EXPECT_FALSE(_bmj.run());
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj.isBlocked());
+ }
+ void assertRunToFinished() {
+ EXPECT_TRUE(_bmj.run());
+ EXPECT_TRUE(_bmj.done());
+ EXPECT_FALSE(_bmj.isBlocked());
+ }
+ void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) {
+ EXPECT_EQ(expDocsMovedCnt, docsMoved().size());
+ EXPECT_EQ(expMoveContextsCnt, _moveHandler._moveDoneContexts.size());
+ }
+ void unblockJob(uint32_t expRunnerCnt) {
+ _moveHandler.clearMoveDoneContexts(); // unblocks the job and tries to execute it via the runner
+ EXPECT_EQ(expRunnerCnt, _runner.runCount);
+ EXPECT_FALSE(_bmj.isBlocked());
+ }
+};
+
+struct MaxOutstandingMoveOpsFixture_1 : public MaxOutstandingMoveOpsFixture {
+ MaxOutstandingMoveOpsFixture_1() : MaxOutstandingMoveOpsFixture(1) {}
+};
+
+struct MaxOutstandingMoveOpsFixture_2 : public MaxOutstandingMoveOpsFixture {
+ MaxOutstandingMoveOpsFixture_2() : MaxOutstandingMoveOpsFixture(2) {}
+};
+
+TEST_F(MaxOutstandingMoveOpsFixture_1, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_1)
+{
+ assertRunToBlocked();
+ sync();
+ assertDocsMoved(1, 1);
+ assertRunToBlocked();
+ assertDocsMoved(1, 1);
+
+ unblockJob(1);
+ assertRunToBlocked();
+ sync();
+ assertDocsMoved(2, 1);
+
+ unblockJob(2);
+ assertRunToBlocked();
+ sync();
+ assertDocsMoved(3, 1);
+
+ unblockJob(3);
+ assertRunToFinished();
+ sync();
+ assertDocsMoved(3, 0);
+}
+
+TEST_F(MaxOutstandingMoveOpsFixture_2, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_2)
+{
+ assertRunToNotBlocked();
+ sync();
+ assertDocsMoved(1, 1);
+
+ assertRunToBlocked();
+ sync();
+ assertDocsMoved(2, 2);
+
+ unblockJob(2);
+ assertRunToFinished();
+ sync();
+ assertDocsMoved(3, 1);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
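The fixture's sync() above is the subtle part of these tests: it drains the bucket executor and then syncs the master thread twice, because a task executed on the master may schedule a follow-up task back onto the master, and a single sync only waits for work queued before the sync started. A minimal, self-contained sketch of that effect, assuming a hypothetical TinyExecutor in place of the vespalib executor:

    // Hypothetical single-thread executor (not the vespalib API) showing why
    // ControllerFixtureBase::sync() calls _master.sync() twice.
    #include <cassert>
    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <thread>

    class TinyExecutor {
        std::deque<std::function<void()>> _queue;
        std::mutex _lock;
        std::condition_variable _cond;
        bool _closed = false;
        std::thread _thread;
    public:
        TinyExecutor() : _thread([this] { loop(); }) {}
        ~TinyExecutor() {
            { std::lock_guard g(_lock); _closed = true; }
            _cond.notify_all();
            _thread.join();
        }
        void execute(std::function<void()> task) {
            { std::lock_guard g(_lock); _queue.push_back(std::move(task)); }
            _cond.notify_all();
        }
        // Waits until everything queued before this call has run.
        void sync() {
            std::mutex done_lock;
            std::condition_variable done_cond;
            bool done = false;
            execute([&] {
                { std::lock_guard g(done_lock); done = true; }
                done_cond.notify_all();
            });
            std::unique_lock g(done_lock);
            done_cond.wait(g, [&] { return done; });
        }
    private:
        void loop() {
            std::unique_lock g(_lock);
            while (true) {
                _cond.wait(g, [this] { return _closed || !_queue.empty(); });
                if (_queue.empty()) return;
                auto task = std::move(_queue.front());
                _queue.pop_front();
                g.unlock();
                task();
                g.lock();
            }
        }
    };

    int main() {
        TinyExecutor master;
        bool first = false, second = false;
        master.execute([&] {
            first = true;
            master.execute([&] { second = true; }); // master schedules onto master again
        });
        master.sync();  // guarantees 'first' ran; 'second' may still be queued
        assert(first);
        master.sync();  // the second sync drains the follow-up task
        assert(second);
    }

The same shape explains runLoop(): run() only schedules a bounded amount of work per invocation, so it is re-invoked until the job reports done or blocked.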
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
index 420f45d99e8..ae504fef603 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
@@ -43,7 +43,7 @@ struct DocumentMoverTest : ::testing::Test
: _builder(),
_bucketDB(std::make_shared<bucketdb::BucketDBOwner>()),
_limiter(),
- _mover(_limiter),
+ _mover(_limiter, _bucketDb),
_source(_builder, _bucketDB, 0u, SubDbType::READY),
_bucketDb(),
_handler(_bucketDb)
@@ -58,7 +58,7 @@ struct DocumentMoverTest : ::testing::Test
_source._subDb.retriever(),
_source._subDb.feed_view(),
&_pendingLidsForCommit);
- _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler, _bucketDb);
+ _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler);
}
bool moveDocuments(size_t maxDocsToMove) {
return _mover.moveDocuments(maxDocsToMove);
@@ -68,7 +68,7 @@ struct DocumentMoverTest : ::testing::Test
TEST_F(DocumentMoverTest, require_that_initial_bucket_mover_is_done)
{
MyMoveOperationLimiter limiter;
- DocumentBucketMover mover(limiter);
+ DocumentBucketMover mover(limiter, _bucketDb);
EXPECT_TRUE(mover.bucketDone());
mover.moveDocuments(2);
EXPECT_TRUE(mover.bucketDone());
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index 26fba8a780f..86d238c7554 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -6,6 +6,7 @@ vespa_add_library(searchcore_server STATIC
bootstrapconfigmanager.cpp
buckethandler.cpp
bucketmovejob.cpp
+ bucketmovejobv2.cpp
clusterstatehandler.cpp
combiningfeedview.cpp
ddbstate.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
index 8b9b3c539d6..30b1e8b623c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
@@ -64,8 +64,7 @@ BucketMoveJob::checkBucket(const BucketId &bucket,
const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady);
LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)",
bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id());
- mover.setupForBucket(bucket, &source, target.sub_db_id(),
- _moveHandler, _ready.meta_store()->getBucketDB());
+ mover.setupForBucket(bucket, &source, target.sub_db_id(), _moveHandler);
}
BucketMoveJob::ScanResult
@@ -151,7 +150,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc,
_modifiedHandler(modifiedHandler),
_ready(ready),
_notReady(notReady),
- _mover(getLimiter()),
+ _mover(getLimiter(), _ready.meta_store()->getBucketDB()),
_doneScan(false),
_scanPos(),
_scanPass(ScanPass::FIRST),
@@ -161,7 +160,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc,
_delayedBucketsFrozen(),
_frozenBuckets(frozenBuckets),
_bucketCreateNotifier(bucketCreateNotifier),
- _delayedMover(getLimiter()),
+ _delayedMover(getLimiter(), _ready.meta_store()->getBucketDB()),
_clusterStateChangedNotifier(clusterStateChangedNotifier),
_bucketStateChangedNotifier(bucketStateChangedNotifier),
_diskMemUsageNotifier(diskMemUsageNotifier)
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
new file mode 100644
index 00000000000..77942b72478
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -0,0 +1,368 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucketmovejobv2.h"
+#include "imaintenancejobrunner.h"
+#include "ibucketstatechangednotifier.h"
+#include "iclusterstatechangednotifier.h"
+#include "maintenancedocumentsubdb.h"
+#include "i_disk_mem_usage_notifier.h"
+#include "ibucketmodifiedhandler.h"
+#include "move_operation_limiter.h"
+#include "document_db_maintenance_config.h"
+#include <vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h>
+#include <vespa/searchcore/proton/feedoperation/moveoperation.h>
+#include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h>
+#include <vespa/searchcorespi/index/i_thread_service.h>
+#include <vespa/persistence/spi/bucket_tasks.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/lambdatask.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".proton.server.bucketmovejob");
+
+using document::BucketId;
+using storage::spi::BucketInfo;
+using storage::spi::Bucket;
+using storage::spi::makeBucketTask;
+using proton::bucketdb::BucketMover;
+using vespalib::makeLambdaTask;
+
+namespace proton {
+
+namespace {
+
+const char * bool2str(bool v) { return (v ? "T" : "F"); }
+
+bool
+blockedDueToClusterState(const IBucketStateCalculator::SP &calc)
+{
+ bool clusterUp = calc && calc->clusterUp();
+ bool nodeUp = calc && calc->nodeUp();
+ bool nodeInitializing = calc && calc->nodeInitializing();
+ return !(clusterUp && nodeUp && !nodeInitializing);
+}
+
+}
+
+BucketMoveJobV2::BucketMoveJobV2(const IBucketStateCalculator::SP &calc,
+ IDocumentMoveHandler &moveHandler,
+ IBucketModifiedHandler &modifiedHandler,
+ IThreadService & master,
+ BucketExecutor & bucketExecutor,
+ const MaintenanceDocumentSubDB &ready,
+ const MaintenanceDocumentSubDB &notReady,
+ bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ const vespalib::string &docTypeName,
+ document::BucketSpace bucketSpace)
+ : BlockableMaintenanceJob("move_buckets." + docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig),
+ IClusterStateChangedHandler(),
+ bucketdb::IBucketCreateListener(),
+ IBucketStateChangedHandler(),
+ IDiskMemUsageListener(),
+ _calc(calc),
+ _moveHandler(moveHandler),
+ _modifiedHandler(modifiedHandler),
+ _master(master),
+ _bucketExecutor(bucketExecutor),
+ _ready(ready),
+ _notReady(notReady),
+ _bucketSpace(bucketSpace),
+ _iterateCount(0),
+ _movers(),
+ _buckets2Move(),
+ _stopped(false),
+ _startedCount(0),
+ _executedCount(0),
+ _bucketCreateNotifier(bucketCreateNotifier),
+ _clusterStateChangedNotifier(clusterStateChangedNotifier),
+ _bucketStateChangedNotifier(bucketStateChangedNotifier),
+ _diskMemUsageNotifier(diskMemUsageNotifier)
+{
+ _movers.reserve(std::min(100u, blockableConfig.getMaxOutstandingMoveOps()));
+ if (blockedDueToClusterState(_calc)) {
+ setBlocked(BlockedReason::CLUSTER_STATE);
+ }
+
+ _bucketCreateNotifier.addListener(this);
+ _clusterStateChangedNotifier.addClusterStateChangedHandler(this);
+ _bucketStateChangedNotifier.addBucketStateChangedHandler(this);
+ _diskMemUsageNotifier.addDiskMemUsageListener(this);
+ recompute();
+}
+
+BucketMoveJobV2::~BucketMoveJobV2()
+{
+ _bucketCreateNotifier.removeListener(this);
+ _clusterStateChangedNotifier.removeClusterStateChangedHandler(this);
+ _bucketStateChangedNotifier.removeBucketStateChangedHandler(this);
+ _diskMemUsageNotifier.removeDiskMemUsageListener(this);
+}
+
+BucketMoveJobV2::NeedResult
+BucketMoveJobV2::needMove(const ScanIterator &itr) const {
+ NeedResult noMove(false, false);
+ const bool hasReadyDocs = itr.hasReadyBucketDocs();
+ const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs();
+ if (!hasReadyDocs && !hasNotReadyDocs) {
+ return noMove; // No documents for bucket in ready or notready subdbs
+ }
+ const bool isActive = itr.isActive();
+ // No point in moving buckets when node is retired and everything will be deleted soon.
+ // However, allow moving of explicitly activated buckets, as this implies a lack of other good replicas.
+ if (!_calc || (_calc->nodeRetired() && !isActive)) {
+ return noMove;
+ }
+ const bool shouldBeReady = _calc->shouldBeReady(document::Bucket(_bucketSpace, itr.getBucket()));
+ const bool wantReady = shouldBeReady || isActive;
+ LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)",
+ itr.getBucket().toString().c_str(), bool2str(shouldBeReady), bool2str(isActive));
+ if (wantReady) {
+ if (!hasNotReadyDocs)
+ return noMove; // No notready bucket to make ready
+ } else {
+ if (isActive)
+ return noMove; // Do not move from ready to not ready when active
+ if (!hasReadyDocs)
+ return noMove; // No ready bucket to make notready
+ }
+ return {true, wantReady};
+}
+
+void
+BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
+ auto [keys, done] = mover->getKeysToMove(maxDocsToMove);
+ if (done) {
+ mover->setBucketDone();
+ }
+ if (keys.empty()) return;
+ if (_stopped.load(std::memory_order_relaxed)) return;
+ mover->updateLastValidGid(keys.back()._gid);
+ auto context = getLimiter().beginOperation();
+ Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket()));
+ auto bucketTask = makeBucketTask(
+ [this, mover=std::move(mover), keys=std::move(keys),opsTracker=getLimiter().beginOperation()]
+ (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) mutable
+ {
+ assert(mover->getBucket() == bucket.getBucketId());
+ using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>;
+ prepareMove(std::move(mover), std::move(keys),
+ std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone))));
+ });
+ auto failed = _bucketExecutor.execute(spiBucket, std::move(bucketTask));
+ if (!failed) {
+ _startedCount.fetch_add(1, std::memory_order_relaxed);
+ }
+}
+
+namespace {
+
+class IncOnDestruct {
+public:
+ IncOnDestruct(std::atomic<size_t> & count) : _count(count) {}
+ ~IncOnDestruct() {
+ _count.fetch_add(1, std::memory_order_relaxed);
+ }
+private:
+ std::atomic<size_t> & _count;
+};
+
+}
+
+void
+BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone)
+{
+ IncOnDestruct countGuard(_executedCount);
+ if (_stopped.load(std::memory_order_relaxed)) return;
+ auto moveOps = mover->createMoveOperations(keys);
+ _master.execute(makeLambdaTask([this, mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable {
+ completeMove(std::move(mover), std::move(moveOps), std::move(onDone));
+ }));
+}
+
+void
+BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> ops, IDestructorCallbackSP onDone) {
+ mover->moveDocuments(std::move(ops), std::move(onDone));
+ if (mover->bucketDone() && mover->inSync()) {
+ _modifiedHandler.notifyBucketModified(mover->getBucket());
+ }
+}
+
+void
+BucketMoveJobV2::cancelMovesForBucket(BucketId bucket) {
+ for (auto itr = _movers.begin(); itr != _movers.end(); itr++) {
+ if (bucket == (*itr)->getBucket()) {
+ _movers.erase(itr);
+ backFillMovers();
+ return;
+ }
+ }
+}
+
+void
+BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket)
+{
+ ScanIterator itr(guard, bucket);
+ auto [mustMove, wantReady] = needMove(itr);
+ if (mustMove) {
+ _buckets2Move[bucket] = wantReady;
+ } else {
+ _buckets2Move.erase(bucket);
+ cancelMovesForBucket(bucket);
+ }
+ backFillMovers();
+ considerRun();
+}
+
+void
+BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket)
+{
+ considerBucket(guard, bucket);
+}
+
+BucketMoveJobV2::BucketSet
+BucketMoveJobV2::computeBuckets2Move()
+{
+ BucketMoveJobV2::BucketSet toMove;
+ for (ScanIterator itr(_ready.meta_store()->getBucketDB().takeGuard(), BucketId()); itr.valid(); ++itr) {
+ auto [mustMove, wantReady] = needMove(itr);
+ if (mustMove) {
+ toMove[itr.getBucket()] = wantReady;
+ }
+ }
+ return toMove;
+}
+
+std::shared_ptr<BucketMover>
+BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) {
+ const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready);
+ const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady);
+ LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)",
+ bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id());
+ return std::make_shared<BucketMover>(bucket, &source, target.sub_db_id(), _moveHandler);
+}
+
+std::shared_ptr<BucketMover>
+BucketMoveJobV2::greedyCreateMover() {
+ if ( ! _buckets2Move.empty()) {
+ auto next = _buckets2Move.begin();
+ auto mover = createMover(next->first, next->second);
+ _buckets2Move.erase(next->first);
+ return mover;
+ }
+ return {};
+}
+
+bool
+BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
+ if (done()) return true;
+
+ // Select mover
+ size_t index = _iterateCount++ % _movers.size();
+ const auto & mover = _movers[index];
+
+ // Move, or reduce the number of movers as we are tailing off
+ if (!mover->bucketDone()) {
+ startMove(mover, maxDocsToMove);
+ if (mover->bucketDone()) {
+ auto next = greedyCreateMover();
+ if (next) {
+ _movers[index] = next;
+ } else {
+ _movers.erase(_movers.begin() + index);
+ }
+ }
+ }
+ return done();
+}
+
+bool
+BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) {
+ for (size_t i(0); i < maxBuckets2Move; i++) {
+ moveDocs(maxDocsToMovePerBucket);
+ }
+ return isBlocked() || done();
+}
+
+bool
+BucketMoveJobV2::done() const {
+ return _buckets2Move.empty() && _movers.empty() && !isBlocked();
+}
+
+bool
+BucketMoveJobV2::run()
+{
+ if (isBlocked()) {
+ return true; // indicate work is done, since node state is bad
+ }
+ /// Returning false here will immediately post the job back on the executor. That gives a busy loop,
+ /// but it is considered fine since it is very rare and will be intermingled with multiple feed operations.
+ if ( ! scanAndMove(1, 1) ) {
+ return false;
+ }
+
+ if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
+ return true;
+ }
+ return done();
+}
+
+void
+BucketMoveJobV2::recompute() {
+ _movers.clear();
+ _buckets2Move = computeBuckets2Move();
+ backFillMovers();
+}
+
+void
+BucketMoveJobV2::backFillMovers() {
+ // Ensure we have enough movers.
+ while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) {
+ _movers.push_back(greedyCreateMover());
+ }
+}
+void
+BucketMoveJobV2::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc)
+{
+ // Called by master write thread
+ _calc = newCalc;
+ recompute();
+ if (blockedDueToClusterState(_calc)) {
+ setBlocked(BlockedReason::CLUSTER_STATE);
+ } else {
+ unBlock(BlockedReason::CLUSTER_STATE);
+ }
+}
+
+void
+BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState)
+{
+ // Called by master write thread
+ considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
+}
+
+void
+BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state)
+{
+ // Called by master write thread
+ internalNotifyDiskMemUsage(state);
+}
+
+bool
+BucketMoveJobV2::inSync() const {
+ return _executedCount == _startedCount;
+}
+
+void
+BucketMoveJobV2::onStop() {
+ // Called by master write thread
+ _stopped = true;
+ while ( ! inSync() ) {
+ std::this_thread::sleep_for(1ms);
+ }
+}
+
+} // namespace proton
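startMove(), prepareMove() and completeMove() above implement the three phases from the class comment, each on its designated thread: the master collects the keys and schedules a bucket task, a BucketExecutor thread builds the move operations while the bucket lock is held, and the result is bounced back to the master to be applied. A condensed sketch of that handoff, with plain std::function executors standing in for IThreadService and BucketExecutor (stand-ins, not the real API):

    // Condensed, hypothetical sketch of the three-phase handoff in
    // BucketMoveJobV2; phase names match startMove/prepareMove/completeMove.
    #include <cstdio>
    #include <functional>
    #include <string>
    #include <utility>
    #include <vector>

    using Executor = std::function<void(std::function<void()>)>;
    using Key = int;
    using MoveOp = std::string;

    struct ThreePhaseMove {
        Executor master;         // stand-in for the proton master write thread
        Executor bucketExecutor; // runs its task with the bucket lock held

        // Phase 1 (master): collect keys, then schedule a bucket task.
        void startMove(std::vector<Key> keys) {
            bucketExecutor([this, keys = std::move(keys)]() mutable {
                prepareMove(std::move(keys));
            });
        }
        // Phase 2 (bucket executor): fetch the documents for the keys and
        // turn them into move operations.
        void prepareMove(std::vector<Key> keys) {
            std::vector<MoveOp> ops;
            for (Key k : keys) ops.push_back("move(doc " + std::to_string(k) + ")");
            master([this, ops = std::move(ops)]() mutable {
                completeMove(std::move(ops));
            });
        }
        // Phase 3 (master again): apply the operations and notify once the
        // bucket has fully moved.
        void completeMove(std::vector<MoveOp> ops) {
            for (const auto &op : ops) std::puts(op.c_str());
            std::puts("notifyBucketModified");
        }
    };

    int main() {
        auto inline_exec = [](std::function<void()> task) { task(); }; // inline for the sketch
        ThreePhaseMove job{inline_exec, inline_exec};
        job.startMove({1, 2, 3});
    }

In the sketch both executors run inline; in the real job the phases run asynchronously, which is why the _startedCount/_executedCount counters and the KeepAlive done-contexts are needed to tell when the job is in sync.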
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
new file mode 100644
index 00000000000..714f24b6ff6
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -0,0 +1,119 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "blockable_maintenance_job.h"
+#include "documentbucketmover.h"
+#include "i_disk_mem_usage_listener.h"
+#include "ibucketstatechangedhandler.h"
+#include "iclusterstatechangedhandler.h"
+#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h>
+#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h>
+
+namespace storage::spi { struct BucketExecutor; }
+namespace searchcorespi::index { struct IThreadService; }
+
+namespace proton {
+
+class BlockableMaintenanceJobConfig;
+class IBucketStateChangedNotifier;
+class IClusterStateChangedNotifier;
+class IDiskMemUsageNotifier;
+class IBucketModifiedHandler;
+
+namespace bucketdb { class IBucketCreateNotifier; }
+
+/**
+ * Class used to control the moving of buckets between the ready and
+ * not ready sub databases based on the readiness of buckets according to the cluster state.
+ * It will first compute the set of buckets to be moved. Then N of these buckets will be iterated in parallel and
+ * the documents scheduled for move. The movement happens in 3 phases:
+ * 1 - Collect meta info for the documents. This must happen in the master thread.
+ * 2 - Acquire the bucket lock, then fetch the documents and verify them against the meta data. This is done in BucketExecutor threads.
+ * 3 - The actual move is then done in the master thread while still holding the bucket lock. Once a bucket has fully
+ * moved, a bucket modified notification is sent.
+ */
+class BucketMoveJobV2 : public BlockableMaintenanceJob,
+ public IClusterStateChangedHandler,
+ public bucketdb::IBucketCreateListener,
+ public IBucketStateChangedHandler,
+ public IDiskMemUsageListener
+{
+private:
+ using BucketExecutor = storage::spi::BucketExecutor;
+ using IDestructorCallback = vespalib::IDestructorCallback;
+ using IDestructorCallbackSP = std::shared_ptr<IDestructorCallback>;
+ using IThreadService = searchcorespi::index::IThreadService;
+ using BucketId = document::BucketId;
+ using ScanIterator = bucketdb::ScanIterator;
+ using BucketSet = std::map<BucketId, bool>;
+ using NeedResult = std::pair<bool, bool>;
+ using ActiveState = storage::spi::BucketInfo::ActiveState;
+ using BucketMover = bucketdb::BucketMover;
+ using BucketMoverSP = std::shared_ptr<BucketMover>;
+ using Movers = std::vector<std::shared_ptr<BucketMover>>;
+ using MoveKey = BucketMover::MoveKey;
+ using GuardedMoveOp = BucketMover::GuardedMoveOp;
+ std::shared_ptr<IBucketStateCalculator> _calc;
+ IDocumentMoveHandler &_moveHandler;
+ IBucketModifiedHandler &_modifiedHandler;
+ IThreadService &_master;
+ BucketExecutor &_bucketExecutor;
+ const MaintenanceDocumentSubDB &_ready;
+ const MaintenanceDocumentSubDB &_notReady;
+ const document::BucketSpace _bucketSpace;
+ size_t _iterateCount;
+ Movers _movers;
+ BucketSet _buckets2Move;
+ std::atomic<bool> _stopped;
+ std::atomic<size_t> _startedCount;
+ std::atomic<size_t> _executedCount;
+
+ bucketdb::IBucketCreateNotifier &_bucketCreateNotifier;
+ IClusterStateChangedNotifier &_clusterStateChangedNotifier;
+ IBucketStateChangedNotifier &_bucketStateChangedNotifier;
+ IDiskMemUsageNotifier &_diskMemUsageNotifier;
+
+ void startMove(BucketMoverSP mover, size_t maxDocsToMove);
+ void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context);
+ void completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> keys, IDestructorCallbackSP context);
+ void considerBucket(const bucketdb::Guard & guard, BucketId bucket);
+ NeedResult needMove(const ScanIterator &itr) const;
+ BucketSet computeBuckets2Move();
+ BucketMoverSP createMover(BucketId bucket, bool wantReady);
+ BucketMoverSP greedyCreateMover();
+ void backFillMovers();
+ void cancelMovesForBucket(BucketId bucket);
+ bool moveDocs(size_t maxDocsToMove);
+public:
+ BucketMoveJobV2(const IBucketStateCalculator::SP &calc,
+ IDocumentMoveHandler &moveHandler,
+ IBucketModifiedHandler &modifiedHandler,
+ IThreadService & master,
+ BucketExecutor & bucketExecutor,
+ const MaintenanceDocumentSubDB &ready,
+ const MaintenanceDocumentSubDB &notReady,
+ bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ const vespalib::string &docTypeName,
+ document::BucketSpace bucketSpace);
+
+ ~BucketMoveJobV2() override;
+
+ bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket);
+ bool done() const;
+ void recompute();
+ bool inSync() const;
+
+ bool run() override;
+ void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
+ void notifyBucketStateChanged(const BucketId &bucketId, ActiveState newState) override;
+ void notifyDiskMemUsage(DiskMemUsageState state) override;
+ void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override;
+ void onStop() override;
+};
+
+} // namespace proton
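needMove() in the implementation reduces the placement decision to the NeedResult pair declared above: whether the bucket must move at all, and whether it should end up in the ready sub db. Restated as a standalone pure function over the values the job reads from the ScanIterator and the calculator (a sketch for reading, not the real API; the !_calc early-out is folded into the nodeRetired flag here):

    #include <cassert>
    #include <utility>

    // Returns (mustMove, wantReady), mirroring BucketMoveJobV2::needMove().
    std::pair<bool, bool>
    needMove(bool hasReadyDocs, bool hasNotReadyDocs, bool nodeRetired,
             bool shouldBeReady, bool isActive)
    {
        if (!hasReadyDocs && !hasNotReadyDocs) return {false, false}; // nothing to move
        // Retired node: only explicitly active buckets are still moved.
        if (nodeRetired && !isActive) return {false, false};
        const bool wantReady = shouldBeReady || isActive;
        if (wantReady) {
            if (!hasNotReadyDocs) return {false, false}; // already ready
        } else {
            if (isActive) return {false, false};      // never demote an active bucket
            if (!hasReadyDocs) return {false, false}; // already not ready
        }
        return {true, wantReady};
    }

    int main() {
        // Not-ready docs exist and the bucket should be ready: promote.
        assert(needMove(false, true, false, true, false) == std::make_pair(true, true));
        // Ready docs exist but the calculator no longer wants them ready: demote.
        assert(needMove(true, false, false, false, false) == std::make_pair(true, false));
        // An active bucket on a retired node may still be promoted.
        assert(needMove(false, true, true, false, true) == std::make_pair(true, true));
    }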
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index 76a65df4d1a..6defd3e7037 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -20,8 +20,8 @@ namespace proton::bucketdb {
typedef IDocumentMetaStore::Iterator Iterator;
-MoveOperation::UP
-BucketMover::createMoveOperation(const MoveKey &key) {
+BucketMover::GuardedMoveOp
+BucketMover::createMoveOperation(MoveKey &key) {
if (_source->lidNeedsCommit(key._lid)) {
return {};
}
@@ -29,9 +29,10 @@ BucketMover::createMoveOperation(const MoveKey &key) {
if (!doc || doc->getId().getGlobalId() != key._gid)
return {}; // Failed to retrieve document, removed or changed identity
BucketId bucketId = _bucket.stripUnused();
- return std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc),
- DbDocumentId(_source->sub_db_id(), key._lid),
- _targetSubDbId);
+ return BucketMover::GuardedMoveOp(std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc),
+ DbDocumentId(_source->sub_db_id(), key._lid),
+ _targetSubDbId),
+ std::move(key._guard));
}
void
@@ -39,21 +40,34 @@ BucketMover::moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone)
_handler->handleMove(*moveOp, std::move(onDone));
}
+BucketMover::MoveKey::MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept
+ : _lid(lid),
+ _gid(gid),
+ _timestamp(timestamp),
+ _guard(std::move(guard))
+{ }
+
+BucketMover::MoveKey::~MoveKey() = default;
-BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId,
- IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept
+BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source,
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept
: _source(source),
_handler(&handler),
- _bucketDb(&bucketDb),
_bucket(bucket),
_targetSubDbId(targetSubDbId),
+ _started(0),
+ _completed(0),
_bucketDone(false),
- _lastGid(),
- _lastGidValid(false)
+ _lastGidValid(false),
+ _lastGid()
{ }
+BucketMover::~BucketMover() {
+ assert(inSync());
+}
+
std::pair<std::vector<BucketMover::MoveKey>, bool>
-BucketMover::getKeysToMove(size_t maxDocsToMove) const {
+BucketMover::getKeysToMove(size_t maxDocsToMove) {
std::pair<std::vector<BucketMover::MoveKey>, bool> result;
Iterator itr = (_lastGidValid ? _source->meta_store()->upperBound(_lastGid)
: _source->meta_store()->lowerBound(_bucket));
@@ -63,7 +77,7 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) const {
uint32_t lid = itr.getKey().get_lid();
const RawDocumentMetaData &metaData = _source->meta_store()->getRawMetaData(lid);
if (metaData.getBucketUsedBits() == _bucket.getUsedBits()) {
- result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp());
+ result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp(), MoveGuard(*this));
++docsMoved;
}
}
@@ -71,13 +85,13 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) const {
return result;
}
-std::vector<MoveOperation::UP>
-BucketMover::createMoveOperations(const std::vector<MoveKey> &toMove) {
- std::vector<MoveOperation::UP> successfulReads;
+std::vector<BucketMover::GuardedMoveOp>
+BucketMover::createMoveOperations(std::vector<MoveKey> &toMove) {
+ std::vector<GuardedMoveOp> successfulReads;
successfulReads.reserve(toMove.size());
- for (const MoveKey &key : toMove) {
+ for (MoveKey &key : toMove) {
auto moveOp = createMoveOperation(key);
- if (!moveOp) {
+ if (!moveOp.first) {
break;
}
successfulReads.push_back(std::move(moveOp));
@@ -86,37 +100,10 @@ BucketMover::createMoveOperations(const std::vector<MoveKey> &toMove) {
}
void
-BucketMover::moveDocuments(std::vector<MoveOperation::UP> moveOps, IDestructorCallbackSP onDone) {
- for (auto & moveOp : moveOps) {
- moveDocument(std::move(moveOp), onDone);
- }
-}
-
-bool
-BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter)
-{
- if (_bucketDone) {
- return true;
- }
- auto [keys, done] = getKeysToMove(maxDocsToMove);
- auto moveOps = createMoveOperations(keys);
- bool allOk = keys.size() == moveOps.size();
- if (done && allOk) {
- setBucketDone();
- }
- if (moveOps.empty()) return allOk;
-
- updateLastValidGid(moveOps.back()->getDocument()->getId().getGlobalId());
-
+BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone) {
for (auto & moveOp : moveOps) {
- // We cache the bucket for the document we are going to move to avoid getting
- // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready
- // sub dbs as the bucket info is not updated atomically in this case.
- _bucketDb->takeGuard()->cacheBucket(moveOp->getBucketId());
- _handler->handleMove(*moveOp, limiter.beginOperation());
- _bucketDb->takeGuard()->uncacheBucket();
+ moveDocument(std::move(moveOp.first), std::move(onDone));
}
- return allOk;
}
}
@@ -124,22 +111,51 @@ BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter)
namespace proton {
using bucketdb::BucketMover;
+using bucketdb::BucketDBOwner;
-DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept
+DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter, BucketDBOwner &bucketDb) noexcept
: _limiter(limiter),
+ _bucketDb(&bucketDb),
_impl()
{}
void
DocumentBucketMover::setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
- uint32_t targetSubDbId, IDocumentMoveHandler &handler, bucketdb::BucketDBOwner &bucketDb)
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler)
{
- _impl = std::make_unique<BucketMover>(bucket, source, targetSubDbId, handler, bucketDb);
+ _impl = std::make_unique<BucketMover>(bucket, source, targetSubDbId, handler);
}
bool
DocumentBucketMover::moveDocuments(size_t maxDocsToMove) {
- return !_impl || _impl->moveDocuments(maxDocsToMove, _limiter);
+ return !_impl || moveDocuments(maxDocsToMove, _limiter);
+}
+
+bool
+DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter)
+{
+ if (_impl->bucketDone()) {
+ return true;
+ }
+ auto [keys, done] = _impl->getKeysToMove(maxDocsToMove);
+ auto moveOps = _impl->createMoveOperations(keys);
+ bool allOk = keys.size() == moveOps.size();
+ if (done && allOk) {
+ _impl->setBucketDone();
+ }
+ if (moveOps.empty()) return allOk;
+
+ _impl->updateLastValidGid(moveOps.back().first->getDocument()->getId().getGlobalId());
+
+ for (auto & moveOp : moveOps) {
+ // We cache the bucket for the document we are going to move to avoid getting
+ // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready
+ // sub dbs as the bucket info is not updated atomically in this case.
+ _bucketDb->takeGuard()->cacheBucket(moveOp.first->getBucketId());
+ _impl->moveDocument(std::move(moveOp.first), limiter.beginOperation());
+ _bucketDb->takeGuard()->uncacheBucket();
+ }
+ return allOk;
}
}
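The caching comment carried over into DocumentBucketMover::moveDocuments() is worth dwelling on: bucket info is not updated atomically when a document moves between the ready and not-ready sub dbs, so the bucket is pinned in the bucket DB around each handled move. A hedged RAII sketch of that cache/uncache bracket (the real code calls takeGuard()->cacheBucket() and uncacheBucket() directly, without a wrapper type):

    // Hypothetical RAII wrapper for the cacheBucket()/uncacheBucket() bracket;
    // BucketDb is a stand-in for bucketdb::BucketDBOwner.
    #include <cstdio>

    struct BucketId { unsigned raw; };

    struct BucketDb {
        void cacheBucket(BucketId b) { std::printf("cache bucket %u\n", b.raw); }
        void uncacheBucket()         { std::puts("uncache bucket"); }
    };

    class CachedBucketGuard {
        BucketDb &_db;
    public:
        CachedBucketGuard(BucketDb &db, BucketId bucket) : _db(db) {
            // Pin the bucket so getBucketInfo() stays consistent while the
            // document moves between sub dbs.
            _db.cacheBucket(bucket);
        }
        ~CachedBucketGuard() { _db.uncacheBucket(); }
        CachedBucketGuard(const CachedBucketGuard &) = delete;
        CachedBucketGuard &operator=(const CachedBucketGuard &) = delete;
    };

    int main() {
        BucketDb db;
        {
            CachedBucketGuard guard(db, BucketId{42});
            std::puts("handleMove(op)"); // the move happens while the bucket is cached
        } // uncached again here
    }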
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
index dc91d40ce4e..c4f94a88cfa 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
@@ -5,6 +5,7 @@
#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/base/globalid.h>
#include <persistence/spi/types.h>
+#include <atomic>
namespace vespalib { class IDestructorCallback; }
@@ -30,40 +31,60 @@ public:
using MoveOperationUP = std::unique_ptr<MoveOperation>;
using IDestructorCallback = vespalib::IDestructorCallback;
using IDestructorCallbackSP = std::shared_ptr<IDestructorCallback>;
+ class MoveGuard {
+ public:
+ MoveGuard() noexcept : _mover(nullptr) {}
+ MoveGuard(BucketMover & mover) noexcept
+ : _mover(&mover)
+ {
+ _mover->_started.fetch_add(1, std::memory_order_relaxed);
+ }
+ MoveGuard(MoveGuard && rhs) noexcept : _mover(rhs._mover) { rhs._mover = nullptr; }
+ MoveGuard & operator = (MoveGuard && mover) = delete;
+ MoveGuard(const MoveGuard & rhs) = delete;
+ MoveGuard & operator = (const MoveGuard & mover) = delete;
+ ~MoveGuard() {
+ if (_mover) {
+ _mover->_completed.fetch_add(1, std::memory_order_relaxed);
+ }
+ }
+ private:
+ BucketMover *_mover;
+ };
struct MoveKey
{
using Timestamp = storage::spi::Timestamp;
- MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp) noexcept
- : _lid(lid),
- _gid(gid),
- _timestamp(timestamp)
- { }
+ MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept;
+ MoveKey(MoveKey &&) noexcept = default;
+ ~MoveKey();
uint32_t _lid;
document::GlobalId _gid;
Timestamp _timestamp;
+ MoveGuard _guard;
};
- BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId,
- IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept;
+ BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept;
BucketMover(BucketMover &&) noexcept = default;
BucketMover & operator=(BucketMover &&) noexcept = delete;
BucketMover(const BucketMover &) = delete;
BucketMover & operator=(const BucketMover &) = delete;
+ ~BucketMover();
- // TODO remove once we have switched bucket move job
- bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter);
-
+ using GuardedMoveOp = std::pair<MoveOperationUP, MoveGuard>;
/// Must be called in master thread
- std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove) const;
+ std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove);
/// Call from any thread
- std::vector<MoveOperationUP> createMoveOperations(const std::vector<MoveKey> & toMove);
+ std::vector<GuardedMoveOp> createMoveOperations(std::vector<MoveKey> & toMove);
/// Must be called in master thread
- void moveDocuments(std::vector<MoveOperationUP> moveOps, IDestructorCallbackSP onDone);
+ void moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone);
+ void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone);
const document::BucketId &getBucket() const { return _bucket; }
void cancel() { setBucketDone(); }
void setBucketDone() { _bucketDone = true; }
+ /// Signals all documents have been scheduled for move
bool bucketDone() const { return _bucketDone; }
const MaintenanceDocumentSubDB * getSource() const { return _source; }
/// Must be called in master thread
@@ -71,19 +92,24 @@ public:
_lastGid = gid;
_lastGidValid = true;
}
+ bool inSync() const {
+ return pending() == 0;
+ }
private:
const MaintenanceDocumentSubDB *_source;
IDocumentMoveHandler *_handler;
- BucketDBOwner *_bucketDb;
const document::BucketId _bucket;
const uint32_t _targetSubDbId;
- bool _bucketDone;
- document::GlobalId _lastGid;
+ std::atomic<uint32_t> _started;
+ std::atomic<uint32_t> _completed;
+ bool _bucketDone; // All moves started, or operation has been cancelled
bool _lastGidValid;
-
- void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone);
- MoveOperationUP createMoveOperation(const MoveKey & key);
+ document::GlobalId _lastGid;
+ GuardedMoveOp createMoveOperation(MoveKey & key);
+ size_t pending() const {
+ return _started.load(std::memory_order_relaxed) - _completed.load(std::memory_order_relaxed);
+ }
};
}
@@ -95,10 +121,13 @@ private:
class DocumentBucketMover
{
private:
- IMoveOperationLimiter &_limiter;
- std::unique_ptr<bucketdb::BucketMover> _impl;
+ IMoveOperationLimiter &_limiter;
+ bucketdb::BucketDBOwner *_bucketDb;
+ std::unique_ptr<bucketdb::BucketMover> _impl;
+
+ bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter);
public:
- DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept;
+ DocumentBucketMover(IMoveOperationLimiter &limiter, bucketdb::BucketDBOwner &bucketDb) noexcept;
DocumentBucketMover(DocumentBucketMover &&) noexcept = default;
DocumentBucketMover & operator=(DocumentBucketMover &&) noexcept = delete;
DocumentBucketMover(const DocumentBucketMover &) = delete;
@@ -106,8 +135,7 @@ public:
void setupForBucket(const document::BucketId &bucket,
const MaintenanceDocumentSubDB *source,
uint32_t targetSubDbId,
- IDocumentMoveHandler &handler,
- bucketdb::BucketDBOwner &bucketDb);
+ IDocumentMoveHandler &handler);
const document::BucketId &getBucket() const { return _impl->getBucket(); }
bool moveDocuments(size_t maxDocsToMove);
void cancel() { _impl->cancel(); }
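MoveGuard above is what lets BucketMover answer inSync() without a lock: constructing a guard bumps _started, destroying it bumps _completed, and pending() is the difference of two relaxed loads. Since every MoveKey carries a guard that travels into the GuardedMoveOp, a mover can only be destroyed once every scheduled move has run to completion (hence the assert in ~BucketMover()). A trimmed-down sketch of the same pattern outside the class:

    // Trimmed-down sketch of the MoveGuard pattern: two relaxed atomic
    // counters plus a move-only guard; pending() == 0 means every issued
    // guard has been destroyed.
    #include <atomic>
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <utility>

    struct MoveTracker {
        std::atomic<uint32_t> started{0};
        std::atomic<uint32_t> completed{0};
        size_t pending() const {
            return started.load(std::memory_order_relaxed) -
                   completed.load(std::memory_order_relaxed);
        }
        bool inSync() const { return pending() == 0; }
    };

    class Guard {
        MoveTracker *_tracker;
    public:
        explicit Guard(MoveTracker &t) : _tracker(&t) {
            _tracker->started.fetch_add(1, std::memory_order_relaxed);
        }
        Guard(Guard &&rhs) noexcept : _tracker(rhs._tracker) { rhs._tracker = nullptr; }
        Guard(const Guard &) = delete;
        Guard &operator=(const Guard &) = delete;
        Guard &operator=(Guard &&) = delete;
        ~Guard() {
            if (_tracker) {
                _tracker->completed.fetch_add(1, std::memory_order_relaxed);
            }
        }
    };

    int main() {
        MoveTracker tracker;
        {
            Guard a(tracker);
            Guard b = std::move(a); // ownership moves; still one pending
            assert(tracker.pending() == 1);
        }
        assert(tracker.inSync()); // safe to destroy the mover now
    }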
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index c8429984b1f..ce6acb4795e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketmovejob.h"
+#include "bucketmovejobv2.h"
#include "heart_beat_job.h"
#include "job_tracked_maintenance_job.h"
#include "lid_space_compaction_job.h"
@@ -37,8 +38,7 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller,
document::BucketSpace bucketSpace)
{
for (auto &lidHandler : lscHandlers) {
-
- IMaintenanceJob::UP job;
+ std::unique_ptr<IMaintenanceJob> job;
if (config.getLidSpaceCompactionConfig().useBucketExecutor()) {
job = std::make_unique<lidspace::CompactionJob>(
config.getLidSpaceCompactionConfig(),
@@ -65,6 +65,7 @@ void
injectBucketMoveJob(MaintenanceController &controller,
const DocumentDBMaintenanceConfig &config,
IFrozenBucketHandler &fbHandler,
+ storage::spi::BucketExecutor & bucketExecutor,
bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
const vespalib::string &docTypeName,
document::BucketSpace bucketSpace,
@@ -76,18 +77,35 @@ injectBucketMoveJob(MaintenanceController &controller,
DocumentDBJobTrackers &jobTrackers,
IDiskMemUsageNotifier &diskMemUsageNotifier)
{
- auto bmj = std::make_unique<BucketMoveJob>(calc,
- moveHandler,
- bucketModifiedHandler,
- controller.getReadySubDB(),
- controller.getNotReadySubDB(),
- fbHandler,
- bucketCreateNotifier,
- clusterStateChangedNotifier,
- bucketStateChangedNotifier,
- diskMemUsageNotifier,
- config.getBlockableJobConfig(),
- docTypeName, bucketSpace);
+ std::unique_ptr<IMaintenanceJob> bmj;
+ if (config.getBucketMoveConfig().useBucketExecutor()) {
+ bmj = std::make_unique<BucketMoveJobV2>(calc,
+ moveHandler,
+ bucketModifiedHandler,
+ controller.masterThread(),
+ bucketExecutor,
+ controller.getReadySubDB(),
+ controller.getNotReadySubDB(),
+ bucketCreateNotifier,
+ clusterStateChangedNotifier,
+ bucketStateChangedNotifier,
+ diskMemUsageNotifier,
+ config.getBlockableJobConfig(),
+ docTypeName, bucketSpace);
+ } else {
+ bmj = std::make_unique<BucketMoveJob>(calc,
+ moveHandler,
+ bucketModifiedHandler,
+ controller.getReadySubDB(),
+ controller.getNotReadySubDB(),
+ fbHandler,
+ bucketCreateNotifier,
+ clusterStateChangedNotifier,
+ bucketStateChangedNotifier,
+ diskMemUsageNotifier,
+ config.getBlockableJobConfig(),
+ docTypeName, bucketSpace);
+ }
controller.registerJobInMasterThread(trackJob(jobTrackers.getBucketMove(), std::move(bmj)));
}
@@ -133,9 +151,9 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
clusterStateChangedNotifier, calc, bucketSpace);
}
- injectBucketMoveJob(controller, config, fbHandler, bucketCreateNotifier, docTypeName, bucketSpace, moveHandler,
- bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, calc,
- jobTrackers, diskMemUsageNotifier);
+ injectBucketMoveJob(controller, config, fbHandler, bucketExecutor, bucketCreateNotifier, docTypeName, bucketSpace,
+ moveHandler, bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier,
+ calc, jobTrackers, diskMemUsageNotifier);
controller.registerJobInMasterThread(
std::make_unique<SampleAttributeUsageJob>(readyAttributeManager, notReadyAttributeManager,
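The injector now picks the implementation per configuration: when getBucketMoveConfig().useBucketExecutor() is set, the new BucketMoveJobV2 gets the master thread and the bucket executor; otherwise the legacy BucketMoveJob with the frozen-bucket handler is built. Reduced to a sketch with hypothetical stand-in types, the shape of the switch is:

    // Reduced sketch of the config-gated selection in injectBucketMoveJob();
    // Job, LegacyJob and JobV2 are hypothetical stand-ins for IMaintenanceJob
    // and the two bucket move job implementations.
    #include <cstdio>
    #include <memory>

    struct Job { virtual ~Job() = default; virtual bool run() = 0; };
    struct LegacyJob : Job { bool run() override { std::puts("BucketMoveJob");   return true; } };
    struct JobV2     : Job { bool run() override { std::puts("BucketMoveJobV2"); return true; } };

    std::unique_ptr<Job> makeBucketMoveJob(bool useBucketExecutor) {
        // Both variants are registered with the controller the same way;
        // only the construction differs.
        if (useBucketExecutor) {
            return std::make_unique<JobV2>();
        }
        return std::make_unique<LegacyJob>();
    }

    int main() {
        auto job = makeBucketMoveJob(true);
        job->run(); // prints BucketMoveJobV2
    }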
diff --git a/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java b/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java
index d9021e56480..89d449a2eb0 100644
--- a/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java
+++ b/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java
@@ -10,5 +10,5 @@ import java.nio.ByteBuffer;
* @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
*/
public interface WritableByteTransmitter {
- public void send(ByteBuffer src) throws IOException;
+ void send(ByteBuffer src) throws IOException;
}
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
index 5f8ebf29ccd..ba79969469a 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
@@ -162,8 +162,11 @@ public class Configurator {
.append("\n");
}
- static Set<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) {
- return zookeeperServerConfig.server().stream().map(ZookeeperServerConfig.Server::hostname).collect(Collectors.toSet());
+ static List<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) {
+ return zookeeperServerConfig.server().stream()
+ .map(ZookeeperServerConfig.Server::hostname)
+ .distinct()
+ .collect(Collectors.toList());
}
Path makeAbsolutePath(String filename) {
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
index 712e37be452..4ef73ec2374 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
@@ -86,12 +86,14 @@ public class Reconfigurer extends AbstractComponent {
if (newConfig.server().size() == 1) shutdownAndDie(server, Duration.ZERO);
List<String> newServers = difference(servers(newConfig), servers(activeConfig));
- String leavingServers = String.join(",", difference(serverIds(activeConfig), serverIds(newConfig)));
- String joiningServers = String.join(",", newServers);
- leavingServers = leavingServers.isEmpty() ? null : leavingServers;
- joiningServers = joiningServers.isEmpty() ? null : joiningServers;
- log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. Joining servers: " + joiningServers +
- ", leaving servers: " + leavingServers);
+ String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig));
+ String joiningServersSpec = String.join(",", newServers);
+ leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds;
+ joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec;
+ log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. \nJoining servers: " + joiningServersSpec +
+ "\nleaving servers: " + leavingServerIds +
+ "\nServers in active config:" + servers(activeConfig) +
+ "\nServers in new config:" + servers(newConfig));
String connectionSpec = localConnectionSpec(activeConfig);
Instant now = Instant.now();
Duration reconfigTimeout = reconfigTimeout(newServers.size());
@@ -100,7 +102,7 @@ public class Reconfigurer extends AbstractComponent {
for (int attempt = 1; now.isBefore(end); attempt++) {
try {
Instant reconfigStarted = Instant.now();
- vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServers, leavingServers);
+ vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds);
Instant reconfigEnded = Instant.now();
log.log(Level.INFO, "Reconfiguration completed in " +
Duration.between(reconfigTriggered, reconfigEnded) +
@@ -139,11 +141,10 @@ public class Reconfigurer extends AbstractComponent {
return HostName.getLocalhost() + ":" + config.clientPort();
}
- private static List<String> serverIds(ZookeeperServerConfig config) {
- return config.server().stream()
- .map(ZookeeperServerConfig.Server::id)
- .map(String::valueOf)
- .collect(Collectors.toList());
+ private static List<String> serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) {
+ return difference(servers(oldConfig), servers(newConfig)).stream()
+ .map(server -> server.substring(0, server.indexOf('=')))
+ .collect(Collectors.toList());
}
private static List<String> servers(ZookeeperServerConfig config) {
diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
index a1b98a23bd0..5cee0de2b6e 100644
--- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
+++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
@@ -13,7 +13,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
-import java.util.stream.IntStream;
+import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -68,6 +68,14 @@ public class ReconfigurerTest {
assertNull("No servers are joining", reconfigurer.joiningServers());
assertEquals("3,4", reconfigurer.leavingServers());
assertSame(nextConfig, reconfigurer.activeConfig());
+
+ // Cluster loses node1, but node3 joins. Indices are shuffled.
+ nextConfig = createConfig(3, true, 1);
+ reconfigurer.startOrReconfigure(nextConfig);
+ assertEquals(3, reconfigurer.reconfigurations());
+ assertEquals("1=node2:2182:2183;2181,2=node3:2182:2183;2181", reconfigurer.joiningServers());
+ assertEquals("1,2", reconfigurer.leavingServers());
+ assertSame(nextConfig, reconfigurer.activeConfig());
}
@Test
@@ -107,11 +115,15 @@ public class ReconfigurerTest {
reconfigurer.shutdown();
}
- private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration) {
+ private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... skipIndices) {
+ Arrays.sort(skipIndices);
ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder();
builder.zooKeeperConfigFile(cfgFile.getAbsolutePath());
builder.myidFile(idFile.getAbsolutePath());
- IntStream.range(0, numberOfServers).forEach(i -> builder.server(newServer(i, "node" + i)));
+ for (int i = 0, index = 0; i < numberOfServers; i++, index++) {
+ while (Arrays.binarySearch(skipIndices, index) >= 0) index++;
+ builder.server(newServer(i, "node" + index));
+ }
builder.myid(0);
builder.dynamicReconfiguration(dynamicReconfiguration);
return builder.build();