diff options
author | Geir Storli <geirst@verizonmedia.com> | 2020-06-12 14:07:49 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2020-06-17 13:17:54 +0000 |
commit | 5566148a0ad253569e44b4e21ada7e8e59241eaf (patch) | |
tree | 43434e8a049323a2c956b23539a2d969f92cefb8 /searchcommon/src | |
parent | e55f4b911fd3dab1270514edc99b5e3d3a086833 (diff) |
Implement initial support for two-phase puts in attribute writer.
This is only turned on for tensor attributes with a hnsw index that allows multi-threaded indexing.
Diffstat (limited to 'searchcommon/src')
3 files changed, 51 insertions, 6 deletions
diff --git a/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h b/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h index 3e3683ce60f..c8b196023d6 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h +++ b/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h @@ -16,24 +16,29 @@ private: uint32_t _neighbors_to_explore_at_insert; // This is always the same as in the attribute config, and is duplicated here to simplify usage. DistanceMetric _distance_metric; + bool _allow_multi_threaded_indexing; public: HnswIndexParams(uint32_t max_links_per_node_in, uint32_t neighbors_to_explore_at_insert_in, - DistanceMetric distance_metric_in) + DistanceMetric distance_metric_in, + bool allow_multi_threaded_indexing_in = false) : _max_links_per_node(max_links_per_node_in), _neighbors_to_explore_at_insert(neighbors_to_explore_at_insert_in), - _distance_metric(distance_metric_in) + _distance_metric(distance_metric_in), + _allow_multi_threaded_indexing(allow_multi_threaded_indexing_in) {} uint32_t max_links_per_node() const { return _max_links_per_node; } uint32_t neighbors_to_explore_at_insert() const { return _neighbors_to_explore_at_insert; } DistanceMetric distance_metric() const { return _distance_metric; } + bool allow_multi_threaded_indexing() const { return _allow_multi_threaded_indexing; } bool operator==(const HnswIndexParams& rhs) const { return (_max_links_per_node == rhs._max_links_per_node && _neighbors_to_explore_at_insert == rhs._neighbors_to_explore_at_insert && - _distance_metric == rhs._distance_metric); + _distance_metric == rhs._distance_metric && + _allow_multi_threaded_indexing == rhs._allow_multi_threaded_indexing); } }; diff --git a/searchcommon/src/vespa/searchcommon/attribute/status.cpp b/searchcommon/src/vespa/searchcommon/attribute/status.cpp index da13548ec2e..f2bb49c348a 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/status.cpp +++ b/searchcommon/src/vespa/searchcommon/attribute/status.cpp @@ -20,6 +20,42 @@ Status::Status() { } +Status::Status(const Status& rhs) + : _numDocs(rhs._numDocs), + _numValues(rhs._numValues), + _numUniqueValues(rhs._numUniqueValues), + _allocated(rhs._allocated), + _used(rhs._used), + _dead(rhs._dead), + _unused(rhs._unused), + _onHold(rhs._onHold), + _onHoldMax(rhs._onHoldMax), + _lastSyncToken(rhs.getLastSyncToken()), + _updates(rhs._updates), + _nonIdempotentUpdates(rhs._nonIdempotentUpdates), + _bitVectors(rhs._bitVectors) +{ +} + +Status& +Status::operator=(const Status& rhs) +{ + _numDocs = rhs._numDocs; + _numValues = rhs._numValues; + _numUniqueValues = rhs._numUniqueValues; + _allocated = rhs._allocated; + _used = rhs._used; + _dead = rhs._dead; + _unused = rhs._unused; + _onHold = rhs._onHold; + _onHoldMax = rhs._onHoldMax; + setLastSyncToken(rhs.getLastSyncToken()); + _updates = rhs._updates; + _nonIdempotentUpdates = rhs._nonIdempotentUpdates; + _bitVectors = rhs._bitVectors; + return *this; +} + vespalib::string Status::createName(vespalib::stringref index, vespalib::stringref attr) { diff --git a/searchcommon/src/vespa/searchcommon/attribute/status.h b/searchcommon/src/vespa/searchcommon/attribute/status.h index 888355b3f58..a624309da65 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/status.h +++ b/searchcommon/src/vespa/searchcommon/attribute/status.h @@ -3,6 +3,7 @@ #pragma once #include <vespa/vespalib/stllike/string.h> +#include <atomic> namespace search::attribute { @@ -10,6 +11,8 @@ class Status { public: Status(); + Status(const Status& rhs); + Status& operator=(const Status& rhs); void updateStatistics(uint64_t numValues, uint64_t numUniqueValue, uint64_t allocated, uint64_t used, uint64_t dead, uint64_t onHold); @@ -22,14 +25,15 @@ public: uint64_t getDead() const { return _dead; } uint64_t getOnHold() const { return _onHold; } uint64_t getOnHoldMax() const { return _onHoldMax; } - uint64_t getLastSyncToken() const { return _lastSyncToken; } + // This might be accessed from other threads than the writer thread. + uint64_t getLastSyncToken() const { return _lastSyncToken.load(std::memory_order_relaxed); } uint64_t getUpdateCount() const { return _updates; } uint64_t getNonIdempotentUpdateCount() const { return _nonIdempotentUpdates; } uint32_t getBitVectors() const { return _bitVectors; } void setNumDocs(uint64_t v) { _numDocs = v; } void incNumDocs() { ++_numDocs; } - void setLastSyncToken(uint64_t v) { _lastSyncToken = v; } + void setLastSyncToken(uint64_t v) { _lastSyncToken.store(v, std::memory_order_relaxed); } void incUpdates(uint64_t v=1) { _updates += v; } void incNonIdempotentUpdates(uint64_t v = 1) { _nonIdempotentUpdates += v; } void incBitVectors() { ++_bitVectors; } @@ -47,7 +51,7 @@ private: uint64_t _unused; uint64_t _onHold; uint64_t _onHoldMax; - uint64_t _lastSyncToken; + std::atomic<uint64_t> _lastSyncToken; uint64_t _updates; uint64_t _nonIdempotentUpdates; uint32_t _bitVectors; |