summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-02-02 16:19:03 +0100
committerGitHub <noreply@github.com>2021-02-02 16:19:03 +0100
commit4c87d180e880012b9ef932c8490a773509cfedd0 (patch)
treeff993a6ea2bf254e06d6875511aeae533c9b1d8b /storage
parentcc09087f82931a6b00498128f88c30e8d024bd34 (diff)
Revert "Properly track execution of BucketTasks and provide sync() and order… "
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp34
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketcopy.h44
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h2
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h12
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp73
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h8
-rw-r--r--storage/src/vespa/storage/persistence/messages.cpp19
-rw-r--r--storage/src/vespa/storage/persistence/messages.h13
10 files changed, 54 insertions, 161 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 5a7a4394da4..acccbb8b9b9 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -456,40 +456,6 @@ TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) {
EXPECT_EQ(1, numInvocations);
}
-TEST_F(FileStorManagerTest, sync_waits_for_already_started_tasks) {
- TestFileStorComponents c(*this);
-
- setClusterState("storage:3 distributor:3");
- EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp());
-
- auto executor = getDummyPersistence().get_bucket_executor();
- ASSERT_TRUE(executor);
-
- spi::Bucket b1 = makeSpiBucket(document::BucketId(1));
-
- createBucket(b1.getBucketId());
-
- std::atomic<size_t> numInvocations(0);
- vespalib::Gate gate;
- auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
- gate.await();
- numInvocations++;
- }));
- EXPECT_FALSE(response);
- EXPECT_EQ(0, numInvocations);
- std::atomic<bool> syncComplete(false);
- std::thread thread([&syncComplete, &executor]() {
- executor->sync();
- syncComplete = true;
- });
- std::this_thread::sleep_for(100us);
- EXPECT_FALSE(syncComplete);
- gate.countDown();
- thread.join();
- EXPECT_TRUE(syncComplete);
- EXPECT_EQ(1, numInvocations);
-}
-
TEST_F(FileStorManagerTest, state_change) {
TestFileStorComponents c(*this);
diff --git a/storage/src/vespa/storage/bucketdb/bucketcopy.h b/storage/src/vespa/storage/bucketdb/bucketcopy.h
index 3e93e4594a6..94a1e63e53e 100644
--- a/storage/src/vespa/storage/bucketdb/bucketcopy.h
+++ b/storage/src/vespa/storage/bucketdb/bucketcopy.h
@@ -28,9 +28,9 @@ public:
{
}
- bool trusted() const noexcept { return _flags & TRUSTED; }
+ bool trusted() const { return _flags & TRUSTED; }
- BucketCopy& setTrusted(bool val = true) noexcept {
+ BucketCopy& setTrusted(bool val = true) {
if (!val) {
clearTrusted();
} else {
@@ -40,44 +40,46 @@ public:
return *this;
}
- void clearTrusted() noexcept { _flags &= ~TRUSTED; }
+ void clearTrusted() { _flags &= ~TRUSTED; }
- bool valid() const noexcept { return getBucketInfo().valid(); }
- bool empty() const noexcept { return getBucketInfo().empty(); }
- bool wasRecentlyCreated() const noexcept {
+ bool valid() const { return getBucketInfo().valid(); }
+ bool empty() const { return getBucketInfo().empty(); }
+ bool wasRecentlyCreated() const {
return (getChecksum() == 1
&& getDocumentCount() == 0
&& getTotalDocumentSize() == 0);
}
- static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx) noexcept {
+ static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx)
+ {
return BucketCopy(timestamp, nodeIdx, api::BucketInfo(1, 0, 0, 0, 0));
}
- uint16_t getNode() const noexcept { return _node; }
- uint64_t getTimestamp() const noexcept { return _timestamp; }
+ uint16_t getNode() const { return _node; }
+ uint64_t getTimestamp() const { return _timestamp; }
- uint32_t getChecksum() const noexcept { return _info.getChecksum(); }
- uint32_t getDocumentCount() const noexcept { return _info.getDocumentCount(); }
- uint32_t getTotalDocumentSize() const noexcept { return _info.getTotalDocumentSize(); }
- uint32_t getMetaCount() const noexcept { return _info.getMetaCount(); }
- uint32_t getUsedFileSize() const noexcept { return _info.getUsedFileSize(); }
- bool active() const noexcept { return _info.isActive(); }
- bool ready() const noexcept { return _info.isReady(); }
+ uint32_t getChecksum() const { return _info.getChecksum(); }
+ uint32_t getDocumentCount() const { return _info.getDocumentCount(); }
+ uint32_t getTotalDocumentSize() const
+ { return _info.getTotalDocumentSize(); }
+ uint32_t getMetaCount() const { return _info.getMetaCount(); }
+ uint32_t getUsedFileSize() const { return _info.getUsedFileSize(); }
+ bool active() const { return _info.isActive(); }
+ bool ready() const { return _info.isReady(); }
- const api::BucketInfo& getBucketInfo() const noexcept { return _info; }
+ const api::BucketInfo& getBucketInfo() const { return _info; }
- void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) noexcept {
+ void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) {
_info = bInfo;
_timestamp = timestamp;
}
- void setActive(bool setactive) noexcept {
+ void setActive(bool setactive) {
_info.setActive(setactive);
}
bool consistentWith(const BucketCopy& other,
- bool countInvalidAsConsistent = false) const noexcept
+ bool countInvalidAsConsistent = false) const
{
// If both are valid, check checksum and doc count.
if (valid() && other.valid()) {
@@ -92,7 +94,7 @@ public:
std::string toString() const;
- bool operator==(const BucketCopy& other) const noexcept {
+ bool operator==(const BucketCopy& other) const {
return
getBucketInfo() == other.getBucketInfo() &&
_flags == other._flags;
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 2d70ee8d3ba..5be6f310c71 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -166,17 +166,17 @@ namespace {
uint64_t active;
uint64_t ready;
- Count() noexcept : docs(0), bytes(0), buckets(0), active(0), ready(0) {}
+ Count() : docs(0), bytes(0), buckets(0), active(0), ready(0) {}
};
Count count;
uint32_t lowestUsedBit;
- MetricsUpdater() noexcept
+ MetricsUpdater()
: count(), lowestUsedBit(58) {}
void operator()(document::BucketId::Type bucketId,
- const StorBucketDatabase::Entry& data) noexcept
+ const StorBucketDatabase::Entry& data)
{
document::BucketId bucket(
document::BucketId::keyToBucketId(bucketId));
@@ -198,7 +198,7 @@ namespace {
}
};
- void add(const MetricsUpdater& rhs) noexcept {
+ void add(const MetricsUpdater& rhs) {
auto& d = count;
auto& s = rhs.count;
d.buckets += s.buckets;
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
index 785419e78cf..124ee1bdf45 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
@@ -8,7 +8,7 @@ namespace storage::distributor::dbtransition {
struct Entry {
Entry(const document::BucketId& bid,
- const BucketCopy& copy_) noexcept
+ const BucketCopy& copy_)
: bucket_key(bid.toKey()),
copy(copy_)
{}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 13d83157150..51971a276b4 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -196,6 +196,18 @@ private:
// to be present for that exact purpose.
mutable std::mutex _lock;
+ /**
+ * Increment latency and operation count stats for the node the message
+ * was sent towards based on the registered send time and the current time.
+ *
+ * In the event that system time has moved backwards across sending a
+ * command and reciving its reply, the latency will not be recorded but
+ * the total number of messages will increase.
+ *
+ * _lock MUST be held upon invocation.
+ */
+ void updateNodeStatsOnReply(const MessageEntry& entry);
+
void getStatusStartPage(std::ostream& out) const;
void getStatusPerNode(std::ostream& out) const;
void getStatusPerBucket(std::ostream& out) const;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 7f8da8e76e7..b8ed6b8ec91 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -104,7 +104,7 @@ AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) con
});
spi::Bucket bucket(cmd.getBucket());
auto onDone = std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task));
- cmd.run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone)));
+ cmd.task().run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone)));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index babee2c6351..a46b4205570 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "filestormanager.h"
#include "filestorhandlerimpl.h"
+#include "filestormanager.h"
#include <vespa/storage/bucketdb/minimumusedbitstracker.h>
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storage/common/content_bucket_space_repo.h>
@@ -14,7 +14,6 @@
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/persistencehandler.h>
#include <vespa/storage/persistence/provider_error_wrapper.h>
-#include <vespa/persistence/spi/bucket_tasks.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageapi/message/persistence.h>
@@ -24,8 +23,6 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
-#include <vespa/vespalib/util/count_down_latch.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <thread>
#include <vespa/log/bufferedlogger.h>
@@ -79,13 +76,6 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p
_configFetcher(configUri.getContext()),
_use_async_message_handling_on_schedule(false),
_metrics(std::make_unique<FileStorMetrics>()),
- _filestorHandler(),
- _sequencedExecutor(),
- _executeLock(),
- _syncCond(),
- _notifyAfterExecute(false),
- _executeCount(0),
- _tasksInExecute(),
_closed(false),
_lock(),
_host_info_reporter(_component.getStateUpdater()),
@@ -820,13 +810,6 @@ FileStorManager::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
void FileStorManager::onClose()
{
LOG(debug, "Start closing");
- std::unique_ptr<vespalib::IDestructorCallback> toDestruct;
- {
- std::lock_guard guard(_executeLock);
- toDestruct = std::move(_bucketExecutorRegistration);
- }
- toDestruct.reset();
- _resource_usage_listener_registration.reset();
// Avoid getting config during shutdown
_configFetcher.close();
LOG(debug, "Closed _configFetcher.");
@@ -990,70 +973,18 @@ void FileStorManager::initialize_bucket_databases_from_provider() {
_init_handler.notifyDoneInitializing();
}
-class FileStorManager::TrackExecutedTasks : public vespalib::IDestructorCallback {
-public:
- TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager);
- ~TrackExecutedTasks() override;
-private:
- FileStorManager & _manager;
- size_t _serialNum;
-};
-
-FileStorManager::TrackExecutedTasks::TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager)
- : _manager(manager),
- _serialNum(_manager._executeCount++)
-{
- (void) guard;
- _manager._tasksInExecute.insert(_serialNum);
-}
-
-FileStorManager::TrackExecutedTasks::~TrackExecutedTasks() {
- std::lock_guard guard(_manager._executeLock);
- _manager._tasksInExecute.erase(_serialNum);
- if (_manager._notifyAfterExecute) {
- _manager._syncCond.notify_all();
- }
-}
-
std::unique_ptr<spi::BucketTask>
FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) {
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::execute"));
if (entry.exist()) {
- std::unique_ptr<TrackExecutedTasks> trackTasks;
- {
- std::lock_guard guard(_executeLock);
- if (_bucketExecutorRegistration) {
- trackTasks = std::make_unique<TrackExecutedTasks>(guard, *this);
- }
- }
- if (trackTasks) {
- _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackTasks), std::move(task)));
- }
+ _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(task)));
}
return task;
}
-namespace {
-bool
-areTasksCompleteUntil(const vespalib::hash_set<size_t> &inFlight, size_t limit) {
- for (size_t serial : inFlight) {
- if (serial < limit) {
- return false;
- }
- }
- return true;
-}
-}
-
void
FileStorManager::sync() {
- std::unique_lock guard(_executeLock);
- _notifyAfterExecute = true;
- _syncCond.wait(guard, [this, limit=_executeCount]() {
- return areTasksCompleteUntil(_tasksInExecute, limit);
- });
- _notifyAfterExecute = false;
}
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index fea1b9c7ea6..6eaef45e9bd 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -72,19 +72,13 @@ class FileStorManager : public StorageLinkQueued,
std::shared_ptr<FileStorMetrics> _metrics;
std::unique_ptr<FileStorHandler> _filestorHandler;
std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor;
- std::mutex _executeLock;
- std::condition_variable _syncCond;
- bool _notifyAfterExecute;
- size_t _executeCount;
- vespalib::hash_set<size_t> _tasksInExecute;
-
bool _closed;
std::mutex _lock;
std::unique_ptr<vespalib::IDestructorCallback> _bucketExecutorRegistration;
ServiceLayerHostInfoReporter _host_info_reporter;
std::unique_ptr<vespalib::IDestructorCallback> _resource_usage_listener_registration;
- class TrackExecutedTasks;
+
public:
FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&,
ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&);
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index e7d8158fb13..7ccb3ee895d 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "messages.h"
-#include <vespa/vespalib/util/idestructorcallback.h>
#include <ostream>
#include <cassert>
@@ -184,26 +183,16 @@ RunTaskCommand::makeReply() {
return std::make_unique<RunTaskReply>(*this);
}
-RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket,
- std::unique_ptr<vespalib::IDestructorCallback> afterRun,
- std::unique_ptr<spi::BucketTask> task)
+RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task)
: api::InternalCommand(ID),
_task(std::move(task)),
- _afterRun(std::move(afterRun)),
_bucket(bucket)
-{ }
-
-RunTaskCommand::~RunTaskCommand() = default;
-
-void
-RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete)
{
- if (_task) {
- _task->run(bucket, std::move(onComplete));
- }
- _afterRun.reset();
+ assert(_task);
}
+RunTaskCommand::~RunTaskCommand() = default;
+
void
RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const {
out << "RunTaskCommand(" << _bucket <<")";
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index d26543c7797..043747d10d2 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -244,20 +244,19 @@ public:
class RunTaskCommand : public api::InternalCommand {
public:
static constexpr uint32_t ID = 1011;
- RunTaskCommand(const spi::Bucket &bucket,
- std::unique_ptr<vespalib::IDestructorCallback> afterRun,
- std::unique_ptr<spi::BucketTask> task);
+ RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task);
~RunTaskCommand();
document::Bucket getBucket() const override { return _bucket.getBucket(); }
std::unique_ptr<api::StorageReply> makeReply() override;
- void run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete);
+ spi::BucketTask & task() & {
+ return *_task;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
private:
- std::unique_ptr<spi::BucketTask> _task;
- std::unique_ptr<vespalib::IDestructorCallback> _afterRun;
- spi::Bucket _bucket;
+ std::unique_ptr<spi::BucketTask> _task;
+ spi::Bucket _bucket;
};
// Simple reply for matching the RunTaskCommand