aboutsummaryrefslogtreecommitdiffstats
path: root/storageserver
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2019-05-28 08:30:38 +0000
committerGeir Storli <geirst@verizonmedia.com>2019-05-28 08:30:38 +0000
commitd2f4c5c243367a39f94c240eed3da3649ef4a455 (patch)
treeb7486c6df03dfa8c83a68c060261522e968bdaec /storageserver
parent0f2dbc91395a97d471d8e81adee6628bd062dc13 (diff)
Rewrite storageserver tests from cppunit to gtest.
Also remove a set of tests that have been ignored / disabled for 8 years. These were highly unstable integration tests. System tests should cover such cases instead.
Diffstat (limited to 'storageserver')
-rw-r--r--storageserver/src/tests/.gitignore3
-rw-r--r--storageserver/src/tests/CMakeLists.txt12
-rw-r--r--storageserver/src/tests/dummystoragelink.cpp182
-rw-r--r--storageserver/src/tests/dummystoragelink.h113
-rw-r--r--storageserver/src/tests/gtest_runner.cpp8
-rw-r--r--storageserver/src/tests/storageservertest.cpp1034
-rw-r--r--storageserver/src/tests/testhelper.cpp10
-rw-r--r--storageserver/src/tests/testhelper.h37
-rw-r--r--storageserver/src/tests/testrunner.cpp12
9 files changed, 55 insertions, 1356 deletions
diff --git a/storageserver/src/tests/.gitignore b/storageserver/src/tests/.gitignore
index 101f84131dc..3674aed6117 100644
--- a/storageserver/src/tests/.gitignore
+++ b/storageserver/src/tests/.gitignore
@@ -1,6 +1,5 @@
/Makefile
/dirconfig.tmp
/test.vlog
-/testrunner
/vdsroot
-storageserver_testrunner_app
+storageserver_gtest_runner_app
diff --git a/storageserver/src/tests/CMakeLists.txt b/storageserver/src/tests/CMakeLists.txt
index 21412e4bc33..9c475543b81 100644
--- a/storageserver/src/tests/CMakeLists.txt
+++ b/storageserver/src/tests/CMakeLists.txt
@@ -1,17 +1,17 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(storageserver_testrunner_app TEST
+
+vespa_add_executable(storageserver_gtest_runner_app TEST
SOURCES
storageservertest.cpp
testhelper.cpp
- dummystoragelink.cpp
- testrunner.cpp
+ gtest_runner.cpp
DEPENDS
storageserver_storageapp
vdstestlib
- searchlib_searchlib_uca
+ gtest
)
vespa_add_test(
- NAME storageserver_testrunner_app
- COMMAND storageserver_testrunner_app
+ NAME storageserver_gtest_runner_app
+ COMMAND storageserver_gtest_runner_app
)
diff --git a/storageserver/src/tests/dummystoragelink.cpp b/storageserver/src/tests/dummystoragelink.cpp
deleted file mode 100644
index c8b81e51702..00000000000
--- a/storageserver/src/tests/dummystoragelink.cpp
+++ /dev/null
@@ -1,182 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/dummystoragelink.h>
-#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
-#include <sys/time.h>
-#include <vespa/vespalib/util/exceptions.h>
-
-namespace storage {
-
-DummyStorageLink* DummyStorageLink::_last(0);
-
-DummyStorageLink::DummyStorageLink()
- : StorageLink("Dummy storage link"),
- _commands(),
- _replies(),
- _injected(),
- _autoReply(false),
- _useDispatch(false),
- _ignore(false),
- _waitMonitor()
-{
- _last = this;
-}
-
-DummyStorageLink::~DummyStorageLink()
-{
- // Often a chain with dummy link on top is deleted in unit tests.
- // If they haven't been closed already, close them for a cleaner
- // shutdown
- if (getState() == OPENED) {
- close();
- flush();
- }
- closeNextLink();
- reset();
-}
-
-bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd)
-{
- if (_ignore) {
- return false;
- }
- if (_injected.size() > 0) {
- vespalib::LockGuard guard(_lock);
- sendUp(*_injected.begin());
- _injected.pop_front();
- } else if (_autoReply) {
- if (!cmd->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(
- std::dynamic_pointer_cast<api::StorageCommand>(cmd)
- ->makeReply().release());
- reply->setResult(api::ReturnCode(
- api::ReturnCode::OK, "Automatically generated reply"));
- sendUp(reply);
- }
- }
- if (isBottom()) {
- vespalib::MonitorGuard lock(_waitMonitor);
- {
- vespalib::LockGuard guard(_lock);
- _commands.push_back(cmd);
- }
- lock.broadcast();
- return true;
- }
- return StorageLink::onDown(cmd);
-}
-
-bool DummyStorageLink::onUp(const api::StorageMessage::SP& reply) {
- if (isTop()) {
- vespalib::MonitorGuard lock(_waitMonitor);
- {
- vespalib::LockGuard guard(_lock);
- _replies.push_back(reply);
- }
- lock.broadcast();
- return true;
- }
- return StorageLink::onUp(reply);
-
-}
-
-void DummyStorageLink::injectReply(api::StorageReply* reply)
-{
- assert(reply);
- vespalib::LockGuard guard(_lock);
- _injected.push_back(std::shared_ptr<api::StorageReply>(reply));
-}
-
-void DummyStorageLink::reset() {
- vespalib::MonitorGuard lock(_waitMonitor);
- vespalib::LockGuard guard(_lock);
- _commands.clear();
- _replies.clear();
- _injected.clear();
-}
-
-void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout)
-{
- framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(
- clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000));
- vespalib::MonitorGuard lock(_waitMonitor);
- while (_commands.size() + _replies.size() < msgCount) {
- if (timeout != 0 && clock.getTimeInMillis() > endTime) {
- std::ostringstream ost;
- ost << "Timed out waiting for " << msgCount << " messages to "
- << "arrive in dummy storage link. Only "
- << (_commands.size() + _replies.size()) << " messages seen "
- << "after timout of " << timeout << " seconds was reached.";
- throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
- }
- if (timeout >= 0) {
- lock.wait((endTime - clock.getTimeInMillis()).getTime());
- } else {
- lock.wait();
- }
- }
-}
-
-void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout)
-{
- framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(
- clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000));
- vespalib::MonitorGuard lock(_waitMonitor);
- while (true) {
- for (uint32_t i=0; i<_commands.size(); ++i) {
- if (_commands[i]->getType() == type) return;
- }
- for (uint32_t i=0; i<_replies.size(); ++i) {
- if (_replies[i]->getType() == type) return;
- }
- if (timeout != 0 && clock.getTimeInMillis() > endTime) {
- std::ostringstream ost;
- ost << "Timed out waiting for " << type << " message to "
- << "arrive in dummy storage link. Only "
- << (_commands.size() + _replies.size()) << " messages seen "
- << "after timout of " << timeout << " seconds was reached.";
- if (_commands.size() == 1) {
- ost << " Found command of type " << _commands[0]->getType();
- }
- if (_replies.size() == 1) {
- ost << " Found command of type " << _replies[0]->getType();
- }
- throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
- }
- if (timeout >= 0) {
- lock.wait((endTime - clock.getTimeInMillis()).getTime());
- } else {
- lock.wait();
- }
- }
-}
-
-api::StorageMessage::SP
-DummyStorageLink::getAndRemoveMessage(const api::MessageType& type)
-{
- vespalib::MonitorGuard lock(_waitMonitor);
- for (std::vector<api::StorageMessage::SP>::iterator it = _commands.begin();
- it != _commands.end(); ++it)
- {
- if ((*it)->getType() == type) {
- api::StorageMessage::SP result(*it);
- _commands.erase(it);
- return result;
- }
- }
- for (std::vector<api::StorageMessage::SP>::iterator it = _replies.begin();
- it != _replies.end(); ++it)
- {
- if ((*it)->getType() == type) {
- api::StorageMessage::SP result(*it);
- _replies.erase(it);
- return result;
- }
- }
- std::ostringstream ost;
- ost << "No message of type " << type << " found.";
- throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
-}
-
-} // storage
diff --git a/storageserver/src/tests/dummystoragelink.h b/storageserver/src/tests/dummystoragelink.h
deleted file mode 100644
index 0670a3cfa7a..00000000000
--- a/storageserver/src/tests/dummystoragelink.h
+++ /dev/null
@@ -1,113 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-
-#include <vespa/storage/common/storagelink.h>
-#include <vespa/storage/common/bucketmessages.h>
-#include <vespa/storageapi/message/internal.h>
-#include <vespa/storageapi/messageapi/storagecommand.h>
-#include <vespa/vespalib/util/sync.h>
-#include <list>
-#include <sstream>
-
-class FastOS_ThreadPool;
-
-namespace storage {
-
-class DummyStorageLink : public StorageLink {
-
- mutable vespalib::Lock _lock; // to protect below containers:
- std::vector<api::StorageMessage::SP> _commands;
- std::vector<api::StorageMessage::SP> _replies;
- std::list<api::StorageMessage::SP> _injected;
-
- bool _autoReply;
- bool _useDispatch;
- bool _ignore;
- static DummyStorageLink* _last;
- vespalib::Monitor _waitMonitor;
-
-public:
- DummyStorageLink();
- ~DummyStorageLink();
-
- bool onDown(const api::StorageMessage::SP&) override;
- bool onUp(const api::StorageMessage::SP&) override;
-
- void addOnTopOfChain(StorageLink& link) {
- link.addTestLinkOnTop(this);
- }
-
- void print(std::ostream& ost, bool verbose, const std::string& indent) const override {
- (void) verbose;
- ost << indent << "DummyStorageLink("
- << "autoreply = " << (_autoReply ? "on" : "off")
- << ", dispatch = " << (_useDispatch ? "on" : "off")
- << ", " << _commands.size() << " commands"
- << ", " << _replies.size() << " replies";
- if (_injected.size() > 0)
- ost << ", " << _injected.size() << " injected";
- ost << ")";
- }
-
- void injectReply(api::StorageReply* reply);
- void reset();
- void setAutoreply(bool autoReply) { _autoReply = autoReply; }
- void setIgnore(bool ignore) { _ignore = ignore; }
- // Timeout is given in seconds
- void waitForMessages(unsigned int msgCount = 1, int timeout = -1);
- // Wait for a single message of a given type
- void waitForMessage(const api::MessageType&, int timeout = -1);
-
- api::StorageMessage::SP getCommand(size_t i) const {
- vespalib::LockGuard guard(_lock);
- api::StorageMessage::SP ret = _commands[i];
- return ret;
- }
- api::StorageMessage::SP getReply(size_t i) const {
- vespalib::LockGuard guard(_lock);
- api::StorageMessage::SP ret = _replies[i];
- return ret;
- }
- size_t getNumCommands() const {
- vespalib::LockGuard guard(_lock);
- return _commands.size();
- }
- size_t getNumReplies() const {
- vespalib::LockGuard guard(_lock);
- return _replies.size();
- }
-
- const std::vector<api::StorageMessage::SP>& getCommands() const
- { return _commands; }
- const std::vector<api::StorageMessage::SP>& getReplies() const
- { return _replies; }
-
- std::vector<api::StorageMessage::SP> getCommandsOnce() {
- vespalib::MonitorGuard lock(_waitMonitor);
- std::vector<api::StorageMessage::SP> retval;
- {
- vespalib::LockGuard guard(_lock);
- retval.swap(_commands);
- }
- return retval;
- }
-
- std::vector<api::StorageMessage::SP> getRepliesOnce() {
- vespalib::MonitorGuard lock(_waitMonitor);
- std::vector<api::StorageMessage::SP> retval;
- {
- vespalib::LockGuard guard(_lock);
- retval.swap(_replies);
- }
- return retval;
- }
-
- api::StorageMessage::SP getAndRemoveMessage(const api::MessageType&);
-
- static DummyStorageLink* getLast() { return _last; }
-};
-
-}
-
diff --git a/storageserver/src/tests/gtest_runner.cpp b/storageserver/src/tests/gtest_runner.cpp
new file mode 100644
index 00000000000..422b4fd5a73
--- /dev/null
+++ b/storageserver/src/tests/gtest_runner.cpp
@@ -0,0 +1,8 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/gtest/gtest.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP("storageserver_gtest_runner");
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/storageserver/src/tests/storageservertest.cpp b/storageserver/src/tests/storageservertest.cpp
index 413acf89f27..5e50fff8f94 100644
--- a/storageserver/src/tests/storageservertest.cpp
+++ b/storageserver/src/tests/storageservertest.cpp
@@ -1,186 +1,62 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storage/storageserver/servicelayernode.h>
-#include <vespa/storage/storageserver/distributornode.h>
-
-#include <vespa/document/base/testdocman.h>
-#include <vespa/document/test/make_document_bucket.h>
-#include <vespa/documentapi/documentapi.h>
-#include <vespa/messagebus/rpcmessagebus.h>
-#include <vespa/messagebus/network/rpcnetworkparams.h>
-#include <vespa/messagebus/staticthrottlepolicy.h>
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/storageapi/mbusprot/storagecommand.h>
-#include <vespa/storageapi/mbusprot/storagereply.h>
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storageapi/message/state.h>
-#include <vespa/storage/common/statusmetricconsumer.h>
#include <tests/testhelper.h>
-#include <tests/dummystoragelink.h>
-#include <vespa/slobrok/sbmirror.h>
+#include <vespa/storage/storageserver/distributornode.h>
+#include <vespa/storage/storageserver/servicelayernode.h>
#include <vespa/storageserver/app/distributorprocess.h>
#include <vespa/storageserver/app/dummyservicelayerprocess.h>
-#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/fnet/frt/supervisor.h>
-#include <sys/time.h>
+#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/log/log.h>
LOG_SETUP(".storageservertest");
-using document::test::makeDocumentBucket;
-
namespace storage {
-namespace {
-
-uint64_t getTimeInMillis() {
- struct timeval t;
- gettimeofday(&t, 0);
- return (t.tv_sec * uint64_t(1000)) + (t.tv_usec / uint64_t(1000));
-}
-
-class SlobrokMirror {
- config::ConfigUri config;
- fnet::frt::StandaloneFRT visor;
- std::unique_ptr<slobrok::api::MirrorAPI> mirror;
-
-public:
- SlobrokMirror(const config::ConfigUri &cfg) : config(cfg) {}
-
- void init(uint32_t timeoutms) {
- uint64_t timeout = getTimeInMillis() + timeoutms;
- mirror.reset(new slobrok::api::MirrorAPI(visor.supervisor(), config));
- while (!mirror->ready()) {
- if (getTimeInMillis() > timeout)
- throw vespalib::IllegalStateException(
- "Failed to initialize slobrok mirror within "
- "timeout.", VESPA_STRLOC);
- FastOS_Thread::Sleep(1);
- }
- }
-
- slobrok::api::MirrorAPI &getMirror() {
- if (mirror.get() == 0)
- throw vespalib::IllegalStateException(
- "You need to call init() before you can fetch mirror");
- return *mirror;
- }
-
- FRT_Supervisor &getSupervisor() {
- if (mirror.get() == 0)
- throw vespalib::IllegalStateException(
- "You need to call init() before you can fetch supervisor");
- return visor.supervisor();
- }
-
- ~SlobrokMirror() = default;
-};
-
-}
-
-struct StorageServerTest : public CppUnit::TestFixture {
- std::unique_ptr<FastOS_ThreadPool> threadPool;
- std::unique_ptr<document::TestDocMan> docMan;
+struct StorageServerTest : public ::testing::Test {
std::unique_ptr<mbus::Slobrok> slobrok;
std::unique_ptr<vdstestlib::DirConfig> distConfig;
std::unique_ptr<vdstestlib::DirConfig> storConfig;
- std::unique_ptr<SlobrokMirror> slobrokMirror;
StorageServerTest();
~StorageServerTest();
- void setUp() override;
- void tearDown() override;
-
- void testNormalUsage();
- void testPortOverlap_Stress();
- void testShutdownDuringDiskLoad(bool storagenode);
- void testShutdownStorageDuringDiskLoad();
- void testShutdownDistributorDuringDiskLoad();
- void testShutdownAfterDiskFailure_Stress();
- void testPriorityAndQueueSneakingWhileSplitJoinStressTest();
- void testStatusPages();
-
- CPPUNIT_TEST_SUITE(StorageServerTest);
- CPPUNIT_TEST(testNormalUsage);
- CPPUNIT_TEST_IGNORED(testPortOverlap_Stress);
- CPPUNIT_TEST_IGNORED(testShutdownStorageDuringDiskLoad);
- CPPUNIT_TEST_IGNORED(testShutdownDistributorDuringDiskLoad);
- CPPUNIT_TEST_IGNORED(testShutdownAfterDiskFailure_Stress);
+ void SetUp() override;
+ void TearDown() override;
- CPPUNIT_TEST_DISABLED(testPriorityAndQueueSneakingWhileSplitJoinStressTest);
-
- // Doesn't work in new framework. Will investigate as soon as there's time
- CPPUNIT_TEST_DISABLED(testStatusPages);
- CPPUNIT_TEST_SUITE_END();
};
StorageServerTest::StorageServerTest() = default;
StorageServerTest::~StorageServerTest() = default;
-CPPUNIT_TEST_SUITE_REGISTRATION(StorageServerTest);
-
namespace {
- template<typename T>
- struct ConfigReader : public config::IFetcherCallback<T>,
- public T
- {
- ConfigReader(const std::string& configId) {
- config::LegacySubscriber subscription;
- subscription.subscribe<T>(configId, this);
- }
- void configure(std::unique_ptr<document::DocumenttypesConfig> c)
- {
- static_cast<T&>(*this) = *c;
- }
- };
-
- struct Node {
- virtual ~Node() {}
- virtual StorageNode& getNode() = 0;
- virtual StorageNodeContext& getContext() = 0;
-
- bool attemptedStopped()
- { return getNode().attemptedStopped(); }
- void waitUntilInitialized(uint32_t timeout)
- { getNode().waitUntilInitialized(timeout); }
- StorageLink* getChain() { return getNode().getChain(); }
- void requestShutdown(const std::string& reason)
- { getNode().requestShutdown(reason); }
- const framework::StatusReporter* getStatusReporter(const std::string& i)
- { return getContext().getComponentRegister().getStatusReporter(i); }
- NodeStateUpdater& getStateUpdater()
- { return getContext().getComponentRegister().getNodeStateUpdater(); }
- };
+struct Node {
+ virtual ~Node() {}
+ virtual StorageNode& getNode() = 0;
+ virtual StorageNodeContext& getContext() = 0;
+};
- struct Distributor : public Node {
- DistributorProcess _process;
+struct Distributor : public Node {
+ DistributorProcess _process;
- Distributor(vdstestlib::DirConfig& config);
- ~Distributor();
+ Distributor(vdstestlib::DirConfig& config);
+ ~Distributor();
- virtual StorageNode& getNode() override { return _process.getNode(); }
- virtual StorageNodeContext& getContext() override
- { return _process.getContext(); }
- };
+ virtual StorageNode& getNode() override { return _process.getNode(); }
+ virtual StorageNodeContext& getContext() override { return _process.getContext(); }
+};
- struct Storage : public Node {
- DummyServiceLayerProcess _process;
- StorageComponent::UP _component;
+struct Storage : public Node {
+ DummyServiceLayerProcess _process;
+ StorageComponent::UP _component;
- Storage(vdstestlib::DirConfig& config);
- ~Storage();
+ Storage(vdstestlib::DirConfig& config);
+ ~Storage();
- virtual StorageNode& getNode() override { return _process.getNode(); }
- virtual StorageNodeContext& getContext() override
- { return _process.getContext(); }
- spi::PartitionStateList getPartitions()
- { return _process.getProvider().getPartitionStates().getList(); }
- uint16_t getDiskCount() { return getPartitions().size(); }
- StorageComponent& getComponent() { return *_component; }
- };
+ virtual StorageNode& getNode() override { return _process.getNode(); }
+ virtual StorageNodeContext& getContext() override { return _process.getContext(); }
+};
Distributor::Distributor(vdstestlib::DirConfig& config)
: _process(config.getConfigId())
@@ -188,7 +64,8 @@ Distributor::Distributor(vdstestlib::DirConfig& config)
_process.setupConfig(60000);
_process.createNode();
}
-Distributor::~Distributor() {}
+
+Distributor::~Distributor() = default;
Storage::Storage(vdstestlib::DirConfig& config)
: _process(config.getConfigId())
@@ -198,15 +75,14 @@ Storage::Storage(vdstestlib::DirConfig& config)
_component.reset(new StorageComponent(
getContext().getComponentRegister(), "test"));
}
-Storage::~Storage() {}
+
+Storage::~Storage() = default;
}
void
-StorageServerTest::setUp()
+StorageServerTest::SetUp()
{
- threadPool.reset(new FastOS_ThreadPool(128 * 1024));
- docMan.reset(new document::TestDocMan);
[[maybe_unused]] int systemResult = system("chmod -R 755 vdsroot");
systemResult = system("rm -rf vdsroot*");
slobrok.reset(new mbus::Slobrok);
@@ -217,858 +93,24 @@ StorageServerTest::setUp()
storConfig->getConfig("stor-filestor").set("fail_disk_after_error_count", "1");
systemResult = system("mkdir -p vdsroot/disks/d0");
systemResult = system("mkdir -p vdsroot.distributor");
- slobrokMirror.reset(new SlobrokMirror(slobrok->config()));
-}
-
-void
-StorageServerTest::tearDown()
-{
- slobrokMirror.reset(NULL);
- storConfig.reset(NULL);
- distConfig.reset(NULL);
- slobrok.reset(NULL);
- docMan.reset(NULL);
- threadPool.reset(NULL);
-}
-
-void
-StorageServerTest::testNormalUsage()
-{
- {
- Distributor distServer(*distConfig);
- Storage storServer(*storConfig);
- }
-}
-
-namespace {
- struct LoadGiver : public document::Runnable,
- public mbus::IReplyHandler
- {
- const vdstestlib::DirConfig& _config;
- const std::shared_ptr<const document::DocumentTypeRepo> _repo;
- documentapi::LoadTypeSet _loadTypes;
- std::unique_ptr<mbus::RPCMessageBus> _mbus;
- mbus::SourceSession::UP _sourceSession;
- uint32_t _maxPending;
- uint32_t _currentPending;
- uint32_t _processedOk;
- uint32_t _unexpectedErrors;
- bool _startedShutdown;
-
- LoadGiver(const vdstestlib::DirConfig& config,
- const std::shared_ptr<const document::DocumentTypeRepo> repo)
- : _config(config), _repo(repo), _mbus(), _sourceSession(),
- _maxPending(20), _currentPending(0), _processedOk(0),
- _unexpectedErrors(0), _startedShutdown(false) {}
- virtual ~LoadGiver() {
- if (_sourceSession.get() != 0) {
- _sourceSession->close();
- }
- }
-
- void init() {
- auto protocol = std::make_shared<documentapi::DocumentProtocol>(_loadTypes, _repo);
- auto storageProtocol = std::make_shared<storage::mbusprot::StorageProtocol>(_repo, _loadTypes);
- mbus::ProtocolSet protocols;
- protocols.add(protocol);
- protocols.add(storageProtocol);
- mbus::RPCNetworkParams networkParams(config::ConfigUri(_config.getConfigId()));
- _mbus = std::make_unique<mbus::RPCMessageBus>(protocols, networkParams, _config.getConfigId());
- mbus::SourceSessionParams sourceParams;
- sourceParams.setTimeout(5000);
- auto policy = std::make_shared<mbus::StaticThrottlePolicy>();
- policy->setMaxPendingCount(_maxPending);
- sourceParams.setThrottlePolicy(policy);
- _sourceSession = _mbus->getMessageBus().createSourceSession(*this, sourceParams);
- }
-
- virtual void notifyStartingShutdown() {
- _startedShutdown = true;
- }
-
- void handleReply(mbus::Reply::UP reply) override {
- using documentapi::DocumentProtocol;
- --_currentPending;
- if (!reply->hasErrors()) {
- ++_processedOk;
- } else if (!_startedShutdown && reply->getNumErrors() > 1) {
- ++_unexpectedErrors;
- std::cerr << reply->getNumErrors() << " errors. First: "
- << reply->getError(0).getCode() << " - "
- << reply->getError(0).getMessage() << "\n";
- } else {
- int code = reply->getError(0).getCode();
- std::string errorMsg = reply->getError(0).getMessage();
-
- if (code == mbus::ErrorCode::UNKNOWN_SESSION
- || code == mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE
- || code == mbus::ErrorCode::CONNECTION_ERROR
- || code == mbus::ErrorCode::HANDSHAKE_FAILED)
- {
- // Ignore
- } else if ((code >= mbus::ErrorCode::TRANSIENT_ERROR
- && code < mbus::ErrorCode::FATAL_ERROR)
- && (errorMsg.find("UNKNOWN_SESSION") != std::string::npos
- || errorMsg.find("NO_ADDRESS_FOR_SERVICE")
- != std::string::npos
- || errorMsg.find("when node is in state Stopping")
- != std::string::npos
- || errorMsg.find("HANDSHAKE_FAILED")
- != std::string::npos
- || code == mbus::ErrorCode::APP_TRANSIENT_ERROR
- || code == DocumentProtocol::ERROR_IO_FAILURE
- || code == DocumentProtocol::ERROR_ABORTED
- || code == DocumentProtocol::ERROR_BUCKET_NOT_FOUND))
- {
- // Ignore
- } else {
- ++_unexpectedErrors;
- std::cerr << reply->getNumErrors() << " errors. First: "
- << reply->getError(0).getCode() << " - "
- << reply->getError(0).getMessage();
- mbus::Message::UP msg(reply->getMessage());
- if (msg->getType() == DocumentProtocol::MESSAGE_PUTDOCUMENT)
- {
- documentapi::PutDocumentMessage& putMsg(
- static_cast<documentapi::PutDocumentMessage&>(*msg));
- std::cerr << " - " << putMsg.getDocument().getId();
- }
- std::cerr << "\n";
- }
- }
- }
-
- void waitUntilDecentLoad(uint32_t maxWait = 60000) {
- uint64_t maxTime = getTimeInMillis() + maxWait;
- while (true) {
- if (_processedOk > 5 && _currentPending > _maxPending / 2) {
- break;
- }
- uint64_t time = getTimeInMillis();
- if (time > maxTime) {
- if (_processedOk < 5) {
- throw vespalib::IllegalStateException(
- "Failed to process 5 ok operations within timeout.",
- VESPA_STRLOC);
- }
- if (_currentPending < _maxPending / 2) {
- throw vespalib::IllegalStateException(
- "Failed to get enough max pending.",
- VESPA_STRLOC);
- }
- break;
- }
- FastOS_Thread::Sleep(1);
- }
- LOG(info, "Currently, we have received %u ok replies and have %u pending ones.",
- _processedOk, _currentPending);
- }
- };
-
- struct SimpleLoadGiver : public LoadGiver {
- const document::TestDocMan& _testDocMan;
- vespalib::Monitor _threadMonitor;
-
- SimpleLoadGiver(const vdstestlib::DirConfig& config,
- const document::TestDocMan& tdm)
- : LoadGiver(config, tdm.getTypeRepoSP()), _testDocMan(tdm),
- _threadMonitor() {}
- ~SimpleLoadGiver() {
- stop();
- join();
- }
- virtual bool onStop() override {
- vespalib::MonitorGuard monitor(_threadMonitor);
- monitor.signal();
- return true;
- }
- void run() override {
- uint32_t seed = 0;
- uint32_t maxDocSize = 65536;
- init();
- vespalib::MonitorGuard monitor(_threadMonitor);
- while (running()) {
- uint32_t attemptCount = 0;
- while (_currentPending < _maxPending
- && ++attemptCount < _maxPending)
- {
- document::Document::SP doc(
- _testDocMan.createRandomDocument(
- ++seed, maxDocSize));
- mbus::Message::UP msg(
- new documentapi::PutDocumentMessage(doc));
- msg->setRetryEnabled(false);
- mbus::Result r = _sourceSession->send(std::move(msg),
- "storage/cluster.storage/distributor/0/default",
- true);
- if (r.isAccepted()){
- ++_currentPending;
- } else {
- if (!_startedShutdown) {
- std::cerr << "Source session did not accept "
- "message.\n";
- }
- break;
- }
- }
- monitor.wait(1);
- }
- }
- };
-
- void setSystemState(SlobrokMirror& mirror,
- const lib::ClusterState& state,
- const std::vector<std::string>& address)
- {
- std::string systemState = state.toString();
- auto deleter = [](auto * ptr) { ptr->SubRef(); };
- for (uint32_t i=0; i<address.size(); ++i) {
- slobrok::api::MirrorAPI::SpecList list(
- mirror.getMirror().lookup(address[i]));
- for (uint32_t j=0; j<list.size(); ++j) {
- auto target = std::unique_ptr<FRT_Target, decltype(deleter)>(mirror.getSupervisor().GetTarget(
- list[j].second.c_str()), deleter);
- auto req = std::unique_ptr<FRT_RPCRequest, decltype(deleter)>(mirror.getSupervisor().AllocRPCRequest(),
- deleter);
- req->SetMethodName("setsystemstate2");
- req->GetParams()->AddString(systemState.c_str());
- target->InvokeSync(req.get(), 5.0);
- if (req->GetErrorCode() != FRTE_NO_ERROR) {
- throw vespalib::IllegalStateException(
- "Failed sending setsystemstate request: "
- + std::string(req->GetErrorMessage()), VESPA_STRLOC);
- }
- }
- }
- }
-}
-
-void
-StorageServerTest::testShutdownDuringDiskLoad(bool storagenode)
-{
- slobrokMirror->init(5000);
- // Verify that, then shutdown, we stop accepting new messages, fail
- // all messages enqueued and finish current operations before shutting
- // down without any errors.
- std::unique_ptr<Distributor> distServer(new Distributor(*distConfig));
- std::unique_ptr<Storage> storServer(new Storage(*storConfig));
- storServer->waitUntilInitialized(30);
- LOG(info, "\n\nStorage server stable\n\n");
- lib::ClusterState state("version:1 bits:1 distributor:1 storage:1");
- std::vector<std::string> addresses;
- addresses.push_back("storage/cluster.storage/storage/0");
- addresses.push_back("storage/cluster.storage/distributor/0");
- LOG(info, "\n\nSetting system states\n\n");
- setSystemState(*slobrokMirror, state, addresses);
- LOG(info, "\n\nWaiting for stable distributor server\n\n");
- distServer->waitUntilInitialized(30);
-
- LOG(info, "\n\nSTARTING LOADGIVER\n\n");
-
- SimpleLoadGiver loadGiver(*distConfig, *docMan);
- loadGiver.start(*threadPool);
- loadGiver.waitUntilDecentLoad();
-
- loadGiver.notifyStartingShutdown();
-
- if (storagenode) {
- LOG(info, "\n\nKILLING STORAGE NODE\n\n");
- storServer->requestShutdown(
- "Stopping storage server during load for testing");
- storServer.reset(0);
- } else {
- LOG(info, "\n\nKILLING DISTRIBUTOR\n\n");
- distServer->requestShutdown(
- "Stopping distributor during load for testing");
- distServer.reset(0);
- }
- LOG(info, "\n\nDONE KILLING NODE. Cleaning up other stuff.\n\n");
-
- CPPUNIT_ASSERT_EQUAL(0u, loadGiver._unexpectedErrors);
}
void
-StorageServerTest::testShutdownStorageDuringDiskLoad()
+StorageServerTest::TearDown()
{
- testShutdownDuringDiskLoad(true);
+ storConfig.reset(nullptr);
+ distConfig.reset(nullptr);
+ slobrok.reset(nullptr);
}
-void
-StorageServerTest::testShutdownDistributorDuringDiskLoad()
+TEST_F(StorageServerTest, distributor_server_can_be_instantiated)
{
- testShutdownDuringDiskLoad(false);
+ Distributor distServer(*distConfig);
}
-void
-StorageServerTest::testShutdownAfterDiskFailure_Stress()
+TEST_F(StorageServerTest, storage_server_can_be_instantiated)
{
- slobrokMirror->init(5000);
-
- // Verify that, then shutdown, we stop accepting new messages, fail
- // all messages enqueued and finish current operations before shutting
- // down without any errors.
- std::unique_ptr<Distributor> distServer(new Distributor(*distConfig));
- std::unique_ptr<Storage> storServer(new Storage(*storConfig));
- //storServer->getSlotFileCache().disable();
- storServer->waitUntilInitialized(30);
- LOG(info, "\n\nStorage server stable\n\n");
- lib::ClusterState state("version:1 bits:1 distributor:1 storage:1");
- std::vector<std::string> addresses;
- addresses.push_back("storage/cluster.storage/storage/0");
- addresses.push_back("storage/cluster.storage/distributor/0");
- LOG(info, "\n\nSetting system states\n\n");
- setSystemState(*slobrokMirror, state, addresses);
- LOG(info, "\n\nWaiting for stable distributor server\n\n");
- distServer->waitUntilInitialized(30);
-
- LOG(info, "\n\nSTARTING LOADGIVER\n\n");
-
- SimpleLoadGiver loadGiver(*distConfig, *docMan);
- loadGiver.start(*threadPool);
- loadGiver.waitUntilDecentLoad();
-
- // Test that getting io errors flags storage for shutdown
- // (The shutdown is the responsibility of the application in
- // storageserver)
- CPPUNIT_ASSERT(!storServer->attemptedStopped());
- loadGiver.notifyStartingShutdown();
- LOG(info, "\n\nREMOVING PERMISSIONS\n\n");
- [[maybe_unused]] int systemResult = system("chmod 000 vdsroot/disks/d0/*.0");
- systemResult = system("ls -ld vdsroot/disks/d0/* > permissions");
-
- for (uint32_t i=0; i<6000; ++i) {
- //storServer->getMemFileCache().clear();
- if (storServer->attemptedStopped()) break;
- FastOS_Thread::Sleep(10);
- }
- if (!storServer->attemptedStopped()) {
- CPPUNIT_FAIL("Removing permissions from disk failed to stop storage "
- "within timeout of 60 seconds");
- }
-
- CPPUNIT_ASSERT_EQUAL(0u, loadGiver._unexpectedErrors);
- unlink("permissions");
-}
-
-namespace {
-
- struct PriorityStorageLoadGiver : public LoadGiver {
- const document::TestDocMan& _testDocMan;
- vespalib::Monitor _threadMonitor;
- StorBucketDatabase _bucketDB;
- document::BucketIdFactory _idFactory;
- uint32_t _putCount;
- uint32_t _getCount;
- uint32_t _removeCount;
- uint32_t _joinCount;
- uint32_t _splitCount;
- uint32_t _createBucket;
- uint32_t _deleteBucket;
- uint32_t _remappedOperations;
- uint32_t _notFoundOps;
- uint32_t _existOps;
- uint32_t _bucketDeletedOps;
- uint32_t _bucketNotFoundOps;
- uint32_t _rejectedOps;
-
- PriorityStorageLoadGiver(const vdstestlib::DirConfig& config,
- const document::TestDocMan& tdm)
- : LoadGiver(config, tdm.getTypeRepoSP()), _testDocMan(tdm),
- _threadMonitor(),
- _bucketDB(), _idFactory(),
- _putCount(0), _getCount(0), _removeCount(0), _joinCount(0),
- _splitCount(0), _createBucket(0), _deleteBucket(0),
- _remappedOperations(0), _notFoundOps(0), _existOps(0),
- _bucketDeletedOps(0), _bucketNotFoundOps(0), _rejectedOps(0) {}
- ~PriorityStorageLoadGiver() {
- close();
- }
- virtual void close() {
- if (running()) {
- stop();
- join();
- }
- }
- virtual bool onStop() override {
- vespalib::MonitorGuard monitor(_threadMonitor);
- monitor.signal();
- return true;
- }
-
- void run() override {
- uint32_t seed = 0;
- uint32_t maxDocSize = 65536;
- init();
- vespalib::MonitorGuard monitor(_threadMonitor);
- std::list<mbusprot::StorageCommand*> sendList;
- while (running()) {
- while (sendList.size() < (_maxPending - _currentPending)) {
- document::Document::SP doc(
- _testDocMan.createRandomDocument(
- ++seed, maxDocSize));
- api::StorageCommand::SP cmd;
- document::BucketId bucket(
- _idFactory.getBucketId(doc->getId()));
- std::map<document::BucketId,
- StorBucketDatabase::WrappedEntry> entries(
- _bucketDB.getContained(bucket, ""));
- if (entries.size() == 0
- || (entries.size() == 1
- && (entries.begin()->second->getBucketInfo().getChecksum() & 2) != 0))
- {
- if (entries.size() == 0) {
- bucket.setUsedBits(4);
- bucket = bucket.stripUnused();
- entries[bucket] = _bucketDB.get(bucket, "",
- StorBucketDatabase::CREATE_IF_NONEXISTING);
- entries[bucket]->setChecksum(0);
- } else {
- bucket = entries.begin()->first;
- entries[bucket]->setChecksum(
- entries[bucket]->getBucketInfo().getChecksum() & ~2);
- }
- entries[bucket]->disk = 0;
- entries[bucket].write();
- entries[bucket] = _bucketDB.get(bucket, "foo");
- CPPUNIT_ASSERT(entries[bucket].exist());
- cmd.reset(new api::CreateBucketCommand(makeDocumentBucket(bucket)));
- sendList.push_back(new mbusprot::StorageCommand(cmd));
- }
- CPPUNIT_ASSERT_EQUAL(size_t(1), entries.size());
- bucket = entries.begin()->first;
- auto *entry_wrapper = &(entries.begin()->second);
- auto *entry = entry_wrapper->get();
- if (seed % 95 == 93) { // Delete bucket
- if ((entry->getBucketInfo().getChecksum() & 2) == 0) {
- cmd.reset(new api::DeleteBucketCommand(makeDocumentBucket(bucket)));
- entry->setChecksum(
- entry->getBucketInfo().getChecksum() | 2);
- entry_wrapper->write();
- sendList.push_back(
- new mbusprot::StorageCommand(cmd));
- }
- } else if (seed % 13 == 8) { // Join
- if (entry->getBucketInfo().getChecksum() == 0 && bucket.getUsedBits() > 3) {
- // Remove existing locks we have to not cause
- // deadlock
- entry = nullptr;
- entry_wrapper = nullptr;
- entries.clear();
- // Then continue
- document::BucketId super(bucket.getUsedBits() - 1,
- bucket.getRawId());
- super = super.stripUnused();
- api::JoinBucketsCommand::SP jcmd(
- new api::JoinBucketsCommand(makeDocumentBucket(super)));
- entries = _bucketDB.getAll(super, "foo");
- bool foundAnyLocked = false;
- for (std::map<document::BucketId,
- StorBucketDatabase::WrappedEntry>
- ::iterator it = entries.begin();
- it != entries.end(); ++it)
- {
- if (!super.contains(it->first) || super == it->first) continue;
- jcmd->getSourceBuckets().push_back(
- it->first.stripUnused());
- foundAnyLocked |= (it->second->getBucketInfo().getChecksum() != 0);
- }
- if (!foundAnyLocked && jcmd->getSourceBuckets().size() == 2) {
- for (std::map<document::BucketId,
- StorBucketDatabase::WrappedEntry>
- ::iterator it = entries.begin();
- it != entries.end(); ++it)
- {
- if (!super.contains(it->first)) continue;
- it->second->setChecksum(
- it->second->getBucketInfo().getChecksum() | 1);
- it->second.write();
- }
- cmd = jcmd;
- sendList.push_back(
- new mbusprot::StorageCommand(cmd));
- }
- }
- } else if (seed % 13 == 1) { // Split
- // Use _checksum == 1 to mean that we have a pending
- // maintenance operation to this bucket.
- if (entry->getBucketInfo().getChecksum() == 0) {
- cmd.reset(new api::SplitBucketCommand(makeDocumentBucket(bucket)));
- entry->setChecksum(1);
- entry_wrapper->write();
- sendList.push_back(
- new mbusprot::StorageCommand(cmd));
- }
- } else if (seed % 7 == 5) { // Remove
- if ((entry->getBucketInfo().getChecksum() & 2) == 0) {
- cmd.reset(new api::RemoveCommand(makeDocumentBucket(bucket),
- doc->getId(), 1000ull * seed + 2));
- sendList.push_back(
- new mbusprot::StorageCommand(cmd));
- }
- } else if (seed % 5 == 3) { // Get
- if ((entry->getBucketInfo().getChecksum() & 2) == 0) {
- cmd.reset(new api::GetCommand(
- makeDocumentBucket(bucket), doc->getId(), "[all]"));
- sendList.push_back(
- new mbusprot::StorageCommand(cmd));
- }
- } else { // Put
- if ((entry->getBucketInfo().getChecksum() & 2) == 0) {
- cmd.reset(new api::PutCommand(
- makeDocumentBucket(bucket), doc, 1000ull * seed + 1));
- sendList.push_back(
- new mbusprot::StorageCommand(cmd));
- }
- }
- if (!sendList.empty()) {
- uint8_t priorities[] = {
- api::StorageMessage::LOW,
- api::StorageMessage::NORMAL,
- api::StorageMessage::HIGH,
- api::StorageMessage::VERYHIGH
- };
- sendList.back()->getCommand()->setPriority(priorities[seed % 4]);
- }
- }
- if (sendList.size() > 0) {
- uint32_t sent = 0;
- for (uint32_t i=0; i<sendList.size(); ++i) {
- mbus::Message::UP msg(*sendList.begin());
- msg->setRetryEnabled(false);
- mbus::Result r = _sourceSession->send(std::move(msg),
- "storage/cluster.storage/storage/0/default",
- true);
- if (r.isAccepted()){
- sendList.pop_front();
- ++_currentPending;
- ++sent;
- } else {
- r.getMessage().release();
- break;
- }
- }
- }
- monitor.wait(1);
- }
- }
-
- std::string report() {
- std::ostringstream ost;
- ost << "Performed ("
- << _putCount << ", " << _getCount << ", "
- << _removeCount << ", " << _splitCount << ", "
- << _joinCount << ", " << _createBucket << ", "
- << _deleteBucket
- << ") put/get/remove/split/join/create/delete operations.\n"
- << "Result: " << _remappedOperations << " remapped operations\n"
- << " " << _processedOk << " ok responses.\n"
- << " " << _notFoundOps << " NOT_FOUND responses.\n"
- << " " << _existOps << " EXISTS responses\n"
- << " " << _bucketDeletedOps << " BUCKET_DELETED responses\n"
- << " " << _bucketNotFoundOps << " BUCKET_NOT_FOUND responses\n"
- << " " << _rejectedOps << " REJECTED responses (duplicate splits)\n"
- << " " << _unexpectedErrors << " unexpected errors\n";
- return ost.str();
- }
-
- virtual void handleReply(mbus::Reply::UP reply) override {
- if (_startedShutdown) return;
- --_currentPending;
- std::ostringstream err;
- mbusprot::StorageReply* mreply(
- dynamic_cast<mbusprot::StorageReply*>(reply.get()));
- if (mreply == 0) {
- ++_unexpectedErrors;
- err << "Got unexpected reply which is not a storage reply, "
- << "likely emptyreply.";
- if (reply->hasErrors()) {
- int code = reply->getError(0).getCode();
- std::string errorMsg = reply->getError(0).getMessage();
- err << "\n mbus(" << code << "): '" << errorMsg << "'";
- }
- err << "\n";
- std::cerr << err.str();
- return;
- }
- api::StorageReply& sreply(*mreply->getReply());
- api::BucketReply& breply(static_cast<api::BucketReply&>(sreply));
-
- if ((!reply->hasErrors()
- && sreply.getResult().success())
- || sreply.getResult().getResult() == api::ReturnCode::EXISTS
- || sreply.getResult().getMessage().find("Bucket does not exist; assuming already split") != std::string::npos
- || sreply.getResult().getResult()
- == api::ReturnCode::BUCKET_DELETED
- || sreply.getResult().getResult()
- == api::ReturnCode::BUCKET_NOT_FOUND)
- {
- std::ostringstream out;
- if (breply.hasBeenRemapped()) {
- ++_remappedOperations;
- }
- if (sreply.getType() == api::MessageType::JOINBUCKETS_REPLY) {
- vespalib::MonitorGuard monitor(_threadMonitor);
- api::JoinBucketsReply& joinReply(
- static_cast<api::JoinBucketsReply&>(sreply));
- StorBucketDatabase::WrappedEntry entry(
- _bucketDB.get(joinReply.getBucketId(), "",
- StorBucketDatabase::CREATE_IF_NONEXISTING));
- entry->setChecksum(0);
- entry->disk = 0;
- entry.write();
- for(std::vector<document::BucketId>::const_iterator it
- = joinReply.getSourceBuckets().begin();
- it != joinReply.getSourceBuckets().end(); ++it)
- {
- _bucketDB.erase(*it, "foo");
- }
- ++_joinCount;
- out << "OK " << joinReply.getBucketId() << " Join\n";
- } else if (sreply.getType()
- == api::MessageType::SPLITBUCKET_REPLY
- && sreply.getResult().getResult() != api::ReturnCode::REJECTED)
- {
- vespalib::MonitorGuard monitor(_threadMonitor);
- api::SplitBucketReply& splitReply(
- static_cast<api::SplitBucketReply&>(sreply));
- StorBucketDatabase::WrappedEntry entry(
- _bucketDB.get(splitReply.getBucketId(), "foo"));
- if (entry.exist()) {
- //CPPUNIT_ASSERT((entry->getBucketInfo().getChecksum() & 1) != 0);
- entry.remove();
- }
- for(std::vector<api::SplitBucketReply::Entry>::iterator it
- = splitReply.getSplitInfo().begin();
- it != splitReply.getSplitInfo().end(); ++it)
- {
- entry = _bucketDB.get(it->first, "foo",
- StorBucketDatabase::CREATE_IF_NONEXISTING);
- entry->setChecksum(0);
- entry->disk = 0;
- entry.write();
- }
- ++_splitCount;
- out << "OK " << splitReply.getBucketId() << " Split\n";
- } else if (sreply.getType() == api::MessageType::PUT_REPLY) {
- ++_putCount;
- if (!static_cast<api::PutReply&>(sreply).wasFound()) {
- ++_notFoundOps;
- }
- out << "OK " << breply.getBucketId() << " Put\n";
- } else if (sreply.getType() == api::MessageType::GET_REPLY) {
- ++_getCount;
- if (!static_cast<api::GetReply&>(sreply).wasFound()) {
- ++_notFoundOps;
- }
- out << "OK " << breply.getBucketId() << " Get\n";
- } else if (sreply.getType() == api::MessageType::REMOVE_REPLY) {
- ++_removeCount;
- if (!static_cast<api::RemoveReply&>(sreply).wasFound()) {
- ++_notFoundOps;
- }
- out << "OK " << breply.getBucketId() << " Remove\n";
- } else if (sreply.getType()
- == api::MessageType::CREATEBUCKET_REPLY)
- {
- ++_createBucket;
- out << "OK " << breply.getBucketId() << " Create\n";
- } else if (sreply.getType()
- == api::MessageType::DELETEBUCKET_REPLY)
- {
- ++_deleteBucket;
- out << "OK " << breply.getBucketId() << " Delete\n";
- }
- switch (sreply.getResult().getResult()) {
- case api::ReturnCode::EXISTS: ++_existOps; break;
- case api::ReturnCode::BUCKET_NOT_FOUND:
- ++_bucketNotFoundOps; break;
- case api::ReturnCode::BUCKET_DELETED:
- ++_bucketDeletedOps; break;
- case api::ReturnCode::REJECTED:
- ++_rejectedOps; break;
- case api::ReturnCode::OK: ++_processedOk; break;
- default:
- assert(false);
- }
- //std::cerr << "OK - " << sreply.getType() << "\n";
- if (_processedOk % 5000 == 0) {
- out << report();
- }
- //err << out.str();
- } else {
- ++_unexpectedErrors;
- api::BucketReply& brep(static_cast<api::BucketReply&>(sreply));
- err << "Failed " << brep.getBucketId() << " "
- << sreply.getType().getName() << ":";
- if (reply->hasErrors()) {
- int code = reply->getError(0).getCode();
- std::string errorMsg = reply->getError(0).getMessage();
- err << " mbus(" << code << "): '" << errorMsg << "'";
- }
- if (sreply.getResult().failed()) {
- err << " sapi: " << sreply.getResult() << "\n";
- }
- }
- std::cerr << err.str();
- }
- };
-
- enum StateType { REPORTED, CURRENT };
- void waitForStorageUp(StorageComponent& storageNode,
- StateType type, time_t timeoutMS = 60000)
- {
- framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime timeout = clock.getTimeInMillis()
- + framework::MilliSecTime(timeoutMS);
- while (true) {
- lib::NodeState::CSP ns(type == REPORTED
- ? storageNode.getStateUpdater().getReportedNodeState()
- : storageNode.getStateUpdater().getCurrentNodeState());
- if (ns->getState() == lib::State::UP) return;
- if (clock.getTimeInMillis() > timeout) {
- std::ostringstream ost;
- ost << "Storage node failed to get up within timeout of "
- << timeoutMS << " ms. Current state is: " << ns;
- CPPUNIT_FAIL(ost.str());
- }
- FastOS_Thread::Sleep(10);
- }
- }
-
-}
-
-void
-StorageServerTest::testPriorityAndQueueSneakingWhileSplitJoinStressTest()
-{
- PriorityStorageLoadGiver loadGiver(*storConfig, *docMan);
Storage storServer(*storConfig);
- waitForStorageUp(storServer.getComponent(), REPORTED);
- api::SetSystemStateCommand::SP cmd(new api::SetSystemStateCommand(
- lib::ClusterState("storage:1")));
- storServer.getChain()->sendDown(cmd);
- waitForStorageUp(storServer.getComponent(), CURRENT);
- loadGiver.start(*threadPool);
- while (loadGiver._processedOk + loadGiver._unexpectedErrors < 1000) {
- FastOS_Thread::Sleep(100);
- std::cerr << "OK " << loadGiver._processedOk << " Errors: "
- << loadGiver._unexpectedErrors << "\n";
- }
- loadGiver.notifyStartingShutdown();
- loadGiver.stop();
- loadGiver.close();
- std::cerr << loadGiver.report();
- CPPUNIT_ASSERT(loadGiver._bucketNotFoundOps < 300);
- CPPUNIT_ASSERT(loadGiver._unexpectedErrors == 0);
}
-// This test is not a stress test, but adding stress to its name makes it not
-// run during regular make test runs
-void
-StorageServerTest::testPortOverlap_Stress()
-{
- for (uint32_t i=0; i<3; ++i) {
- std::cerr << "Run " << i << "\n";
- tearDown();
- setUp();
- const char* config = "stor-communicationmanager";
- std::string type = "none";
- if (i == 0) {
- distConfig->getConfig(config).set("mbusport", "12301");
- storConfig->getConfig(config).set("mbusport", "12311");
- type = "mbusport";
- } else if (i == 1) {
- distConfig->getConfig(config).set("rpcport", "12302");
- storConfig->getConfig(config).set("rpcport", "12312");
- type = "rpcport";
- } else if (i == 2) {
- distConfig->getConfig("stor-status").set("httpport", "12303");
- storConfig->getConfig("stor-status").set("httpport", "12313");
- type = "httpport";
- }
- LOG(info, "TEST: (0) STARTING PORT TEST: %s", type.c_str());
- slobrokMirror->init(5000);
-
- std::unique_ptr<Distributor> distServerOld(new Distributor(*distConfig));
- std::unique_ptr<Storage> storServerOld(new Storage(*storConfig));
-
- LOG(info, "TEST: (1) WAITING FOR STABLE STORAGE SERVER");
- storServerOld->waitUntilInitialized(30);
- LOG(info, "TEST: (2) STORAGE SERVER STABLE");
- lib::ClusterState state("version:1 distributor:1 storage:1");
- std::vector<std::string> addresses;
- addresses.push_back("storage/cluster.storage/storage/0");
- addresses.push_back("storage/cluster.storage/distributor/0");
- LOG(info, "TEST: (3) SETTING SYSTEM STATES");
- setSystemState(*slobrokMirror, state, addresses);
- LOG(info, "TEST: (4) WAITING FOR STABLE DISTRIBUTOR SERVER");
- distServerOld->waitUntilInitialized(30);
-
- {
- LOG(info, "TEST: (5) ADDING SOME LOAD TO CHECK PORTS");
- SimpleLoadGiver loadGiver(*distConfig, *docMan);
- loadGiver.start(*threadPool);
- loadGiver.waitUntilDecentLoad();
- }
-
- LOG(info, "TEST: (6) CREATING NEW SET OF SERVERS");
- try{
- Distributor distServer(*distConfig);
- CPPUNIT_FAIL("Distributor server failed to fail on busy " + type);
- } catch (vespalib::Exception& e) {
- std::string msg = e.getMessage();
- std::string::size_type pos = msg.rfind(':');
- if (pos != std::string::npos) msg = msg.substr(pos + 2);
- if (msg == "Failed to listen to RPC port 12302." ||
- msg == "Failed to start network." ||
- msg == "Failed to start status HTTP server using port 12303.")
- {
- } else {
- CPPUNIT_FAIL("Unexpected exception: " + msg);
- }
- }
- try{
- Storage storServer(*storConfig);
- CPPUNIT_FAIL("Storage server failed to fail on busy " + type);
- } catch (vespalib::Exception& e) {
- std::string msg = e.getMessage();
- std::string::size_type pos = msg.rfind(':');
- if (pos != std::string::npos) msg = msg.substr(pos + 2);
- if (msg == "Failed to listen to RPC port 12312." ||
- msg == "Failed to start network." ||
- msg == "Failed to start status HTTP server using port 12313.")
- {
- } else {
- CPPUNIT_FAIL("Unexpected exception: " + msg);
- }
- }
- }
}
-
-void
-StorageServerTest::testStatusPages()
-{
- Storage storServer(*storConfig);
- // Bucket manager doesn't set up metrics before after talking to
- // persistence layer
- storServer.getNode().waitUntilInitialized();
- {
- // Get HTML status pages
- framework::HttpUrlPath path("?interval=-2&format=html");
- std::ostringstream ost;
- try{
- const framework::StatusReporter* reporter(
- storServer.getStatusReporter("statusmetricsconsumer"));
- CPPUNIT_ASSERT(reporter != 0);
- reporter->reportStatus(ost, path);
- } catch (std::exception& e) {
- CPPUNIT_FAIL("Failed to get status metric page: "
- + std::string(e.what()) + "\nGot so far: " + ost.str());
- }
- std::string output = ost.str();
- CPPUNIT_ASSERT_MSG(output,
- output.find("Exception") == std::string::npos);
- CPPUNIT_ASSERT_MSG(output, output.find("Error") == std::string::npos);
- }
-}
-
-} // storage
diff --git a/storageserver/src/tests/testhelper.cpp b/storageserver/src/tests/testhelper.cpp
index 5e6a71b078f..39935770897 100644
--- a/storageserver/src/tests/testhelper.cpp
+++ b/storageserver/src/tests/testhelper.cpp
@@ -138,14 +138,4 @@ void addFileConfig(vdstestlib::DirConfig& dc,
in.close();
}
-TestName::TestName(const std::string& n)
- : name(n)
-{
- LOG(debug, "Starting test %s", name.c_str());
-}
-
-TestName::~TestName() {
- LOG(debug, "Done with test %s", name.c_str());
-}
-
} // storage
diff --git a/storageserver/src/tests/testhelper.h b/storageserver/src/tests/testhelper.h
index 9db4a93041a..7dbaaecdbf4 100644
--- a/storageserver/src/tests/testhelper.h
+++ b/storageserver/src/tests/testhelper.h
@@ -1,35 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/vdstestlib/cppunit/dirconfig.h>
-#include <vespa/vdstestlib/cppunit/macros.h>
-
#include <fstream>
-#include <vespa/messagebus/testlib/slobrok.h>
#include <sstream>
-
-#define ASSERT_REPLY_COUNT(count, dummylink) \
- { \
- std::ostringstream msgost; \
- if ((dummylink).getNumReplies() != count) { \
- for (uint32_t ijx=0; ijx<(dummylink).getNumReplies(); ++ijx) { \
- msgost << (dummylink).getReply(ijx)->toString(true) << "\n"; \
- } \
- } \
- CPPUNIT_ASSERT_EQUAL_MSG(msgost.str(), size_t(count), \
- (dummylink).getNumReplies()); \
- }
-#define ASSERT_COMMAND_COUNT(count, dummylink) \
- { \
- std::ostringstream msgost; \
- if ((dummylink).getNumCommands() != count) { \
- for (uint32_t ijx=0; ijx<(dummylink).getNumCommands(); ++ijx) { \
- msgost << (dummylink).getCommand(ijx)->toString(true) << "\n"; \
- } \
- } \
- CPPUNIT_ASSERT_EQUAL_MSG(msgost.str(), size_t(count), \
- (dummylink).getNumCommands()); \
- }
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/vdstestlib/cppunit/dirconfig.h>
namespace storage {
@@ -45,13 +20,5 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode);
void addSlobrokConfig(vdstestlib::DirConfig& dc,
const mbus::Slobrok& slobrok);
-// Class used to print start and end of test. Enable debug when you want to see
-// which test creates what output or where we get stuck
-struct TestName {
- std::string name;
- TestName(const std::string& n);
- ~TestName();
-};
-
} // storage
diff --git a/storageserver/src/tests/testrunner.cpp b/storageserver/src/tests/testrunner.cpp
deleted file mode 100644
index 3ca7a1b93a2..00000000000
--- a/storageserver/src/tests/testrunner.cpp
+++ /dev/null
@@ -1,12 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vdstestlib/cppunit/cppunittestrunner.h>
-#include <vespa/log/log.h>
-LOG_SETUP("storagecppunittests");
-
-int
-main(int argc, const char *argv[])
-{
- vdstestlib::CppUnitTestRunner testRunner;
- return testRunner.run(argc, argv);
-}