diff options
author    Tor Egge <Tor.Egge@yahoo-inc.com>  2017-01-27 11:55:37 +0000
committer Tor Egge <Tor.Egge@yahoo-inc.com>  2017-01-27 11:55:37 +0000
commit    066ba0a610c827331a60f295654ad7915373a8c9 (patch)
tree      de87aff7528310334022b1637260a41ada85e94a /searchlib
parent    015d11f91c5f9b5fd02ff10332e61e8c134b1e3c (diff)
Add unique store, a data store containing refcounted unique values.
Diffstat (limited to 'searchlib')
8 files changed, 557 insertions, 0 deletions
diff --git a/searchlib/CMakeLists.txt b/searchlib/CMakeLists.txt index 4fa141f876f..57beefe47a2 100644 --- a/searchlib/CMakeLists.txt +++ b/searchlib/CMakeLists.txt @@ -105,6 +105,7 @@ vespa_define_module( src/tests/datastore/array_store src/tests/datastore/array_store_config src/tests/datastore/datastore + src/tests/datastore/unique_store src/tests/diskindex/bitvector src/tests/diskindex/diskindex src/tests/diskindex/fieldwriter diff --git a/searchlib/src/tests/datastore/unique_store/CMakeLists.txt b/searchlib/src/tests/datastore/unique_store/CMakeLists.txt new file mode 100644 index 00000000000..35865a35cab --- /dev/null +++ b/searchlib/src/tests/datastore/unique_store/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchlib_unique_store_test_app TEST + SOURCES + unique_store_test.cpp + DEPENDS + searchlib +) +vespa_add_test(NAME searchlib_unique_store_test_app COMMAND searchlib_unique_store_test_app) diff --git a/searchlib/src/tests/datastore/unique_store/FILES b/searchlib/src/tests/datastore/unique_store/FILES new file mode 100644 index 00000000000..8045679eb6a --- /dev/null +++ b/searchlib/src/tests/datastore/unique_store/FILES @@ -0,0 +1 @@ +unique_store_test.cpp diff --git a/searchlib/src/tests/datastore/unique_store/unique_store_test.cpp b/searchlib/src/tests/datastore/unique_store/unique_store_test.cpp new file mode 100644 index 00000000000..8d811b2a957 --- /dev/null +++ b/searchlib/src/tests/datastore/unique_store/unique_store_test.cpp @@ -0,0 +1,230 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("unique_store_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/test/insertion_operators.h> +#include <vespa/vespalib/util/traits.h> +#include <vespa/searchlib/datastore/unique_store.hpp> +#include <vector> + +using namespace search::datastore; +using search::MemoryUsage; +using vespalib::ArrayRef; +using generation_t = vespalib::GenerationHandler::generation_t; + +struct MemStats +{ + size_t _used; + size_t _hold; + size_t _dead; + MemStats() : _used(0), _hold(0), _dead(0) {} + MemStats(const MemoryUsage &usage) + : _used(usage.usedBytes()), + _hold(usage.allocatedBytesOnHold()), + _dead(usage.deadBytes()) {} + MemStats &used(size_t val) { _used += val; return *this; } + MemStats &hold(size_t val) { _hold += val; return *this; } + MemStats &dead(size_t val) { _dead += val; return *this; } + MemStats &holdToDead(size_t val) { + decHold(val); + _dead += val; + return *this; + } + MemStats &decHold(size_t val) { + ASSERT_TRUE(_hold >= val); + _hold -= val; + return *this; + } +}; + +template <typename EntryT, typename RefT = EntryRefT<22> > +struct Fixture +{ + using EntryRefType = RefT; + using UniqueStoreType = UniqueStore<EntryT, RefT>; + using value_type = EntryT; + using ReferenceStore = std::map<EntryRef, std::pair<EntryT,uint32_t>>; + + UniqueStoreType store; + ReferenceStore refStore; + generation_t generation; + Fixture() + : store(), + refStore(), + generation(1) + {} + void assertAdd(const EntryT &input) { + EntryRef ref = add(input); + assertGet(ref, input); + } + EntryRef add(const EntryT &input) { + EntryRef result = store.add(input); + auto insres = refStore.insert(std::make_pair(result, std::make_pair(input, 1u))); + if (!insres.second) { + ++insres.first->second.second; + } + return result; + } + void assertGet(EntryRef ref, const EntryT &exp) const { + EntryT act = store.get(ref); + EXPECT_EQUAL(exp, act); + } + void remove(EntryRef ref) { + 
ASSERT_EQUAL(1u, refStore.count(ref)); + store.remove(ref); + if (refStore[ref].second > 1) { + --refStore[ref].second; + } else { + refStore.erase(ref); + } + } + void remove(const EntryT &input) { + remove(getEntryRef(input)); + } + uint32_t getBufferId(EntryRef ref) const { + return EntryRefType(ref).bufferId(); + } + void assertBufferState(EntryRef ref, const MemStats expStats) const { + EXPECT_EQUAL(expStats._used, store.bufferState(ref).size()); + EXPECT_EQUAL(expStats._hold, store.bufferState(ref).getHoldElems()); + EXPECT_EQUAL(expStats._dead, store.bufferState(ref).getDeadElems()); + } + void assertMemoryUsage(const MemStats expStats) const { + MemoryUsage act = store.getMemoryUsage(); + EXPECT_EQUAL(expStats._used, act.usedBytes()); + EXPECT_EQUAL(expStats._hold, act.allocatedBytesOnHold()); + EXPECT_EQUAL(expStats._dead, act.deadBytes()); + } + void assertStoreContent() const { + for (const auto &elem : refStore) { + TEST_DO(assertGet(elem.first, elem.second.first)); + } + } + EntryRef getEntryRef(const EntryT &input) { + for (const auto &elem : refStore) { + if (elem.second.first == input) { + return elem.first; + } + } + return EntryRef(); + } + void trimHoldLists() { + store.transferHoldLists(generation++); + store.trimHoldLists(generation); + } + void compactWorst() { + ICompactionContext::UP ctx = store.compactWorst(); + std::vector<EntryRef> refs; + for (const auto &elem : refStore) { + refs.push_back(elem.first); + } + refs.push_back(EntryRef()); + std::vector<EntryRef> compactedRefs = refs; + ctx->compact(ArrayRef<EntryRef>(compactedRefs)); + ASSERT_FALSE(refs.back().valid()); + refs.pop_back(); + ReferenceStore compactedRefStore; + for (size_t i = 0; i < refs.size(); ++i) { + ASSERT_EQUAL(0u, compactedRefStore.count(compactedRefs[i])); + ASSERT_EQUAL(1u, refStore.count(refs[i])); + compactedRefStore.insert(std::make_pair(compactedRefs[i], refStore[refs[i]])); + } + refStore = compactedRefStore; + } + size_t entrySize() const { return 
sizeof(EntryT); } +}; + +using NumberFixture = Fixture<uint32_t>; +using StringFixture = Fixture<std::string>; +using SmallOffsetNumberFixture = Fixture<uint32_t, EntryRefT<10>>; + +TEST("require that we test with trivial and non-trivial types") +{ + EXPECT_TRUE(vespalib::can_skip_destruction<NumberFixture::value_type>::value); + EXPECT_FALSE(vespalib::can_skip_destruction<StringFixture::value_type>::value); +} + +TEST_F("require that we can add and get values of trivial type", NumberFixture) +{ + TEST_DO(f.assertAdd(1)); + TEST_DO(f.assertAdd(2)); + TEST_DO(f.assertAdd(3)); + TEST_DO(f.assertAdd(1)); +} + +TEST_F("require that we can add and get values of non-trivial type", StringFixture) +{ + TEST_DO(f.assertAdd("aa")); + TEST_DO(f.assertAdd("bbb")); + TEST_DO(f.assertAdd("ccc")); + TEST_DO(f.assertAdd("aa")); +} + +TEST_F("require that elements are put on hold when value is removed", NumberFixture) +{ + EntryRef ref = f.add(1); + // Note: The first buffer have the first element reserved -> we expect 2 elements used here. + TEST_DO(f.assertBufferState(ref, MemStats().used(2).hold(0).dead(1))); + f.store.remove(ref); + TEST_DO(f.assertBufferState(ref, MemStats().used(2).hold(1).dead(1))); +} + +TEST_F("require that elements are reference counted", NumberFixture) +{ + EntryRef ref = f.add(1); + EntryRef ref2 = f.add(1); + EXPECT_EQUAL(ref.ref(), ref2.ref()); + // Note: The first buffer have the first element reserved -> we expect 2 elements used here. 
+ TEST_DO(f.assertBufferState(ref, MemStats().used(2).hold(0).dead(1))); + f.store.remove(ref); + TEST_DO(f.assertBufferState(ref, MemStats().used(2).hold(0).dead(1))); + f.store.remove(ref); + TEST_DO(f.assertBufferState(ref, MemStats().used(2).hold(1).dead(1))); +} + +TEST_F("require that new underlying buffer is allocated when current is full", SmallOffsetNumberFixture) +{ + uint32_t firstBufferId = f.getBufferId(f.add(1)); + for (uint32_t i = 0; i < (F1::EntryRefType::offsetSize() - 2); ++i) { + uint32_t bufferId = f.getBufferId(f.add(i + 2)); + EXPECT_EQUAL(firstBufferId, bufferId); + } + TEST_DO(f.assertStoreContent()); + + uint32_t bias = F1::EntryRefType::offsetSize(); + uint32_t secondBufferId = f.getBufferId(f.add(bias + 1)); + EXPECT_NOT_EQUAL(firstBufferId, secondBufferId); + for (uint32_t i = 0; i < 10u; ++i) { + uint32_t bufferId = f.getBufferId(f.add(bias + i + 2)); + EXPECT_EQUAL(secondBufferId, bufferId); + } + TEST_DO(f.assertStoreContent()); +} + +TEST_F("require that compaction works", NumberFixture) +{ + EntryRef val1Ref = f.add(1); + EntryRef val2Ref = f.add(2); + f.remove(f.add(4)); + f.trimHoldLists(); + TEST_DO(f.assertBufferState(val1Ref, MemStats().used(4).dead(2))); // Note: First element is reserved + uint32_t val1BufferId = f.getBufferId(val1Ref); + + EXPECT_EQUAL(2u, f.refStore.size()); + f.compactWorst(); + EXPECT_EQUAL(2u, f.refStore.size()); + TEST_DO(f.assertStoreContent()); + + // Buffer has been compacted + EXPECT_NOT_EQUAL(val1BufferId, f.getBufferId(f.getEntryRef(1))); + // Old ref should still point to data. 
+ f.assertGet(val1Ref, 1); + f.assertGet(val2Ref, 2); + EXPECT_TRUE(f.store.bufferState(val1Ref).isOnHold()); + f.trimHoldLists(); + EXPECT_TRUE(f.store.bufferState(val1Ref).isFree()); + TEST_DO(f.assertStoreContent()); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/vespa/searchlib/datastore/CMakeLists.txt b/searchlib/src/vespa/searchlib/datastore/CMakeLists.txt index a54cbc7c479..d31ac9312ad 100644 --- a/searchlib/src/vespa/searchlib/datastore/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/datastore/CMakeLists.txt @@ -6,5 +6,6 @@ vespa_add_library(searchlib_datastore OBJECT bufferstate.cpp datastore.cpp datastorebase.cpp + unique_store.cpp DEPENDS ) diff --git a/searchlib/src/vespa/searchlib/datastore/unique_store.cpp b/searchlib/src/vespa/searchlib/datastore/unique_store.cpp new file mode 100644 index 00000000000..bcbe1be0360 --- /dev/null +++ b/searchlib/src/vespa/searchlib/datastore/unique_store.cpp @@ -0,0 +1,12 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "unique_store.hpp" +#include <vespa/document/base/globalid.h> + +namespace search { +namespace datastore { + +template class UniqueStore<document::GlobalId, EntryRefT<22>>; + +} // namespace datastore +} // namespace search diff --git a/searchlib/src/vespa/searchlib/datastore/unique_store.h b/searchlib/src/vespa/searchlib/datastore/unique_store.h new file mode 100644 index 00000000000..b60aa30bea9 --- --- /dev/null +++ b/searchlib/src/vespa/searchlib/datastore/unique_store.h @@ -0,0 +1,116 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#pragma once + +#include "buffer_type.h" +#include "bufferstate.h" +#include "datastore.h" +#include "entryref.h" +#include "i_compaction_context.h" +#include <vespa/vespalib/util/array.h> +#include <vespa/searchlib/btree/btree.h> + +namespace search { +namespace datastore { + +/** + * Datastore for unique values of type EntryT that is accessed via a + * 32-bit EntryRef. + */ +template <typename EntryT, typename RefT = EntryRefT<22> > +class UniqueStore +{ +public: + using DataStoreType = DataStoreT<RefT>; + using EntryType = EntryT; + using RefType = RefT; + class WrappedEntry { + EntryType _value; + public: + WrappedEntry() : _value() { } + WrappedEntry(const EntryType &value_) : _value(value_) { } + WrappedEntry(const WrappedEntry &rhs) : _value(rhs.value()) { } + const EntryType &value() const { return _value; } + }; + class Compare { + const DataStoreType &_store; + const EntryType _value; +public: + Compare(const DataStoreType &store, const EntryType &value) + : _store(store), + _value(value) + { + } + Compare(const DataStoreType &store) + : _store(store), + _value() + { + } + inline const EntryType &get(EntryRef ref) const { + if (ref.valid()) { + RefType iRef(ref); + return _store.template getBufferEntry<WrappedEntry>(iRef.bufferId(), iRef.offset())->value(); + } else { + return _value; + } + } + inline bool operator()(const EntryRef lhs, const EntryRef rhs) const + { + const EntryType &lhsValue = get(lhs); + const EntryType &rhsValue = get(rhs); + return lhsValue < rhsValue; + } + }; + class WrappedCompare { + const Compare &_comp; + public: + WrappedCompare(const Compare &comp) + : _comp(comp) + { + } + inline bool operator()(EntryRef lhs, EntryRef rhs) const + { + return _comp(lhs, rhs); + } + }; + + using UniqueStoreBufferType = BufferType<WrappedEntry>; + using DictionaryTraits = btree::BTreeTraits<32, 32, 7, true>; + using Dictionary = btree::BTree<EntryRef, uint32_t, + btree::NoAggregated, + const WrappedCompare, + DictionaryTraits>; +private: + 
DataStoreType _store; + UniqueStoreBufferType _typeHandler; + uint32_t _typeId; + Dictionary _dict; + using generation_t = vespalib::GenerationHandler::generation_t; + + const WrappedEntry &getWrapped(EntryRef ref) const + { + RefType iRef(ref); + return *_store.template getBufferEntry<WrappedEntry>(iRef.bufferId(), iRef.offset()); + } +public: + UniqueStore(); + ~UniqueStore(); + EntryRef move(EntryRef ref); + EntryRef add(const EntryType &array); + const EntryType &get(EntryRef ref) const { return getWrapped(ref).value(); } + void remove(EntryRef ref); + ICompactionContext::UP compactWorst(); + MemoryUsage getMemoryUsage() const; + + // Pass on hold list management to underlying store + void transferHoldLists(generation_t generation) { _dict.getAllocator().transferHoldLists(generation); _store.transferHoldLists(generation); } + void trimHoldLists(generation_t firstUsed) { _dict.getAllocator().trimHoldLists(firstUsed); _store.trimHoldLists(firstUsed); } + vespalib::GenerationHolder &getGenerationHolder(void) { return _store.getGenerationHolder(); } + void setInitializing(bool initializing) { _store.setInitializing(initializing); } + + // Should only be used for unit testing + const BufferState &bufferState(EntryRef ref) const; +}; + +} +} diff --git a/searchlib/src/vespa/searchlib/datastore/unique_store.hpp b/searchlib/src/vespa/searchlib/datastore/unique_store.hpp new file mode 100644 index 00000000000..b50c8e6ceaa --- /dev/null +++ b/searchlib/src/vespa/searchlib/datastore/unique_store.hpp @@ -0,0 +1,188 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#pragma once + +#include "unique_store.h" +#include "datastore.hpp" +#include <vespa/searchlib/btree/btree.hpp> +#include <vespa/searchlib/btree/btreeroot.hpp> +#include <vespa/searchlib/btree/btreenodeallocator.hpp> +#include <vespa/searchlib/btree/btreeiterator.hpp> +#include <vespa/searchlib/btree/btreenode.hpp> +#include <atomic> + +namespace search { +namespace datastore { + +template <typename EntryT, typename RefT> +UniqueStore<EntryT, RefT>::UniqueStore() + : _store(), + _typeHandler(1, 2u, RefT::offsetSize(), 1024u), + _typeId(0), + _dict() +{ + _typeId = _store.addType(&_typeHandler); + assert(_typeId == 0u); + _store.initActiveBuffers(); +} + +template <typename EntryT, typename RefT> +UniqueStore<EntryT, RefT>::~UniqueStore() +{ + _store.clearHoldLists(); + _store.dropBuffers(); +} + +template <typename EntryT, typename RefT> +EntryRef +UniqueStore<EntryT, RefT>::add(const EntryType &value) +{ + Compare comp(_store, value); + auto itr = _dict.lowerBound(RefType(), WrappedCompare(comp)); + if (itr.valid() && !comp(EntryRef(), itr.getKey())) { + uint32_t refCount = itr.getData(); + assert(refCount != std::numeric_limits<uint32_t>::max()); + itr.writeData(refCount + 1); + return itr.getKey(); + } else { + EntryRef newRef = _store.template allocator<WrappedEntry>(_typeId).alloc(value).ref; + _dict.insert(itr, newRef, 1u); + return newRef; + } +} + +template <typename EntryT, typename RefT> +EntryRef +UniqueStore<EntryT, RefT>::move(EntryRef ref) +{ + return _store.template allocator<WrappedEntry>(_typeId).alloc(getWrapped(ref)).ref; +} + +template <typename EntryT, typename RefT> +void +UniqueStore<EntryT, RefT>::remove(EntryRef ref) +{ + assert(ref.valid()); + Compare comp(_store); + auto itr = _dict.lowerBound(ref, WrappedCompare(comp)); + if (itr.valid() && itr.getKey() == ref) { + uint32_t refCount = itr.getData(); + if (refCount > 1) { + itr.writeData(refCount - 1); + } else { + _dict.remove(itr); + _store.holdElem(ref, 1); + } + } +} + +namespace 
uniquestore { + +template <typename EntryT, typename RefT> +class CompactionContext : public ICompactionContext { +private: + using UniqueStoreType = UniqueStore<EntryT, RefT>; + using Dictionary = typename UniqueStoreType::Dictionary; + DataStoreBase &_dataStore; + Dictionary &_dict; + UniqueStoreType &_store; + std::vector<uint32_t> _bufferIdsToCompact; + std::vector<std::vector<EntryRef>> _mapping; + + bool compactingBuffer(uint32_t bufferId) { + return std::find(_bufferIdsToCompact.begin(), _bufferIdsToCompact.end(), + bufferId) != _bufferIdsToCompact.end(); + } + + void allocMapping() { + _mapping.resize(RefT::numBuffers()); + for (const auto bufferId : _bufferIdsToCompact) { + BufferState &state = _dataStore.getBufferState(bufferId); + _mapping[bufferId].resize(state.size()); + } + } + + void fillMapping() { + auto itr = _dict.begin(); + while (itr.valid()) { + RefT iRef(itr.getKey()); + assert(iRef.valid()); + if (compactingBuffer(iRef.bufferId())) { + assert(iRef.offset() < _mapping[iRef.bufferId()].size()); + EntryRef &mappedRef = _mapping[iRef.bufferId()][iRef.offset()]; + assert(!mappedRef.valid()); + EntryRef newRef = _store.move(itr.getKey()); + std::atomic_thread_fence(std::memory_order_release); + mappedRef = newRef; + itr.writeKey(newRef); + } + ++itr; + } + } + +public: + CompactionContext(DataStoreBase &dataStore, + Dictionary &dict, + UniqueStoreType &store, + std::vector<uint32_t> bufferIdsToCompact) + : _dataStore(dataStore), + _dict(dict), + _store(store), + _bufferIdsToCompact(std::move(bufferIdsToCompact)), + _mapping() + { + } + virtual ~CompactionContext() { + _dataStore.finishCompact(_bufferIdsToCompact); + } + virtual void compact(vespalib::ArrayRef<EntryRef> refs) override { + if (!_bufferIdsToCompact.empty()) { + if (_mapping.empty()) { + allocMapping(); + fillMapping(); + } + for (auto &ref : refs) { + if (ref.valid()) { + RefT internalRef(ref); + if (compactingBuffer(internalRef.bufferId())) { + assert(internalRef.offset() < 
_mapping[internalRef.bufferId()].size()); + EntryRef newRef = _mapping[internalRef.bufferId()][internalRef.offset()]; + assert(newRef.valid()); + ref = newRef; + } + } + } + } + } +}; + +} + +template <typename EntryT, typename RefT> +ICompactionContext::UP +UniqueStore<EntryT, RefT>::compactWorst() +{ + std::vector<uint32_t> bufferIdsToCompact = _store.startCompactWorstBuffers(true, true); + return std::make_unique<uniquestore::CompactionContext<EntryT, RefT>> + (_store, _dict, *this, std::move(bufferIdsToCompact)); +} + +template <typename EntryT, typename RefT> +MemoryUsage +UniqueStore<EntryT, RefT>::getMemoryUsage() const +{ + MemoryUsage usage = _store.getMemoryUsage(); + usage.merge(_dict.getMemoryUsage()); + return usage; +} + +template <typename EntryT, typename RefT> +const BufferState & +UniqueStore<EntryT, RefT>::bufferState(EntryRef ref) const +{ + RefT internalRef(ref); + return _store.getBufferState(internalRef.bufferId()); +} + +} +} |