diff options
Diffstat (limited to 'storage/src')
5 files changed, 42 insertions, 23 deletions
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index e61050c21a3..7891f21a73d 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -226,7 +226,7 @@ TestDistributorApp::TestDistributorApp(NodeIndex index, vespalib::stringref conf TestDistributorApp::~TestDistributorApp() = default; api::Timestamp -TestDistributorApp::getUniqueTimestamp() +TestDistributorApp::generate_unique_timestamp() { std::lock_guard guard(_accessLock); uint64_t timeNow(getClock().getTimeInSeconds().getTime()); diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 9e273002580..f30e0b62f4d 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -101,7 +101,7 @@ public: private: // Storage server interface implementation (until we can remove it) - virtual api::Timestamp getUniqueTimestamp() { abort(); } + virtual api::Timestamp generate_unique_timestamp() { abort(); } [[nodiscard]] virtual StorBucketDatabase& content_bucket_db(document::BucketSpace) { abort(); } virtual StorBucketDatabase& getStorageBucketDatabase() { abort(); } virtual BucketDatabase& getBucketDatabase() { abort(); } @@ -157,7 +157,7 @@ public: return _compReg; } - api::Timestamp getUniqueTimestamp() override; + api::Timestamp generate_unique_timestamp() override; }; } // storageo diff --git a/storage/src/vespa/storage/common/distributorcomponent.h b/storage/src/vespa/storage/common/distributorcomponent.h index d5eb3fa56c8..403ffa3376c 100644 --- a/storage/src/vespa/storage/common/distributorcomponent.h +++ b/storage/src/vespa/storage/common/distributorcomponent.h @@ -46,7 +46,7 @@ typedef vespa::config::content::core::internal::InternalStorVisitordispatcherTyp struct UniqueTimeCalculator { virtual ~UniqueTimeCalculator() {} - virtual api::Timestamp getUniqueTimestamp() = 0; + [[nodiscard]] virtual api::Timestamp generate_unique_timestamp() = 0; }; struct DistributorManagedComponent @@ -90,8 +90,8 @@ public: DistributorComponent(DistributorComponentRegister& compReg, vespalib::stringref name); ~DistributorComponent() override; - api::Timestamp getUniqueTimestamp() const { - return _timeCalculator->getUniqueTimestamp(); + [[nodiscard]] api::Timestamp getUniqueTimestamp() const { + return _timeCalculator->generate_unique_timestamp(); } const DistributorConfig& getDistributorConfig() const { return _distributorConfig; diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index f49b2a23688..51fa0c01c96 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -29,8 +29,9 @@ DistributorNode::DistributorNode( _threadPool(framework::TickingThreadPool::createDefault("distributor")), _stripe_pool(std::make_unique<distributor::DistributorStripePool>()), _context(context), - _lastUniqueTimestampRequested(0), - _uniqueTimestampCounter(0), + _timestamp_mutex(), + _timestamp_second_counter(0), + _intra_second_pseudo_usec_counter(0), _num_distributor_stripes(num_distributor_stripes), _retrievedCommunicationManager(std::move(communicationManager)) { @@ -110,23 +111,32 @@ DistributorNode::createChain(IStorageChainBuilder &builder) builder.add(std::move(stateManager)); } -// FIXME STRIPE not thread safe!! api::Timestamp -DistributorNode::getUniqueTimestamp() +DistributorNode::generate_unique_timestamp() { - uint64_t timeNow(_component->getClock().getTimeInSeconds().getTime()); - if (timeNow == _lastUniqueTimestampRequested) { - ++_uniqueTimestampCounter; - } else { - if (timeNow < _lastUniqueTimestampRequested) { - LOG(error, "Time has moved backwards, from %" PRIu64 " to %" PRIu64 ".", - _lastUniqueTimestampRequested, timeNow); + uint64_t now_seconds = _component->getClock().getTimeInSeconds().getTime(); + std::lock_guard lock(_timestamp_mutex); + // We explicitly handle a seemingly decreased wall clock time, as multiple threads may + // race with each other over a second change edge. In this case, pretend an earlier + // timestamp took place in the same second as the newest observed wall clock time. + if (now_seconds <= _timestamp_second_counter) { + // ... but if we're stuck too far in the past, we trigger a process restart. + if ((_timestamp_second_counter - now_seconds) > SanityCheckMaxWallClockSecondSkew) { + LOG(error, "Current wall clock time is more than %u seconds in the past " + "compared to the highest observed wall clock time (%" PRIu64 " < %" PRIu64 "). " + "%u timestamps were generated within this time period.", + SanityCheckMaxWallClockSecondSkew, now_seconds,_timestamp_second_counter, + _intra_second_pseudo_usec_counter); + abort(); } - _lastUniqueTimestampRequested = timeNow; - _uniqueTimestampCounter = 0; + assert(_intra_second_pseudo_usec_counter < 1'000'000); + ++_intra_second_pseudo_usec_counter; + } else { + _timestamp_second_counter = now_seconds; + _intra_second_pseudo_usec_counter = 0; } - return _lastUniqueTimestampRequested * 1000000ll + _uniqueTimestampCounter; + return _timestamp_second_counter * 1'000'000LL + _intra_second_pseudo_usec_counter; } ResumeGuard diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h index f2e483bbc9f..21e9589b760 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.h +++ b/storage/src/vespa/storage/storageserver/distributornode.h @@ -12,6 +12,7 @@ #include "storagenode.h" #include <vespa/storage/common/distributorcomponent.h> #include <vespa/storageframework/generic/thread/tickingthread.h> +#include <mutex> namespace storage { @@ -26,11 +27,19 @@ class DistributorNode framework::TickingThreadPool::UP _threadPool; std::unique_ptr<distributor::DistributorStripePool> _stripe_pool; DistributorNodeContext& _context; - uint64_t _lastUniqueTimestampRequested; - uint32_t _uniqueTimestampCounter; + std::mutex _timestamp_mutex; + uint64_t _timestamp_second_counter; + uint32_t _intra_second_pseudo_usec_counter; uint32_t _num_distributor_stripes; std::unique_ptr<StorageLink> _retrievedCommunicationManager; + // If the current wall clock is more than the below number of seconds into the + // past when compared to the highest recorded wall clock second time stamp, abort + // the process. This is a sanity checking measure to prevent a process running + // on a wall clock that transiently is set far into the future from (hopefully) + // generating a massive amount of broken future timestamps. + constexpr static uint32_t SanityCheckMaxWallClockSecondSkew = 120; + public: typedef std::unique_ptr<DistributorNode> UP; @@ -52,7 +61,7 @@ private: void initializeNodeSpecific() override; void perform_post_chain_creation_init_steps() override { /* no-op */ } void createChain(IStorageChainBuilder &builder) override; - api::Timestamp getUniqueTimestamp() override; + api::Timestamp generate_unique_timestamp() override; /** * Shut down necessary distributor-specific components before shutting |