diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-03-23 11:44:07 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-03-27 07:54:52 +0000 |
commit | 490d18680b82b74a35750843b4f308c51cb089e1 (patch) | |
tree | 2c46c477243432059b68c1df8cd3b17a46ba8ff5 /searchcore | |
parent | 5e33bb54604989bb2cef605572f7750d45eb630a (diff) |
- Wire in control of checksum type.
- Implement both legacy and xxhash64
- Add a small conformance test.
Diffstat (limited to 'searchcore')
17 files changed, 405 insertions, 173 deletions
diff --git a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp index a5f8b1ff6c9..81177fd72d3 100644 --- a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp +++ b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp @@ -1,12 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/log/log.h> -LOG_SETUP("bucketdb_test"); - #include <vespa/searchcore/proton/bucketdb/bucket_db_explorer.h> #include <vespa/searchcore/proton/bucketdb/bucketdb.h> #include <vespa/vespalib/data/slime/slime.h> +#include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/log/log.h> +LOG_SETUP("bucketdb_test"); + using namespace document; using namespace proton; using namespace proton::bucketdb; @@ -128,8 +129,8 @@ TEST_F("require that bucket checksum is a combination of sub db types", Fixture) EXPECT_EQUAL(zero, f.get().getChecksum()); EXPECT_EQUAL(ready, f.add(TIME_1, SDT::READY).getChecksum()); - EXPECT_EQUAL(ready + notReady, f.add(TIME_2, SDT::NOTREADY).getChecksum()); - EXPECT_EQUAL(ready + notReady, f.add(TIME_3, SDT::REMOVED).getChecksum()); + EXPECT_EQUAL(BucketState::addChecksum(ready, notReady), f.add(TIME_2, SDT::NOTREADY).getChecksum()); + EXPECT_EQUAL(BucketState::addChecksum(ready, notReady), f.add(TIME_3, SDT::REMOVED).getChecksum()); EXPECT_EQUAL(notReady, f.remove(TIME_1, SDT::READY).getChecksum()); EXPECT_EQUAL(zero, f.remove(TIME_2, SDT::NOTREADY).getChecksum()); EXPECT_EQUAL(zero, f.remove(TIME_3, SDT::REMOVED).getChecksum()); @@ -179,17 +180,20 @@ TEST_F("require that bucket checksum ignores document sizes", Fixture) TEST("require that bucket db can be explored") { BucketDBOwner db; - db.takeGuard()->add(GID_1, BUCKET_1, TIME_1, DOCSIZE_1, SDT::READY); + const BucketState & expectedState = db.takeGuard()->add(GID_1, BUCKET_1, TIME_1, DOCSIZE_1, SDT::READY); { BucketDBExplorer explorer(db.takeGuard()); Slime expectSlime; - vespalib::string expectJson = + vespalib::asciistream expectJson; + expectJson << "{" " numBuckets: 1," " buckets: [" " {" " id: '0x2000000000000031'," - " checksum: '0x93939394'," + " checksum: '0x" + << vespalib::hex << expectedState.getChecksum() << + "'," " readyCount: 1," " notReadyCount: 0," " removedCount: 0," @@ -200,7 +204,7 @@ TEST("require that bucket db can be explored") " }" " ]" "}"; - EXPECT_TRUE(JsonFormat::decode(expectJson, expectSlime) > 0); + EXPECT_TRUE(JsonFormat::decode(expectJson.str(), expectSlime) > 0); Slime actualSlime; SlimeInserter inserter(actualSlime); explorer.get_state(inserter, true); @@ -212,4 +216,63 @@ TEST("require that bucket db can be explored") db.takeGuard()->remove(GID_1, BUCKET_1, TIME_1, DOCSIZE_1, SDT::READY); } +BucketChecksum +verifyChecksumCompliance(ChecksumAggregator::ChecksumType type) { + GlobalId gid1("a"); + GlobalId gid2("b"); + Timestamp t1(0); + Timestamp t2(1); + auto ckaggr = ChecksumAggregator::create(type, BucketChecksum(0)); + + EXPECT_EQUAL(0u, ckaggr->getChecksum()); + ckaggr->addDoc(gid1, t1); + BucketChecksum afterAdd = ckaggr->getChecksum(); + EXPECT_NOT_EQUAL(0u, afterAdd); // add Changes checksum + ckaggr->removeDoc(gid1, t1); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + ckaggr->addDoc(gid1, t2); + EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // timestamp changes checksum + ckaggr->removeDoc(gid1, t2); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + ckaggr->addDoc(gid2, t1); + EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // gid changes checksum + ckaggr->removeDoc(gid2, t1); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + + { + // Verify order does not matter, only current content. A,B == B,A + ckaggr->addDoc(gid1, t1); + BucketChecksum after1AddOfGid1 = ckaggr->getChecksum(); + ckaggr->addDoc(gid2, t2); + BucketChecksum after2Add1 = ckaggr->getChecksum(); + ckaggr->removeDoc(gid2, t2); + EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); + ckaggr->removeDoc(gid1, t1); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); + + ckaggr->addDoc(gid2, t2); + EXPECT_NOT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); + ckaggr->addDoc(gid1, t1); + EXPECT_EQUAL(after2Add1, ckaggr->getChecksum()); + ckaggr->removeDoc(gid2, t2); + EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); + ckaggr->removeDoc(gid1, t1); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + } + + ckaggr->addDoc(gid1, t1); // Add something so we can verify it does not change between releases. + + return ckaggr->getChecksum(); +} + +TEST("test that legacy checksum complies") { + BucketChecksum cksum = verifyChecksumCompliance(ChecksumAggregator::ChecksumType::LEGACY); + EXPECT_EQUAL(0x61u, cksum); +} + +TEST("test that xxhash64 checksum complies") { + BucketChecksum cksum = verifyChecksumCompliance(ChecksumAggregator::ChecksumType::XXHASH64); + EXPECT_EQUAL(0xc34cd5c3u, cksum); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp index d436c63ae2e..d7d6159af61 100644 --- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp @@ -4,6 +4,7 @@ #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> #include <vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h> #include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h> +#include <vespa/searchcore/proton/bucketdb/checksumaggregators.h> #include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> #include <vespa/searchlib/attribute/attributefilesavetarget.h> #include <vespa/searchlib/fef/matchdatalayout.h> @@ -22,6 +23,8 @@ LOG_SETUP("documentmetastore_test"); using namespace document; using proton::bucketdb::BucketState; +using proton::bucketdb::LegacyChecksumAggregator; +using proton::bucketdb::XXHChecksumAggregator; using proton::bucketdb::IBucketCreateListener; using search::AttributeFileSaveTarget; using search::AttributeGuard; @@ -771,7 +774,9 @@ TEST("requireThatWeCanSortGids") } } -TEST("requireThatBasicBucketInfoWorks") +template <typename T> +void +requireThatBasicBucketInfoWorks() { DocumentMetaStore dms(createBucketDB()); typedef std::pair<BucketId, GlobalId> Elem; @@ -808,24 +813,22 @@ TEST("requireThatBasicBucketInfoWorks") m.erase(std::make_pair(bucketId, gid)); } assert(!m.empty()); - BucketChecksum cksum; + T cksum(BucketChecksum(0)); BucketId prevBucket = m.begin()->first.first; uint32_t cnt = 0u; uint32_t maxcnt = 0u; BucketDBOwner::Guard bucketDB = dms.getBucketDB().takeGuard(); for (Map::const_iterator i = m.begin(), ie = m.end(); i != ie; ++i) { if (i->first.first == prevBucket) { - cksum = BucketChecksum(cksum + - BucketState::calcChecksum(i->first.second, - i->second)); + cksum.addDoc(i->first.second, i->second); ++cnt; } else { BucketInfo bi = bucketDB->get(prevBucket); EXPECT_EQUAL(cnt, bi.getDocumentCount()); - EXPECT_EQUAL(cksum, bi.getChecksum()); + EXPECT_EQUAL(cksum.getChecksum(), bi.getChecksum()); prevBucket = i->first.first; - cksum = BucketState::calcChecksum(i->first.second, - i->second); + cksum = T(BucketChecksum(0)); + cksum.addDoc(i->first.second, i->second); maxcnt = std::max(maxcnt, cnt); cnt = 1u; } @@ -833,10 +836,18 @@ TEST("requireThatBasicBucketInfoWorks") maxcnt = std::max(maxcnt, cnt); BucketInfo bi = bucketDB->get(prevBucket); EXPECT_EQUAL(cnt, bi.getDocumentCount()); - EXPECT_EQUAL(cksum, bi.getChecksum()); + EXPECT_EQUAL(cksum.getChecksum(), bi.getChecksum()); LOG(info, "Largest bucket: %u elements", maxcnt); } +TEST("requireThatBasicBucketInfoWorks") +{ + BucketState::setChecksumType(BucketState::ChecksumType::LEGACY); + requireThatBasicBucketInfoWorks<LegacyChecksumAggregator>(); + BucketState::setChecksumType(BucketState::ChecksumType::XXHASH64); + requireThatBasicBucketInfoWorks<XXHChecksumAggregator>(); +} + TEST("requireThatWeCanRetrieveListOfLidsFromBucketId") { typedef std::vector<uint32_t> LidVector; diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 6cf99348911..a88239a41f6 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -452,3 +452,6 @@ maintenancejobs.resourcelimitfactor double default = 1.05 ## Currently used by 'lid_space_compaction' job. maintenancejobs.maxoutstandingmoveops int default=10 +## Controls the type of bucket checksum used. Do not change unless +## in depth understanding is present. +bucketdb.checksumtype enum {LEGACY, XXHASH64} default = LEGACY restart diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt index 6cb22e6d1e8..375515938b3 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt @@ -8,6 +8,8 @@ vespa_add_library(searchcore_bucketdb STATIC bucketdbhandler.cpp bucketsessionbase.cpp bucketstate.cpp + checksumaggregator.cpp + checksumaggregators.cpp joinbucketssession.cpp splitbucketsession.cpp DEPENDS diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_db_owner.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_db_owner.h index 1ff8e16a749..e6f04977d33 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_db_owner.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_db_owner.h @@ -44,6 +44,7 @@ public: Guard takeGuard() { return Guard(&_bucketDB, _mutex); } + const BucketDB & getBucketDB() const { return _bucketDB; } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp index 06aca3381f7..8d93d6db6b3 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp @@ -22,10 +22,15 @@ BucketDB::~BucketDB() clear(); } +void +BucketDB::add(const BucketId &bucketId, const BucketState & state) { + _map[bucketId] += state; +} + bucketdb::BucketState * BucketDB::getBucketStatePtr(const BucketId &bucket) { - MapIterator it(_map.find(bucket)); + auto it(_map.find(bucket)); if (it != _map.end()) { return &it->second; } @@ -42,9 +47,7 @@ BucketDB::unloadBucket(const BucketId &bucket, const BucketState &delta) const bucketdb::BucketState & BucketDB::add(const GlobalId &gid, - const BucketId &bucketId, - const Timestamp ×tamp, - uint32_t docSize, + const BucketId &bucketId, const Timestamp ×tamp, uint32_t docSize, SubDbType subDbType) { BucketState &state = _map[bucketId]; @@ -54,9 +57,7 @@ BucketDB::add(const GlobalId &gid, void BucketDB::remove(const GlobalId &gid, - const BucketId &bucketId, - const Timestamp ×tamp, - uint32_t docSize, + const BucketId &bucketId, const Timestamp ×tamp, uint32_t docSize, SubDbType subDbType) { BucketState &state = _map[bucketId]; @@ -65,17 +66,13 @@ BucketDB::remove(const GlobalId &gid, void BucketDB::modify(const GlobalId &gid, - const BucketId &oldBucketId, - const Timestamp &oldTimestamp, - uint32_t oldDocSize, - const BucketId &newBucketId, - const Timestamp &newTimestamp, - uint32_t newDocSize, + const BucketId &oldBucketId, const Timestamp &oldTimestamp, uint32_t oldDocSize, + const BucketId &newBucketId, const Timestamp &newTimestamp, uint32_t newDocSize, SubDbType subDbType) { if (oldBucketId == newBucketId) { BucketState &state = _map[oldBucketId]; - state.modify(oldTimestamp, oldDocSize, newTimestamp, newDocSize, subDbType); + state.modify(gid, oldTimestamp, oldDocSize, newTimestamp, newDocSize, subDbType); } else { remove(gid, oldBucketId, oldTimestamp, oldDocSize, subDbType); add(gid, newBucketId, newTimestamp, newDocSize, subDbType); @@ -86,7 +83,7 @@ BucketDB::modify(const GlobalId &gid, bucketdb::BucketState BucketDB::get(const BucketId &bucketId) const { - Map::const_iterator itr = _map.find(bucketId); + auto itr = _map.find(bucketId); if (itr != _map.end()) { return itr->second; } @@ -125,11 +122,7 @@ BucketDB::cachedGet(const BucketId &bucketId) const bool BucketDB::hasBucket(const BucketId &bucketId) const { - Map::const_iterator itr = _map.find(bucketId); - if (itr != _map.end()) { - return true; - } - return false; + return (_map.find(bucketId) != _map.end()); } @@ -237,12 +230,10 @@ BucketDB::populateActiveBuckets(const BucketId::List &buckets, for (; si != se; ++si) { toAdd.push_back(*si); } - BIV::const_iterator ai(toAdd.begin()); - BIV::const_iterator ae(toAdd.end()); BucketState activeState; activeState.setActive(true); - for (; ai != ae; ++ai) { - InsertResult ins(_map.insert(std::make_pair(*ai, activeState))); + for (const BucketId & bucketId : toAdd) { + InsertResult ins(_map.insert(std::make_pair(bucketId, activeState))); assert(ins.second); } } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h index 9e2c7a17b8f..e64cc53d4a2 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h @@ -33,33 +33,19 @@ public: BucketDB(); virtual ~BucketDB(); - const BucketState & - add(const GlobalId &gid, - const BucketId &bucketId, - const Timestamp ×tamp, - uint32_t docSize, - SubDbType subDbType); + const BucketState & add(const GlobalId &gid, + const BucketId &bucketId, const Timestamp ×tamp, uint32_t docSize, + SubDbType subDbType); - void add(const BucketId &bucketId, const BucketState & state) { - _map[bucketId] += state; - } + void add(const BucketId &bucketId, const BucketState & state); + void remove(const GlobalId &gid, + const BucketId &bucketId, const Timestamp ×tamp, uint32_t docSize, + SubDbType subDbType); - void - remove(const GlobalId &gid, - const BucketId &bucketId, - const Timestamp ×tamp, - uint32_t docSize, - SubDbType subDbType); - - void - modify(const GlobalId &gid, - const BucketId &oldBucketId, - const Timestamp &oldTimestamp, - uint32_t oldDocSize, - const BucketId &newBucketId, - const Timestamp &newTimestamp, - uint32_t newDocSize, - SubDbType subDbType); + void modify(const GlobalId &gid, + const BucketId &oldBucketId, const Timestamp &oldTimestamp, uint32_t oldDocSize, + const BucketId &newBucketId, const Timestamp &newTimestamp, uint32_t newDocSize, + SubDbType subDbType); BucketState get(const BucketId &bucketId) const; void cacheBucket(const BucketId &bucketId); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp index 4219835778f..cea30680faf 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp @@ -1,42 +1,42 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucketstate.h" +#include "checksumaggregators.h" #include <cassert> +#include <xxhash.h> namespace proton::bucketdb { namespace { uint32_t -gidChecksum(const document::GlobalId &gid) -{ - union { - const unsigned char *_c; - const uint32_t *_i; - } u; - u._c = gid.get(); - const uint32_t *i = u._i; - return i[0] + i[1] + i[2]; +toIdx(SubDbType subDbType) { + return static_cast<uint32_t>(subDbType); } - -uint32_t -timestampChecksum(const storage::spi::Timestamp ×tamp) -{ - return (timestamp >> 32) + timestamp; } -inline uint32_t toIdx(SubDbType subDbType) -{ - return static_cast<uint32_t>(subDbType); +BucketState::ChecksumType BucketState::_checksumType = BucketState::ChecksumType::LEGACY; + +std::unique_ptr<ChecksumAggregator> +BucketState::createChecksum(BucketChecksum seed) { + return ChecksumAggregator::create(_checksumType, seed); } +void +BucketState::setChecksumType(ChecksumType type) { + _checksumType = type; } +BucketState::~BucketState() = default; +BucketState::BucketState(const BucketState & rhs) = default; +BucketState::BucketState(BucketState && rhs) noexcept = default; +BucketState & BucketState::operator=(BucketState && rhs) noexcept = default; + BucketState::BucketState() : _docCount(), - _checksum(0), _docSizes(), + _checksum(createChecksum(BucketChecksum(0))), _active(false) { for (uint32_t i = 0; i < COUNTS; ++i) { @@ -45,26 +45,23 @@ BucketState::BucketState() } } -uint32_t -BucketState::calcChecksum(const GlobalId &gid, const Timestamp ×tamp) -{ - return gidChecksum(gid) + timestampChecksum(timestamp); +BucketState::BucketChecksum +BucketState::addChecksum(BucketChecksum a, BucketChecksum b) { + return createChecksum(a)->addChecksum(*createChecksum(b)).getChecksum(); } - void BucketState::add(const GlobalId &gid, const Timestamp ×tamp, uint32_t docSize, SubDbType subDbType) { assert(subDbType < SubDbType::COUNT); if (subDbType != SubDbType::REMOVED) { - _checksum += calcChecksum(gid, timestamp); + _checksum->addDoc(gid, timestamp); } uint32_t subDbTypeIdx = toIdx(subDbType); ++_docCount[subDbTypeIdx]; _docSizes[subDbTypeIdx] += docSize; } - void BucketState::remove(const GlobalId &gid, const Timestamp ×tamp, uint32_t docSize, SubDbType subDbType) { @@ -73,15 +70,15 @@ BucketState::remove(const GlobalId &gid, const Timestamp ×tamp, uint32_t do assert(_docCount[subDbTypeIdx] > 0); assert(_docSizes[subDbTypeIdx] >= docSize); if (subDbType != SubDbType::REMOVED) { - _checksum -= calcChecksum(gid, timestamp); + _checksum->removeDoc(gid, timestamp); } --_docCount[subDbTypeIdx]; _docSizes[subDbTypeIdx] -= docSize; } - void -BucketState::modify(const Timestamp &oldTimestamp, uint32_t oldDocSize, +BucketState::modify(const GlobalId &gid, + const Timestamp &oldTimestamp, uint32_t oldDocSize, const Timestamp &newTimestamp, uint32_t newDocSize, SubDbType subDbType) { @@ -90,27 +87,25 @@ BucketState::modify(const Timestamp &oldTimestamp, uint32_t oldDocSize, assert(_docCount[subDbTypeIdx] > 0); assert(_docSizes[subDbTypeIdx] >= oldDocSize); if (subDbType != SubDbType::REMOVED) { - _checksum = _checksum - timestampChecksum(oldTimestamp) + - timestampChecksum(newTimestamp); + _checksum->removeDoc(gid, oldTimestamp); + _checksum->addDoc(gid, newTimestamp); } _docSizes[subDbTypeIdx] = _docSizes[subDbTypeIdx] + newDocSize - oldDocSize; } - bool BucketState::empty() const { if (getReadyCount() != 0 || getRemovedCount() != 0 || getNotReadyCount() != 0) return false; - assert(_checksum == 0); + assert(_checksum->empty()); for (uint32_t i = 0; i < COUNTS; ++i) { assert(_docSizes[i] == 0); } return true; } - BucketState & BucketState::operator+=(const BucketState &rhs) { @@ -120,11 +115,10 @@ BucketState::operator+=(const BucketState &rhs) for (uint32_t i = 0; i < COUNTS; ++i) { _docSizes[i] += rhs._docSizes[i]; } - _checksum += rhs._checksum; + _checksum->addChecksum(*rhs._checksum); return *this; } - BucketState & BucketState::operator-=(const BucketState &rhs) { @@ -140,7 +134,7 @@ BucketState::operator-=(const BucketState &rhs) for (uint32_t i = 0; i < COUNTS; ++i) { _docSizes[i] -= rhs._docSizes[i]; } - _checksum -= rhs._checksum; + _checksum->removeChecksum(*rhs._checksum); return *this; } @@ -166,11 +160,9 @@ BucketState::operator storage::spi::BucketInfo() const using BucketInfo = storage::spi::BucketInfo; - return BucketInfo(storage::spi::BucketChecksum(_checksum), - documentCount, - docSizes, - entryCount, - entrySizes, + return BucketInfo(getChecksum(), + documentCount, docSizes, + entryCount, entrySizes, notReady > 0 ? BucketInfo::NOT_READY : BucketInfo::READY, _active ? BucketInfo::ACTIVE : BucketInfo::NOT_ACTIVE); } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h index 1fd5289da0b..8c390b288ae 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h @@ -2,46 +2,58 @@ #pragma once +#include "checksumaggregator.h" #include <vespa/searchcore/proton/common/subdbtype.h> -#include <vespa/document/base/globalid.h> -#include <vespa/persistence/spi/bucketinfo.h> +#include <vespa/vespalib/util/memory.h> namespace proton::bucketdb { +class ChecksumAggregator; + /** * Class BucketState represent the known state of a bucket in raw form. */ class BucketState { public: - typedef document::GlobalId GlobalId; - typedef storage::spi::Timestamp Timestamp; + using ChecksumType = ChecksumAggregator::ChecksumType; + using GlobalId = document::GlobalId; + using Timestamp = storage::spi::Timestamp; + using BucketChecksum = storage::spi::BucketChecksum; private: static constexpr uint32_t READY = static_cast<uint32_t>(SubDbType::READY); - static constexpr uint32_t REMOVED = - static_cast<uint32_t>(SubDbType::REMOVED); - static constexpr uint32_t NOTREADY = - static_cast<uint32_t>(SubDbType::NOTREADY); + static constexpr uint32_t REMOVED = static_cast<uint32_t>(SubDbType::REMOVED); + static constexpr uint32_t NOTREADY = static_cast<uint32_t>(SubDbType::NOTREADY); static constexpr uint32_t COUNTS = static_cast<uint32_t>(SubDbType::COUNT); uint32_t _docCount[COUNTS]; - uint32_t _checksum; size_t _docSizes[COUNTS]; - bool _active; + vespalib::CloneablePtr<ChecksumAggregator> _checksum; + bool _active; + + static ChecksumType _checksumType; + static std::unique_ptr<ChecksumAggregator> createChecksum(BucketChecksum seed); public: + static void setChecksumType(ChecksumType checksum); BucketState(); + BucketState(const BucketState & rhs); + BucketState(BucketState && rhs) noexcept; + BucketState & operator=(BucketState && rhs) noexcept; + ~BucketState(); - static uint32_t calcChecksum(const GlobalId &gid, const Timestamp ×tamp); + static BucketChecksum addChecksum(BucketChecksum a, BucketChecksum b); void add(const GlobalId &gid, const Timestamp ×tamp, uint32_t docSize, SubDbType subDbType); void remove(const GlobalId &gid, const Timestamp ×tamp, uint32_t docSize, SubDbType subDbType); - void modify(const Timestamp &oldTimestamp, uint32_t oldDocSize, + void modify(const GlobalId &gid, + const Timestamp &oldTimestamp, uint32_t oldDocSize, const Timestamp &newTimestamp, uint32_t newDocSize, SubDbType subDbType); - BucketState &setActive(bool active) { + BucketState & + setActive(bool active) { _active = active; return *this; } @@ -55,7 +67,7 @@ public: size_t getNotReadyDocSizes() const { return _docSizes[NOTREADY]; } uint32_t getDocumentCount() const { return getReadyCount() + getNotReadyCount(); } uint32_t getEntryCount() const { return getDocumentCount() + getRemovedCount(); } - storage::spi::BucketChecksum getChecksum() const { return storage::spi::BucketChecksum(_checksum); } + BucketChecksum getChecksum() const { return _checksum->getChecksum(); } bool empty() const; BucketState &operator+=(const BucketState &rhs); BucketState &operator-=(const BucketState &rhs); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp new file mode 100644 index 00000000000..738a9a81655 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp @@ -0,0 +1,16 @@ +#include "checksumaggregators.h" + +namespace proton::bucketdb { + +std::unique_ptr<ChecksumAggregator> +ChecksumAggregator::create(ChecksumType type, BucketChecksum seed) { + switch (type) { + case ChecksumType::LEGACY: + return std::make_unique<LegacyChecksumAggregator>(seed); + case ChecksumType::XXHASH64: + return std::make_unique<XXHChecksumAggregator>(seed); + } + return std::unique_ptr<ChecksumAggregator>(); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h new file mode 100644 index 00000000000..a1dba50304f --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h @@ -0,0 +1,27 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/document/base/globalid.h> +#include <vespa/persistence/spi/bucketinfo.h> + +namespace proton::bucketdb { + +class ChecksumAggregator { +public: + enum class ChecksumType {LEGACY, XXHASH64}; + using GlobalId = document::GlobalId; + using Timestamp = storage::spi::Timestamp; + using BucketChecksum = storage::spi::BucketChecksum; + virtual ~ChecksumAggregator() = default; + virtual ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp ×tamp) = 0; + virtual ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp ×tamp) = 0; + virtual ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) = 0; + virtual ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) = 0; + virtual BucketChecksum getChecksum() const = 0; + virtual bool empty() const = 0;; + virtual ChecksumAggregator * clone() const = 0; + static std::unique_ptr<ChecksumAggregator> create(ChecksumType type, BucketChecksum seed); +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp new file mode 100644 index 00000000000..c4b57078fcb --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp @@ -0,0 +1,114 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "checksumaggregators.h" +#include <xxhash.h> + +namespace proton::bucketdb { + +using BucketChecksum = ChecksumAggregator::BucketChecksum; +using Timestamp = ChecksumAggregator::Timestamp; +using GlobalId = ChecksumAggregator::GlobalId; + +namespace { + +uint32_t +gidChecksum(const document::GlobalId &gid) +{ + union { + const unsigned char *_c; + const uint32_t *_i; + } u; + u._c = gid.get(); + const uint32_t *i = u._i; + return i[0] + i[1] + i[2]; +} + + +uint32_t +timestampChecksum(const Timestamp ×tamp) +{ + return (timestamp >> 32) + timestamp; +} + + +uint32_t +calcChecksum(const GlobalId &gid, const Timestamp ×tamp) +{ + return gidChecksum(gid) + timestampChecksum(timestamp); +} + +} + +LegacyChecksumAggregator * +LegacyChecksumAggregator::clone() const { + return new LegacyChecksumAggregator(*this); +} +LegacyChecksumAggregator & +LegacyChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp ×tamp) { + _checksum += calcChecksum(gid, timestamp); + return *this; +} +LegacyChecksumAggregator & +LegacyChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp ×tamp) { + _checksum -= calcChecksum(gid, timestamp); + return *this; +} +LegacyChecksumAggregator & +LegacyChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) { + _checksum += dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum; + return *this; +} +LegacyChecksumAggregator & +LegacyChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) { + _checksum -= dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum; + return *this; +} +BucketChecksum +LegacyChecksumAggregator::getChecksum() const { + return BucketChecksum(_checksum); +} +bool +LegacyChecksumAggregator::empty() const { return _checksum == 0; } + + + +XXHChecksumAggregator * +XXHChecksumAggregator::clone() const { + return new XXHChecksumAggregator(*this); +} +XXHChecksumAggregator & +XXHChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp ×tamp) { + _checksum ^= compute(gid, timestamp); + return *this; +} +XXHChecksumAggregator & +XXHChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp ×tamp) { + _checksum ^= compute(gid, timestamp); + return *this; +} +XXHChecksumAggregator & +XXHChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) { + _checksum ^= dynamic_cast<const XXHChecksumAggregator &>(rhs)._checksum; + return *this; +} +XXHChecksumAggregator & +XXHChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) { + _checksum ^= dynamic_cast<const XXHChecksumAggregator &>(rhs)._checksum; + return *this; +} +BucketChecksum +XXHChecksumAggregator::getChecksum() const { + return BucketChecksum((_checksum >> 32) ^ (_checksum & 0xffffffffL)); +} +bool +XXHChecksumAggregator::empty() const { return _checksum == 0; } + +uint64_t +XXHChecksumAggregator::compute(const GlobalId &gid, const Timestamp ×tamp) { + char buffer[20]; + memcpy(&buffer[0], gid.get(), 12); + reinterpret_cast<uint64_t *>(&buffer[12])[0] = timestamp.getValue(); + return XXH64(buffer, sizeof(buffer), 0); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h new file mode 100644 index 00000000000..c4dc33141bd --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h @@ -0,0 +1,36 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "checksumaggregator.h" + +namespace proton::bucketdb { + +class LegacyChecksumAggregator : public ChecksumAggregator { +public: + LegacyChecksumAggregator(BucketChecksum seed) : _checksum(seed) { } + LegacyChecksumAggregator * clone() const override; + LegacyChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp ×tamp) override; + LegacyChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp ×tamp) override; + LegacyChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override; + LegacyChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override; + BucketChecksum getChecksum() const override; + bool empty() const override; +private: + uint32_t _checksum; +}; + +class XXHChecksumAggregator : public ChecksumAggregator { +public: + XXHChecksumAggregator(BucketChecksum seed) : _checksum(seed) { } + XXHChecksumAggregator * clone() const override; + XXHChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp ×tamp) override; + XXHChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp ×tamp) override; + XXHChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override; + XXHChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override; + BucketChecksum getChecksum() const override; + bool empty() const override; +private: + static uint64_t compute(const GlobalId &gid, const Timestamp ×tamp); + uint64_t _checksum; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp index f4c88402409..f8f707dbfc3 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp @@ -38,20 +38,16 @@ JoinBucketsSession::setup() bool source2Active = extractInfo(_source2, source2State); _wantTargetActive = source1Active || source2Active; - _adjustSource1ActiveLids = calcFixupNeed(source1State, _wantTargetActive, - false); - _adjustSource2ActiveLids = calcFixupNeed(source2State, _wantTargetActive, - false); + _adjustSource1ActiveLids = calcFixupNeed(source1State, _wantTargetActive, false); + _adjustSource2ActiveLids = calcFixupNeed(source2State, _wantTargetActive, false); BucketState *targetState = nullptr; (void) extractInfo(_target, targetState); - _adjustTargetActiveLids = calcFixupNeed(targetState, _wantTargetActive, - true); + _adjustTargetActiveLids = calcFixupNeed(targetState, _wantTargetActive, true); } bool -JoinBucketsSession::mustFixupTargetActiveLids(bool movedSource1Docs, - bool movedSource2Docs) const +JoinBucketsSession::mustFixupTargetActiveLids(bool movedSource1Docs, bool movedSource2Docs) const { return _adjustTargetActiveLids || (_adjustSource1ActiveLids && movedSource1Docs) || @@ -68,8 +64,7 @@ JoinBucketsSession::applyDeltas(const BucketDeltaPair &deltas) bool -JoinBucketsSession::applyDelta(const BucketState &delta, BucketId &srcBucket, - BucketState *dst) +JoinBucketsSession::applyDelta(const BucketState &delta, BucketId &srcBucket, BucketState *dst) { if (!srcBucket.valid()) { assert(delta.empty()); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h index 1029379dfb1..9ef89992a9c 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h @@ -41,43 +41,14 @@ public: const BucketId &source2, const BucketId &target); - void - applyDeltas(const BucketDeltaPair &deltas); - - bool - getWantTargetActive() const - { - return _wantTargetActive; - } - - bool - mustFixupTargetActiveLids(bool movedSource1Docs, - bool movedSource2Docs) const; - - - void - setup(); - - void - finish(); - - const BucketId & - getSource1() const - { - return _source1; - } - - const BucketId & - getSource2() const - { - return _source2; - } - - const BucketId & - getTarget() const - { - return _target; - } + void applyDeltas(const BucketDeltaPair &deltas); + bool getWantTargetActive() const { return _wantTargetActive; } + bool mustFixupTargetActiveLids(bool movedSource1Docs, bool movedSource2Docs) const; + void setup(); + void finish(); + const BucketId & getSource1() const { return _source1; } + const BucketId & getSource2() const { return _source2; } + const BucketId & getTarget() const { return _target;} }; } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp index 0e085507a95..d03b630b822 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp @@ -41,13 +41,11 @@ SplitBucketSession::setup() if (_target1.valid()) { BucketState *target1State = _bucketDB->getBucketStatePtr(_target1); - _adjustTarget1ActiveLids = calcFixupNeed(target1State, _sourceActive, - true); + _adjustTarget1ActiveLids = calcFixupNeed(target1State, _sourceActive, true); } if (_target2.valid()) { BucketState *target2State = _bucketDB->getBucketStatePtr(_target2); - _adjustTarget2ActiveLids = calcFixupNeed(target2State, _sourceActive, - true); + _adjustTarget2ActiveLids = calcFixupNeed(target2State, _sourceActive, true); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 8aef8d51c5f..3926a428f99 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -70,6 +70,19 @@ convert(InternalProtonType::Packetcompresstype type) } void +setBucketCheckSumType(const ProtonConfig & proton) +{ + switch (proton.bucketdb.checksumtype) { + case InternalProtonType::Bucketdb::LEGACY: + bucketdb::BucketState::setChecksumType(bucketdb::BucketState::ChecksumType::LEGACY); + break; + case InternalProtonType::Bucketdb::XXHASH64: + bucketdb::BucketState::setChecksumType(bucketdb::BucketState::ChecksumType::XXHASH64); + break; + } +} + +void setFS4Compression(const ProtonConfig & proton) { FS4PersistentPacketStreamer & fs4(FS4PersistentPacketStreamer::Instance); @@ -241,6 +254,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) const ProtonConfig &protonConfig = configSnapshot->getProtonConfig(); const HwInfo & hwInfo = configSnapshot->getHwInfo(); + setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir, diskMemUsageSamplerConfig(protonConfig, hwInfo)); |