diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storageserver |
Publish
Diffstat (limited to 'storageserver')
32 files changed, 2933 insertions, 0 deletions
diff --git a/storageserver/.gitignore b/storageserver/.gitignore new file mode 100644 index 00000000000..a9b20e8992d --- /dev/null +++ b/storageserver/.gitignore @@ -0,0 +1,2 @@ +Makefile +Testing diff --git a/storageserver/CMakeLists.txt b/storageserver/CMakeLists.txt new file mode 100644 index 00000000000..8f537a61793 --- /dev/null +++ b/storageserver/CMakeLists.txt @@ -0,0 +1,18 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_define_module( + DEPENDS + fastos + storage + streamingvisitors_searchvisitor + memfilepersistence + + APPS + src/apps/storaged + src/vespa/storageserver/app + + TEST_DEPENDS + messagebus_messagebus-test + + TESTS + src/tests +) diff --git a/storageserver/OWNERS b/storageserver/OWNERS new file mode 100644 index 00000000000..97c35339850 --- /dev/null +++ b/storageserver/OWNERS @@ -0,0 +1,2 @@ +vekterli +dybdahl diff --git a/storageserver/src/.gitignore b/storageserver/src/.gitignore new file mode 100644 index 00000000000..9669be96e4b --- /dev/null +++ b/storageserver/src/.gitignore @@ -0,0 +1,10 @@ +*.So +*.lo +.depend +.depend.NEW +.deps +.libs +Makefile.ini +config_command.sh +project.dsw +/storageserver.mak diff --git a/storageserver/src/apps/storaged/.gitignore b/storageserver/src/apps/storaged/.gitignore new file mode 100644 index 00000000000..67fbcdcf4db --- /dev/null +++ b/storageserver/src/apps/storaged/.gitignore @@ -0,0 +1,3 @@ +/Makefile +/storaged +storaged-bin diff --git a/storageserver/src/apps/storaged/CMakeLists.txt b/storageserver/src/apps/storaged/CMakeLists.txt new file mode 100644 index 00000000000..629b357d574 --- /dev/null +++ b/storageserver/src/apps/storaged/CMakeLists.txt @@ -0,0 +1,10 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(storageserver_storaged_app + SOURCES + storage.cpp + forcelink.cpp + OUTPUT_NAME storaged-bin + INSTALL sbin + DEPENDS + storageserver_storageapp +) diff --git a/storageserver/src/apps/storaged/forcelink.cpp b/storageserver/src/apps/storaged/forcelink.cpp new file mode 100644 index 00000000000..2c628644e82 --- /dev/null +++ b/storageserver/src/apps/storaged/forcelink.cpp @@ -0,0 +1,30 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/document/base/forcelink.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/searchlib/aggregation/forcelink.hpp> +#include <vespa/searchlib/expression/forcelink.hpp> + +/* Here is code that initializes a lot of stuff to force it to be linked */ +namespace search { + struct ForceLink { + ForceLink() { + if (time(NULL) == 7) { + // grouping stuff + forcelink_searchlib_aggregation(); + forcelink_searchlib_expression(); + } + } + }; +} + +namespace storage { + +void serverForceLink() +{ + document::ForceLink documentForce; + search::ForceLink searchForce; +} + +} // namespace storage diff --git a/storageserver/src/apps/storaged/forcelink.h b/storageserver/src/apps/storaged/forcelink.h new file mode 100644 index 00000000000..c3fa56c1b3c --- /dev/null +++ b/storageserver/src/apps/storaged/forcelink.h @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \file forcelink.h + * + * \brief Utility to link in objects we need in binary. + */ + +#pragma once + +#include <vespa/documentapi/documentapi.h> +#include <vespa/fastos/fastos.h> +#include <vespa/config-rank-profiles.h> + +namespace storage { + + extern void serverForceLink(); + +} + diff --git a/storageserver/src/apps/storaged/storage.cpp b/storageserver/src/apps/storaged/storage.cpp new file mode 100644 index 00000000000..8eb955ae930 --- /dev/null +++ b/storageserver/src/apps/storaged/storage.cpp @@ -0,0 +1,220 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::StorageApp + * \ingroup serverapp + * + * \brief The storage daemon application. + * + * This code is NOT unit tested and should be as minimal as possible. + * + * It should handle process signals and have the main method for the + * application, but as little as possible else. + */ + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +#include <signal.h> +#include <vespa/persistence/spi/exceptions.h> +#include <vespa/storage/storageutil/utils.h> +#include <vespa/storageserver/app/distributorprocess.h> +#include "forcelink.h" +#include <vespa/storageserver/app/memfileservicelayerprocess.h> +#include <vespa/storageserver/app/dummyservicelayerprocess.h> +#include <vespa/storageserver/app/rpcservicelayerprocess.h> +#include <vespa/vespalib/util/programoptions.h> +#include <vespa/vespalib/util/shutdownguard.h> +#include <vespa/config/config.h> + +LOG_SETUP("vds.application"); + +namespace storage { + +namespace { + +Process::UP createProcess(vespalib::stringref configId) { + // FIXME: Rewrite parameter to config uri and pass when all subsequent configs are converted. + config::ConfigUri uri(configId); + std::unique_ptr<vespa::config::content::core::StorServerConfig> serverConfig = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext()); + if (serverConfig->isDistributor) { + return Process::UP(new DistributorProcess(configId)); + } else switch (serverConfig->persistenceProvider.type) { + case vespa::config::content::core::StorServerConfig::PersistenceProvider::STORAGE: + return Process::UP(new MemFileServiceLayerProcess(configId)); + case vespa::config::content::core::StorServerConfig::PersistenceProvider::DUMMY: + return Process::UP(new DummyServiceLayerProcess(configId)); + case vespa::config::content::core::StorServerConfig::PersistenceProvider::RPC: + return Process::UP(new RpcServiceLayerProcess(configId)); + default: + throw vespalib::IllegalStateException( + "Unknown persistence provider.", VESPA_STRLOC); + } +} + +} // End of anonymous namespace + +class StorageApp : public FastOS_Application, + private vespalib::ProgramOptions +{ + std::string _configId; + bool _showSyntax; + uint32_t _maxShutdownTime; + int _lastSignal; + vespalib::Monitor _signalLock; + Process::UP _process; + +public: + StorageApp(); + + void handleSignal(int signal) { + LOG(info, "Got signal %d, waiting for lock", signal); + vespalib::MonitorGuard sync(_signalLock); + + LOG(info, "Got lock for signal %d", signal); + _lastSignal = signal; + sync.signal(); + } + void handleSignals(); + +private: + bool Init(); + int Main(); + bool gotSignal() { return _lastSignal != 0; } +}; + +StorageApp::StorageApp() + : _showSyntax(false), _maxShutdownTime(120000), _lastSignal(0), _signalLock() +{ + setSyntaxMessage( + "This is the main daemon used to start the storage nodes. The same " + "actual binary is used for both storage and distributor nodes, but " + "it is duplicated when installing, such that one can hotfix a " + "distributor bug without restarting storage nodes."); + addOption("c config-id", _configId, + "The config identifier this storage node should use to request " + "config. This identifier specifies whether the binary will behave " + "as a storage or distributor, what cluster it belongs to, and the " + "index it has in the cluster."); + addOption("h help", _showSyntax, false, "Show this syntax help page."); + addOption("t maxshutdowntime", _maxShutdownTime, uint32_t(120000), + "Maximum amount of milliseconds we allow proper shutdown to run before " + "abruptly killing the process."); +} + +bool StorageApp::Init() +{ + FastOS_Application::Init(); + setCommandLineArguments( + FastOS_Application::_argc, FastOS_Application::_argv); + try{ + parse(); + } catch (vespalib::InvalidCommandLineArgumentsException& e) { + std::cerr << e.getMessage() << "\n\n"; + writeSyntaxPage(std::cerr); + exit(EXIT_FAILURE); + } + if (_showSyntax) { + writeSyntaxPage(std::cerr); + exit(0); + } + return true; +} + +namespace { + storage::StorageApp *sigtramp = 0; + uint32_t _G_signalCount = 0; + + void killHandler(int sig) { + if (_G_signalCount == 0) { + _G_signalCount++; + if (sigtramp == 0) _exit(EXIT_FAILURE); + // note: this is not totally safe, sigtramp is not protected by a lock + sigtramp->handleSignal(sig); + } else { + fprintf(stderr, "Received another shutdown signal %u while " + "shutdown in progress (count=%u)", + sig, _G_signalCount); + } + } + + void setupKillHandler() { + struct sigaction usr_action; + sigset_t block_mask; + + /* Establish the signal handler. */ + sigfillset (&block_mask); + usr_action.sa_handler = killHandler; + usr_action.sa_mask = block_mask; + usr_action.sa_flags = 0; + sigaction (SIGTERM, &usr_action, NULL); + sigaction (SIGINT, &usr_action, NULL); + } +} + +void StorageApp::handleSignals() +{ + if (gotSignal()) { + int signal = _lastSignal; + LOG(debug, "starting controlled shutdown of storage " + "(received signal %d)", signal); + _process->getNode().requestShutdown("controlled shutdown"); + } +} + +int StorageApp::Main() +{ + try{ + _process = createProcess(_configId); + _process->setupConfig(600000); + _process->createNode(); + } catch (const spi::HandledException & e) { + LOG(warning, "Died due to known cause: %s", e.what()); + return 1; + } catch (const vespalib::NetworkSetupFailureException & e) { + LOG(warning, "Network failure: '%s'", e.what()); + return 1; + } catch (const vespalib::IllegalStateException & e) { + LOG(error, "Unknown IllegalStateException: '%s'", e.what()); + return 1; + } catch (const vespalib::Exception & e) { + LOG(error, "Caught exception when starting: %s", e.what()); + return 1; + } + + // Not setting up kill handlers before storage is up. Before that + // we can just die quickly with default handlers. + LOG(debug, "Node created. Setting up kill handler."); + setupKillHandler(); + + // main loop - wait for termination signal + while (!_process->getNode().attemptedStopped()) { + if (_process->configUpdated()) { + LOG(debug, "Config updated. Progagating config updates"); + ResumeGuard guard(_process->getNode().pause()); + _process->updateConfig(); + } + // Wait until we get a kill signal. + vespalib::MonitorGuard lock(_signalLock); + lock.wait(1000); + handleSignals(); + } + LOG(debug, "Server was attempted stopped, shutting down"); + // Create guard that will forcifully kill storage if destruction takes longer + // time than given timeout. + vespalib::ShutdownGuard shutdownGuard(_maxShutdownTime); + LOG(debug, "Attempting proper shutdown"); + _process.reset(0); + LOG(debug, "Completed controlled shutdown."); + return 0; +} + +} // storage + +int main(int argc, char **argv) +{ + storage::StorageApp app; + storage::sigtramp = &app; + int retval = app.Entry(argc,argv); + storage::sigtramp = NULL; + LOG(debug, "Exiting"); + return retval; +} diff --git a/storageserver/src/tests/.gitignore b/storageserver/src/tests/.gitignore new file mode 100644 index 00000000000..101f84131dc --- /dev/null +++ b/storageserver/src/tests/.gitignore @@ -0,0 +1,6 @@ +/Makefile +/dirconfig.tmp +/test.vlog +/testrunner +/vdsroot +storageserver_testrunner_app diff --git a/storageserver/src/tests/CMakeLists.txt b/storageserver/src/tests/CMakeLists.txt new file mode 100644 index 00000000000..b735716c705 --- /dev/null +++ b/storageserver/src/tests/CMakeLists.txt @@ -0,0 +1,12 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(storageserver_testrunner_app + SOURCES + storageservertest.cpp + testhelper.cpp + dummystoragelink.cpp + testrunner.cpp + DEPENDS + storageserver_storageapp + vdstestlib +) +vespa_add_test(NAME storageserver_testrunner_app COMMAND storageserver_testrunner_app) diff --git a/storageserver/src/tests/config-doctypes.cfg b/storageserver/src/tests/config-doctypes.cfg new file mode 100644 index 00000000000..f41593ebfc3 --- /dev/null +++ b/storageserver/src/tests/config-doctypes.cfg @@ -0,0 +1,158 @@ +enablecompression false +documenttype[3] +documenttype[0].id -519202262 +documenttype[0].name "text/plain" +documenttype[0].version 0 +documenttype[0].headerstruct 160469461 +documenttype[0].bodystruct 749465898 +documenttype[0].inherits[0] +documenttype[0].datatype[2] +documenttype[0].datatype[0].id 160469461 +documenttype[0].datatype[0].type STRUCT +documenttype[0].datatype[0].array.element.id 0 +documenttype[0].datatype[0].map.key.id 0 +documenttype[0].datatype[0].map.value.id 0 +documenttype[0].datatype[0].wset.key.id 0 +documenttype[0].datatype[0].wset.createifnonexistent false +documenttype[0].datatype[0].wset.removeifzero false +documenttype[0].datatype[0].annotationref.annotation.id 0 +documenttype[0].datatype[0].sstruct.name "text/plain.header" +documenttype[0].datatype[0].sstruct.version 0 +documenttype[0].datatype[0].sstruct.compression.type NONE +documenttype[0].datatype[0].sstruct.compression.level 0 +documenttype[0].datatype[0].sstruct.compression.threshold 90 +documenttype[0].datatype[0].sstruct.compression.minsize 0 +documenttype[0].datatype[0].sstruct.field[3] +documenttype[0].datatype[0].sstruct.field[0].name "author" +documenttype[0].datatype[0].sstruct.field[0].id 644499292 +documenttype[0].datatype[0].sstruct.field[0].id_v6 177126295 +documenttype[0].datatype[0].sstruct.field[0].datatype 2 +documenttype[0].datatype[0].sstruct.field[1].name "date" +documenttype[0].datatype[0].sstruct.field[1].id 491786523 +documenttype[0].datatype[0].sstruct.field[1].id_v6 916979460 +documenttype[0].datatype[0].sstruct.field[1].datatype 0 +documenttype[0].datatype[0].sstruct.field[2].name "subject" +documenttype[0].datatype[0].sstruct.field[2].id 1797950813 +documenttype[0].datatype[0].sstruct.field[2].id_v6 943449689 +documenttype[0].datatype[0].sstruct.field[2].datatype 2 +documenttype[0].datatype[1].id 749465898 +documenttype[0].datatype[1].type STRUCT +documenttype[0].datatype[1].array.element.id 0 +documenttype[0].datatype[1].map.key.id 0 +documenttype[0].datatype[1].map.value.id 0 +documenttype[0].datatype[1].wset.key.id 0 +documenttype[0].datatype[1].wset.createifnonexistent false +documenttype[0].datatype[1].wset.removeifzero false +documenttype[0].datatype[1].annotationref.annotation.id 0 +documenttype[0].datatype[1].sstruct.name "text/plain.body" +documenttype[0].datatype[1].sstruct.version 0 +documenttype[0].datatype[1].sstruct.compression.type NONE +documenttype[0].datatype[1].sstruct.compression.level 0 +documenttype[0].datatype[1].sstruct.compression.threshold 90 +documenttype[0].datatype[1].sstruct.compression.minsize 0 +documenttype[0].datatype[1].sstruct.field[1] +documenttype[0].datatype[1].sstruct.field[0].name "content" +documenttype[0].datatype[1].sstruct.field[0].id 1721764358 +documenttype[0].datatype[1].sstruct.field[0].id_v6 1751481844 +documenttype[0].datatype[1].sstruct.field[0].datatype 3 +documenttype[0].annotationtype[0] +documenttype[1].id -653677105 +documenttype[1].name "text/html" +documenttype[1].version 0 +documenttype[1].headerstruct 143329936 +documenttype[1].bodystruct 1473469605 +documenttype[1].inherits[0] +documenttype[1].datatype[2] +documenttype[1].datatype[0].id 143329936 +documenttype[1].datatype[0].type STRUCT +documenttype[1].datatype[0].array.element.id 0 +documenttype[1].datatype[0].map.key.id 0 +documenttype[1].datatype[0].map.value.id 0 +documenttype[1].datatype[0].wset.key.id 0 +documenttype[1].datatype[0].wset.createifnonexistent false +documenttype[1].datatype[0].wset.removeifzero false +documenttype[1].datatype[0].annotationref.annotation.id 0 +documenttype[1].datatype[0].sstruct.name "text/html.header" +documenttype[1].datatype[0].sstruct.version 0 +documenttype[1].datatype[0].sstruct.compression.type NONE +documenttype[1].datatype[0].sstruct.compression.level 0 +documenttype[1].datatype[0].sstruct.compression.threshold 90 +documenttype[1].datatype[0].sstruct.compression.minsize 0 +documenttype[1].datatype[0].sstruct.field[3] +documenttype[1].datatype[0].sstruct.field[0].name "author" +documenttype[1].datatype[0].sstruct.field[0].id 644499292 +documenttype[1].datatype[0].sstruct.field[0].id_v6 177126295 +documenttype[1].datatype[0].sstruct.field[0].datatype 2 +documenttype[1].datatype[0].sstruct.field[1].name "date" +documenttype[1].datatype[0].sstruct.field[1].id 491786523 +documenttype[1].datatype[0].sstruct.field[1].id_v6 916979460 +documenttype[1].datatype[0].sstruct.field[1].datatype 0 +documenttype[1].datatype[0].sstruct.field[2].name "subject" +documenttype[1].datatype[0].sstruct.field[2].id 1797950813 +documenttype[1].datatype[0].sstruct.field[2].id_v6 943449689 +documenttype[1].datatype[0].sstruct.field[2].datatype 2 +documenttype[1].datatype[1].id 1473469605 +documenttype[1].datatype[1].type STRUCT +documenttype[1].datatype[1].array.element.id 0 +documenttype[1].datatype[1].map.key.id 0 +documenttype[1].datatype[1].map.value.id 0 +documenttype[1].datatype[1].wset.key.id 0 +documenttype[1].datatype[1].wset.createifnonexistent false +documenttype[1].datatype[1].wset.removeifzero false +documenttype[1].datatype[1].annotationref.annotation.id 0 +documenttype[1].datatype[1].sstruct.name "text/html.body" +documenttype[1].datatype[1].sstruct.version 0 +documenttype[1].datatype[1].sstruct.compression.type NONE +documenttype[1].datatype[1].sstruct.compression.level 0 +documenttype[1].datatype[1].sstruct.compression.threshold 90 +documenttype[1].datatype[1].sstruct.compression.minsize 0 +documenttype[1].datatype[1].sstruct.field[1] +documenttype[1].datatype[1].sstruct.field[0].name "content" +documenttype[1].datatype[1].sstruct.field[0].id 1721764358 +documenttype[1].datatype[1].sstruct.field[0].id_v6 1751481844 +documenttype[1].datatype[1].sstruct.field[0].datatype 3 +documenttype[1].annotationtype[0] +documenttype[2].id 238423572 +documenttype[2].name "testdoctype1" +documenttype[2].version 1 +documenttype[2].headerstruct -226322995 +documenttype[2].bodystruct -1016297758 +documenttype[2].inherits[0] +documenttype[2].datatype[2] +documenttype[2].datatype[0].id -226322995 +documenttype[2].datatype[0].type STRUCT +documenttype[2].datatype[0].array.element.id 0 +documenttype[2].datatype[0].map.key.id 0 +documenttype[2].datatype[0].map.value.id 0 +documenttype[2].datatype[0].wset.key.id 0 +documenttype[2].datatype[0].wset.createifnonexistent false +documenttype[2].datatype[0].wset.removeifzero false +documenttype[2].datatype[0].annotationref.annotation.id 0 +documenttype[2].datatype[0].sstruct.name "testdoctype1.header" +documenttype[2].datatype[0].sstruct.version 1 +documenttype[2].datatype[0].sstruct.compression.type NONE +documenttype[2].datatype[0].sstruct.compression.level 0 +documenttype[2].datatype[0].sstruct.compression.threshold 90 +documenttype[2].datatype[0].sstruct.compression.minsize 0 +documenttype[2].datatype[0].sstruct.field[0] +documenttype[2].datatype[1].id -1016297758 +documenttype[2].datatype[1].type STRUCT +documenttype[2].datatype[1].array.element.id 0 +documenttype[2].datatype[1].map.key.id 0 +documenttype[2].datatype[1].map.value.id 0 +documenttype[2].datatype[1].wset.key.id 0 +documenttype[2].datatype[1].wset.createifnonexistent false +documenttype[2].datatype[1].wset.removeifzero false +documenttype[2].datatype[1].annotationref.annotation.id 0 +documenttype[2].datatype[1].sstruct.name "testdoctype1.body" +documenttype[2].datatype[1].sstruct.version 1 +documenttype[2].datatype[1].sstruct.compression.type NONE +documenttype[2].datatype[1].sstruct.compression.level 0 +documenttype[2].datatype[1].sstruct.compression.threshold 90 +documenttype[2].datatype[1].sstruct.compression.minsize 0 +documenttype[2].datatype[1].sstruct.field[1] +documenttype[2].datatype[1].sstruct.field[0].name "content" +documenttype[2].datatype[1].sstruct.field[0].id 5 +documenttype[2].datatype[1].sstruct.field[0].id_v6 5 +documenttype[2].datatype[1].sstruct.field[0].datatype 2 +documenttype[2].annotationtype[0] diff --git a/storageserver/src/tests/dummystoragelink.cpp b/storageserver/src/tests/dummystoragelink.cpp new file mode 100644 index 00000000000..30953a1fe7c --- /dev/null +++ b/storageserver/src/tests/dummystoragelink.cpp @@ -0,0 +1,182 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <tests/dummystoragelink.h> +#include <sys/time.h> + +namespace storage { + +DummyStorageLink* DummyStorageLink::_last(0); + +DummyStorageLink::DummyStorageLink() + : StorageLink("Dummy storage link"), + _commands(), + _replies(), + _injected(), + _autoReply(false), + _useDispatch(false), + _ignore(false), + _waitMonitor() +{ + _last = this; +} + +DummyStorageLink::~DummyStorageLink() +{ + // Often a chain with dummy link on top is deleted in unit tests. + // If they haven't been closed already, close them for a cleaner + // shutdown + if (getState() == OPENED) { + close(); + flush(); + } + closeNextLink(); + reset(); +} + +bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd) +{ + if (_ignore) { + return false; + } + if (_injected.size() > 0) { + vespalib::LockGuard guard(_lock); + sendUp(*_injected.begin()); + _injected.pop_front(); + } else if (_autoReply) { + if (!cmd->getType().isReply()) { + std::shared_ptr<api::StorageReply> reply( + std::dynamic_pointer_cast<api::StorageCommand>(cmd) + ->makeReply().release()); + reply->setResult(api::ReturnCode( + api::ReturnCode::OK, "Automatically generated reply")); + sendUp(reply); + } + } + if (isBottom()) { + vespalib::MonitorGuard lock(_waitMonitor); + { + vespalib::LockGuard guard(_lock); + _commands.push_back(cmd); + } + lock.broadcast(); + return true; + } + return StorageLink::onDown(cmd); +} + +bool DummyStorageLink::onUp(const api::StorageMessage::SP& reply) { + if (isTop()) { + vespalib::MonitorGuard lock(_waitMonitor); + { + vespalib::LockGuard guard(_lock); + _replies.push_back(reply); + } + lock.broadcast(); + return true; + } + return StorageLink::onUp(reply); + +} + +void DummyStorageLink::injectReply(api::StorageReply* reply) +{ + assert(reply); + vespalib::LockGuard guard(_lock); + _injected.push_back(std::shared_ptr<api::StorageReply>(reply)); +} + +void DummyStorageLink::reset() { + vespalib::MonitorGuard lock(_waitMonitor); + vespalib::LockGuard guard(_lock); + _commands.clear(); + _replies.clear(); + _injected.clear(); +} + +void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout) +{ + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000)); + vespalib::MonitorGuard lock(_waitMonitor); + while (_commands.size() + _replies.size() < msgCount) { + if (timeout != 0 && clock.getTimeInMillis() > endTime) { + std::ostringstream ost; + ost << "Timed out waiting for " << msgCount << " messages to " + << "arrive in dummy storage link. Only " + << (_commands.size() + _replies.size()) << " messages seen " + << "after timout of " << timeout << " seconds was reached."; + throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); + } + if (timeout >= 0) { + lock.wait((endTime - clock.getTimeInMillis()).getTime()); + } else { + lock.wait(); + } + } +} + +void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout) +{ + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000)); + vespalib::MonitorGuard lock(_waitMonitor); + while (true) { + for (uint32_t i=0; i<_commands.size(); ++i) { + if (_commands[i]->getType() == type) return; + } + for (uint32_t i=0; i<_replies.size(); ++i) { + if (_replies[i]->getType() == type) return; + } + if (timeout != 0 && clock.getTimeInMillis() > endTime) { + std::ostringstream ost; + ost << "Timed out waiting for " << type << " message to " + << "arrive in dummy storage link. Only " + << (_commands.size() + _replies.size()) << " messages seen " + << "after timout of " << timeout << " seconds was reached."; + if (_commands.size() == 1) { + ost << " Found command of type " << _commands[0]->getType(); + } + if (_replies.size() == 1) { + ost << " Found command of type " << _replies[0]->getType(); + } + throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); + } + if (timeout >= 0) { + lock.wait((endTime - clock.getTimeInMillis()).getTime()); + } else { + lock.wait(); + } + } +} + +api::StorageMessage::SP +DummyStorageLink::getAndRemoveMessage(const api::MessageType& type) +{ + vespalib::MonitorGuard lock(_waitMonitor); + for (std::vector<api::StorageMessage::SP>::iterator it = _commands.begin(); + it != _commands.end(); ++it) + { + if ((*it)->getType() == type) { + api::StorageMessage::SP result(*it); + _commands.erase(it); + return result; + } + } + for (std::vector<api::StorageMessage::SP>::iterator it = _replies.begin(); + it != _replies.end(); ++it) + { + if ((*it)->getType() == type) { + api::StorageMessage::SP result(*it); + _replies.erase(it); + return result; + } + } + std::ostringstream ost; + ost << "No message of type " << type << " found."; + throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); +} + +} // storage diff --git a/storageserver/src/tests/dummystoragelink.h b/storageserver/src/tests/dummystoragelink.h new file mode 100644 index 00000000000..cb9df8c5642 --- /dev/null +++ b/storageserver/src/tests/dummystoragelink.h @@ -0,0 +1,115 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/sync.h> +#include <list> +#include <sstream> +#include <vespa/storageapi/messageapi/storagecommand.h> +#include <string> +#include <vector> +#include <vespa/storage/common/storagelink.h> +#include <vespa/storage/common/bucketmessages.h> +#include <vespa/storageapi/message/internal.h> + +class FastOS_ThreadPool; + +namespace storage { + +class DummyStorageLink : public StorageLink { + + mutable vespalib::Lock _lock; // to protect below containers: + std::vector<api::StorageMessage::SP> _commands; + std::vector<api::StorageMessage::SP> _replies; + std::list<api::StorageMessage::SP> _injected; + + bool _autoReply; + bool _useDispatch; + bool _ignore; + static DummyStorageLink* _last; + vespalib::Monitor _waitMonitor; + +public: + DummyStorageLink(); + ~DummyStorageLink(); + + bool onDown(const api::StorageMessage::SP&); + bool onUp(const api::StorageMessage::SP&); + + void addOnTopOfChain(StorageLink& link) { + link.addTestLinkOnTop(this); + } + + void print(std::ostream& ost, bool verbose, const std::string& indent) const + { + (void) verbose; + ost << indent << "DummyStorageLink(" + << "autoreply = " << (_autoReply ? "on" : "off") + << ", dispatch = " << (_useDispatch ? "on" : "off") + << ", " << _commands.size() << " commands" + << ", " << _replies.size() << " replies"; + if (_injected.size() > 0) + ost << ", " << _injected.size() << " injected"; + ost << ")"; + } + + void injectReply(api::StorageReply* reply); + void reset(); + void setAutoreply(bool autoReply) { _autoReply = autoReply; } + void setIgnore(bool ignore) { _ignore = ignore; } + // Timeout is given in seconds + void waitForMessages(unsigned int msgCount = 1, int timeout = -1); + // Wait for a single message of a given type + void waitForMessage(const api::MessageType&, int timeout = -1); + + api::StorageMessage::SP getCommand(size_t i) const { + vespalib::LockGuard guard(_lock); + api::StorageMessage::SP ret = _commands[i]; + return ret; + } + api::StorageMessage::SP getReply(size_t i) const { + vespalib::LockGuard guard(_lock); + api::StorageMessage::SP ret = _replies[i]; + return ret; + } + size_t getNumCommands() const { + vespalib::LockGuard guard(_lock); + return _commands.size(); + } + size_t getNumReplies() const { + vespalib::LockGuard guard(_lock); + return _replies.size(); + } + + const std::vector<api::StorageMessage::SP>& getCommands() const + { return _commands; } + const std::vector<api::StorageMessage::SP>& getReplies() const + { return _replies; } + + std::vector<api::StorageMessage::SP> getCommandsOnce() { + vespalib::MonitorGuard lock(_waitMonitor); + std::vector<api::StorageMessage::SP> retval; + { + vespalib::LockGuard guard(_lock); + retval.swap(_commands); + } + return retval; + } + + std::vector<api::StorageMessage::SP> getRepliesOnce() { + vespalib::MonitorGuard lock(_waitMonitor); + std::vector<api::StorageMessage::SP> retval; + { + vespalib::LockGuard guard(_lock); + retval.swap(_replies); + } + return retval; + } + + api::StorageMessage::SP getAndRemoveMessage(const api::MessageType&); + + static DummyStorageLink* getLast() { return _last; } +}; + +} + diff --git a/storageserver/src/tests/storageservertest.cpp b/storageserver/src/tests/storageservertest.cpp new file mode 100644 index 00000000000..69c55248eb7 --- /dev/null +++ b/storageserver/src/tests/storageservertest.cpp @@ -0,0 +1,1245 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storage/storageserver/servicelayernode.h> +#include <vespa/storage/storageserver/distributornode.h> + +#include <vespa/document/base/testdocman.h> +#include <vespa/document/config/config-documenttypes.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <fstream> +#include <vespa/log/log.h> +#include <vespa/memfilepersistence/spi/memfilepersistenceprovider.h> +#include <vespa/messagebus/staticthrottlepolicy.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/storageapi/mbusprot/storagecommand.h> +#include <vespa/storageapi/mbusprot/storagereply.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/common/statusmetricconsumer.h> +#include <vespa/memfilepersistence/memfile/memfilecache.h> +#include <tests/testhelper.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <tests/dummystoragelink.h> + +#include <vespa/storageserver/app/distributorprocess.h> +#include <vespa/storageserver/app/memfileservicelayerprocess.h> + +LOG_SETUP(".storageservertest"); + +namespace storage { + +namespace { + uint64_t getTimeInMillis() { + struct timeval t; + gettimeofday(&t, 0); + return (t.tv_sec * uint64_t(1000)) + (t.tv_usec / uint64_t(1000)); + } + + class SlobrokMirror { + config::ConfigUri config; + FRT_Supervisor visor; + std::unique_ptr<slobrok::api::MirrorAPI> mirror; + + public: + SlobrokMirror(const config::ConfigUri & cfg) : config(cfg) {} + + void init(uint32_t timeoutms) { + uint64_t timeout = getTimeInMillis() + timeoutms; + visor.Start(); + mirror.reset(new slobrok::api::MirrorAPI(visor, config)); + while (!mirror->ready()) { + if (getTimeInMillis() > timeout) + throw vespalib::IllegalStateException( + "Failed to initialize slobrok mirror within " + "timeout.", VESPA_STRLOC); + FastOS_Thread::Sleep(1); + } + } + + slobrok::api::MirrorAPI& getMirror() { + if (mirror.get() == 0) throw vespalib::IllegalStateException( + "You need to call init() before you can fetch mirror"); + return *mirror; + } + FRT_Supervisor& getSupervisor() { + if (mirror.get() == 0) throw vespalib::IllegalStateException( + "You need to call init() before you can fetch supervisor"); + return visor; + } + + ~SlobrokMirror() { + if (mirror.get() != 0) { + mirror.reset(0); + visor.ShutDown(true); + } + } + }; +} + +struct StorageServerTest : public CppUnit::TestFixture { + std::unique_ptr<FastOS_ThreadPool> threadPool; + std::unique_ptr<document::TestDocMan> docMan; + std::unique_ptr<mbus::Slobrok> slobrok; + std::unique_ptr<vdstestlib::DirConfig> distConfig; + std::unique_ptr<vdstestlib::DirConfig> storConfig; + std::unique_ptr<SlobrokMirror> slobrokMirror; + + void setUp(); + void tearDown(); + + void testNormalUsage(); + void testPortOverlap_Stress(); + void testFailOnNoDisks(); + void testFailOnWrongAmountOfDisks(); + void testOneDiskUnusable(); + void testShutdownDuringDiskLoad(bool storagenode); + void testShutdownStorageDuringDiskLoad(); + void testShutdownDistributorDuringDiskLoad(); + void testShutdownAfterDiskFailure_Stress(); + void testSplitJoinSplitThroughDistributor_Stress(); + void testPriorityAndQueueSneakingWhileSplitJoinStressTest(); + void testStatusPages(); + + CPPUNIT_TEST_SUITE(StorageServerTest); + CPPUNIT_TEST(testNormalUsage); + CPPUNIT_TEST_IGNORED(testPortOverlap_Stress); + CPPUNIT_TEST(testFailOnNoDisks); + CPPUNIT_TEST(testFailOnWrongAmountOfDisks); + CPPUNIT_TEST(testOneDiskUnusable); + CPPUNIT_TEST_IGNORED(testShutdownStorageDuringDiskLoad); + CPPUNIT_TEST_IGNORED(testShutdownDistributorDuringDiskLoad); + CPPUNIT_TEST_IGNORED(testShutdownAfterDiskFailure_Stress); + + // Disabled test for new core... TODO + CPPUNIT_TEST_DISABLED(testSplitJoinSplitThroughDistributor_Stress); + + CPPUNIT_TEST_DISABLED(testPriorityAndQueueSneakingWhileSplitJoinStressTest); + + // Doesn't work in new framework. Will investigate as soon as there's time + CPPUNIT_TEST_DISABLED(testStatusPages); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(StorageServerTest); + +namespace { + + template<typename T> + struct ConfigReader : public config::IFetcherCallback<T>, + public T + { + ConfigReader(const std::string& configId) { + config::LegacySubscriber subscription; + subscription.subscribe<T>(configId, this); + } + void configure(std::unique_ptr<document::DocumenttypesConfig> c) + { + static_cast<T&>(*this) = *c; + } + }; + + struct Node { + virtual ~Node() {} + virtual StorageNode& getNode() = 0; + virtual StorageNodeContext& getContext() = 0; + + bool attemptedStopped() + { return getNode().attemptedStopped(); } + void waitUntilInitialized(uint32_t timeout) + { getNode().waitUntilInitialized(timeout); } + StorageLink* getChain() { return getNode().getChain(); } + void requestShutdown(const std::string& reason) + { getNode().requestShutdown(reason); } + const framework::StatusReporter* getStatusReporter(const std::string& i) + { return getContext().getComponentRegister().getStatusReporter(i); } + NodeStateUpdater& getStateUpdater() + { return getContext().getComponentRegister().getNodeStateUpdater(); } + }; + + struct Distributor : public Node { + DistributorProcess _process; + + Distributor(vdstestlib::DirConfig& config) + : _process(config.getConfigId()) + { + _process.setupConfig(60000); + _process.createNode(); + } + + virtual StorageNode& getNode() { return _process.getNode(); } + virtual StorageNodeContext& getContext() + { return _process.getContext(); } + distributor::BucketDatabase& getBucketDatabase() + { return _process.getDistributorContext().getComponentRegister().getBucketDatabase(); } + }; + + struct Storage : public Node { + MemFileServiceLayerProcess _process; + StorageComponent::UP _component; + + Storage(vdstestlib::DirConfig& config) : _process(config.getConfigId()) + { + _process.setupConfig(60000); + _process.createNode(); + _component.reset(new StorageComponent( + getContext().getComponentRegister(), "test")); + } + + virtual StorageNode& getNode() { return _process.getNode(); } + virtual StorageNodeContext& getContext() + { return _process.getContext(); } + spi::PartitionStateList getPartitions() + { return _process.getProvider().getPartitionStates().getList(); } + uint16_t getDiskCount() { return getPartitions().size(); } + StorageComponent& getComponent() { return *_component; } + }; +} + +void +StorageServerTest::setUp() +{ + threadPool.reset(new FastOS_ThreadPool(128 * 1024)); + docMan.reset(new document::TestDocMan); + system("chmod -R 755 vdsroot"); + system("rm -rf vdsroot*"); + slobrok.reset(new mbus::Slobrok); + distConfig.reset(new vdstestlib::DirConfig(getStandardConfig(false))); + storConfig.reset(new vdstestlib::DirConfig(getStandardConfig(true))); + addSlobrokConfig(*distConfig, *slobrok); + addSlobrokConfig(*storConfig, *slobrok); + storConfig->getConfig("stor-filestor") + .set("fail_disk_after_error_count", "1"); + system("mkdir -p vdsroot/disks/d0"); + system("mkdir -p vdsroot.distributor"); + slobrokMirror.reset(new SlobrokMirror(slobrok->config())); +} + +void +StorageServerTest::tearDown() +{ + slobrokMirror.reset(NULL); + storConfig.reset(NULL); + distConfig.reset(NULL); + slobrok.reset(NULL); + docMan.reset(NULL); + threadPool.reset(NULL); +} + +void +StorageServerTest::testNormalUsage() +{ + { + Distributor distServer(*distConfig); + Storage storServer(*storConfig); + } +} + +void +StorageServerTest::testFailOnNoDisks() +{ + system("rmdir vdsroot/disks/d0"); + try{ + Storage server(*storConfig); + CPPUNIT_FAIL("Expected exception about no available disks."); + } catch (vespalib::Exception& e) { + CPPUNIT_ASSERT_CONTAIN_MESSAGE(e.what(), + "No disks configured", + e.getMessage()); + } +} + +void +StorageServerTest::testFailOnWrongAmountOfDisks() +{ +/* TODO: Can't be in stor-server config anymore. + storConfig->getConfig("stor-server").set("disk_count", "2"); + try{ + StorageServer server(storConfig->getConfigId()); + CPPUNIT_FAIL("Expected exception about wrong amount of disks."); + } catch (vespalib::Exception& e) { + CPPUNIT_ASSERT_CONTAIN_MESSAGE(e.what(), + "Found 1 disks and config says we're supposed to have 2", + e.getMessage()); + } +*/ +} + +void +StorageServerTest::testOneDiskUnusable() +{ + CPPUNIT_ASSERT(system("rm -rf vdsroot/disks/d0") == 0); + CPPUNIT_ASSERT(system("mkdir -p vdsroot/disks/d1") == 0); + //CPPUNIT_ASSERT(system("ln -s /thisdoesnotexist vdsroot/disks/d0") == 0); + Storage server(*storConfig); + CPPUNIT_ASSERT_EQUAL(2, (int)server.getDiskCount()); + CPPUNIT_ASSERT(!server.getPartitions()[0].isUp()); + CPPUNIT_ASSERT_CONTAIN( + std::string("Disk not found during scanning"), + server.getPartitions()[0].getReason()); + CPPUNIT_ASSERT(server.getPartitions()[1].isUp()); +} + +namespace { + struct LoadGiver : public document::Runnable, + public mbus::IReplyHandler + { + const vdstestlib::DirConfig& _config; + const document::DocumentTypeRepo::SP _repo; + documentapi::LoadTypeSet _loadTypes; + std::unique_ptr<mbus::RPCMessageBus> _mbus; + mbus::SourceSession::UP _sourceSession; + uint32_t _maxPending; + uint32_t _currentPending; + uint32_t _processedOk; + uint32_t _unexpectedErrors; + bool _startedShutdown; + + LoadGiver(const vdstestlib::DirConfig& config, + const document::DocumentTypeRepo::SP repo) + : _config(config), _repo(repo), _mbus(), _sourceSession(), + _maxPending(20), _currentPending(0), _processedOk(0), + _unexpectedErrors(0), _startedShutdown(false) {} + virtual ~LoadGiver() { + if (_sourceSession.get() != 0) { + _sourceSession->close(); + } + } + + void init() { + documentapi::DocumentProtocol::SP protocol( + new documentapi::DocumentProtocol(_loadTypes, _repo)); + storage::mbusprot::StorageProtocol::SP storageProtocol( + new storage::mbusprot::StorageProtocol(_repo, _loadTypes)); + mbus::ProtocolSet protocols; + protocols.add(protocol); + protocols.add(storageProtocol); + mbus::RPCNetworkParams networkParams; + networkParams.setSlobrokConfig(config::ConfigUri(_config.getConfigId())); + _mbus.reset(new mbus::RPCMessageBus(protocols, networkParams, + _config.getConfigId())); + mbus::SourceSessionParams sourceParams; + sourceParams.setTimeout(5000); + mbus::StaticThrottlePolicy::SP policy(new mbus::StaticThrottlePolicy()); + policy->setMaxPendingCount(_maxPending); + sourceParams.setThrottlePolicy(policy); + _sourceSession = _mbus->getMessageBus().createSourceSession( + *this, sourceParams); + } + + virtual void notifyStartingShutdown() { + _startedShutdown = true; + } + + virtual void handleReply(mbus::Reply::UP reply) { + using documentapi::DocumentProtocol; + --_currentPending; + if (!reply->hasErrors()) { + ++_processedOk; + } else if (!_startedShutdown && reply->getNumErrors() > 1) { + ++_unexpectedErrors; + std::cerr << reply->getNumErrors() << " errors. First: " + << reply->getError(0).getCode() << " - " + << reply->getError(0).getMessage() << "\n"; + } else { + int code = reply->getError(0).getCode(); + std::string errorMsg = reply->getError(0).getMessage(); + + if (code == mbus::ErrorCode::UNKNOWN_SESSION + || code == mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE + || code == mbus::ErrorCode::CONNECTION_ERROR + || code == mbus::ErrorCode::HANDSHAKE_FAILED) + { + // Ignore + } else if ((code >= mbus::ErrorCode::TRANSIENT_ERROR + && code < mbus::ErrorCode::FATAL_ERROR) + && (errorMsg.find("UNKNOWN_SESSION") != std::string::npos + || errorMsg.find("NO_ADDRESS_FOR_SERVICE") + != std::string::npos + || errorMsg.find("when node is in state Stopping") + != std::string::npos + || errorMsg.find("HANDSHAKE_FAILED") + != std::string::npos + || code == mbus::ErrorCode::APP_TRANSIENT_ERROR + || code == DocumentProtocol::ERROR_IO_FAILURE + || code == DocumentProtocol::ERROR_ABORTED + || code == DocumentProtocol::ERROR_BUCKET_NOT_FOUND)) + { + // Ignore + } else { + ++_unexpectedErrors; + std::cerr << reply->getNumErrors() << " errors. First: " + << reply->getError(0).getCode() << " - " + << reply->getError(0).getMessage(); + mbus::Message::UP msg(reply->getMessage()); + if (msg->getType() == DocumentProtocol::MESSAGE_PUTDOCUMENT) + { + documentapi::PutDocumentMessage& putMsg( + static_cast<documentapi::PutDocumentMessage&>( + *msg)); + std::cerr << " - " << putMsg.getDocument()->getId(); + } + std::cerr << "\n"; + } + } + } + + void waitUntilDecentLoad(uint32_t maxWait = 60000) { + uint64_t maxTime = getTimeInMillis() + maxWait; + while (true) { + if (_processedOk > 5 && _currentPending > _maxPending / 2) { + break; + } + uint64_t time = getTimeInMillis(); + if (time > maxTime) { + if (_processedOk < 5) { + throw vespalib::IllegalStateException( + "Failed to process 5 ok operations within timeout.", + VESPA_STRLOC); + } + if (_currentPending < _maxPending / 2) { + throw vespalib::IllegalStateException( + "Failed to get enough max pending.", + VESPA_STRLOC); + } + break; + } + FastOS_Thread::Sleep(1); + } + LOG(info, "Currently, we have received %u ok replies and have %u " + "pending ones.", + _processedOk, _currentPending); + } + }; + + struct SimpleLoadGiver : public LoadGiver { + const document::TestDocMan& _testDocMan; + vespalib::Monitor _threadMonitor; + + SimpleLoadGiver(const vdstestlib::DirConfig& config, + const document::TestDocMan& tdm) + : LoadGiver(config, tdm.getTypeRepoSP()), _testDocMan(tdm), + _threadMonitor() {} + ~SimpleLoadGiver() { + stop(); + join(); + } + virtual bool onStop() { + vespalib::MonitorGuard monitor(_threadMonitor); + monitor.signal(); + return true; + } + void run() { + uint32_t seed = 0; + uint32_t maxDocSize = 65536; + init(); + vespalib::MonitorGuard monitor(_threadMonitor); + while (running()) { + uint32_t attemptCount = 0; + while (_currentPending < _maxPending + && ++attemptCount < _maxPending) + { + document::Document::SP doc( + _testDocMan.createRandomDocument( + ++seed, maxDocSize)); + mbus::Message::UP msg( + new documentapi::PutDocumentMessage(doc)); + msg->setRetryEnabled(false); + mbus::Result r = _sourceSession->send(std::move(msg), + "storage/cluster.storage/distributor/0/default", + true); + if (r.isAccepted()){ + ++_currentPending; + } else { + if (!_startedShutdown) { + std::cerr << "Source session did not accept " + "message.\n"; + } + break; + } + } + monitor.wait(1); + } + } + }; + + void setSystemState(SlobrokMirror& mirror, + const lib::ClusterState& state, + const std::vector<std::string>& address) + { + std::string systemState = state.toString(); + auto deleter = [](auto * ptr) { ptr->SubRef(); }; + for (uint32_t i=0; i<address.size(); ++i) { + slobrok::api::MirrorAPI::SpecList list( + mirror.getMirror().lookup(address[i])); + for (uint32_t j=0; j<list.size(); ++j) { + auto target = std::unique_ptr<FRT_Target, decltype(deleter)>(mirror.getSupervisor().GetTarget( + list[j].second.c_str()), deleter); + auto req = std::unique_ptr<FRT_RPCRequest, decltype(deleter)>(mirror.getSupervisor().AllocRPCRequest(), + deleter); + req->SetMethodName("setsystemstate2"); + req->GetParams()->AddString(systemState.c_str()); + target->InvokeSync(req.get(), 5.0); + if (req->GetErrorCode() != FRTE_NO_ERROR) { + throw vespalib::IllegalStateException( + "Failed sending setsystemstate request: " + + std::string(req->GetErrorMessage()), VESPA_STRLOC); + } + } + } + } +} + +void +StorageServerTest::testShutdownDuringDiskLoad(bool storagenode) +{ + slobrokMirror->init(5000); + // Verify that, then shutdown, we stop accepting new messages, fail + // all messages enqueued and finish current operations before shutting + // down without any errors. + std::unique_ptr<Distributor> distServer(new Distributor(*distConfig)); + std::unique_ptr<Storage> storServer(new Storage(*storConfig)); + storServer->waitUntilInitialized(30); + LOG(info, "\n\nStorage server stable\n\n"); + lib::ClusterState state("version:1 bits:1 distributor:1 storage:1"); + std::vector<std::string> addresses; + addresses.push_back("storage/cluster.storage/storage/0"); + addresses.push_back("storage/cluster.storage/distributor/0"); + LOG(info, "\n\nSetting system states\n\n"); + setSystemState(*slobrokMirror, state, addresses); + LOG(info, "\n\nWaiting for stable distributor server\n\n"); + distServer->waitUntilInitialized(30); + + LOG(info, "\n\nSTARTING LOADGIVER\n\n"); + + SimpleLoadGiver loadGiver(*distConfig, *docMan); + loadGiver.start(*threadPool); + loadGiver.waitUntilDecentLoad(); + + loadGiver.notifyStartingShutdown(); + + if (storagenode) { + LOG(info, "\n\nKILLING STORAGE NODE\n\n"); + storServer->requestShutdown( + "Stopping storage server during load for testing"); + storServer.reset(0); + } else { + LOG(info, "\n\nKILLING DISTRIBUTOR\n\n"); + distServer->requestShutdown( + "Stopping distributor during load for testing"); + distServer.reset(0); + } + LOG(info, "\n\nDONE KILLING NODE. Cleaning up other stuff.\n\n"); + + CPPUNIT_ASSERT_EQUAL(0u, loadGiver._unexpectedErrors); +} + +void +StorageServerTest::testShutdownStorageDuringDiskLoad() +{ + testShutdownDuringDiskLoad(true); +} + +void +StorageServerTest::testShutdownDistributorDuringDiskLoad() +{ + testShutdownDuringDiskLoad(false); +} + +void +StorageServerTest::testShutdownAfterDiskFailure_Stress() +{ + slobrokMirror->init(5000); + + // Verify that, then shutdown, we stop accepting new messages, fail + // all messages enqueued and finish current operations before shutting + // down without any errors. + std::unique_ptr<Distributor> distServer(new Distributor(*distConfig)); + std::unique_ptr<Storage> storServer(new Storage(*storConfig)); + //storServer->getSlotFileCache().disable(); + storServer->waitUntilInitialized(30); + LOG(info, "\n\nStorage server stable\n\n"); + lib::ClusterState state("version:1 bits:1 distributor:1 storage:1"); + std::vector<std::string> addresses; + addresses.push_back("storage/cluster.storage/storage/0"); + addresses.push_back("storage/cluster.storage/distributor/0"); + LOG(info, "\n\nSetting system states\n\n"); + setSystemState(*slobrokMirror, state, addresses); + LOG(info, "\n\nWaiting for stable distributor server\n\n"); + distServer->waitUntilInitialized(30); + + LOG(info, "\n\nSTARTING LOADGIVER\n\n"); + + SimpleLoadGiver loadGiver(*distConfig, *docMan); + loadGiver.start(*threadPool); + loadGiver.waitUntilDecentLoad(); + + // Test that getting io errors flags storage for shutdown + // (The shutdown is the responsibility of the application in + // storageserver) + CPPUNIT_ASSERT(!storServer->attemptedStopped()); + loadGiver.notifyStartingShutdown(); + LOG(info, "\n\nREMOVING PERMISSIONS\n\n"); + system("chmod 000 vdsroot/disks/d0/*.0"); + system("ls -ld vdsroot/disks/d0/* > permissions"); + + for (uint32_t i=0; i<6000; ++i) { + //storServer->getMemFileCache().clear(); + if (storServer->attemptedStopped()) break; + FastOS_Thread::Sleep(10); + } + if (!storServer->attemptedStopped()) { + CPPUNIT_FAIL("Removing permissions from disk failed to stop storage " + "within timeout of 60 seconds"); + } + + CPPUNIT_ASSERT_EQUAL(0u, loadGiver._unexpectedErrors); + unlink("permissions"); +} + +void +StorageServerTest::testSplitJoinSplitThroughDistributor_Stress() +{ + // Setup system with storage and distributor + distConfig->getConfig("stor-distributormanager") + .set("splitcount", "10"); + distConfig->getConfig("stor-distributormanager") + .set("joincount", "5"); + distConfig->getConfig("stor-server") + .set("switch_new_meta_data_flow", "true"); + storConfig->getConfig("stor-filestor") + .set("revert_time_period", "1"); + storConfig->getConfig("stor-filestor") + .set("keep_remove_time_period", "1"); + storConfig->getConfig("stor-integritychecker") + .set("mincycletime", "0"); + Distributor distServer(*distConfig); + Storage storServer(*storConfig); + DummyStorageLink dummyLink; + dummyLink.addOnTopOfChain(*distServer.getChain()); + DummyStorageLink sdummyLink; + sdummyLink.addOnTopOfChain(*storServer.getChain()); + + api::SetSystemStateCommand::SP scmd(new api::SetSystemStateCommand( + lib::ClusterState("version:3 distributor:1 storage:1"))); + for (uint32_t i=0; i<500; ++i) { + if (storServer.getStateUpdater().getReportedNodeState()->getState() + == lib::State::UP) break; + FastOS_Thread::Sleep(10); + } + CPPUNIT_ASSERT( + storServer.getStateUpdater().getReportedNodeState()->getState() + == lib::State::UP); + distServer.getChain()->sendDown(scmd); + storServer.getChain()->sendDown(scmd); + dummyLink.waitForMessages(1, 15); + dummyLink.reset(); + sdummyLink.waitForMessages(1, 15); + sdummyLink.reset(); + for (uint32_t i=0; i<500; ++i) { + if (distServer.getStateUpdater().getReportedNodeState()->getState() + == lib::State::UP + && distServer.getStateUpdater().getSystemState()->getVersion() == 3) + { + break; + } + FastOS_Thread::Sleep(10); + } + CPPUNIT_ASSERT_EQUAL(lib::State::UP, + distServer.getStateUpdater().getReportedNodeState()->getState()); + CPPUNIT_ASSERT_EQUAL(3u, + storServer.getStateUpdater().getSystemState()->getVersion()); + CPPUNIT_ASSERT_EQUAL(3u, + distServer.getStateUpdater().getSystemState()->getVersion()); + // Feed 50 documents + document::BucketId bucket(16, 0); + for (uint32_t i=0; i<50; ++i) { + document::Document::SP doc( + docMan->createRandomDocumentAtLocation(0, i, 1024)); + api::PutCommand::SP cmd(new api::PutCommand(bucket, doc, 1000 + i)); + distServer.getChain()->sendDown(cmd); + } + dummyLink.waitForMessages(50, 15); + for (uint32_t i=0; i<50; ++i) { + CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY, + dummyLink.getReplies()[i]->getType()); + api::PutReply& reply( + dynamic_cast<api::PutReply&>(*dummyLink.getReplies()[i])); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), + reply.getResult()); + } + dummyLink.reset(); + // Wait until system has split to 7 buckets + distributor::BucketDatabase& db(distServer.getBucketDatabase()); + for (size_t i(0); (i < 6000) && (7ul != db.size()); i++) { + FastOS_Thread::Sleep(10); + } + CPPUNIT_ASSERT_EQUAL_MSG(db.toString(), size_t(7), db.size()); + + // Remove 40 first documents + for (uint32_t i=0; i<40; ++i) { + document::Document::SP doc( + docMan->createRandomDocumentAtLocation(0, i, 1024)); + api::RemoveCommand::SP cmd(new api::RemoveCommand( + bucket, doc->getId(), 2000 + i)); + distServer.getChain()->sendDown(cmd); + } + dummyLink.waitForMessages(40, 15); + for (uint32_t i=0; i<40; ++i) { + CPPUNIT_ASSERT_EQUAL(api::MessageType::REMOVE_REPLY, + dummyLink.getReplies()[i]->getType()); + api::RemoveReply& reply( + dynamic_cast<api::RemoveReply&>(*dummyLink.getReplies()[i])); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), + reply.getResult()); + } + dummyLink.reset(); + /* + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime = clock.getTimeInMillis() + + framework::MilliSecTime(100 * 1000); + + Join dont happen anymore, as we dont join due to meta entries, used file + * size. Need to wait long enough for data to be compacted away if so. + + // Wait until we have joined the 10 documents back to 3 buckets + while (db.size() != 3ul) { +// std::cerr << db.toString(true); + + if (clock.getTimeInMillis() > endTime) { + CPPUNIT_ASSERT_EQUAL_MSG( + "Failed to join buckets within timeout of 60 seconds" + + db.toString(true), 3ul, db.size()); + } + +// FastOS_Thread::Sleep(1000); + } + CPPUNIT_ASSERT(db.get(document::BucketId(0x8800000000000000)).valid()); + CPPUNIT_ASSERT(db.get(document::BucketId(0x8800000200000000)).valid()); + CPPUNIT_ASSERT(db.get(document::BucketId(0x8400000100000000)).valid()); + */ +} + +namespace { + + struct PriorityStorageLoadGiver : public LoadGiver { + const document::TestDocMan& _testDocMan; + vespalib::Monitor _threadMonitor; + StorBucketDatabase _bucketDB; + document::BucketIdFactory _idFactory; + uint32_t _putCount; + uint32_t _getCount; + uint32_t _removeCount; + uint32_t _joinCount; + uint32_t _splitCount; + uint32_t _createBucket; + uint32_t _deleteBucket; + uint32_t _remappedOperations; + uint32_t _notFoundOps; + uint32_t _existOps; + uint32_t _bucketDeletedOps; + uint32_t _bucketNotFoundOps; + uint32_t _rejectedOps; + + PriorityStorageLoadGiver(const vdstestlib::DirConfig& config, + const document::TestDocMan& tdm) + : LoadGiver(config, tdm.getTypeRepoSP()), _testDocMan(tdm), + _threadMonitor(), + _bucketDB(), _idFactory(), + _putCount(0), _getCount(0), _removeCount(0), _joinCount(0), + _splitCount(0), _createBucket(0), _deleteBucket(0), + _remappedOperations(0), _notFoundOps(0), _existOps(0), + _bucketDeletedOps(0), _bucketNotFoundOps(0), _rejectedOps(0) {} + ~PriorityStorageLoadGiver() { + close(); + } + virtual void close() { + if (running()) { + stop(); + join(); + } + } + virtual bool onStop() { + vespalib::MonitorGuard monitor(_threadMonitor); + monitor.signal(); + return true; + } + void run() { + uint32_t seed = 0; + uint32_t maxDocSize = 65536; + init(); + vespalib::MonitorGuard monitor(_threadMonitor); + std::list<mbusprot::StorageCommand*> sendList; + while (running()) { + while (sendList.size() < (_maxPending - _currentPending)) { + document::Document::SP doc( + _testDocMan.createRandomDocument( + ++seed, maxDocSize)); + api::StorageCommand::SP cmd; + document::BucketId bucket( + _idFactory.getBucketId(doc->getId())); + std::map<document::BucketId, + StorBucketDatabase::WrappedEntry> entries( + _bucketDB.getContained(bucket, "")); + if (entries.size() == 0 + || (entries.size() == 1 + && (entries.begin()->second->getBucketInfo().getChecksum() & 2) != 0)) + { + if (entries.size() == 0) { + bucket.setUsedBits(4); + bucket = bucket.stripUnused(); + entries[bucket] = _bucketDB.get(bucket, "", + StorBucketDatabase::CREATE_IF_NONEXISTING); + entries[bucket]->setChecksum(0); + } else { + bucket = entries.begin()->first; + entries[bucket]->setChecksum( + entries[bucket]->getBucketInfo().getChecksum() & ~2); + } + entries[bucket]->disk = 0; + entries[bucket].write(); + entries[bucket] = _bucketDB.get(bucket, "foo"); + CPPUNIT_ASSERT(entries[bucket].exist()); + cmd.reset(new api::CreateBucketCommand(bucket)); + sendList.push_back(new mbusprot::StorageCommand(cmd)); + } + CPPUNIT_ASSERT_EQUAL(size_t(1), entries.size()); + bucket = entries.begin()->first; + StorBucketDatabase::WrappedEntry entry( + entries.begin()->second); + if (seed % 95 == 93) { // Delete bucket + if ((entry->getBucketInfo().getChecksum() & 2) == 0) { + cmd.reset(new api::DeleteBucketCommand(bucket)); + entry->setChecksum( + entry->getBucketInfo().getChecksum() | 2); + entry.write(); + sendList.push_back( + new mbusprot::StorageCommand(cmd)); + } + } else if (seed % 13 == 8) { // Join + if (entry->getBucketInfo().getChecksum() == 0 && bucket.getUsedBits() > 3) { + // Remove existing locks we have to not cause + // deadlock + entry = StorBucketDatabase::WrappedEntry(); + entries.clear(); + // Then continue + document::BucketId super(bucket.getUsedBits() - 1, + bucket.getRawId()); + super = super.stripUnused(); + api::JoinBucketsCommand::SP jcmd( + new api::JoinBucketsCommand(super)); + entries = _bucketDB.getAll(super, "foo"); + bool foundAnyLocked = false; + for (std::map<document::BucketId, + StorBucketDatabase::WrappedEntry> + ::iterator it = entries.begin(); + it != entries.end(); ++it) + { + if (!super.contains(it->first) || super == it->first) continue; + jcmd->getSourceBuckets().push_back( + it->first.stripUnused()); + foundAnyLocked |= (it->second->getBucketInfo().getChecksum() != 0); + } + if (!foundAnyLocked && jcmd->getSourceBuckets().size() == 2) { + for (std::map<document::BucketId, + StorBucketDatabase::WrappedEntry> + ::iterator it = entries.begin(); + it != entries.end(); ++it) + { + if (!super.contains(it->first)) continue; + it->second->setChecksum( + it->second->getBucketInfo().getChecksum() | 1); + it->second.write(); + } + cmd = jcmd; + sendList.push_back( + new mbusprot::StorageCommand(cmd)); + } + } + } else if (seed % 13 == 1) { // Split + // Use _checksum == 1 to mean that we have a pending + // maintenance operation to this bucket. + if (entry->getBucketInfo().getChecksum() == 0) { + cmd.reset(new api::SplitBucketCommand(bucket)); + entry->setChecksum(1); + entry.write(); + sendList.push_back( + new mbusprot::StorageCommand(cmd)); + } + } else if (seed % 7 == 5) { // Remove + if ((entry->getBucketInfo().getChecksum() & 2) == 0) { + cmd.reset(new api::RemoveCommand(bucket, + doc->getId(), 1000ull * seed + 2)); + sendList.push_back( + new mbusprot::StorageCommand(cmd)); + } + } else if (seed % 5 == 3) { // Get + if ((entry->getBucketInfo().getChecksum() & 2) == 0) { + cmd.reset(new api::GetCommand( + bucket, doc->getId(), "[all]")); + sendList.push_back( + new mbusprot::StorageCommand(cmd)); + } + } else { // Put + if ((entry->getBucketInfo().getChecksum() & 2) == 0) { + cmd.reset(new api::PutCommand( + bucket, doc, 1000ull * seed + 1)); + sendList.push_back( + new mbusprot::StorageCommand(cmd)); + } + } + if (!sendList.empty()) { + uint8_t priorities[] = { + api::StorageMessage::LOW, + api::StorageMessage::NORMAL, + api::StorageMessage::HIGH, + api::StorageMessage::VERYHIGH + }; + sendList.back()->getCommand()->setPriority(priorities[seed % 4]); + } + } + if (sendList.size() > 0) { + uint32_t sent = 0; + for (uint32_t i=0; i<sendList.size(); ++i) { + mbus::Message::UP msg(*sendList.begin()); + msg->setRetryEnabled(false); + mbus::Result r = _sourceSession->send(std::move(msg), + "storage/cluster.storage/storage/0/default", + true); + if (r.isAccepted()){ + sendList.pop_front(); + ++_currentPending; + ++sent; + } else { + r.getMessage().release(); + break; + } + } + } + monitor.wait(1); + } + } + + std::string report() { + std::ostringstream ost; + ost << "Performed (" + << _putCount << ", " << _getCount << ", " + << _removeCount << ", " << _splitCount << ", " + << _joinCount << ", " << _createBucket << ", " + << _deleteBucket + << ") put/get/remove/split/join/create/delete operations.\n" + << "Result: " << _remappedOperations << " remapped operations\n" + << " " << _processedOk << " ok responses.\n" + << " " << _notFoundOps << " NOT_FOUND responses.\n" + << " " << _existOps << " EXISTS responses\n" + << " " << _bucketDeletedOps << " BUCKET_DELETED responses\n" + << " " << _bucketNotFoundOps << " BUCKET_NOT_FOUND responses\n" + << " " << _rejectedOps << " REJECTED responses (duplicate splits)\n" + << " " << _unexpectedErrors << " unexpected errors\n"; + return ost.str(); + } + + virtual void handleReply(mbus::Reply::UP reply) { + if (_startedShutdown) return; + --_currentPending; + std::ostringstream err; + mbusprot::StorageReply* mreply( + dynamic_cast<mbusprot::StorageReply*>(reply.get())); + if (mreply == 0) { + ++_unexpectedErrors; + err << "Got unexpected reply which is not a storage reply, " + << "likely emptyreply."; + if (reply->hasErrors()) { + int code = reply->getError(0).getCode(); + std::string errorMsg = reply->getError(0).getMessage(); + err << "\n mbus(" << code << "): '" << errorMsg << "'"; + } + err << "\n"; + std::cerr << err.str(); + return; + } + api::StorageReply& sreply(*mreply->getReply()); + api::BucketReply& breply(static_cast<api::BucketReply&>(sreply)); + + if ((!reply->hasErrors() + && sreply.getResult().success()) + || sreply.getResult().getResult() == api::ReturnCode::EXISTS + || sreply.getResult().getMessage().find("Bucket does not exist; assuming already split") != std::string::npos + || sreply.getResult().getResult() + == api::ReturnCode::BUCKET_DELETED + || sreply.getResult().getResult() + == api::ReturnCode::BUCKET_NOT_FOUND) + { + std::ostringstream out; + if (breply.hasBeenRemapped()) { + ++_remappedOperations; + } + if (sreply.getType() == api::MessageType::JOINBUCKETS_REPLY) { + vespalib::MonitorGuard monitor(_threadMonitor); + api::JoinBucketsReply& joinReply( + static_cast<api::JoinBucketsReply&>(sreply)); + StorBucketDatabase::WrappedEntry entry( + _bucketDB.get(joinReply.getBucketId(), "", + StorBucketDatabase::CREATE_IF_NONEXISTING)); + entry->setChecksum(0); + entry->disk = 0; + entry.write(); + for(std::vector<document::BucketId>::const_iterator it + = joinReply.getSourceBuckets().begin(); + it != joinReply.getSourceBuckets().end(); ++it) + { + _bucketDB.erase(*it, "foo"); + } + ++_joinCount; + out << "OK " << joinReply.getBucketId() << " Join\n"; + } else if (sreply.getType() + == api::MessageType::SPLITBUCKET_REPLY + && sreply.getResult().getResult() != api::ReturnCode::REJECTED) + { + vespalib::MonitorGuard monitor(_threadMonitor); + api::SplitBucketReply& splitReply( + static_cast<api::SplitBucketReply&>(sreply)); + StorBucketDatabase::WrappedEntry entry( + _bucketDB.get(splitReply.getBucketId(), "foo")); + if (entry.exist()) { + //CPPUNIT_ASSERT((entry->getBucketInfo().getChecksum() & 1) != 0); + entry.remove(); + } + for(std::vector<api::SplitBucketReply::Entry>::iterator it + = splitReply.getSplitInfo().begin(); + it != splitReply.getSplitInfo().end(); ++it) + { + entry = _bucketDB.get(it->first, "foo", + StorBucketDatabase::CREATE_IF_NONEXISTING); + entry->setChecksum(0); + entry->disk = 0; + entry.write(); + } + ++_splitCount; + out << "OK " << splitReply.getBucketId() << " Split\n"; + } else if (sreply.getType() == api::MessageType::PUT_REPLY) { + ++_putCount; + if (!static_cast<api::PutReply&>(sreply).wasFound()) { + ++_notFoundOps; + } + out << "OK " << breply.getBucketId() << " Put\n"; + } else if (sreply.getType() == api::MessageType::GET_REPLY) { + ++_getCount; + if (!static_cast<api::GetReply&>(sreply).wasFound()) { + ++_notFoundOps; + } + out << "OK " << breply.getBucketId() << " Get\n"; + } else if (sreply.getType() == api::MessageType::REMOVE_REPLY) { + ++_removeCount; + if (!static_cast<api::RemoveReply&>(sreply).wasFound()) { + ++_notFoundOps; + } + out << "OK " << breply.getBucketId() << " Remove\n"; + } else if (sreply.getType() + == api::MessageType::CREATEBUCKET_REPLY) + { + ++_createBucket; + out << "OK " << breply.getBucketId() << " Create\n"; + } else if (sreply.getType() + == api::MessageType::DELETEBUCKET_REPLY) + { + ++_deleteBucket; + out << "OK " << breply.getBucketId() << " Delete\n"; + } + switch (sreply.getResult().getResult()) { + case api::ReturnCode::EXISTS: ++_existOps; break; + case api::ReturnCode::BUCKET_NOT_FOUND: + ++_bucketNotFoundOps; break; + case api::ReturnCode::BUCKET_DELETED: + ++_bucketDeletedOps; break; + case api::ReturnCode::REJECTED: + ++_rejectedOps; break; + case api::ReturnCode::OK: ++_processedOk; break; + default: + assert(false); + } + //std::cerr << "OK - " << sreply.getType() << "\n"; + if (_processedOk % 5000 == 0) { + out << report(); + } + //err << out.str(); + } else { + ++_unexpectedErrors; + api::BucketReply& brep(static_cast<api::BucketReply&>(sreply)); + err << "Failed " << brep.getBucketId() << " " + << sreply.getType().getName() << ":"; + if (reply->hasErrors()) { + int code = reply->getError(0).getCode(); + std::string errorMsg = reply->getError(0).getMessage(); + err << " mbus(" << code << "): '" << errorMsg << "'"; + } + if (sreply.getResult().failed()) { + err << " sapi: " << sreply.getResult() << "\n"; + } + } + std::cerr << err.str(); + } + }; + + enum StateType { REPORTED, CURRENT }; + void waitForStorageUp(StorageComponent& storageNode, + StateType type, time_t timeoutMS = 60000) + { + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime timeout = clock.getTimeInMillis() + + framework::MilliSecTime(timeoutMS); + while (true) { + lib::NodeState::CSP ns(type == REPORTED + ? storageNode.getStateUpdater().getReportedNodeState() + : storageNode.getStateUpdater().getCurrentNodeState()); + if (ns->getState() == lib::State::UP) return; + if (clock.getTimeInMillis() > timeout) { + std::ostringstream ost; + ost << "Storage node failed to get up within timeout of " + << timeoutMS << " ms. Current state is: " << ns; + CPPUNIT_FAIL(ost.str()); + } + FastOS_Thread::Sleep(10); + } + } + +} + +void +StorageServerTest::testPriorityAndQueueSneakingWhileSplitJoinStressTest() +{ + PriorityStorageLoadGiver loadGiver(*storConfig, *docMan); + Storage storServer(*storConfig); + waitForStorageUp(storServer.getComponent(), REPORTED); + api::SetSystemStateCommand::SP cmd(new api::SetSystemStateCommand( + lib::ClusterState("storage:1"))); + storServer.getChain()->sendDown(cmd); + waitForStorageUp(storServer.getComponent(), CURRENT); + loadGiver.start(*threadPool); + while (loadGiver._processedOk + loadGiver._unexpectedErrors < 1000) { + FastOS_Thread::Sleep(100); + std::cerr << "OK " << loadGiver._processedOk << " Errors: " + << loadGiver._unexpectedErrors << "\n"; + } + loadGiver.notifyStartingShutdown(); + loadGiver.stop(); + loadGiver.close(); + std::cerr << loadGiver.report(); + CPPUNIT_ASSERT(loadGiver._bucketNotFoundOps < 300); + CPPUNIT_ASSERT(loadGiver._unexpectedErrors == 0); +} + +// This test is not a stress test, but adding stress to its name makes it not +// run during regular make test runs +void +StorageServerTest::testPortOverlap_Stress() +{ + for (uint32_t i=0; i<3; ++i) { + std::cerr << "Run " << i << "\n"; + tearDown(); + setUp(); + const char* config = "stor-communicationmanager"; + std::string type = "none"; + if (i == 0) { + distConfig->getConfig(config).set("mbusport", "12301"); + storConfig->getConfig(config).set("mbusport", "12311"); + type = "mbusport"; + } else if (i == 1) { + distConfig->getConfig(config).set("rpcport", "12302"); + storConfig->getConfig(config).set("rpcport", "12312"); + type = "rpcport"; + } else if (i == 2) { + distConfig->getConfig("stor-status").set("httpport", "12303"); + storConfig->getConfig("stor-status").set("httpport", "12313"); + type = "httpport"; + } + LOG(info, "TEST: (0) STARTING PORT TEST: %s", type.c_str()); + slobrokMirror->init(5000); + + std::unique_ptr<Distributor> distServerOld(new Distributor(*distConfig)); + std::unique_ptr<Storage> storServerOld(new Storage(*storConfig)); + + LOG(info, "TEST: (1) WAITING FOR STABLE STORAGE SERVER"); + storServerOld->waitUntilInitialized(30); + LOG(info, "TEST: (2) STORAGE SERVER STABLE"); + lib::ClusterState state("version:1 distributor:1 storage:1"); + std::vector<std::string> addresses; + addresses.push_back("storage/cluster.storage/storage/0"); + addresses.push_back("storage/cluster.storage/distributor/0"); + LOG(info, "TEST: (3) SETTING SYSTEM STATES"); + setSystemState(*slobrokMirror, state, addresses); + LOG(info, "TEST: (4) WAITING FOR STABLE DISTRIBUTOR SERVER"); + distServerOld->waitUntilInitialized(30); + + { + LOG(info, "TEST: (5) ADDING SOME LOAD TO CHECK PORTS"); + SimpleLoadGiver loadGiver(*distConfig, *docMan); + loadGiver.start(*threadPool); + loadGiver.waitUntilDecentLoad(); + } + + LOG(info, "TEST: (6) CREATING NEW SET OF SERVERS"); + try{ + Distributor distServer(*distConfig); + CPPUNIT_FAIL("Distributor server failed to fail on busy " + type); + } catch (vespalib::Exception& e) { + std::string msg = e.getMessage(); + std::string::size_type pos = msg.rfind(':'); + if (pos != std::string::npos) msg = msg.substr(pos + 2); + if (msg == "Failed to listen to RPC port 12302." || + msg == "Failed to start network." || + msg == "Failed to start status HTTP server using port 12303.") + { + } else { + CPPUNIT_FAIL("Unexpected exception: " + msg); + } + } + try{ + Storage storServer(*storConfig); + CPPUNIT_FAIL("Storage server failed to fail on busy " + type); + } catch (vespalib::Exception& e) { + std::string msg = e.getMessage(); + std::string::size_type pos = msg.rfind(':'); + if (pos != std::string::npos) msg = msg.substr(pos + 2); + if (msg == "Failed to listen to RPC port 12312." || + msg == "Failed to start network." || + msg == "Failed to start status HTTP server using port 12313.") + { + } else { + CPPUNIT_FAIL("Unexpected exception: " + msg); + } + } + } +} + +void +StorageServerTest::testStatusPages() +{ + Storage storServer(*storConfig); + // Bucket manager doesn't set up metrics before after talking to + // persistence layer + storServer.getNode().waitUntilInitialized(); + { + // Get HTML status pages + framework::HttpUrlPath path("?interval=-2&format=html"); + std::ostringstream ost; + try{ + const framework::StatusReporter* reporter( + storServer.getStatusReporter("statusmetricsconsumer")); + CPPUNIT_ASSERT(reporter != 0); + reporter->reportStatus(ost, path); + } catch (std::exception& e) { + CPPUNIT_FAIL("Failed to get status metric page: " + + std::string(e.what()) + "\nGot so far: " + ost.str()); + } + std::string output = ost.str(); + CPPUNIT_ASSERT_MSG(output, + output.find("Exception") == std::string::npos); + CPPUNIT_ASSERT_MSG(output, output.find("Error") == std::string::npos); + } +} + +} // storage diff --git a/storageserver/src/tests/testhelper.cpp b/storageserver/src/tests/testhelper.cpp new file mode 100644 index 00000000000..0aba3aeb7cf --- /dev/null +++ b/storageserver/src/tests/testhelper.cpp @@ -0,0 +1,170 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <tests/testhelper.h> + +#include <vespa/log/log.h> +#include <vespa/vespalib/io/fileutil.h> + +LOG_SETUP(".testhelper"); + +namespace storage { + +namespace { + bool useNewStorageCore() { + if ( // Unit test directory + vespalib::fileExists("use_new_storage_core") || + // src/cpp directory + vespalib::fileExists("../use_new_storage_core") || + // Top build directory where storage-HEAD remains + vespalib::fileExists("../../../../../use_new_storage_core")) + { + std::cerr << "Using new storage core for unit tests\n"; + return true; + } + return false; + } + bool newStorageCore(useNewStorageCore()); +} + +void addStorageDistributionConfig(vdstestlib::DirConfig& dc) +{ + vdstestlib::DirConfig::Config* config; + config = &dc.getConfig("stor-distribution", true); + config->clear(); + config->set("group[1]"); + config->set("group[0].name", "invalid"); + config->set("group[0].index", "invalid"); + config->set("group[0].nodes[50]"); + + for (uint32_t i = 0; i < 50; i++) { + std::ostringstream key; key << "group[0].nodes[" << i << "].index"; + std::ostringstream val; val << i; + config->set(key.str(), val.str()); + } +} + +vdstestlib::DirConfig getStandardConfig(bool storagenode) { + vdstestlib::DirConfig dc; + vdstestlib::DirConfig::Config* config; + config = &dc.addConfig("upgrading"); + config = &dc.addConfig("load-type"); + config = &dc.addConfig("bucket"); + config = &dc.addConfig("messagebus"); + config = &dc.addConfig("stor-prioritymapping"); + config = &dc.addConfig("stor-bucketdbupdater"); + config = &dc.addConfig("stor-bucket-init"); + config = &dc.addConfig("metricsmanager"); + config->set("consumer[1]"); + config->set("consumer[0].name", "\"status\""); + config->set("consumer[0].addedmetrics[1]"); + config->set("consumer[0].addedmetrics[0]", "\"*\""); + config = &dc.addConfig("stor-communicationmanager"); + config->set("rpcport", "0"); + config->set("mbusport", "0"); + config = &dc.addConfig("stor-bucketdb"); + config->set("chunklevel", "0"); + config = &dc.addConfig("stor-distributormanager"); + config = &dc.addConfig("stor-opslogger"); + config = &dc.addConfig("stor-filestor"); + // Easier to see what goes wrong with only 1 thread per disk. + config->set("minimum_file_meta_slots", "2"); + config->set("minimum_file_header_block_size", "368"); + config->set("minimum_file_size", "4096"); + config->set("threads[1]"); + config->set("threads[0].lowestpri 255"); + config->set("dir_spread", "4"); + config->set("dir_levels", "0"); + config->set("use_new_core", newStorageCore ? "true" : "false"); + config->set("maximum_versions_of_single_document_stored", "0"); + //config->set("enable_slotfile_cache", "false"); + // Unit tests typically use fake low time values, so don't complain + // about them or compact/delete them by default. Override in tests testing that + // behavior + config->set("time_future_limit", "5"); + config->set("time_past_limit", "2000000000"); + config->set("keep_remove_time_period", "2000000000"); + config->set("revert_time_period", "2000000000"); + // Don't want test to call exit() + config->set("fail_disk_after_error_count", "0"); + config = &dc.addConfig("stor-memfilepersistence"); + // Easier to see what goes wrong with only 1 thread per disk. + config->set("minimum_file_meta_slots", "2"); + config->set("minimum_file_header_block_size", "368"); + config->set("minimum_file_size", "4096"); + config->set("dir_spread", "4"); + config->set("dir_levels", "0"); + config = &dc.addConfig("persistence"); + config->set("keep_remove_time_period", "2000000000"); + config->set("revert_time_period", "2000000000"); + config = &dc.addConfig("stor-bouncer"); + config = &dc.addConfig("stor-integritychecker"); + config = &dc.addConfig("stor-bucketmover"); + config = &dc.addConfig("stor-messageforwarder"); + config = &dc.addConfig("stor-server"); + config->set("enable_dead_lock_detector", "false"); + config->set("enable_dead_lock_detector_warnings", "false"); + config->set("max_merges_per_node", "25"); + config->set("max_merge_queue_size", "20"); + config->set("root_folder", + (storagenode ? "vdsroot" : "vdsroot.distributor")); + config->set("is_distributor", + (storagenode ? "false" : "true")); + config = &dc.addConfig("stor-devices"); + config->set("root_folder", + (storagenode ? "vdsroot" : "vdsroot.distributor")); + config = &dc.addConfig("stor-status"); + config->set("httpport", "0"); + config = &dc.addConfig("stor-visitor"); + config->set("defaultdocblocksize", "8192"); + // By default, need "old" behaviour of maxconcurrent + config->set("maxconcurrentvisitors_fixed", "4"); + config->set("maxconcurrentvisitors_variable", "0"); + config = &dc.addConfig("stor-visitordispatcher"); + addFileConfig(dc, "documenttypes", "config-doctypes.cfg"); + addStorageDistributionConfig(dc); + return dc; +} + +void addSlobrokConfig(vdstestlib::DirConfig& dc, + const mbus::Slobrok& slobrok) +{ + std::ostringstream ost; + ost << "tcp/localhost:" << slobrok.port(); + vdstestlib::DirConfig::Config* config; + config = &dc.getConfig("slobroks", true); + config->clear(); + config->set("slobrok[1]"); + config->set("slobrok[0].connectionspec", ost.str()); +} + +void addFileConfig(vdstestlib::DirConfig& dc, + const std::string& configDefName, + const std::string& fileName) +{ + vdstestlib::DirConfig::Config* config; + config = &dc.getConfig(configDefName, true); + config->clear(); + std::ifstream in(fileName.c_str()); + std::string line; + while (std::getline(in, line, '\n')) { + std::string::size_type pos = line.find(' '); + if (pos == std::string::npos) { + config->set(line); + } else { + config->set(line.substr(0, pos), line.substr(pos + 1)); + } + } + in.close(); +} + +TestName::TestName(const std::string& n) + : name(n) +{ + LOG(debug, "Starting test %s", name.c_str()); +} + +TestName::~TestName() { + LOG(debug, "Done with test %s", name.c_str()); +} + +} // storage diff --git a/storageserver/src/tests/testhelper.h b/storageserver/src/tests/testhelper.h new file mode 100644 index 00000000000..be2c3e7ec66 --- /dev/null +++ b/storageserver/src/tests/testhelper.h @@ -0,0 +1,58 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once +#include <vespa/vdstestlib/cppunit/dirconfig.h> +#include <vespa/vdstestlib/cppunit/macros.h> + + +#include <fstream> +#include <vespa/fastos/fastos.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <sstream> + +#define ASSERT_REPLY_COUNT(count, dummylink) \ + { \ + std::ostringstream msgost; \ + if ((dummylink).getNumReplies() != count) { \ + for (uint32_t ijx=0; ijx<(dummylink).getNumReplies(); ++ijx) { \ + msgost << (dummylink).getReply(ijx)->toString(true) << "\n"; \ + } \ + } \ + CPPUNIT_ASSERT_EQUAL_MSG(msgost.str(), size_t(count), \ + (dummylink).getNumReplies()); \ + } +#define ASSERT_COMMAND_COUNT(count, dummylink) \ + { \ + std::ostringstream msgost; \ + if ((dummylink).getNumCommands() != count) { \ + for (uint32_t ijx=0; ijx<(dummylink).getNumCommands(); ++ijx) { \ + msgost << (dummylink).getCommand(ijx)->toString(true) << "\n"; \ + } \ + } \ + CPPUNIT_ASSERT_EQUAL_MSG(msgost.str(), size_t(count), \ + (dummylink).getNumCommands()); \ + } + +namespace storage { + +void addFileConfig(vdstestlib::DirConfig& dc, + const std::string& configDefName, + const std::string& fileName); + + +void addStorageDistributionConfig(vdstestlib::DirConfig& dc); + +vdstestlib::DirConfig getStandardConfig(bool storagenode); + +void addSlobrokConfig(vdstestlib::DirConfig& dc, + const mbus::Slobrok& slobrok); + +// Class used to print start and end of test. Enable debug when you want to see +// which test creates what output or where we get stuck +struct TestName { + std::string name; + TestName(const std::string& n); + ~TestName(); +}; + +} // storage + diff --git a/storageserver/src/tests/testrunner.cpp b/storageserver/src/tests/testrunner.cpp new file mode 100644 index 00000000000..5d8dc8d4c1f --- /dev/null +++ b/storageserver/src/tests/testrunner.cpp @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <iostream> +#include <vespa/log/log.h> +#include <vespa/vdstestlib/cppunit/cppunittestrunner.h> + +LOG_SETUP("storagecppunittests"); + +int +main(int argc, char **argv) +{ + vdstestlib::CppUnitTestRunner testRunner; + return testRunner.run(argc, argv); +} diff --git a/storageserver/src/vespa/storageserver/app/.gitignore b/storageserver/src/vespa/storageserver/app/.gitignore new file mode 100644 index 00000000000..3a4be5d506a --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/.gitignore @@ -0,0 +1,12 @@ +*.So +*.lo +.depend +.depend.NEW +.deps +.libs +Makefile +Makefile.inc +config_command.sh +project.dsw +storaged +storaged.core diff --git a/storageserver/src/vespa/storageserver/app/CMakeLists.txt b/storageserver/src/vespa/storageserver/app/CMakeLists.txt new file mode 100644 index 00000000000..e2918ae49d6 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/CMakeLists.txt @@ -0,0 +1,11 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageserver_storageapp STATIC + SOURCES + process.cpp + distributorprocess.cpp + servicelayerprocess.cpp + dummyservicelayerprocess.cpp + rpcservicelayerprocess.cpp + memfileservicelayerprocess.cpp + DEPENDS +) diff --git a/storageserver/src/vespa/storageserver/app/distributorprocess.cpp b/storageserver/src/vespa/storageserver/app/distributorprocess.cpp new file mode 100644 index 00000000000..80de17455d3 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/distributorprocess.cpp @@ -0,0 +1,84 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageserver/app/distributorprocess.h> + +#include <vespa/log/log.h> +#include <vespa/storage/storageutil/utils.h> + +LOG_SETUP(".process.distributor"); + +namespace storage { + +DistributorProcess::DistributorProcess(const config::ConfigUri & configUri) + : Process(configUri), + _activeFlag(DistributorNode::NO_NEED_FOR_ACTIVE_STATES) +{ +} + +void +DistributorProcess::shutdown() +{ + Process::shutdown(); + _node.reset(); +} + +void +DistributorProcess::setupConfig(uint64_t subscribeTimeout) +{ + std::unique_ptr<vespa::config::content::core::StorServerConfig> config = + config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(_configUri.getConfigId(), _configUri.getContext(), subscribeTimeout); + if (config->persistenceProvider.type + != vespa::config::content::core::StorServerConfig::PersistenceProvider::STORAGE) + { + _activeFlag = DistributorNode::NEED_ACTIVE_BUCKET_STATES_SET; + } + _distributorConfigHandler + = _configSubscriber.subscribe<vespa::config::content::core::StorDistributormanagerConfig>( + _configUri.getConfigId(), subscribeTimeout); + _visitDispatcherConfigHandler + = _configSubscriber.subscribe<vespa::config::content::core::StorVisitordispatcherConfig>( + _configUri.getConfigId(), subscribeTimeout); + Process::setupConfig(subscribeTimeout); +} + +void +DistributorProcess::updateConfig() +{ + Process::updateConfig(); + if (_distributorConfigHandler->isChanged()) { + _node->handleConfigChange( + *_distributorConfigHandler->getConfig()); + } + if (_visitDispatcherConfigHandler->isChanged()) { + _node->handleConfigChange( + *_visitDispatcherConfigHandler->getConfig()); + } +} + +bool +DistributorProcess::configUpdated() +{ + bool changed = Process::configUpdated(); + if (_distributorConfigHandler->isChanged()) { + LOG(info, "Distributor manager config detected changed"); + changed = true; + } + if (_visitDispatcherConfigHandler->isChanged()) { + LOG(info, "Visitor dispatcher config detected changed"); + changed = true; + } + return changed; +} + +void +DistributorProcess::createNode() +{ + _node.reset(new DistributorNode(_configUri, _context, *this, _activeFlag)); + _node->handleConfigChange( + *_distributorConfigHandler->getConfig()); + _node->handleConfigChange( + *_visitDispatcherConfigHandler->getConfig()); +} + +} // storage diff --git a/storageserver/src/vespa/storageserver/app/distributorprocess.h b/storageserver/src/vespa/storageserver/app/distributorprocess.h new file mode 100644 index 00000000000..32472a68793 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/distributorprocess.h @@ -0,0 +1,42 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::DistributorProcess + * + * \brief A process running a distributor. + */ + +#pragma once + +#include <vespa/storageserver/app/process.h> + +namespace storage { + +class DistributorProcess : public Process { + DistributorNodeContext _context; + DistributorNode::NeedActiveState _activeFlag; + DistributorNode::UP _node; + config::ConfigHandle<vespa::config::content::core::StorDistributormanagerConfig>::UP + _distributorConfigHandler; + config::ConfigHandle<vespa::config::content::core::StorVisitordispatcherConfig>::UP + _visitDispatcherConfigHandler; + +public: + DistributorProcess(const config::ConfigUri & configUri); + ~DistributorProcess() { shutdown(); } + + virtual void shutdown(); + + virtual void setupConfig(uint64_t subscribeTimeout); + virtual void createNode(); + virtual bool configUpdated(); + virtual void updateConfig(); + + virtual StorageNode& getNode() { return *_node; } + virtual StorageNodeContext& getContext() { return _context; } + virtual DistributorNodeContext& getDistributorContext() { return _context; } + + virtual std::string getComponentName() const { return "distributor"; } +}; + +} // storage + diff --git a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp new file mode 100644 index 00000000000..c57d7667248 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageserver/app/dummyservicelayerprocess.h> + +#include <vespa/log/log.h> + +LOG_SETUP(".process.servicelayer"); + +namespace storage { + +// DummyServiceLayerProcess implementation + +DummyServiceLayerProcess::DummyServiceLayerProcess(const config::ConfigUri & configUri) + : ServiceLayerProcess(configUri) +{ +} + +void +DummyServiceLayerProcess::shutdown() +{ + ServiceLayerProcess::shutdown(); + _provider.reset(0); +} + +void +DummyServiceLayerProcess::setupProvider() +{ + _provider.reset(new spi::dummy::DummyPersistence(getTypeRepo())); +} + +} // storage diff --git a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.h b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.h new file mode 100644 index 00000000000..ab119bfba5e --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.h @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::DummyServiceLayerProcess + * + * \brief A process running a service layer with dummy persistence provider. + */ +#pragma once + +#include <vespa/persistence/dummyimpl/dummypersistence.h> +#include <vespa/storageserver/app/servicelayerprocess.h> + +namespace storage { + +class DummyServiceLayerProcess : public ServiceLayerProcess { + spi::dummy::DummyPersistence::UP _provider; + +public: + DummyServiceLayerProcess(const config::ConfigUri & configUri); + ~DummyServiceLayerProcess() { shutdown(); } + + virtual void shutdown(); + virtual void setupProvider(); + virtual spi::PersistenceProvider& getProvider() { return *_provider; } +}; + +} // storage + diff --git a/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.cpp new file mode 100644 index 00000000000..c85a3af8a69 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.cpp @@ -0,0 +1,109 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageserver/app/memfileservicelayerprocess.h> + +#include <vespa/log/log.h> + +LOG_SETUP(".process.servicelayer"); + +namespace storage { + +// MemFileServiceLayerProcess implementation + +MemFileServiceLayerProcess::MemFileServiceLayerProcess( + const config::ConfigUri & configUri) + : ServiceLayerProcess(configUri), + _changed(false) +{ +} + +void +MemFileServiceLayerProcess::shutdown() +{ + ServiceLayerProcess::shutdown(); + _provider.reset(0); +} + +void +MemFileServiceLayerProcess::setupConfig(uint64_t subscribeTimeout) +{ + ServiceLayerProcess::setupConfig(subscribeTimeout); + _configFetcher.reset(new config::ConfigFetcher(_configUri.getContext())); + _configFetcher->subscribe<vespa::config::storage::StorDevicesConfig>(_configUri.getConfigId(), this, subscribeTimeout); + _configFetcher->subscribe<vespa::config::storage::StorMemfilepersistenceConfig>(_configUri.getConfigId(), this, subscribeTimeout); + _configFetcher->subscribe<vespa::config::content::PersistenceConfig>(_configUri.getConfigId(), this, subscribeTimeout); + _configFetcher->start(); +} + +void +MemFileServiceLayerProcess::removeConfigSubscriptions() +{ + _configFetcher.reset(0); +} + +void +MemFileServiceLayerProcess::setupProvider() +{ + _provider.reset(new memfile::MemFilePersistenceProvider( + _context.getComponentRegister(), _configUri)); + _provider->setDocumentRepo(*getTypeRepo()); +} + +bool +MemFileServiceLayerProcess::configUpdated() +{ + if (ServiceLayerProcess::configUpdated()) return true; + vespalib::LockGuard guard(_lock); + return _changed; +} + +void +MemFileServiceLayerProcess::updateConfig() +{ + ServiceLayerProcess::updateConfig(); + LOG(info, "Config updated. Sending new config to memfile provider"); + vespalib::LockGuard guard(_lock); + if (_changed) { + LOG(debug, "Memfile or device config changed too."); + if (_nextMemfilepersistence) { + _provider->setConfig(std::move(_nextMemfilepersistence)); + } + if (_nextPersistence) { + _provider->setConfig(std::move(_nextPersistence)); + } + if (_nextDevices) { + _provider->setConfig(std::move(_nextDevices)); + } + } + _provider->setDocumentRepo(*getTypeRepo()); + _changed = false; +} + +void +MemFileServiceLayerProcess::configure( + std::unique_ptr<vespa::config::storage::StorMemfilepersistenceConfig> config) +{ + vespalib::LockGuard guard(_lock); + _nextMemfilepersistence = std::move(config); + _changed = true; +} + +void +MemFileServiceLayerProcess::configure( + std::unique_ptr<vespa::config::content::PersistenceConfig> config) +{ + vespalib::LockGuard guard(_lock); + _nextPersistence = std::move(config); + _changed = true; +} + +void +MemFileServiceLayerProcess::configure(std::unique_ptr<vespa::config::storage::StorDevicesConfig> config) +{ + vespalib::LockGuard guard(_lock); + _nextDevices = std::move(config); + _changed = true; +} + +} // storage diff --git a/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.h b/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.h new file mode 100644 index 00000000000..2e365f3f1a2 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.h @@ -0,0 +1,59 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::ServiceLayerProcess + * + * \brief A process running a service layer. + */ +/** + * \class storage::MemFileServiceLayerProcess + * + * \brief A process running a service layer with memfile persistence provider. + */ +/** + * \class storage::RpcServiceLayerProcess + * + * \brief A process running a service layer with RPC persistence provider. + */ +#pragma once + +#include <vespa/memfilepersistence/spi/memfilepersistenceprovider.h> +#include <vespa/storageserver/app/servicelayerprocess.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { + +class MemFileServiceLayerProcess + : public ServiceLayerProcess, + public config::IFetcherCallback<vespa::config::storage::StorMemfilepersistenceConfig>, + public config::IFetcherCallback<vespa::config::storage::StorDevicesConfig>, + public config::IFetcherCallback<vespa::config::content::PersistenceConfig> +{ + bool _changed; + std::unique_ptr<config::ConfigFetcher> _configFetcher; + std::unique_ptr<vespa::config::storage::StorMemfilepersistenceConfig> _nextMemfilepersistence; + std::unique_ptr<vespa::config::storage::StorDevicesConfig> _nextDevices; + std::unique_ptr<vespa::config::content::PersistenceConfig> _nextPersistence; + memfile::MemFilePersistenceProvider::UP _provider; + vespalib::Lock _lock; + +public: + MemFileServiceLayerProcess(const config::ConfigUri & configUri); + ~MemFileServiceLayerProcess() { shutdown(); } + + virtual void shutdown(); + + void setupConfig(uint64_t subscribeTimeout); + virtual void removeConfigSubscriptions(); + virtual void setupProvider(); + virtual bool configUpdated(); + virtual void updateConfig(); + + virtual spi::PersistenceProvider& getProvider() { return *_provider; } + + void configure(std::unique_ptr<vespa::config::storage::StorMemfilepersistenceConfig> config); + void configure(std::unique_ptr<vespa::config::storage::StorDevicesConfig> config); + void configure(std::unique_ptr<vespa::config::content::PersistenceConfig> config); +}; + +} // storage + diff --git a/storageserver/src/vespa/storageserver/app/process.cpp b/storageserver/src/vespa/storageserver/app/process.cpp new file mode 100644 index 00000000000..fc537f06590 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/process.cpp @@ -0,0 +1,66 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageserver/app/process.h> + +//#include <vespa/config/helper/legacy.h> +#include <vespa/log/log.h> + +LOG_SETUP(".process"); + +namespace storage { + +Process::Process(const config::ConfigUri & configUri) + : _configUri(configUri), + _configSubscriber(_configUri.getContext()) +{ +} + +void +Process::setupConfig(uint64_t subscribeTimeout) +{ + _documentHandler = _configSubscriber.subscribe<document::DocumenttypesConfig>(_configUri.getConfigId(), subscribeTimeout); + if (!_configSubscriber.nextConfig()) { + throw vespalib::TimeoutException( + "Could not subscribe to document config within timeout"); + } + _repos.push_back(document::DocumentTypeRepo::SP( + new document::DocumentTypeRepo(*_documentHandler->getConfig()))); + getContext().getComponentRegister().setDocumentTypeRepo(_repos.back()); +} + +bool +Process::configUpdated() +{ + _configSubscriber.nextGeneration(0); + if (_documentHandler->isChanged()) { + LOG(info, "Document config detected changed"); + return true; + } + return false; +} + +void +Process::updateConfig() +{ + if (_documentHandler->isChanged()) { + _repos.push_back(document::DocumentTypeRepo::SP( + new document::DocumentTypeRepo( + *_documentHandler->getConfig()))); + getNode().setNewDocumentRepo(_repos.back()); + } +} + +void +Process::shutdown() +{ + removeConfigSubscriptions(); +} + +int64_t +Process::getGeneration() const +{ + return _configSubscriber.getGeneration(); +} + +} // storage diff --git a/storageserver/src/vespa/storageserver/app/process.h b/storageserver/src/vespa/storageserver/app/process.h new file mode 100644 index 00000000000..85e30e73687 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/process.h @@ -0,0 +1,56 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::Process + * + * \brief Storage process as a library. + * + * A class with a main function cannot be tested within C++ code. This class + * contains the process as a library such that it can be tested and used in + * other pieces of code. + * + * Specializations of this class will exist to add the funcionality needed for + * the various process types. + */ + +#pragma once + +#include <vespa/document/datatype/documenttype.h> +#include <vespa/storage/storageserver/applicationgenerationfetcher.h> +#include <vespa/storage/storageserver/distributornode.h> +#include <vespa/storage/storageserver/servicelayernode.h> +#include <vespa/config/config.h> + +namespace storage { + +class Process : public ApplicationGenerationFetcher { +protected: + config::ConfigUri _configUri; + document::DocumentTypeRepo::SP getTypeRepo() { return _repos.back(); } + config::ConfigSubscriber _configSubscriber; + +private: + config::ConfigHandle<document::DocumenttypesConfig>::UP _documentHandler; + std::vector<document::DocumentTypeRepo::SP> _repos; + +public: + typedef std::unique_ptr<Process> UP; + + Process(const config::ConfigUri & configUri); + virtual ~Process() {} + + virtual void setupConfig(uint64_t subscribeTimeout); + virtual void createNode() = 0; + virtual bool configUpdated(); + virtual void updateConfig(); + + virtual void shutdown(); + virtual void removeConfigSubscriptions() {} + + virtual StorageNode& getNode() = 0; + virtual StorageNodeContext& getContext() = 0; + + virtual int64_t getGeneration() const; +}; + +} // storage + diff --git a/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.cpp new file mode 100644 index 00000000000..cca173ba04d --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.cpp @@ -0,0 +1,44 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageserver/app/rpcservicelayerprocess.h> + +#include <vespa/log/log.h> + +LOG_SETUP(".process.servicelayer"); + +namespace storage { + +// RpcServiceLayerProcess implementation + +RpcServiceLayerProcess::RpcServiceLayerProcess(const config::ConfigUri & configUri) + : ServiceLayerProcess(configUri) +{ +} + +void +RpcServiceLayerProcess::shutdown() +{ + ServiceLayerProcess::shutdown(); + _provider.reset(0); +} + +void +RpcServiceLayerProcess::setupProvider() +{ + std::unique_ptr<vespa::config::content::core::StorServerConfig> serverConfig = + config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(_configUri.getConfigId(), _configUri.getContext()); + + _provider.reset(new spi::ProviderProxy( + serverConfig->persistenceProvider.rpc.connectspec, *getTypeRepo())); +} + +void +RpcServiceLayerProcess::updateConfig() +{ + ServiceLayerProcess::updateConfig(); + LOG(info, "Config updated. Sending new config to RPC proxy provider"); + _provider->setRepo(*getTypeRepo()); +} + +} // storage diff --git a/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.h b/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.h new file mode 100644 index 00000000000..70f843ae597 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.h @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::RpcServiceLayerProcess + * + * \brief A process running a service layer with RPC persistence provider. + */ +#pragma once + +#include <vespa/persistence/proxy/providerproxy.h> +#include <vespa/storageserver/app/servicelayerprocess.h> + +namespace storage { + +class RpcServiceLayerProcess : public ServiceLayerProcess { + spi::ProviderProxy::UP _provider; + +public: + RpcServiceLayerProcess(const config::ConfigUri & configUri); + ~RpcServiceLayerProcess() { shutdown(); } + + virtual void shutdown(); + virtual void setupProvider(); + virtual void updateConfig(); + virtual spi::PersistenceProvider& getProvider() { return *_provider; } +}; + +} // storage + diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp new file mode 100644 index 00000000000..52f47531c78 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp @@ -0,0 +1,38 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageserver/app/servicelayerprocess.h> + +#include <vespa/log/log.h> +#include <vespa/searchvisitor/searchvisitor.h> +#include <vespa/storage/storageutil/utils.h> + +LOG_SETUP(".process.servicelayer"); + +namespace storage { + +// ServiceLayerProcess implementation + +ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri & configUri) + : Process(configUri) +{ +} + +void +ServiceLayerProcess::shutdown() +{ + Process::shutdown(); + _node.reset(0); +} + +void +ServiceLayerProcess::createNode() +{ + _externalVisitors["searchvisitor"].reset(new SearchVisitorFactory(_configUri)); + setupProvider(); + _node.reset(new ServiceLayerNode( + _configUri, _context, *this, getProvider(), _externalVisitors)); + _node->init(); +} + +} // storage diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h new file mode 100644 index 00000000000..d1ac919a674 --- /dev/null +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h @@ -0,0 +1,50 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::ServiceLayerProcess + * + * \brief A process running a service layer. + */ +/** + * \class storage::MemFileServiceLayerProcess + * + * \brief A process running a service layer with memfile persistence provider. + */ +/** + * \class storage::RpcServiceLayerProcess + * + * \brief A process running a service layer with RPC persistence provider. + */ +#pragma once + +#include <vespa/storageserver/app/process.h> +#include <vespa/config/config.h> +#include <vespa/config/helper/configfetcher.h> +#include <vespa/config-persistence.h> + +namespace storage { + +class ServiceLayerProcess : public Process { + VisitorFactory::Map _externalVisitors; + ServiceLayerNode::UP _node; + +protected: + ServiceLayerNodeContext _context; + +public: + ServiceLayerProcess(const config::ConfigUri & configUri); + + virtual void shutdown(); + + virtual void setupProvider() = 0; + virtual spi::PersistenceProvider& getProvider() = 0; + + virtual void createNode(); + + virtual StorageNode& getNode() { return *_node; } + virtual StorageNodeContext& getContext() { return _context; } + + virtual std::string getComponentName() const { return "servicelayer"; } +}; + +} // storage + |