summaryrefslogtreecommitdiffstats
path: root/storageserver
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storageserver
Publish
Diffstat (limited to 'storageserver')
-rw-r--r--storageserver/.gitignore2
-rw-r--r--storageserver/CMakeLists.txt18
-rw-r--r--storageserver/OWNERS2
-rw-r--r--storageserver/src/.gitignore10
-rw-r--r--storageserver/src/apps/storaged/.gitignore3
-rw-r--r--storageserver/src/apps/storaged/CMakeLists.txt10
-rw-r--r--storageserver/src/apps/storaged/forcelink.cpp30
-rw-r--r--storageserver/src/apps/storaged/forcelink.h19
-rw-r--r--storageserver/src/apps/storaged/storage.cpp220
-rw-r--r--storageserver/src/tests/.gitignore6
-rw-r--r--storageserver/src/tests/CMakeLists.txt12
-rw-r--r--storageserver/src/tests/config-doctypes.cfg158
-rw-r--r--storageserver/src/tests/dummystoragelink.cpp182
-rw-r--r--storageserver/src/tests/dummystoragelink.h115
-rw-r--r--storageserver/src/tests/storageservertest.cpp1245
-rw-r--r--storageserver/src/tests/testhelper.cpp170
-rw-r--r--storageserver/src/tests/testhelper.h58
-rw-r--r--storageserver/src/tests/testrunner.cpp15
-rw-r--r--storageserver/src/vespa/storageserver/app/.gitignore12
-rw-r--r--storageserver/src/vespa/storageserver/app/CMakeLists.txt11
-rw-r--r--storageserver/src/vespa/storageserver/app/distributorprocess.cpp84
-rw-r--r--storageserver/src/vespa/storageserver/app/distributorprocess.h42
-rw-r--r--storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp32
-rw-r--r--storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.h27
-rw-r--r--storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.cpp109
-rw-r--r--storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.h59
-rw-r--r--storageserver/src/vespa/storageserver/app/process.cpp66
-rw-r--r--storageserver/src/vespa/storageserver/app/process.h56
-rw-r--r--storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.cpp44
-rw-r--r--storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.h28
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp38
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.h50
32 files changed, 2933 insertions, 0 deletions
diff --git a/storageserver/.gitignore b/storageserver/.gitignore
new file mode 100644
index 00000000000..a9b20e8992d
--- /dev/null
+++ b/storageserver/.gitignore
@@ -0,0 +1,2 @@
+Makefile
+Testing
diff --git a/storageserver/CMakeLists.txt b/storageserver/CMakeLists.txt
new file mode 100644
index 00000000000..8f537a61793
--- /dev/null
+++ b/storageserver/CMakeLists.txt
@@ -0,0 +1,18 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_define_module(
+ DEPENDS
+ fastos
+ storage
+ streamingvisitors_searchvisitor
+ memfilepersistence
+
+ APPS
+ src/apps/storaged
+ src/vespa/storageserver/app
+
+ TEST_DEPENDS
+ messagebus_messagebus-test
+
+ TESTS
+ src/tests
+)
diff --git a/storageserver/OWNERS b/storageserver/OWNERS
new file mode 100644
index 00000000000..97c35339850
--- /dev/null
+++ b/storageserver/OWNERS
@@ -0,0 +1,2 @@
+vekterli
+dybdahl
diff --git a/storageserver/src/.gitignore b/storageserver/src/.gitignore
new file mode 100644
index 00000000000..9669be96e4b
--- /dev/null
+++ b/storageserver/src/.gitignore
@@ -0,0 +1,10 @@
+*.So
+*.lo
+.depend
+.depend.NEW
+.deps
+.libs
+Makefile.ini
+config_command.sh
+project.dsw
+/storageserver.mak
diff --git a/storageserver/src/apps/storaged/.gitignore b/storageserver/src/apps/storaged/.gitignore
new file mode 100644
index 00000000000..67fbcdcf4db
--- /dev/null
+++ b/storageserver/src/apps/storaged/.gitignore
@@ -0,0 +1,3 @@
+/Makefile
+/storaged
+storaged-bin
diff --git a/storageserver/src/apps/storaged/CMakeLists.txt b/storageserver/src/apps/storaged/CMakeLists.txt
new file mode 100644
index 00000000000..629b357d574
--- /dev/null
+++ b/storageserver/src/apps/storaged/CMakeLists.txt
@@ -0,0 +1,10 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(storageserver_storaged_app
+ SOURCES
+ storage.cpp
+ forcelink.cpp
+ OUTPUT_NAME storaged-bin
+ INSTALL sbin
+ DEPENDS
+ storageserver_storageapp
+)
diff --git a/storageserver/src/apps/storaged/forcelink.cpp b/storageserver/src/apps/storaged/forcelink.cpp
new file mode 100644
index 00000000000..2c628644e82
--- /dev/null
+++ b/storageserver/src/apps/storaged/forcelink.cpp
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/document/base/forcelink.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/searchlib/aggregation/forcelink.hpp>
+#include <vespa/searchlib/expression/forcelink.hpp>
+
+/* Here is code that initializes a lot of stuff to force it to be linked */
+namespace search {
+ struct ForceLink {
+ ForceLink() {
+ if (time(NULL) == 7) {
+ // grouping stuff
+ forcelink_searchlib_aggregation();
+ forcelink_searchlib_expression();
+ }
+ }
+ };
+}
+
+namespace storage {
+
+void serverForceLink()
+{
+ document::ForceLink documentForce;
+ search::ForceLink searchForce;
+}
+
+} // namespace storage
diff --git a/storageserver/src/apps/storaged/forcelink.h b/storageserver/src/apps/storaged/forcelink.h
new file mode 100644
index 00000000000..c3fa56c1b3c
--- /dev/null
+++ b/storageserver/src/apps/storaged/forcelink.h
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * \file forcelink.h
+ *
+ * \brief Utility to link in objects we need in binary.
+ */
+
+#pragma once
+
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/fastos/fastos.h>
+#include <vespa/config-rank-profiles.h>
+
+namespace storage {
+
+ extern void serverForceLink();
+
+}
+
diff --git a/storageserver/src/apps/storaged/storage.cpp b/storageserver/src/apps/storaged/storage.cpp
new file mode 100644
index 00000000000..8eb955ae930
--- /dev/null
+++ b/storageserver/src/apps/storaged/storage.cpp
@@ -0,0 +1,220 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * \class storage::StorageApp
+ * \ingroup serverapp
+ *
+ * \brief The storage daemon application.
+ *
+ * This code is NOT unit tested and should be as minimal as possible.
+ *
+ * It should handle process signals and have the main method for the
+ * application, but as little as possible else.
+ */
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+#include <signal.h>
+#include <vespa/persistence/spi/exceptions.h>
+#include <vespa/storage/storageutil/utils.h>
+#include <vespa/storageserver/app/distributorprocess.h>
+#include "forcelink.h"
+#include <vespa/storageserver/app/memfileservicelayerprocess.h>
+#include <vespa/storageserver/app/dummyservicelayerprocess.h>
+#include <vespa/storageserver/app/rpcservicelayerprocess.h>
+#include <vespa/vespalib/util/programoptions.h>
+#include <vespa/vespalib/util/shutdownguard.h>
+#include <vespa/config/config.h>
+
+LOG_SETUP("vds.application");
+
+namespace storage {
+
+namespace {
+
+Process::UP createProcess(vespalib::stringref configId) {
+ // FIXME: Rewrite parameter to config uri and pass when all subsequent configs are converted.
+ config::ConfigUri uri(configId);
+ std::unique_ptr<vespa::config::content::core::StorServerConfig> serverConfig = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext());
+ if (serverConfig->isDistributor) {
+ return Process::UP(new DistributorProcess(configId));
+ } else switch (serverConfig->persistenceProvider.type) {
+ case vespa::config::content::core::StorServerConfig::PersistenceProvider::STORAGE:
+ return Process::UP(new MemFileServiceLayerProcess(configId));
+ case vespa::config::content::core::StorServerConfig::PersistenceProvider::DUMMY:
+ return Process::UP(new DummyServiceLayerProcess(configId));
+ case vespa::config::content::core::StorServerConfig::PersistenceProvider::RPC:
+ return Process::UP(new RpcServiceLayerProcess(configId));
+ default:
+ throw vespalib::IllegalStateException(
+ "Unknown persistence provider.", VESPA_STRLOC);
+ }
+}
+
+} // End of anonymous namespace
+
+class StorageApp : public FastOS_Application,
+ private vespalib::ProgramOptions
+{
+ std::string _configId;
+ bool _showSyntax;
+ uint32_t _maxShutdownTime;
+ int _lastSignal;
+ vespalib::Monitor _signalLock;
+ Process::UP _process;
+
+public:
+ StorageApp();
+
+ void handleSignal(int signal) {
+ LOG(info, "Got signal %d, waiting for lock", signal);
+ vespalib::MonitorGuard sync(_signalLock);
+
+ LOG(info, "Got lock for signal %d", signal);
+ _lastSignal = signal;
+ sync.signal();
+ }
+ void handleSignals();
+
+private:
+ bool Init();
+ int Main();
+ bool gotSignal() { return _lastSignal != 0; }
+};
+
+StorageApp::StorageApp()
+ : _showSyntax(false), _maxShutdownTime(120000), _lastSignal(0), _signalLock()
+{
+ setSyntaxMessage(
+ "This is the main daemon used to start the storage nodes. The same "
+ "actual binary is used for both storage and distributor nodes, but "
+ "it is duplicated when installing, such that one can hotfix a "
+ "distributor bug without restarting storage nodes.");
+ addOption("c config-id", _configId,
+ "The config identifier this storage node should use to request "
+ "config. This identifier specifies whether the binary will behave "
+ "as a storage or distributor, what cluster it belongs to, and the "
+ "index it has in the cluster.");
+ addOption("h help", _showSyntax, false, "Show this syntax help page.");
+ addOption("t maxshutdowntime", _maxShutdownTime, uint32_t(120000),
+ "Maximum amount of milliseconds we allow proper shutdown to run before "
+ "abruptly killing the process.");
+}
+
+bool StorageApp::Init()
+{
+ FastOS_Application::Init();
+ setCommandLineArguments(
+ FastOS_Application::_argc, FastOS_Application::_argv);
+ try{
+ parse();
+ } catch (vespalib::InvalidCommandLineArgumentsException& e) {
+ std::cerr << e.getMessage() << "\n\n";
+ writeSyntaxPage(std::cerr);
+ exit(EXIT_FAILURE);
+ }
+ if (_showSyntax) {
+ writeSyntaxPage(std::cerr);
+ exit(0);
+ }
+ return true;
+}
+
+namespace {
+ storage::StorageApp *sigtramp = 0;
+ uint32_t _G_signalCount = 0;
+
+ void killHandler(int sig) {
+ if (_G_signalCount == 0) {
+ _G_signalCount++;
+ if (sigtramp == 0) _exit(EXIT_FAILURE);
+ // note: this is not totally safe, sigtramp is not protected by a lock
+ sigtramp->handleSignal(sig);
+ } else {
+ fprintf(stderr, "Received another shutdown signal %u while "
+ "shutdown in progress (count=%u)",
+ sig, _G_signalCount);
+ }
+ }
+
+ void setupKillHandler() {
+ struct sigaction usr_action;
+ sigset_t block_mask;
+
+ /* Establish the signal handler. */
+ sigfillset (&block_mask);
+ usr_action.sa_handler = killHandler;
+ usr_action.sa_mask = block_mask;
+ usr_action.sa_flags = 0;
+ sigaction (SIGTERM, &usr_action, NULL);
+ sigaction (SIGINT, &usr_action, NULL);
+ }
+}
+
+void StorageApp::handleSignals()
+{
+ if (gotSignal()) {
+ int signal = _lastSignal;
+ LOG(debug, "starting controlled shutdown of storage "
+ "(received signal %d)", signal);
+ _process->getNode().requestShutdown("controlled shutdown");
+ }
+}
+
+int StorageApp::Main()
+{
+ try{
+ _process = createProcess(_configId);
+ _process->setupConfig(600000);
+ _process->createNode();
+ } catch (const spi::HandledException & e) {
+ LOG(warning, "Died due to known cause: %s", e.what());
+ return 1;
+ } catch (const vespalib::NetworkSetupFailureException & e) {
+ LOG(warning, "Network failure: '%s'", e.what());
+ return 1;
+ } catch (const vespalib::IllegalStateException & e) {
+ LOG(error, "Unknown IllegalStateException: '%s'", e.what());
+ return 1;
+ } catch (const vespalib::Exception & e) {
+ LOG(error, "Caught exception when starting: %s", e.what());
+ return 1;
+ }
+
+ // Not setting up kill handlers before storage is up. Before that
+ // we can just die quickly with default handlers.
+ LOG(debug, "Node created. Setting up kill handler.");
+ setupKillHandler();
+
+ // main loop - wait for termination signal
+ while (!_process->getNode().attemptedStopped()) {
+ if (_process->configUpdated()) {
+ LOG(debug, "Config updated. Progagating config updates");
+ ResumeGuard guard(_process->getNode().pause());
+ _process->updateConfig();
+ }
+ // Wait until we get a kill signal.
+ vespalib::MonitorGuard lock(_signalLock);
+ lock.wait(1000);
+ handleSignals();
+ }
+ LOG(debug, "Server was attempted stopped, shutting down");
+ // Create guard that will forcifully kill storage if destruction takes longer
+ // time than given timeout.
+ vespalib::ShutdownGuard shutdownGuard(_maxShutdownTime);
+ LOG(debug, "Attempting proper shutdown");
+ _process.reset(0);
+ LOG(debug, "Completed controlled shutdown.");
+ return 0;
+}
+
+} // storage
+
+int main(int argc, char **argv)
+{
+ storage::StorageApp app;
+ storage::sigtramp = &app;
+ int retval = app.Entry(argc,argv);
+ storage::sigtramp = NULL;
+ LOG(debug, "Exiting");
+ return retval;
+}
diff --git a/storageserver/src/tests/.gitignore b/storageserver/src/tests/.gitignore
new file mode 100644
index 00000000000..101f84131dc
--- /dev/null
+++ b/storageserver/src/tests/.gitignore
@@ -0,0 +1,6 @@
+/Makefile
+/dirconfig.tmp
+/test.vlog
+/testrunner
+/vdsroot
+storageserver_testrunner_app
diff --git a/storageserver/src/tests/CMakeLists.txt b/storageserver/src/tests/CMakeLists.txt
new file mode 100644
index 00000000000..b735716c705
--- /dev/null
+++ b/storageserver/src/tests/CMakeLists.txt
@@ -0,0 +1,12 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(storageserver_testrunner_app
+ SOURCES
+ storageservertest.cpp
+ testhelper.cpp
+ dummystoragelink.cpp
+ testrunner.cpp
+ DEPENDS
+ storageserver_storageapp
+ vdstestlib
+)
+vespa_add_test(NAME storageserver_testrunner_app COMMAND storageserver_testrunner_app)
diff --git a/storageserver/src/tests/config-doctypes.cfg b/storageserver/src/tests/config-doctypes.cfg
new file mode 100644
index 00000000000..f41593ebfc3
--- /dev/null
+++ b/storageserver/src/tests/config-doctypes.cfg
@@ -0,0 +1,158 @@
+enablecompression false
+documenttype[3]
+documenttype[0].id -519202262
+documenttype[0].name "text/plain"
+documenttype[0].version 0
+documenttype[0].headerstruct 160469461
+documenttype[0].bodystruct 749465898
+documenttype[0].inherits[0]
+documenttype[0].datatype[2]
+documenttype[0].datatype[0].id 160469461
+documenttype[0].datatype[0].type STRUCT
+documenttype[0].datatype[0].array.element.id 0
+documenttype[0].datatype[0].map.key.id 0
+documenttype[0].datatype[0].map.value.id 0
+documenttype[0].datatype[0].wset.key.id 0
+documenttype[0].datatype[0].wset.createifnonexistent false
+documenttype[0].datatype[0].wset.removeifzero false
+documenttype[0].datatype[0].annotationref.annotation.id 0
+documenttype[0].datatype[0].sstruct.name "text/plain.header"
+documenttype[0].datatype[0].sstruct.version 0
+documenttype[0].datatype[0].sstruct.compression.type NONE
+documenttype[0].datatype[0].sstruct.compression.level 0
+documenttype[0].datatype[0].sstruct.compression.threshold 90
+documenttype[0].datatype[0].sstruct.compression.minsize 0
+documenttype[0].datatype[0].sstruct.field[3]
+documenttype[0].datatype[0].sstruct.field[0].name "author"
+documenttype[0].datatype[0].sstruct.field[0].id 644499292
+documenttype[0].datatype[0].sstruct.field[0].id_v6 177126295
+documenttype[0].datatype[0].sstruct.field[0].datatype 2
+documenttype[0].datatype[0].sstruct.field[1].name "date"
+documenttype[0].datatype[0].sstruct.field[1].id 491786523
+documenttype[0].datatype[0].sstruct.field[1].id_v6 916979460
+documenttype[0].datatype[0].sstruct.field[1].datatype 0
+documenttype[0].datatype[0].sstruct.field[2].name "subject"
+documenttype[0].datatype[0].sstruct.field[2].id 1797950813
+documenttype[0].datatype[0].sstruct.field[2].id_v6 943449689
+documenttype[0].datatype[0].sstruct.field[2].datatype 2
+documenttype[0].datatype[1].id 749465898
+documenttype[0].datatype[1].type STRUCT
+documenttype[0].datatype[1].array.element.id 0
+documenttype[0].datatype[1].map.key.id 0
+documenttype[0].datatype[1].map.value.id 0
+documenttype[0].datatype[1].wset.key.id 0
+documenttype[0].datatype[1].wset.createifnonexistent false
+documenttype[0].datatype[1].wset.removeifzero false
+documenttype[0].datatype[1].annotationref.annotation.id 0
+documenttype[0].datatype[1].sstruct.name "text/plain.body"
+documenttype[0].datatype[1].sstruct.version 0
+documenttype[0].datatype[1].sstruct.compression.type NONE
+documenttype[0].datatype[1].sstruct.compression.level 0
+documenttype[0].datatype[1].sstruct.compression.threshold 90
+documenttype[0].datatype[1].sstruct.compression.minsize 0
+documenttype[0].datatype[1].sstruct.field[1]
+documenttype[0].datatype[1].sstruct.field[0].name "content"
+documenttype[0].datatype[1].sstruct.field[0].id 1721764358
+documenttype[0].datatype[1].sstruct.field[0].id_v6 1751481844
+documenttype[0].datatype[1].sstruct.field[0].datatype 3
+documenttype[0].annotationtype[0]
+documenttype[1].id -653677105
+documenttype[1].name "text/html"
+documenttype[1].version 0
+documenttype[1].headerstruct 143329936
+documenttype[1].bodystruct 1473469605
+documenttype[1].inherits[0]
+documenttype[1].datatype[2]
+documenttype[1].datatype[0].id 143329936
+documenttype[1].datatype[0].type STRUCT
+documenttype[1].datatype[0].array.element.id 0
+documenttype[1].datatype[0].map.key.id 0
+documenttype[1].datatype[0].map.value.id 0
+documenttype[1].datatype[0].wset.key.id 0
+documenttype[1].datatype[0].wset.createifnonexistent false
+documenttype[1].datatype[0].wset.removeifzero false
+documenttype[1].datatype[0].annotationref.annotation.id 0
+documenttype[1].datatype[0].sstruct.name "text/html.header"
+documenttype[1].datatype[0].sstruct.version 0
+documenttype[1].datatype[0].sstruct.compression.type NONE
+documenttype[1].datatype[0].sstruct.compression.level 0
+documenttype[1].datatype[0].sstruct.compression.threshold 90
+documenttype[1].datatype[0].sstruct.compression.minsize 0
+documenttype[1].datatype[0].sstruct.field[3]
+documenttype[1].datatype[0].sstruct.field[0].name "author"
+documenttype[1].datatype[0].sstruct.field[0].id 644499292
+documenttype[1].datatype[0].sstruct.field[0].id_v6 177126295
+documenttype[1].datatype[0].sstruct.field[0].datatype 2
+documenttype[1].datatype[0].sstruct.field[1].name "date"
+documenttype[1].datatype[0].sstruct.field[1].id 491786523
+documenttype[1].datatype[0].sstruct.field[1].id_v6 916979460
+documenttype[1].datatype[0].sstruct.field[1].datatype 0
+documenttype[1].datatype[0].sstruct.field[2].name "subject"
+documenttype[1].datatype[0].sstruct.field[2].id 1797950813
+documenttype[1].datatype[0].sstruct.field[2].id_v6 943449689
+documenttype[1].datatype[0].sstruct.field[2].datatype 2
+documenttype[1].datatype[1].id 1473469605
+documenttype[1].datatype[1].type STRUCT
+documenttype[1].datatype[1].array.element.id 0
+documenttype[1].datatype[1].map.key.id 0
+documenttype[1].datatype[1].map.value.id 0
+documenttype[1].datatype[1].wset.key.id 0
+documenttype[1].datatype[1].wset.createifnonexistent false
+documenttype[1].datatype[1].wset.removeifzero false
+documenttype[1].datatype[1].annotationref.annotation.id 0
+documenttype[1].datatype[1].sstruct.name "text/html.body"
+documenttype[1].datatype[1].sstruct.version 0
+documenttype[1].datatype[1].sstruct.compression.type NONE
+documenttype[1].datatype[1].sstruct.compression.level 0
+documenttype[1].datatype[1].sstruct.compression.threshold 90
+documenttype[1].datatype[1].sstruct.compression.minsize 0
+documenttype[1].datatype[1].sstruct.field[1]
+documenttype[1].datatype[1].sstruct.field[0].name "content"
+documenttype[1].datatype[1].sstruct.field[0].id 1721764358
+documenttype[1].datatype[1].sstruct.field[0].id_v6 1751481844
+documenttype[1].datatype[1].sstruct.field[0].datatype 3
+documenttype[1].annotationtype[0]
+documenttype[2].id 238423572
+documenttype[2].name "testdoctype1"
+documenttype[2].version 1
+documenttype[2].headerstruct -226322995
+documenttype[2].bodystruct -1016297758
+documenttype[2].inherits[0]
+documenttype[2].datatype[2]
+documenttype[2].datatype[0].id -226322995
+documenttype[2].datatype[0].type STRUCT
+documenttype[2].datatype[0].array.element.id 0
+documenttype[2].datatype[0].map.key.id 0
+documenttype[2].datatype[0].map.value.id 0
+documenttype[2].datatype[0].wset.key.id 0
+documenttype[2].datatype[0].wset.createifnonexistent false
+documenttype[2].datatype[0].wset.removeifzero false
+documenttype[2].datatype[0].annotationref.annotation.id 0
+documenttype[2].datatype[0].sstruct.name "testdoctype1.header"
+documenttype[2].datatype[0].sstruct.version 1
+documenttype[2].datatype[0].sstruct.compression.type NONE
+documenttype[2].datatype[0].sstruct.compression.level 0
+documenttype[2].datatype[0].sstruct.compression.threshold 90
+documenttype[2].datatype[0].sstruct.compression.minsize 0
+documenttype[2].datatype[0].sstruct.field[0]
+documenttype[2].datatype[1].id -1016297758
+documenttype[2].datatype[1].type STRUCT
+documenttype[2].datatype[1].array.element.id 0
+documenttype[2].datatype[1].map.key.id 0
+documenttype[2].datatype[1].map.value.id 0
+documenttype[2].datatype[1].wset.key.id 0
+documenttype[2].datatype[1].wset.createifnonexistent false
+documenttype[2].datatype[1].wset.removeifzero false
+documenttype[2].datatype[1].annotationref.annotation.id 0
+documenttype[2].datatype[1].sstruct.name "testdoctype1.body"
+documenttype[2].datatype[1].sstruct.version 1
+documenttype[2].datatype[1].sstruct.compression.type NONE
+documenttype[2].datatype[1].sstruct.compression.level 0
+documenttype[2].datatype[1].sstruct.compression.threshold 90
+documenttype[2].datatype[1].sstruct.compression.minsize 0
+documenttype[2].datatype[1].sstruct.field[1]
+documenttype[2].datatype[1].sstruct.field[0].name "content"
+documenttype[2].datatype[1].sstruct.field[0].id 5
+documenttype[2].datatype[1].sstruct.field[0].id_v6 5
+documenttype[2].datatype[1].sstruct.field[0].datatype 2
+documenttype[2].annotationtype[0]
diff --git a/storageserver/src/tests/dummystoragelink.cpp b/storageserver/src/tests/dummystoragelink.cpp
new file mode 100644
index 00000000000..30953a1fe7c
--- /dev/null
+++ b/storageserver/src/tests/dummystoragelink.cpp
@@ -0,0 +1,182 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
+#include <tests/dummystoragelink.h>
+#include <sys/time.h>
+
+namespace storage {
+
+DummyStorageLink* DummyStorageLink::_last(0);
+
+DummyStorageLink::DummyStorageLink()
+ : StorageLink("Dummy storage link"),
+ _commands(),
+ _replies(),
+ _injected(),
+ _autoReply(false),
+ _useDispatch(false),
+ _ignore(false),
+ _waitMonitor()
+{
+ _last = this;
+}
+
+DummyStorageLink::~DummyStorageLink()
+{
+ // Often a chain with dummy link on top is deleted in unit tests.
+ // If they haven't been closed already, close them for a cleaner
+ // shutdown
+ if (getState() == OPENED) {
+ close();
+ flush();
+ }
+ closeNextLink();
+ reset();
+}
+
+bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd)
+{
+ if (_ignore) {
+ return false;
+ }
+ if (_injected.size() > 0) {
+ vespalib::LockGuard guard(_lock);
+ sendUp(*_injected.begin());
+ _injected.pop_front();
+ } else if (_autoReply) {
+ if (!cmd->getType().isReply()) {
+ std::shared_ptr<api::StorageReply> reply(
+ std::dynamic_pointer_cast<api::StorageCommand>(cmd)
+ ->makeReply().release());
+ reply->setResult(api::ReturnCode(
+ api::ReturnCode::OK, "Automatically generated reply"));
+ sendUp(reply);
+ }
+ }
+ if (isBottom()) {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ {
+ vespalib::LockGuard guard(_lock);
+ _commands.push_back(cmd);
+ }
+ lock.broadcast();
+ return true;
+ }
+ return StorageLink::onDown(cmd);
+}
+
+bool DummyStorageLink::onUp(const api::StorageMessage::SP& reply) {
+ if (isTop()) {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ {
+ vespalib::LockGuard guard(_lock);
+ _replies.push_back(reply);
+ }
+ lock.broadcast();
+ return true;
+ }
+ return StorageLink::onUp(reply);
+
+}
+
+void DummyStorageLink::injectReply(api::StorageReply* reply)
+{
+ assert(reply);
+ vespalib::LockGuard guard(_lock);
+ _injected.push_back(std::shared_ptr<api::StorageReply>(reply));
+}
+
+void DummyStorageLink::reset() {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ vespalib::LockGuard guard(_lock);
+ _commands.clear();
+ _replies.clear();
+ _injected.clear();
+}
+
+void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout)
+{
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000));
+ vespalib::MonitorGuard lock(_waitMonitor);
+ while (_commands.size() + _replies.size() < msgCount) {
+ if (timeout != 0 && clock.getTimeInMillis() > endTime) {
+ std::ostringstream ost;
+ ost << "Timed out waiting for " << msgCount << " messages to "
+ << "arrive in dummy storage link. Only "
+ << (_commands.size() + _replies.size()) << " messages seen "
+ << "after timout of " << timeout << " seconds was reached.";
+ throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
+ }
+ if (timeout >= 0) {
+ lock.wait((endTime - clock.getTimeInMillis()).getTime());
+ } else {
+ lock.wait();
+ }
+ }
+}
+
+void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout)
+{
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000));
+ vespalib::MonitorGuard lock(_waitMonitor);
+ while (true) {
+ for (uint32_t i=0; i<_commands.size(); ++i) {
+ if (_commands[i]->getType() == type) return;
+ }
+ for (uint32_t i=0; i<_replies.size(); ++i) {
+ if (_replies[i]->getType() == type) return;
+ }
+ if (timeout != 0 && clock.getTimeInMillis() > endTime) {
+ std::ostringstream ost;
+ ost << "Timed out waiting for " << type << " message to "
+ << "arrive in dummy storage link. Only "
+ << (_commands.size() + _replies.size()) << " messages seen "
+ << "after timout of " << timeout << " seconds was reached.";
+ if (_commands.size() == 1) {
+ ost << " Found command of type " << _commands[0]->getType();
+ }
+ if (_replies.size() == 1) {
+ ost << " Found command of type " << _replies[0]->getType();
+ }
+ throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
+ }
+ if (timeout >= 0) {
+ lock.wait((endTime - clock.getTimeInMillis()).getTime());
+ } else {
+ lock.wait();
+ }
+ }
+}
+
+api::StorageMessage::SP
+DummyStorageLink::getAndRemoveMessage(const api::MessageType& type)
+{
+ vespalib::MonitorGuard lock(_waitMonitor);
+ for (std::vector<api::StorageMessage::SP>::iterator it = _commands.begin();
+ it != _commands.end(); ++it)
+ {
+ if ((*it)->getType() == type) {
+ api::StorageMessage::SP result(*it);
+ _commands.erase(it);
+ return result;
+ }
+ }
+ for (std::vector<api::StorageMessage::SP>::iterator it = _replies.begin();
+ it != _replies.end(); ++it)
+ {
+ if ((*it)->getType() == type) {
+ api::StorageMessage::SP result(*it);
+ _replies.erase(it);
+ return result;
+ }
+ }
+ std::ostringstream ost;
+ ost << "No message of type " << type << " found.";
+ throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
+}
+
+} // storage
diff --git a/storageserver/src/tests/dummystoragelink.h b/storageserver/src/tests/dummystoragelink.h
new file mode 100644
index 00000000000..cb9df8c5642
--- /dev/null
+++ b/storageserver/src/tests/dummystoragelink.h
@@ -0,0 +1,115 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/sync.h>
+#include <list>
+#include <sstream>
+#include <vespa/storageapi/messageapi/storagecommand.h>
+#include <string>
+#include <vector>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storage/common/bucketmessages.h>
+#include <vespa/storageapi/message/internal.h>
+
+class FastOS_ThreadPool;
+
+namespace storage {
+
+class DummyStorageLink : public StorageLink {
+
+ mutable vespalib::Lock _lock; // to protect below containers:
+ std::vector<api::StorageMessage::SP> _commands;
+ std::vector<api::StorageMessage::SP> _replies;
+ std::list<api::StorageMessage::SP> _injected;
+
+ bool _autoReply;
+ bool _useDispatch;
+ bool _ignore;
+ static DummyStorageLink* _last;
+ vespalib::Monitor _waitMonitor;
+
+public:
+ DummyStorageLink();
+ ~DummyStorageLink();
+
+ bool onDown(const api::StorageMessage::SP&);
+ bool onUp(const api::StorageMessage::SP&);
+
+ void addOnTopOfChain(StorageLink& link) {
+ link.addTestLinkOnTop(this);
+ }
+
+ void print(std::ostream& ost, bool verbose, const std::string& indent) const
+ {
+ (void) verbose;
+ ost << indent << "DummyStorageLink("
+ << "autoreply = " << (_autoReply ? "on" : "off")
+ << ", dispatch = " << (_useDispatch ? "on" : "off")
+ << ", " << _commands.size() << " commands"
+ << ", " << _replies.size() << " replies";
+ if (_injected.size() > 0)
+ ost << ", " << _injected.size() << " injected";
+ ost << ")";
+ }
+
+ void injectReply(api::StorageReply* reply);
+ void reset();
+ void setAutoreply(bool autoReply) { _autoReply = autoReply; }
+ void setIgnore(bool ignore) { _ignore = ignore; }
+ // Timeout is given in seconds
+ void waitForMessages(unsigned int msgCount = 1, int timeout = -1);
+ // Wait for a single message of a given type
+ void waitForMessage(const api::MessageType&, int timeout = -1);
+
+ api::StorageMessage::SP getCommand(size_t i) const {
+ vespalib::LockGuard guard(_lock);
+ api::StorageMessage::SP ret = _commands[i];
+ return ret;
+ }
+ api::StorageMessage::SP getReply(size_t i) const {
+ vespalib::LockGuard guard(_lock);
+ api::StorageMessage::SP ret = _replies[i];
+ return ret;
+ }
+ size_t getNumCommands() const {
+ vespalib::LockGuard guard(_lock);
+ return _commands.size();
+ }
+ size_t getNumReplies() const {
+ vespalib::LockGuard guard(_lock);
+ return _replies.size();
+ }
+
+ const std::vector<api::StorageMessage::SP>& getCommands() const
+ { return _commands; }
+ const std::vector<api::StorageMessage::SP>& getReplies() const
+ { return _replies; }
+
+ std::vector<api::StorageMessage::SP> getCommandsOnce() {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ std::vector<api::StorageMessage::SP> retval;
+ {
+ vespalib::LockGuard guard(_lock);
+ retval.swap(_commands);
+ }
+ return retval;
+ }
+
+ std::vector<api::StorageMessage::SP> getRepliesOnce() {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ std::vector<api::StorageMessage::SP> retval;
+ {
+ vespalib::LockGuard guard(_lock);
+ retval.swap(_replies);
+ }
+ return retval;
+ }
+
+ api::StorageMessage::SP getAndRemoveMessage(const api::MessageType&);
+
+ static DummyStorageLink* getLast() { return _last; }
+};
+
+}
+
diff --git a/storageserver/src/tests/storageservertest.cpp b/storageserver/src/tests/storageservertest.cpp
new file mode 100644
index 00000000000..69c55248eb7
--- /dev/null
+++ b/storageserver/src/tests/storageservertest.cpp
@@ -0,0 +1,1245 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storage/storageserver/servicelayernode.h>
+#include <vespa/storage/storageserver/distributornode.h>
+
+#include <vespa/document/base/testdocman.h>
+#include <vespa/document/config/config-documenttypes.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <fstream>
+#include <vespa/log/log.h>
+#include <vespa/memfilepersistence/spi/memfilepersistenceprovider.h>
+#include <vespa/messagebus/staticthrottlepolicy.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/storageapi/mbusprot/storagecommand.h>
+#include <vespa/storageapi/mbusprot/storagereply.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/common/statusmetricconsumer.h>
+#include <vespa/memfilepersistence/memfile/memfilecache.h>
+#include <tests/testhelper.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <tests/dummystoragelink.h>
+
+#include <vespa/storageserver/app/distributorprocess.h>
+#include <vespa/storageserver/app/memfileservicelayerprocess.h>
+
+LOG_SETUP(".storageservertest");
+
+namespace storage {
+
+namespace {
+ uint64_t getTimeInMillis() {
+ struct timeval t;
+ gettimeofday(&t, 0);
+ return (t.tv_sec * uint64_t(1000)) + (t.tv_usec / uint64_t(1000));
+ }
+
+ class SlobrokMirror {
+ config::ConfigUri config;
+ FRT_Supervisor visor;
+ std::unique_ptr<slobrok::api::MirrorAPI> mirror;
+
+ public:
+ SlobrokMirror(const config::ConfigUri & cfg) : config(cfg) {}
+
+ void init(uint32_t timeoutms) {
+ uint64_t timeout = getTimeInMillis() + timeoutms;
+ visor.Start();
+ mirror.reset(new slobrok::api::MirrorAPI(visor, config));
+ while (!mirror->ready()) {
+ if (getTimeInMillis() > timeout)
+ throw vespalib::IllegalStateException(
+ "Failed to initialize slobrok mirror within "
+ "timeout.", VESPA_STRLOC);
+ FastOS_Thread::Sleep(1);
+ }
+ }
+
+ slobrok::api::MirrorAPI& getMirror() {
+ if (mirror.get() == 0) throw vespalib::IllegalStateException(
+ "You need to call init() before you can fetch mirror");
+ return *mirror;
+ }
+ FRT_Supervisor& getSupervisor() {
+ if (mirror.get() == 0) throw vespalib::IllegalStateException(
+ "You need to call init() before you can fetch supervisor");
+ return visor;
+ }
+
+ ~SlobrokMirror() {
+ if (mirror.get() != 0) {
+ mirror.reset(0);
+ visor.ShutDown(true);
+ }
+ }
+ };
+}
+
+struct StorageServerTest : public CppUnit::TestFixture {
+ std::unique_ptr<FastOS_ThreadPool> threadPool;
+ std::unique_ptr<document::TestDocMan> docMan;
+ std::unique_ptr<mbus::Slobrok> slobrok;
+ std::unique_ptr<vdstestlib::DirConfig> distConfig;
+ std::unique_ptr<vdstestlib::DirConfig> storConfig;
+ std::unique_ptr<SlobrokMirror> slobrokMirror;
+
+ void setUp();
+ void tearDown();
+
+ void testNormalUsage();
+ void testPortOverlap_Stress();
+ void testFailOnNoDisks();
+ void testFailOnWrongAmountOfDisks();
+ void testOneDiskUnusable();
+ void testShutdownDuringDiskLoad(bool storagenode);
+ void testShutdownStorageDuringDiskLoad();
+ void testShutdownDistributorDuringDiskLoad();
+ void testShutdownAfterDiskFailure_Stress();
+ void testSplitJoinSplitThroughDistributor_Stress();
+ void testPriorityAndQueueSneakingWhileSplitJoinStressTest();
+ void testStatusPages();
+
+ CPPUNIT_TEST_SUITE(StorageServerTest);
+ CPPUNIT_TEST(testNormalUsage);
+ CPPUNIT_TEST_IGNORED(testPortOverlap_Stress);
+ CPPUNIT_TEST(testFailOnNoDisks);
+ CPPUNIT_TEST(testFailOnWrongAmountOfDisks);
+ CPPUNIT_TEST(testOneDiskUnusable);
+ CPPUNIT_TEST_IGNORED(testShutdownStorageDuringDiskLoad);
+ CPPUNIT_TEST_IGNORED(testShutdownDistributorDuringDiskLoad);
+ CPPUNIT_TEST_IGNORED(testShutdownAfterDiskFailure_Stress);
+
+ // Disabled test for new core... TODO
+ CPPUNIT_TEST_DISABLED(testSplitJoinSplitThroughDistributor_Stress);
+
+ CPPUNIT_TEST_DISABLED(testPriorityAndQueueSneakingWhileSplitJoinStressTest);
+
+ // Doesn't work in new framework. Will investigate as soon as there's time
+ CPPUNIT_TEST_DISABLED(testStatusPages);
+ CPPUNIT_TEST_SUITE_END();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(StorageServerTest);
+
+namespace {
+
+ template<typename T>
+ struct ConfigReader : public config::IFetcherCallback<T>,
+ public T
+ {
+ ConfigReader(const std::string& configId) {
+ config::LegacySubscriber subscription;
+ subscription.subscribe<T>(configId, this);
+ }
+ void configure(std::unique_ptr<document::DocumenttypesConfig> c)
+ {
+ static_cast<T&>(*this) = *c;
+ }
+ };
+
+ struct Node {
+ virtual ~Node() {}
+ virtual StorageNode& getNode() = 0;
+ virtual StorageNodeContext& getContext() = 0;
+
+ bool attemptedStopped()
+ { return getNode().attemptedStopped(); }
+ void waitUntilInitialized(uint32_t timeout)
+ { getNode().waitUntilInitialized(timeout); }
+ StorageLink* getChain() { return getNode().getChain(); }
+ void requestShutdown(const std::string& reason)
+ { getNode().requestShutdown(reason); }
+ const framework::StatusReporter* getStatusReporter(const std::string& i)
+ { return getContext().getComponentRegister().getStatusReporter(i); }
+ NodeStateUpdater& getStateUpdater()
+ { return getContext().getComponentRegister().getNodeStateUpdater(); }
+ };
+
+ struct Distributor : public Node {
+ DistributorProcess _process;
+
+ Distributor(vdstestlib::DirConfig& config)
+ : _process(config.getConfigId())
+ {
+ _process.setupConfig(60000);
+ _process.createNode();
+ }
+
+ virtual StorageNode& getNode() { return _process.getNode(); }
+ virtual StorageNodeContext& getContext()
+ { return _process.getContext(); }
+ distributor::BucketDatabase& getBucketDatabase()
+ { return _process.getDistributorContext().getComponentRegister().getBucketDatabase(); }
+ };
+
+ struct Storage : public Node {
+ MemFileServiceLayerProcess _process;
+ StorageComponent::UP _component;
+
+ Storage(vdstestlib::DirConfig& config) : _process(config.getConfigId())
+ {
+ _process.setupConfig(60000);
+ _process.createNode();
+ _component.reset(new StorageComponent(
+ getContext().getComponentRegister(), "test"));
+ }
+
+ virtual StorageNode& getNode() { return _process.getNode(); }
+ virtual StorageNodeContext& getContext()
+ { return _process.getContext(); }
+ spi::PartitionStateList getPartitions()
+ { return _process.getProvider().getPartitionStates().getList(); }
+ uint16_t getDiskCount() { return getPartitions().size(); }
+ StorageComponent& getComponent() { return *_component; }
+ };
+}
+
+void
+StorageServerTest::setUp()
+{
+ threadPool.reset(new FastOS_ThreadPool(128 * 1024));
+ docMan.reset(new document::TestDocMan);
+ system("chmod -R 755 vdsroot");
+ system("rm -rf vdsroot*");
+ slobrok.reset(new mbus::Slobrok);
+ distConfig.reset(new vdstestlib::DirConfig(getStandardConfig(false)));
+ storConfig.reset(new vdstestlib::DirConfig(getStandardConfig(true)));
+ addSlobrokConfig(*distConfig, *slobrok);
+ addSlobrokConfig(*storConfig, *slobrok);
+ storConfig->getConfig("stor-filestor")
+ .set("fail_disk_after_error_count", "1");
+ system("mkdir -p vdsroot/disks/d0");
+ system("mkdir -p vdsroot.distributor");
+ slobrokMirror.reset(new SlobrokMirror(slobrok->config()));
+}
+
+void
+StorageServerTest::tearDown()
+{
+ slobrokMirror.reset(NULL);
+ storConfig.reset(NULL);
+ distConfig.reset(NULL);
+ slobrok.reset(NULL);
+ docMan.reset(NULL);
+ threadPool.reset(NULL);
+}
+
+void
+StorageServerTest::testNormalUsage()
+{
+ {
+ Distributor distServer(*distConfig);
+ Storage storServer(*storConfig);
+ }
+}
+
+void
+StorageServerTest::testFailOnNoDisks()
+{
+ system("rmdir vdsroot/disks/d0");
+ try{
+ Storage server(*storConfig);
+ CPPUNIT_FAIL("Expected exception about no available disks.");
+ } catch (vespalib::Exception& e) {
+ CPPUNIT_ASSERT_CONTAIN_MESSAGE(e.what(),
+ "No disks configured",
+ e.getMessage());
+ }
+}
+
+void
+StorageServerTest::testFailOnWrongAmountOfDisks()
+{
+/* TODO: Can't be in stor-server config anymore.
+ storConfig->getConfig("stor-server").set("disk_count", "2");
+ try{
+ StorageServer server(storConfig->getConfigId());
+ CPPUNIT_FAIL("Expected exception about wrong amount of disks.");
+ } catch (vespalib::Exception& e) {
+ CPPUNIT_ASSERT_CONTAIN_MESSAGE(e.what(),
+ "Found 1 disks and config says we're supposed to have 2",
+ e.getMessage());
+ }
+*/
+}
+
+void
+StorageServerTest::testOneDiskUnusable()
+{
+ CPPUNIT_ASSERT(system("rm -rf vdsroot/disks/d0") == 0);
+ CPPUNIT_ASSERT(system("mkdir -p vdsroot/disks/d1") == 0);
+ //CPPUNIT_ASSERT(system("ln -s /thisdoesnotexist vdsroot/disks/d0") == 0);
+ Storage server(*storConfig);
+ CPPUNIT_ASSERT_EQUAL(2, (int)server.getDiskCount());
+ CPPUNIT_ASSERT(!server.getPartitions()[0].isUp());
+ CPPUNIT_ASSERT_CONTAIN(
+ std::string("Disk not found during scanning"),
+ server.getPartitions()[0].getReason());
+ CPPUNIT_ASSERT(server.getPartitions()[1].isUp());
+}
+
+namespace {
+ struct LoadGiver : public document::Runnable,
+ public mbus::IReplyHandler
+ {
+ const vdstestlib::DirConfig& _config;
+ const document::DocumentTypeRepo::SP _repo;
+ documentapi::LoadTypeSet _loadTypes;
+ std::unique_ptr<mbus::RPCMessageBus> _mbus;
+ mbus::SourceSession::UP _sourceSession;
+ uint32_t _maxPending;
+ uint32_t _currentPending;
+ uint32_t _processedOk;
+ uint32_t _unexpectedErrors;
+ bool _startedShutdown;
+
+ LoadGiver(const vdstestlib::DirConfig& config,
+ const document::DocumentTypeRepo::SP repo)
+ : _config(config), _repo(repo), _mbus(), _sourceSession(),
+ _maxPending(20), _currentPending(0), _processedOk(0),
+ _unexpectedErrors(0), _startedShutdown(false) {}
+ virtual ~LoadGiver() {
+ if (_sourceSession.get() != 0) {
+ _sourceSession->close();
+ }
+ }
+
+ void init() {
+ documentapi::DocumentProtocol::SP protocol(
+ new documentapi::DocumentProtocol(_loadTypes, _repo));
+ storage::mbusprot::StorageProtocol::SP storageProtocol(
+ new storage::mbusprot::StorageProtocol(_repo, _loadTypes));
+ mbus::ProtocolSet protocols;
+ protocols.add(protocol);
+ protocols.add(storageProtocol);
+ mbus::RPCNetworkParams networkParams;
+ networkParams.setSlobrokConfig(config::ConfigUri(_config.getConfigId()));
+ _mbus.reset(new mbus::RPCMessageBus(protocols, networkParams,
+ _config.getConfigId()));
+ mbus::SourceSessionParams sourceParams;
+ sourceParams.setTimeout(5000);
+ mbus::StaticThrottlePolicy::SP policy(new mbus::StaticThrottlePolicy());
+ policy->setMaxPendingCount(_maxPending);
+ sourceParams.setThrottlePolicy(policy);
+ _sourceSession = _mbus->getMessageBus().createSourceSession(
+ *this, sourceParams);
+ }
+
+ virtual void notifyStartingShutdown() {
+ _startedShutdown = true;
+ }
+
+ virtual void handleReply(mbus::Reply::UP reply) {
+ using documentapi::DocumentProtocol;
+ --_currentPending;
+ if (!reply->hasErrors()) {
+ ++_processedOk;
+ } else if (!_startedShutdown && reply->getNumErrors() > 1) {
+ ++_unexpectedErrors;
+ std::cerr << reply->getNumErrors() << " errors. First: "
+ << reply->getError(0).getCode() << " - "
+ << reply->getError(0).getMessage() << "\n";
+ } else {
+ int code = reply->getError(0).getCode();
+ std::string errorMsg = reply->getError(0).getMessage();
+
+ if (code == mbus::ErrorCode::UNKNOWN_SESSION
+ || code == mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE
+ || code == mbus::ErrorCode::CONNECTION_ERROR
+ || code == mbus::ErrorCode::HANDSHAKE_FAILED)
+ {
+ // Ignore
+ } else if ((code >= mbus::ErrorCode::TRANSIENT_ERROR
+ && code < mbus::ErrorCode::FATAL_ERROR)
+ && (errorMsg.find("UNKNOWN_SESSION") != std::string::npos
+ || errorMsg.find("NO_ADDRESS_FOR_SERVICE")
+ != std::string::npos
+ || errorMsg.find("when node is in state Stopping")
+ != std::string::npos
+ || errorMsg.find("HANDSHAKE_FAILED")
+ != std::string::npos
+ || code == mbus::ErrorCode::APP_TRANSIENT_ERROR
+ || code == DocumentProtocol::ERROR_IO_FAILURE
+ || code == DocumentProtocol::ERROR_ABORTED
+ || code == DocumentProtocol::ERROR_BUCKET_NOT_FOUND))
+ {
+ // Ignore
+ } else {
+ ++_unexpectedErrors;
+ std::cerr << reply->getNumErrors() << " errors. First: "
+ << reply->getError(0).getCode() << " - "
+ << reply->getError(0).getMessage();
+ mbus::Message::UP msg(reply->getMessage());
+ if (msg->getType() == DocumentProtocol::MESSAGE_PUTDOCUMENT)
+ {
+ documentapi::PutDocumentMessage& putMsg(
+ static_cast<documentapi::PutDocumentMessage&>(
+ *msg));
+ std::cerr << " - " << putMsg.getDocument()->getId();
+ }
+ std::cerr << "\n";
+ }
+ }
+ }
+
+ void waitUntilDecentLoad(uint32_t maxWait = 60000) {
+ uint64_t maxTime = getTimeInMillis() + maxWait;
+ while (true) {
+ if (_processedOk > 5 && _currentPending > _maxPending / 2) {
+ break;
+ }
+ uint64_t time = getTimeInMillis();
+ if (time > maxTime) {
+ if (_processedOk < 5) {
+ throw vespalib::IllegalStateException(
+ "Failed to process 5 ok operations within timeout.",
+ VESPA_STRLOC);
+ }
+ if (_currentPending < _maxPending / 2) {
+ throw vespalib::IllegalStateException(
+ "Failed to get enough max pending.",
+ VESPA_STRLOC);
+ }
+ break;
+ }
+ FastOS_Thread::Sleep(1);
+ }
+ LOG(info, "Currently, we have received %u ok replies and have %u "
+ "pending ones.",
+ _processedOk, _currentPending);
+ }
+ };
+
+ struct SimpleLoadGiver : public LoadGiver {
+ const document::TestDocMan& _testDocMan;
+ vespalib::Monitor _threadMonitor;
+
+ SimpleLoadGiver(const vdstestlib::DirConfig& config,
+ const document::TestDocMan& tdm)
+ : LoadGiver(config, tdm.getTypeRepoSP()), _testDocMan(tdm),
+ _threadMonitor() {}
+ ~SimpleLoadGiver() {
+ stop();
+ join();
+ }
+ virtual bool onStop() {
+ vespalib::MonitorGuard monitor(_threadMonitor);
+ monitor.signal();
+ return true;
+ }
+ void run() {
+ uint32_t seed = 0;
+ uint32_t maxDocSize = 65536;
+ init();
+ vespalib::MonitorGuard monitor(_threadMonitor);
+ while (running()) {
+ uint32_t attemptCount = 0;
+ while (_currentPending < _maxPending
+ && ++attemptCount < _maxPending)
+ {
+ document::Document::SP doc(
+ _testDocMan.createRandomDocument(
+ ++seed, maxDocSize));
+ mbus::Message::UP msg(
+ new documentapi::PutDocumentMessage(doc));
+ msg->setRetryEnabled(false);
+ mbus::Result r = _sourceSession->send(std::move(msg),
+ "storage/cluster.storage/distributor/0/default",
+ true);
+ if (r.isAccepted()){
+ ++_currentPending;
+ } else {
+ if (!_startedShutdown) {
+ std::cerr << "Source session did not accept "
+ "message.\n";
+ }
+ break;
+ }
+ }
+ monitor.wait(1);
+ }
+ }
+ };
+
+ void setSystemState(SlobrokMirror& mirror,
+ const lib::ClusterState& state,
+ const std::vector<std::string>& address)
+ {
+ std::string systemState = state.toString();
+ auto deleter = [](auto * ptr) { ptr->SubRef(); };
+ for (uint32_t i=0; i<address.size(); ++i) {
+ slobrok::api::MirrorAPI::SpecList list(
+ mirror.getMirror().lookup(address[i]));
+ for (uint32_t j=0; j<list.size(); ++j) {
+ auto target = std::unique_ptr<FRT_Target, decltype(deleter)>(mirror.getSupervisor().GetTarget(
+ list[j].second.c_str()), deleter);
+ auto req = std::unique_ptr<FRT_RPCRequest, decltype(deleter)>(mirror.getSupervisor().AllocRPCRequest(),
+ deleter);
+ req->SetMethodName("setsystemstate2");
+ req->GetParams()->AddString(systemState.c_str());
+ target->InvokeSync(req.get(), 5.0);
+ if (req->GetErrorCode() != FRTE_NO_ERROR) {
+ throw vespalib::IllegalStateException(
+ "Failed sending setsystemstate request: "
+ + std::string(req->GetErrorMessage()), VESPA_STRLOC);
+ }
+ }
+ }
+ }
+}
+
+void
+StorageServerTest::testShutdownDuringDiskLoad(bool storagenode)
+{
+ slobrokMirror->init(5000);
+ // Verify that, then shutdown, we stop accepting new messages, fail
+ // all messages enqueued and finish current operations before shutting
+ // down without any errors.
+ std::unique_ptr<Distributor> distServer(new Distributor(*distConfig));
+ std::unique_ptr<Storage> storServer(new Storage(*storConfig));
+ storServer->waitUntilInitialized(30);
+ LOG(info, "\n\nStorage server stable\n\n");
+ lib::ClusterState state("version:1 bits:1 distributor:1 storage:1");
+ std::vector<std::string> addresses;
+ addresses.push_back("storage/cluster.storage/storage/0");
+ addresses.push_back("storage/cluster.storage/distributor/0");
+ LOG(info, "\n\nSetting system states\n\n");
+ setSystemState(*slobrokMirror, state, addresses);
+ LOG(info, "\n\nWaiting for stable distributor server\n\n");
+ distServer->waitUntilInitialized(30);
+
+ LOG(info, "\n\nSTARTING LOADGIVER\n\n");
+
+ SimpleLoadGiver loadGiver(*distConfig, *docMan);
+ loadGiver.start(*threadPool);
+ loadGiver.waitUntilDecentLoad();
+
+ loadGiver.notifyStartingShutdown();
+
+ if (storagenode) {
+ LOG(info, "\n\nKILLING STORAGE NODE\n\n");
+ storServer->requestShutdown(
+ "Stopping storage server during load for testing");
+ storServer.reset(0);
+ } else {
+ LOG(info, "\n\nKILLING DISTRIBUTOR\n\n");
+ distServer->requestShutdown(
+ "Stopping distributor during load for testing");
+ distServer.reset(0);
+ }
+ LOG(info, "\n\nDONE KILLING NODE. Cleaning up other stuff.\n\n");
+
+ CPPUNIT_ASSERT_EQUAL(0u, loadGiver._unexpectedErrors);
+}
+
+void
+StorageServerTest::testShutdownStorageDuringDiskLoad()
+{
+ testShutdownDuringDiskLoad(true);
+}
+
+void
+StorageServerTest::testShutdownDistributorDuringDiskLoad()
+{
+ testShutdownDuringDiskLoad(false);
+}
+
+void
+StorageServerTest::testShutdownAfterDiskFailure_Stress()
+{
+ slobrokMirror->init(5000);
+
+ // Verify that, then shutdown, we stop accepting new messages, fail
+ // all messages enqueued and finish current operations before shutting
+ // down without any errors.
+ std::unique_ptr<Distributor> distServer(new Distributor(*distConfig));
+ std::unique_ptr<Storage> storServer(new Storage(*storConfig));
+ //storServer->getSlotFileCache().disable();
+ storServer->waitUntilInitialized(30);
+ LOG(info, "\n\nStorage server stable\n\n");
+ lib::ClusterState state("version:1 bits:1 distributor:1 storage:1");
+ std::vector<std::string> addresses;
+ addresses.push_back("storage/cluster.storage/storage/0");
+ addresses.push_back("storage/cluster.storage/distributor/0");
+ LOG(info, "\n\nSetting system states\n\n");
+ setSystemState(*slobrokMirror, state, addresses);
+ LOG(info, "\n\nWaiting for stable distributor server\n\n");
+ distServer->waitUntilInitialized(30);
+
+ LOG(info, "\n\nSTARTING LOADGIVER\n\n");
+
+ SimpleLoadGiver loadGiver(*distConfig, *docMan);
+ loadGiver.start(*threadPool);
+ loadGiver.waitUntilDecentLoad();
+
+ // Test that getting io errors flags storage for shutdown
+ // (The shutdown is the responsibility of the application in
+ // storageserver)
+ CPPUNIT_ASSERT(!storServer->attemptedStopped());
+ loadGiver.notifyStartingShutdown();
+ LOG(info, "\n\nREMOVING PERMISSIONS\n\n");
+ system("chmod 000 vdsroot/disks/d0/*.0");
+ system("ls -ld vdsroot/disks/d0/* > permissions");
+
+ for (uint32_t i=0; i<6000; ++i) {
+ //storServer->getMemFileCache().clear();
+ if (storServer->attemptedStopped()) break;
+ FastOS_Thread::Sleep(10);
+ }
+ if (!storServer->attemptedStopped()) {
+ CPPUNIT_FAIL("Removing permissions from disk failed to stop storage "
+ "within timeout of 60 seconds");
+ }
+
+ CPPUNIT_ASSERT_EQUAL(0u, loadGiver._unexpectedErrors);
+ unlink("permissions");
+}
+
+void
+StorageServerTest::testSplitJoinSplitThroughDistributor_Stress()
+{
+ // Setup system with storage and distributor
+ distConfig->getConfig("stor-distributormanager")
+ .set("splitcount", "10");
+ distConfig->getConfig("stor-distributormanager")
+ .set("joincount", "5");
+ distConfig->getConfig("stor-server")
+ .set("switch_new_meta_data_flow", "true");
+ storConfig->getConfig("stor-filestor")
+ .set("revert_time_period", "1");
+ storConfig->getConfig("stor-filestor")
+ .set("keep_remove_time_period", "1");
+ storConfig->getConfig("stor-integritychecker")
+ .set("mincycletime", "0");
+ Distributor distServer(*distConfig);
+ Storage storServer(*storConfig);
+ DummyStorageLink dummyLink;
+ dummyLink.addOnTopOfChain(*distServer.getChain());
+ DummyStorageLink sdummyLink;
+ sdummyLink.addOnTopOfChain(*storServer.getChain());
+
+ api::SetSystemStateCommand::SP scmd(new api::SetSystemStateCommand(
+ lib::ClusterState("version:3 distributor:1 storage:1")));
+ for (uint32_t i=0; i<500; ++i) {
+ if (storServer.getStateUpdater().getReportedNodeState()->getState()
+ == lib::State::UP) break;
+ FastOS_Thread::Sleep(10);
+ }
+ CPPUNIT_ASSERT(
+ storServer.getStateUpdater().getReportedNodeState()->getState()
+ == lib::State::UP);
+ distServer.getChain()->sendDown(scmd);
+ storServer.getChain()->sendDown(scmd);
+ dummyLink.waitForMessages(1, 15);
+ dummyLink.reset();
+ sdummyLink.waitForMessages(1, 15);
+ sdummyLink.reset();
+ for (uint32_t i=0; i<500; ++i) {
+ if (distServer.getStateUpdater().getReportedNodeState()->getState()
+ == lib::State::UP
+ && distServer.getStateUpdater().getSystemState()->getVersion() == 3)
+ {
+ break;
+ }
+ FastOS_Thread::Sleep(10);
+ }
+ CPPUNIT_ASSERT_EQUAL(lib::State::UP,
+ distServer.getStateUpdater().getReportedNodeState()->getState());
+ CPPUNIT_ASSERT_EQUAL(3u,
+ storServer.getStateUpdater().getSystemState()->getVersion());
+ CPPUNIT_ASSERT_EQUAL(3u,
+ distServer.getStateUpdater().getSystemState()->getVersion());
+ // Feed 50 documents
+ document::BucketId bucket(16, 0);
+ for (uint32_t i=0; i<50; ++i) {
+ document::Document::SP doc(
+ docMan->createRandomDocumentAtLocation(0, i, 1024));
+ api::PutCommand::SP cmd(new api::PutCommand(bucket, doc, 1000 + i));
+ distServer.getChain()->sendDown(cmd);
+ }
+ dummyLink.waitForMessages(50, 15);
+ for (uint32_t i=0; i<50; ++i) {
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY,
+ dummyLink.getReplies()[i]->getType());
+ api::PutReply& reply(
+ dynamic_cast<api::PutReply&>(*dummyLink.getReplies()[i]));
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK),
+ reply.getResult());
+ }
+ dummyLink.reset();
+ // Wait until system has split to 7 buckets
+ distributor::BucketDatabase& db(distServer.getBucketDatabase());
+ for (size_t i(0); (i < 6000) && (7ul != db.size()); i++) {
+ FastOS_Thread::Sleep(10);
+ }
+ CPPUNIT_ASSERT_EQUAL_MSG(db.toString(), size_t(7), db.size());
+
+ // Remove 40 first documents
+ for (uint32_t i=0; i<40; ++i) {
+ document::Document::SP doc(
+ docMan->createRandomDocumentAtLocation(0, i, 1024));
+ api::RemoveCommand::SP cmd(new api::RemoveCommand(
+ bucket, doc->getId(), 2000 + i));
+ distServer.getChain()->sendDown(cmd);
+ }
+ dummyLink.waitForMessages(40, 15);
+ for (uint32_t i=0; i<40; ++i) {
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::REMOVE_REPLY,
+ dummyLink.getReplies()[i]->getType());
+ api::RemoveReply& reply(
+ dynamic_cast<api::RemoveReply&>(*dummyLink.getReplies()[i]));
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK),
+ reply.getResult());
+ }
+ dummyLink.reset();
+ /*
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime = clock.getTimeInMillis()
+ + framework::MilliSecTime(100 * 1000);
+
+ Join dont happen anymore, as we dont join due to meta entries, used file
+ * size. Need to wait long enough for data to be compacted away if so.
+
+ // Wait until we have joined the 10 documents back to 3 buckets
+ while (db.size() != 3ul) {
+// std::cerr << db.toString(true);
+
+ if (clock.getTimeInMillis() > endTime) {
+ CPPUNIT_ASSERT_EQUAL_MSG(
+ "Failed to join buckets within timeout of 60 seconds"
+ + db.toString(true), 3ul, db.size());
+ }
+
+// FastOS_Thread::Sleep(1000);
+ }
+ CPPUNIT_ASSERT(db.get(document::BucketId(0x8800000000000000)).valid());
+ CPPUNIT_ASSERT(db.get(document::BucketId(0x8800000200000000)).valid());
+ CPPUNIT_ASSERT(db.get(document::BucketId(0x8400000100000000)).valid());
+ */
+}
+
+namespace {
+
+ struct PriorityStorageLoadGiver : public LoadGiver {
+ const document::TestDocMan& _testDocMan;
+ vespalib::Monitor _threadMonitor;
+ StorBucketDatabase _bucketDB;
+ document::BucketIdFactory _idFactory;
+ uint32_t _putCount;
+ uint32_t _getCount;
+ uint32_t _removeCount;
+ uint32_t _joinCount;
+ uint32_t _splitCount;
+ uint32_t _createBucket;
+ uint32_t _deleteBucket;
+ uint32_t _remappedOperations;
+ uint32_t _notFoundOps;
+ uint32_t _existOps;
+ uint32_t _bucketDeletedOps;
+ uint32_t _bucketNotFoundOps;
+ uint32_t _rejectedOps;
+
+ PriorityStorageLoadGiver(const vdstestlib::DirConfig& config,
+ const document::TestDocMan& tdm)
+ : LoadGiver(config, tdm.getTypeRepoSP()), _testDocMan(tdm),
+ _threadMonitor(),
+ _bucketDB(), _idFactory(),
+ _putCount(0), _getCount(0), _removeCount(0), _joinCount(0),
+ _splitCount(0), _createBucket(0), _deleteBucket(0),
+ _remappedOperations(0), _notFoundOps(0), _existOps(0),
+ _bucketDeletedOps(0), _bucketNotFoundOps(0), _rejectedOps(0) {}
+ ~PriorityStorageLoadGiver() {
+ close();
+ }
+ virtual void close() {
+ if (running()) {
+ stop();
+ join();
+ }
+ }
+ virtual bool onStop() {
+ vespalib::MonitorGuard monitor(_threadMonitor);
+ monitor.signal();
+ return true;
+ }
+ void run() {
+ uint32_t seed = 0;
+ uint32_t maxDocSize = 65536;
+ init();
+ vespalib::MonitorGuard monitor(_threadMonitor);
+ std::list<mbusprot::StorageCommand*> sendList;
+ while (running()) {
+ while (sendList.size() < (_maxPending - _currentPending)) {
+ document::Document::SP doc(
+ _testDocMan.createRandomDocument(
+ ++seed, maxDocSize));
+ api::StorageCommand::SP cmd;
+ document::BucketId bucket(
+ _idFactory.getBucketId(doc->getId()));
+ std::map<document::BucketId,
+ StorBucketDatabase::WrappedEntry> entries(
+ _bucketDB.getContained(bucket, ""));
+ if (entries.size() == 0
+ || (entries.size() == 1
+ && (entries.begin()->second->getBucketInfo().getChecksum() & 2) != 0))
+ {
+ if (entries.size() == 0) {
+ bucket.setUsedBits(4);
+ bucket = bucket.stripUnused();
+ entries[bucket] = _bucketDB.get(bucket, "",
+ StorBucketDatabase::CREATE_IF_NONEXISTING);
+ entries[bucket]->setChecksum(0);
+ } else {
+ bucket = entries.begin()->first;
+ entries[bucket]->setChecksum(
+ entries[bucket]->getBucketInfo().getChecksum() & ~2);
+ }
+ entries[bucket]->disk = 0;
+ entries[bucket].write();
+ entries[bucket] = _bucketDB.get(bucket, "foo");
+ CPPUNIT_ASSERT(entries[bucket].exist());
+ cmd.reset(new api::CreateBucketCommand(bucket));
+ sendList.push_back(new mbusprot::StorageCommand(cmd));
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), entries.size());
+ bucket = entries.begin()->first;
+ StorBucketDatabase::WrappedEntry entry(
+ entries.begin()->second);
+ if (seed % 95 == 93) { // Delete bucket
+ if ((entry->getBucketInfo().getChecksum() & 2) == 0) {
+ cmd.reset(new api::DeleteBucketCommand(bucket));
+ entry->setChecksum(
+ entry->getBucketInfo().getChecksum() | 2);
+ entry.write();
+ sendList.push_back(
+ new mbusprot::StorageCommand(cmd));
+ }
+ } else if (seed % 13 == 8) { // Join
+ if (entry->getBucketInfo().getChecksum() == 0 && bucket.getUsedBits() > 3) {
+ // Remove existing locks we have to not cause
+ // deadlock
+ entry = StorBucketDatabase::WrappedEntry();
+ entries.clear();
+ // Then continue
+ document::BucketId super(bucket.getUsedBits() - 1,
+ bucket.getRawId());
+ super = super.stripUnused();
+ api::JoinBucketsCommand::SP jcmd(
+ new api::JoinBucketsCommand(super));
+ entries = _bucketDB.getAll(super, "foo");
+ bool foundAnyLocked = false;
+ for (std::map<document::BucketId,
+ StorBucketDatabase::WrappedEntry>
+ ::iterator it = entries.begin();
+ it != entries.end(); ++it)
+ {
+ if (!super.contains(it->first) || super == it->first) continue;
+ jcmd->getSourceBuckets().push_back(
+ it->first.stripUnused());
+ foundAnyLocked |= (it->second->getBucketInfo().getChecksum() != 0);
+ }
+ if (!foundAnyLocked && jcmd->getSourceBuckets().size() == 2) {
+ for (std::map<document::BucketId,
+ StorBucketDatabase::WrappedEntry>
+ ::iterator it = entries.begin();
+ it != entries.end(); ++it)
+ {
+ if (!super.contains(it->first)) continue;
+ it->second->setChecksum(
+ it->second->getBucketInfo().getChecksum() | 1);
+ it->second.write();
+ }
+ cmd = jcmd;
+ sendList.push_back(
+ new mbusprot::StorageCommand(cmd));
+ }
+ }
+ } else if (seed % 13 == 1) { // Split
+ // Use _checksum == 1 to mean that we have a pending
+ // maintenance operation to this bucket.
+ if (entry->getBucketInfo().getChecksum() == 0) {
+ cmd.reset(new api::SplitBucketCommand(bucket));
+ entry->setChecksum(1);
+ entry.write();
+ sendList.push_back(
+ new mbusprot::StorageCommand(cmd));
+ }
+ } else if (seed % 7 == 5) { // Remove
+ if ((entry->getBucketInfo().getChecksum() & 2) == 0) {
+ cmd.reset(new api::RemoveCommand(bucket,
+ doc->getId(), 1000ull * seed + 2));
+ sendList.push_back(
+ new mbusprot::StorageCommand(cmd));
+ }
+ } else if (seed % 5 == 3) { // Get
+ if ((entry->getBucketInfo().getChecksum() & 2) == 0) {
+ cmd.reset(new api::GetCommand(
+ bucket, doc->getId(), "[all]"));
+ sendList.push_back(
+ new mbusprot::StorageCommand(cmd));
+ }
+ } else { // Put
+ if ((entry->getBucketInfo().getChecksum() & 2) == 0) {
+ cmd.reset(new api::PutCommand(
+ bucket, doc, 1000ull * seed + 1));
+ sendList.push_back(
+ new mbusprot::StorageCommand(cmd));
+ }
+ }
+ if (!sendList.empty()) {
+ uint8_t priorities[] = {
+ api::StorageMessage::LOW,
+ api::StorageMessage::NORMAL,
+ api::StorageMessage::HIGH,
+ api::StorageMessage::VERYHIGH
+ };
+ sendList.back()->getCommand()->setPriority(priorities[seed % 4]);
+ }
+ }
+ if (sendList.size() > 0) {
+ uint32_t sent = 0;
+ for (uint32_t i=0; i<sendList.size(); ++i) {
+ mbus::Message::UP msg(*sendList.begin());
+ msg->setRetryEnabled(false);
+ mbus::Result r = _sourceSession->send(std::move(msg),
+ "storage/cluster.storage/storage/0/default",
+ true);
+ if (r.isAccepted()){
+ sendList.pop_front();
+ ++_currentPending;
+ ++sent;
+ } else {
+ r.getMessage().release();
+ break;
+ }
+ }
+ }
+ monitor.wait(1);
+ }
+ }
+
+ std::string report() {
+ std::ostringstream ost;
+ ost << "Performed ("
+ << _putCount << ", " << _getCount << ", "
+ << _removeCount << ", " << _splitCount << ", "
+ << _joinCount << ", " << _createBucket << ", "
+ << _deleteBucket
+ << ") put/get/remove/split/join/create/delete operations.\n"
+ << "Result: " << _remappedOperations << " remapped operations\n"
+ << " " << _processedOk << " ok responses.\n"
+ << " " << _notFoundOps << " NOT_FOUND responses.\n"
+ << " " << _existOps << " EXISTS responses\n"
+ << " " << _bucketDeletedOps << " BUCKET_DELETED responses\n"
+ << " " << _bucketNotFoundOps << " BUCKET_NOT_FOUND responses\n"
+ << " " << _rejectedOps << " REJECTED responses (duplicate splits)\n"
+ << " " << _unexpectedErrors << " unexpected errors\n";
+ return ost.str();
+ }
+
+ virtual void handleReply(mbus::Reply::UP reply) {
+ if (_startedShutdown) return;
+ --_currentPending;
+ std::ostringstream err;
+ mbusprot::StorageReply* mreply(
+ dynamic_cast<mbusprot::StorageReply*>(reply.get()));
+ if (mreply == 0) {
+ ++_unexpectedErrors;
+ err << "Got unexpected reply which is not a storage reply, "
+ << "likely emptyreply.";
+ if (reply->hasErrors()) {
+ int code = reply->getError(0).getCode();
+ std::string errorMsg = reply->getError(0).getMessage();
+ err << "\n mbus(" << code << "): '" << errorMsg << "'";
+ }
+ err << "\n";
+ std::cerr << err.str();
+ return;
+ }
+ api::StorageReply& sreply(*mreply->getReply());
+ api::BucketReply& breply(static_cast<api::BucketReply&>(sreply));
+
+ if ((!reply->hasErrors()
+ && sreply.getResult().success())
+ || sreply.getResult().getResult() == api::ReturnCode::EXISTS
+ || sreply.getResult().getMessage().find("Bucket does not exist; assuming already split") != std::string::npos
+ || sreply.getResult().getResult()
+ == api::ReturnCode::BUCKET_DELETED
+ || sreply.getResult().getResult()
+ == api::ReturnCode::BUCKET_NOT_FOUND)
+ {
+ std::ostringstream out;
+ if (breply.hasBeenRemapped()) {
+ ++_remappedOperations;
+ }
+ if (sreply.getType() == api::MessageType::JOINBUCKETS_REPLY) {
+ vespalib::MonitorGuard monitor(_threadMonitor);
+ api::JoinBucketsReply& joinReply(
+ static_cast<api::JoinBucketsReply&>(sreply));
+ StorBucketDatabase::WrappedEntry entry(
+ _bucketDB.get(joinReply.getBucketId(), "",
+ StorBucketDatabase::CREATE_IF_NONEXISTING));
+ entry->setChecksum(0);
+ entry->disk = 0;
+ entry.write();
+ for(std::vector<document::BucketId>::const_iterator it
+ = joinReply.getSourceBuckets().begin();
+ it != joinReply.getSourceBuckets().end(); ++it)
+ {
+ _bucketDB.erase(*it, "foo");
+ }
+ ++_joinCount;
+ out << "OK " << joinReply.getBucketId() << " Join\n";
+ } else if (sreply.getType()
+ == api::MessageType::SPLITBUCKET_REPLY
+ && sreply.getResult().getResult() != api::ReturnCode::REJECTED)
+ {
+ vespalib::MonitorGuard monitor(_threadMonitor);
+ api::SplitBucketReply& splitReply(
+ static_cast<api::SplitBucketReply&>(sreply));
+ StorBucketDatabase::WrappedEntry entry(
+ _bucketDB.get(splitReply.getBucketId(), "foo"));
+ if (entry.exist()) {
+ //CPPUNIT_ASSERT((entry->getBucketInfo().getChecksum() & 1) != 0);
+ entry.remove();
+ }
+ for(std::vector<api::SplitBucketReply::Entry>::iterator it
+ = splitReply.getSplitInfo().begin();
+ it != splitReply.getSplitInfo().end(); ++it)
+ {
+ entry = _bucketDB.get(it->first, "foo",
+ StorBucketDatabase::CREATE_IF_NONEXISTING);
+ entry->setChecksum(0);
+ entry->disk = 0;
+ entry.write();
+ }
+ ++_splitCount;
+ out << "OK " << splitReply.getBucketId() << " Split\n";
+ } else if (sreply.getType() == api::MessageType::PUT_REPLY) {
+ ++_putCount;
+ if (!static_cast<api::PutReply&>(sreply).wasFound()) {
+ ++_notFoundOps;
+ }
+ out << "OK " << breply.getBucketId() << " Put\n";
+ } else if (sreply.getType() == api::MessageType::GET_REPLY) {
+ ++_getCount;
+ if (!static_cast<api::GetReply&>(sreply).wasFound()) {
+ ++_notFoundOps;
+ }
+ out << "OK " << breply.getBucketId() << " Get\n";
+ } else if (sreply.getType() == api::MessageType::REMOVE_REPLY) {
+ ++_removeCount;
+ if (!static_cast<api::RemoveReply&>(sreply).wasFound()) {
+ ++_notFoundOps;
+ }
+ out << "OK " << breply.getBucketId() << " Remove\n";
+ } else if (sreply.getType()
+ == api::MessageType::CREATEBUCKET_REPLY)
+ {
+ ++_createBucket;
+ out << "OK " << breply.getBucketId() << " Create\n";
+ } else if (sreply.getType()
+ == api::MessageType::DELETEBUCKET_REPLY)
+ {
+ ++_deleteBucket;
+ out << "OK " << breply.getBucketId() << " Delete\n";
+ }
+ switch (sreply.getResult().getResult()) {
+ case api::ReturnCode::EXISTS: ++_existOps; break;
+ case api::ReturnCode::BUCKET_NOT_FOUND:
+ ++_bucketNotFoundOps; break;
+ case api::ReturnCode::BUCKET_DELETED:
+ ++_bucketDeletedOps; break;
+ case api::ReturnCode::REJECTED:
+ ++_rejectedOps; break;
+ case api::ReturnCode::OK: ++_processedOk; break;
+ default:
+ assert(false);
+ }
+ //std::cerr << "OK - " << sreply.getType() << "\n";
+ if (_processedOk % 5000 == 0) {
+ out << report();
+ }
+ //err << out.str();
+ } else {
+ ++_unexpectedErrors;
+ api::BucketReply& brep(static_cast<api::BucketReply&>(sreply));
+ err << "Failed " << brep.getBucketId() << " "
+ << sreply.getType().getName() << ":";
+ if (reply->hasErrors()) {
+ int code = reply->getError(0).getCode();
+ std::string errorMsg = reply->getError(0).getMessage();
+ err << " mbus(" << code << "): '" << errorMsg << "'";
+ }
+ if (sreply.getResult().failed()) {
+ err << " sapi: " << sreply.getResult() << "\n";
+ }
+ }
+ std::cerr << err.str();
+ }
+ };
+
+ enum StateType { REPORTED, CURRENT };
+ void waitForStorageUp(StorageComponent& storageNode,
+ StateType type, time_t timeoutMS = 60000)
+ {
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime timeout = clock.getTimeInMillis()
+ + framework::MilliSecTime(timeoutMS);
+ while (true) {
+ lib::NodeState::CSP ns(type == REPORTED
+ ? storageNode.getStateUpdater().getReportedNodeState()
+ : storageNode.getStateUpdater().getCurrentNodeState());
+ if (ns->getState() == lib::State::UP) return;
+ if (clock.getTimeInMillis() > timeout) {
+ std::ostringstream ost;
+ ost << "Storage node failed to get up within timeout of "
+ << timeoutMS << " ms. Current state is: " << ns;
+ CPPUNIT_FAIL(ost.str());
+ }
+ FastOS_Thread::Sleep(10);
+ }
+ }
+
+}
+
+void
+StorageServerTest::testPriorityAndQueueSneakingWhileSplitJoinStressTest()
+{
+ PriorityStorageLoadGiver loadGiver(*storConfig, *docMan);
+ Storage storServer(*storConfig);
+ waitForStorageUp(storServer.getComponent(), REPORTED);
+ api::SetSystemStateCommand::SP cmd(new api::SetSystemStateCommand(
+ lib::ClusterState("storage:1")));
+ storServer.getChain()->sendDown(cmd);
+ waitForStorageUp(storServer.getComponent(), CURRENT);
+ loadGiver.start(*threadPool);
+ while (loadGiver._processedOk + loadGiver._unexpectedErrors < 1000) {
+ FastOS_Thread::Sleep(100);
+ std::cerr << "OK " << loadGiver._processedOk << " Errors: "
+ << loadGiver._unexpectedErrors << "\n";
+ }
+ loadGiver.notifyStartingShutdown();
+ loadGiver.stop();
+ loadGiver.close();
+ std::cerr << loadGiver.report();
+ CPPUNIT_ASSERT(loadGiver._bucketNotFoundOps < 300);
+ CPPUNIT_ASSERT(loadGiver._unexpectedErrors == 0);
+}
+
+// This test is not a stress test, but adding stress to its name makes it not
+// run during regular make test runs
+void
+StorageServerTest::testPortOverlap_Stress()
+{
+ for (uint32_t i=0; i<3; ++i) {
+ std::cerr << "Run " << i << "\n";
+ tearDown();
+ setUp();
+ const char* config = "stor-communicationmanager";
+ std::string type = "none";
+ if (i == 0) {
+ distConfig->getConfig(config).set("mbusport", "12301");
+ storConfig->getConfig(config).set("mbusport", "12311");
+ type = "mbusport";
+ } else if (i == 1) {
+ distConfig->getConfig(config).set("rpcport", "12302");
+ storConfig->getConfig(config).set("rpcport", "12312");
+ type = "rpcport";
+ } else if (i == 2) {
+ distConfig->getConfig("stor-status").set("httpport", "12303");
+ storConfig->getConfig("stor-status").set("httpport", "12313");
+ type = "httpport";
+ }
+ LOG(info, "TEST: (0) STARTING PORT TEST: %s", type.c_str());
+ slobrokMirror->init(5000);
+
+ std::unique_ptr<Distributor> distServerOld(new Distributor(*distConfig));
+ std::unique_ptr<Storage> storServerOld(new Storage(*storConfig));
+
+ LOG(info, "TEST: (1) WAITING FOR STABLE STORAGE SERVER");
+ storServerOld->waitUntilInitialized(30);
+ LOG(info, "TEST: (2) STORAGE SERVER STABLE");
+ lib::ClusterState state("version:1 distributor:1 storage:1");
+ std::vector<std::string> addresses;
+ addresses.push_back("storage/cluster.storage/storage/0");
+ addresses.push_back("storage/cluster.storage/distributor/0");
+ LOG(info, "TEST: (3) SETTING SYSTEM STATES");
+ setSystemState(*slobrokMirror, state, addresses);
+ LOG(info, "TEST: (4) WAITING FOR STABLE DISTRIBUTOR SERVER");
+ distServerOld->waitUntilInitialized(30);
+
+ {
+ LOG(info, "TEST: (5) ADDING SOME LOAD TO CHECK PORTS");
+ SimpleLoadGiver loadGiver(*distConfig, *docMan);
+ loadGiver.start(*threadPool);
+ loadGiver.waitUntilDecentLoad();
+ }
+
+ LOG(info, "TEST: (6) CREATING NEW SET OF SERVERS");
+ try{
+ Distributor distServer(*distConfig);
+ CPPUNIT_FAIL("Distributor server failed to fail on busy " + type);
+ } catch (vespalib::Exception& e) {
+ std::string msg = e.getMessage();
+ std::string::size_type pos = msg.rfind(':');
+ if (pos != std::string::npos) msg = msg.substr(pos + 2);
+ if (msg == "Failed to listen to RPC port 12302." ||
+ msg == "Failed to start network." ||
+ msg == "Failed to start status HTTP server using port 12303.")
+ {
+ } else {
+ CPPUNIT_FAIL("Unexpected exception: " + msg);
+ }
+ }
+ try{
+ Storage storServer(*storConfig);
+ CPPUNIT_FAIL("Storage server failed to fail on busy " + type);
+ } catch (vespalib::Exception& e) {
+ std::string msg = e.getMessage();
+ std::string::size_type pos = msg.rfind(':');
+ if (pos != std::string::npos) msg = msg.substr(pos + 2);
+ if (msg == "Failed to listen to RPC port 12312." ||
+ msg == "Failed to start network." ||
+ msg == "Failed to start status HTTP server using port 12313.")
+ {
+ } else {
+ CPPUNIT_FAIL("Unexpected exception: " + msg);
+ }
+ }
+ }
+}
+
+void
+StorageServerTest::testStatusPages()
+{
+ Storage storServer(*storConfig);
+ // Bucket manager doesn't set up metrics before after talking to
+ // persistence layer
+ storServer.getNode().waitUntilInitialized();
+ {
+ // Get HTML status pages
+ framework::HttpUrlPath path("?interval=-2&format=html");
+ std::ostringstream ost;
+ try{
+ const framework::StatusReporter* reporter(
+ storServer.getStatusReporter("statusmetricsconsumer"));
+ CPPUNIT_ASSERT(reporter != 0);
+ reporter->reportStatus(ost, path);
+ } catch (std::exception& e) {
+ CPPUNIT_FAIL("Failed to get status metric page: "
+ + std::string(e.what()) + "\nGot so far: " + ost.str());
+ }
+ std::string output = ost.str();
+ CPPUNIT_ASSERT_MSG(output,
+ output.find("Exception") == std::string::npos);
+ CPPUNIT_ASSERT_MSG(output, output.find("Error") == std::string::npos);
+ }
+}
+
+} // storage
diff --git a/storageserver/src/tests/testhelper.cpp b/storageserver/src/tests/testhelper.cpp
new file mode 100644
index 00000000000..0aba3aeb7cf
--- /dev/null
+++ b/storageserver/src/tests/testhelper.cpp
@@ -0,0 +1,170 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <tests/testhelper.h>
+
+#include <vespa/log/log.h>
+#include <vespa/vespalib/io/fileutil.h>
+
+LOG_SETUP(".testhelper");
+
+namespace storage {
+
+namespace {
+ bool useNewStorageCore() {
+ if ( // Unit test directory
+ vespalib::fileExists("use_new_storage_core") ||
+ // src/cpp directory
+ vespalib::fileExists("../use_new_storage_core") ||
+ // Top build directory where storage-HEAD remains
+ vespalib::fileExists("../../../../../use_new_storage_core"))
+ {
+ std::cerr << "Using new storage core for unit tests\n";
+ return true;
+ }
+ return false;
+ }
+ bool newStorageCore(useNewStorageCore());
+}
+
+void addStorageDistributionConfig(vdstestlib::DirConfig& dc)
+{
+ vdstestlib::DirConfig::Config* config;
+ config = &dc.getConfig("stor-distribution", true);
+ config->clear();
+ config->set("group[1]");
+ config->set("group[0].name", "invalid");
+ config->set("group[0].index", "invalid");
+ config->set("group[0].nodes[50]");
+
+ for (uint32_t i = 0; i < 50; i++) {
+ std::ostringstream key; key << "group[0].nodes[" << i << "].index";
+ std::ostringstream val; val << i;
+ config->set(key.str(), val.str());
+ }
+}
+
+vdstestlib::DirConfig getStandardConfig(bool storagenode) {
+ vdstestlib::DirConfig dc;
+ vdstestlib::DirConfig::Config* config;
+ config = &dc.addConfig("upgrading");
+ config = &dc.addConfig("load-type");
+ config = &dc.addConfig("bucket");
+ config = &dc.addConfig("messagebus");
+ config = &dc.addConfig("stor-prioritymapping");
+ config = &dc.addConfig("stor-bucketdbupdater");
+ config = &dc.addConfig("stor-bucket-init");
+ config = &dc.addConfig("metricsmanager");
+ config->set("consumer[1]");
+ config->set("consumer[0].name", "\"status\"");
+ config->set("consumer[0].addedmetrics[1]");
+ config->set("consumer[0].addedmetrics[0]", "\"*\"");
+ config = &dc.addConfig("stor-communicationmanager");
+ config->set("rpcport", "0");
+ config->set("mbusport", "0");
+ config = &dc.addConfig("stor-bucketdb");
+ config->set("chunklevel", "0");
+ config = &dc.addConfig("stor-distributormanager");
+ config = &dc.addConfig("stor-opslogger");
+ config = &dc.addConfig("stor-filestor");
+ // Easier to see what goes wrong with only 1 thread per disk.
+ config->set("minimum_file_meta_slots", "2");
+ config->set("minimum_file_header_block_size", "368");
+ config->set("minimum_file_size", "4096");
+ config->set("threads[1]");
+ config->set("threads[0].lowestpri 255");
+ config->set("dir_spread", "4");
+ config->set("dir_levels", "0");
+ config->set("use_new_core", newStorageCore ? "true" : "false");
+ config->set("maximum_versions_of_single_document_stored", "0");
+ //config->set("enable_slotfile_cache", "false");
+ // Unit tests typically use fake low time values, so don't complain
+ // about them or compact/delete them by default. Override in tests testing that
+ // behavior
+ config->set("time_future_limit", "5");
+ config->set("time_past_limit", "2000000000");
+ config->set("keep_remove_time_period", "2000000000");
+ config->set("revert_time_period", "2000000000");
+ // Don't want test to call exit()
+ config->set("fail_disk_after_error_count", "0");
+ config = &dc.addConfig("stor-memfilepersistence");
+ // Easier to see what goes wrong with only 1 thread per disk.
+ config->set("minimum_file_meta_slots", "2");
+ config->set("minimum_file_header_block_size", "368");
+ config->set("minimum_file_size", "4096");
+ config->set("dir_spread", "4");
+ config->set("dir_levels", "0");
+ config = &dc.addConfig("persistence");
+ config->set("keep_remove_time_period", "2000000000");
+ config->set("revert_time_period", "2000000000");
+ config = &dc.addConfig("stor-bouncer");
+ config = &dc.addConfig("stor-integritychecker");
+ config = &dc.addConfig("stor-bucketmover");
+ config = &dc.addConfig("stor-messageforwarder");
+ config = &dc.addConfig("stor-server");
+ config->set("enable_dead_lock_detector", "false");
+ config->set("enable_dead_lock_detector_warnings", "false");
+ config->set("max_merges_per_node", "25");
+ config->set("max_merge_queue_size", "20");
+ config->set("root_folder",
+ (storagenode ? "vdsroot" : "vdsroot.distributor"));
+ config->set("is_distributor",
+ (storagenode ? "false" : "true"));
+ config = &dc.addConfig("stor-devices");
+ config->set("root_folder",
+ (storagenode ? "vdsroot" : "vdsroot.distributor"));
+ config = &dc.addConfig("stor-status");
+ config->set("httpport", "0");
+ config = &dc.addConfig("stor-visitor");
+ config->set("defaultdocblocksize", "8192");
+ // By default, need "old" behaviour of maxconcurrent
+ config->set("maxconcurrentvisitors_fixed", "4");
+ config->set("maxconcurrentvisitors_variable", "0");
+ config = &dc.addConfig("stor-visitordispatcher");
+ addFileConfig(dc, "documenttypes", "config-doctypes.cfg");
+ addStorageDistributionConfig(dc);
+ return dc;
+}
+
+void addSlobrokConfig(vdstestlib::DirConfig& dc,
+ const mbus::Slobrok& slobrok)
+{
+ std::ostringstream ost;
+ ost << "tcp/localhost:" << slobrok.port();
+ vdstestlib::DirConfig::Config* config;
+ config = &dc.getConfig("slobroks", true);
+ config->clear();
+ config->set("slobrok[1]");
+ config->set("slobrok[0].connectionspec", ost.str());
+}
+
+void addFileConfig(vdstestlib::DirConfig& dc,
+ const std::string& configDefName,
+ const std::string& fileName)
+{
+ vdstestlib::DirConfig::Config* config;
+ config = &dc.getConfig(configDefName, true);
+ config->clear();
+ std::ifstream in(fileName.c_str());
+ std::string line;
+ while (std::getline(in, line, '\n')) {
+ std::string::size_type pos = line.find(' ');
+ if (pos == std::string::npos) {
+ config->set(line);
+ } else {
+ config->set(line.substr(0, pos), line.substr(pos + 1));
+ }
+ }
+ in.close();
+}
+
+TestName::TestName(const std::string& n)
+ : name(n)
+{
+ LOG(debug, "Starting test %s", name.c_str());
+}
+
+TestName::~TestName() {
+ LOG(debug, "Done with test %s", name.c_str());
+}
+
+} // storage
diff --git a/storageserver/src/tests/testhelper.h b/storageserver/src/tests/testhelper.h
new file mode 100644
index 00000000000..be2c3e7ec66
--- /dev/null
+++ b/storageserver/src/tests/testhelper.h
@@ -0,0 +1,58 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+#include <vespa/vdstestlib/cppunit/dirconfig.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+
+
+#include <fstream>
+#include <vespa/fastos/fastos.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <sstream>
+
+#define ASSERT_REPLY_COUNT(count, dummylink) \
+ { \
+ std::ostringstream msgost; \
+ if ((dummylink).getNumReplies() != count) { \
+ for (uint32_t ijx=0; ijx<(dummylink).getNumReplies(); ++ijx) { \
+ msgost << (dummylink).getReply(ijx)->toString(true) << "\n"; \
+ } \
+ } \
+ CPPUNIT_ASSERT_EQUAL_MSG(msgost.str(), size_t(count), \
+ (dummylink).getNumReplies()); \
+ }
+#define ASSERT_COMMAND_COUNT(count, dummylink) \
+ { \
+ std::ostringstream msgost; \
+ if ((dummylink).getNumCommands() != count) { \
+ for (uint32_t ijx=0; ijx<(dummylink).getNumCommands(); ++ijx) { \
+ msgost << (dummylink).getCommand(ijx)->toString(true) << "\n"; \
+ } \
+ } \
+ CPPUNIT_ASSERT_EQUAL_MSG(msgost.str(), size_t(count), \
+ (dummylink).getNumCommands()); \
+ }
+
+namespace storage {
+
+void addFileConfig(vdstestlib::DirConfig& dc,
+ const std::string& configDefName,
+ const std::string& fileName);
+
+
+void addStorageDistributionConfig(vdstestlib::DirConfig& dc);
+
+vdstestlib::DirConfig getStandardConfig(bool storagenode);
+
+void addSlobrokConfig(vdstestlib::DirConfig& dc,
+ const mbus::Slobrok& slobrok);
+
+// Class used to print start and end of test. Enable debug when you want to see
+// which test creates what output or where we get stuck
+struct TestName {
+ std::string name;
+ TestName(const std::string& n);
+ ~TestName();
+};
+
+} // storage
+
diff --git a/storageserver/src/tests/testrunner.cpp b/storageserver/src/tests/testrunner.cpp
new file mode 100644
index 00000000000..5d8dc8d4c1f
--- /dev/null
+++ b/storageserver/src/tests/testrunner.cpp
@@ -0,0 +1,15 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <iostream>
+#include <vespa/log/log.h>
+#include <vespa/vdstestlib/cppunit/cppunittestrunner.h>
+
+LOG_SETUP("storagecppunittests");
+
+int
+main(int argc, char **argv)
+{
+ vdstestlib::CppUnitTestRunner testRunner;
+ return testRunner.run(argc, argv);
+}
diff --git a/storageserver/src/vespa/storageserver/app/.gitignore b/storageserver/src/vespa/storageserver/app/.gitignore
new file mode 100644
index 00000000000..3a4be5d506a
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/.gitignore
@@ -0,0 +1,12 @@
+*.So
+*.lo
+.depend
+.depend.NEW
+.deps
+.libs
+Makefile
+Makefile.inc
+config_command.sh
+project.dsw
+storaged
+storaged.core
diff --git a/storageserver/src/vespa/storageserver/app/CMakeLists.txt b/storageserver/src/vespa/storageserver/app/CMakeLists.txt
new file mode 100644
index 00000000000..e2918ae49d6
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/CMakeLists.txt
@@ -0,0 +1,11 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(storageserver_storageapp STATIC
+ SOURCES
+ process.cpp
+ distributorprocess.cpp
+ servicelayerprocess.cpp
+ dummyservicelayerprocess.cpp
+ rpcservicelayerprocess.cpp
+ memfileservicelayerprocess.cpp
+ DEPENDS
+)
diff --git a/storageserver/src/vespa/storageserver/app/distributorprocess.cpp b/storageserver/src/vespa/storageserver/app/distributorprocess.cpp
new file mode 100644
index 00000000000..80de17455d3
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/distributorprocess.cpp
@@ -0,0 +1,84 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageserver/app/distributorprocess.h>
+
+#include <vespa/log/log.h>
+#include <vespa/storage/storageutil/utils.h>
+
+LOG_SETUP(".process.distributor");
+
+namespace storage {
+
+DistributorProcess::DistributorProcess(const config::ConfigUri & configUri)
+ : Process(configUri),
+ _activeFlag(DistributorNode::NO_NEED_FOR_ACTIVE_STATES)
+{
+}
+
+void
+DistributorProcess::shutdown()
+{
+ Process::shutdown();
+ _node.reset();
+}
+
+void
+DistributorProcess::setupConfig(uint64_t subscribeTimeout)
+{
+ std::unique_ptr<vespa::config::content::core::StorServerConfig> config =
+ config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(_configUri.getConfigId(), _configUri.getContext(), subscribeTimeout);
+ if (config->persistenceProvider.type
+ != vespa::config::content::core::StorServerConfig::PersistenceProvider::STORAGE)
+ {
+ _activeFlag = DistributorNode::NEED_ACTIVE_BUCKET_STATES_SET;
+ }
+ _distributorConfigHandler
+ = _configSubscriber.subscribe<vespa::config::content::core::StorDistributormanagerConfig>(
+ _configUri.getConfigId(), subscribeTimeout);
+ _visitDispatcherConfigHandler
+ = _configSubscriber.subscribe<vespa::config::content::core::StorVisitordispatcherConfig>(
+ _configUri.getConfigId(), subscribeTimeout);
+ Process::setupConfig(subscribeTimeout);
+}
+
+void
+DistributorProcess::updateConfig()
+{
+ Process::updateConfig();
+ if (_distributorConfigHandler->isChanged()) {
+ _node->handleConfigChange(
+ *_distributorConfigHandler->getConfig());
+ }
+ if (_visitDispatcherConfigHandler->isChanged()) {
+ _node->handleConfigChange(
+ *_visitDispatcherConfigHandler->getConfig());
+ }
+}
+
+bool
+DistributorProcess::configUpdated()
+{
+ bool changed = Process::configUpdated();
+ if (_distributorConfigHandler->isChanged()) {
+ LOG(info, "Distributor manager config detected changed");
+ changed = true;
+ }
+ if (_visitDispatcherConfigHandler->isChanged()) {
+ LOG(info, "Visitor dispatcher config detected changed");
+ changed = true;
+ }
+ return changed;
+}
+
+void
+DistributorProcess::createNode()
+{
+ _node.reset(new DistributorNode(_configUri, _context, *this, _activeFlag));
+ _node->handleConfigChange(
+ *_distributorConfigHandler->getConfig());
+ _node->handleConfigChange(
+ *_visitDispatcherConfigHandler->getConfig());
+}
+
+} // storage
diff --git a/storageserver/src/vespa/storageserver/app/distributorprocess.h b/storageserver/src/vespa/storageserver/app/distributorprocess.h
new file mode 100644
index 00000000000..32472a68793
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/distributorprocess.h
@@ -0,0 +1,42 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * \class storage::DistributorProcess
+ *
+ * \brief A process running a distributor.
+ */
+
+#pragma once
+
+#include <vespa/storageserver/app/process.h>
+
+namespace storage {
+
+class DistributorProcess : public Process {
+ DistributorNodeContext _context;
+ DistributorNode::NeedActiveState _activeFlag;
+ DistributorNode::UP _node;
+ config::ConfigHandle<vespa::config::content::core::StorDistributormanagerConfig>::UP
+ _distributorConfigHandler;
+ config::ConfigHandle<vespa::config::content::core::StorVisitordispatcherConfig>::UP
+ _visitDispatcherConfigHandler;
+
+public:
+ DistributorProcess(const config::ConfigUri & configUri);
+ ~DistributorProcess() { shutdown(); }
+
+ virtual void shutdown();
+
+ virtual void setupConfig(uint64_t subscribeTimeout);
+ virtual void createNode();
+ virtual bool configUpdated();
+ virtual void updateConfig();
+
+ virtual StorageNode& getNode() { return *_node; }
+ virtual StorageNodeContext& getContext() { return _context; }
+ virtual DistributorNodeContext& getDistributorContext() { return _context; }
+
+ virtual std::string getComponentName() const { return "distributor"; }
+};
+
+} // storage
+
diff --git a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp
new file mode 100644
index 00000000000..c57d7667248
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp
@@ -0,0 +1,32 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageserver/app/dummyservicelayerprocess.h>
+
+#include <vespa/log/log.h>
+
+LOG_SETUP(".process.servicelayer");
+
+namespace storage {
+
+// DummyServiceLayerProcess implementation
+
+DummyServiceLayerProcess::DummyServiceLayerProcess(const config::ConfigUri & configUri)
+ : ServiceLayerProcess(configUri)
+{
+}
+
+void
+DummyServiceLayerProcess::shutdown()
+{
+ ServiceLayerProcess::shutdown();
+ _provider.reset(0);
+}
+
+void
+DummyServiceLayerProcess::setupProvider()
+{
+ _provider.reset(new spi::dummy::DummyPersistence(getTypeRepo()));
+}
+
+} // storage
diff --git a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.h b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.h
new file mode 100644
index 00000000000..ab119bfba5e
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.h
@@ -0,0 +1,27 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * \class storage::DummyServiceLayerProcess
+ *
+ * \brief A process running a service layer with dummy persistence provider.
+ */
+#pragma once
+
+#include <vespa/persistence/dummyimpl/dummypersistence.h>
+#include <vespa/storageserver/app/servicelayerprocess.h>
+
+namespace storage {
+
+class DummyServiceLayerProcess : public ServiceLayerProcess {
+ spi::dummy::DummyPersistence::UP _provider;
+
+public:
+ DummyServiceLayerProcess(const config::ConfigUri & configUri);
+ ~DummyServiceLayerProcess() { shutdown(); }
+
+ virtual void shutdown();
+ virtual void setupProvider();
+ virtual spi::PersistenceProvider& getProvider() { return *_provider; }
+};
+
+} // storage
+
diff --git a/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.cpp
new file mode 100644
index 00000000000..c85a3af8a69
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.cpp
@@ -0,0 +1,109 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageserver/app/memfileservicelayerprocess.h>
+
+#include <vespa/log/log.h>
+
+LOG_SETUP(".process.servicelayer");
+
+namespace storage {
+
+// MemFileServiceLayerProcess implementation
+
+MemFileServiceLayerProcess::MemFileServiceLayerProcess(
+ const config::ConfigUri & configUri)
+ : ServiceLayerProcess(configUri),
+ _changed(false)
+{
+}
+
+void
+MemFileServiceLayerProcess::shutdown()
+{
+ ServiceLayerProcess::shutdown();
+ _provider.reset(0);
+}
+
+void
+MemFileServiceLayerProcess::setupConfig(uint64_t subscribeTimeout)
+{
+ ServiceLayerProcess::setupConfig(subscribeTimeout);
+ _configFetcher.reset(new config::ConfigFetcher(_configUri.getContext()));
+ _configFetcher->subscribe<vespa::config::storage::StorDevicesConfig>(_configUri.getConfigId(), this, subscribeTimeout);
+ _configFetcher->subscribe<vespa::config::storage::StorMemfilepersistenceConfig>(_configUri.getConfigId(), this, subscribeTimeout);
+ _configFetcher->subscribe<vespa::config::content::PersistenceConfig>(_configUri.getConfigId(), this, subscribeTimeout);
+ _configFetcher->start();
+}
+
+void
+MemFileServiceLayerProcess::removeConfigSubscriptions()
+{
+ _configFetcher.reset(0);
+}
+
+void
+MemFileServiceLayerProcess::setupProvider()
+{
+ _provider.reset(new memfile::MemFilePersistenceProvider(
+ _context.getComponentRegister(), _configUri));
+ _provider->setDocumentRepo(*getTypeRepo());
+}
+
+bool
+MemFileServiceLayerProcess::configUpdated()
+{
+ if (ServiceLayerProcess::configUpdated()) return true;
+ vespalib::LockGuard guard(_lock);
+ return _changed;
+}
+
+void
+MemFileServiceLayerProcess::updateConfig()
+{
+ ServiceLayerProcess::updateConfig();
+ LOG(info, "Config updated. Sending new config to memfile provider");
+ vespalib::LockGuard guard(_lock);
+ if (_changed) {
+ LOG(debug, "Memfile or device config changed too.");
+ if (_nextMemfilepersistence) {
+ _provider->setConfig(std::move(_nextMemfilepersistence));
+ }
+ if (_nextPersistence) {
+ _provider->setConfig(std::move(_nextPersistence));
+ }
+ if (_nextDevices) {
+ _provider->setConfig(std::move(_nextDevices));
+ }
+ }
+ _provider->setDocumentRepo(*getTypeRepo());
+ _changed = false;
+}
+
+void
+MemFileServiceLayerProcess::configure(
+ std::unique_ptr<vespa::config::storage::StorMemfilepersistenceConfig> config)
+{
+ vespalib::LockGuard guard(_lock);
+ _nextMemfilepersistence = std::move(config);
+ _changed = true;
+}
+
+void
+MemFileServiceLayerProcess::configure(
+ std::unique_ptr<vespa::config::content::PersistenceConfig> config)
+{
+ vespalib::LockGuard guard(_lock);
+ _nextPersistence = std::move(config);
+ _changed = true;
+}
+
+void
+MemFileServiceLayerProcess::configure(std::unique_ptr<vespa::config::storage::StorDevicesConfig> config)
+{
+ vespalib::LockGuard guard(_lock);
+ _nextDevices = std::move(config);
+ _changed = true;
+}
+
+} // storage
diff --git a/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.h b/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.h
new file mode 100644
index 00000000000..2e365f3f1a2
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/memfileservicelayerprocess.h
@@ -0,0 +1,59 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * \class storage::ServiceLayerProcess
+ *
+ * \brief A process running a service layer.
+ */
+/**
+ * \class storage::MemFileServiceLayerProcess
+ *
+ * \brief A process running a service layer with memfile persistence provider.
+ */
+/**
+ * \class storage::RpcServiceLayerProcess
+ *
+ * \brief A process running a service layer with RPC persistence provider.
+ */
+#pragma once
+
+#include <vespa/memfilepersistence/spi/memfilepersistenceprovider.h>
+#include <vespa/storageserver/app/servicelayerprocess.h>
+#include <vespa/vespalib/util/sync.h>
+
+namespace storage {
+
+class MemFileServiceLayerProcess
+ : public ServiceLayerProcess,
+ public config::IFetcherCallback<vespa::config::storage::StorMemfilepersistenceConfig>,
+ public config::IFetcherCallback<vespa::config::storage::StorDevicesConfig>,
+ public config::IFetcherCallback<vespa::config::content::PersistenceConfig>
+{
+ bool _changed;
+ std::unique_ptr<config::ConfigFetcher> _configFetcher;
+ std::unique_ptr<vespa::config::storage::StorMemfilepersistenceConfig> _nextMemfilepersistence;
+ std::unique_ptr<vespa::config::storage::StorDevicesConfig> _nextDevices;
+ std::unique_ptr<vespa::config::content::PersistenceConfig> _nextPersistence;
+ memfile::MemFilePersistenceProvider::UP _provider;
+ vespalib::Lock _lock;
+
+public:
+ MemFileServiceLayerProcess(const config::ConfigUri & configUri);
+ ~MemFileServiceLayerProcess() { shutdown(); }
+
+ virtual void shutdown();
+
+ void setupConfig(uint64_t subscribeTimeout);
+ virtual void removeConfigSubscriptions();
+ virtual void setupProvider();
+ virtual bool configUpdated();
+ virtual void updateConfig();
+
+ virtual spi::PersistenceProvider& getProvider() { return *_provider; }
+
+ void configure(std::unique_ptr<vespa::config::storage::StorMemfilepersistenceConfig> config);
+ void configure(std::unique_ptr<vespa::config::storage::StorDevicesConfig> config);
+ void configure(std::unique_ptr<vespa::config::content::PersistenceConfig> config);
+};
+
+} // storage
+
diff --git a/storageserver/src/vespa/storageserver/app/process.cpp b/storageserver/src/vespa/storageserver/app/process.cpp
new file mode 100644
index 00000000000..fc537f06590
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/process.cpp
@@ -0,0 +1,66 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageserver/app/process.h>
+
+//#include <vespa/config/helper/legacy.h>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".process");
+
+namespace storage {
+
+Process::Process(const config::ConfigUri & configUri)
+ : _configUri(configUri),
+ _configSubscriber(_configUri.getContext())
+{
+}
+
+void
+Process::setupConfig(uint64_t subscribeTimeout)
+{
+ _documentHandler = _configSubscriber.subscribe<document::DocumenttypesConfig>(_configUri.getConfigId(), subscribeTimeout);
+ if (!_configSubscriber.nextConfig()) {
+ throw vespalib::TimeoutException(
+ "Could not subscribe to document config within timeout");
+ }
+ _repos.push_back(document::DocumentTypeRepo::SP(
+ new document::DocumentTypeRepo(*_documentHandler->getConfig())));
+ getContext().getComponentRegister().setDocumentTypeRepo(_repos.back());
+}
+
+bool
+Process::configUpdated()
+{
+ _configSubscriber.nextGeneration(0);
+ if (_documentHandler->isChanged()) {
+ LOG(info, "Document config detected changed");
+ return true;
+ }
+ return false;
+}
+
+void
+Process::updateConfig()
+{
+ if (_documentHandler->isChanged()) {
+ _repos.push_back(document::DocumentTypeRepo::SP(
+ new document::DocumentTypeRepo(
+ *_documentHandler->getConfig())));
+ getNode().setNewDocumentRepo(_repos.back());
+ }
+}
+
+void
+Process::shutdown()
+{
+ removeConfigSubscriptions();
+}
+
+int64_t
+Process::getGeneration() const
+{
+ return _configSubscriber.getGeneration();
+}
+
+} // storage
diff --git a/storageserver/src/vespa/storageserver/app/process.h b/storageserver/src/vespa/storageserver/app/process.h
new file mode 100644
index 00000000000..85e30e73687
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/process.h
@@ -0,0 +1,56 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * \class storage::Process
+ *
+ * \brief Storage process as a library.
+ *
+ * A class with a main function cannot be tested within C++ code. This class
+ * contains the process as a library such that it can be tested and used in
+ * other pieces of code.
+ *
+ * Specializations of this class will exist to add the funcionality needed for
+ * the various process types.
+ */
+
+#pragma once
+
+#include <vespa/document/datatype/documenttype.h>
+#include <vespa/storage/storageserver/applicationgenerationfetcher.h>
+#include <vespa/storage/storageserver/distributornode.h>
+#include <vespa/storage/storageserver/servicelayernode.h>
+#include <vespa/config/config.h>
+
+namespace storage {
+
+class Process : public ApplicationGenerationFetcher {
+protected:
+ config::ConfigUri _configUri;
+ document::DocumentTypeRepo::SP getTypeRepo() { return _repos.back(); }
+ config::ConfigSubscriber _configSubscriber;
+
+private:
+ config::ConfigHandle<document::DocumenttypesConfig>::UP _documentHandler;
+ std::vector<document::DocumentTypeRepo::SP> _repos;
+
+public:
+ typedef std::unique_ptr<Process> UP;
+
+ Process(const config::ConfigUri & configUri);
+ virtual ~Process() {}
+
+ virtual void setupConfig(uint64_t subscribeTimeout);
+ virtual void createNode() = 0;
+ virtual bool configUpdated();
+ virtual void updateConfig();
+
+ virtual void shutdown();
+ virtual void removeConfigSubscriptions() {}
+
+ virtual StorageNode& getNode() = 0;
+ virtual StorageNodeContext& getContext() = 0;
+
+ virtual int64_t getGeneration() const;
+};
+
+} // storage
+
diff --git a/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.cpp
new file mode 100644
index 00000000000..cca173ba04d
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.cpp
@@ -0,0 +1,44 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageserver/app/rpcservicelayerprocess.h>
+
+#include <vespa/log/log.h>
+
+LOG_SETUP(".process.servicelayer");
+
+namespace storage {
+
+// RpcServiceLayerProcess implementation
+
+RpcServiceLayerProcess::RpcServiceLayerProcess(const config::ConfigUri & configUri)
+ : ServiceLayerProcess(configUri)
+{
+}
+
+void
+RpcServiceLayerProcess::shutdown()
+{
+ ServiceLayerProcess::shutdown();
+ _provider.reset(0);
+}
+
+void
+RpcServiceLayerProcess::setupProvider()
+{
+ std::unique_ptr<vespa::config::content::core::StorServerConfig> serverConfig =
+ config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(_configUri.getConfigId(), _configUri.getContext());
+
+ _provider.reset(new spi::ProviderProxy(
+ serverConfig->persistenceProvider.rpc.connectspec, *getTypeRepo()));
+}
+
+void
+RpcServiceLayerProcess::updateConfig()
+{
+ ServiceLayerProcess::updateConfig();
+ LOG(info, "Config updated. Sending new config to RPC proxy provider");
+ _provider->setRepo(*getTypeRepo());
+}
+
+} // storage
diff --git a/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.h b/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.h
new file mode 100644
index 00000000000..70f843ae597
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/rpcservicelayerprocess.h
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * \class storage::RpcServiceLayerProcess
+ *
+ * \brief A process running a service layer with RPC persistence provider.
+ */
+#pragma once
+
+#include <vespa/persistence/proxy/providerproxy.h>
+#include <vespa/storageserver/app/servicelayerprocess.h>
+
+namespace storage {
+
+class RpcServiceLayerProcess : public ServiceLayerProcess {
+ spi::ProviderProxy::UP _provider;
+
+public:
+ RpcServiceLayerProcess(const config::ConfigUri & configUri);
+ ~RpcServiceLayerProcess() { shutdown(); }
+
+ virtual void shutdown();
+ virtual void setupProvider();
+ virtual void updateConfig();
+ virtual spi::PersistenceProvider& getProvider() { return *_provider; }
+};
+
+} // storage
+
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
new file mode 100644
index 00000000000..52f47531c78
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
@@ -0,0 +1,38 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageserver/app/servicelayerprocess.h>
+
+#include <vespa/log/log.h>
+#include <vespa/searchvisitor/searchvisitor.h>
+#include <vespa/storage/storageutil/utils.h>
+
+LOG_SETUP(".process.servicelayer");
+
+namespace storage {
+
+// ServiceLayerProcess implementation
+
+ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri & configUri)
+ : Process(configUri)
+{
+}
+
+void
+ServiceLayerProcess::shutdown()
+{
+ Process::shutdown();
+ _node.reset(0);
+}
+
+void
+ServiceLayerProcess::createNode()
+{
+ _externalVisitors["searchvisitor"].reset(new SearchVisitorFactory(_configUri));
+ setupProvider();
+ _node.reset(new ServiceLayerNode(
+ _configUri, _context, *this, getProvider(), _externalVisitors));
+ _node->init();
+}
+
+} // storage
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
new file mode 100644
index 00000000000..d1ac919a674
--- /dev/null
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
@@ -0,0 +1,50 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * \class storage::ServiceLayerProcess
+ *
+ * \brief A process running a service layer.
+ */
+/**
+ * \class storage::MemFileServiceLayerProcess
+ *
+ * \brief A process running a service layer with memfile persistence provider.
+ */
+/**
+ * \class storage::RpcServiceLayerProcess
+ *
+ * \brief A process running a service layer with RPC persistence provider.
+ */
+#pragma once
+
+#include <vespa/storageserver/app/process.h>
+#include <vespa/config/config.h>
+#include <vespa/config/helper/configfetcher.h>
+#include <vespa/config-persistence.h>
+
+namespace storage {
+
+class ServiceLayerProcess : public Process {
+ VisitorFactory::Map _externalVisitors;
+ ServiceLayerNode::UP _node;
+
+protected:
+ ServiceLayerNodeContext _context;
+
+public:
+ ServiceLayerProcess(const config::ConfigUri & configUri);
+
+ virtual void shutdown();
+
+ virtual void setupProvider() = 0;
+ virtual spi::PersistenceProvider& getProvider() = 0;
+
+ virtual void createNode();
+
+ virtual StorageNode& getNode() { return *_node; }
+ virtual StorageNodeContext& getContext() { return _context; }
+
+ virtual std::string getComponentName() const { return "servicelayer"; }
+};
+
+} // storage
+