summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-02-02 05:28:23 +0100
committerGitHub <noreply@github.com>2021-02-02 05:28:23 +0100
commit82decc80ef372dfdcde493c5d84b4a11ff0655bf (patch)
tree75b233d56e507ea7b768d1ce45529d374946601c
parent6ad76b1e5f1239cb2ecc41cd69eff2888916941e (diff)
parent0a2af44e48c9f2d5cf7df1877d71591bead4951f (diff)
Merge pull request #16322 from vespa-engine/revert-16308-balder/implement-sync
Revert "Implement BucketExecutor::sync."
-rw-r--r--document/src/vespa/document/bucket/bucket.h2
-rw-r--r--document/src/vespa/document/bucket/bucketid.cpp8
-rw-r--r--document/src/vespa/document/bucket/bucketid.h42
-rw-r--r--fastos/src/vespa/fastos/ringbuffer.h17
-rw-r--r--persistence/src/vespa/persistence/spi/bucket.h8
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp34
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketcopy.h44
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h2
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h12
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp60
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h8
-rw-r--r--storage/src/vespa/storage/persistence/messages.cpp20
-rw-r--r--storage/src/vespa/storage/persistence/messages.h14
-rw-r--r--storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp5
-rw-r--r--storageapi/src/vespa/storageapi/buckets/bucketinfo.h40
-rw-r--r--vespalib/src/vespa/vespalib/util/executor.h4
18 files changed, 126 insertions, 204 deletions
diff --git a/document/src/vespa/document/bucket/bucket.h b/document/src/vespa/document/bucket/bucket.h
index f189c2951c9..44068e1c443 100644
--- a/document/src/vespa/document/bucket/bucket.h
+++ b/document/src/vespa/document/bucket/bucket.h
@@ -32,7 +32,7 @@ public:
vespalib::string toString() const;
struct hash {
- size_t operator () (const Bucket& b) const noexcept {
+ size_t operator () (const Bucket& b) const {
size_t hash1 = BucketId::hash()(b.getBucketId());
size_t hash2 = BucketSpace::hash()(b.getBucketSpace());
// Formula taken from std::hash_combine proposal
diff --git a/document/src/vespa/document/bucket/bucketid.cpp b/document/src/vespa/document/bucket/bucketid.cpp
index 1b9cf1e5304..668798d6c39 100644
--- a/document/src/vespa/document/bucket/bucketid.cpp
+++ b/document/src/vespa/document/bucket/bucketid.cpp
@@ -71,7 +71,7 @@ Initialize _initializeUsedMasks;
}
-void BucketId::initialize() noexcept {
+void BucketId::initialize() {
fillUsedMasks(BucketId::_usedMasks, BucketId::maxNumBits);
fillStripMasks(BucketId::_stripMasks, BucketId::maxNumBits);
}
@@ -91,7 +91,7 @@ void BucketId::throwFailedSetUsedBits(uint32_t used, uint32_t availBits) {
}
BucketId::Type
-BucketId::reverse(Type id) noexcept
+BucketId::reverse(Type id)
{
id = ((id & 0x5555555555555555l) << 1) | ((id & 0xaaaaaaaaaaaaaaaal) >> 1);
id = ((id & 0x3333333333333333l) << 2) | ((id & 0xccccccccccccccccl) >> 2);
@@ -100,7 +100,7 @@ BucketId::reverse(Type id) noexcept
}
BucketId::Type
-BucketId::keyToBucketId(Type key) noexcept
+BucketId::keyToBucketId(Type key)
{
Type retVal = reverse(key);
@@ -113,7 +113,7 @@ BucketId::keyToBucketId(Type key) noexcept
}
bool
-BucketId::contains(const BucketId& id) const noexcept
+BucketId::contains(const BucketId& id) const
{
if (id.getUsedBits() < getUsedBits()) {
return false;
diff --git a/document/src/vespa/document/bucket/bucketid.h b/document/src/vespa/document/bucket/bucketid.h
index 675a0d23ebd..b31f9080acc 100644
--- a/document/src/vespa/document/bucket/bucketid.h
+++ b/document/src/vespa/document/bucket/bucketid.h
@@ -37,7 +37,7 @@ class BucketId
{
public:
struct hash {
- size_t operator () (const BucketId& g) const noexcept {
+ size_t operator () (const BucketId& g) const {
return g.getId();
}
};
@@ -55,23 +55,23 @@ public:
/** Create a bucket id using a set of bits from a raw unchecked value. */
BucketId(uint32_t useBits, Type id) noexcept : _id(createUsedBits(useBits, id)) { }
- bool operator<(const BucketId& id) const noexcept {
+ bool operator<(const BucketId& id) const {
return getId() < id.getId();
}
- bool operator==(const BucketId& id) const noexcept { return getId() == id.getId(); }
- bool operator!=(const BucketId& id) const noexcept { return getId() != id.getId(); }
+ bool operator==(const BucketId& id) const { return getId() == id.getId(); }
+ bool operator!=(const BucketId& id) const { return getId() != id.getId(); }
vespalib::string toString() const;
- bool valid() const noexcept {
+ bool valid() const {
return validUsedBits(getUsedBits());
}
- static bool validUsedBits(uint32_t usedBits) noexcept {
+ static bool validUsedBits(uint32_t usedBits) {
return (usedBits >= minNumBits) && (usedBits <= maxNumBits);
}
- bool isSet() const noexcept {
+ bool isSet() const {
return _id != 0u;
}
/**
@@ -79,14 +79,14 @@ public:
* verify that two different documents belong to the same bucket given some
* level of bucket splitting, use this to ignore the unused bits.
*/
- BucketId stripUnused() const noexcept { return BucketId(getUsedBits(), getId()); }
+ BucketId stripUnused() const { return BucketId(getUsedBits(), getId()); }
/**
* Checks whether the given bucket is contained within this bucket. That is,
* if it is the same bucket, or if it is a bucket using more bits, which is
* identical to this one if set to use as many bits as this one.
*/
- bool contains(const BucketId& id) const noexcept;
+ bool contains(const BucketId& id) const;
// Functions exposing internals we want to make users independent of
@@ -97,7 +97,7 @@ public:
static constexpr uint32_t maxNumBits = (8 * sizeof(Type) - CountBits);
static constexpr uint32_t minNumBits = 1u; // See comment above.
- uint32_t getUsedBits() const noexcept { return _id >> maxNumBits; }
+ uint32_t getUsedBits() const { return _id >> maxNumBits; }
void setUsedBits(uint32_t used) {
uint32_t availBits = maxNumBits;
@@ -113,22 +113,22 @@ public:
}
/** Get the bucket id value stripped of the bits that are not in use. */
- Type getId() const noexcept { return (_id & getStripMask()); }
+ Type getId() const { return (_id & getStripMask()); }
/**
* Get the bucket id value stripped of the count bits plus the bits that
* are not in use.
*/
- Type withoutCountBits() const noexcept { return (_id & getUsedMask()); }
+ Type withoutCountBits() const { return (_id & getUsedMask()); }
- Type getRawId() const noexcept { return _id; }
+ Type getRawId() const { return _id; }
/**
* Reverses the bits in the given number, except the countbits part.
* Used for sorting in the bucket database as we want related buckets
* to be sorted next to each other.
*/
- static Type bucketIdToKey(Type id) noexcept {
+ static Type bucketIdToKey(Type id) {
Type retVal = reverse(id);
Type usedCountLSB = id >> maxNumBits;
@@ -139,39 +139,39 @@ public:
return retVal;
}
- static Type keyToBucketId(Type key) noexcept ;
+ static Type keyToBucketId(Type key);
/**
* Reverses the bucket id bitwise, except the countbits part,
* and returns the value,
*/
- Type toKey() const noexcept { return bucketIdToKey(getId()); };
+ Type toKey() const { return bucketIdToKey(getId()); };
/**
* Reverses the order of the bits in the bucket id.
*/
- static Type reverse(Type id) noexcept;
+ static Type reverse(Type id);
/**
* Returns the value of the Nth bit, counted in the reverse order of the
* bucket id.
*/
- uint8_t getBit(uint32_t n) const noexcept {
+ uint8_t getBit(uint32_t n) const {
return (_id & ((Type)1 << n)) == 0 ? 0 : 1;
}
- static void initialize() noexcept;
+ static void initialize();
private:
static Type _usedMasks[maxNumBits+1];
static Type _stripMasks[maxNumBits+1];
Type _id;
- Type getUsedMask() const noexcept {
+ Type getUsedMask() const {
return _usedMasks[getUsedBits()];
}
- Type getStripMask() const noexcept {
+ Type getStripMask() const {
return _stripMasks[getUsedBits()];
}
diff --git a/fastos/src/vespa/fastos/ringbuffer.h b/fastos/src/vespa/fastos/ringbuffer.h
index 89b1bb84c9c..41c0af7385b 100644
--- a/fastos/src/vespa/fastos/ringbuffer.h
+++ b/fastos/src/vespa/fastos/ringbuffer.h
@@ -42,6 +42,18 @@ public:
_closed = false;
}
+ FastOS_RingBufferData *GetData () { return _data; }
+
+ void RepositionDataAt0 ()
+ {
+ uint8_t *src = &_data->_buffer[_dataIndex];
+ uint8_t *dst = _data->_buffer;
+
+ for(int i=0; i<_dataSize; i++)
+ *dst++ = *src++;
+ _dataIndex = 0;
+ }
+
FastOS_RingBuffer (int bufferSize)
: _closed(false),
_data(0),
@@ -81,6 +93,11 @@ public:
_dataSize += bytes;
}
+ int GetDataSize ()
+ {
+ return _dataSize;
+ }
+
int GetWriteSpace ()
{
int spaceLeft = _bufferSize - _dataSize;
diff --git a/persistence/src/vespa/persistence/spi/bucket.h b/persistence/src/vespa/persistence/spi/bucket.h
index 507cc80ad76..30393109cc9 100644
--- a/persistence/src/vespa/persistence/spi/bucket.h
+++ b/persistence/src/vespa/persistence/spi/bucket.h
@@ -21,12 +21,12 @@ public:
explicit Bucket(const document::Bucket& b) noexcept
: _bucket(b) {}
- const document::Bucket &getBucket() const noexcept { return _bucket; }
- document::BucketId getBucketId() const noexcept { return _bucket.getBucketId(); }
- document::BucketSpace getBucketSpace() const noexcept { return _bucket.getBucketSpace(); }
+ const document::Bucket &getBucket() const { return _bucket; }
+ document::BucketId getBucketId() const { return _bucket.getBucketId(); }
+ document::BucketSpace getBucketSpace() const { return _bucket.getBucketSpace(); }
/** Convert easily to a document bucket id to make class easy to use. */
- operator document::BucketId() const noexcept { return _bucket.getBucketId(); }
+ operator document::BucketId() const { return _bucket.getBucketId(); }
bool operator==(const Bucket& o) const noexcept {
return (_bucket == o._bucket);
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 5a7a4394da4..acccbb8b9b9 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -456,40 +456,6 @@ TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) {
EXPECT_EQ(1, numInvocations);
}
-TEST_F(FileStorManagerTest, sync_waits_for_already_started_tasks) {
- TestFileStorComponents c(*this);
-
- setClusterState("storage:3 distributor:3");
- EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp());
-
- auto executor = getDummyPersistence().get_bucket_executor();
- ASSERT_TRUE(executor);
-
- spi::Bucket b1 = makeSpiBucket(document::BucketId(1));
-
- createBucket(b1.getBucketId());
-
- std::atomic<size_t> numInvocations(0);
- vespalib::Gate gate;
- auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
- gate.await();
- numInvocations++;
- }));
- EXPECT_FALSE(response);
- EXPECT_EQ(0, numInvocations);
- std::atomic<bool> syncComplete(false);
- std::thread thread([&syncComplete, &executor]() {
- executor->sync();
- syncComplete = true;
- });
- std::this_thread::sleep_for(100us);
- EXPECT_FALSE(syncComplete);
- gate.countDown();
- thread.join();
- EXPECT_TRUE(syncComplete);
- EXPECT_EQ(1, numInvocations);
-}
-
TEST_F(FileStorManagerTest, state_change) {
TestFileStorComponents c(*this);
diff --git a/storage/src/vespa/storage/bucketdb/bucketcopy.h b/storage/src/vespa/storage/bucketdb/bucketcopy.h
index 3e93e4594a6..94a1e63e53e 100644
--- a/storage/src/vespa/storage/bucketdb/bucketcopy.h
+++ b/storage/src/vespa/storage/bucketdb/bucketcopy.h
@@ -28,9 +28,9 @@ public:
{
}
- bool trusted() const noexcept { return _flags & TRUSTED; }
+ bool trusted() const { return _flags & TRUSTED; }
- BucketCopy& setTrusted(bool val = true) noexcept {
+ BucketCopy& setTrusted(bool val = true) {
if (!val) {
clearTrusted();
} else {
@@ -40,44 +40,46 @@ public:
return *this;
}
- void clearTrusted() noexcept { _flags &= ~TRUSTED; }
+ void clearTrusted() { _flags &= ~TRUSTED; }
- bool valid() const noexcept { return getBucketInfo().valid(); }
- bool empty() const noexcept { return getBucketInfo().empty(); }
- bool wasRecentlyCreated() const noexcept {
+ bool valid() const { return getBucketInfo().valid(); }
+ bool empty() const { return getBucketInfo().empty(); }
+ bool wasRecentlyCreated() const {
return (getChecksum() == 1
&& getDocumentCount() == 0
&& getTotalDocumentSize() == 0);
}
- static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx) noexcept {
+ static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx)
+ {
return BucketCopy(timestamp, nodeIdx, api::BucketInfo(1, 0, 0, 0, 0));
}
- uint16_t getNode() const noexcept { return _node; }
- uint64_t getTimestamp() const noexcept { return _timestamp; }
+ uint16_t getNode() const { return _node; }
+ uint64_t getTimestamp() const { return _timestamp; }
- uint32_t getChecksum() const noexcept { return _info.getChecksum(); }
- uint32_t getDocumentCount() const noexcept { return _info.getDocumentCount(); }
- uint32_t getTotalDocumentSize() const noexcept { return _info.getTotalDocumentSize(); }
- uint32_t getMetaCount() const noexcept { return _info.getMetaCount(); }
- uint32_t getUsedFileSize() const noexcept { return _info.getUsedFileSize(); }
- bool active() const noexcept { return _info.isActive(); }
- bool ready() const noexcept { return _info.isReady(); }
+ uint32_t getChecksum() const { return _info.getChecksum(); }
+ uint32_t getDocumentCount() const { return _info.getDocumentCount(); }
+ uint32_t getTotalDocumentSize() const
+ { return _info.getTotalDocumentSize(); }
+ uint32_t getMetaCount() const { return _info.getMetaCount(); }
+ uint32_t getUsedFileSize() const { return _info.getUsedFileSize(); }
+ bool active() const { return _info.isActive(); }
+ bool ready() const { return _info.isReady(); }
- const api::BucketInfo& getBucketInfo() const noexcept { return _info; }
+ const api::BucketInfo& getBucketInfo() const { return _info; }
- void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) noexcept {
+ void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) {
_info = bInfo;
_timestamp = timestamp;
}
- void setActive(bool setactive) noexcept {
+ void setActive(bool setactive) {
_info.setActive(setactive);
}
bool consistentWith(const BucketCopy& other,
- bool countInvalidAsConsistent = false) const noexcept
+ bool countInvalidAsConsistent = false) const
{
// If both are valid, check checksum and doc count.
if (valid() && other.valid()) {
@@ -92,7 +94,7 @@ public:
std::string toString() const;
- bool operator==(const BucketCopy& other) const noexcept {
+ bool operator==(const BucketCopy& other) const {
return
getBucketInfo() == other.getBucketInfo() &&
_flags == other._flags;
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 2d70ee8d3ba..5be6f310c71 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -166,17 +166,17 @@ namespace {
uint64_t active;
uint64_t ready;
- Count() noexcept : docs(0), bytes(0), buckets(0), active(0), ready(0) {}
+ Count() : docs(0), bytes(0), buckets(0), active(0), ready(0) {}
};
Count count;
uint32_t lowestUsedBit;
- MetricsUpdater() noexcept
+ MetricsUpdater()
: count(), lowestUsedBit(58) {}
void operator()(document::BucketId::Type bucketId,
- const StorBucketDatabase::Entry& data) noexcept
+ const StorBucketDatabase::Entry& data)
{
document::BucketId bucket(
document::BucketId::keyToBucketId(bucketId));
@@ -198,7 +198,7 @@ namespace {
}
};
- void add(const MetricsUpdater& rhs) noexcept {
+ void add(const MetricsUpdater& rhs) {
auto& d = count;
auto& s = rhs.count;
d.buckets += s.buckets;
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
index 785419e78cf..124ee1bdf45 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
@@ -8,7 +8,7 @@ namespace storage::distributor::dbtransition {
struct Entry {
Entry(const document::BucketId& bid,
- const BucketCopy& copy_) noexcept
+ const BucketCopy& copy_)
: bucket_key(bid.toKey()),
copy(copy_)
{}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 13d83157150..51971a276b4 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -196,6 +196,18 @@ private:
// to be present for that exact purpose.
mutable std::mutex _lock;
+ /**
+ * Increment latency and operation count stats for the node the message
+ * was sent towards based on the registered send time and the current time.
+ *
+ * In the event that system time has moved backwards across sending a
+ * command and reciving its reply, the latency will not be recorded but
+ * the total number of messages will increase.
+ *
+ * _lock MUST be held upon invocation.
+ */
+ void updateNodeStatsOnReply(const MessageEntry& entry);
+
void getStatusStartPage(std::ostream& out) const;
void getStatusPerNode(std::ostream& out) const;
void getStatusPerBucket(std::ostream& out) const;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 7f8da8e76e7..b8ed6b8ec91 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -104,7 +104,7 @@ AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) con
});
spi::Bucket bucket(cmd.getBucket());
auto onDone = std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task));
- cmd.run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone)));
+ cmd.task().run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone)));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 42f9e876eee..a46b4205570 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "filestormanager.h"
#include "filestorhandlerimpl.h"
+#include "filestormanager.h"
#include <vespa/storage/bucketdb/minimumusedbitstracker.h>
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storage/common/content_bucket_space_repo.h>
@@ -14,7 +14,6 @@
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/persistencehandler.h>
#include <vespa/storage/persistence/provider_error_wrapper.h>
-#include <vespa/persistence/spi/bucket_tasks.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageapi/message/persistence.h>
@@ -24,8 +23,6 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
-#include <vespa/vespalib/util/count_down_latch.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <thread>
#include <vespa/log/bufferedlogger.h>
@@ -79,13 +76,6 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p
_configFetcher(configUri.getContext()),
_use_async_message_handling_on_schedule(false),
_metrics(std::make_unique<FileStorMetrics>()),
- _filestorHandler(),
- _sequencedExecutor(),
- _executeLock(),
- _syncCond(),
- _notifyAfterExecute(false),
- _executeCount(0),
- _tasksInExecute(),
_closed(false),
_lock(),
_host_info_reporter(_component.getStateUpdater()),
@@ -983,64 +973,18 @@ void FileStorManager::initialize_bucket_databases_from_provider() {
_init_handler.notifyDoneInitializing();
}
-class FileStorManager::TrackExecutedTasks : public vespalib::Executor::Task {
-public:
- TrackExecutedTasks(FileStorManager & manager);
- void run() override;
-private:
- FileStorManager & _manager;
- size_t _serialNum;
-};
-
-FileStorManager::TrackExecutedTasks::TrackExecutedTasks(FileStorManager & manager)
- : _manager(manager),
- _serialNum(0)
-{
- std::lock_guard guard(_manager._executeLock);
- _serialNum = _manager._executeCount++;
- _manager._tasksInExecute.insert(_serialNum);
-}
-
-void
-FileStorManager::TrackExecutedTasks::run() {
- std::lock_guard guard(_manager._executeLock);
- _manager._tasksInExecute.erase(_serialNum);
- if (_manager._notifyAfterExecute) {
- _manager._syncCond.notify_all();
- }
-}
-
std::unique_ptr<spi::BucketTask>
FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) {
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::execute"));
if (entry.exist()) {
- auto trackBuckets = std::make_unique<TrackExecutedTasks>(*this);
- _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackBuckets), std::move(task)));
+ _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(task)));
}
return task;
}
-namespace {
-bool
-areTasksCompleteUntil(const vespalib::hash_set<size_t> &inFlight, size_t limit) {
- for (size_t serial : inFlight) {
- if (serial < limit) {
- return false;
- }
- }
- return true;
-}
-}
-
void
FileStorManager::sync() {
- std::unique_lock guard(_executeLock);
- _notifyAfterExecute = true;
- _syncCond.wait(guard, [this, limit=_executeCount]() {
- return areTasksCompleteUntil(_tasksInExecute, limit);
- });
- _notifyAfterExecute = false;
}
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index fea1b9c7ea6..6eaef45e9bd 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -72,19 +72,13 @@ class FileStorManager : public StorageLinkQueued,
std::shared_ptr<FileStorMetrics> _metrics;
std::unique_ptr<FileStorHandler> _filestorHandler;
std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor;
- std::mutex _executeLock;
- std::condition_variable _syncCond;
- bool _notifyAfterExecute;
- size_t _executeCount;
- vespalib::hash_set<size_t> _tasksInExecute;
-
bool _closed;
std::mutex _lock;
std::unique_ptr<vespalib::IDestructorCallback> _bucketExecutorRegistration;
ServiceLayerHostInfoReporter _host_info_reporter;
std::unique_ptr<vespalib::IDestructorCallback> _resource_usage_listener_registration;
- class TrackExecutedTasks;
+
public:
FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&,
ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&);
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index 14f41850a4f..7ccb3ee895d 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -183,28 +183,16 @@ RunTaskCommand::makeReply() {
return std::make_unique<RunTaskReply>(*this);
}
-RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket,
- std::unique_ptr<vespalib::Executor::Task> afterRun,
- std::unique_ptr<spi::BucketTask> task)
+RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task)
: api::InternalCommand(ID),
_task(std::move(task)),
- _afterRun(std::move(afterRun)),
_bucket(bucket)
-{ }
-
-RunTaskCommand::~RunTaskCommand() = default;
-
-void
-RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete)
{
- if (_task) {
- _task->run(bucket, std::move(onComplete));
- }
- if (_afterRun) {
- _afterRun->run();
- }
+ assert(_task);
}
+RunTaskCommand::~RunTaskCommand() = default;
+
void
RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const {
out << "RunTaskCommand(" << _bucket <<")";
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index 74fee6a2b4f..043747d10d2 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -7,7 +7,6 @@
#include <vespa/persistence/spi/selection.h>
#include <vespa/persistence/spi/read_consistency.h>
#include <vespa/persistence/spi/bucketexecutor.h>
-#include <vespa/vespalib/util/executor.h>
namespace storage {
@@ -245,20 +244,19 @@ public:
class RunTaskCommand : public api::InternalCommand {
public:
static constexpr uint32_t ID = 1011;
- RunTaskCommand(const spi::Bucket &bucket,
- std::unique_ptr<vespalib::Executor::Task> afterRun,
- std::unique_ptr<spi::BucketTask> task);
+ RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task);
~RunTaskCommand();
document::Bucket getBucket() const override { return _bucket.getBucket(); }
std::unique_ptr<api::StorageReply> makeReply() override;
- void run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete);
+ spi::BucketTask & task() & {
+ return *_task;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
private:
- std::unique_ptr<spi::BucketTask> _task;
- std::unique_ptr<vespalib::Executor::Task> _afterRun;
- spi::Bucket _bucket;
+ std::unique_ptr<spi::BucketTask> _task;
+ spi::Bucket _bucket;
};
// Simple reply for matching the RunTaskCommand
diff --git a/storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp b/storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp
index 4c3b290d4d7..9e4e48d67f4 100644
--- a/storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp
+++ b/storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp
@@ -19,7 +19,8 @@ BucketInfo::BucketInfo() noexcept
_active(false)
{}
-BucketInfo::BucketInfo(uint32_t checksum, uint32_t docCount, uint32_t totDocSize) noexcept
+BucketInfo::BucketInfo(uint32_t checksum, uint32_t docCount,
+ uint32_t totDocSize) noexcept
: _lastModified(0),
_checksum(checksum),
_docCount(docCount),
@@ -72,7 +73,7 @@ BucketInfo::BucketInfo(uint32_t checksum, uint32_t docCount,
{}
bool
-BucketInfo::operator==(const BucketInfo& info) const noexcept
+BucketInfo::operator==(const BucketInfo& info) const
{
return (_checksum == info._checksum &&
_docCount == info._docCount &&
diff --git a/storageapi/src/vespa/storageapi/buckets/bucketinfo.h b/storageapi/src/vespa/storageapi/buckets/bucketinfo.h
index 7e6e7a2aed2..5b2de4b4d61 100644
--- a/storageapi/src/vespa/storageapi/buckets/bucketinfo.h
+++ b/storageapi/src/vespa/storageapi/buckets/bucketinfo.h
@@ -42,37 +42,37 @@ public:
uint32_t metaCount, uint32_t usedFileSize,
bool ready, bool active, Timestamp lastModified) noexcept;
- Timestamp getLastModified() const noexcept { return _lastModified; }
- uint32_t getChecksum() const noexcept { return _checksum; }
- uint32_t getDocumentCount() const noexcept { return _docCount; }
- uint32_t getTotalDocumentSize() const noexcept { return _totDocSize; }
- uint32_t getMetaCount() const noexcept { return _metaCount; }
- uint32_t getUsedFileSize() const noexcept { return _usedFileSize; }
- bool isReady() const noexcept { return _ready; }
- bool isActive() const noexcept { return _active; }
+ Timestamp getLastModified() const { return _lastModified; }
+ uint32_t getChecksum() const { return _checksum; }
+ uint32_t getDocumentCount() const { return _docCount; }
+ uint32_t getTotalDocumentSize() const { return _totDocSize; }
+ uint32_t getMetaCount() const { return _metaCount; }
+ uint32_t getUsedFileSize() const { return _usedFileSize; }
+ bool isReady() const { return _ready; }
+ bool isActive() const { return _active; }
- void setChecksum(uint32_t crc) noexcept { _checksum = crc; }
- void setDocumentCount(uint32_t count) noexcept { _docCount = count; }
- void setTotalDocumentSize(uint32_t size) noexcept { _totDocSize = size; }
- void setMetaCount(uint32_t count) noexcept { _metaCount = count; }
- void setUsedFileSize(uint32_t size) noexcept { _usedFileSize = size; }
- void setReady(bool ready = true) noexcept { _ready = ready; }
- void setActive(bool active = true) noexcept { _active = active; }
- void setLastModified(Timestamp lastModified) noexcept { _lastModified = lastModified; }
+ void setChecksum(uint32_t crc) { _checksum = crc; }
+ void setDocumentCount(uint32_t count) { _docCount = count; }
+ void setTotalDocumentSize(uint32_t size) { _totDocSize = size; }
+ void setMetaCount(uint32_t count) { _metaCount = count; }
+ void setUsedFileSize(uint32_t size) { _usedFileSize = size; }
+ void setReady(bool ready = true) { _ready = ready; }
+ void setActive(bool active = true) { _active = active; }
+ void setLastModified(Timestamp lastModified) { _lastModified = lastModified; }
/**
* Only compare checksum, total document count and document
* size, not meta count or used file size.
*/
- bool equalDocumentInfo(const BucketInfo& other) const noexcept {
+ bool equalDocumentInfo(const BucketInfo& other) const {
return (_checksum == other._checksum
&& _docCount == other._docCount
&& _totDocSize == other._totDocSize);
}
- bool operator==(const BucketInfo& info) const noexcept;
- bool valid() const noexcept { return (_docCount > 0 || _totDocSize == 0); }
- bool empty() const noexcept {
+ bool operator==(const BucketInfo& info) const;
+ bool valid() const { return (_docCount > 0 || _totDocSize == 0); }
+ bool empty() const {
return _metaCount == 0 && _usedFileSize == 0 && _checksum == 0;
}
vespalib::string toString() const;
diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h
index ce610f4e84c..57ad28344b9 100644
--- a/vespalib/src/vespa/vespalib/util/executor.h
+++ b/vespalib/src/vespa/vespalib/util/executor.h
@@ -20,7 +20,7 @@ public:
struct Task {
typedef std::unique_ptr<Task> UP;
virtual void run() = 0;
- virtual ~Task() = default;
+ virtual ~Task() {}
};
enum class OptimizeFor {LATENCY, THROUGHPUT, ADAPTIVE};
@@ -41,7 +41,7 @@ public:
* In case you have a lazy executor that naps inbetween.
**/
virtual void wakeup() = 0;
- virtual ~Executor() = default;
+ virtual ~Executor() =default;
};
} // namespace vespalib