summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@verizonmedia.com> 2021-05-18 14:01:04 +0000
committer: Tor Brede Vekterli <vekterli@verizonmedia.com> 2021-05-18 14:01:04 +0000
commit: 39228644d50526eba3732e1deff193b17ad869a4 (patch)
tree: 798288e6cfbd21031dc0edab13b8b1c4dd5c3be1 /storage
parent: 8bf0c34f48471825ce15666893c1282ae6e5a1c0 (diff)
Make distributor timestamp generation thread safe
New behavior:
 - Only allow time to travel forwards within a given distributor process' lifetime. This is a change from the old behavior, which would log an error and happily continue from a previously used second, possibly causing the distributor to reuse timestamps.
 - Try to detect cases where the wall clock has been transiently set far into the future--only to bounce back--by aborting the process if the currently observed time is more than 120 seconds behind the highest observed wall clock time. This is an attempt to avoid generating _too_ many bogus future timestamps, as the distributor would otherwise continue generating timestamps within the highest observed second.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/common/teststorageapp.cpp | 2
-rw-r--r-- storage/src/tests/common/teststorageapp.h | 4
-rw-r--r-- storage/src/vespa/storage/common/distributorcomponent.h | 6
-rw-r--r-- storage/src/vespa/storage/storageserver/distributornode.cpp | 38
-rw-r--r-- storage/src/vespa/storage/storageserver/distributornode.h | 15
5 files changed, 42 insertions, 23 deletions
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index e61050c21a3..7891f21a73d 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -226,7 +226,7 @@ TestDistributorApp::TestDistributorApp(NodeIndex index, vespalib::stringref conf
TestDistributorApp::~TestDistributorApp() = default;
api::Timestamp
-TestDistributorApp::getUniqueTimestamp()
+TestDistributorApp::generate_unique_timestamp()
{
std::lock_guard guard(_accessLock);
uint64_t timeNow(getClock().getTimeInSeconds().getTime());
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index 9e273002580..f30e0b62f4d 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -101,7 +101,7 @@ public:
private:
// Storage server interface implementation (until we can remove it)
- virtual api::Timestamp getUniqueTimestamp() { abort(); }
+ virtual api::Timestamp generate_unique_timestamp() { abort(); }
[[nodiscard]] virtual StorBucketDatabase& content_bucket_db(document::BucketSpace) { abort(); }
virtual StorBucketDatabase& getStorageBucketDatabase() { abort(); }
virtual BucketDatabase& getBucketDatabase() { abort(); }
@@ -157,7 +157,7 @@ public:
return _compReg;
}
- api::Timestamp getUniqueTimestamp() override;
+ api::Timestamp generate_unique_timestamp() override;
};
} // storageo
diff --git a/storage/src/vespa/storage/common/distributorcomponent.h b/storage/src/vespa/storage/common/distributorcomponent.h
index d5eb3fa56c8..403ffa3376c 100644
--- a/storage/src/vespa/storage/common/distributorcomponent.h
+++ b/storage/src/vespa/storage/common/distributorcomponent.h
@@ -46,7 +46,7 @@ typedef vespa::config::content::core::internal::InternalStorVisitordispatcherTyp
struct UniqueTimeCalculator {
virtual ~UniqueTimeCalculator() {}
- virtual api::Timestamp getUniqueTimestamp() = 0;
+ [[nodiscard]] virtual api::Timestamp generate_unique_timestamp() = 0;
};
struct DistributorManagedComponent
@@ -90,8 +90,8 @@ public:
DistributorComponent(DistributorComponentRegister& compReg, vespalib::stringref name);
~DistributorComponent() override;
- api::Timestamp getUniqueTimestamp() const {
- return _timeCalculator->getUniqueTimestamp();
+ [[nodiscard]] api::Timestamp getUniqueTimestamp() const {
+ return _timeCalculator->generate_unique_timestamp();
}
const DistributorConfig& getDistributorConfig() const {
return _distributorConfig;
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index f49b2a23688..51fa0c01c96 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -29,8 +29,9 @@ DistributorNode::DistributorNode(
_threadPool(framework::TickingThreadPool::createDefault("distributor")),
_stripe_pool(std::make_unique<distributor::DistributorStripePool>()),
_context(context),
- _lastUniqueTimestampRequested(0),
- _uniqueTimestampCounter(0),
+ _timestamp_mutex(),
+ _timestamp_second_counter(0),
+ _intra_second_pseudo_usec_counter(0),
_num_distributor_stripes(num_distributor_stripes),
_retrievedCommunicationManager(std::move(communicationManager))
{
@@ -110,23 +111,32 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
builder.add(std::move(stateManager));
}
-// FIXME STRIPE not thread safe!!
api::Timestamp
-DistributorNode::getUniqueTimestamp()
+DistributorNode::generate_unique_timestamp()
{
- uint64_t timeNow(_component->getClock().getTimeInSeconds().getTime());
- if (timeNow == _lastUniqueTimestampRequested) {
- ++_uniqueTimestampCounter;
- } else {
- if (timeNow < _lastUniqueTimestampRequested) {
- LOG(error, "Time has moved backwards, from %" PRIu64 " to %" PRIu64 ".",
- _lastUniqueTimestampRequested, timeNow);
+ uint64_t now_seconds = _component->getClock().getTimeInSeconds().getTime();
+ std::lock_guard lock(_timestamp_mutex);
+ // We explicitly handle a seemingly decreased wall clock time, as multiple threads may
+ // race with each other over a second change edge. In this case, pretend an earlier
+ // timestamp took place in the same second as the newest observed wall clock time.
+ if (now_seconds <= _timestamp_second_counter) {
+ // ... but if we're stuck too far in the past, we trigger a process restart.
+ if ((_timestamp_second_counter - now_seconds) > SanityCheckMaxWallClockSecondSkew) {
+ LOG(error, "Current wall clock time is more than %u seconds in the past "
+ "compared to the highest observed wall clock time (%" PRIu64 " < %" PRIu64 "). "
+ "%u timestamps were generated within this time period.",
+ SanityCheckMaxWallClockSecondSkew, now_seconds,_timestamp_second_counter,
+ _intra_second_pseudo_usec_counter);
+ abort();
}
- _lastUniqueTimestampRequested = timeNow;
- _uniqueTimestampCounter = 0;
+ assert(_intra_second_pseudo_usec_counter < 1'000'000);
+ ++_intra_second_pseudo_usec_counter;
+ } else {
+ _timestamp_second_counter = now_seconds;
+ _intra_second_pseudo_usec_counter = 0;
}
- return _lastUniqueTimestampRequested * 1000000ll + _uniqueTimestampCounter;
+ return _timestamp_second_counter * 1'000'000LL + _intra_second_pseudo_usec_counter;
}
ResumeGuard
diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h
index f2e483bbc9f..21e9589b760 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.h
+++ b/storage/src/vespa/storage/storageserver/distributornode.h
@@ -12,6 +12,7 @@
#include "storagenode.h"
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
+#include <mutex>
namespace storage {
@@ -26,11 +27,19 @@ class DistributorNode
framework::TickingThreadPool::UP _threadPool;
std::unique_ptr<distributor::DistributorStripePool> _stripe_pool;
DistributorNodeContext& _context;
- uint64_t _lastUniqueTimestampRequested;
- uint32_t _uniqueTimestampCounter;
+ std::mutex _timestamp_mutex;
+ uint64_t _timestamp_second_counter;
+ uint32_t _intra_second_pseudo_usec_counter;
uint32_t _num_distributor_stripes;
std::unique_ptr<StorageLink> _retrievedCommunicationManager;
+ // If the current wall clock is more than the below number of seconds into the
+ // past when compared to the highest recorded wall clock second time stamp, abort
+ // the process. This is a sanity checking measure to prevent a process running
+ // on a wall clock that transiently is set far into the future from (hopefully)
+ // generating a massive amount of broken future timestamps.
+ constexpr static uint32_t SanityCheckMaxWallClockSecondSkew = 120;
+
public:
typedef std::unique_ptr<DistributorNode> UP;
@@ -52,7 +61,7 @@ private:
void initializeNodeSpecific() override;
void perform_post_chain_creation_init_steps() override { /* no-op */ }
void createChain(IStorageChainBuilder &builder) override;
- api::Timestamp getUniqueTimestamp() override;
+ api::Timestamp generate_unique_timestamp() override;
/**
* Shut down necessary distributor-specific components before shutting