aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-02-01 14:23:40 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-02-01 14:23:40 +0000
commit1ee55333e81ce57989f010c1f1e42e8afa8709d9 (patch)
tree1d492152de0a378eeca05dabe5bd0e32c9dee2d5 /storage
parent06cb101c01b422011d8875cd174490a2cfb35cea (diff)
Implement BucketExecutor::sync.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp33
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketcopy.h44
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h2
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h12
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp57
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h7
-rw-r--r--storage/src/vespa/storage/persistence/messages.cpp20
-rw-r--r--storage/src/vespa/storage/persistence/messages.h14
10 files changed, 145 insertions, 54 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index acccbb8b9b9..beecaddf5e4 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -456,6 +456,39 @@ TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) {
EXPECT_EQ(1, numInvocations);
}
+TEST_F(FileStorManagerTest, sync_waits_for_already_started_tasks) {
+ TestFileStorComponents c(*this);
+
+ setClusterState("storage:3 distributor:3");
+ EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp());
+
+ auto executor = getDummyPersistence().get_bucket_executor();
+ ASSERT_TRUE(executor);
+
+ spi::Bucket b1 = makeSpiBucket(document::BucketId(1));
+
+ createBucket(b1.getBucketId());
+
+ std::atomic<size_t> numInvocations(0);
+ vespalib::Gate gate;
+ auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
+ gate.await();
+ numInvocations++;
+ }));
+ EXPECT_FALSE(response);
+ EXPECT_EQ(0, numInvocations);
+ std::atomic<bool> syncComplete(false);
+ std::thread thread([&syncComplete, &executor]() {
+ executor->sync();
+ syncComplete = true;
+ });
+ EXPECT_FALSE(syncComplete);
+ gate.countDown();
+ thread.join();
+ EXPECT_TRUE(syncComplete);
+ EXPECT_EQ(1, numInvocations);
+}
+
TEST_F(FileStorManagerTest, state_change) {
TestFileStorComponents c(*this);
diff --git a/storage/src/vespa/storage/bucketdb/bucketcopy.h b/storage/src/vespa/storage/bucketdb/bucketcopy.h
index 94a1e63e53e..3e93e4594a6 100644
--- a/storage/src/vespa/storage/bucketdb/bucketcopy.h
+++ b/storage/src/vespa/storage/bucketdb/bucketcopy.h
@@ -28,9 +28,9 @@ public:
{
}
- bool trusted() const { return _flags & TRUSTED; }
+ bool trusted() const noexcept { return _flags & TRUSTED; }
- BucketCopy& setTrusted(bool val = true) {
+ BucketCopy& setTrusted(bool val = true) noexcept {
if (!val) {
clearTrusted();
} else {
@@ -40,46 +40,44 @@ public:
return *this;
}
- void clearTrusted() { _flags &= ~TRUSTED; }
+ void clearTrusted() noexcept { _flags &= ~TRUSTED; }
- bool valid() const { return getBucketInfo().valid(); }
- bool empty() const { return getBucketInfo().empty(); }
- bool wasRecentlyCreated() const {
+ bool valid() const noexcept { return getBucketInfo().valid(); }
+ bool empty() const noexcept { return getBucketInfo().empty(); }
+ bool wasRecentlyCreated() const noexcept {
return (getChecksum() == 1
&& getDocumentCount() == 0
&& getTotalDocumentSize() == 0);
}
- static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx)
- {
+ static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx) noexcept {
return BucketCopy(timestamp, nodeIdx, api::BucketInfo(1, 0, 0, 0, 0));
}
- uint16_t getNode() const { return _node; }
- uint64_t getTimestamp() const { return _timestamp; }
+ uint16_t getNode() const noexcept { return _node; }
+ uint64_t getTimestamp() const noexcept { return _timestamp; }
- uint32_t getChecksum() const { return _info.getChecksum(); }
- uint32_t getDocumentCount() const { return _info.getDocumentCount(); }
- uint32_t getTotalDocumentSize() const
- { return _info.getTotalDocumentSize(); }
- uint32_t getMetaCount() const { return _info.getMetaCount(); }
- uint32_t getUsedFileSize() const { return _info.getUsedFileSize(); }
- bool active() const { return _info.isActive(); }
- bool ready() const { return _info.isReady(); }
+ uint32_t getChecksum() const noexcept { return _info.getChecksum(); }
+ uint32_t getDocumentCount() const noexcept { return _info.getDocumentCount(); }
+ uint32_t getTotalDocumentSize() const noexcept { return _info.getTotalDocumentSize(); }
+ uint32_t getMetaCount() const noexcept { return _info.getMetaCount(); }
+ uint32_t getUsedFileSize() const noexcept { return _info.getUsedFileSize(); }
+ bool active() const noexcept { return _info.isActive(); }
+ bool ready() const noexcept { return _info.isReady(); }
- const api::BucketInfo& getBucketInfo() const { return _info; }
+ const api::BucketInfo& getBucketInfo() const noexcept { return _info; }
- void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) {
+ void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) noexcept {
_info = bInfo;
_timestamp = timestamp;
}
- void setActive(bool setactive) {
+ void setActive(bool setactive) noexcept {
_info.setActive(setactive);
}
bool consistentWith(const BucketCopy& other,
- bool countInvalidAsConsistent = false) const
+ bool countInvalidAsConsistent = false) const noexcept
{
// If both are valid, check checksum and doc count.
if (valid() && other.valid()) {
@@ -94,7 +92,7 @@ public:
std::string toString() const;
- bool operator==(const BucketCopy& other) const {
+ bool operator==(const BucketCopy& other) const noexcept {
return
getBucketInfo() == other.getBucketInfo() &&
_flags == other._flags;
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 5be6f310c71..2d70ee8d3ba 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -166,17 +166,17 @@ namespace {
uint64_t active;
uint64_t ready;
- Count() : docs(0), bytes(0), buckets(0), active(0), ready(0) {}
+ Count() noexcept : docs(0), bytes(0), buckets(0), active(0), ready(0) {}
};
Count count;
uint32_t lowestUsedBit;
- MetricsUpdater()
+ MetricsUpdater() noexcept
: count(), lowestUsedBit(58) {}
void operator()(document::BucketId::Type bucketId,
- const StorBucketDatabase::Entry& data)
+ const StorBucketDatabase::Entry& data) noexcept
{
document::BucketId bucket(
document::BucketId::keyToBucketId(bucketId));
@@ -198,7 +198,7 @@ namespace {
}
};
- void add(const MetricsUpdater& rhs) {
+ void add(const MetricsUpdater& rhs) noexcept {
auto& d = count;
auto& s = rhs.count;
d.buckets += s.buckets;
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
index 124ee1bdf45..785419e78cf 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
@@ -8,7 +8,7 @@ namespace storage::distributor::dbtransition {
struct Entry {
Entry(const document::BucketId& bid,
- const BucketCopy& copy_)
+ const BucketCopy& copy_) noexcept
: bucket_key(bid.toKey()),
copy(copy_)
{}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 51971a276b4..13d83157150 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -196,18 +196,6 @@ private:
// to be present for that exact purpose.
mutable std::mutex _lock;
- /**
- * Increment latency and operation count stats for the node the message
- * was sent towards based on the registered send time and the current time.
- *
- * In the event that system time has moved backwards across sending a
- * command and reciving its reply, the latency will not be recorded but
- * the total number of messages will increase.
- *
- * _lock MUST be held upon invocation.
- */
- void updateNodeStatsOnReply(const MessageEntry& entry);
-
void getStatusStartPage(std::ostream& out) const;
void getStatusPerNode(std::ostream& out) const;
void getStatusPerBucket(std::ostream& out) const;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index b8ed6b8ec91..7f8da8e76e7 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -104,7 +104,7 @@ AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) con
});
spi::Bucket bucket(cmd.getBucket());
auto onDone = std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task));
- cmd.task().run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone)));
+ cmd.run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone)));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index a46b4205570..38def4f775e 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "filestorhandlerimpl.h"
#include "filestormanager.h"
+#include "filestorhandlerimpl.h"
#include <vespa/storage/bucketdb/minimumusedbitstracker.h>
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storage/common/content_bucket_space_repo.h>
@@ -14,6 +14,7 @@
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/persistencehandler.h>
#include <vespa/storage/persistence/provider_error_wrapper.h>
+#include <vespa/persistence/spi/bucket_tasks.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageapi/message/persistence.h>
@@ -23,6 +24,8 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/count_down_latch.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <thread>
#include <vespa/log/bufferedlogger.h>
@@ -76,6 +79,11 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p
_configFetcher(configUri.getContext()),
_use_async_message_handling_on_schedule(false),
_metrics(std::make_unique<FileStorMetrics>()),
+ _filestorHandler(),
+ _sequencedExecutor(),
+ _executeLock(),
+ _executeCount(0),
+ _tasksInExecute(),
_closed(false),
_lock(),
_host_info_reporter(_component.getStateUpdater()),
@@ -973,18 +981,63 @@ void FileStorManager::initialize_bucket_databases_from_provider() {
_init_handler.notifyDoneInitializing();
}
+class FileStorManager::TrackExecutedTasks : public vespalib::Executor::Task {
+public:
+ TrackExecutedTasks(FileStorManager & manager);
+ void run() override;
+private:
+ FileStorManager & _manager;
+ size_t _serialNum;
+};
+
+FileStorManager::TrackExecutedTasks::TrackExecutedTasks(FileStorManager & manager)
+ : _manager(manager),
+ _serialNum(0)
+{
+ std::lock_guard guard(_manager._executeLock);
+ _serialNum = _manager._executeCount++;
+ _manager._tasksInExecute.insert(_serialNum);
+}
+
+void
+FileStorManager::TrackExecutedTasks::run() {
+ std::lock_guard guard(_manager._executeLock);
+ _manager._tasksInExecute.erase(_serialNum);
+}
+
std::unique_ptr<spi::BucketTask>
FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) {
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::execute"));
if (entry.exist()) {
- _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(task)));
+ auto trackBuckets = std::make_unique<TrackExecutedTasks>(*this);
+ _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackBuckets), std::move(task)));
}
return task;
}
+bool
+FileStorManager::areTaskCompleteUntil(size_t limit) const
+{
+ std::lock_guard guard(_executeLock);
+ for (size_t serial : _tasksInExecute) {
+ if (serial < limit) {
+ return false;
+ }
+ }
+ return true;
+}
+
void
FileStorManager::sync() {
+ size_t serialNumLimit;
+ {
+ std::lock_guard guard(_executeLock);
+ serialNumLimit = _executeCount;
+ }
+ while ( ! areTaskCompleteUntil(serialNumLimit)) {
+ std::this_thread::sleep_for(100us);
+ }
}
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 6eaef45e9bd..da47ed64f4a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -72,13 +72,18 @@ class FileStorManager : public StorageLinkQueued,
std::shared_ptr<FileStorMetrics> _metrics;
std::unique_ptr<FileStorHandler> _filestorHandler;
std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor;
+ mutable std::mutex _executeLock;
+ size_t _executeCount;
+ vespalib::hash_set<size_t> _tasksInExecute;
+
bool _closed;
std::mutex _lock;
std::unique_ptr<vespalib::IDestructorCallback> _bucketExecutorRegistration;
ServiceLayerHostInfoReporter _host_info_reporter;
std::unique_ptr<vespalib::IDestructorCallback> _resource_usage_listener_registration;
-
+ bool areTaskCompleteUntil(size_t serialNum) const;
+ class TrackExecutedTasks;
public:
FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&,
ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&);
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index 7ccb3ee895d..14f41850a4f 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -183,17 +183,29 @@ RunTaskCommand::makeReply() {
return std::make_unique<RunTaskReply>(*this);
}
-RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task)
+RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket,
+ std::unique_ptr<vespalib::Executor::Task> afterRun,
+ std::unique_ptr<spi::BucketTask> task)
: api::InternalCommand(ID),
_task(std::move(task)),
+ _afterRun(std::move(afterRun)),
_bucket(bucket)
-{
- assert(_task);
-}
+{ }
RunTaskCommand::~RunTaskCommand() = default;
void
+RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete)
+{
+ if (_task) {
+ _task->run(bucket, std::move(onComplete));
+ }
+ if (_afterRun) {
+ _afterRun->run();
+ }
+}
+
+void
RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const {
out << "RunTaskCommand(" << _bucket <<")";
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index 043747d10d2..74fee6a2b4f 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -7,6 +7,7 @@
#include <vespa/persistence/spi/selection.h>
#include <vespa/persistence/spi/read_consistency.h>
#include <vespa/persistence/spi/bucketexecutor.h>
+#include <vespa/vespalib/util/executor.h>
namespace storage {
@@ -244,19 +245,20 @@ public:
class RunTaskCommand : public api::InternalCommand {
public:
static constexpr uint32_t ID = 1011;
- RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task);
+ RunTaskCommand(const spi::Bucket &bucket,
+ std::unique_ptr<vespalib::Executor::Task> afterRun,
+ std::unique_ptr<spi::BucketTask> task);
~RunTaskCommand();
document::Bucket getBucket() const override { return _bucket.getBucket(); }
std::unique_ptr<api::StorageReply> makeReply() override;
- spi::BucketTask & task() & {
- return *_task;
- }
+ void run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete);
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
private:
- std::unique_ptr<spi::BucketTask> _task;
- spi::Bucket _bucket;
+ std::unique_ptr<spi::BucketTask> _task;
+ std::unique_ptr<vespalib::Executor::Task> _afterRun;
+ spi::Bucket _bucket;
};
// Simple reply for matching the RunTaskCommand