diff options
48 files changed, 905 insertions, 142 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java index a503fb00825..2243dc1e682 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java @@ -233,6 +233,7 @@ public class VespaMetricSet { // transaction log metrics.add(new Metric("content.proton.transactionlog.entries.average")); metrics.add(new Metric("content.proton.transactionlog.disk_usage.average")); + metrics.add(new Metric("content.proton.transactionlog.replay_time.last")); // document store metrics.add(new Metric("content.proton.documentdb.ready.document_store.disk_usage.average")); diff --git a/config/src/tests/configagent/configagent.cpp b/config/src/tests/configagent/configagent.cpp index f6593890975..bc90cb458db 100644 --- a/config/src/tests/configagent/configagent.cpp +++ b/config/src/tests/configagent/configagent.cpp @@ -100,6 +100,9 @@ public: void handle(std::unique_ptr<ConfigUpdate> update) override { + if (_update) { + update->merge(*_update); + } _update = std::move(update); } diff --git a/config/src/tests/configholder/configholder.cpp b/config/src/tests/configholder/configholder.cpp index 83249da3824..2c6fa2016bf 100644 --- a/config/src/tests/configholder/configholder.cpp +++ b/config/src/tests/configholder/configholder.cpp @@ -46,7 +46,7 @@ TEST("Require that polling for elements work") holder.handle(ConfigUpdate::UP(new ConfigUpdate(value, true, 0))); ASSERT_TRUE(holder.poll()); holder.provide(); - ASSERT_TRUE(holder.poll()); + ASSERT_FALSE(holder.poll()); } TEST_MT_F("Require that wait is interrupted", 2, ConfigHolder) diff --git a/config/src/tests/subscriber/subscriber.cpp b/config/src/tests/subscriber/subscriber.cpp index 7613ba41234..39a537486cd 100644 --- a/config/src/tests/subscriber/subscriber.cpp +++ b/config/src/tests/subscriber/subscriber.cpp @@ -341,22 +341,22 @@ TEST_FF("requireThatHandlesAreMarkedAsChanged", MyManager, APIFixture(f1)) { ConfigSubscriber s(IConfigContext::SP(new APIFixture(f2))); ConfigHandle<FooConfig>::UP h1 = s.subscribe<FooConfig>("myid2"); ConfigHandle<BarConfig>::UP h2 = s.subscribe<BarConfig>("myid2"); - ASSERT_FALSE(s.nextConfig(0)); + EXPECT_FALSE(s.nextConfig(0)); f1.updateValue(0, createFooValue("foo"), 1); f1.updateValue(1, createFooValue("bar"), 1); - ASSERT_TRUE(s.nextConfig(100)); - ASSERT_TRUE(h1->isChanged()); - ASSERT_TRUE(h2->isChanged()); + EXPECT_TRUE(s.nextConfig(100)); + EXPECT_TRUE(h1->isChanged()); + EXPECT_TRUE(h2->isChanged()); - ASSERT_FALSE(s.nextConfig(100)); - ASSERT_FALSE(h1->isChanged()); - ASSERT_FALSE(h2->isChanged()); + EXPECT_FALSE(s.nextConfig(100)); + EXPECT_FALSE(h1->isChanged()); + EXPECT_FALSE(h2->isChanged()); f1.updateValue(0, createFooValue("bar"), 2); f1.updateGeneration(1, 2); - ASSERT_TRUE(s.nextConfig(100)); - ASSERT_TRUE(h1->isChanged()); - ASSERT_FALSE(h2->isChanged()); + EXPECT_TRUE(s.nextConfig(100)); + EXPECT_TRUE(h1->isChanged()); + EXPECT_FALSE(h2->isChanged()); } TEST_FF("requireThatNextGenerationMarksChanged", MyManager, APIFixture(f1)) { diff --git a/config/src/tests/subscription/subscription.cpp b/config/src/tests/subscription/subscription.cpp index 5d36753813c..a9e4c923e92 100644 --- a/config/src/tests/subscription/subscription.cpp +++ b/config/src/tests/subscription/subscription.cpp @@ -101,11 +101,11 @@ TEST_MT_F("requireThatNextUpdateReturnsInterrupted", 2, SubscriptionFixture(Conf TEST_F("Require that isChanged takes generation into account", SubscriptionFixture(ConfigKey::create<MyConfig>("myid"))) { - f1.holder->handle(ConfigUpdate::UP(new ConfigUpdate(ConfigValue(), true, 1))); + f1.holder->handle(ConfigUpdate::UP(new ConfigUpdate(ConfigValue(std::vector<vespalib::string>(), "a"), true, 1))); ASSERT_TRUE(f1.sub.nextUpdate(0, 0)); f1.sub.flip(); ASSERT_EQUAL(1, f1.sub.getLastGenerationChanged()); - f1.holder->handle(ConfigUpdate::UP(new ConfigUpdate(ConfigValue(), true, 2))); + f1.holder->handle(ConfigUpdate::UP(new ConfigUpdate(ConfigValue(std::vector<vespalib::string>(), "b"), true, 2))); ASSERT_TRUE(f1.sub.nextUpdate(1, 0)); f1.sub.flip(); ASSERT_EQUAL(2, f1.sub.getLastGenerationChanged()); diff --git a/config/src/vespa/config/common/configholder.cpp b/config/src/vespa/config/common/configholder.cpp index 6eeb1bd79a2..e2e6ae87688 100644 --- a/config/src/vespa/config/common/configholder.cpp +++ b/config/src/vespa/config/common/configholder.cpp @@ -16,14 +16,16 @@ ConfigUpdate::UP ConfigHolder::provide() { vespalib::MonitorGuard guard(_monitor); - ConfigUpdate::UP ret(new ConfigUpdate(*_current)); - return ret; + return std::move(_current); } void ConfigHolder::handle(ConfigUpdate::UP update) { vespalib::MonitorGuard guard(_monitor); + if (_current) { + update->merge(*_current); + } _current = std::move(update); guard.broadcast(); } diff --git a/config/src/vespa/config/common/configupdate.h b/config/src/vespa/config/common/configupdate.h index 7371d525266..9afdb9a5a40 100644 --- a/config/src/vespa/config/common/configupdate.h +++ b/config/src/vespa/config/common/configupdate.h @@ -19,6 +19,7 @@ public: const ConfigValue & getValue() const; bool hasChanged() const; int64_t getGeneration() const; + void merge(const ConfigUpdate & b) { _hasChanged = _hasChanged || b.hasChanged(); } private: ConfigValue _value; bool _hasChanged; diff --git a/config/src/vespa/config/frt/frtconfigagent.cpp b/config/src/vespa/config/frt/frtconfigagent.cpp index f475cff0a9a..c1516a7de11 100644 --- a/config/src/vespa/config/frt/frtconfigagent.cpp +++ b/config/src/vespa/config/frt/frtconfigagent.cpp @@ -61,16 +61,17 @@ FRTConfigAgent::handleUpdatedGeneration(const ConfigKey & key, const ConfigState LOG(spam, "Old config: md5:%s \n%s", _latest.getMd5().c_str(), _latest.asJson().c_str()); LOG(spam, "New config: md5:%s \n%s", configValue.getMd5().c_str(), configValue.asJson().c_str()); } + bool changed = false; if (_latest.getMd5() != configValue.getMd5()) { _latest = configValue; + changed = true; } _configState = newState; - if (LOG_WOULD_LOG(spam)) { LOG(spam, "updating holder for key %s,", key.toString().c_str()); } - _holder->handle(ConfigUpdate::UP(new ConfigUpdate(_latest, true, newState.generation))); + _holder->handle(ConfigUpdate::UP(new ConfigUpdate(_latest, changed, newState.generation))); _numConfigured++; } diff --git a/config/src/vespa/config/subscription/configsubscription.cpp b/config/src/vespa/config/subscription/configsubscription.cpp index d97ddfd57fb..efa3f5a02e3 100644 --- a/config/src/vespa/config/subscription/configsubscription.cpp +++ b/config/src/vespa/config/subscription/configsubscription.cpp @@ -28,9 +28,10 @@ ConfigSubscription::~ConfigSubscription() bool ConfigSubscription::nextUpdate(int64_t generation, uint64_t timeoutInMillis) { - if (_closed || !_holder->poll()) + if (_closed || !_holder->poll()) { return false; - _next.reset(_holder->provide().release()); + } + _next = _holder->provide(); if (isGenerationNewer(_next->getGeneration(), generation)) { return true; } @@ -38,9 +39,15 @@ ConfigSubscription::nextUpdate(int64_t generation, uint64_t timeoutInMillis) } bool +ConfigSubscription::hasGenerationChanged() const +{ + return (!_closed && _next && ((_current && (_current->getGeneration() != _next->getGeneration())) || ! _current)); +} + +bool ConfigSubscription::hasChanged() const { - return (!_closed && (_next->hasChanged() || _current.get() == NULL)); + return (!_closed && _next && ((_next->hasChanged() && _current && (_current->getValue() != _next->getValue())) || ! _current)); } int64_t @@ -88,7 +95,7 @@ ConfigSubscription::flip() { bool change = hasChanged(); if (change) { - _current.reset(_next.release()); + _current = std::move(_next); _lastGenerationChanged = _current->getGeneration(); } else { _current.reset(new ConfigUpdate(_current->getValue(), false, _next->getGeneration())); @@ -96,13 +103,15 @@ ConfigSubscription::flip() _isChanged = change; } -ConfigValue +const ConfigValue & ConfigSubscription::getConfig() const { - if (_closed) + if (_closed) { throw ConfigRuntimeException("Subscription is closed, config no longer available"); - if (_current.get() == NULL) + } + if ( ! _current) { throw ConfigRuntimeException("No configuration available"); + } return _current->getValue(); } diff --git a/config/src/vespa/config/subscription/configsubscription.h b/config/src/vespa/config/subscription/configsubscription.h index 04c3da114fd..56b65922a61 100644 --- a/config/src/vespa/config/subscription/configsubscription.h +++ b/config/src/vespa/config/subscription/configsubscription.h @@ -28,7 +28,7 @@ public: * * @return the current ConfigValue. */ - ConfigValue getConfig() const; + const ConfigValue & getConfig() const; /** * Checks whether or not the config has changed. @@ -48,6 +48,7 @@ public: bool nextUpdate(int64_t generation, uint64_t timeoutInMillis); int64_t getGeneration() const; bool hasChanged() const; + bool hasGenerationChanged() const; void flip(); void reset(); void close(); diff --git a/config/src/vespa/config/subscription/configsubscriptionset.cpp b/config/src/vespa/config/subscription/configsubscriptionset.cpp index f6268c8a84a..ab38ba18351 100644 --- a/config/src/vespa/config/subscription/configsubscriptionset.cpp +++ b/config/src/vespa/config/subscription/configsubscriptionset.cpp @@ -38,10 +38,6 @@ ConfigSubscriptionSet::acquireSnapshot(uint64_t timeoutInMillis, bool ignoreChan int64_t lastGeneration = _currentGeneration; bool inSync = false; - for (const auto & subscription : _subscriptionList) { - subscription->reset(); - } - LOG(debug, "Going into nextConfig loop, time left is %d", timeLeft); while (_state != CLOSED && timeLeft >= 0 && !inSync) { size_t numChanged = 0; @@ -52,8 +48,10 @@ ConfigSubscriptionSet::acquireSnapshot(uint64_t timeoutInMillis, bool ignoreChan // Run nextUpdate on all subscribers to get them in sync. for (const auto & subscription : _subscriptionList) { - if (!subscription->nextUpdate(_currentGeneration, timeLeft)) - break; + if (!subscription->nextUpdate(_currentGeneration, timeLeft) && !subscription->hasGenerationChanged()) { + subscription->reset(); + continue; + } const ConfigKey & key(subscription->getKey()); if (subscription->hasChanged()) { diff --git a/configdefinitions/src/vespa/configserver.def b/configdefinitions/src/vespa/configserver.def index 3c99875f978..cbc2317da2d 100644 --- a/configdefinitions/src/vespa/configserver.def +++ b/configdefinitions/src/vespa/configserver.def @@ -46,5 +46,6 @@ dockerVespaBaseImage string default="" # Athenz config loadBalancerAddress string default="" -# File distributions +# File distribution disableFiledistributor bool default=false +usechunkedtransfer bool default=true diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java index 58c61134bc4..6ab98f5af1c 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.config.server; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.config.provision.Deployer; +import com.yahoo.container.jdisc.state.StateMonitor; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.server.rpc.RpcServer; import com.yahoo.vespa.config.server.version.VersionState; @@ -23,17 +24,19 @@ public class ConfigServerBootstrap extends AbstractComponent implements Runnable private final Thread serverThread; private final Deployer deployer; private final VersionState versionState; + private final StateMonitor stateMonitor; // The tenants object is injected so that all initial requests handlers are // added to the rpc server before it starts answering rpc requests. - @SuppressWarnings("UnusedParameters") + @SuppressWarnings("WeakerAccess") @Inject public ConfigServerBootstrap(ApplicationRepository applicationRepository, RpcServer server, - Deployer deployer, VersionState versionState) { + Deployer deployer, VersionState versionState, StateMonitor stateMonitor) { this.applicationRepository = applicationRepository; this.server = server; this.deployer = deployer; this.versionState = versionState; + this.stateMonitor = stateMonitor; this.serverThread = new Thread(this, "configserver main"); serverThread.start(); } @@ -62,9 +65,11 @@ public class ConfigServerBootstrap extends AbstractComponent implements Runnable log.log(LogLevel.INFO, "All applications redeployed"); } versionState.saveNewVersion(); + stateMonitor.status(StateMonitor.Status.up); log.log(LogLevel.DEBUG, "Starting RPC server"); server.run(); log.log(LogLevel.DEBUG, "RPC server stopped"); + stateMonitor.status(StateMonitor.Status.down); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index b1277682849..81f5e62016a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -20,6 +20,8 @@ import com.yahoo.vespa.config.server.ConfigServerSpec; import com.yahoo.vespa.filedistribution.CompressedFileReference; import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReferenceData; +import com.yahoo.vespa.filedistribution.FileReferenceDataBlob; +import com.yahoo.vespa.filedistribution.LazyFileReferenceData; import java.io.File; import java.io.IOException; @@ -110,7 +112,7 @@ public class FileServer { log.info("Start serving reference '" + reference.value() + "' with file '" + file.getAbsolutePath() + "'"); boolean success = false; String errorDescription = "OK"; - FileReferenceData fileData = FileReferenceData.empty(reference, file.getName()); + FileReferenceData fileData = FileReferenceDataBlob.empty(reference, file.getName()); try { fileData = readFileReferenceData(reference); success = true; @@ -124,21 +126,16 @@ public class FileServer { log.info("Done serving reference '" + reference.toString() + "' with file '" + file.getAbsolutePath() + "'"); } - private FileReferenceData readFileReferenceData(FileReference reference) throws IOException { File file = root.getFile(reference); - byte[] blob; - FileReferenceData.Type type; if (file.isDirectory()) { - type = FileReferenceData.Type.compressed; - blob = CompressedFileReference.compress(file.getParentFile()); + //TODO Here we should compress to file, but then we have to clean up too. Pending. + byte [] blob = CompressedFileReference.compress(file.getParentFile()); + return new FileReferenceDataBlob(reference, file.getName(), FileReferenceData.Type.compressed, blob); } else { - type = FileReferenceData.Type.file; - blob = IOUtils.readFileBytes(file); + return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file); } - - return new FileReferenceData(reference, file.getName(), type, blob); } public void serveFile(Request request, Receiver receiver) { pullExecutor.execute(() -> serveFile(request.parameters().get(0).asString(), request, receiver)); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 17368b48e59..a09087ad8d3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -41,8 +41,10 @@ import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory; import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider; import com.yahoo.vespa.config.server.tenant.TenantListener; import com.yahoo.vespa.config.server.tenant.Tenants; +import com.yahoo.vespa.filedistribution.FileReceiver; import com.yahoo.vespa.filedistribution.FileReferenceData; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -93,6 +95,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { private final FileServer fileServer; private final ThreadPoolExecutor executorService; + private final boolean useChunkedFileTransfer; private volatile boolean allTenantsLoaded = false; /** @@ -118,6 +121,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { this.useRequestVersion = config.useVespaVersionInRequest(); this.hostedVespa = config.hostedVespa(); this.fileServer = fileServer; + this.useChunkedFileTransfer = config.usechunkedtransfer(); setUpHandlers(); } @@ -413,9 +417,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { return useRequestVersion; } - class FileReceiver implements FileServer.Receiver { + class WholeFileReceiver implements FileServer.Receiver { Target target; - FileReceiver(Target target) { + WholeFileReceiver(Target target) { this.target = target; } @@ -426,11 +430,11 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @Override public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { - Request fileBlob = new Request("filedistribution.receiveFile"); + Request fileBlob = new Request(FileReceiver.RECEIVE_METHOD); fileBlob.parameters().add(new StringValue(fileData.fileReference().value())); fileBlob.parameters().add(new StringValue(fileData.filename())); fileBlob.parameters().add(new StringValue(fileData.type().name())); - fileBlob.parameters().add(new DataValue(fileData.content())); + fileBlob.parameters().add(new DataValue(fileData.content().array())); fileBlob.parameters().add(new Int64Value(fileData.xxhash())); fileBlob.parameters().add(new Int32Value(status.getCode())); fileBlob.parameters().add(new StringValue(status.getDescription())); @@ -442,9 +446,104 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { } } + class ChunkedFileReceiver implements FileServer.Receiver { + Target target; + ChunkedFileReceiver(Target target) { + this.target = target; + } + + @Override + public String toString() { + return target.toString(); + } + + @Override + public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { + int session = sendMeta(fileData); + sendParts(session, fileData); + sendEof(session, fileData, status); + Request fileBlob = new Request(FileReceiver.RECEIVE_META_METHOD); + fileBlob.parameters().add(new StringValue(fileData.fileReference().value())); + fileBlob.parameters().add(new StringValue(fileData.filename())); + fileBlob.parameters().add(new StringValue(fileData.type().name())); + fileBlob.parameters().add(new Int64Value(fileData.size())); + target.invokeSync(fileBlob, 600); + if (fileBlob.isError()) { + log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + fileBlob.errorMessage() + "'."); + } + } + private void sendParts(int session, FileReferenceData fileData) { + ByteBuffer bb = ByteBuffer.allocate(0x100000); + for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) { + byte [] buf = bb.array(); + if (buf.length != bb.position()) { + buf = new byte [bb.position()]; + bb.flip(); + bb.get(buf); + } + sendPart(session, fileData.fileReference(), partId, buf); + bb.clear(); + } + } + private int sendMeta(FileReferenceData fileData) { + Request request = new Request(FileReceiver.RECEIVE_META_METHOD); + request.parameters().add(new StringValue(fileData.fileReference().value())); + request.parameters().add(new StringValue(fileData.filename())); + request.parameters().add(new StringValue(fileData.type().name())); + request.parameters().add(new Int64Value(fileData.size())); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + return request.returnValues().get(1).asInt32(); + } + private void sendPart(int session, FileReference ref, int partId, byte [] buf) { + Request request = new Request(FileReceiver.RECEIVE_PART_METHOD); + request.parameters().add(new StringValue(ref.value())); + request.parameters().add(new Int32Value(session)); + request.parameters().add(new Int32Value(partId)); + request.parameters().add(new DataValue(buf)); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering reference '" + ref.value() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + } + private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) { + Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD); + request.parameters().add(new StringValue(fileData.fileReference().value())); + request.parameters().add(new Int32Value(session)); + request.parameters().add(new Int64Value(fileData.xxhash())); + request.parameters().add(new Int32Value(status.getCode())); + request.parameters().add(new StringValue(status.getDescription())); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + } + } + @SuppressWarnings("UnusedDeclaration") public final void serveFile(Request request) { request.detach(); - fileServer.serveFile(request, new FileReceiver(request.target())); + FileServer.Receiver receiver = useChunkedFileTransfer + ? new ChunkedFileReceiver(request.target()) + : new WholeFileReceiver(request.target()); + fileServer.serveFile(request, receiver); } } diff --git a/configserver/src/main/resources/configserver-app/services.xml b/configserver/src/main/resources/configserver-app/services.xml index eddba1ee768..fd77fedd789 100644 --- a/configserver/src/main/resources/configserver-app/services.xml +++ b/configserver/src/main/resources/configserver-app/services.xml @@ -6,6 +6,10 @@ <maxthreads>100</maxthreads> <!-- Reduced thread count to minimize memory consumption --> </config> + <config name="container.jdisc.config.health-monitor"> + <initialStatus>initializing</initialStatus> + </config> + <accesslog type="vespa" fileNamePattern="logs/vespa/configserver/access.log.%Y%m%d%H%M%S" rotationScheme="date" symlinkName="access.log" /> <preprocess:include file='access-logging.xml' required='false' /> <component id="com.yahoo.vespa.config.server.ConfigServerBootstrap" bundle="configserver" /> diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java index e0d65055f21..384ae0853d8 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java @@ -6,7 +6,10 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.TenantName; +import com.yahoo.container.jdisc.config.HealthMonitorConfig; +import com.yahoo.container.jdisc.state.StateMonitor; import com.yahoo.io.IOUtils; +import com.yahoo.jdisc.core.SystemTimer; import com.yahoo.vespa.config.server.deploy.MockDeployer; import com.yahoo.vespa.config.server.host.HostRegistries; import com.yahoo.vespa.config.server.http.SessionHandlerTest; @@ -67,7 +70,9 @@ public class ConfigServerBootstrapTest extends TestWithTenant { assertFalse(myServer.stopped); VersionState versionState = new VersionState(versionFile); assertTrue(versionState.isUpgraded()); - ConfigServerBootstrap bootstrap = new ConfigServerBootstrap(applicationRepository, rpc, (application, timeout) -> Optional.empty(), versionState); + ConfigServerBootstrap bootstrap = + new ConfigServerBootstrap(applicationRepository, rpc, (application, timeout) -> Optional.empty(), versionState, + new StateMonitor(new HealthMonitorConfig(new HealthMonitorConfig.Builder()), new SystemTimer())); waitUntilStarted(rpc, 60000); assertFalse(versionState.isUpgraded()); assertThat(versionState.currentVersion(), is(versionState.storedVersion())); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 5fcaee6e590..b0dce359d58 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -103,7 +103,7 @@ public class FileServerTest { } @Override public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { - this.content.complete(fileData.content()); + this.content.complete(fileData.content().array()); } } diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/state/StateHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/state/StateHandler.java index ce9779b83d9..23c247fa438 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/state/StateHandler.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/state/StateHandler.java @@ -190,7 +190,7 @@ public class StateHandler extends AbstractRequestHandler { private JSONObjectWithLegibleException buildJsonForConsumer(String consumer) throws JSONException { JSONObjectWithLegibleException ret = new JSONObjectWithLegibleException(); ret.put("time", timer.currentTimeMillis()); - ret.put("status", new JSONObjectWithLegibleException().put("code", "up")); + ret.put("status", new JSONObjectWithLegibleException().put("code", getStatus().name())); ret.put(METRICS_PATH, buildJsonForSnapshot(consumer, getSnapshot())); return ret; } @@ -203,6 +203,10 @@ public class StateHandler extends AbstractRequestHandler { } } + private StateMonitor.Status getStatus() { + return monitor.status(); + } + private JSONObjectWithLegibleException buildJsonForSnapshot(String consumer, MetricSnapshot metricSnapshot) throws JSONException { if (metricSnapshot == null) { return new JSONObjectWithLegibleException(); diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/state/StateMonitor.java b/container-core/src/main/java/com/yahoo/container/jdisc/state/StateMonitor.java index 140257cbcef..6234a96d7a0 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/state/StateMonitor.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/state/StateMonitor.java @@ -6,6 +6,7 @@ import com.yahoo.component.AbstractComponent; import com.yahoo.container.jdisc.config.HealthMonitorConfig; import com.yahoo.jdisc.Timer; import com.yahoo.jdisc.application.MetricConsumer; +import com.yahoo.log.LogLevel; import java.util.Map; import java.util.TreeSet; @@ -14,7 +15,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Logger; /** - * A statemonitor keeps track of the current metrics state of a container. + * A state monitor keeps track of the current health and metrics state of a container. * It is used by jDisc to hand out metric update API endpoints to workers through {@link #newMetricConsumer}, * and to inspect the current accumulated state of metrics through {@link #snapshot}. * @@ -23,12 +24,16 @@ import java.util.logging.Logger; public class StateMonitor extends AbstractComponent { private final static Logger log = Logger.getLogger(StateMonitor.class.getName()); + + public enum Status {up, down, initializing}; + private final CopyOnWriteArrayList<StateMetricConsumer> consumers = new CopyOnWriteArrayList<>(); private final Thread thread; private final Timer timer; private final long snapshotIntervalMs; private long lastSnapshotTimeMs; private volatile MetricSnapshot snapshot; + private volatile Status status; private final TreeSet<String> valueNames = new TreeSet<>(); @Inject @@ -36,13 +41,8 @@ public class StateMonitor extends AbstractComponent { this.timer = timer; this.snapshotIntervalMs = (long)(config.snapshot_interval() * TimeUnit.SECONDS.toMillis(1)); this.lastSnapshotTimeMs = timer.currentTimeMillis(); - thread = new Thread(new Runnable() { - - @Override - public void run() { - StateMonitor.this.run(); - } - }, "StateMonitor"); + this.status = Status.valueOf(config.initialStatus()); + thread = new Thread(StateMonitor.this::run, "StateMonitor"); thread.setDaemon(true); thread.start(); } @@ -54,6 +54,13 @@ public class StateMonitor extends AbstractComponent { return consumer; } + public void status(Status status) { + log.log(LogLevel.INFO, "Changing health status code from '" + this.status + "' to '" + status.name() + "'"); + this.status = status; + } + + public Status status() { return status; } + /** Returns the last snapshot taken of the metrics in this system */ public MetricSnapshot snapshot() { return snapshot; diff --git a/container-core/src/main/resources/configdefinitions/health-monitor.def b/container-core/src/main/resources/configdefinitions/health-monitor.def index dc5cdbc6ca4..5e70c72ae3f 100644 --- a/container-core/src/main/resources/configdefinitions/health-monitor.def +++ b/container-core/src/main/resources/configdefinitions/health-monitor.def @@ -4,3 +4,6 @@ namespace=container.jdisc.config # How far between snapshots. 5 minutes by default snapshot_interval double default=300 + +# Initial status used in /state/v1/health API (value for 'code' in 'status'). See StateMonitor for valid values +initialStatus string default="up" diff --git a/container-dependency-versions/pom.xml b/container-dependency-versions/pom.xml index 2e23e550352..4d639ceedab 100644 --- a/container-dependency-versions/pom.xml +++ b/container-dependency-versions/pom.xml @@ -421,7 +421,7 @@ <findbugs.version>1.3.9</findbugs.version> <guava.version>18.0</guava.version> <guice.version>3.0</guice.version> - <jetty.version>9.4.6.v20170531</jetty.version> + <jetty.version>9.4.8.v20171121</jetty.version> <slf4j.version>1.7.5</slf4j.version> <!-- These must be kept in sync with version used by current jersey2.version. --> diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/MemoryNameService.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/MemoryNameService.java index 9f4af82c5b0..ec9cf0b3436 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/MemoryNameService.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/MemoryNameService.java @@ -39,7 +39,7 @@ public class MemoryNameService implements NameService { public Optional<Record> findRecord(Record.Type type, RecordData data) { return records.values() .stream() - .filter(record -> record.type() == type && record.value().equals(data)) + .filter(record -> record.type() == type && record.data().equals(data)) .findFirst(); } diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/Record.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/Record.java index fd9bddac2c6..b51202e8261 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/Record.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/Record.java @@ -13,13 +13,13 @@ public class Record { private final RecordId id; private final Type type; private final RecordName name; - private final RecordData value; + private final RecordData data; - public Record(RecordId id, Type type, RecordName name, RecordData value) { + public Record(RecordId id, Type type, RecordName name, RecordData data) { this.id = Objects.requireNonNull(id, "id cannot be null"); this.type = Objects.requireNonNull(type, "type cannot be null"); this.name = Objects.requireNonNull(name, "name cannot be null"); - this.value = Objects.requireNonNull(value, "value cannot be null"); + this.data = Objects.requireNonNull(data, "data cannot be null"); } /** Unique identifier for this */ @@ -32,9 +32,9 @@ public class Record { return type; } - /** Value for this, e.g. IP address for "A" record */ - public RecordData value() { - return value; + /** Data in this, e.g. IP address for "A" record */ + public RecordData data() { + return data; } /** Name of this, e.g. a FQDN for "A" record */ @@ -60,7 +60,7 @@ public class Record { "id=" + id + ", type=" + type + ", name='" + name + '\'' + - ", value='" + value + '\'' + + ", data='" + data + '\'' + '}'; } @@ -72,11 +72,11 @@ public class Record { return Objects.equals(id, record.id) && type == record.type && Objects.equals(name, record.name) && - Objects.equals(value, record.value); + Objects.equals(data, record.data); } @Override public int hashCode() { - return Objects.hash(id, type, name, value); + return Objects.hash(id, type, name, data); } } diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/RecordData.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/RecordData.java index 444ee28f672..e0d19e0fff9 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/RecordData.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/dns/RecordData.java @@ -42,8 +42,14 @@ public class RecordData { '}'; } + /** Create a new record containing the given data */ public static RecordData from(String data) { return new RecordData(data); } + /** Create a new record and append a trailing dot to given data, if missing */ + public static RecordData fqdn(String data) { + return from(data.endsWith(".") ? data : data + "."); + } + } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java index b00e76a331a..3f64562f2d2 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java @@ -456,10 +456,19 @@ public class ApplicationController { private void registerRotationInDns(Rotation rotation, String dnsName) { try { Optional<Record> record = nameService.findRecord(Record.Type.CNAME, RecordName.from(dnsName)); - if (!record.isPresent()) { - RecordId id = nameService.createCname(RecordName.from(dnsName), RecordData.from(rotation.name())); - log.info("Registered mapping with record ID " + id.asString() + ": " + dnsName + " -> " - + rotation.name()); + RecordData rotationName = RecordData.fqdn(rotation.name()); + if (record.isPresent()) { + // Ensure that the existing record points to the correct rotation + if (!record.get().data().equals(rotationName)) { + // TODO: Enable once verified + //nameService.updateRecord(record.get().id(), rotationName); + log.info("Updated mapping for record ID " + record.get().id().asString() + ": '" + dnsName + + "' -> '" + rotation.name() + "'"); + } + } else { + RecordId id = nameService.createCname(RecordName.from(dnsName), rotationName); + log.info("Registered mapping with record ID " + id.asString() + ": '" + dnsName + "' -> '" + + rotation.name() + "'"); } } catch (RuntimeException e) { log.log(Level.WARNING, "Failed to register CNAME", e); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationRotation.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationRotation.java index 80be9e6676a..e4aed04a01c 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationRotation.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationRotation.java @@ -13,7 +13,7 @@ import java.net.URI; */ public class ApplicationRotation { - private static final String dnsSuffix = "global.vespa.yahooapis.com"; + public static final String DNS_SUFFIX = "global.vespa.yahooapis.com"; private static final int port = 4080; private final URI url; @@ -23,7 +23,7 @@ public class ApplicationRotation { this.url = URI.create(String.format("http://%s.%s.%s:%d/", sanitize(application.application().value()), sanitize(application.tenant().value()), - dnsSuffix, + DNS_SUFFIX, port)); this.id = id; } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ControllerMaintenance.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ControllerMaintenance.java index bc2112ac0ca..f6dc1326d5e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ControllerMaintenance.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ControllerMaintenance.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.hosted.controller.maintenance; import com.yahoo.component.AbstractComponent; import com.yahoo.jdisc.Metric; import com.yahoo.vespa.hosted.controller.Controller; +import com.yahoo.vespa.hosted.controller.api.integration.dns.NameService; import com.yahoo.vespa.hosted.controller.api.integration.organization.OwnershipIssues; import com.yahoo.vespa.hosted.controller.api.integration.organization.DeploymentIssues; import com.yahoo.vespa.hosted.controller.api.integration.chef.Chef; @@ -34,11 +35,13 @@ public class ControllerMaintenance extends AbstractComponent { private final ClusterUtilizationMaintainer clusterUtilizationMaintainer; private final DeploymentMetricsMaintainer deploymentMetricsMaintainer; private final ApplicationOwnershipConfirmer applicationOwnershipConfirmer; + private final DnsMaintainer dnsMaintainer; @SuppressWarnings("unused") // instantiated by Dependency Injection public ControllerMaintenance(MaintainerConfig maintainerConfig, Controller controller, CuratorDb curator, JobControl jobControl, Metric metric, Chef chefClient, - DeploymentIssues deploymentIssues, OwnershipIssues ownershipIssues) { + DeploymentIssues deploymentIssues, OwnershipIssues ownershipIssues, + NameService nameService) { Duration maintenanceInterval = Duration.ofMinutes(maintainerConfig.intervalMinutes()); this.jobControl = jobControl; deploymentExpirer = new DeploymentExpirer(controller, maintenanceInterval, jobControl); @@ -52,6 +55,7 @@ public class ControllerMaintenance extends AbstractComponent { clusterUtilizationMaintainer = new ClusterUtilizationMaintainer(controller, Duration.ofHours(2), jobControl); deploymentMetricsMaintainer = new DeploymentMetricsMaintainer(controller, Duration.ofMinutes(10), jobControl); applicationOwnershipConfirmer = new ApplicationOwnershipConfirmer(controller, Duration.ofHours(12), jobControl, ownershipIssues); + dnsMaintainer = new DnsMaintainer(controller, Duration.ofHours(12), jobControl, nameService); } public Upgrader upgrader() { return upgrader; } @@ -72,6 +76,7 @@ public class ControllerMaintenance extends AbstractComponent { clusterInfoMaintainer.deconstruct(); deploymentMetricsMaintainer.deconstruct(); applicationOwnershipConfirmer.deconstruct(); + dnsMaintainer.deconstruct(); } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DnsMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DnsMaintainer.java new file mode 100644 index 00000000000..89394bf4dd9 --- /dev/null +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DnsMaintainer.java @@ -0,0 +1,67 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.controller.maintenance; + +import com.yahoo.vespa.hosted.controller.Controller; +import com.yahoo.vespa.hosted.controller.api.integration.dns.NameService; +import com.yahoo.vespa.hosted.controller.api.integration.dns.Record; +import com.yahoo.vespa.hosted.controller.api.integration.dns.RecordData; +import com.yahoo.vespa.hosted.controller.application.ApplicationRotation; +import com.yahoo.vespa.hosted.controller.rotation.Rotation; +import com.yahoo.vespa.hosted.controller.rotation.RotationId; +import com.yahoo.vespa.hosted.controller.rotation.RotationLock; +import com.yahoo.vespa.hosted.controller.rotation.RotationRepository; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Logger; + +/** + * Performs DNS maintenance tasks such as removing DNS aliases for unassigned rotations. + * + * @author mpolden + */ +public class DnsMaintainer extends Maintainer { + + private static final Logger log = Logger.getLogger(DnsMaintainer.class.getName()); + + private final NameService nameService; + + public DnsMaintainer(Controller controller, Duration interval, JobControl jobControl, + NameService nameService) { + super(controller, interval, jobControl); + this.nameService = nameService; + } + + private RotationRepository rotationRepository() { + return controller().applications().rotationRepository(); + } + + @Override + protected void maintain() { + try (RotationLock lock = rotationRepository().lock()) { + Map<RotationId, Rotation> unassignedRotations = rotationRepository().availableRotations(lock); + unassignedRotations.values().forEach(this::removeDnsAlias); + } + } + + /** Remove DNS alias for unassigned rotation */ + private void removeDnsAlias(Rotation rotation) { + // When looking up CNAME by data, the data must be a FQDN + Optional<Record> record = nameService.findRecord(Record.Type.CNAME, RecordData.fqdn(rotation.name())); + record.filter(this::canUpdate) + .ifPresent(r -> { + log.warning(String.format("Want to remove DNS record %s (%s) because it points to the unassigned " + + "rotation %s (%s)", record.get().id().asString(), + record.get().name().asString(), rotation.id().asString(), rotation.name())); + // TODO: Actually remove the record + //nameService.removeRecord(r.id()); + }); + } + + /** Returns whether we can update the given record */ + private boolean canUpdate(Record record) { + return record.name().asString().endsWith(ApplicationRotation.DNS_SUFFIX); + } + +} diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/MetricsReporter.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/MetricsReporter.java index 6bd50365feb..ab388ca9a9f 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/MetricsReporter.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/MetricsReporter.java @@ -10,6 +10,7 @@ import com.yahoo.vespa.hosted.controller.api.integration.chef.Chef; import com.yahoo.vespa.hosted.controller.api.integration.chef.rest.PartialNode; import com.yahoo.vespa.hosted.controller.api.integration.chef.rest.PartialNodeResult; import com.yahoo.vespa.hosted.controller.application.ApplicationList; +import com.yahoo.vespa.hosted.controller.rotation.RotationLock; import java.time.Clock; import java.time.Duration; @@ -58,8 +59,10 @@ public class MetricsReporter extends Maintainer { } private void reportRemainingRotations() { - int availableRotations = controller().applications().rotationRepository().availableRotations().size(); - metric.set(remainingRotations, availableRotations, metric.createContext(Collections.emptyMap())); + try (RotationLock lock = controller().applications().rotationRepository().lock()) { + int availableRotations = controller().applications().rotationRepository().availableRotations(lock).size(); + metric.set(remainingRotations, availableRotations, metric.createContext(Collections.emptyMap())); + } } private void reportChefMetrics() { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/rotation/RotationRepository.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/rotation/RotationRepository.java index e70d177a641..c0d3fd4758e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/rotation/RotationRepository.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/rotation/RotationRepository.java @@ -8,7 +8,6 @@ import com.yahoo.vespa.hosted.controller.ApplicationController; import com.yahoo.vespa.hosted.controller.persistence.CuratorDb; import com.yahoo.vespa.hosted.rotation.config.RotationsConfig; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.LinkedHashMap; @@ -55,7 +54,7 @@ public class RotationRepository { * @param application The application requesting a rotation * @param lock Lock which must be acquired by the caller */ - public Rotation getRotation(Application application, @SuppressWarnings("unused") RotationLock lock) { + public Rotation getRotation(Application application, RotationLock lock) { if (application.rotation().isPresent()) { return allRotations.get(application.rotation().get().id()); } @@ -70,28 +69,31 @@ public class RotationRepository { if (productionZones < 2) { throw new IllegalArgumentException("global-service-id is set but less than 2 prod zones are defined"); } - return findAvailableRotation(application); + return findAvailableRotation(application, lock); } - /** Returns all unassigned rotations */ - public List<RotationId> availableRotations() { + /** + * Returns all unassigned rotations + * @param lock Lock which must be acquired by the caller + */ + public Map<RotationId, Rotation> availableRotations(@SuppressWarnings("unused") RotationLock lock) { List<RotationId> assignedRotations = applications.asList().stream() .filter(application -> application.rotation().isPresent()) .map(application -> application.rotation().get().id()) .collect(Collectors.toList()); - List<RotationId> allRotations = new ArrayList<>(this.allRotations.keySet()); - allRotations.removeAll(assignedRotations); - return Collections.unmodifiableList(allRotations); + Map<RotationId, Rotation> unassignedRotations = new LinkedHashMap<>(this.allRotations); + assignedRotations.forEach(unassignedRotations::remove); + return Collections.unmodifiableMap(unassignedRotations); } - private Rotation findAvailableRotation(Application application) { - List<RotationId> availableRotations = availableRotations(); + private Rotation findAvailableRotation(Application application, RotationLock lock) { + Map<RotationId, Rotation> availableRotations = availableRotations(lock); if (availableRotations.isEmpty()) { throw new IllegalStateException("Unable to assign global rotation to " + application.id() + " - no rotations available"); } // Return first available rotation - RotationId rotation = availableRotations.get(0); + RotationId rotation = availableRotations.keySet().iterator().next(); log.info(String.format("Offering %s to application %s", rotation, application.id())); return allRotations.get(rotation); } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java index 3af68befd1d..68cc7ed20fe 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java @@ -23,6 +23,7 @@ import com.yahoo.vespa.hosted.controller.api.identifiers.PropertyId; import com.yahoo.vespa.hosted.controller.api.identifiers.TenantId; import com.yahoo.vespa.hosted.controller.api.identifiers.UserGroup; import com.yahoo.vespa.hosted.controller.api.integration.BuildService.BuildJob; +import com.yahoo.vespa.hosted.controller.api.integration.athenz.NToken; import com.yahoo.vespa.hosted.controller.api.integration.dns.Record; import com.yahoo.vespa.hosted.controller.api.integration.dns.RecordName; import com.yahoo.vespa.hosted.controller.application.ApplicationPackage; @@ -32,15 +33,17 @@ import com.yahoo.vespa.hosted.controller.application.DeploymentJobs; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobError; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobType; import com.yahoo.vespa.hosted.controller.application.JobStatus; -import com.yahoo.vespa.hosted.controller.api.integration.athenz.NToken; import com.yahoo.vespa.hosted.controller.athenz.mock.AthenzDbMock; import com.yahoo.vespa.hosted.controller.deployment.ApplicationPackageBuilder; import com.yahoo.vespa.hosted.controller.deployment.BuildSystem; import com.yahoo.vespa.hosted.controller.deployment.DeploymentTester; import com.yahoo.vespa.hosted.controller.persistence.ApplicationSerializer; +import com.yahoo.vespa.hosted.controller.rotation.RotationId; +import com.yahoo.vespa.hosted.controller.rotation.RotationLock; import com.yahoo.vespa.hosted.controller.versions.DeploymentStatistics; import com.yahoo.vespa.hosted.controller.versions.VersionStatus; import com.yahoo.vespa.hosted.controller.versions.VespaVersion; +import org.junit.Ignore; import org.junit.Test; import java.io.IOException; @@ -608,10 +611,102 @@ public class ControllerTest { tester.deployCompletely(application, applicationPackage); assertEquals(1, tester.controllerTester().nameService().records().size()); - Optional<Record> record = tester.controllerTester().nameService().findRecord(Record.Type.CNAME, RecordName.from("app1.tenant1.global.vespa.yahooapis.com")); + Optional<Record> record = tester.controllerTester().nameService().findRecord( + Record.Type.CNAME, RecordName.from("app1.tenant1.global.vespa.yahooapis.com") + ); assertTrue(record.isPresent()); assertEquals("app1.tenant1.global.vespa.yahooapis.com", record.get().name().asString()); - assertEquals("rotation-fqdn-01", record.get().value().asString()); + assertEquals("rotation-fqdn-01.", record.get().data().asString()); + } + + @Test + @Ignore // TODO: Re-enable once verified + public void testUpdatesExistingDnsAlias() { + DeploymentTester tester = new DeploymentTester(); + + // Application 1 is deployed and deleted + { + Application app1 = tester.createApplication("app1", "tenant1", 1, 1L); + ApplicationPackage applicationPackage = new ApplicationPackageBuilder() + .environment(Environment.prod) + .globalServiceId("foo") + .region("us-west-1") + .region("us-central-1") // Two deployments should result in DNS alias being registered once + .build(); + + tester.deployCompletely(app1, applicationPackage); + assertEquals(1, tester.controllerTester().nameService().records().size()); + Optional<Record> record = tester.controllerTester().nameService().findRecord( + Record.Type.CNAME, RecordName.from("app1.tenant1.global.vespa.yahooapis.com") + ); + assertTrue(record.isPresent()); + assertEquals("app1.tenant1.global.vespa.yahooapis.com", record.get().name().asString()); + assertEquals("rotation-fqdn-01.", record.get().data().asString()); + + // Application is deleted and rotation is unassigned + applicationPackage = new ApplicationPackageBuilder() + .environment(Environment.prod) + .allow(ValidationId.deploymentRemoval) + .build(); + tester.notifyJobCompletion(component, app1, true); + tester.deployAndNotify(app1, applicationPackage, true, systemTest); + tester.applications().deactivate(app1, new Zone(Environment.test, RegionName.from("us-east-1"))); + tester.applications().deactivate(app1, new Zone(Environment.staging, RegionName.from("us-east-3"))); + tester.applications().deleteApplication(app1.id(), Optional.of(new NToken("ntoken"))); + try (RotationLock lock = tester.applications().rotationRepository().lock()) { + assertTrue("Rotation is unassigned", + tester.applications().rotationRepository().availableRotations(lock) + .containsKey(new RotationId("rotation-id-01"))); + } + + // Record remains + record = tester.controllerTester().nameService().findRecord( + Record.Type.CNAME, RecordName.from("app1.tenant1.global.vespa.yahooapis.com") + ); + assertTrue(record.isPresent()); + } + + // Application 2 is deployed and assigned same rotation as application 1 had before deletion + { + Application app2 = tester.createApplication("app2", "tenant2", 1, 1L); + ApplicationPackage applicationPackage = new ApplicationPackageBuilder() + .environment(Environment.prod) + .globalServiceId("foo") + .region("us-west-1") + .region("us-central-1") + .build(); + tester.deployCompletely(app2, applicationPackage); + assertEquals(2, tester.controllerTester().nameService().records().size()); + Optional<Record> record = tester.controllerTester().nameService().findRecord( + Record.Type.CNAME, RecordName.from("app2.tenant2.global.vespa.yahooapis.com") + ); + assertTrue(record.isPresent()); + assertEquals("app2.tenant2.global.vespa.yahooapis.com", record.get().name().asString()); + assertEquals("rotation-fqdn-01.", record.get().data().asString()); + } + + // Application 1 is recreated, deployed and assigned a new rotation + { + Application app1 = tester.createApplication("app1", "tenant1", 1, 1L); + ApplicationPackage applicationPackage = new ApplicationPackageBuilder() + .environment(Environment.prod) + .globalServiceId("foo") + .region("us-west-1") + .region("us-central-1") + .build(); + tester.deployCompletely(app1, applicationPackage); + app1 = tester.applications().require(app1.id()); + assertEquals("rotation-id-02", app1.rotation().get().id().asString()); + + // Existing DNS record is updated to point to the newly assigned rotation + assertEquals(2, tester.controllerTester().nameService().records().size()); + Optional<Record> record = tester.controllerTester().nameService().findRecord( + Record.Type.CNAME, RecordName.from("app1.tenant1.global.vespa.yahooapis.com") + ); + assertTrue(record.isPresent()); + assertEquals("rotation-fqdn-02.", record.get().data().asString()); + } + } @Test diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DnsMaintainerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DnsMaintainerTest.java new file mode 100644 index 00000000000..17f9ab29628 --- /dev/null +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DnsMaintainerTest.java @@ -0,0 +1,76 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.controller.maintenance; + +import com.yahoo.config.application.api.ValidationId; +import com.yahoo.config.provision.Environment; +import com.yahoo.config.provision.RegionName; +import com.yahoo.config.provision.Zone; +import com.yahoo.vespa.hosted.controller.Application; +import com.yahoo.vespa.hosted.controller.api.integration.athenz.NToken; +import com.yahoo.vespa.hosted.controller.api.integration.dns.Record; +import com.yahoo.vespa.hosted.controller.api.integration.dns.RecordName; +import com.yahoo.vespa.hosted.controller.application.ApplicationPackage; +import com.yahoo.vespa.hosted.controller.deployment.ApplicationPackageBuilder; +import com.yahoo.vespa.hosted.controller.deployment.DeploymentTester; +import com.yahoo.vespa.hosted.controller.persistence.MockCuratorDb; +import org.junit.Ignore; +import org.junit.Test; + +import java.time.Duration; +import java.util.Optional; + +import static com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobType.component; +import static com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobType.systemTest; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author mpolden + */ +public class DnsMaintainerTest { + + @Test + @Ignore // TODO: Enable once DnsMaintainer actually removes records + public void removes_record_for_unassigned_rotation() { + DeploymentTester tester = new DeploymentTester(); + Application application = tester.createApplication("app1", "tenant1", 1, 1L); + + ApplicationPackage applicationPackage = new ApplicationPackageBuilder() + .environment(Environment.prod) + .globalServiceId("foo") + .region("us-west-1") + .region("us-central-1") + .build(); + + // Deploy application + tester.deployCompletely(application, applicationPackage); + assertEquals(1, tester.controllerTester().nameService().records().size()); + Optional<Record> record = tester.controllerTester().nameService().findRecord( + Record.Type.CNAME, RecordName.from("app1.tenant1.global.vespa.yahooapis.com") + ); + assertTrue(record.isPresent()); + assertEquals("app1.tenant1.global.vespa.yahooapis.com", record.get().name().asString()); + assertEquals("rotation-fqdn-01.", record.get().data().asString()); + + // Remove application + applicationPackage = new ApplicationPackageBuilder() + .environment(Environment.prod) + .allow(ValidationId.deploymentRemoval) + .build(); + tester.notifyJobCompletion(component, application, true); + tester.deployAndNotify(application, applicationPackage, true, systemTest); + tester.applications().deactivate(application, new Zone(Environment.test, RegionName.from("us-east-1"))); + tester.applications().deactivate(application, new Zone(Environment.staging, RegionName.from("us-east-3"))); + tester.applications().deleteApplication(application.id(), Optional.of(new NToken("ntoken"))); + + // DnsMaintainer removes record + DnsMaintainer dnsMaintainer = new DnsMaintainer(tester.controller(), Duration.ofHours(12), + new JobControl(new MockCuratorDb()), + tester.controllerTester().nameService()); + dnsMaintainer.maintain(); + assertFalse("DNS record removed", tester.controllerTester().nameService().findRecord( + Record.Type.CNAME, RecordName.from("app1.tenant1.global.vespa.yahooapis.com")).isPresent()); + } + +} diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/controller/responses/maintenance.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/controller/responses/maintenance.json index 354bab4379c..8f8b76c83c6 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/controller/responses/maintenance.json +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/controller/responses/maintenance.json @@ -19,6 +19,9 @@ "name": "DeploymentMetricsMaintainer" }, { + "name": "DnsMaintainer" + }, + { "name": "MetricsReporter" }, { diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java index 759a859253e..2c08f5a7605 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java @@ -39,6 +39,13 @@ public class CompressedFileReference { return outputFile; } + public static File compress(File directory, File outputFile) throws IOException { + return compress(directory, Files.find(Paths.get(directory.getAbsolutePath()), + recurseDepth, + (p, basicFileAttributes) -> basicFileAttributes.isRegularFile()) + .map(Path::toFile).collect(Collectors.toList()), outputFile); + } + public static byte[] compress(File directory) throws IOException { return compress(directory, Files.find(Paths.get(directory.getAbsolutePath()), recurseDepth, diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 1ac3a1bd7df..fa1519b972a 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -8,30 +8,118 @@ import com.yahoo.jrt.Method; import com.yahoo.jrt.Request; import com.yahoo.jrt.Supervisor; import com.yahoo.log.LogLevel; -import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.StreamingXXHash64; import net.jpountz.xxhash.XXHashFactory; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; public class FileReceiver { private final static Logger log = Logger.getLogger(FileReceiver.class.getName()); - private final static String RECEIVE_METHOD = "filedistribution.receiveFile"; - private final static String RECEIVE_META_METHOD = "filedistribution.receiveFileMeta"; - private final static String RECEIVE_PART_METHOD = "filedistribution.receiveFilePart"; - private final static String RECEIVE_EOF_METHOD = "filedistribution.receiveFileEof"; + public final static String RECEIVE_METHOD = "filedistribution.receiveFile"; + public final static String RECEIVE_META_METHOD = "filedistribution.receiveFileMeta"; + public final static String RECEIVE_PART_METHOD = "filedistribution.receiveFilePart"; + public final static String RECEIVE_EOF_METHOD = "filedistribution.receiveFileEof"; private final Supervisor supervisor; private final FileReferenceDownloader downloader; private final File downloadDirectory; - private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + private final AtomicInteger nextSessionId = new AtomicInteger(1); + private final Map<Integer, Session> sessions = new HashMap<>(); + + final static class Session { + private final StreamingXXHash64 hasher; + private final int sessionId; + private final FileReference reference; + private final FileReferenceData.Type fileType; + private final String fileName; + private final long fileSize; + private long currentFileSize; + private long currentPartId; + private long currentHash; + private final File fileReferenceDir; + private final File inprogressFile; + + Session(File downloadDirectory, int sessionId, FileReference reference, + FileReferenceData.Type fileType, String fileName, long fileSize) + { + this.hasher = XXHashFactory.fastestInstance().newStreamingHash64(0); + this.sessionId = sessionId; + this.reference = reference; + this.fileType = fileType; + this.fileName = fileName; + this.fileSize = fileSize; + currentFileSize = 0; + currentPartId = 0; + currentHash = 0; + fileReferenceDir = new File(downloadDirectory, reference.value()); + try { + Files.createDirectories(fileReferenceDir.toPath()); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed creating directory(" + fileReferenceDir.toPath() + "): " + e.getMessage(), e); + throw new RuntimeException("Failed creating directory(" + fileReferenceDir.toPath() + "): ", e); + } + + try { + inprogressFile = Files.createTempFile(fileReferenceDir.toPath(), fileName, ".inprogress").toFile(); + } catch (IOException e) { + String msg = "Failed creating tempfile for inprogress file for(" + fileName + ") in '" + fileReferenceDir.toPath() + "': "; + log.log(LogLevel.ERROR, msg + e.getMessage(), e); + throw new RuntimeException(msg, e); + } + } + + void addPart(int partId, byte [] part) { + if (partId != currentPartId) { + throw new IllegalStateException("Received partid " + partId + " while expecting " + currentPartId); + } + if (fileSize < currentFileSize + part.length) { + throw new IllegalStateException("Received part would extend the file from " + currentFileSize + " to " + + (currentFileSize + part.length) + ", but " + fileSize + " is max."); + } + try { + Files.write(inprogressFile.toPath(), part, StandardOpenOption.WRITE, StandardOpenOption.APPEND); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing to file(" + inprogressFile.toPath() + "): " + e.getMessage(), e); + throw new RuntimeException("Failed writing to file(" + inprogressFile.toPath() + "): ", e); + } + currentFileSize += part.length; + currentPartId++; + hasher.update(part, 0, part.length); + } + File close(long hash) { + if (hasher.getValue() != hash) { + throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")"); + } + File file = new File(fileReferenceDir, fileName); + try { + // Unpack if necessary + if (fileType == FileReferenceData.Type.compressed) { + File decompressedDir = Files.createTempDirectory("archive").toFile(); + log.log(LogLevel.DEBUG, "Archived file, unpacking " + inprogressFile + " to " + decompressedDir); + CompressedFileReference.decompress(inprogressFile, decompressedDir); + moveFileToDestination(decompressedDir, fileReferenceDir); + } else { + log.log(LogLevel.DEBUG, "Uncompressed file, moving to " + file.getAbsolutePath()); + moveFileToDestination(inprogressFile, file); + } + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage(), e); + throw new RuntimeException("Failed writing file: ", e); + } + return file; + } + } FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { this.supervisor = supervisor; @@ -48,10 +136,11 @@ public class FileReceiver { // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method private List<Method> receiveFileMethod(Object handler) { List<Method> methods = new ArrayList<>(); - methods.add(new Method(RECEIVE_META_METHOD, "ssl", "ii", handler,"receiveFileMeta") + methods.add(new Method(RECEIVE_META_METHOD, "sssl", "ii", handler,"receiveFileMeta") .paramDesc(0, "filereference", "file reference to download") .paramDesc(1, "filename", "filename") - .paramDesc(2, "filelength", "length in bytes of file") + .paramDesc(2, "type", "'file' or 'compressed'") + .paramDesc(3, "filelength", "length in bytes of file") .returnDesc(0, "ret", "0 if success, 1 otherwise") .returnDesc(1, "session-id", "Session id to be used for this transfer")); methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", handler,"receiveFilePart") @@ -94,7 +183,7 @@ public class FileReceiver { if (errorCode == 0) { // TODO: Remove when system test works log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); - receiveFile(new FileReferenceData(fileReference, filename, FileReferenceData.Type.valueOf(type), content, xxhash)); + receiveFile(new FileReferenceDataBlob(fileReference, filename, FileReferenceData.Type.valueOf(type), content, xxhash)); req.returnValues().add(new Int32Value(0)); } else { log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); @@ -104,16 +193,17 @@ public class FileReceiver { } void receiveFile(FileReferenceData fileReferenceData) { - long xxHashFromContent = hasher.hash(ByteBuffer.wrap(fileReferenceData.content()), 0); - if (xxHashFromContent != fileReferenceData.xxhash()) - throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash()+ ")"); + long xxHashFromContent = fileReferenceData.xxhash(); + if (xxHashFromContent != fileReferenceData.xxhash()) { + throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash() + ")"); + } File fileReferenceDir = new File(downloadDirectory, fileReferenceData.fileReference().value()); // file might be a directory (and then type is compressed) File file = new File(fileReferenceDir, fileReferenceData.filename()); try { File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), fileReferenceData.filename()); - Files.write(tempFile.toPath(), fileReferenceData.content()); + Files.write(tempFile.toPath(), fileReferenceData.content().array()); // Unpack if necessary if (fileReferenceData.type() == FileReferenceData.Type.compressed) { @@ -133,7 +223,7 @@ public class FileReceiver { } } - private void moveFileToDestination(File tempFile, File destination) { + private static void moveFileToDestination(File tempFile, File destination) { try { Files.move(tempFile.toPath(), destination.toPath()); log.log(LogLevel.INFO, "File moved from " + tempFile.getAbsolutePath()+ " to " + destination.getAbsolutePath()); @@ -151,15 +241,78 @@ public class FileReceiver { @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFileMeta(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + FileReference reference = new FileReference(req.parameters().get(0).asString()); + String fileName = req.parameters().get(1).asString(); + String type = req.parameters().get(2).asString(); + long fileSize = req.parameters().get(3).asInt64(); + int sessionId = nextSessionId.getAndIncrement(); + int retval = 0; + synchronized (sessions) { + if (sessions.containsKey(sessionId)) { + retval = 1; + log.severe("Session id " + sessionId + " already exist, impossible. Request from(" + req.target() + ")"); + } else { + try { + sessions.put(sessionId, new Session(downloadDirectory, sessionId, reference, + FileReferenceData.Type.valueOf(type),fileName, fileSize)); + } catch (Exception e) { + retval = 1; + } + } + } + req.returnValues().add(new Int32Value(retval)); + req.returnValues().add(new Int32Value(sessionId)); } @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFilePart(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + + FileReference reference = new FileReference(req.parameters().get(0).asString()); + int sessionId = req.parameters().get(1).asInt32(); + int partId = req.parameters().get(2).asInt32(); + byte [] part = req.parameters().get(3).asData(); + Session session = getSession(sessionId); + int retval = verifySession(session, sessionId, reference); + try { + session.addPart(partId, part); + } catch (Exception e) { + log.severe("Got exception + " + e); + retval = 1; + } + req.returnValues().add(new Int32Value(retval)); } @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFileEof(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + FileReference reference = new FileReference(req.parameters().get(0).asString()); + int sessionId = req.parameters().get(1).asInt32(); + long xxhash = req.parameters().get(2).asInt64(); + Session session = getSession(sessionId); + int retval = verifySession(session, sessionId, reference); + File file = session.close(xxhash); + downloader.completedDownloading(reference, file); + synchronized (sessions) { + sessions.remove(sessionId); + } + req.returnValues().add(new Int32Value(retval)); + } + + private final Session getSession(Integer sessionId) { + synchronized (sessions) { + return sessions.get(sessionId); + } + } + private static final int verifySession(Session session, int sessionId, FileReference reference) { + if (session == null) { + log.severe("session-id " + sessionId + " does not exist."); + return 1; + } + if (! session.reference.equals(reference)) { + log.severe("Session " + session.sessionId + " expects reference " + reference.value() + ", but was " + session.reference.value()); + return 1; + } + return 0; } } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java index 6272390f5cb..dabdba2bfc0 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java @@ -2,41 +2,27 @@ package com.yahoo.vespa.filedistribution; import com.yahoo.config.FileReference; -import net.jpountz.xxhash.XXHashFactory; import java.nio.ByteBuffer; + /** * Utility class for a file reference with data and metadata * * @author hmusum */ -public class FileReferenceData { +public abstract class FileReferenceData { public enum Type {file, compressed} - - private final FileReference fileReference; private final String filename; private final Type type; - private final byte[] content; - private final long xxhash; - - public FileReferenceData(FileReference fileReference, String filename, Type type, byte[] content) { - this(fileReference, filename, type, content, XXHashFactory.fastestInstance().hash64().hash(ByteBuffer.wrap(content), 0)); - } - public FileReferenceData(FileReference fileReference, String filename, Type type, byte[] content, long xxhash) { + public FileReferenceData(FileReference fileReference, String filename, Type type) { this.fileReference = fileReference; this.filename = filename; this.type = type; - this.content = content; - this.xxhash = xxhash; - } - - public static FileReferenceData empty(FileReference fileReference, String filename) { - return new FileReferenceData(fileReference, filename, FileReferenceData.Type.file, new byte[0], 0); } public FileReference fileReference() { @@ -51,11 +37,31 @@ public class FileReferenceData { return type; } - public byte[] content() { - return content; + public ByteBuffer content() { + ByteBuffer bb = ByteBuffer.allocate((int)size()); + while (bb.remaining() > 0) { + nextContent(bb); + } + return bb; } + /** + * Will provide the next part of the content. + * + * @param bb with some available space + * @return Number of bytes transferred. + */ + public abstract int nextContent(ByteBuffer bb); - public long xxhash() { - return xxhash; - } + /** + * Only guaranteed to be valid after all content has been consumed. + * @return xx64hash of content + */ + public abstract long xxhash(); + + /** + * The size of the content in bytes + * + * @return number of bytes + */ + public abstract long size(); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDataBlob.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDataBlob.java new file mode 100644 index 00000000000..3759cbe2ef7 --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDataBlob.java @@ -0,0 +1,44 @@ +package com.yahoo.vespa.filedistribution; + +import com.yahoo.config.FileReference; +import net.jpountz.xxhash.XXHashFactory; + +import java.nio.ByteBuffer; + +public class FileReferenceDataBlob extends FileReferenceData { + private final byte[] content; + private final long xxhash; + + public FileReferenceDataBlob(FileReference fileReference, String filename, Type type, byte[] content) { + this(fileReference, filename, type, content, XXHashFactory.fastestInstance().hash64().hash(ByteBuffer.wrap(content), 0)); + } + + public FileReferenceDataBlob(FileReference fileReference, String filename, Type type, byte[] content, long xxhash) { + super(fileReference, filename, type); + this.content = content; + this.xxhash = xxhash; + } + + public static FileReferenceData empty(FileReference fileReference, String filename) { + return new FileReferenceDataBlob(fileReference, filename, FileReferenceData.Type.file, new byte[0], 0); + } + + public ByteBuffer content() { + return ByteBuffer.wrap(content); + } + @Override + public int nextContent(ByteBuffer bb) { + bb.put(content); + return content.length; + } + + @Override + public long xxhash() { + return xxhash; + } + + @Override + public long size() { + return content.length; + } +} diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/LazyFileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/LazyFileReferenceData.java new file mode 100644 index 00000000000..1681843a818 --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/LazyFileReferenceData.java @@ -0,0 +1,52 @@ +package com.yahoo.vespa.filedistribution; + +import com.yahoo.config.FileReference; +import net.jpountz.xxhash.StreamingXXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Files; + +public class LazyFileReferenceData extends FileReferenceData { + private final File file; + private final ReadableByteChannel channel; + private final StreamingXXHash64 hasher; + public LazyFileReferenceData(FileReference fileReference, String filename, Type type, File file) throws IOException { + super(fileReference, filename, type); + this.file = file; + channel = Files.newByteChannel(file.toPath()); + this.hasher = XXHashFactory.fastestInstance().newStreamingHash64(0); + } + + @Override + public int nextContent(ByteBuffer bb) { + int read = 0; + int pos = bb.position(); + try { + read = channel.read(bb); + } catch (IOException e) { + return -1; + } + if (read > 0) { + hasher.update(bb.array(), pos, read); + } + return read; + } + + @Override + public long xxhash() { + return hasher.getValue(); + } + + @Override + public long size() { + try { + return Files.size(file.toPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 1c9e8cdb91b..4618b229de1 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -222,7 +222,7 @@ public class FileDownloaderTest { } private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) { - fileDownloader.receiveFile(new FileReferenceData(fileReference, filename, type, content)); + fileDownloader.receiveFile(new FileReferenceDataBlob(fileReference, filename, type, content)); } private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java new file mode 100644 index 00000000000..5edd1151cb1 --- /dev/null +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java @@ -0,0 +1,59 @@ +package com.yahoo.vespa.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.text.Utf8; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; + +public class FileReceiverTest { + private final File root = new File("."); + private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + @Test + public void receiveMultiPartFile() throws IOException{ + + String [] parts = new String[3]; + parts[0] = "first part\n"; + parts[1] = "second part\n"; + parts[2] = "third part\n"; + StringBuilder sb = new StringBuilder(); + for (String s : parts) { + sb.append(s); + } + String all = sb.toString(); + String allRead = transferParts(new FileReference("ref-a"), "myfile-1", all, 1); + assertEquals(all, allRead); + allRead = transferParts(new FileReference("ref-a"), "myfile-2", all, 2); + assertEquals(all, allRead); + allRead = transferParts(new FileReference("ref-a"), "myfile-3", all, 3); + assertEquals(all, allRead); + + } + + private String transferParts(FileReference ref, String fileName, String all, int numParts) throws IOException { + byte [] allContent = Utf8.toBytes(all); + + FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, + FileReferenceData.Type.file, fileName, allContent.length); + int partSize = (allContent.length+(numParts-1))/numParts; + ByteBuffer bb = ByteBuffer.wrap(allContent); + for (int i = 0, pos = 0; i < numParts; i++) { + byte [] buf = new byte[Math.min(partSize, allContent.length - pos)]; + bb.get(buf); + session.addPart(i, buf); + pos += buf.length; + } + File file = session.close(hasher.hash(ByteBuffer.wrap(allContent), 0)); + + byte [] allReadBytes = Files.readAllBytes(file.toPath()); + file.delete(); + return Utf8.toString(allReadBytes); + } +} diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/StandaloneMain.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/StandaloneMain.java index cd52a724275..a78e4f1af40 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/StandaloneMain.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/StandaloneMain.java @@ -47,7 +47,7 @@ public class StandaloneMain { loader.destroy(); System.out.println("debug\tStopped ok."); System.exit(0); - } catch (Exception e) { + } catch (Throwable e) { System.out.print("debug\tUnexpected: "); e.printStackTrace(); log.log(Level.SEVERE, "Unexpected: ", e); diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java index 735bf3ed89a..bc14a063cd1 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java @@ -184,8 +184,7 @@ public class ServletResponseController { private static void setHeaders_holdingLock(Response jdiscResponse, HttpServletResponse servletResponse) { for (final Map.Entry<String, String> entry : jdiscResponse.headers().entries()) { - final String value = entry.getValue(); - servletResponse.addHeader(entry.getKey(), value != null ? value : ""); + servletResponse.addHeader(entry.getKey(), entry.getValue()); } if (servletResponse.getContentType() == null) { diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java index 7ed13decbf6..55dec64e967 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java @@ -371,6 +371,7 @@ public class HttpServerTest { } // Header with no value is disallowed by https://tools.ietf.org/html/rfc7230#section-3.2 + // Details in https://github.com/eclipse/jetty.project/issues/1116 @Test public void requireThatHeaderWithNullValueIsOmitted() throws Exception { final TestDriver driver = TestDrivers.newInstance(new EchoWithHeaderRequestHandler("X-Foo", null)); @@ -380,13 +381,14 @@ public class HttpServerTest { assertThat(driver.close(), is(true)); } - // Header with no value is disallowed by https://tools.ietf.org/html/rfc7230#section-3.2 + // Header with empty value is allowed by https://tools.ietf.org/html/rfc7230#section-3.2 + // Details in https://github.com/eclipse/jetty.project/issues/1116 @Test - public void requireThatHeaderWithEmptyValueIsOmitted() throws Exception { + public void requireThatHeaderWithEmptyValueIsAllowed() throws Exception { final TestDriver driver = TestDrivers.newInstance(new EchoWithHeaderRequestHandler("X-Foo", "")); driver.client().get("/status.html") .expectStatusCode(is(OK)) - .expectNoHeader("X-Foo"); + .expectHeader("X-Foo", is("")); assertThat(driver.close(), is(true)); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/v2/NodesApiHandler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/v2/NodesApiHandler.java index 797453b12c9..b47b3544d17 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/v2/NodesApiHandler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/v2/NodesApiHandler.java @@ -184,8 +184,7 @@ public class NodesApiHandler extends LoggingRequestHandler { } private Node nodeFromRequest(HttpRequest request) { - String path = request.getUri().getPath(); - String hostname = path.substring(path.lastIndexOf("/")); + String hostname = lastElement(request.getUri().getPath()); return nodeRepository.getNode(hostname).orElseThrow(() -> new NotFoundException("No node found with hostname " + hostname)); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/v2/RestApiTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/v2/RestApiTest.java index 2739b4d1cf8..c23a7f9990a 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/v2/RestApiTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/v2/RestApiTest.java @@ -455,6 +455,11 @@ public class RestApiTest { Request.Method.PATCH), "{\"message\":\"Updated host4.yahoo.com\"}"); + assertResponse(new Request("http://localhost:8080/nodes/v2/node/doesnotexist.yahoo.com", + Utf8.toBytes("{\"currentRestartGeneration\": 1}"), + Request.Method.PATCH), + 404, "{\"error-code\":\"NOT_FOUND\",\"message\":\"No node found with hostname doesnotexist.yahoo.com\"}"); + assertResponse(new Request("http://localhost:8080/nodes/v2/node/host5.yahoo.com", Utf8.toBytes("{\"currentRestartGeneration\": 1}"), Request.Method.PATCH), diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp index 710b0ad228c..adb52583a58 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp @@ -24,7 +24,9 @@ ProtonConfigFetcher::ProtonConfigFetcher(const config::ConfigUri & configUri, IP _owner(owner), _mutex(), _dbManagerMap(), - _threadPool(128 * 1024, 1) + _threadPool(128 * 1024, 1), + _oldDocumentTypeRepos(), + _currentDocumentTypeRepo() { } @@ -100,10 +102,11 @@ ProtonConfigFetcher::reconfigure() assert(insres.first->second->getGeneration() == generation); } } - auto configSnapshot = std::make_shared<ProtonConfigSnapshot>(std::move(bootstrapConfig), std::move(dbConfigs)); + auto configSnapshot = std::make_shared<ProtonConfigSnapshot>(bootstrapConfig, std::move(dbConfigs)); LOG(debug, "Reconfiguring proton with gen %" PRId64, generation); _owner.reconfigure(std::move(configSnapshot)); LOG(debug, "Reconfigured proton with gen %" PRId64, generation); + rememberDocumentTypeRepo(bootstrapConfig->getDocumentTypeRepoSP()); } void @@ -189,4 +192,22 @@ ProtonConfigFetcher::getDocumentDBConfig(const DocTypeName & docTypeName) const return it->second->getConfig(); } +void +ProtonConfigFetcher::rememberDocumentTypeRepo(std::shared_ptr<document::DocumentTypeRepo> repo) +{ + // Ensure that previous document type repo is kept alive, and also + // any document type repo that was current within last 10 minutes. + using namespace std::chrono_literals; + if (repo == _currentDocumentTypeRepo) { + return; // no change + } + auto &repos = _oldDocumentTypeRepos; + TimePoint now = Clock::now(); + while (!repos.empty() && repos.front().first < now) { + repos.pop_front(); + } + repos.emplace_back(now + 10min, _currentDocumentTypeRepo); + _currentDocumentTypeRepo = repo; +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h index 9adbabbc174..fa1f75ebe91 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h @@ -9,6 +9,9 @@ #include "bootstrapconfigmanager.h" #include "documentdbconfigmanager.h" #include "i_document_db_config_owner.h" +#include <chrono> + +namespace document { class DocumentTypeRepo; } namespace proton { @@ -47,6 +50,9 @@ public: private: typedef std::map<DocTypeName, DocumentDBConfigManager::SP> DBManagerMap; + using Clock = std::chrono::steady_clock; + using TimePoint = std::chrono::time_point<Clock>; + using OldDocumentTypeRepo = std::pair<TimePoint, std::shared_ptr<document::DocumentTypeRepo>>; BootstrapConfigManager _bootstrapConfigManager; config::ConfigRetriever _retriever; @@ -57,11 +63,14 @@ private: DBManagerMap _dbManagerMap; FastOS_ThreadPool _threadPool; + std::deque<OldDocumentTypeRepo> _oldDocumentTypeRepos; + std::shared_ptr<document::DocumentTypeRepo> _currentDocumentTypeRepo; void fetchConfigs(); void updateDocumentDBConfigs(const BootstrapConfigSP & config, const config::ConfigSnapshot & snapshot); void reconfigure(); const config::ConfigKeySet pruneManagerMap(const BootstrapConfigSP & config); + void rememberDocumentTypeRepo(std::shared_ptr<document::DocumentTypeRepo> repo); }; |