50 files changed, 2239 insertions, 574 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index eae4cb338eb..db673c9184c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -113,6 +113,7 @@ add_subdirectory(orchestrator) add_subdirectory(persistence) add_subdirectory(persistencetypes) add_subdirectory(predicate-search) +add_subdirectory(processing) add_subdirectory(searchcommon) add_subdirectory(searchcore) add_subdirectory(searchcorespi) diff --git a/config-model-api/abi-spec.json b/config-model-api/abi-spec.json index fd229b35778..ca736dd90d8 100644 --- a/config-model-api/abi-spec.json +++ b/config-model-api/abi-spec.json @@ -198,7 +198,6 @@ "public java.util.Optional globalServiceId()", "public boolean canUpgradeAt(java.time.Instant)", "public boolean canChangeRevisionAt(java.time.Instant)", - "public java.util.Optional athenzDomain()", "public java.util.Optional athenzService(com.yahoo.config.provision.Environment, com.yahoo.config.provision.RegionName)", "public com.yahoo.config.application.api.Notifications notifications()", "public java.util.List endpoints()", @@ -304,7 +303,6 @@ "methods": [ "public void <init>()", "public final boolean concerns(com.yahoo.config.provision.Environment)", - "public boolean deploysTo(com.yahoo.config.provision.Environment, java.util.Optional)", "public abstract boolean concerns(com.yahoo.config.provision.Environment, java.util.Optional)", "public java.util.List zones()", "public java.time.Duration delay()", diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java index 9c6013a127d..8813eaf9c8c 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java @@ -154,10 +154,6 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { .noneMatch(block -> block.window().includes(instant)); } - /** Returns the athenz domain if configured */ - // TODO jonmv: Remove when 7.162 is older than the oldest deployed version. - public Optional<AthenzDomain> athenzDomain() { return Optional.empty(); } - /** Returns the athenz service for environment/region if configured, defaulting to that of the instance */ public Optional<AthenzService> athenzService(Environment environment, RegionName region) { return zones().stream() diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java index a8fcf5e1315..24d83e2b7b1 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java @@ -254,13 +254,10 @@ public class DeploymentSpec { return concerns(environment, Optional.empty()); } - /** Returns whether this step specifies the given environment, and, optionally, region. */ - // TODO jonmv: Remove when 7.147 is the oldest version. - public boolean deploysTo(Environment environment, Optional<RegionName> region) { - return concerns(environment, region); - } - - /** Returns whether this step specifies the given environment, and, optionally, region. */ + /** + * Returns whether this step specifies the given environment, and, optionally, + * if this step specifies a region, whether this is also the given region. 
+ */ public abstract boolean concerns(Environment environment, Optional<RegionName> region); /** Returns the zones deployed to in this step. */ @@ -348,7 +345,7 @@ public class DeploymentSpec { @Override public boolean concerns(Environment environment, Optional<RegionName> region) { if (environment != this.environment) return false; - if (region.isPresent() && ! region.equals(this.region)) return false; + if (region.isPresent() && this.region.isPresent() && ! region.equals(this.region)) return false; return true; } diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java index 06db4fe44be..5561ebdef63 100644 --- a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java +++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java @@ -44,10 +44,10 @@ public class DeploymentSpecTest { assertEquals(1, spec.requireInstance("default").steps().size()); assertFalse(spec.majorVersion().isPresent()); assertTrue(spec.requireInstance("default").steps().get(0).concerns(Environment.test)); - assertTrue(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.of(RegionName.from("region1")))); - assertFalse(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.of(RegionName.from("region1")))); // test steps specify no region + assertFalse(spec.requireInstance("default").concerns(Environment.staging, Optional.empty())); + assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.empty())); assertFalse(spec.requireInstance("default").globalServiceId().isPresent()); } @@ -81,9 +81,9 @@ public class DeploymentSpecTest { assertEquals(1, spec.steps().size()); assertEquals(1, spec.requireInstance("default").steps().size()); assertTrue(spec.requireInstance("default").steps().get(0).concerns(Environment.staging)); - assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty())); - assertTrue(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.empty())); + assertFalse(spec.requireInstance("default").concerns(Environment.test, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.staging, Optional.empty())); + assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.empty())); assertFalse(spec.requireInstance("default").globalServiceId().isPresent()); } @@ -110,11 +110,11 @@ public class DeploymentSpecTest { assertTrue(spec.requireInstance("default").steps().get(1).concerns(Environment.prod, Optional.of(RegionName.from("us-west1")))); assertTrue(((DeploymentSpec.DeclaredZone)spec.requireInstance("default").steps().get(1)).active()); - assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty())); - assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, 
Optional.of(RegionName.from("us-east1")))); - assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-west1")))); - assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("no-such-region")))); + assertFalse(spec.requireInstance("default").concerns(Environment.test, Optional.empty())); + assertFalse(spec.requireInstance("default").concerns(Environment.staging, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-east1")))); + assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-west1")))); + assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("no-such-region")))); assertFalse(spec.requireInstance("default").globalServiceId().isPresent()); assertEquals(DeploymentSpec.UpgradePolicy.defaultPolicy, spec.requireInstance("default").upgradePolicy()); @@ -289,12 +289,12 @@ public class DeploymentSpecTest { assertTrue(instance.steps().get(4).concerns(Environment.prod, Optional.of(RegionName.from("us-west1")))); assertTrue(((DeploymentSpec.DeclaredZone)instance.steps().get(4)).active()); - assertTrue(instance.deploysTo(Environment.test, Optional.empty())); - assertFalse(instance.deploysTo(Environment.test, Optional.of(RegionName.from("region1")))); - assertTrue(instance.deploysTo(Environment.staging, Optional.empty())); - assertTrue(instance.deploysTo(Environment.prod, Optional.of(RegionName.from("us-east1")))); - assertTrue(instance.deploysTo(Environment.prod, Optional.of(RegionName.from("us-west1")))); - assertFalse(instance.deploysTo(Environment.prod, Optional.of(RegionName.from("no-such-region")))); + assertTrue(instance.concerns(Environment.test, Optional.empty())); + assertTrue(instance.concerns(Environment.test, Optional.of(RegionName.from("region1")))); // test steps specify no region + assertTrue(instance.concerns(Environment.staging, Optional.empty())); + assertTrue(instance.concerns(Environment.prod, Optional.of(RegionName.from("us-east1")))); + assertTrue(instance.concerns(Environment.prod, Optional.of(RegionName.from("us-west1")))); + assertFalse(instance.concerns(Environment.prod, Optional.of(RegionName.from("no-such-region")))); assertFalse(instance.globalServiceId().isPresent()); } @@ -909,9 +909,10 @@ public class DeploymentSpecTest { @Test public void athenz_service_is_overridden_from_environment() { StringReader r = new StringReader( - "<deployment athenz-domain='domain' athenz-service='service'>" + + "<deployment athenz-domain='domain' athenz-service='unused-service'>" + " <instance id='default' athenz-service='service'>" + - " <test/>" + + " <test />" + + " <staging athenz-service='staging-service' />" + " <prod athenz-service='prod-service'>" + " <region active='true'>us-west-1</region>" + " </prod>" + @@ -919,6 +920,12 @@ public class DeploymentSpecTest { "</deployment>" ); DeploymentSpec spec = DeploymentSpec.fromXml(r); + assertEquals("service", + spec.requireInstance("default").athenzService(Environment.test, + RegionName.from("us-east-1")).get().value()); + assertEquals("staging-service", + spec.requireInstance("default").athenzService(Environment.staging, + RegionName.from("us-north-1")).get().value()); assertEquals("prod-service", spec.requireInstance("default").athenzService(Environment.prod, RegionName.from("us-west-1")).get().value()); diff --git 
a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java index 450be3c9b06..77ce5c2175d 100644 --- a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java +++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecWithoutInstanceTest.java @@ -43,10 +43,10 @@ public class DeploymentSpecWithoutInstanceTest { assertEquals(1, spec.steps().size()); assertFalse(spec.majorVersion().isPresent()); assertTrue(spec.steps().get(0).concerns(Environment.test)); - assertTrue(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.of(RegionName.from("region1")))); - assertFalse(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.of(RegionName.from("region1")))); // test steps specify no region + assertFalse(spec.requireInstance("default").concerns(Environment.staging, Optional.empty())); + assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.empty())); assertFalse(spec.requireInstance("default").globalServiceId().isPresent()); } @@ -76,9 +76,9 @@ public class DeploymentSpecWithoutInstanceTest { assertEquals(1, spec.steps().size()); assertEquals(1, spec.requireInstance("default").steps().size()); assertTrue(spec.requireInstance("default").steps().get(0).concerns(Environment.staging)); - assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty())); - assertTrue(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.empty())); + assertFalse(spec.requireInstance("default").concerns(Environment.test, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.staging, Optional.empty())); + assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.empty())); assertFalse(spec.requireInstance("default").globalServiceId().isPresent()); } @@ -103,11 +103,11 @@ public class DeploymentSpecWithoutInstanceTest { assertTrue(spec.requireInstance("default").steps().get(1).concerns(Environment.prod, Optional.of(RegionName.from("us-west1")))); assertTrue(((DeploymentSpec.DeclaredZone)spec.requireInstance("default").steps().get(1)).active()); - assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty())); - assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-east1")))); - assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-west1")))); - assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("no-such-region")))); + assertFalse(spec.requireInstance("default").concerns(Environment.test, Optional.empty())); + assertFalse(spec.requireInstance("default").concerns(Environment.staging, Optional.empty())); + 
assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-east1")))); + assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-west1")))); + assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("no-such-region")))); assertFalse(spec.requireInstance("default").globalServiceId().isPresent()); assertEquals(DeploymentSpec.UpgradePolicy.defaultPolicy, spec.requireInstance("default").upgradePolicy()); @@ -144,12 +144,12 @@ public class DeploymentSpecWithoutInstanceTest { assertTrue(spec.requireInstance("default").steps().get(4).concerns(Environment.prod, Optional.of(RegionName.from("us-west1")))); assertTrue(((DeploymentSpec.DeclaredZone)spec.requireInstance("default").steps().get(4)).active()); - assertTrue(spec.requireInstance("default").deploysTo(Environment.test, Optional.empty())); - assertFalse(spec.requireInstance("default").deploysTo(Environment.test, Optional.of(RegionName.from("region1")))); - assertTrue(spec.requireInstance("default").deploysTo(Environment.staging, Optional.empty())); - assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-east1")))); - assertTrue(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("us-west1")))); - assertFalse(spec.requireInstance("default").deploysTo(Environment.prod, Optional.of(RegionName.from("no-such-region")))); + assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.test, Optional.of(RegionName.from("region1")))); // test steps specify no region + assertTrue(spec.requireInstance("default").concerns(Environment.staging, Optional.empty())); + assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-east1")))); + assertTrue(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("us-west1")))); + assertFalse(spec.requireInstance("default").concerns(Environment.prod, Optional.of(RegionName.from("no-such-region")))); assertFalse(spec.requireInstance("default").globalServiceId().isPresent()); } @@ -552,7 +552,8 @@ public class DeploymentSpecWithoutInstanceTest { public void athenz_service_is_overridden_from_environment() { StringReader r = new StringReader( "<deployment athenz-domain='domain' athenz-service='service'>\n" + - " <test/>\n" + + " <test />\n" + + " <staging athenz-service='staging-service' />\n" + " <prod athenz-service='prod-service'>\n" + " <region active='true'>us-west-1</region>\n" + " </prod>\n" + @@ -561,6 +562,8 @@ public class DeploymentSpecWithoutInstanceTest { DeploymentSpec spec = DeploymentSpec.fromXml(r); assertEquals("service", spec.athenzService().get().value()); assertEquals(spec.athenzDomain().get().value(), "domain"); + assertEquals(spec.requireInstance("default").athenzService(Environment.test, RegionName.from("us-east-1")).get().value(), "service"); + assertEquals(spec.requireInstance("default").athenzService(Environment.staging, RegionName.from("us-north-1")).get().value(), "staging-service"); assertEquals(spec.requireInstance("default").athenzService(Environment.prod, RegionName.from("us-west-1")).get().value(), "prod-service"); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java 
index 81e0574d373..76fd57a51cf 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -124,7 +124,7 @@ public class SessionRepository { public SessionRepository(TenantName tenantName, TenantApplications applicationRepo, SessionPreparer sessionPreparer, - Curator curator, + ConfigCurator configCurator, Metrics metrics, StripedExecutor<TenantName> zkWatcherExecutor, PermanentApplicationPackage permanentApplicationPackage, @@ -140,11 +140,11 @@ public class SessionRepository { ConfigDefinitionRepo configDefinitionRepo, TenantListener tenantListener) { this.tenantName = tenantName; - this.configCurator = ConfigCurator.create(curator); + this.configCurator = configCurator; sessionCounter = new SessionCounter(configCurator, tenantName); this.sessionsPath = TenantRepository.getSessionsPath(tenantName); this.clock = clock; - this.curator = curator; + this.curator = configCurator.curator(); this.sessionLifetime = Duration.ofSeconds(configserverConfig.sessionLifetime()); this.zkWatcherExecutor = command -> zkWatcherExecutor.execute(tenantName, command); this.permanentApplicationPackage = permanentApplicationPackage; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java index 1b8e1f2c7e1..babb7e4a596 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java @@ -30,6 +30,7 @@ import com.yahoo.vespa.config.server.monitoring.Metrics; import com.yahoo.vespa.config.server.provision.HostProvisionerProvider; import com.yahoo.vespa.config.server.session.SessionPreparer; import com.yahoo.vespa.config.server.session.SessionRepository; +import com.yahoo.vespa.config.server.zookeeper.ConfigCurator; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.curator.transaction.CuratorOperations; import com.yahoo.vespa.curator.transaction.CuratorTransaction; @@ -96,6 +97,7 @@ public class TenantRepository { private final Locks<TenantName> tenantLocks = new Locks<>(1, TimeUnit.MINUTES); private final HostRegistry hostRegistry; private final TenantListener tenantListener; + private final ConfigCurator configCurator; private final Curator curator; private final Metrics metrics; private final MetricUpdater metricUpdater; @@ -123,7 +125,7 @@ public class TenantRepository { */ @Inject public TenantRepository(HostRegistry hostRegistry, - Curator curator, + ConfigCurator configCurator, Metrics metrics, FlagSource flagSource, SecretStore secretStore, @@ -136,7 +138,7 @@ public class TenantRepository { ReloadListener reloadListener, TenantListener tenantListener) { this(hostRegistry, - curator, + configCurator, metrics, new StripedExecutor<>(), new FileDistributionFactory(configserverConfig), @@ -155,7 +157,7 @@ public class TenantRepository { } public TenantRepository(HostRegistry hostRegistry, - Curator curator, + ConfigCurator configCurator, Metrics metrics, StripedExecutor<TenantName> zkWatcherExecutor, FileDistributionFactory fileDistributionFactory, @@ -175,7 +177,7 @@ public class TenantRepository { this.configserverConfig = configserverConfig; this.bootstrapExecutor = Executors.newFixedThreadPool(configserverConfig.numParallelTenantLoaders(), new DaemonThreadFactory("bootstrap-tenant-")); - this.curator = 
curator; + this.curator = configCurator.curator(); this.metrics = metrics; metricUpdater = metrics.getOrCreateMetricUpdater(Collections.emptyMap()); this.zkCacheExecutor = zkCacheExecutor; @@ -191,6 +193,7 @@ public class TenantRepository { this.configDefinitionRepo = configDefinitionRepo; this.reloadListener = reloadListener; this.tenantListener = tenantListener; + this.configCurator = configCurator; curator.framework().getConnectionStateListenable().addListener(this::stateChanged); @@ -332,7 +335,7 @@ public class TenantRepository { SessionRepository sessionRepository = new SessionRepository(tenantName, applicationRepo, sessionPreparer, - curator, + configCurator, metrics, zkWatcherExecutor, permanentApplicationPackage, diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java index 70bbe2031aa..b3d1495eb6b 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java @@ -27,6 +27,7 @@ import com.yahoo.vespa.config.server.modelfactory.ModelFactoryRegistry; import com.yahoo.vespa.config.server.monitoring.MetricUpdater; import com.yahoo.vespa.config.server.monitoring.Metrics; import com.yahoo.vespa.config.server.provision.HostProvisionerProvider; +import com.yahoo.vespa.config.server.zookeeper.ConfigCurator; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.curator.mock.MockCurator; import com.yahoo.vespa.flags.InMemoryFlagSource; @@ -208,7 +209,7 @@ public class TenantRepositoryTest { public FailingDuringBootstrapTenantRepository(ConfigserverConfig configserverConfig) { super(new HostRegistry(), - new MockCurator(), + ConfigCurator.create(new MockCurator()), Metrics.createTestMetrics(), new StripedExecutor<>(new InThreadExecutorService()), new FileDistributionFactory(new ConfigserverConfig.Builder().build()), diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java index ad20abaeaf5..8279bf4df5e 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java @@ -17,6 +17,7 @@ import com.yahoo.vespa.config.server.host.HostRegistry; import com.yahoo.vespa.config.server.modelfactory.ModelFactoryRegistry; import com.yahoo.vespa.config.server.monitoring.Metrics; import com.yahoo.vespa.config.server.provision.HostProvisionerProvider; +import com.yahoo.vespa.config.server.zookeeper.ConfigCurator; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.curator.mock.MockCurator; import com.yahoo.vespa.flags.FlagSource; @@ -46,7 +47,7 @@ public class TestTenantRepository extends TenantRepository { ReloadListener reloadListener, TenantListener tenantListener) { super(hostRegistry, - curator, + ConfigCurator.create(curator), metrics, new StripedExecutor<>(new InThreadExecutorService()), fileDistributionFactory, diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json index a2a9c48d61b..46b7a852afd 100644 --- a/container-core/abi-spec.json +++ b/container-core/abi-spec.json @@ -376,6 +376,7 @@ "public void nonCopyingWrite(byte[], int, int)", "public void nonCopyingWrite(byte[])", "public void send(java.nio.ByteBuffer)", + 
"protected void send(java.nio.ByteBuffer, com.yahoo.jdisc.handler.CompletionHandler)", "public long written()" ], "fields": [] diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java index 4b23eafaa9c..991cd83ffa8 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java @@ -15,13 +15,15 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.time.Instant; import java.util.Optional; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; public class LogHandler extends ThreadedHttpRequestHandler { private final LogReader logReader; + private static final long MB = 1024*1024; @Inject public LogHandler(Executor executor, LogHandlerConfig config) { @@ -45,7 +47,7 @@ public class LogHandler extends ThreadedHttpRequestHandler { @Override public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) { try { - OutputStream blockingOutput = new BlockingFlushContentChannelOutputStream(networkChannel); + OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB); logReader.writeLogs(blockingOutput, from, to, hostname); blockingOutput.close(); } @@ -60,29 +62,71 @@ public class LogHandler extends ThreadedHttpRequestHandler { } - private static class BlockingFlushContentChannelOutputStream extends ContentChannelOutputStream { + private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { + private final long maxPending; + private final AtomicLong sent = new AtomicLong(0); + private final AtomicLong acked = new AtomicLong(0); - private final ContentChannel channel; - - public BlockingFlushContentChannelOutputStream(ContentChannel endpoint) { + public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { super(endpoint); - this.channel = endpoint; + this.maxPending = maxPending; + } + + private long pendingBytes() { + return sent.get() - acked.get(); + } + + private class TrackCompletition implements CompletionHandler { + private final long written; + private final AtomicBoolean replied = new AtomicBoolean(false); + TrackCompletition(long written) { + this.written = written; + sent.addAndGet(written); + } + @Override + public void completed() { + if (!replied.getAndSet(true)) { + acked.addAndGet(written); + } + } + + @Override + public void failed(Throwable t) { + if (!replied.getAndSet(true)) { + acked.addAndGet(written); + } + } + } + @Override + public void send(ByteBuffer src) throws IOException { + try { + stallWhilePendingAbove(maxPending); + } catch (InterruptedException ignored) { + throw new IOException("Interrupted waiting for IO"); + } + CompletionHandler pendingTracker = new TrackCompletition(src.remaining()); + try { + send(src, pendingTracker); + } catch (Throwable throwable) { + pendingTracker.failed(throwable); + throw throwable; + } + } + + private void stallWhilePendingAbove(long pending) throws InterruptedException { + while (pendingBytes() > pending) { + Thread.sleep(1); + } } @Override public void flush() throws IOException { super.flush(); - CountDownLatch latch = new CountDownLatch(1); - channel.write(ByteBuffer.allocate(0), // :'( - new CompletionHandler() { - @Override 
public void completed() { latch.countDown(); } - @Override public void failed(Throwable t) { latch.countDown(); } - }); try { - latch.await(); + stallWhilePendingAbove(0); } catch (InterruptedException e) { - throw new RuntimeException("Interrupted waiting for underlying IO to complete", e); + throw new IOException("Interrupted waiting for IO"); } } diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java index ea2f431f5a6..5b6a87d02df 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java @@ -43,13 +43,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; * @author jonmv */ class LogReader { - static final Pattern logArchivePathPattern = Pattern.compile("(\\d{4})/(\\d{2})/(\\d{2})/(\\d{2})-\\d+(.gz)?"); static final Pattern vespaLogPathPattern = Pattern.compile("vespa\\.log(?:-(\\d{4})-(\\d{2})-(\\d{2})\\.(\\d{2})-(\\d{2})-(\\d{2})(?:.gz)?)?"); private final Path logDirectory; private final Pattern logFilePattern; + LogReader(String logDirectory, String logFilePattern) { this(Paths.get(Defaults.getDefaults().underVespaHome(logDirectory)), Pattern.compile(logFilePattern)); } @@ -72,12 +72,10 @@ class LogReader { Iterator<LineWithTimestamp> lines = Iterators.mergeSorted(logLineIterators, Comparator.comparingDouble(LineWithTimestamp::timestamp)); - long linesWritten = 0; while (lines.hasNext()) { - writer.write(lines.next().line()); + String line = lines.next().line(); + writer.write(line); writer.newLine(); - if ((++linesWritten & ((1 << 16) - 1)) == 0) - writer.flush(); } } catch (IOException e) { @@ -188,9 +186,11 @@ class LogReader { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { - if ( logFilePattern.matcher(file.getFileName().toString()).matches() + if (logFilePattern.matcher(file.getFileName().toString()).matches() && ! 
attrs.lastModifiedTime().toInstant().isBefore(from)) + { paths.add(file); + } return FileVisitResult.CONTINUE; } diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java index 1d4c20efe5e..270da0c4ab0 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java @@ -12,7 +12,6 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -125,9 +124,13 @@ public class ContentChannelOutputStream extends OutputStream implements Writable @Override public void send(ByteBuffer src) throws IOException { // Don't do a buffer.flush() from here, this method is used by the buffer itself + send(src, null); + } + + protected void send(ByteBuffer src, CompletionHandler completionHandler) throws IOException { try { byteBufferData += src.remaining(); - endpoint.write(src, new LoggingCompletionHandler()); + endpoint.write(src, new LoggingCompletionHandler(completionHandler)); } catch (RuntimeException e) { throw new IOException(Exceptions.toMessageString(e), e); } @@ -138,10 +141,16 @@ public class ContentChannelOutputStream extends OutputStream implements Writable return buffer.appended() + byteBufferData; } - class LoggingCompletionHandler implements CompletionHandler { - + private class LoggingCompletionHandler implements CompletionHandler { + private final CompletionHandler nested; + LoggingCompletionHandler(CompletionHandler nested) { + this.nested = nested; + } @Override public void completed() { + if (nested != null) { + nested.completed(); + } } @Override @@ -158,6 +167,9 @@ public class ContentChannelOutputStream extends OutputStream implements Writable if (log.isLoggable(logLevel)) { log.log(logLevel, "Got exception when writing to client: " + Exceptions.toMessageString(t)); } + if (nested != null) { + nested.failed(t); + } } } diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java index 22e55eb5291..bdaf6f7919f 100644 --- a/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java +++ b/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java @@ -96,7 +96,7 @@ public class LoggingTestCase { } @Test - public final void testFailed() throws IOException, InterruptedException { + public final void testFailed() throws IOException { stream.send(createData()); stream.send(createData()); stream.send(createData()); diff --git a/eval/CMakeLists.txt b/eval/CMakeLists.txt index 22750a27f20..66270bb323b 100644 --- a/eval/CMakeLists.txt +++ b/eval/CMakeLists.txt @@ -66,6 +66,7 @@ vespa_define_module( src/tests/instruction/pow_as_map_optimizer src/tests/instruction/remove_trivial_dimension_optimizer src/tests/instruction/sparse_dot_product_function + src/tests/instruction/sparse_full_overlap_join_function src/tests/instruction/sparse_merge_function src/tests/instruction/sparse_no_overlap_join_function src/tests/instruction/sum_max_dot_product_function diff --git a/eval/src/tests/instruction/sparse_full_overlap_join_function/CMakeLists.txt b/eval/src/tests/instruction/sparse_full_overlap_join_function/CMakeLists.txt new file mode 100644 index 00000000000..54841140278 --- /dev/null +++ 
b/eval/src/tests/instruction/sparse_full_overlap_join_function/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(eval_sparse_full_overlap_join_function_test_app TEST + SOURCES + sparse_full_overlap_join_function_test.cpp + DEPENDS + vespaeval + GTest::GTest +) +vespa_add_test(NAME eval_sparse_full_overlap_join_function_test_app COMMAND eval_sparse_full_overlap_join_function_test_app) diff --git a/eval/src/tests/instruction/sparse_full_overlap_join_function/sparse_full_overlap_join_function_test.cpp b/eval/src/tests/instruction/sparse_full_overlap_join_function/sparse_full_overlap_join_function_test.cpp new file mode 100644 index 00000000000..e3001b17602 --- /dev/null +++ b/eval/src/tests/instruction/sparse_full_overlap_join_function/sparse_full_overlap_join_function_test.cpp @@ -0,0 +1,92 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/eval/eval/fast_value.h> +#include <vespa/eval/eval/simple_value.h> +#include <vespa/eval/instruction/sparse_full_overlap_join_function.h> +#include <vespa/eval/eval/test/eval_fixture.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace vespalib::eval; +using namespace vespalib::eval::test; + +const ValueBuilderFactory &prod_factory = FastValueBuilderFactory::get(); +const ValueBuilderFactory &test_factory = SimpleValueBuilderFactory::get(); + +//----------------------------------------------------------------------------- + +EvalFixture::ParamRepo make_params() { + return EvalFixture::ParamRepo() + .add_variants("v1_a", GenSpec(3.0).map("a", 8, 1)) + .add_variants("v2_a", GenSpec(7.0).map("a", 4, 2)) + .add_variants("v2_a_trivial", GenSpec(7.0).map("a", 4, 2).idx("b", 1).idx("c", 1)) + .add_variants("v3_b", GenSpec(5.0).map("b", 4, 2)) + .add("m1_ab", GenSpec(3.0).map("a", 8, 1).map("b", 8, 1)) + .add("m2_ab", GenSpec(17.0).map("a", 4, 2).map("b", 4, 2)) + .add("m3_bc", GenSpec(11.0).map("b", 4, 2).map("c", 4, 2)) + .add("scalar", GenSpec(1.0)) + .add("dense_a", GenSpec().idx("a", 5)) + .add("mixed_ab", GenSpec().map("a", 5, 1).idx("b", 5)); +} +EvalFixture::ParamRepo param_repo = make_params(); + +void assert_optimized(const vespalib::string &expr) { + EvalFixture fast_fixture(prod_factory, expr, param_repo, true); + EvalFixture test_fixture(test_factory, expr, param_repo, true); + EvalFixture slow_fixture(prod_factory, expr, param_repo, false); + EXPECT_EQ(fast_fixture.result(), EvalFixture::ref(expr, param_repo)); + EXPECT_EQ(test_fixture.result(), EvalFixture::ref(expr, param_repo)); + EXPECT_EQ(slow_fixture.result(), EvalFixture::ref(expr, param_repo)); + EXPECT_EQ(fast_fixture.find_all<SparseFullOverlapJoinFunction>().size(), 1u); + EXPECT_EQ(test_fixture.find_all<SparseFullOverlapJoinFunction>().size(), 1u); + EXPECT_EQ(slow_fixture.find_all<SparseFullOverlapJoinFunction>().size(), 0u); +} + +void assert_not_optimized(const vespalib::string &expr) { + EvalFixture fast_fixture(prod_factory, expr, param_repo, true); + EXPECT_EQ(fast_fixture.result(), EvalFixture::ref(expr, param_repo)); + EXPECT_EQ(fast_fixture.find_all<SparseFullOverlapJoinFunction>().size(), 0u); +} + +//----------------------------------------------------------------------------- + +TEST(SparseFullOverlapJoin, expression_can_be_optimized) +{ + assert_optimized("v1_a-v2_a"); + assert_optimized("v2_a-v1_a"); + assert_optimized("join(v1_a,v2_a,f(x,y)(max(x,y)))"); +} + 
+TEST(SparseFullOverlapJoin, multi_dimensional_expression_can_be_optimized) +{ + assert_optimized("m1_ab-m2_ab"); + assert_optimized("m2_ab-m1_ab"); + assert_optimized("join(m1_ab,m2_ab,f(x,y)(max(x,y)))"); +} + +TEST(SparseFullOverlapJoin, trivial_dimensions_are_ignored) +{ + assert_optimized("v1_a*v2_a_trivial"); + assert_optimized("v2_a_trivial*v1_a"); +} + +TEST(SparseFullOverlapJoin, inappropriate_shapes_are_not_optimized) +{ + assert_not_optimized("v1_a*scalar"); + assert_not_optimized("v1_a*mixed_ab"); + assert_not_optimized("v1_a*v3_b"); + assert_not_optimized("v1_a*m1_ab"); + assert_not_optimized("m1_ab*m3_bc"); + assert_not_optimized("scalar*scalar"); + assert_not_optimized("dense_a*dense_a"); + assert_not_optimized("mixed_ab*mixed_ab"); +} + +TEST(SparseFullOverlapJoin, mixed_cell_types_are_not_optimized) +{ + assert_not_optimized("v1_a*v2_a_f"); + assert_not_optimized("v1_a_f*v2_a"); +} + +//----------------------------------------------------------------------------- + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/eval/src/vespa/eval/eval/fast_value.hpp b/eval/src/vespa/eval/eval/fast_value.hpp index d5cfc9c6368..33624fb920e 100644 --- a/eval/src/vespa/eval/eval/fast_value.hpp +++ b/eval/src/vespa/eval/eval/fast_value.hpp @@ -47,7 +47,7 @@ struct FastFilterView : public Value::Index::View { const FastAddrMap ↦ std::vector<size_t> match_dims; std::vector<size_t> extract_dims; - std::vector<string_id> query; + std::vector<string_id> query; size_t pos; bool is_match(ConstArrayRef<string_id> addr) const { @@ -141,12 +141,6 @@ struct FastValueIndex final : Value::Index { FastAddrMap map; FastValueIndex(size_t num_mapped_dims_in, const std::vector<string_id> &labels, size_t expected_subspaces_in) : map(num_mapped_dims_in, labels, expected_subspaces_in) {} - - template <typename LCT, typename RCT, typename OCT, typename Fun> - static const Value &sparse_full_overlap_join(const ValueType &res_type, const Fun &fun, - const FastValueIndex &lhs, const FastValueIndex &rhs, - ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash); - size_t size() const override { return map.size(); } std::unique_ptr<View> create_view(const std::vector<size_t> &dims) const override; }; @@ -267,6 +261,10 @@ struct FastValue final : Value, ValueBuilder<T> { } my_index.map.add_mapping(hash); } + void add_singledim_mapping(string_id label) { + my_handles.push_back(label); + my_index.map.add_mapping(FastAddrMap::hash_label(label)); + } ArrayRef<T> add_subspace(ConstArrayRef<vespalib::stringref> addr) override { add_mapping(addr); return my_cells.add_cells(my_subspace_size); @@ -344,25 +342,4 @@ struct FastScalarBuilder final : ValueBuilder<T> { //----------------------------------------------------------------------------- -template <typename LCT, typename RCT, typename OCT, typename Fun> -const Value & -FastValueIndex::sparse_full_overlap_join(const ValueType &res_type, const Fun &fun, - const FastValueIndex &lhs, const FastValueIndex &rhs, - ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash) -{ - auto &result = stash.create<FastValue<OCT,true>>(res_type, lhs.map.addr_size(), 1, lhs.map.size()); - lhs.map.each_map_entry([&](auto lhs_subspace, auto hash) { - auto lhs_addr = lhs.map.get_addr(lhs_subspace); - auto rhs_subspace = rhs.map.lookup(lhs_addr, hash); - if (rhs_subspace != FastAddrMap::npos()) { - result.add_mapping(lhs_addr, hash); - auto cell_value = fun(lhs_cells[lhs_subspace], rhs_cells[rhs_subspace]); - result.my_cells.push_back_fast(cell_value); - } - }); - 
return result; -} - -//----------------------------------------------------------------------------- - } diff --git a/eval/src/vespa/eval/eval/optimize_tensor_function.cpp b/eval/src/vespa/eval/eval/optimize_tensor_function.cpp index aef49a2c75b..f1ce293b18c 100644 --- a/eval/src/vespa/eval/eval/optimize_tensor_function.cpp +++ b/eval/src/vespa/eval/eval/optimize_tensor_function.cpp @@ -8,6 +8,7 @@ #include <vespa/eval/instruction/sparse_dot_product_function.h> #include <vespa/eval/instruction/sparse_merge_function.h> #include <vespa/eval/instruction/sparse_no_overlap_join_function.h> +#include <vespa/eval/instruction/sparse_full_overlap_join_function.h> #include <vespa/eval/instruction/mixed_inner_product_function.h> #include <vespa/eval/instruction/sum_max_dot_product_function.h> #include <vespa/eval/instruction/dense_xw_product_function.h> @@ -76,6 +77,7 @@ const TensorFunction &optimize_for_factory(const ValueBuilderFactory &, const Te child.set(DenseSingleReduceFunction::optimize(child.get(), stash)); child.set(SparseMergeFunction::optimize(child.get(), stash)); child.set(SparseNoOverlapJoinFunction::optimize(child.get(), stash)); + child.set(SparseFullOverlapJoinFunction::optimize(child.get(), stash)); nodes.pop_back(); } } diff --git a/eval/src/vespa/eval/instruction/CMakeLists.txt b/eval/src/vespa/eval/instruction/CMakeLists.txt index 50f7dbe7005..97838f2adb9 100644 --- a/eval/src/vespa/eval/instruction/CMakeLists.txt +++ b/eval/src/vespa/eval/instruction/CMakeLists.txt @@ -33,6 +33,7 @@ vespa_add_library(eval_instruction OBJECT remove_trivial_dimension_optimizer.cpp replace_type_function.cpp sparse_dot_product_function.cpp + sparse_full_overlap_join_function.cpp sparse_merge_function.cpp sparse_no_overlap_join_function.cpp sum_max_dot_product_function.cpp diff --git a/eval/src/vespa/eval/instruction/detect_type.h b/eval/src/vespa/eval/instruction/detect_type.h deleted file mode 100644 index f1769fa15cc..00000000000 --- a/eval/src/vespa/eval/instruction/detect_type.h +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <typeindex> -#include <array> -#include <cstddef> - -#pragma once - -namespace vespalib::eval::instruction { - -/* - * Utilities for detecting implementation class by comparing - * typeindex(typeid(T)); for now these are local to this - * namespace, but we can consider moving them to a more - * common place (probably vespalib) if we see more use-cases. - */ - -/** - * Recognize a (const) instance of type T. This is cheaper than - * dynamic_cast, but requires the object to be exactly of class T. - * Returns a pointer to the object as T if recognized, nullptr - * otherwise. - **/ -template<typename T, typename U> -const T * -recognize_by_type_index(const U & object) -{ - if (std::type_index(typeid(object)) == std::type_index(typeid(T))) { - return static_cast<const T *>(&object); - } - return nullptr; -} - -/** - * Packs N recognized values into one object, used as return value - * from detect_type<T>. - * - * Use all_converted() or the equivalent bool cast operator to check - * if all objects were recognized. After this check is successful use - * get<0>(), get<1>() etc to get a reference to the objects. 
- **/ -template<typename T, size_t N> -class RecognizedValues -{ -private: - std::array<const T *, N> _pointers; -public: - RecognizedValues(std::array<const T *, N> && pointers) - : _pointers(std::move(pointers)) - {} - bool all_converted() const { - for (auto p : _pointers) { - if (p == nullptr) return false; - } - return true; - } - operator bool() const { return all_converted(); } - template<size_t idx> const T& get() const { - static_assert(idx < N); - return *_pointers[idx]; - } -}; - -/** - * For all arguments, detect if they have typeid(T), convert to T if - * possible, and return a RecognizedValues packing the converted - * values. - **/ -template<typename T, typename... Args> -RecognizedValues<T, sizeof...(Args)> -detect_type(const Args &... args) -{ - return RecognizedValues<T, sizeof...(Args)>({(recognize_by_type_index<T>(args))...}); -} - -} // namespace diff --git a/eval/src/vespa/eval/instruction/generic_join.cpp b/eval/src/vespa/eval/instruction/generic_join.cpp index 4b3755509c7..fb714bcf16e 100644 --- a/eval/src/vespa/eval/instruction/generic_join.cpp +++ b/eval/src/vespa/eval/instruction/generic_join.cpp @@ -1,9 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "generic_join.h" -#include "detect_type.h" #include <vespa/eval/eval/inline_operation.h> -#include <vespa/eval/eval/fast_value.hpp> #include <vespa/eval/eval/wrap_param.h> #include <vespa/vespalib/util/overload.h> #include <vespa/vespalib/util/stash.h> @@ -69,43 +67,6 @@ void my_mixed_join_op(State &state, uint64_t param_in) { //----------------------------------------------------------------------------- -template <typename LCT, typename RCT, typename OCT, typename Fun> -void my_sparse_full_overlap_join_op(State &state, uint64_t param_in) { - const auto ¶m = unwrap_param<JoinParam>(param_in); - const Value &lhs = state.peek(1); - const Value &rhs = state.peek(0); - auto lhs_cells = lhs.cells().typify<LCT>(); - auto rhs_cells = rhs.cells().typify<RCT>(); - const Value::Index &lhs_index = lhs.index(); - const Value::Index &rhs_index = rhs.index(); - if (auto indexes = detect_type<FastValueIndex>(lhs_index, rhs_index)) { - const auto &lhs_fast = indexes.get<0>(); - const auto &rhs_fast = indexes.get<1>(); - return (rhs_fast.map.size() < lhs_fast.map.size()) - ? 
state.pop_pop_push(FastValueIndex::sparse_full_overlap_join<RCT,LCT,OCT,SwapArgs2<Fun>> - (param.res_type, SwapArgs2<Fun>(param.function), rhs_fast, lhs_fast, rhs_cells, lhs_cells, state.stash)) - : state.pop_pop_push(FastValueIndex::sparse_full_overlap_join<LCT,RCT,OCT,Fun> - (param.res_type, Fun(param.function), lhs_fast, rhs_fast, lhs_cells, rhs_cells, state.stash)); - } - Fun fun(param.function); - SparseJoinState sparse(param.sparse_plan, lhs_index, rhs_index); - auto builder = param.factory.create_transient_value_builder<OCT>(param.res_type, param.sparse_plan.sources.size(), param.dense_plan.out_size, sparse.first_index.size()); - auto outer = sparse.first_index.create_view({}); - auto inner = sparse.second_index.create_view(sparse.second_view_dims); - outer->lookup({}); - while (outer->next_result(sparse.first_address, sparse.first_subspace)) { - inner->lookup(sparse.address_overlap); - if (inner->next_result(sparse.second_only_address, sparse.second_subspace)) { - builder->add_subspace(sparse.full_address)[0] = fun(lhs_cells[sparse.lhs_subspace], rhs_cells[sparse.rhs_subspace]); - } - } - auto &result = state.stash.create<std::unique_ptr<Value>>(builder->build(std::move(builder))); - const Value &result_ref = *(result.get()); - state.pop_pop_push(result_ref); -}; - -//----------------------------------------------------------------------------- - template <typename LCT, typename RCT, typename OCT, typename Fun, bool forward_lhs> void my_mixed_dense_join_op(State &state, uint64_t param_in) { const auto ¶m = unwrap_param<JoinParam>(param_in); @@ -175,11 +136,6 @@ struct SelectGenericJoinOp { if (param.sparse_plan.should_forward_rhs_index()) { return my_mixed_dense_join_op<LCT,RCT,OCT,Fun,false>; } - if ((param.dense_plan.out_size == 1) && - (param.sparse_plan.sources.size() == param.sparse_plan.lhs_overlap.size())) - { - return my_sparse_full_overlap_join_op<LCT,RCT,OCT,Fun>; - } return my_mixed_join_op<LCT,RCT,OCT,Fun>; } }; diff --git a/eval/src/vespa/eval/instruction/generic_merge.cpp b/eval/src/vespa/eval/instruction/generic_merge.cpp index 9b098db7763..434ab308c3c 100644 --- a/eval/src/vespa/eval/instruction/generic_merge.cpp +++ b/eval/src/vespa/eval/instruction/generic_merge.cpp @@ -1,9 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "detect_type.h" #include "generic_merge.h" #include <vespa/eval/eval/inline_operation.h> -#include <vespa/eval/eval/fast_value.hpp> #include <vespa/eval/eval/wrap_param.h> #include <vespa/vespalib/util/stash.h> #include <vespa/vespalib/util/typify.h> diff --git a/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.cpp b/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.cpp new file mode 100644 index 00000000000..480af3315b1 --- /dev/null +++ b/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.cpp @@ -0,0 +1,134 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "sparse_full_overlap_join_function.h" +#include "generic_join.h" +#include <vespa/eval/eval/fast_value.hpp> +#include <vespa/vespalib/util/typify.h> + +namespace vespalib::eval { + +using namespace tensor_function; +using namespace operation; +using namespace instruction; + +namespace { + +template <typename CT, typename Fun, bool single_dim> +const Value &my_fast_sparse_full_overlap_join(const FastAddrMap &lhs_map, const FastAddrMap &rhs_map, + const CT *lhs_cells, const CT *rhs_cells, + const JoinParam ¶m, Stash &stash) +{ + Fun fun(param.function); + auto &result = stash.create<FastValue<CT,true>>(param.res_type, lhs_map.addr_size(), 1, lhs_map.size()); + if constexpr (single_dim) { + const auto &labels = lhs_map.labels(); + for (size_t i = 0; i < labels.size(); ++i) { + auto rhs_subspace = rhs_map.lookup_singledim(labels[i]); + if (rhs_subspace != FastAddrMap::npos()) { + result.add_singledim_mapping(labels[i]); + auto cell_value = fun(lhs_cells[i], rhs_cells[rhs_subspace]); + result.my_cells.push_back_fast(cell_value); + } + } + } else { + lhs_map.each_map_entry([&](auto lhs_subspace, auto hash) { + auto lhs_addr = lhs_map.get_addr(lhs_subspace); + auto rhs_subspace = rhs_map.lookup(lhs_addr, hash); + if (rhs_subspace != FastAddrMap::npos()) { + result.add_mapping(lhs_addr, hash); + auto cell_value = fun(lhs_cells[lhs_subspace], rhs_cells[rhs_subspace]); + result.my_cells.push_back_fast(cell_value); + } + }); + } + return result; +} + +template <typename CT, typename Fun, bool single_dim> +const Value &my_fast_sparse_full_overlap_join_dispatch(const FastAddrMap &lhs_map, const FastAddrMap &rhs_map, + const CT *lhs_cells, const CT *rhs_cells, + const JoinParam ¶m, Stash &stash) +{ + return (rhs_map.size() < lhs_map.size()) + ? my_fast_sparse_full_overlap_join<CT,SwapArgs2<Fun>,single_dim>(rhs_map, lhs_map, rhs_cells, lhs_cells, param, stash) + : my_fast_sparse_full_overlap_join<CT,Fun,single_dim>(lhs_map, rhs_map, lhs_cells, rhs_cells, param, stash); +} + +template <typename CT, typename Fun, bool single_dim> +void my_sparse_full_overlap_join_op(InterpretedFunction::State &state, uint64_t param_in) { + const auto ¶m = unwrap_param<JoinParam>(param_in); + const Value &lhs = state.peek(1); + const Value &rhs = state.peek(0); + const auto &lhs_idx = lhs.index(); + const auto &rhs_idx = rhs.index(); + if (__builtin_expect(are_fast(lhs_idx, rhs_idx), true)) { + const Value &res = my_fast_sparse_full_overlap_join_dispatch<CT,Fun,single_dim>(as_fast(lhs_idx).map, as_fast(rhs_idx).map, + lhs.cells().typify<CT>().cbegin(), rhs.cells().typify<CT>().cbegin(), param, state.stash); + state.pop_pop_push(res); + } else { + auto res = generic_mixed_join<CT,CT,CT,Fun>(lhs, rhs, param); + state.pop_pop_push(*state.stash.create<std::unique_ptr<Value>>(std::move(res))); + } +} + +struct SelectSparseFullOverlapJoinOp { + template <typename CT, typename Fun, typename SINGLE_DIM> + static auto invoke() { return my_sparse_full_overlap_join_op<CT,Fun,SINGLE_DIM::value>; } +}; + +using MyTypify = TypifyValue<TypifyCellType,operation::TypifyOp2,TypifyBool>; + +bool is_sparse_like(const ValueType &type) { + return ((type.count_mapped_dimensions() > 0) && (type.dense_subspace_size() == 1)); +} + +} // namespace <unnamed> + +SparseFullOverlapJoinFunction::SparseFullOverlapJoinFunction(const tensor_function::Join &original) + : tensor_function::Join(original.result_type(), + original.lhs(), + original.rhs(), + original.function()) +{ + assert(compatible_types(result_type(), lhs().result_type(), 
rhs().result_type())); +} + +InterpretedFunction::Instruction +SparseFullOverlapJoinFunction::compile_self(const ValueBuilderFactory &factory, Stash &stash) const +{ + const auto ¶m = stash.create<JoinParam>(lhs().result_type(), rhs().result_type(), function(), factory); + assert(param.res_type == result_type()); + bool single_dim = (result_type().count_mapped_dimensions() == 1); + auto op = typify_invoke<3,MyTypify,SelectSparseFullOverlapJoinOp>(result_type().cell_type(), function(), single_dim); + return InterpretedFunction::Instruction(op, wrap_param<JoinParam>(param)); +} + +bool +SparseFullOverlapJoinFunction::compatible_types(const ValueType &res, const ValueType &lhs, const ValueType &rhs) +{ + if ((lhs.cell_type() == rhs.cell_type()) && + is_sparse_like(lhs) && is_sparse_like(rhs) && + (res.count_mapped_dimensions() == lhs.count_mapped_dimensions()) && + (res.count_mapped_dimensions() == rhs.count_mapped_dimensions())) + { + assert(is_sparse_like(res)); + assert(res.cell_type() == lhs.cell_type()); + return true; + } + return false; +} + +const TensorFunction & +SparseFullOverlapJoinFunction::optimize(const TensorFunction &expr, Stash &stash) +{ + if (auto join = as<Join>(expr)) { + const TensorFunction &lhs = join->lhs(); + const TensorFunction &rhs = join->rhs(); + if (compatible_types(expr.result_type(), lhs.result_type(), rhs.result_type())) { + return stash.create<SparseFullOverlapJoinFunction>(*join); + } + } + return expr; +} + +} // namespace diff --git a/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.h b/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.h new file mode 100644 index 00000000000..13d35065997 --- /dev/null +++ b/eval/src/vespa/eval/instruction/sparse_full_overlap_join_function.h @@ -0,0 +1,22 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/eval/eval/tensor_function.h> + +namespace vespalib::eval { + +/** + * Tensor function for joining tensors with full sparse overlap. 
+ */ +class SparseFullOverlapJoinFunction : public tensor_function::Join +{ +public: + SparseFullOverlapJoinFunction(const tensor_function::Join &original); + InterpretedFunction::Instruction compile_self(const ValueBuilderFactory &factory, Stash &stash) const override; + bool result_is_mutable() const override { return true; } + static bool compatible_types(const ValueType &res, const ValueType &lhs, const ValueType &rhs); + static const TensorFunction &optimize(const TensorFunction &expr, Stash &stash); +}; + +} // namespace diff --git a/jdisc-security-filters/pom.xml b/jdisc-security-filters/pom.xml index 49f77cd60e7..5f6189c5cae 100644 --- a/jdisc-security-filters/pom.xml +++ b/jdisc-security-filters/pom.xml @@ -41,7 +41,22 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> - + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>testutil</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilter.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilter.java new file mode 100644 index 00000000000..71f1965c764 --- /dev/null +++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilter.java @@ -0,0 +1,118 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jdisc.http.filter.security.rule; + +import com.google.inject.Inject; +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.http.filter.DiscFilterRequest; +import com.yahoo.jdisc.http.filter.security.base.JsonSecurityRequestFilterBase; +import com.yahoo.jdisc.http.filter.security.rule.RuleBasedFilterConfig.Rule.Action; +import com.yahoo.restapi.Path; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Security request filter that filters requests based on host, method and uri path. 
+ * + * @author bjorncs + */ +public class RuleBasedRequestFilter extends JsonSecurityRequestFilterBase { + + private static final Logger log = Logger.getLogger(RuleBasedRequestFilter.class.getName()); + + private final Metric metric; + private final boolean dryrun; + private final List<Rule> rules; + private final ErrorResponse defaultResponse; + + @Inject + public RuleBasedRequestFilter(Metric metric, RuleBasedFilterConfig config) { + this.metric = metric; + this.dryrun = config.dryrun(); + this.rules = Rule.fromConfig(config.rule()); + this.defaultResponse = createDefaultResponse(config.defaultRule()); + } + + @Override + protected Optional<ErrorResponse> filter(DiscFilterRequest request) { + String method = request.getMethod(); + URI uri = request.getUri(); + for (Rule rule : rules) { + if (rule.matches(method, uri)) { + log.log(Level.FINE, () -> + String.format("Request '%h' with method '%s' and uri '%s' matched rule '%s'", request, method, uri, rule.name)); + return responseFor(request, rule.name, rule.response); + } + } + return responseFor(request, "default", defaultResponse); + } + + private static ErrorResponse createDefaultResponse(RuleBasedFilterConfig.DefaultRule defaultRule) { + switch (defaultRule.action()) { + case ALLOW: return null; + case BLOCK: return new ErrorResponse(defaultRule.blockResponseCode(), defaultRule.blockResponseMessage()); + default: throw new IllegalArgumentException(defaultRule.action().name()); + } + } + + private Optional<ErrorResponse> responseFor(DiscFilterRequest request, String ruleName, ErrorResponse response) { + int statusCode = response != null ? response.getResponse().getStatus() : 0; + Metric.Context metricContext = metric.createContext(Map.of( + "rule", ruleName, + "dryrun", Boolean.toString(dryrun), + "statusCode", Integer.toString(statusCode))); + if (response != null) { + metric.add("jdisc.http.filter.rule.blocked_requests", 1L, metricContext); + log.log(Level.FINE, () -> String.format( + "Blocking request '%h' with status code '%d' using rule '%s' (dryrun=%b)", request, statusCode, ruleName, dryrun)); + return dryrun ? Optional.empty() : Optional.of(response); + } else { + metric.add("jdisc.http.filter.rule.allowed_requests", 1L, metricContext); + log.log(Level.FINE, () -> String.format("Allowing request '%h' using rule '%s' (dryrun=%b)", request, ruleName, dryrun)); + return Optional.empty(); + } + } + + private static class Rule { + + final String name; + final Set<String> hostnames; + final Set<String> methods; + final Set<String> pathGlobExpressions; + final ErrorResponse response; + + static List<Rule> fromConfig(List<RuleBasedFilterConfig.Rule> config) { + return config.stream() + .map(Rule::new) + .collect(Collectors.toList()); + } + + Rule(RuleBasedFilterConfig.Rule config) { + this.name = config.name(); + this.hostnames = Set.copyOf(config.hostNames()); + this.methods = config.methods().stream() + .map(m -> m.name().toUpperCase()) + .collect(Collectors.toSet()); + this.pathGlobExpressions = Set.copyOf(config.pathExpressions()); + this.response = config.action() == Action.Enum.BLOCK + ? 
new ErrorResponse(config.blockResponseCode(), config.blockResponseMessage()) + : null; + } + + boolean matches(String method, URI uri) { + boolean methodMatches = methods.isEmpty() || methods.contains(method.toUpperCase()); + String host = uri.getHost(); + boolean hostnameMatches = hostnames.isEmpty() || (host != null && hostnames.contains(host)); + Path pathMatcher = new Path(uri); + boolean pathMatches = pathGlobExpressions.isEmpty() || pathGlobExpressions.stream().anyMatch(pathMatcher::matches); + return methodMatches && hostnameMatches && pathMatches; + } + + } +} diff --git a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilterTest.java b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilterTest.java new file mode 100644 index 00000000000..c67d3b430c8 --- /dev/null +++ b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/rule/RuleBasedRequestFilterTest.java @@ -0,0 +1,174 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jdisc.http.filter.security.rule; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.yahoo.container.jdisc.RequestHandlerTestDriver.MockResponseHandler; +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.http.filter.DiscFilterRequest; +import com.yahoo.jdisc.http.filter.security.rule.RuleBasedFilterConfig.DefaultRule; +import com.yahoo.jdisc.http.filter.security.rule.RuleBasedFilterConfig.Rule; +import com.yahoo.test.json.JsonTestHelper; +import com.yahoo.vespa.jdk8compat.List; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author bjorncs + */ +class RuleBasedRequestFilterTest { + + private static final ObjectMapper jsonMapper = new ObjectMapper(); + + @Test + void matches_rule_that_allows_all_methods_and_paths() { + RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder() + .dryrun(false) + .defaultRule(new DefaultRule.Builder() + .action(DefaultRule.Action.Enum.BLOCK)) + .rule(new Rule.Builder() + .name("first") + .hostNames("myserver") + .pathExpressions(List.of()) + .methods(List.of()) + .action(Rule.Action.Enum.ALLOW)) + .build(); + + Metric metric = mock(Metric.class); + RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config); + MockResponseHandler responseHandler = new MockResponseHandler(); + filter.filter(request("PATCH", "http://myserver:80/path-to-resource"), responseHandler); + + assertAllowed(responseHandler, metric); + + } + + @Test + void performs_action_on_first_matching_rule() throws IOException { + RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder() + .dryrun(false) + .defaultRule(new DefaultRule.Builder() + .action(DefaultRule.Action.Enum.ALLOW)) + .rule(new Rule.Builder() + .name("first") + .pathExpressions("/path-to-resource") + .methods(Rule.Methods.Enum.DELETE) + 
.action(Rule.Action.Enum.BLOCK) + .blockResponseCode(403)) + .rule(new Rule.Builder() + .name("second") + .pathExpressions("/path-to-resource") + .methods(Rule.Methods.Enum.GET) + .action(Rule.Action.Enum.BLOCK) + .blockResponseCode(404)) + .build(); + + Metric metric = mock(Metric.class); + RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config); + MockResponseHandler responseHandler = new MockResponseHandler(); + filter.filter(request("GET", "http://myserver:80/path-to-resource"), responseHandler); + + assertBlocked(responseHandler, metric, 404, ""); + } + + @Test + void performs_default_action_if_no_rule_matches() throws IOException { + RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder() + .dryrun(false) + .defaultRule(new DefaultRule.Builder() + .action(DefaultRule.Action.Enum.BLOCK) + .blockResponseCode(403) + .blockResponseMessage("my custom message")) + .rule(new Rule.Builder() + .name("rule") + .pathExpressions("/path-to-resource") + .methods(Rule.Methods.Enum.GET) + .action(Rule.Action.Enum.ALLOW)) + .build(); + + Metric metric = mock(Metric.class); + RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config); + MockResponseHandler responseHandler = new MockResponseHandler(); + filter.filter(request("POST", "http://myserver:80/"), responseHandler); + + assertBlocked(responseHandler, metric, 403, "my custom message"); + } + + @Test + void matches_rule_with_multiple_alternatives_for_host_path_and_method() throws IOException { + RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder() + .dryrun(false) + .defaultRule(new DefaultRule.Builder() + .action(DefaultRule.Action.Enum.ALLOW)) + .rule(new Rule.Builder() + .name("rule") + .hostNames(Set.of("server1", "server2", "server3")) + .pathExpressions(Set.of("/path-to-resource/{*}", "/another-path")) + .methods(Set.of(Rule.Methods.Enum.GET, Rule.Methods.POST, Rule.Methods.DELETE)) + .action(Rule.Action.Enum.BLOCK) + .blockResponseCode(404) + .blockResponseMessage("not found")) + .build(); + + Metric metric = mock(Metric.class); + RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config); + MockResponseHandler responseHandler = new MockResponseHandler(); + filter.filter(request("POST", "https://server1:443/path-to-resource/id/1/subid/2"), responseHandler); + + assertBlocked(responseHandler, metric, 404, "not found"); + } + + @Test + void no_filtering_if_request_is_allowed() { + RuleBasedFilterConfig config = new RuleBasedFilterConfig.Builder() + .dryrun(false) + .defaultRule(new DefaultRule.Builder() + .action(DefaultRule.Action.Enum.ALLOW)) + .build(); + + Metric metric = mock(Metric.class); + RuleBasedRequestFilter filter = new RuleBasedRequestFilter(metric, config); + MockResponseHandler responseHandler = new MockResponseHandler(); + filter.filter(request("DELETE", "http://myserver:80/"), responseHandler); + + assertAllowed(responseHandler, metric); + } + + private static DiscFilterRequest request(String method, String uri) { + DiscFilterRequest request = mock(DiscFilterRequest.class); + when(request.getMethod()).thenReturn(method); + when(request.getUri()).thenReturn(URI.create(uri)); + return request; + } + + private static void assertAllowed(MockResponseHandler handler, Metric metric) { + verify(metric).add(eq("jdisc.http.filter.rule.allowed_requests"), eq(1L), any()); + assertNull(handler.getResponse()); + } + + private static void assertBlocked(MockResponseHandler handler, Metric metric, int expectedCode, String expectedMessage) throws IOException { + 
verify(metric).add(eq("jdisc.http.filter.rule.blocked_requests"), eq(1L), any()); + Response response = handler.getResponse(); + assertNotNull(response); + assertEquals(expectedCode, response.getStatus()); + ObjectNode expectedJson = jsonMapper.createObjectNode(); + expectedJson.put("message", expectedMessage).put("code", expectedCode); + JsonNode actualJson = jsonMapper.readTree(handler.readAll().getBytes()); + JsonTestHelper.assertJsonEquals(expectedJson, actualJson); + } + +}
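Editor's note: to make the evaluation order exercised by the tests above easier to follow, here is a minimal, self-contained Java sketch of the same first-match-wins semantics. The class and record names (RuleEvaluationSketch, SimpleRule) are hypothetical and simplified: hosts and methods are plain string sets and path matching is reduced to a prefix check, whereas the real filter matches glob expressions via com.yahoo.restapi.Path. This is an illustration of the semantics, not the Vespa implementation.

import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class RuleEvaluationSketch {

    // Hypothetical, simplified stand-in for a configured rule; a null blockCode means ALLOW.
    record SimpleRule(String name, Set<String> methods, Set<String> hosts, Set<String> pathPrefixes, Integer blockCode) {
        boolean matches(String method, URI uri) {
            boolean methodOk = methods.isEmpty() || methods.contains(method.toUpperCase());
            boolean hostOk = hosts.isEmpty() || (uri.getHost() != null && hosts.contains(uri.getHost()));
            // The real filter matches glob expressions (e.g. "/path-to-resource/{*}"); a prefix check stands in here.
            boolean pathOk = pathPrefixes.isEmpty() || pathPrefixes.stream().anyMatch(uri.getPath()::startsWith);
            return methodOk && hostOk && pathOk;
        }
    }

    // Returns the block status code of the first matching rule, or empty when the request is allowed.
    static Optional<Integer> evaluate(List<SimpleRule> rules, Integer defaultBlockCode, String method, URI uri) {
        for (SimpleRule rule : rules) {
            if (rule.matches(method, uri)) return Optional.ofNullable(rule.blockCode());
        }
        return Optional.ofNullable(defaultBlockCode); // no rule matched: the default rule decides
    }

    public static void main(String[] args) {
        List<SimpleRule> rules = List.of(
                new SimpleRule("first", Set.of("DELETE"), Set.of(), Set.of("/path-to-resource"), 403),
                new SimpleRule("second", Set.of("GET"), Set.of(), Set.of("/path-to-resource"), 404));

        // Mirrors performs_action_on_first_matching_rule: a GET skips "first" (DELETE only) and is blocked by "second".
        System.out.println(evaluate(rules, null, "GET", URI.create("http://myserver:80/path-to-resource"))); // Optional[404]
        // No rule matches a POST to "/", so the default decides; here the default allows (null block code).
        System.out.println(evaluate(rules, null, "POST", URI.create("http://myserver:80/"))); // Optional.empty
    }
}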
\ No newline at end of file diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java index a5efec1dcb7..c30add928ba 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainer.java @@ -3,8 +3,13 @@ package com.yahoo.vespa.hosted.node.admin.maintenance; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.DockerImage; import com.yahoo.config.provision.NodeType; +import com.yahoo.vespa.flags.FetchVector; +import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.vespa.flags.Flags; +import com.yahoo.vespa.flags.StringFlag; import com.yahoo.vespa.hosted.dockerapi.Container; import com.yahoo.vespa.hosted.dockerapi.ContainerName; import com.yahoo.vespa.hosted.node.admin.component.TaskContext; @@ -13,6 +18,8 @@ import com.yahoo.vespa.hosted.node.admin.maintenance.disk.CoredumpCleanupRule; import com.yahoo.vespa.hosted.node.admin.maintenance.disk.DiskCleanup; import com.yahoo.vespa.hosted.node.admin.maintenance.disk.DiskCleanupRule; import com.yahoo.vespa.hosted.node.admin.maintenance.disk.LinearCleanupRule; +import com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncClient; +import com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncFileInfo; import com.yahoo.vespa.hosted.node.admin.nodeadmin.ConvergenceException; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentTask; @@ -31,6 +38,7 @@ import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import static com.yahoo.vespa.hosted.node.admin.maintenance.disk.DiskCleanupRule.Priority; import static com.yahoo.yolean.Exceptions.uncheck; @@ -53,9 +62,11 @@ public class StorageMaintainer { private final Terminal terminal; private final CoredumpHandler coredumpHandler; - private final Path archiveContainerStoragePath; private final DiskCleanup diskCleanup; + private final SyncClient syncClient; private final Clock clock; + private final Path archiveContainerStoragePath; + private final StringFlag syncBucketNameFlag; // We cache disk usage to avoid doing expensive disk operations so often private final Cache<ContainerName, DiskSize> diskUsage = CacheBuilder.newBuilder() @@ -63,16 +74,34 @@ public class StorageMaintainer { .expireAfterWrite(5, TimeUnit.MINUTES) .build(); - public StorageMaintainer(Terminal terminal, CoredumpHandler coredumpHandler, Path archiveContainerStoragePath) { - this(terminal, coredumpHandler, archiveContainerStoragePath, new DiskCleanup(), Clock.systemUTC()); - } - - public StorageMaintainer(Terminal terminal, CoredumpHandler coredumpHandler, Path archiveContainerStoragePath, DiskCleanup diskCleanup, Clock clock) { + public StorageMaintainer(Terminal terminal, CoredumpHandler coredumpHandler, DiskCleanup diskCleanup, + SyncClient syncClient, Clock clock, Path archiveContainerStoragePath, 
FlagSource flagSource) { this.terminal = terminal; this.coredumpHandler = coredumpHandler; - this.archiveContainerStoragePath = archiveContainerStoragePath; this.diskCleanup = diskCleanup; + this.syncClient = syncClient; this.clock = clock; + this.archiveContainerStoragePath = archiveContainerStoragePath; + this.syncBucketNameFlag = Flags.SYNC_HOST_LOGS_TO_S3_BUCKET.bindTo(flagSource); + } + + public boolean syncLogs(NodeAgentContext context) { + Optional<ApplicationId> app = context.node().owner(); + if (app.isEmpty()) return false; + String bucketName = syncBucketNameFlag + .with(FetchVector.Dimension.NODE_TYPE, NodeType.tenant.name()) + .with(FetchVector.Dimension.APPLICATION_ID, app.get().serializedForm()) + .value(); + if (bucketName.isBlank()) return false; + + List<SyncFileInfo> syncFileInfos = FileFinder.files(pathOnHostUnderContainerVespaHome(context, "logs/vespa")) + .maxDepth(2) + .stream() + .sorted(Comparator.comparing(FileFinder.FileAttributes::lastModifiedTime)) + .flatMap(fa -> SyncFileInfo.tenantLog(bucketName, app.get(), context.hostname(), fa.path()).stream()) + .collect(Collectors.toList()); + + return syncClient.sync(context, syncFileInfos, 1); } public Optional<DiskSize> diskUsageFor(NodeAgentContext context) { @@ -127,22 +156,20 @@ public class StorageMaintainer { Instant start = clock.instant(); double oneMonthSeconds = Duration.ofDays(30).getSeconds(); Function<Instant, Double> monthNormalizer = instant -> Duration.between(instant, start).getSeconds() / oneMonthSeconds; - Function<String, Path> pathOnHostUnderContainerVespaHome = path -> - context.pathOnHostFromPathInNode(context.pathInNodeUnderVespaHome(path)); List<DiskCleanupRule> rules = new ArrayList<>(); - rules.add(CoredumpCleanupRule.forContainer(pathOnHostUnderContainerVespaHome.apply("var/crash"))); + rules.add(CoredumpCleanupRule.forContainer(pathOnHostUnderContainerVespaHome(context, "var/crash"))); if (context.node().membership().map(m -> m.type().isContainer()).orElse(false)) - rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome.apply("logs/vespa/qrs")).list(), + rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome(context, "logs/vespa/qrs")).list(), fa -> monthNormalizer.apply(fa.lastModifiedTime()), Priority.LOWEST, Priority.HIGHEST)); if (context.nodeType() == NodeType.tenant && context.node().membership().map(m -> m.type().isAdmin()).orElse(false)) - rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome.apply("logs/vespa/logarchive")).list(), + rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome(context, "logs/vespa/logarchive")).list(), fa -> monthNormalizer.apply(fa.lastModifiedTime()), Priority.LOWEST, Priority.HIGHEST)); if (context.nodeType() == NodeType.proxy) - rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome.apply("logs/nginx")).list(), + rules.add(new LinearCleanupRule(() -> FileFinder.files(pathOnHostUnderContainerVespaHome(context, "logs/nginx")).list(), fa -> monthNormalizer.apply(fa.lastModifiedTime()), Priority.LOWEST, Priority.MEDIUM)); return rules; @@ -212,4 +239,8 @@ public class StorageMaintainer { .orElse("<none>") ); } + + private static Path pathOnHostUnderContainerVespaHome(NodeAgentContext context, String path) { + return context.pathOnHostFromPathInNode(context.pathInNodeUnderVespaHome(path)); + } } diff --git 
a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java index 3b88d28613c..19ec8c0e283 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java @@ -4,11 +4,9 @@ package com.yahoo.vespa.hosted.node.admin.maintenance.sync; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.HostName; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Optional; /** * @author freva @@ -18,55 +16,64 @@ public class SyncFileInfo { private final String bucketName; private final Path srcPath; private final Path destPath; - private final boolean compressWithZstd; + private final Compression uploadCompression; - private SyncFileInfo(String bucketName, Path srcPath, Path destPath, boolean compressWithZstd) { + private SyncFileInfo(String bucketName, Path srcPath, Path destPath, Compression uploadCompression) { this.bucketName = bucketName; this.srcPath = srcPath; this.destPath = destPath; - this.compressWithZstd = compressWithZstd; + this.uploadCompression = uploadCompression; } public String bucketName() { return bucketName; } + /** Source path of the file to sync */ public Path srcPath() { return srcPath; } + /** Remote path to store the file at */ public Path destPath() { return destPath; } - public InputStream inputStream() throws IOException { - InputStream is = Files.newInputStream(srcPath); - if (compressWithZstd) return new ZstdCompressingInputStream(is, 4 << 20); - return is; + /** Compression algorithm to use when uploading the file */ + public Compression uploadCompression() { + return uploadCompression; } + public static Optional<SyncFileInfo> tenantLog(String bucketName, ApplicationId applicationId, HostName hostName, Path logFile) { + String filename = logFile.getFileName().toString(); + Compression compression = Compression.NONE; + String dir = null; + + if (filename.startsWith("vespa.log-")) { + dir = "logs/vespa"; + compression = Compression.ZSTD; + } else if (filename.endsWith(".zst")) { + if (filename.startsWith("JsonAccessLog.") || filename.startsWith("access")) + dir = "logs/access"; + else if (filename.startsWith("ConnectionLog.")) + dir = "logs/connection"; + } - public static SyncFileInfo tenantVespaLog(String bucketName, ApplicationId applicationId, HostName hostName, Path vespaLogFile) { - return new SyncFileInfo(bucketName, vespaLogFile, destination(applicationId, hostName, "logs/vespa", vespaLogFile, ".zst"), true); - } - - public static SyncFileInfo tenantAccessLog(String bucketName, ApplicationId applicationId, HostName hostName, Path accessLogFile) { - return new SyncFileInfo(bucketName, accessLogFile, destination(applicationId, hostName, "logs/access", accessLogFile, null), false); + if (dir == null) return Optional.empty(); + return Optional.of(new SyncFileInfo( + bucketName, logFile, destination(applicationId, hostName, dir, logFile, compression), compression)); } public static SyncFileInfo infrastructureVespaLog(String bucketName, HostName hostName, Path vespaLogFile) { - return new SyncFileInfo(bucketName, vespaLogFile, destination(null, hostName, "logs/vespa", vespaLogFile, ".zst"), true); - } - - public static SyncFileInfo infrastructureAccessLog(String 
bucketName, HostName hostName, Path accessLogFile) { - return new SyncFileInfo(bucketName, accessLogFile, destination(null, hostName, "logs/access", accessLogFile, null), false); + Compression compression = Compression.ZSTD; + return new SyncFileInfo(bucketName, vespaLogFile, destination(null, hostName, "logs/vespa", vespaLogFile, compression), compression); } - private static Path destination(ApplicationId app, HostName hostName, String dir, Path filename, String extension) { - StringBuilder sb = new StringBuilder(100).append('/'); + private static Path destination(ApplicationId app, HostName hostName, String dir, Path filename, Compression uploadCompression) { + StringBuilder sb = new StringBuilder(100); if (app == null) sb.append("infrastructure"); - else sb.append(app.tenant().value()).append('.').append(app.application().value()).append('.').append(app.instance().value()); + else sb.append(app.tenant().value()).append('/').append(app.application().value()).append('/').append(app.instance().value()); sb.append('/'); for (char c: hostName.value().toCharArray()) { @@ -76,8 +83,17 @@ public class SyncFileInfo { sb.append('/').append(dir).append('/').append(filename.getFileName().toString()); - if (extension != null) sb.append(extension); + if (uploadCompression.extension != null) sb.append(uploadCompression.extension); return Paths.get(sb.toString()); } + + public enum Compression { + NONE(null), ZSTD(".zst"); + + private final String extension; + Compression(String extension) { + this.extension = extension; + } + } } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java index 24928b4c8a6..4bc6fae4e56 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java @@ -499,6 +499,7 @@ public class NodeAgentImpl implements NodeAgent { orchestrator.resume(context.hostname().value()); suspendedInOrchestrator = false; } + storageMaintainer.syncLogs(context); break; case provisioned: nodeRepository.setNodeState(context.hostname().value(), NodeState.dirty); diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java index cb615a94615..a17ffadcb45 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/StorageMaintainerTest.java @@ -3,8 +3,11 @@ package com.yahoo.vespa.hosted.node.admin.maintenance; import com.yahoo.config.provision.NodeResources; import com.yahoo.test.ManualClock; +import com.yahoo.vespa.flags.InMemoryFlagSource; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; +import com.yahoo.vespa.hosted.node.admin.maintenance.coredump.CoredumpHandler; import com.yahoo.vespa.hosted.node.admin.maintenance.disk.DiskCleanup; +import com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncClient; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContextImpl; import com.yahoo.vespa.hosted.node.admin.task.util.file.FileFinder; @@ -13,8 +16,6 @@ import com.yahoo.vespa.hosted.node.admin.task.util.process.TestTerminal; import 
com.yahoo.vespa.test.file.TestFileSystem; import org.junit.After; import org.junit.Test; -import org.junit.experimental.runners.Enclosed; -import org.junit.runner.RunWith; import java.io.IOException; import java.nio.file.FileSystem; @@ -38,150 +39,141 @@ import static org.mockito.Mockito.verifyNoInteractions; /** * @author dybis */ -@RunWith(Enclosed.class) public class StorageMaintainerTest { - public static class DiskUsageTests { - - private final TestTerminal terminal = new TestTerminal(); - - @Test - public void testDiskUsed() throws IOException { - StorageMaintainer storageMaintainer = new StorageMaintainer(terminal, null, null); - FileSystem fileSystem = TestFileSystem.create(); - NodeAgentContext context = new NodeAgentContextImpl.Builder("host-1.domain.tld").fileSystem(fileSystem).build(); - Files.createDirectories(context.pathOnHostFromPathInNode("/")); + private final TestTerminal terminal = new TestTerminal(); + private final CoredumpHandler coredumpHandler = mock(CoredumpHandler.class); + private final DiskCleanup diskCleanup = mock(DiskCleanup.class); + private final SyncClient syncClient = mock(SyncClient.class); + private final ManualClock clock = new ManualClock(Instant.ofEpochSecond(1234567890)); + private final InMemoryFlagSource flagSource = new InMemoryFlagSource(); + private final FileSystem fileSystem = TestFileSystem.create(); + private final StorageMaintainer storageMaintainer = new StorageMaintainer(terminal, coredumpHandler, diskCleanup, syncClient, clock, + fileSystem.getPath("/home/docker/container-storage/container-archive"), flagSource); + + @Test + public void testDiskUsed() throws IOException { + NodeAgentContext context = new NodeAgentContextImpl.Builder("host-1.domain.tld").fileSystem(fileSystem).build(); + Files.createDirectories(context.pathOnHostFromPathInNode("/")); + + terminal.expectCommand("du -xsk /home/docker/container-storage/host-1 2>&1", 0, "321\t/home/docker/container-storage/host-1/"); + assertEquals(Optional.of(DiskSize.of(328_704)), storageMaintainer.diskUsageFor(context)); + + // Value should still be cached, no new execution against the terminal + assertEquals(Optional.of(DiskSize.of(328_704)), storageMaintainer.diskUsageFor(context)); + } - terminal.expectCommand("du -xsk /home/docker/container-storage/host-1 2>&1", 0, "321\t/home/docker/container-storage/host-1/"); - assertEquals(Optional.of(DiskSize.of(328_704)), storageMaintainer.diskUsageFor(context)); + @Test + public void testNonExistingDiskUsed() { + DiskSize size = storageMaintainer.getDiskUsed(null, Paths.get("/fake/path")); + assertEquals(DiskSize.ZERO, size); + } - // Value should still be cached, no new execution against the terminal - assertEquals(Optional.of(DiskSize.of(328_704)), storageMaintainer.diskUsageFor(context)); - } + @Test + public void archive_container_data_test() throws IOException { + // Create some files in containers + NodeAgentContext context1 = createNodeAgentContextAndContainerStorage(fileSystem, "container-1"); + createNodeAgentContextAndContainerStorage(fileSystem, "container-2"); + + Path pathToArchiveDir = fileSystem.getPath("/home/docker/container-storage/container-archive"); + Files.createDirectories(pathToArchiveDir); + + Path containerStorageRoot = context1.pathOnHostFromPathInNode("/").getParent(); + Set<String> containerStorageRootContentsBeforeArchive = FileFinder.from(containerStorageRoot) + .maxDepth(1) + .stream() + .map(FileFinder.FileAttributes::filename) + .collect(Collectors.toSet()); + assertEquals(Set.of("container-archive", 
"container-1", "container-2"), containerStorageRootContentsBeforeArchive); + + + // Archive container-1 + storageMaintainer.archiveNodeStorage(context1); + + clock.advance(Duration.ofSeconds(3)); + storageMaintainer.archiveNodeStorage(context1); + + // container-1 should be gone from container-storage + Set<String> containerStorageRootContentsAfterArchive = FileFinder.from(containerStorageRoot) + .maxDepth(1) + .stream() + .map(FileFinder.FileAttributes::filename) + .collect(Collectors.toSet()); + assertEquals(Set.of("container-archive", "container-2"), containerStorageRootContentsAfterArchive); + + // container archive directory should contain exactly 1 directory - the one we just archived + List<FileFinder.FileAttributes> containerArchiveContentsAfterArchive = FileFinder.from(pathToArchiveDir).maxDepth(1).list(); + assertEquals(1, containerArchiveContentsAfterArchive.size()); + Path archivedContainerStoragePath = containerArchiveContentsAfterArchive.get(0).path(); + assertEquals("container-1_20090213233130", archivedContainerStoragePath.getFileName().toString()); + Set<String> archivedContainerStorageContents = FileFinder.files(archivedContainerStoragePath) + .stream() + .map(fileAttributes -> archivedContainerStoragePath.relativize(fileAttributes.path()).toString()) + .collect(Collectors.toSet()); + assertEquals(Set.of("opt/vespa/logs/vespa/vespa.log", "opt/vespa/logs/vespa/zookeeper.log"), archivedContainerStorageContents); + } - @Test - public void testNonExistingDiskUsed() { - StorageMaintainer storageMaintainer = new StorageMaintainer(terminal, null, null); - DiskSize size = storageMaintainer.getDiskUsed(null, Paths.get("/fake/path")); - assertEquals(DiskSize.ZERO, size); - } + private static NodeAgentContext createNodeAgentContextAndContainerStorage(FileSystem fileSystem, String containerName) throws IOException { + NodeAgentContext context = new NodeAgentContextImpl.Builder(containerName + ".domain.tld") + .fileSystem(fileSystem).build(); - @After - public void after() { - terminal.verifyAllCommandsExecuted(); - } + Path containerVespaHomeOnHost = context.pathOnHostFromPathInNode(context.pathInNodeUnderVespaHome("")); + Files.createDirectories(context.pathOnHostFromPathInNode("/etc/something")); + Files.createFile(context.pathOnHostFromPathInNode("/etc/something/conf")); + + Files.createDirectories(containerVespaHomeOnHost.resolve("logs/vespa")); + Files.createFile(containerVespaHomeOnHost.resolve("logs/vespa/vespa.log")); + Files.createFile(containerVespaHomeOnHost.resolve("logs/vespa/zookeeper.log")); + + Files.createDirectories(containerVespaHomeOnHost.resolve("var/db")); + Files.createFile(containerVespaHomeOnHost.resolve("var/db/some-file")); + + Path containerRootOnHost = context.pathOnHostFromPathInNode("/"); + Set<String> actualContents = FileFinder.files(containerRootOnHost) + .stream() + .map(fileAttributes -> containerRootOnHost.relativize(fileAttributes.path()).toString()) + .collect(Collectors.toSet()); + Set<String> expectedContents = Set.of( + "etc/something/conf", + "opt/vespa/logs/vespa/vespa.log", + "opt/vespa/logs/vespa/zookeeper.log", + "opt/vespa/var/db/some-file"); + assertEquals(expectedContents, actualContents); + return context; } - public static class ArchiveContainerDataTests { - @Test - public void archive_container_data_test() throws IOException { - // Create some files in containers - FileSystem fileSystem = TestFileSystem.create(); - NodeAgentContext context1 = createNodeAgentContextAndContainerStorage(fileSystem, "container-1"); - 
createNodeAgentContextAndContainerStorage(fileSystem, "container-2"); - - Path pathToArchiveDir = fileSystem.getPath("/home/docker/container-storage/container-archive"); - Files.createDirectories(pathToArchiveDir); - - Path containerStorageRoot = context1.pathOnHostFromPathInNode("/").getParent(); - Set<String> containerStorageRootContentsBeforeArchive = FileFinder.from(containerStorageRoot) - .maxDepth(1) - .stream() - .map(FileFinder.FileAttributes::filename) - .collect(Collectors.toSet()); - assertEquals(Set.of("container-archive", "container-1", "container-2"), containerStorageRootContentsBeforeArchive); - - - // Archive container-1 - ManualClock clock = new ManualClock(Instant.ofEpochSecond(1234567890)); - StorageMaintainer storageMaintainer = new StorageMaintainer(null, null, pathToArchiveDir, null, clock); - storageMaintainer.archiveNodeStorage(context1); - - clock.advance(Duration.ofSeconds(3)); - storageMaintainer.archiveNodeStorage(context1); - - // container-1 should be gone from container-storage - Set<String> containerStorageRootContentsAfterArchive = FileFinder.from(containerStorageRoot) - .maxDepth(1) - .stream() - .map(FileFinder.FileAttributes::filename) - .collect(Collectors.toSet()); - assertEquals(Set.of("container-archive", "container-2"), containerStorageRootContentsAfterArchive); - - // container archive directory should contain exactly 1 directory - the one we just archived - List<FileFinder.FileAttributes> containerArchiveContentsAfterArchive = FileFinder.from(pathToArchiveDir).maxDepth(1).list(); - assertEquals(1, containerArchiveContentsAfterArchive.size()); - Path archivedContainerStoragePath = containerArchiveContentsAfterArchive.get(0).path(); - assertEquals("container-1_20090213233130", archivedContainerStoragePath.getFileName().toString()); - Set<String> archivedContainerStorageContents = FileFinder.files(archivedContainerStoragePath) - .stream() - .map(fileAttributes -> archivedContainerStoragePath.relativize(fileAttributes.path()).toString()) - .collect(Collectors.toSet()); - assertEquals(Set.of("opt/vespa/logs/vespa/vespa.log", "opt/vespa/logs/vespa/zookeeper.log"), archivedContainerStorageContents); - } - - private NodeAgentContext createNodeAgentContextAndContainerStorage(FileSystem fileSystem, String containerName) throws IOException { - NodeAgentContext context = new NodeAgentContextImpl.Builder(containerName + ".domain.tld") - .fileSystem(fileSystem).build(); - - Path containerVespaHomeOnHost = context.pathOnHostFromPathInNode(context.pathInNodeUnderVespaHome("")); - Files.createDirectories(context.pathOnHostFromPathInNode("/etc/something")); - Files.createFile(context.pathOnHostFromPathInNode("/etc/something/conf")); - - Files.createDirectories(containerVespaHomeOnHost.resolve("logs/vespa")); - Files.createFile(containerVespaHomeOnHost.resolve("logs/vespa/vespa.log")); - Files.createFile(containerVespaHomeOnHost.resolve("logs/vespa/zookeeper.log")); - - Files.createDirectories(containerVespaHomeOnHost.resolve("var/db")); - Files.createFile(containerVespaHomeOnHost.resolve("var/db/some-file")); - - Path containerRootOnHost = context.pathOnHostFromPathInNode("/"); - Set<String> actualContents = FileFinder.files(containerRootOnHost) - .stream() - .map(fileAttributes -> containerRootOnHost.relativize(fileAttributes.path()).toString()) - .collect(Collectors.toSet()); - Set<String> expectedContents = Set.of( - "etc/something/conf", - "opt/vespa/logs/vespa/vespa.log", - "opt/vespa/logs/vespa/zookeeper.log", - "opt/vespa/var/db/some-file"); - 
assertEquals(expectedContents, actualContents); - return context; - } - } + @Test + public void not_run_if_not_enough_used() throws IOException { + NodeAgentContext context = new NodeAgentContextImpl.Builder( + NodeSpec.Builder.testSpec("h123a.domain.tld").resources(new NodeResources(1, 1, 1, 1)).build()) + .fileSystem(fileSystem).build(); + Files.createDirectories(context.pathOnHostFromPathInNode("/")); + mockDiskUsage(500L); - public static class CleanupTests { + storageMaintainer.cleanDiskIfFull(context); + verifyNoInteractions(diskCleanup); + } - private final TestTerminal terminal = new TestTerminal(); - private final DiskCleanup diskCleanup = mock(DiskCleanup.class); - private final ManualClock clock = new ManualClock(); - private final StorageMaintainer storageMaintainer = new StorageMaintainer(terminal, null, null, diskCleanup, clock); - private final FileSystem fileSystem = TestFileSystem.create(); - private final NodeAgentContext context = new NodeAgentContextImpl - .Builder(NodeSpec.Builder.testSpec("h123a.domain.tld").resources(new NodeResources(1, 1, 1, 1)).build()) + @Test + public void deletes_correct_amount() throws IOException { + NodeAgentContext context = new NodeAgentContextImpl.Builder( + NodeSpec.Builder.testSpec("h123a.domain.tld").resources(new NodeResources(1, 1, 1, 1)).build()) .fileSystem(fileSystem).build(); - @Test - public void not_run_if_not_enough_used() throws IOException { - Files.createDirectories(context.pathOnHostFromPathInNode("/")); - mockDiskUsage(500L); + Files.createDirectories(context.pathOnHostFromPathInNode("/")); + mockDiskUsage(950_000L); - storageMaintainer.cleanDiskIfFull(context); - verifyNoInteractions(diskCleanup); - } - - @Test - public void deletes_correct_amount() throws IOException { - Files.createDirectories(context.pathOnHostFromPathInNode("/")); - mockDiskUsage(950_000L); + storageMaintainer.cleanDiskIfFull(context); + // Allocated size: 1 GB, usage: 950_000 kiB (972.8 MB). Wanted usage: 70% => 700 MB + verify(diskCleanup).cleanup(eq(context), any(), eq(272_800_000L)); + } - storageMaintainer.cleanDiskIfFull(context); - // Allocated size: 1 GB, usage: 950_000 kiB (972.8 MB). 
Wanted usage: 70% => 700 MB - verify(diskCleanup).cleanup(eq(context), any(), eq(272_800_000L)); - } + @After + public void after() { + terminal.verifyAllCommandsExecuted(); + } - private void mockDiskUsage(long kBytes) { - terminal.expectCommand("du -xsk /home/docker/container-storage/h123a 2>&1", 0, kBytes + "\t/path"); - } + private void mockDiskUsage(long kBytes) { + terminal.expectCommand("du -xsk /home/docker/container-storage/h123a 2>&1", 0, kBytes + "\t/path"); } } diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java index 0d596a46d77..e32fc8e5355 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java @@ -4,19 +4,15 @@ package com.yahoo.vespa.hosted.node.admin.maintenance.sync; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.HostName; import com.yahoo.vespa.test.file.TestFileSystem; -import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; -import java.io.InputStream; -import java.io.UncheckedIOException; import java.nio.file.FileSystem; -import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; +import java.util.Optional; +import static com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncFileInfo.Compression.NONE; +import static com.yahoo.vespa.hosted.node.admin.maintenance.sync.SyncFileInfo.Compression.ZSTD; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; /** * @author freva @@ -26,54 +22,41 @@ public class SyncFileInfoTest { private static final FileSystem fileSystem = TestFileSystem.create(); private static final String bucket = "logs-region-acdf21"; - private static final ApplicationId application = ApplicationId.from("tenant", "application", "instance"); + private static final ApplicationId application = ApplicationId.from("ten", "app", "ins"); private static final HostName hostname = HostName.from("h12352a.env.region-1.vespa.domain.example"); - private static final Path accessLogPath = fileSystem.getPath("/opt/vespa/logs/qrs/access.json-20210212.zst"); - private static final Path vespaLogPath = fileSystem.getPath("/opt/vespa/logs/vespa.log-2021-02-12"); + private static final Path accessLogPath1 = fileSystem.getPath("/opt/vespa/logs/qrs/access.log.20210211"); + private static final Path accessLogPath2 = fileSystem.getPath("/opt/vespa/logs/qrs/access.log.20210212.zst"); + private static final Path accessLogPath3 = fileSystem.getPath("/opt/vespa/logs/qrs/access-json.log.20210213.zst"); + private static final Path accessLogPath4 = fileSystem.getPath("/opt/vespa/logs/qrs/JsonAccessLog.default.20210214.zst"); + private static final Path connectionLogPath1 = fileSystem.getPath("/opt/vespa/logs/qrs/ConnectionLog.default.20210210"); + private static final Path connectionLogPath2 = fileSystem.getPath("/opt/vespa/logs/qrs/ConnectionLog.default.20210212.zst"); + private static final Path vespaLogPath1 = fileSystem.getPath("/opt/vespa/logs/vespa.log"); + private static final Path vespaLogPath2 = fileSystem.getPath("/opt/vespa/logs/vespa.log-2021-02-12"); @Test - public void tenant_access_log() { - SyncFileInfo sfi = SyncFileInfo.tenantAccessLog(bucket, application, hostname, accessLogPath); - 
assertEquals(Paths.get("/tenant.application.instance/h12352a/logs/access/access.json-20210212.zst"), sfi.destPath()); - assertEquals(bucket, sfi.bucketName()); - assertNotEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi)); - } + public void tenant_log() { + assertTenantSyncFileInfo(accessLogPath1, null, null); + assertTenantSyncFileInfo(accessLogPath2, "ten/app/ins/h12352a/logs/access/access.log.20210212.zst", NONE); + assertTenantSyncFileInfo(accessLogPath3, "ten/app/ins/h12352a/logs/access/access-json.log.20210213.zst", NONE); + assertTenantSyncFileInfo(accessLogPath4, "ten/app/ins/h12352a/logs/access/JsonAccessLog.default.20210214.zst", NONE); - @Test - public void tenant_vespa_log() { - SyncFileInfo sfi = SyncFileInfo.tenantVespaLog(bucket, application, hostname, vespaLogPath); - assertEquals(Paths.get("/tenant.application.instance/h12352a/logs/vespa/vespa.log-2021-02-12.zst"), sfi.destPath()); - assertEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi)); - } + assertTenantSyncFileInfo(connectionLogPath1, null, null); + assertTenantSyncFileInfo(connectionLogPath2, "ten/app/ins/h12352a/logs/connection/ConnectionLog.default.20210212.zst", NONE); - @Test - public void infra_access_log() { - SyncFileInfo sfi = SyncFileInfo.infrastructureAccessLog(bucket, hostname, accessLogPath); - assertEquals(Paths.get("/infrastructure/h12352a/logs/access/access.json-20210212.zst"), sfi.destPath()); - assertNotEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi)); + assertTenantSyncFileInfo(vespaLogPath1, null, null); + assertTenantSyncFileInfo(vespaLogPath2, "ten/app/ins/h12352a/logs/vespa/vespa.log-2021-02-12.zst", ZSTD); } @Test public void infra_vespa_log() { - SyncFileInfo sfi = SyncFileInfo.infrastructureVespaLog(bucket, hostname, vespaLogPath); - assertEquals(Paths.get("/infrastructure/h12352a/logs/vespa/vespa.log-2021-02-12.zst"), sfi.destPath()); - assertEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi)); + SyncFileInfo sfi = SyncFileInfo.infrastructureVespaLog(bucket, hostname, vespaLogPath2); + assertEquals("infrastructure/h12352a/logs/vespa/vespa.log-2021-02-12.zst", sfi.destPath().toString()); + assertEquals(ZSTD, sfi.uploadCompression()); } - @BeforeClass - public static void setup() throws IOException { - Files.createDirectories(vespaLogPath.getParent()); - Files.createFile(vespaLogPath); - Files.createDirectories(accessLogPath.getParent()); - Files.createFile(accessLogPath); + private static void assertTenantSyncFileInfo(Path srcPath, String destPath, SyncFileInfo.Compression compression) { + Optional<SyncFileInfo> sfi = SyncFileInfo.tenantLog(bucket, application, hostname, srcPath); + assertEquals(destPath, sfi.map(SyncFileInfo::destPath).map(Path::toString).orElse(null)); + assertEquals(compression, sfi.map(SyncFileInfo::uploadCompression).orElse(null)); } - - private static Class<? extends InputStream> getInputStreamType(SyncFileInfo syncFileInfo) { - try (InputStream inputStream = syncFileInfo.inputStream()) { - return inputStream.getClass(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - -}
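Editor's note: as a companion to the expectations above, this self-contained Java sketch restates the tenant-log destination mapping that SyncFileInfo.tenantLog() is tested against: rotated vespa.log files are compressed with zstd on upload, already-compressed access and connection logs are forwarded as-is, and anything else is skipped. Class, record and helper names are hypothetical, and the hostname shortening (keeping only the part before the first dot) is inferred from the expected paths in the test rather than copied from the real implementation.

import java.nio.file.Path;
import java.util.Optional;

public class TenantLogDestinationSketch {

    enum Compression { NONE, ZSTD }

    // Hypothetical result type; the real code returns Optional<SyncFileInfo>.
    record Destination(String remotePath, Compression uploadCompression) {}

    static Optional<Destination> tenantLog(String tenant, String app, String instance, String hostname, Path logFile) {
        String filename = logFile.getFileName().toString();
        String dir = null;
        Compression compression = Compression.NONE;

        if (filename.startsWith("vespa.log-")) {              // rotated vespa.log files: compress with zstd on upload
            dir = "logs/vespa";
            compression = Compression.ZSTD;
        } else if (filename.endsWith(".zst")) {                // already compressed logs are uploaded as-is
            if (filename.startsWith("JsonAccessLog.") || filename.startsWith("access")) dir = "logs/access";
            else if (filename.startsWith("ConnectionLog.")) dir = "logs/connection";
        }
        if (dir == null) return Optional.empty();              // e.g. the live vespa.log or an uncompressed access log

        String shortHost = hostname.split("\\.", 2)[0];        // "h12352a.env.region-1..." -> "h12352a" (assumed)
        String suffix = (compression == Compression.ZSTD) ? ".zst" : "";
        String remote = String.join("/", tenant, app, instance, shortHost, dir, filename) + suffix;
        return Optional.of(new Destination(remote, compression));
    }

    public static void main(String[] args) {
        // Matches the expectation for vespaLogPath2 above:
        // ten/app/ins/h12352a/logs/vespa/vespa.log-2021-02-12.zst, uploaded with ZSTD compression.
        System.out.println(tenantLog("ten", "app", "ins", "h12352a.env.region-1.vespa.domain.example",
                                     Path.of("/opt/vespa/logs/vespa.log-2021-02-12")));
        // The live vespa.log is skipped entirely (Optional.empty), as asserted for vespaLogPath1.
        System.out.println(tenantLog("ten", "app", "ins", "h12352a.env.region-1.vespa.domain.example",
                                     Path.of("/opt/vespa/logs/vespa.log")));
    }
}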
\ No newline at end of file +} diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt index 1aa0b1c585d..d11da09b737 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt @@ -17,6 +17,18 @@ vespa_add_executable(searchcore_documentbucketmover_test_app TEST ) vespa_add_test(NAME searchcore_documentbucketmover_test_app COMMAND searchcore_documentbucketmover_test_app) +vespa_add_executable(searchcore_documentbucketmover_v2_test_app TEST + SOURCES + documentbucketmover_v2_test.cpp + DEPENDS + searchcore_bucketmover_test + searchcore_test + searchcore_server + searchcore_feedoperation + GTest::GTest +) +vespa_add_test(NAME searchcore_documentbucketmover_v2_test_app COMMAND searchcore_documentbucketmover_v2_test_app) + vespa_add_executable(searchcore_scaniterator_test_app TEST SOURCES scaniterator_test.cpp diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h index 65e206d7327..e2cd5e268d7 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h @@ -82,7 +82,7 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest { struct MyBucketModifiedHandler : public IBucketModifiedHandler { using BucketId = document::BucketId; - BucketId::List _modified; + std::vector<BucketId> _modified; void notifyBucketModified(const BucketId &bucket) override; diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 502639d1dca..5556ed0d475 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -112,7 +112,7 @@ struct ControllerFixtureBase : public ::testing::Test const MoveOperationVector &docsMoved() const { return _moveHandler._moves; } - const BucketId::List &bucketsModified() const { + const std::vector<BucketId> &bucketsModified() const { return _modifiedHandler._modified; } const BucketId::List &calcAsked() const { diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp new file mode 100644 index 00000000000..e7dc9b9e873 --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp @@ -0,0 +1,578 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "bucketmover_common.h" +#include <vespa/searchcore/proton/server/bucketmovejobv2.h> +#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> +#include <vespa/vespalib/gtest/gtest.h> + +#include <vespa/log/log.h> +LOG_SETUP("document_bucket_mover_test"); + +using namespace proton; +using namespace proton::move::test; +using document::BucketId; +using document::test::makeBucketSpace; +using proton::bucketdb::BucketCreateNotifier; +using storage::spi::BucketInfo; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; +using MoveOperationVector = std::vector<MoveOperation>; +using storage::spi::dummy::DummyBucketExecutor; +using vespalib::ThreadStackExecutor; + +struct ControllerFixtureBase : public ::testing::Test +{ + test::UserDocumentsBuilder _builder; + test::BucketStateCalculator::SP _calc; + test::ClusterStateHandler _clusterStateHandler; + test::BucketHandler _bucketHandler; + MyBucketModifiedHandler _modifiedHandler; + std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB; + MySubDb _ready; + MySubDb _notReady; + BucketCreateNotifier _bucketCreateNotifier; + test::DiskMemUsageNotifier _diskMemUsageNotifier; + ThreadStackExecutor _singleExecutor; + ExecutorThreadService _master; + DummyBucketExecutor _bucketExecutor; + MyMoveHandler _moveHandler; + BucketMoveJobV2 _bmj; + MyCountJobRunner _runner; + ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); + ~ControllerFixtureBase(); + ControllerFixtureBase &addReady(const BucketId &bucket) { + _calc->addReady(bucket); + return *this; + } + ControllerFixtureBase &remReady(const BucketId &bucket) { + _calc->remReady(bucket); + return *this; + } + ControllerFixtureBase &changeCalc() { + _calc->resetAsked(); + _moveHandler.reset(); + _modifiedHandler.reset(); + _clusterStateHandler.notifyClusterStateChanged(_calc); + return *this; + } + ControllerFixtureBase &activateBucket(const BucketId &bucket) { + _ready.setBucketState(bucket, true); + _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::ACTIVE); + return *this; + } + ControllerFixtureBase &deactivateBucket(const BucketId &bucket) { + _ready.setBucketState(bucket, false); + _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE); + return *this; + } + const MoveOperationVector &docsMoved() const { + return _moveHandler._moves; + } + const std::vector<BucketId> &bucketsModified() const { + return _modifiedHandler._modified; + } + const BucketId::List &calcAsked() const { + return _calc->asked(); + } + void runLoop() { + while (!_bmj.isBlocked() && !_bmj.run()) { + } + } + void sync() { + _bucketExecutor.sync(); + _master.sync(); + _master.sync(); // Handle that master schedules onto master again + } +}; + +ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts) + : _builder(), + _calc(std::make_shared<test::BucketStateCalculator>()), + _bucketHandler(), + _modifiedHandler(), + _bucketDB(std::make_shared<bucketdb::BucketDBOwner>()), + _ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY), + _notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY), + _bucketCreateNotifier(), + _diskMemUsageNotifier(), + _singleExecutor(1, 0x10000), + _master(_singleExecutor), + _bucketExecutor(4), + _moveHandler(*_bucketDB, storeMoveDoneContexts), + _bmj(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, + _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, 
_bucketHandler, + _diskMemUsageNotifier, blockableConfig, + "test", makeBucketSpace()), + _runner(_bmj) +{ +} + +ControllerFixtureBase::~ControllerFixtureBase() = default; +constexpr double RESOURCE_LIMIT_FACTOR = 1.0; +constexpr uint32_t MAX_OUTSTANDING_OPS = 10; +const BlockableMaintenanceJobConfig BLOCKABLE_CONFIG(RESOURCE_LIMIT_FACTOR, MAX_OUTSTANDING_OPS); + +struct ControllerFixture : public ControllerFixtureBase +{ + ControllerFixture(const BlockableMaintenanceJobConfig &blockableConfig = BLOCKABLE_CONFIG) + : ControllerFixtureBase(blockableConfig, blockableConfig.getMaxOutstandingMoveOps() != MAX_OUTSTANDING_OPS) + { + _builder.createDocs(1, 1, 4); // 3 docs + _builder.createDocs(2, 4, 6); // 2 docs + _ready.insertDocs(_builder.getDocs()); + _builder.clearDocs(); + _builder.createDocs(3, 1, 3); // 2 docs + _builder.createDocs(4, 3, 6); // 3 docs + _notReady.insertDocs(_builder.getDocs()); + } +}; + +struct OnlyReadyControllerFixture : public ControllerFixtureBase +{ + OnlyReadyControllerFixture() : ControllerFixtureBase(BLOCKABLE_CONFIG, false) + { + _builder.createDocs(1, 1, 2); // 1 docs + _builder.createDocs(2, 2, 4); // 2 docs + _builder.createDocs(3, 4, 7); // 3 docs + _builder.createDocs(4, 7, 11); // 4 docs + _ready.insertDocs(_builder.getDocs()); + } +}; + +TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so) +{ + EXPECT_TRUE(_bmj.done()); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + _bmj.recompute(); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(docsMoved().empty()); + EXPECT_TRUE(bucketsModified().empty()); +} + +TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_bucket_state_says_so) +{ + // bucket 4 should be moved + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(4)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[2], 2, 1, docsMoved()[2]); + ASSERT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_notReady.bucket(4), bucketsModified()[0]); +} + +TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_bucket_state_says_so) +{ + // bucket 2 should be moved + addReady(_ready.bucket(1)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]); + assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); +} + + +TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) +{ + // bucket 2, 3, and 4 should be moved + addReady(_ready.bucket(1)); + addReady(_notReady.bucket(3)); + addReady(_notReady.bucket(4)); + + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + + EXPECT_FALSE(_bmj.scanAndMove(1, 2)); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + + EXPECT_FALSE(_bmj.scanAndMove(1, 2)); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(4u, docsMoved().size()); + + EXPECT_FALSE(_bmj.scanAndMove(1, 2)); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(6u, 
docsMoved().size()); + + // move bucket 4, docs 3 + EXPECT_TRUE(_bmj.scanAndMove(1,2)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(7u, docsMoved().size()); + EXPECT_EQ(3u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); + EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]); + EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]); +} + +TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_done) +{ + // bucket 4 should be moved + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(4)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + + EXPECT_FALSE(_bmj.scanAndMove(1, 1)); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_TRUE(_bmj.scanAndMove(1, 2)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); +} + + +TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_not_ready_until_being_not_active) +{ + // bucket 1 should be moved but is active + addReady(_ready.bucket(2)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + activateBucket(_ready.bucket(1)); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // scan all, delay active bucket 1 + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + + deactivateBucket(_ready.bucket(1)); + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // move delayed and de-activated bucket 1 + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); +} + +TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_we_change_calculator) +{ + // bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj.recompute(); + _bmj.scanAndMove(1, 1); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + changeCalc(); // Not cancelled, bucket 1 still moving to notReady + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); + _calc->resetAsked(); + _bmj.scanAndMove(1, 1); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, calcAsked().size()); + addReady(_ready.bucket(1)); + changeCalc(); // cancelled, bucket 1 no longer moving to notReady + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); + _calc->resetAsked(); + remReady(_ready.bucket(1)); + _calc->resetAsked(); + changeCalc(); // not cancelled. 
No active bucket move + EXPECT_EQ(4u, calcAsked().size()); + _bmj.scanAndMove(1, 1); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(2), calcAsked()[1]); + EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]); + _bmj.scanAndMove(2, 3); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_notReady.bucket(4), calcAsked()[3]); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); +} + +TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_calculator_does_not_say_so) +{ + // bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj.recompute(); + activateBucket(_ready.bucket(1)); + _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1 + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + + deactivateBucket(_ready.bucket(1)); + addReady(_ready.bucket(1)); + changeCalc(); + _bmj.scanAndMove(4, 3); // consider delayed bucket 3 + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); +} + +TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + // Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired. + addReady(_ready.bucket(1)); + _bmj.recompute(); + _bmj.scanAndMove(4, 3); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(0u, docsMoved().size()); +} + +// Technically this should never happen since a retired node is never in the ideal state, +// but test this case for the sake of completion. +TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(3)); + _bmj.recompute(); + _bmj.scanAndMove(4, 3); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(0u, docsMoved().size()); +} + +TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_ready_even_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(3)); + _bmj.recompute(); + activateBucket(_notReady.bucket(3)); + _bmj.scanAndMove(4, 3); + EXPECT_TRUE(_bmj.done()); + sync(); + ASSERT_EQ(2u, docsMoved().size()); + assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[0]); + assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[1]); + ASSERT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); +} + + +TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job) +{ + EXPECT_TRUE(_bmj.done()); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + runLoop(); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_TRUE(docsMoved().empty()); + EXPECT_TRUE(bucketsModified().empty()); + addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify + EXPECT_TRUE(_bmj.done()); // move job still believes work done + sync(); + EXPECT_TRUE(bucketsModified().empty()); + _bmj.notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3 + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(bucketsModified().empty()); + sync(); + EXPECT_TRUE(bucketsModified().empty()); + runLoop(); + EXPECT_TRUE(_bmj.done()); + sync(); + + EXPECT_EQ(1u, 
bucketsModified().size()); + EXPECT_EQ(2u, docsMoved().size()); +} + + +struct ResourceLimitControllerFixture : public ControllerFixture +{ + ResourceLimitControllerFixture(double resourceLimitFactor = RESOURCE_LIMIT_FACTOR) : + ControllerFixture(BlockableMaintenanceJobConfig(resourceLimitFactor, MAX_OUTSTANDING_OPS)) + {} + + void testJobStopping(DiskMemUsageState blockingUsageState) { + // Bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + // Note: This depends on _bmj.run() moving max 1 document + EXPECT_FALSE(_bmj.run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we're over the limit + _diskMemUsageNotifier.notify(blockingUsageState); + EXPECT_TRUE(_bmj.run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we're under the limit + _diskMemUsageNotifier.notify(DiskMemUsageState()); + EXPECT_FALSE(_bmj.run()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + } + + void testJobNotStopping(DiskMemUsageState blockingUsageState) { + // Bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + // Note: This depends on _bmj.run() moving max 1 document + EXPECT_FALSE(_bmj.run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we're over the limit, but not over the adjusted limit + _diskMemUsageNotifier.notify(blockingUsageState); + EXPECT_FALSE(_bmj.run()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + } +}; + +struct ResourceLimitControllerFixture_1_2 : public ResourceLimitControllerFixture { + ResourceLimitControllerFixture_1_2() : ResourceLimitControllerFixture(1.2) {} +}; + +TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_disk_limit_is_reached) +{ + testJobStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState())); +} + +TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_memory_limit_is_reached) +{ + testJobStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8))); +} + +TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_disk_resource_limit) +{ + testJobNotStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState())); +} + +TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_memory_resource_limit) +{ + testJobNotStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8))); +} + + +struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase +{ + MaxOutstandingMoveOpsFixture(uint32_t maxOutstandingOps) + : ControllerFixtureBase(BlockableMaintenanceJobConfig(RESOURCE_LIMIT_FACTOR, maxOutstandingOps), true) + { + _builder.createDocs(1, 1, 2); + _builder.createDocs(2, 2, 3); + _builder.createDocs(3, 3, 4); + _builder.createDocs(4, 4, 5); + _ready.insertDocs(_builder.getDocs()); + _builder.clearDocs(); + _builder.createDocs(11, 1, 2); + _builder.createDocs(12, 2, 3); + _builder.createDocs(13, 3, 4); + _builder.createDocs(14, 4, 5); + _notReady.insertDocs(_builder.getDocs()); + addReady(_ready.bucket(3)); + _bmj.recompute(); + } + + void assertRunToBlocked() { + EXPECT_TRUE(_bmj.run()); // job becomes blocked as max outstanding limit is reached + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.isBlocked()); + 
EXPECT_TRUE(_bmj.isBlocked(BlockedReason::OUTSTANDING_OPS)); + } + void assertRunToNotBlocked() { + EXPECT_FALSE(_bmj.run()); + EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj.isBlocked()); + } + void assertRunToFinished() { + EXPECT_TRUE(_bmj.run()); + EXPECT_TRUE(_bmj.done()); + EXPECT_FALSE(_bmj.isBlocked()); + } + void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) { + EXPECT_EQ(expDocsMovedCnt, docsMoved().size()); + EXPECT_EQ(expMoveContextsCnt, _moveHandler._moveDoneContexts.size()); + } + void unblockJob(uint32_t expRunnerCnt) { + _moveHandler.clearMoveDoneContexts(); // unblocks job and try to execute it via runner + EXPECT_EQ(expRunnerCnt, _runner.runCount); + EXPECT_FALSE(_bmj.isBlocked()); + } +}; + +struct MaxOutstandingMoveOpsFixture_1 : public MaxOutstandingMoveOpsFixture { + MaxOutstandingMoveOpsFixture_1() : MaxOutstandingMoveOpsFixture(1) {} +}; + +struct MaxOutstandingMoveOpsFixture_2 : public MaxOutstandingMoveOpsFixture { + MaxOutstandingMoveOpsFixture_2() : MaxOutstandingMoveOpsFixture(2) {} +}; + +TEST_F(MaxOutstandingMoveOpsFixture_1, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_1) +{ + assertRunToBlocked(); + sync(); + assertDocsMoved(1, 1); + assertRunToBlocked(); + assertDocsMoved(1, 1); + + unblockJob(1); + assertRunToBlocked(); + sync(); + assertDocsMoved(2, 1); + + unblockJob(2); + assertRunToBlocked(); + sync(); + assertDocsMoved(3, 1); + + unblockJob(3); + assertRunToFinished(); + sync(); + assertDocsMoved(3, 0); +} + +TEST_F(MaxOutstandingMoveOpsFixture_2, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_2) +{ + assertRunToNotBlocked(); + sync(); + assertDocsMoved(1, 1); + + assertRunToBlocked(); + sync(); + assertDocsMoved(2, 2); + + unblockJob(2); + assertRunToFinished(); + sync(); + assertDocsMoved(3, 1); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp index 420f45d99e8..ae504fef603 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp @@ -43,7 +43,7 @@ struct DocumentMoverTest : ::testing::Test : _builder(), _bucketDB(std::make_shared<bucketdb::BucketDBOwner>()), _limiter(), - _mover(_limiter), + _mover(_limiter, _bucketDb), _source(_builder, _bucketDB, 0u, SubDbType::READY), _bucketDb(), _handler(_bucketDb) @@ -58,7 +58,7 @@ struct DocumentMoverTest : ::testing::Test _source._subDb.retriever(), _source._subDb.feed_view(), &_pendingLidsForCommit); - _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler, _bucketDb); + _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler); } bool moveDocuments(size_t maxDocsToMove) { return _mover.moveDocuments(maxDocsToMove); @@ -68,7 +68,7 @@ struct DocumentMoverTest : ::testing::Test TEST_F(DocumentMoverTest, require_that_initial_bucket_mover_is_done) { MyMoveOperationLimiter limiter; - DocumentBucketMover mover(limiter); + DocumentBucketMover mover(limiter, _bucketDb); EXPECT_TRUE(mover.bucketDone()); mover.moveDocuments(2); EXPECT_TRUE(mover.bucketDone()); diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 26fba8a780f..86d238c7554 100644 --- 
a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_add_library(searchcore_server STATIC bootstrapconfigmanager.cpp buckethandler.cpp bucketmovejob.cpp + bucketmovejobv2.cpp clusterstatehandler.cpp combiningfeedview.cpp ddbstate.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index 8b9b3c539d6..30b1e8b623c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -64,8 +64,7 @@ BucketMoveJob::checkBucket(const BucketId &bucket, const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id()); - mover.setupForBucket(bucket, &source, target.sub_db_id(), - _moveHandler, _ready.meta_store()->getBucketDB()); + mover.setupForBucket(bucket, &source, target.sub_db_id(), _moveHandler); } BucketMoveJob::ScanResult @@ -151,7 +150,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, _modifiedHandler(modifiedHandler), _ready(ready), _notReady(notReady), - _mover(getLimiter()), + _mover(getLimiter(), _ready.meta_store()->getBucketDB()), _doneScan(false), _scanPos(), _scanPass(ScanPass::FIRST), @@ -161,7 +160,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, _delayedBucketsFrozen(), _frozenBuckets(frozenBuckets), _bucketCreateNotifier(bucketCreateNotifier), - _delayedMover(getLimiter()), + _delayedMover(getLimiter(), _ready.meta_store()->getBucketDB()), _clusterStateChangedNotifier(clusterStateChangedNotifier), _bucketStateChangedNotifier(bucketStateChangedNotifier), _diskMemUsageNotifier(diskMemUsageNotifier) diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp new file mode 100644 index 00000000000..77942b72478 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -0,0 +1,368 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucketmovejobv2.h" +#include "imaintenancejobrunner.h" +#include "ibucketstatechangednotifier.h" +#include "iclusterstatechangednotifier.h" +#include "maintenancedocumentsubdb.h" +#include "i_disk_mem_usage_notifier.h" +#include "ibucketmodifiedhandler.h" +#include "move_operation_limiter.h" +#include "document_db_maintenance_config.h" +#include <vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h> +#include <vespa/searchcore/proton/feedoperation/moveoperation.h> +#include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h> +#include <vespa/searchcorespi/index/i_thread_service.h> +#include <vespa/persistence/spi/bucket_tasks.h> +#include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/lambdatask.h> + +#include <vespa/log/log.h> +LOG_SETUP(".proton.server.bucketmovejob"); + +using document::BucketId; +using storage::spi::BucketInfo; +using storage::spi::Bucket; +using storage::spi::makeBucketTask; +using proton::bucketdb::BucketMover; +using vespalib::makeLambdaTask; + +namespace proton { + +namespace { + +const char * bool2str(bool v) { return (v ? 
"T" : "F"); } + +bool +blockedDueToClusterState(const IBucketStateCalculator::SP &calc) +{ + bool clusterUp = calc && calc->clusterUp(); + bool nodeUp = calc && calc->nodeUp(); + bool nodeInitializing = calc && calc->nodeInitializing(); + return !(clusterUp && nodeUp && !nodeInitializing); +} + +} + +BucketMoveJobV2::BucketMoveJobV2(const IBucketStateCalculator::SP &calc, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace) + : BlockableMaintenanceJob("move_buckets." + docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig), + IClusterStateChangedHandler(), + bucketdb::IBucketCreateListener(), + IBucketStateChangedHandler(), + IDiskMemUsageListener(), + _calc(calc), + _moveHandler(moveHandler), + _modifiedHandler(modifiedHandler), + _master(master), + _bucketExecutor(bucketExecutor), + _ready(ready), + _notReady(notReady), + _bucketSpace(bucketSpace), + _iterateCount(0), + _movers(), + _buckets2Move(), + _stopped(false), + _startedCount(0), + _executedCount(0), + _bucketCreateNotifier(bucketCreateNotifier), + _clusterStateChangedNotifier(clusterStateChangedNotifier), + _bucketStateChangedNotifier(bucketStateChangedNotifier), + _diskMemUsageNotifier(diskMemUsageNotifier) +{ + _movers.reserve(std::min(100u, blockableConfig.getMaxOutstandingMoveOps())); + if (blockedDueToClusterState(_calc)) { + setBlocked(BlockedReason::CLUSTER_STATE); + } + + _bucketCreateNotifier.addListener(this); + _clusterStateChangedNotifier.addClusterStateChangedHandler(this); + _bucketStateChangedNotifier.addBucketStateChangedHandler(this); + _diskMemUsageNotifier.addDiskMemUsageListener(this); + recompute(); +} + +BucketMoveJobV2::~BucketMoveJobV2() +{ + _bucketCreateNotifier.removeListener(this); + _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); + _bucketStateChangedNotifier.removeBucketStateChangedHandler(this); + _diskMemUsageNotifier.removeDiskMemUsageListener(this); +} + +BucketMoveJobV2::NeedResult +BucketMoveJobV2::needMove(const ScanIterator &itr) const { + NeedResult noMove(false, false); + const bool hasReadyDocs = itr.hasReadyBucketDocs(); + const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs(); + if (!hasReadyDocs && !hasNotReadyDocs) { + return noMove; // No documents for bucket in ready or notready subdbs + } + const bool isActive = itr.isActive(); + // No point in moving buckets when node is retired and everything will be deleted soon. + // However, allow moving of explicitly activated buckets, as this implies a lack of other good replicas. 
+ if (!_calc || (_calc->nodeRetired() && !isActive)) { + return noMove; + } + const bool shouldBeReady = _calc->shouldBeReady(document::Bucket(_bucketSpace, itr.getBucket())); + const bool wantReady = shouldBeReady || isActive; + LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)", + itr.getBucket().toString().c_str(), bool2str(shouldBeReady), bool2str(isActive)); + if (wantReady) { + if (!hasNotReadyDocs) + return noMove; // No notready bucket to make ready + } else { + if (isActive) + return noMove; // Do not move from ready to not ready when active + if (!hasReadyDocs) + return noMove; // No ready bucket to make notready + } + return {true, wantReady}; +} + +void +BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) { + auto [keys, done] = mover->getKeysToMove(maxDocsToMove); + if (done) { + mover->setBucketDone(); + } + if (keys.empty()) return; + if (_stopped.load(std::memory_order_relaxed)) return; + mover->updateLastValidGid(keys.back()._gid); + auto context = getLimiter().beginOperation(); + Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket())); + auto bucketTask = makeBucketTask( + [this, mover=std::move(mover), keys=std::move(keys),opsTracker=getLimiter().beginOperation()] + (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) mutable + { + assert(mover->getBucket() == bucket.getBucketId()); + using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>; + prepareMove(std::move(mover), std::move(keys), + std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone)))); + }); + auto failed = _bucketExecutor.execute(spiBucket, std::move(bucketTask)); + if (!failed) { + _startedCount.fetch_add(1, std::memory_order_relaxed); + } +} + +namespace { + +class IncOnDestruct { +public: + IncOnDestruct(std::atomic<size_t> & count) : _count(count) {} + ~IncOnDestruct() { + _count.fetch_add(1, std::memory_order_relaxed); + } +private: + std::atomic<size_t> & _count; +}; + +} + +void +BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone) +{ + IncOnDestruct countGuard(_executedCount); + if (_stopped.load(std::memory_order_relaxed)) return; + auto moveOps = mover->createMoveOperations(keys); + _master.execute(makeLambdaTask([this, mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable { + completeMove(std::move(mover), std::move(moveOps), std::move(onDone)); + })); +} + +void +BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> ops, IDestructorCallbackSP onDone) { + mover->moveDocuments(std::move(ops), std::move(onDone)); + if (mover->bucketDone() && mover->inSync()) { + _modifiedHandler.notifyBucketModified(mover->getBucket()); + } +} + +void +BucketMoveJobV2::cancelMovesForBucket(BucketId bucket) { + for (auto itr = _movers.begin(); itr != _movers.end(); itr++) { + if (bucket == (*itr)->getBucket()) { + _movers.erase(itr); + backFillMovers(); + return; + } + } +} + +void +BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) +{ + ScanIterator itr(guard, bucket); + auto [mustMove, wantReady] = needMove(itr); + if (mustMove) { + _buckets2Move[bucket] = wantReady; + } else { + _buckets2Move.erase(bucket); + cancelMovesForBucket(bucket); + } + backFillMovers(); + considerRun(); +} + +void +BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) +{ + considerBucket(guard, bucket); +} + 
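+// _buckets2Move is the authoritative map of candidate buckets (bucket id -> wantReady). +// considerBucket() above maintains it incrementally as buckets are created or change state, while +// computeBuckets2Move() below rebuilds it from a full scan whenever recompute() runs, e.g. after a cluster state change. +// backFillMovers() then keeps the set of active movers topped up from this map. + 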
+BucketMoveJobV2::BucketSet +BucketMoveJobV2::computeBuckets2Move() +{ + BucketMoveJobV2::BucketSet toMove; + for (ScanIterator itr(_ready.meta_store()->getBucketDB().takeGuard(), BucketId()); itr.valid(); ++itr) { + auto [mustMove, wantReady] = needMove(itr); + if (mustMove) { + toMove[itr.getBucket()] = wantReady; + } + } + return toMove; +} + +std::shared_ptr<BucketMover> +BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) { + const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready); + const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); + LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", + bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id()); + return std::make_shared<BucketMover>(bucket, &source, target.sub_db_id(), _moveHandler); +} + +std::shared_ptr<BucketMover> +BucketMoveJobV2::greedyCreateMover() { + if ( ! _buckets2Move.empty()) { + auto next = _buckets2Move.begin(); + auto mover = createMover(next->first, next->second); + _buckets2Move.erase(next->first); + return mover; + } + return {}; +} + +bool +BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { + if (done()) return true; + + // Select mover + size_t index = _iterateCount++ % _movers.size(); + const auto & mover = _movers[index]; + + //Move, or reduce movers as we are tailing off + if (!mover->bucketDone()) { + startMove(mover, maxDocsToMove); + if (mover->bucketDone()) { + auto next = greedyCreateMover(); + if (next) { + _movers[index] = next; + } else { + _movers.erase(_movers.begin() + index); + } + } + } + return done(); +} + +bool +BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) { + for (size_t i(0); i < maxBuckets2Move; i++) { + moveDocs(maxDocsToMovePerBucket); + } + return isBlocked() || done(); +} + +bool +BucketMoveJobV2::done() const { + return _buckets2Move.empty() && _movers.empty() && !isBlocked(); +} + +bool +BucketMoveJobV2::run() +{ + if (isBlocked()) { + return true; // indicate work is done, since node state is bad + } + /// Returning false here will immediately post the job back on the executor. This will give a busy loop, + /// but this is considered fine as it is very rare and it will be intermingled with multiple feed operations. + if ( ! scanAndMove(1, 1) ) { + return false; + } + + if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { + return true; + } + return done(); +} + +void +BucketMoveJobV2::recompute() { + _movers.clear(); + _buckets2Move = computeBuckets2Move(); + backFillMovers(); +} + +void +BucketMoveJobV2::backFillMovers() { + // Ensure we have enough movers. + while ( ! 
_buckets2Move.empty() && (_movers.size() < _movers.capacity())) { + _movers.push_back(greedyCreateMover()); + } +} +void +BucketMoveJobV2::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) +{ + // Called by master write thread + _calc = newCalc; + recompute(); + if (blockedDueToClusterState(_calc)) { + setBlocked(BlockedReason::CLUSTER_STATE); + } else { + unBlock(BlockedReason::CLUSTER_STATE); + } +} + +void +BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState) +{ + // Called by master write thread + considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); +} + +void +BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state) +{ + // Called by master write thread + internalNotifyDiskMemUsage(state); +} + +bool +BucketMoveJobV2::inSync() const { + return _executedCount == _startedCount; +} + +void +BucketMoveJobV2::onStop() { + // Called by master write thread + _stopped = true; + while ( ! inSync() ) { + std::this_thread::sleep_for(1ms); + } +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h new file mode 100644 index 00000000000..714f24b6ff6 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -0,0 +1,119 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "blockable_maintenance_job.h" +#include "documentbucketmover.h" +#include "i_disk_mem_usage_listener.h" +#include "ibucketstatechangedhandler.h" +#include "iclusterstatechangedhandler.h" +#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h> +#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> + +namespace storage::spi { struct BucketExecutor; } +namespace searchcorespi::index { struct IThreadService; } + +namespace proton { + +class BlockableMaintenanceJobConfig; +class IBucketStateChangedNotifier; +class IClusterStateChangedNotifier; +class IDiskMemUsageNotifier; +class IBucketModifiedHandler; + +namespace bucketdb { class IBucketCreateNotifier; } + +/** + * Class used to control the moving of buckets between the ready and + * not ready sub databases based on the readiness of buckets according to the cluster state. + * It will first compute the set of buckets to be moved. Then N of these buckets are iterated in parallel and + * their documents scheduled for move. The movement happens in 3 phases. + * 1 - Collect meta info for the documents. Must happen in the master thread. + * 2 - Acquire the bucket lock, fetch the documents and verify them against the meta data. This is done in BucketExecutor threads. + * 3 - The actual movement is then done in the master thread while still holding the bucket lock. Once a bucket has fully moved, + * a bucket modified notification is sent. 
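+ * In the implementation (bucketmovejobv2.cpp above) the phases map to startMove(), which collects the MoveKeys + * via getKeysToMove() in the master thread, prepareMove(), which creates the move operations in a BucketExecutor + * thread while the bucket lock is held, and completeMove(), which moves the documents in the master thread and + * notifies the bucket modified handler once the bucket is done and in sync. 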
+ */ +class BucketMoveJobV2 : public BlockableMaintenanceJob, + public IClusterStateChangedHandler, + public bucketdb::IBucketCreateListener, + public IBucketStateChangedHandler, + public IDiskMemUsageListener +{ +private: + using BucketExecutor = storage::spi::BucketExecutor; + using IDestructorCallback = vespalib::IDestructorCallback; + using IDestructorCallbackSP = std::shared_ptr<IDestructorCallback>; + using IThreadService = searchcorespi::index::IThreadService; + using BucketId = document::BucketId; + using ScanIterator = bucketdb::ScanIterator; + using BucketSet = std::map<BucketId, bool>; + using NeedResult = std::pair<bool, bool>; + using ActiveState = storage::spi::BucketInfo::ActiveState; + using BucketMover = bucketdb::BucketMover; + using BucketMoverSP = std::shared_ptr<BucketMover>; + using Movers = std::vector<std::shared_ptr<BucketMover>>; + using MoveKey = BucketMover::MoveKey; + using GuardedMoveOp = BucketMover::GuardedMoveOp; + std::shared_ptr<IBucketStateCalculator> _calc; + IDocumentMoveHandler &_moveHandler; + IBucketModifiedHandler &_modifiedHandler; + IThreadService &_master; + BucketExecutor &_bucketExecutor; + const MaintenanceDocumentSubDB &_ready; + const MaintenanceDocumentSubDB &_notReady; + const document::BucketSpace _bucketSpace; + size_t _iterateCount; + Movers _movers; + BucketSet _buckets2Move; + std::atomic<bool> _stopped; + std::atomic<size_t> _startedCount; + std::atomic<size_t> _executedCount; + + bucketdb::IBucketCreateNotifier &_bucketCreateNotifier; + IClusterStateChangedNotifier &_clusterStateChangedNotifier; + IBucketStateChangedNotifier &_bucketStateChangedNotifier; + IDiskMemUsageNotifier &_diskMemUsageNotifier; + + void startMove(BucketMoverSP mover, size_t maxDocsToMove); + void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context); + void completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> keys, IDestructorCallbackSP context); + void considerBucket(const bucketdb::Guard & guard, BucketId bucket); + NeedResult needMove(const ScanIterator &itr) const; + BucketSet computeBuckets2Move(); + BucketMoverSP createMover(BucketId bucket, bool wantReady); + BucketMoverSP greedyCreateMover(); + void backFillMovers(); + void cancelMovesForBucket(BucketId bucket); + bool moveDocs(size_t maxDocsToMove); +public: + BucketMoveJobV2(const IBucketStateCalculator::SP &calc, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace); + + ~BucketMoveJobV2() override; + + bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket); + bool done() const; + void recompute(); + bool inSync() const; + + bool run() override; + void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + void notifyBucketStateChanged(const BucketId &bucketId, ActiveState newState) override; + void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override; + void onStop() override; +}; + +} // namespace 
proton diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index 76a65df4d1a..6defd3e7037 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -20,8 +20,8 @@ namespace proton::bucketdb { typedef IDocumentMetaStore::Iterator Iterator; -MoveOperation::UP -BucketMover::createMoveOperation(const MoveKey &key) { +BucketMover::GuardedMoveOp +BucketMover::createMoveOperation(MoveKey &key) { if (_source->lidNeedsCommit(key._lid)) { return {}; } @@ -29,9 +29,10 @@ BucketMover::createMoveOperation(const MoveKey &key) { if (!doc || doc->getId().getGlobalId() != key._gid) return {}; // Failed to retrieve document, removed or changed identity BucketId bucketId = _bucket.stripUnused(); - return std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc), - DbDocumentId(_source->sub_db_id(), key._lid), - _targetSubDbId); + return BucketMover::GuardedMoveOp(std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc), + DbDocumentId(_source->sub_db_id(), key._lid), + _targetSubDbId), + std::move(key._guard)); } void @@ -39,21 +40,34 @@ BucketMover::moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone) _handler->handleMove(*moveOp, std::move(onDone)); } +BucketMover::MoveKey::MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept + : _lid(lid), + _gid(gid), + _timestamp(timestamp), + _guard(std::move(guard)) +{ } + +BucketMover::MoveKey::~MoveKey() = default; -BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, - IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept +BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source, + uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept : _source(source), _handler(&handler), - _bucketDb(&bucketDb), _bucket(bucket), _targetSubDbId(targetSubDbId), + _started(0), + _completed(0), _bucketDone(false), - _lastGid(), - _lastGidValid(false) + _lastGidValid(false), + _lastGid() { } +BucketMover::~BucketMover() { + assert(inSync()); +} + std::pair<std::vector<BucketMover::MoveKey>, bool> -BucketMover::getKeysToMove(size_t maxDocsToMove) const { +BucketMover::getKeysToMove(size_t maxDocsToMove) { std::pair<std::vector<BucketMover::MoveKey>, bool> result; Iterator itr = (_lastGidValid ? 
_source->meta_store()->upperBound(_lastGid) : _source->meta_store()->lowerBound(_bucket)); @@ -63,7 +77,7 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) const { uint32_t lid = itr.getKey().get_lid(); const RawDocumentMetaData &metaData = _source->meta_store()->getRawMetaData(lid); if (metaData.getBucketUsedBits() == _bucket.getUsedBits()) { - result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp()); + result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp(), MoveGuard(*this)); ++docsMoved; } } @@ -71,13 +85,13 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) const { return result; } -std::vector<MoveOperation::UP> -BucketMover::createMoveOperations(const std::vector<MoveKey> &toMove) { - std::vector<MoveOperation::UP> successfulReads; +std::vector<BucketMover::GuardedMoveOp> +BucketMover::createMoveOperations(std::vector<MoveKey> &toMove) { + std::vector<GuardedMoveOp> successfulReads; successfulReads.reserve(toMove.size()); - for (const MoveKey &key : toMove) { + for (MoveKey &key : toMove) { auto moveOp = createMoveOperation(key); - if (!moveOp) { + if (!moveOp.first) { break; } successfulReads.push_back(std::move(moveOp)); @@ -86,37 +100,10 @@ BucketMover::createMoveOperations(const std::vector<MoveKey> &toMove) { } void -BucketMover::moveDocuments(std::vector<MoveOperation::UP> moveOps, IDestructorCallbackSP onDone) { - for (auto & moveOp : moveOps) { - moveDocument(std::move(moveOp), onDone); - } -} - -bool -BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) -{ - if (_bucketDone) { - return true; - } - auto [keys, done] = getKeysToMove(maxDocsToMove); - auto moveOps = createMoveOperations(keys); - bool allOk = keys.size() == moveOps.size(); - if (done && allOk) { - setBucketDone(); - } - if (moveOps.empty()) return allOk; - - updateLastValidGid(moveOps.back()->getDocument()->getId().getGlobalId()); - +BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone) { for (auto & moveOp : moveOps) { - // We cache the bucket for the document we are going to move to avoid getting - // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready - // sub dbs as the bucket info is not updated atomically in this case. 
- _bucketDb->takeGuard()->cacheBucket(moveOp->getBucketId()); - _handler->handleMove(*moveOp, limiter.beginOperation()); - _bucketDb->takeGuard()->uncacheBucket(); + moveDocument(std::move(moveOp.first), std::move(onDone)); } - return allOk; } } @@ -124,22 +111,51 @@ BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) namespace proton { using bucketdb::BucketMover; +using bucketdb::BucketDBOwner; -DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept +DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter, BucketDBOwner &bucketDb) noexcept : _limiter(limiter), + _bucketDb(&bucketDb), _impl() {} void DocumentBucketMover::setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, - uint32_t targetSubDbId, IDocumentMoveHandler &handler, bucketdb::BucketDBOwner &bucketDb) + uint32_t targetSubDbId, IDocumentMoveHandler &handler) { - _impl = std::make_unique<BucketMover>(bucket, source, targetSubDbId, handler, bucketDb); + _impl = std::make_unique<BucketMover>(bucket, source, targetSubDbId, handler); } bool DocumentBucketMover::moveDocuments(size_t maxDocsToMove) { - return !_impl || _impl->moveDocuments(maxDocsToMove, _limiter); + return !_impl || moveDocuments(maxDocsToMove, _limiter); +} + +bool +DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) +{ + if (_impl->bucketDone()) { + return true; + } + auto [keys, done] = _impl->getKeysToMove(maxDocsToMove); + auto moveOps = _impl->createMoveOperations(keys); + bool allOk = keys.size() == moveOps.size(); + if (done && allOk) { + _impl->setBucketDone(); + } + if (moveOps.empty()) return allOk; + + _impl->updateLastValidGid(moveOps.back().first->getDocument()->getId().getGlobalId()); + + for (auto & moveOp : moveOps) { + // We cache the bucket for the document we are going to move to avoid getting + // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready + // sub dbs as the bucket info is not updated atomically in this case. 
+ _bucketDb->takeGuard()->cacheBucket(moveOp.first->getBucketId()); + _impl->moveDocument(std::move(moveOp.first), limiter.beginOperation()); + _bucketDb->takeGuard()->uncacheBucket(); + } + return allOk; } } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index dc91d40ce4e..c4f94a88cfa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -5,6 +5,7 @@ #include <vespa/document/bucket/bucketid.h> #include <vespa/document/base/globalid.h> #include <persistence/spi/types.h> +#include <atomic> namespace vespalib { class IDestructorCallback; } @@ -30,40 +31,60 @@ public: using MoveOperationUP = std::unique_ptr<MoveOperation>; using IDestructorCallback = vespalib::IDestructorCallback; using IDestructorCallbackSP = std::shared_ptr<IDestructorCallback>; + class MoveGuard { + public: + MoveGuard() noexcept : _mover(nullptr) {} + MoveGuard(BucketMover & mover) noexcept + : _mover(&mover) + { + _mover->_started.fetch_add(1, std::memory_order_relaxed); + } + MoveGuard(MoveGuard && rhs) noexcept : _mover(rhs._mover) { rhs._mover = nullptr; } + MoveGuard & operator = (MoveGuard && mover) = delete; + MoveGuard(const MoveGuard & rhs) = delete; + MoveGuard & operator = (const MoveGuard & mover) = delete; + ~MoveGuard() { + if (_mover) { + _mover->_completed.fetch_add(1, std::memory_order_relaxed); + } + } + private: + BucketMover *_mover; + }; struct MoveKey { using Timestamp = storage::spi::Timestamp; - MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp) noexcept - : _lid(lid), - _gid(gid), - _timestamp(timestamp) - { } + MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept; + MoveKey(MoveKey &&) noexcept = default; + ~MoveKey(); uint32_t _lid; document::GlobalId _gid; Timestamp _timestamp; + MoveGuard _guard; }; - BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, - IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept; + BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, + uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept; BucketMover(BucketMover &&) noexcept = default; BucketMover & operator=(BucketMover &&) noexcept = delete; BucketMover(const BucketMover &) = delete; BucketMover & operator=(const BucketMover &) = delete; + ~BucketMover(); - // TODO remove once we have switched bucket move job - bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter); - + using GuardedMoveOp = std::pair<MoveOperationUP, MoveGuard>; /// Must be called in master thread - std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove) const; + std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove); /// Call from any thread - std::vector<MoveOperationUP> createMoveOperations(const std::vector<MoveKey> & toMove); + std::vector<GuardedMoveOp> createMoveOperations(std::vector<MoveKey> & toMove); /// Must be called in master thread - void moveDocuments(std::vector<MoveOperationUP> moveOps, IDestructorCallbackSP onDone); + void moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone); + void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone); const document::BucketId &getBucket() const { return _bucket; } void cancel() { setBucketDone(); } void 
setBucketDone() { _bucketDone = true; } + /// Signals all documents have been scheduled for move bool bucketDone() const { return _bucketDone; } const MaintenanceDocumentSubDB * getSource() const { return _source; } /// Must be called in master thread @@ -71,19 +92,24 @@ public: _lastGid = gid; _lastGidValid = true; } + bool inSync() const { + return pending() == 0; + } private: const MaintenanceDocumentSubDB *_source; IDocumentMoveHandler *_handler; - BucketDBOwner *_bucketDb; const document::BucketId _bucket; const uint32_t _targetSubDbId; - bool _bucketDone; - document::GlobalId _lastGid; + std::atomic<uint32_t> _started; + std::atomic<uint32_t> _completed; + bool _bucketDone; // All moves started, or operation has been cancelled bool _lastGidValid; - - void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone); - MoveOperationUP createMoveOperation(const MoveKey & key); + document::GlobalId _lastGid; + GuardedMoveOp createMoveOperation(MoveKey & key); + size_t pending() const { + return _started.load(std::memory_order_relaxed) - _completed.load(std::memory_order_relaxed); + } }; } @@ -95,10 +121,13 @@ private: class DocumentBucketMover { private: - IMoveOperationLimiter &_limiter; - std::unique_ptr<bucketdb::BucketMover> _impl; + IMoveOperationLimiter &_limiter; + bucketdb::BucketDBOwner *_bucketDb; + std::unique_ptr<bucketdb::BucketMover> _impl; + + bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter); public: - DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept; + DocumentBucketMover(IMoveOperationLimiter &limiter, bucketdb::BucketDBOwner &bucketDb) noexcept; DocumentBucketMover(DocumentBucketMover &&) noexcept = default; DocumentBucketMover & operator=(DocumentBucketMover &&) noexcept = delete; DocumentBucketMover(const DocumentBucketMover &) = delete; @@ -106,8 +135,7 @@ public: void setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, - IDocumentMoveHandler &handler, - bucketdb::BucketDBOwner &bucketDb); + IDocumentMoveHandler &handler); const document::BucketId &getBucket() const { return _impl->getBucket(); } bool moveDocuments(size_t maxDocsToMove); void cancel() { _impl->cancel(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index c8429984b1f..ce6acb4795e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "bucketmovejob.h" +#include "bucketmovejobv2.h" #include "heart_beat_job.h" #include "job_tracked_maintenance_job.h" #include "lid_space_compaction_job.h" @@ -37,8 +38,7 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller, document::BucketSpace bucketSpace) { for (auto &lidHandler : lscHandlers) { - - IMaintenanceJob::UP job; + std::unique_ptr<IMaintenanceJob> job; if (config.getLidSpaceCompactionConfig().useBucketExecutor()) { job = std::make_unique<lidspace::CompactionJob>( config.getLidSpaceCompactionConfig(), @@ -65,6 +65,7 @@ void injectBucketMoveJob(MaintenanceController &controller, const DocumentDBMaintenanceConfig &config, IFrozenBucketHandler &fbHandler, + storage::spi::BucketExecutor & bucketExecutor, bucketdb::IBucketCreateNotifier &bucketCreateNotifier, const vespalib::string &docTypeName, document::BucketSpace bucketSpace, @@ -76,18 +77,35 @@ injectBucketMoveJob(MaintenanceController &controller, DocumentDBJobTrackers &jobTrackers, IDiskMemUsageNotifier &diskMemUsageNotifier) { - auto bmj = std::make_unique<BucketMoveJob>(calc, - moveHandler, - bucketModifiedHandler, - controller.getReadySubDB(), - controller.getNotReadySubDB(), - fbHandler, - bucketCreateNotifier, - clusterStateChangedNotifier, - bucketStateChangedNotifier, - diskMemUsageNotifier, - config.getBlockableJobConfig(), - docTypeName, bucketSpace); + std::unique_ptr<IMaintenanceJob> bmj; + if (config.getBucketMoveConfig().useBucketExecutor()) { + bmj = std::make_unique<BucketMoveJobV2>(calc, + moveHandler, + bucketModifiedHandler, + controller.masterThread(), + bucketExecutor, + controller.getReadySubDB(), + controller.getNotReadySubDB(), + bucketCreateNotifier, + clusterStateChangedNotifier, + bucketStateChangedNotifier, + diskMemUsageNotifier, + config.getBlockableJobConfig(), + docTypeName, bucketSpace); + } else { + bmj = std::make_unique<BucketMoveJob>(calc, + moveHandler, + bucketModifiedHandler, + controller.getReadySubDB(), + controller.getNotReadySubDB(), + fbHandler, + bucketCreateNotifier, + clusterStateChangedNotifier, + bucketStateChangedNotifier, + diskMemUsageNotifier, + config.getBlockableJobConfig(), + docTypeName, bucketSpace); + } controller.registerJobInMasterThread(trackJob(jobTrackers.getBucketMove(), std::move(bmj))); } @@ -133,9 +151,9 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, clusterStateChangedNotifier, calc, bucketSpace); } - injectBucketMoveJob(controller, config, fbHandler, bucketCreateNotifier, docTypeName, bucketSpace, moveHandler, - bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, calc, - jobTrackers, diskMemUsageNotifier); + injectBucketMoveJob(controller, config, fbHandler, bucketExecutor, bucketCreateNotifier, docTypeName, bucketSpace, + moveHandler, bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, + calc, jobTrackers, diskMemUsageNotifier); controller.registerJobInMasterThread( std::make_unique<SampleAttributeUsageJob>(readyAttributeManager, notReadyAttributeManager, diff --git a/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java b/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java index d9021e56480..89d449a2eb0 100644 --- a/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java +++ b/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java @@ -10,5 +10,5 @@ import java.nio.ByteBuffer; * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> */ public interface WritableByteTransmitter { - public void 
send(ByteBuffer src) throws IOException; + void send(ByteBuffer src) throws IOException; } diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java index 5f8ebf29ccd..ba79969469a 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java @@ -162,8 +162,11 @@ public class Configurator { .append("\n"); } - static Set<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) { - return zookeeperServerConfig.server().stream().map(ZookeeperServerConfig.Server::hostname).collect(Collectors.toSet()); + static List<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) { + return zookeeperServerConfig.server().stream() + .map(ZookeeperServerConfig.Server::hostname) + .distinct() + .collect(Collectors.toList()); } Path makeAbsolutePath(String filename) { diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java index 712e37be452..4ef73ec2374 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java @@ -86,12 +86,14 @@ public class Reconfigurer extends AbstractComponent { if (newConfig.server().size() == 1) shutdownAndDie(server, Duration.ZERO); List<String> newServers = difference(servers(newConfig), servers(activeConfig)); - String leavingServers = String.join(",", difference(serverIds(activeConfig), serverIds(newConfig))); - String joiningServers = String.join(",", newServers); - leavingServers = leavingServers.isEmpty() ? null : leavingServers; - joiningServers = joiningServers.isEmpty() ? null : joiningServers; - log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. Joining servers: " + joiningServers + - ", leaving servers: " + leavingServers); + String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig)); + String joiningServersSpec = String.join(",", newServers); + leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds; + joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec; + log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. 
\nJoining servers: " + joiningServersSpec + + "\nleaving servers: " + leavingServerIds + + "\nServers in active config:" + servers(activeConfig) + + "\nServers in new config:" + servers(newConfig)); String connectionSpec = localConnectionSpec(activeConfig); Instant now = Instant.now(); Duration reconfigTimeout = reconfigTimeout(newServers.size()); @@ -100,7 +102,7 @@ public class Reconfigurer extends AbstractComponent { for (int attempt = 1; now.isBefore(end); attempt++) { try { Instant reconfigStarted = Instant.now(); - vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServers, leavingServers); + vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds); Instant reconfigEnded = Instant.now(); log.log(Level.INFO, "Reconfiguration completed in " + Duration.between(reconfigTriggered, reconfigEnded) + @@ -139,11 +141,10 @@ public class Reconfigurer extends AbstractComponent { return HostName.getLocalhost() + ":" + config.clientPort(); } - private static List<String> serverIds(ZookeeperServerConfig config) { - return config.server().stream() - .map(ZookeeperServerConfig.Server::id) - .map(String::valueOf) - .collect(Collectors.toList()); + private static List<String> serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) { + return difference(servers(oldConfig), servers(newConfig)).stream() + .map(server -> server.substring(0, server.indexOf('='))) + .collect(Collectors.toList()); } private static List<String> servers(ZookeeperServerConfig config) { diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java index a1b98a23bd0..5cee0de2b6e 100644 --- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java +++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java @@ -13,7 +13,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.time.Duration; -import java.util.stream.IntStream; +import java.util.Arrays; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -68,6 +68,14 @@ public class ReconfigurerTest { assertNull("No servers are joining", reconfigurer.joiningServers()); assertEquals("3,4", reconfigurer.leavingServers()); assertSame(nextConfig, reconfigurer.activeConfig()); + + // Cluster loses node1, but node3 joins. Indices are shuffled. + nextConfig = createConfig(3, true, 1); + reconfigurer.startOrReconfigure(nextConfig); + assertEquals(3, reconfigurer.reconfigurations()); + assertEquals("1=node2:2182:2183;2181,2=node3:2182:2183;2181", reconfigurer.joiningServers()); + assertEquals("1,2", reconfigurer.leavingServers()); + assertSame(nextConfig, reconfigurer.activeConfig()); } @Test @@ -107,11 +115,15 @@ public class ReconfigurerTest { reconfigurer.shutdown(); } - private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration) { + private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... 
skipIndices) { + Arrays.sort(skipIndices); ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder(); builder.zooKeeperConfigFile(cfgFile.getAbsolutePath()); builder.myidFile(idFile.getAbsolutePath()); - IntStream.range(0, numberOfServers).forEach(i -> builder.server(newServer(i, "node" + i))); + for (int i = 0, index = 0; i < numberOfServers; i++, index++) { + while (Arrays.binarySearch(skipIndices, index) >= 0) index++; + builder.server(newServer(i, "node" + index)); + } builder.myid(0); builder.dynamicReconfiguration(dynamicReconfiguration); return builder.build();
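A note on the ZooKeeper Reconfigurer change above: the joining-server spec is built from the server entries that are new in the config, while the leaving-server ids are derived from the entries that were dropped, by cutting each dropped spec at its '=' sign. The sketch below is a minimal, self-contained illustration of that calculation; the class name is hypothetical, the difference() helper is a plausible stand-in for the one Reconfigurer uses, and the server specs are made up to mirror the test data, so this is not the actual Reconfigurer code.

import java.util.ArrayList;
import java.util.List;

public class ZkServerDiffSketch {

    // Stand-in for the difference(...) helper referenced in Reconfigurer: elements of 'a' not present in 'b'.
    static List<String> difference(List<String> a, List<String> b) {
        List<String> result = new ArrayList<>(a);
        result.removeAll(b);
        return result;
    }

    public static void main(String[] args) {
        // Server specs in the shape the test above expects (id=host:port:port;clientPort).
        List<String> active = List.of("0=node0:2182:2183;2181", "1=node1:2182:2183;2181", "2=node2:2182:2183;2181");
        List<String> next   = List.of("0=node0:2182:2183;2181", "1=node2:2182:2183;2181", "2=node3:2182:2183;2181");

        // Joining servers keep their full spec; leaving servers are reduced to the id before '='.
        List<String> joining = difference(next, active);
        List<String> leavingIds = new ArrayList<>();
        for (String server : difference(active, next)) {
            leavingIds.add(server.substring(0, server.indexOf('=')));
        }

        System.out.println("joining: " + String.join(",", joining));         // 1=node2:2182:2183;2181,2=node3:2182:2183;2181
        System.out.println("leaving ids: " + String.join(",", leavingIds));  // 1,2
    }
}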