diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-01 18:27:20 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-01 18:27:20 +0300 |
commit | f918e20fb06d98e552e62a18a50b110ff4530c7a (patch) | |
tree | 9015ba2b3df04c7aa72c60e4c3b53ea929aab694 /searchcore | |
parent | ca1090424ac25e0ffb6abd7e19edbf1e32a00c33 (diff) | |
parent | db4835b4dc6688e23b77e673dbd95afb861e9f5e (diff) |
Merge pull request #8922 from vespa-engine/balder/add-config-control-for-bucket-checksum
Balder/add config control for bucket checksum.
Diffstat (limited to 'searchcore')
16 files changed, 416 insertions, 181 deletions
diff --git a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp index a5f8b1ff6c9..998b1626aeb 100644 --- a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp +++ b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp @@ -1,12 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/log/log.h> -LOG_SETUP("bucketdb_test"); - #include <vespa/searchcore/proton/bucketdb/bucket_db_explorer.h> #include <vespa/searchcore/proton/bucketdb/bucketdb.h> #include <vespa/vespalib/data/slime/slime.h> +#include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/log/log.h> +LOG_SETUP("bucketdb_test"); + using namespace document; using namespace proton; using namespace proton::bucketdb; @@ -128,8 +129,8 @@ TEST_F("require that bucket checksum is a combination of sub db types", Fixture) EXPECT_EQUAL(zero, f.get().getChecksum()); EXPECT_EQUAL(ready, f.add(TIME_1, SDT::READY).getChecksum()); - EXPECT_EQUAL(ready + notReady, f.add(TIME_2, SDT::NOTREADY).getChecksum()); - EXPECT_EQUAL(ready + notReady, f.add(TIME_3, SDT::REMOVED).getChecksum()); + EXPECT_EQUAL(BucketState::addChecksum(ready, notReady), f.add(TIME_2, SDT::NOTREADY).getChecksum()); + EXPECT_EQUAL(BucketState::addChecksum(ready, notReady), f.add(TIME_3, SDT::REMOVED).getChecksum()); EXPECT_EQUAL(notReady, f.remove(TIME_1, SDT::READY).getChecksum()); EXPECT_EQUAL(zero, f.remove(TIME_2, SDT::NOTREADY).getChecksum()); EXPECT_EQUAL(zero, f.remove(TIME_3, SDT::REMOVED).getChecksum()); @@ -179,17 +180,20 @@ TEST_F("require that bucket checksum ignores document sizes", Fixture) TEST("require that bucket db can be explored") { BucketDBOwner db; - db.takeGuard()->add(GID_1, BUCKET_1, TIME_1, DOCSIZE_1, SDT::READY); + const BucketState & expectedState = db.takeGuard()->add(GID_1, BUCKET_1, TIME_1, 
DOCSIZE_1, SDT::READY); { BucketDBExplorer explorer(db.takeGuard()); Slime expectSlime; - vespalib::string expectJson = + vespalib::asciistream expectJson; + expectJson << "{" " numBuckets: 1," " buckets: [" " {" " id: '0x2000000000000031'," - " checksum: '0x93939394'," + " checksum: '0x" + << vespalib::hex << expectedState.getChecksum() << + "'," " readyCount: 1," " notReadyCount: 0," " removedCount: 0," @@ -200,7 +204,7 @@ TEST("require that bucket db can be explored") " }" " ]" "}"; - EXPECT_TRUE(JsonFormat::decode(expectJson, expectSlime) > 0); + EXPECT_TRUE(JsonFormat::decode(expectJson.str(), expectSlime) > 0); Slime actualSlime; SlimeInserter inserter(actualSlime); explorer.get_state(inserter, true); @@ -212,4 +216,63 @@ TEST("require that bucket db can be explored") db.takeGuard()->remove(GID_1, BUCKET_1, TIME_1, DOCSIZE_1, SDT::READY); } +BucketChecksum +verifyChecksumCompliance(ChecksumAggregator::ChecksumType type) { + GlobalId gid1("aaaaaaaaaaaa"); + GlobalId gid2("bbbbbbbbbbbb"); + Timestamp t1(0); + Timestamp t2(1); + auto ckaggr = ChecksumAggregator::create(type, BucketChecksum(0)); + + EXPECT_EQUAL(0u, ckaggr->getChecksum()); + ckaggr->addDoc(gid1, t1); + BucketChecksum afterAdd = ckaggr->getChecksum(); + EXPECT_NOT_EQUAL(0u, afterAdd); // add Changes checksum + ckaggr->removeDoc(gid1, t1); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + ckaggr->addDoc(gid1, t2); + EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // timestamp changes checksum + ckaggr->removeDoc(gid1, t2); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + ckaggr->addDoc(gid2, t1); + EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // gid changes checksum + ckaggr->removeDoc(gid2, t1); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + + { + // Verify order does not matter, only current content. 
A,B == B,A + ckaggr->addDoc(gid1, t1); + BucketChecksum after1AddOfGid1 = ckaggr->getChecksum(); + ckaggr->addDoc(gid2, t2); + BucketChecksum after2Add1 = ckaggr->getChecksum(); + ckaggr->removeDoc(gid2, t2); + EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); + ckaggr->removeDoc(gid1, t1); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); + + ckaggr->addDoc(gid2, t2); + EXPECT_NOT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); + ckaggr->addDoc(gid1, t1); + EXPECT_EQUAL(after2Add1, ckaggr->getChecksum()); + ckaggr->removeDoc(gid2, t2); + EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); + ckaggr->removeDoc(gid1, t1); + EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + } + + ckaggr->addDoc(gid1, t1); // Add something so we can verify it does not change between releases. + + return ckaggr->getChecksum(); +} + +TEST("test that legacy checksum complies") { + BucketChecksum cksum = verifyChecksumCompliance(ChecksumAggregator::ChecksumType::LEGACY); + EXPECT_EQUAL(0x24242423u, cksum); +} + +TEST("test that xxhash64 checksum complies") { + BucketChecksum cksum = verifyChecksumCompliance(ChecksumAggregator::ChecksumType::XXHASH64); + EXPECT_EQUAL(0xd26fca9au, cksum); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp index d436c63ae2e..42d038e6b88 100644 --- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp @@ -4,6 +4,7 @@ #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> #include <vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h> #include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h> +#include <vespa/searchcore/proton/bucketdb/checksumaggregators.h> #include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> #include 
<vespa/searchlib/attribute/attributefilesavetarget.h> #include <vespa/searchlib/fef/matchdatalayout.h> @@ -22,6 +23,8 @@ LOG_SETUP("documentmetastore_test"); using namespace document; using proton::bucketdb::BucketState; +using proton::bucketdb::LegacyChecksumAggregator; +using proton::bucketdb::XXH64ChecksumAggregator; using proton::bucketdb::IBucketCreateListener; using search::AttributeFileSaveTarget; using search::AttributeGuard; @@ -771,7 +774,9 @@ TEST("requireThatWeCanSortGids") } } -TEST("requireThatBasicBucketInfoWorks") +template <typename ChecksumType> +void +requireThatBasicBucketInfoWorks() { DocumentMetaStore dms(createBucketDB()); typedef std::pair<BucketId, GlobalId> Elem; @@ -808,24 +813,22 @@ TEST("requireThatBasicBucketInfoWorks") m.erase(std::make_pair(bucketId, gid)); } assert(!m.empty()); - BucketChecksum cksum; + ChecksumType cksum(BucketChecksum(0)); BucketId prevBucket = m.begin()->first.first; uint32_t cnt = 0u; uint32_t maxcnt = 0u; BucketDBOwner::Guard bucketDB = dms.getBucketDB().takeGuard(); for (Map::const_iterator i = m.begin(), ie = m.end(); i != ie; ++i) { if (i->first.first == prevBucket) { - cksum = BucketChecksum(cksum + - BucketState::calcChecksum(i->first.second, - i->second)); + cksum.addDoc(i->first.second, i->second); ++cnt; } else { BucketInfo bi = bucketDB->get(prevBucket); EXPECT_EQUAL(cnt, bi.getDocumentCount()); - EXPECT_EQUAL(cksum, bi.getChecksum()); + EXPECT_EQUAL(cksum.getChecksum(), bi.getChecksum()); prevBucket = i->first.first; - cksum = BucketState::calcChecksum(i->first.second, - i->second); + cksum = T(BucketChecksum(0)); + cksum.addDoc(i->first.second, i->second); maxcnt = std::max(maxcnt, cnt); cnt = 1u; } @@ -833,10 +836,18 @@ TEST("requireThatBasicBucketInfoWorks") maxcnt = std::max(maxcnt, cnt); BucketInfo bi = bucketDB->get(prevBucket); EXPECT_EQUAL(cnt, bi.getDocumentCount()); - EXPECT_EQUAL(cksum, bi.getChecksum()); + EXPECT_EQUAL(cksum.getChecksum(), bi.getChecksum()); LOG(info, "Largest bucket: %u 
elements", maxcnt); } +TEST("requireThatBasicBucketInfoWorks") +{ + BucketState::setChecksumType(BucketState::ChecksumType::LEGACY); + requireThatBasicBucketInfoWorks<LegacyChecksumAggregator>(); + BucketState::setChecksumType(BucketState::ChecksumType::XXHASH64); + requireThatBasicBucketInfoWorks<XXH64ChecksumAggregator>(); +} + TEST("requireThatWeCanRetrieveListOfLidsFromBucketId") { typedef std::vector<uint32_t> LidVector; diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 6cf99348911..a88239a41f6 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -452,3 +452,6 @@ maintenancejobs.resourcelimitfactor double default = 1.05 ## Currently used by 'lid_space_compaction' job. maintenancejobs.maxoutstandingmoveops int default=10 +## Controls the type of bucket checksum used. Do not change unless +## in depth understanding is present. +bucketdb.checksumtype enum {LEGACY, XXHASH64} default = LEGACY restart diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt index 6cb22e6d1e8..375515938b3 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt @@ -8,6 +8,8 @@ vespa_add_library(searchcore_bucketdb STATIC bucketdbhandler.cpp bucketsessionbase.cpp bucketstate.cpp + checksumaggregator.cpp + checksumaggregators.cpp joinbucketssession.cpp splitbucketsession.cpp DEPENDS diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp index 06aca3381f7..3ced0f509b5 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp @@ -22,10 +22,15 @@ BucketDB::~BucketDB() clear(); } +void +BucketDB::add(const BucketId 
&bucketId, const BucketState & state) { + _map[bucketId] += state; +} + bucketdb::BucketState * BucketDB::getBucketStatePtr(const BucketId &bucket) { - MapIterator it(_map.find(bucket)); + auto it(_map.find(bucket)); if (it != _map.end()) { return &it->second; } @@ -42,9 +47,7 @@ BucketDB::unloadBucket(const BucketId &bucket, const BucketState &delta) const bucketdb::BucketState & BucketDB::add(const GlobalId &gid, - const BucketId &bucketId, - const Timestamp &timestamp, - uint32_t docSize, + const BucketId &bucketId, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType) { BucketState &state = _map[bucketId]; @@ -54,9 +57,7 @@ BucketDB::add(const GlobalId &gid, void BucketDB::remove(const GlobalId &gid, - const BucketId &bucketId, - const Timestamp &timestamp, - uint32_t docSize, + const BucketId &bucketId, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType) { BucketState &state = _map[bucketId]; @@ -65,17 +66,13 @@ BucketDB::remove(const GlobalId &gid, void BucketDB::modify(const GlobalId &gid, - const BucketId &oldBucketId, - const Timestamp &oldTimestamp, - uint32_t oldDocSize, - const BucketId &newBucketId, - const Timestamp &newTimestamp, - uint32_t newDocSize, + const BucketId &oldBucketId, const Timestamp &oldTimestamp, uint32_t oldDocSize, + const BucketId &newBucketId, const Timestamp &newTimestamp, uint32_t newDocSize, SubDbType subDbType) { if (oldBucketId == newBucketId) { BucketState &state = _map[oldBucketId]; - state.modify(oldTimestamp, oldDocSize, newTimestamp, newDocSize, subDbType); + state.modify(gid, oldTimestamp, oldDocSize, newTimestamp, newDocSize, subDbType); } else { remove(gid, oldBucketId, oldTimestamp, oldDocSize, subDbType); add(gid, newBucketId, newTimestamp, newDocSize, subDbType); @@ -86,7 +83,7 @@ BucketDB::modify(const GlobalId &gid, bucketdb::BucketState BucketDB::get(const BucketId &bucketId) const { - Map::const_iterator itr = _map.find(bucketId); + auto itr = _map.find(bucketId); if (itr != _map.end()) { return
itr->second; } @@ -125,22 +122,15 @@ BucketDB::cachedGet(const BucketId &bucketId) const bool BucketDB::hasBucket(const BucketId &bucketId) const { - Map::const_iterator itr = _map.find(bucketId); - if (itr != _map.end()) { - return true; - } - return false; + return (_map.find(bucketId) != _map.end()); } bool BucketDB::isActiveBucket(const BucketId &bucketId) const { - Map::const_iterator itr = _map.find(bucketId); - if (itr != _map.end()) { - return itr->second.isActive(); - } - return false; + auto itr = _map.find(bucketId); + return (itr != _map.end()) && itr->second.isActive(); } void @@ -194,7 +184,7 @@ BucketDB::createBucket(const BucketId &bucketId) void BucketDB::deleteEmptyBucket(const BucketId &bucketId) { - Map::iterator itr = _map.find(bucketId); + auto itr = _map.find(bucketId); if (itr == _map.end()) { return; } @@ -215,8 +205,7 @@ BucketDB::getActiveBuckets(BucketId::List &buckets) const } void -BucketDB::populateActiveBuckets(const BucketId::List &buckets, - BucketId::List &fixupBuckets) +BucketDB::populateActiveBuckets(const BucketId::List &buckets, BucketId::List &fixupBuckets) { typedef BucketId::List BIV; BIV sorted(buckets); @@ -237,12 +226,10 @@ BucketDB::populateActiveBuckets(const BucketId::List &buckets, for (; si != se; ++si) { toAdd.push_back(*si); } - BIV::const_iterator ai(toAdd.begin()); - BIV::const_iterator ae(toAdd.end()); BucketState activeState; activeState.setActive(true); - for (; ai != ae; ++ai) { - InsertResult ins(_map.insert(std::make_pair(*ai, activeState))); + for (const BucketId & bucketId : toAdd) { + InsertResult ins(_map.emplace(bucketId, activeState)); assert(ins.second); } } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h index 9e2c7a17b8f..e64cc53d4a2 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h @@ -33,33 +33,19 @@ public: BucketDB(); 
virtual ~BucketDB(); - const BucketState & - add(const GlobalId &gid, - const BucketId &bucketId, - const Timestamp &timestamp, - uint32_t docSize, - SubDbType subDbType); + const BucketState & add(const GlobalId &gid, + const BucketId &bucketId, const Timestamp &timestamp, uint32_t docSize, + SubDbType subDbType); - void add(const BucketId &bucketId, const BucketState & state) { - _map[bucketId] += state; - } + void add(const BucketId &bucketId, const BucketState & state); + void remove(const GlobalId &gid, + const BucketId &bucketId, const Timestamp &timestamp, uint32_t docSize, + SubDbType subDbType); - void - remove(const GlobalId &gid, - const BucketId &bucketId, - const Timestamp &timestamp, - uint32_t docSize, - SubDbType subDbType); - - void - modify(const GlobalId &gid, - const BucketId &oldBucketId, - const Timestamp &oldTimestamp, - uint32_t oldDocSize, - const BucketId &newBucketId, - const Timestamp &newTimestamp, - uint32_t newDocSize, - SubDbType subDbType); + void modify(const GlobalId &gid, + const BucketId &oldBucketId, const Timestamp &oldTimestamp, uint32_t oldDocSize, + const BucketId &newBucketId, const Timestamp &newTimestamp, uint32_t newDocSize, + SubDbType subDbType); BucketState get(const BucketId &bucketId) const; void cacheBucket(const BucketId &bucketId); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp index 4219835778f..cea30680faf 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp @@ -1,42 +1,42 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketstate.h" +#include "checksumaggregators.h" #include <cassert> +#include <xxhash.h> namespace proton::bucketdb { namespace { uint32_t -gidChecksum(const document::GlobalId &gid) -{ - union { - const unsigned char *_c; - const uint32_t *_i; - } u; - u._c = gid.get(); - const uint32_t *i = u._i; - return i[0] + i[1] + i[2]; +toIdx(SubDbType subDbType) { + return static_cast<uint32_t>(subDbType); } - -uint32_t -timestampChecksum(const storage::spi::Timestamp &timestamp) -{ - return (timestamp >> 32) + timestamp; } -inline uint32_t toIdx(SubDbType subDbType) -{ - return static_cast<uint32_t>(subDbType); +BucketState::ChecksumType BucketState::_checksumType = BucketState::ChecksumType::LEGACY; + +std::unique_ptr<ChecksumAggregator> +BucketState::createChecksum(BucketChecksum seed) { + return ChecksumAggregator::create(_checksumType, seed); } +void +BucketState::setChecksumType(ChecksumType type) { + _checksumType = type; } +BucketState::~BucketState() = default; +BucketState::BucketState(const BucketState & rhs) = default; +BucketState::BucketState(BucketState && rhs) noexcept = default; +BucketState & BucketState::operator=(BucketState && rhs) noexcept = default; + BucketState::BucketState() : _docCount(), - _checksum(0), _docSizes(), + _checksum(createChecksum(BucketChecksum(0))), _active(false) { for (uint32_t i = 0; i < COUNTS; ++i) { @@ -45,26 +45,23 @@ BucketState::BucketState() } } -uint32_t -BucketState::calcChecksum(const GlobalId &gid, const Timestamp &timestamp) -{ - return gidChecksum(gid) + timestampChecksum(timestamp); +BucketState::BucketChecksum +BucketState::addChecksum(BucketChecksum a, BucketChecksum b) { + return createChecksum(a)->addChecksum(*createChecksum(b)).getChecksum(); } - void BucketState::add(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType) { assert(subDbType < SubDbType::COUNT); if (subDbType != SubDbType::REMOVED) { - _checksum += calcChecksum(gid, timestamp); + _checksum->addDoc(gid, timestamp); }
uint32_t subDbTypeIdx = toIdx(subDbType); ++_docCount[subDbTypeIdx]; _docSizes[subDbTypeIdx] += docSize; } - void BucketState::remove(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType) { @@ -73,15 +70,15 @@ BucketState::remove(const GlobalId &gid, const Timestamp &timestamp, uint32_t do assert(_docCount[subDbTypeIdx] > 0); assert(_docSizes[subDbTypeIdx] >= docSize); if (subDbType != SubDbType::REMOVED) { - _checksum -= calcChecksum(gid, timestamp); + _checksum->removeDoc(gid, timestamp); } --_docCount[subDbTypeIdx]; _docSizes[subDbTypeIdx] -= docSize; } - void -BucketState::modify(const Timestamp &oldTimestamp, uint32_t oldDocSize, +BucketState::modify(const GlobalId &gid, + const Timestamp &oldTimestamp, uint32_t oldDocSize, const Timestamp &newTimestamp, uint32_t newDocSize, SubDbType subDbType) { @@ -90,27 +87,25 @@ BucketState::modify(const Timestamp &oldTimestamp, uint32_t oldDocSize, assert(_docCount[subDbTypeIdx] > 0); assert(_docSizes[subDbTypeIdx] >= oldDocSize); if (subDbType != SubDbType::REMOVED) { - _checksum = _checksum - timestampChecksum(oldTimestamp) + - timestampChecksum(newTimestamp); + _checksum->removeDoc(gid, oldTimestamp); + _checksum->addDoc(gid, newTimestamp); } _docSizes[subDbTypeIdx] = _docSizes[subDbTypeIdx] + newDocSize - oldDocSize; } - bool BucketState::empty() const { if (getReadyCount() != 0 || getRemovedCount() != 0 || getNotReadyCount() != 0) return false; - assert(_checksum == 0); + assert(_checksum->empty()); for (uint32_t i = 0; i < COUNTS; ++i) { assert(_docSizes[i] == 0); } return true; } - BucketState & BucketState::operator+=(const BucketState &rhs) { @@ -120,11 +115,10 @@ BucketState::operator+=(const BucketState &rhs) for (uint32_t i = 0; i < COUNTS; ++i) { _docSizes[i] += rhs._docSizes[i]; } - _checksum += rhs._checksum; + _checksum->addChecksum(*rhs._checksum); return *this; } - BucketState & BucketState::operator-=(const BucketState &rhs) { @@ -140,7 +134,7 @@ BucketState::operator-=(const
BucketState &rhs) for (uint32_t i = 0; i < COUNTS; ++i) { _docSizes[i] -= rhs._docSizes[i]; } - _checksum -= rhs._checksum; + _checksum->removeChecksum(*rhs._checksum); return *this; } @@ -166,11 +160,9 @@ BucketState::operator storage::spi::BucketInfo() const using BucketInfo = storage::spi::BucketInfo; - return BucketInfo(storage::spi::BucketChecksum(_checksum), - documentCount, - docSizes, - entryCount, - entrySizes, + return BucketInfo(getChecksum(), + documentCount, docSizes, + entryCount, entrySizes, notReady > 0 ? BucketInfo::NOT_READY : BucketInfo::READY, _active ? BucketInfo::ACTIVE : BucketInfo::NOT_ACTIVE); } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h index 1fd5289da0b..8c390b288ae 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h @@ -2,46 +2,58 @@ #pragma once +#include "checksumaggregator.h" #include <vespa/searchcore/proton/common/subdbtype.h> -#include <vespa/document/base/globalid.h> -#include <vespa/persistence/spi/bucketinfo.h> +#include <vespa/vespalib/util/memory.h> namespace proton::bucketdb { +class ChecksumAggregator; + /** * Class BucketState represent the known state of a bucket in raw form. 
*/ class BucketState { public: - typedef document::GlobalId GlobalId; - typedef storage::spi::Timestamp Timestamp; + using ChecksumType = ChecksumAggregator::ChecksumType; + using GlobalId = document::GlobalId; + using Timestamp = storage::spi::Timestamp; + using BucketChecksum = storage::spi::BucketChecksum; private: static constexpr uint32_t READY = static_cast<uint32_t>(SubDbType::READY); - static constexpr uint32_t REMOVED = - static_cast<uint32_t>(SubDbType::REMOVED); - static constexpr uint32_t NOTREADY = - static_cast<uint32_t>(SubDbType::NOTREADY); + static constexpr uint32_t REMOVED = static_cast<uint32_t>(SubDbType::REMOVED); + static constexpr uint32_t NOTREADY = static_cast<uint32_t>(SubDbType::NOTREADY); static constexpr uint32_t COUNTS = static_cast<uint32_t>(SubDbType::COUNT); uint32_t _docCount[COUNTS]; - uint32_t _checksum; size_t _docSizes[COUNTS]; - bool _active; + vespalib::CloneablePtr<ChecksumAggregator> _checksum; + bool _active; + + static ChecksumType _checksumType; + static std::unique_ptr<ChecksumAggregator> createChecksum(BucketChecksum seed); public: + static void setChecksumType(ChecksumType checksum); BucketState(); + BucketState(const BucketState & rhs); + BucketState(BucketState && rhs) noexcept; + BucketState & operator=(BucketState && rhs) noexcept; + ~BucketState(); - static uint32_t calcChecksum(const GlobalId &gid, const Timestamp &timestamp); + static BucketChecksum addChecksum(BucketChecksum a, BucketChecksum b); void add(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType); void remove(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType); - void modify(const Timestamp &oldTimestamp, uint32_t oldDocSize, + void modify(const GlobalId &gid, + const Timestamp &oldTimestamp, uint32_t oldDocSize, const Timestamp &newTimestamp, uint32_t newDocSize, SubDbType subDbType); - BucketState &setActive(bool active) { + BucketState & + setActive(bool active) { _active = active; return
*this; } @@ -55,7 +67,7 @@ public: size_t getNotReadyDocSizes() const { return _docSizes[NOTREADY]; } uint32_t getDocumentCount() const { return getReadyCount() + getNotReadyCount(); } uint32_t getEntryCount() const { return getDocumentCount() + getRemovedCount(); } - storage::spi::BucketChecksum getChecksum() const { return storage::spi::BucketChecksum(_checksum); } + BucketChecksum getChecksum() const { return _checksum->getChecksum(); } bool empty() const; BucketState &operator+=(const BucketState &rhs); BucketState &operator-=(const BucketState &rhs); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp new file mode 100644 index 00000000000..795d88ab106 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp @@ -0,0 +1,18 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "checksumaggregators.h" + +namespace proton::bucketdb { + +std::unique_ptr<ChecksumAggregator> +ChecksumAggregator::create(ChecksumType type, BucketChecksum seed) { + switch (type) { + case ChecksumType::LEGACY: + return std::make_unique<LegacyChecksumAggregator>(seed); + case ChecksumType::XXHASH64: + return std::make_unique<XXH64ChecksumAggregator>(seed); + } + abort(); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h new file mode 100644 index 00000000000..421cca5c6b5 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h @@ -0,0 +1,30 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#pragma once + +#include <vespa/document/base/globalid.h> +#include <vespa/persistence/spi/bucketinfo.h> + +namespace proton::bucketdb { + +/** + * Interface for aggregating bucket checksums. + **/ +class ChecksumAggregator { +public: + enum class ChecksumType {LEGACY, XXHASH64}; + using GlobalId = document::GlobalId; + using Timestamp = storage::spi::Timestamp; + using BucketChecksum = storage::spi::BucketChecksum; + virtual ~ChecksumAggregator() = default; + virtual ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) = 0; + virtual ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) = 0; + virtual ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) = 0; + virtual ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) = 0; + virtual BucketChecksum getChecksum() const = 0; + virtual bool empty() const = 0;; + virtual ChecksumAggregator * clone() const = 0; + static std::unique_ptr<ChecksumAggregator> create(ChecksumType type, BucketChecksum seed); +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp new file mode 100644 index 00000000000..a118f416b6b --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp @@ -0,0 +1,111 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+ +#include "checksumaggregators.h" +#include <xxhash.h> + +namespace proton::bucketdb { + +using BucketChecksum = ChecksumAggregator::BucketChecksum; +using Timestamp = ChecksumAggregator::Timestamp; +using GlobalId = ChecksumAggregator::GlobalId; + +namespace { + +uint32_t +gidChecksum(const GlobalId &gid) +{ + uint32_t i[3]; + memcpy(i, gid.get(), GlobalId::LENGTH); + return i[0] + i[1] + i[2]; +} + + +uint32_t +timestampChecksum(const Timestamp &timestamp) +{ + return (timestamp >> 32) + timestamp; +} + + +uint32_t +calcChecksum(const GlobalId &gid, const Timestamp &timestamp) +{ + return gidChecksum(gid) + timestampChecksum(timestamp); +} + +} + +LegacyChecksumAggregator * +LegacyChecksumAggregator::clone() const { + return new LegacyChecksumAggregator(*this); +} +LegacyChecksumAggregator & +LegacyChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp &timestamp) { + _checksum += calcChecksum(gid, timestamp); + return *this; +} +LegacyChecksumAggregator & +LegacyChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp &timestamp) { + _checksum -= calcChecksum(gid, timestamp); + return *this; +} +LegacyChecksumAggregator & +LegacyChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) { + _checksum += dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum; + return *this; +} +LegacyChecksumAggregator & +LegacyChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) { + _checksum -= dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum; + return *this; +} +BucketChecksum +LegacyChecksumAggregator::getChecksum() const { + return BucketChecksum(_checksum); +} +bool +LegacyChecksumAggregator::empty() const { return _checksum == 0; } + + + +XXH64ChecksumAggregator * +XXH64ChecksumAggregator::clone() const { + return new XXH64ChecksumAggregator(*this); +} +XXH64ChecksumAggregator & +XXH64ChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp &timestamp) { + _checksum ^= compute(gid, timestamp); + return *this; +}
+XXH64ChecksumAggregator & +XXH64ChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp &timestamp) { + _checksum ^= compute(gid, timestamp); + return *this; +} +XXH64ChecksumAggregator & +XXH64ChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) { + _checksum ^= dynamic_cast<const XXH64ChecksumAggregator &>(rhs)._checksum; + return *this; +} +XXH64ChecksumAggregator & +XXH64ChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) { + _checksum ^= dynamic_cast<const XXH64ChecksumAggregator &>(rhs)._checksum; + return *this; +} +BucketChecksum +XXH64ChecksumAggregator::getChecksum() const { + return BucketChecksum((_checksum >> 32) ^ (_checksum & 0xffffffffL)); +} +bool +XXH64ChecksumAggregator::empty() const { return _checksum == 0; } + +uint64_t +XXH64ChecksumAggregator::compute(const GlobalId &gid, const Timestamp &timestamp) { + char buffer[20]; + memcpy(&buffer[0], gid.get(), GlobalId::LENGTH); + uint64_t tmp = timestamp.getValue(); + memcpy(&buffer[GlobalId::LENGTH], &tmp, sizeof(tmp)); + return XXH64(buffer, sizeof(buffer), 0); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h new file mode 100644 index 00000000000..49762ad107f --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h @@ -0,0 +1,42 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "checksumaggregator.h" + +namespace proton::bucketdb { + +/** + * Implementations of the legacy bucket checksums.
+ **/ +class LegacyChecksumAggregator : public ChecksumAggregator { +public: + explicit LegacyChecksumAggregator(BucketChecksum seed) : _checksum(seed) { } + LegacyChecksumAggregator * clone() const override; + LegacyChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) override; + LegacyChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) override; + LegacyChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override; + LegacyChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override; + BucketChecksum getChecksum() const override; + bool empty() const override; +private: + uint32_t _checksum; +}; + +/** + * Implementations of the bucket checksums based on XXHASH64. + **/ +class XXH64ChecksumAggregator : public ChecksumAggregator { +public: + explicit XXH64ChecksumAggregator(BucketChecksum seed) : _checksum(seed) { } + XXH64ChecksumAggregator * clone() const override; + XXH64ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) override; + XXH64ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) override; + XXH64ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override; + XXH64ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override; + BucketChecksum getChecksum() const override; + bool empty() const override; +private: + static uint64_t compute(const GlobalId &gid, const Timestamp &timestamp); + uint64_t _checksum; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp index f4c88402409..f8f707dbfc3 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp @@ -38,20 +38,16 @@ JoinBucketsSession::setup() bool source2Active = extractInfo(_source2, source2State); _wantTargetActive = source1Active || source2Active; -
_adjustSource1ActiveLids = calcFixupNeed(source1State, _wantTargetActive, - false); - _adjustSource2ActiveLids = calcFixupNeed(source2State, _wantTargetActive, - false); + _adjustSource1ActiveLids = calcFixupNeed(source1State, _wantTargetActive, false); + _adjustSource2ActiveLids = calcFixupNeed(source2State, _wantTargetActive, false); BucketState *targetState = nullptr; (void) extractInfo(_target, targetState); - _adjustTargetActiveLids = calcFixupNeed(targetState, _wantTargetActive, - true); + _adjustTargetActiveLids = calcFixupNeed(targetState, _wantTargetActive, true); } bool -JoinBucketsSession::mustFixupTargetActiveLids(bool movedSource1Docs, - bool movedSource2Docs) const +JoinBucketsSession::mustFixupTargetActiveLids(bool movedSource1Docs, bool movedSource2Docs) const { return _adjustTargetActiveLids || (_adjustSource1ActiveLids && movedSource1Docs) || @@ -68,8 +64,7 @@ JoinBucketsSession::applyDeltas(const BucketDeltaPair &deltas) bool -JoinBucketsSession::applyDelta(const BucketState &delta, BucketId &srcBucket, - BucketState *dst) +JoinBucketsSession::applyDelta(const BucketState &delta, BucketId &srcBucket, BucketState *dst) { if (!srcBucket.valid()) { assert(delta.empty()); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h index 1029379dfb1..9ef89992a9c 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h @@ -41,43 +41,14 @@ public: const BucketId &source2, const BucketId &target); - void - applyDeltas(const BucketDeltaPair &deltas); - - bool - getWantTargetActive() const - { - return _wantTargetActive; - } - - bool - mustFixupTargetActiveLids(bool movedSource1Docs, - bool movedSource2Docs) const; - - - void - setup(); - - void - finish(); - - const BucketId & - getSource1() const - { - return _source1; - } - - const BucketId & - getSource2() const 
-    {
-        return _source2;
-    }
-
-    const BucketId &
-    getTarget() const
-    {
-        return _target;
-    }
+    void applyDeltas(const BucketDeltaPair &deltas);
+    bool getWantTargetActive() const { return _wantTargetActive; }
+    bool mustFixupTargetActiveLids(bool movedSource1Docs, bool movedSource2Docs) const;
+    void setup();
+    void finish();
+    const BucketId & getSource1() const { return _source1; }
+    const BucketId & getSource2() const { return _source2; }
+    const BucketId & getTarget() const { return _target;}
 };
 }
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp
index 0e085507a95..d03b630b822 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp
@@ -41,13 +41,11 @@ SplitBucketSession::setup()
     if (_target1.valid()) {
         BucketState *target1State = _bucketDB->getBucketStatePtr(_target1);
-        _adjustTarget1ActiveLids = calcFixupNeed(target1State, _sourceActive,
-                                                 true);
+        _adjustTarget1ActiveLids = calcFixupNeed(target1State, _sourceActive, true);
     }
     if (_target2.valid()) {
         BucketState *target2State = _bucketDB->getBucketStatePtr(_target2);
-        _adjustTarget2ActiveLids = calcFixupNeed(target2State, _sourceActive,
-                                                 true);
+        _adjustTarget2ActiveLids = calcFixupNeed(target2State, _sourceActive, true);
     }
 }
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 8333911d7d9..7b27ba2fa3a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -70,6 +70,19 @@ convert(InternalProtonType::Packetcompresstype type)
 }
 void
+setBucketCheckSumType(const ProtonConfig & proton)
+{
+    switch (proton.bucketdb.checksumtype) {
+    case InternalProtonType::Bucketdb::LEGACY:
+        bucketdb::BucketState::setChecksumType(bucketdb::BucketState::ChecksumType::LEGACY);
+        break;
+    case InternalProtonType::Bucketdb::XXHASH64:
+        bucketdb::BucketState::setChecksumType(bucketdb::BucketState::ChecksumType::XXHASH64);
+        break;
+    }
+}
+
+void
 setFS4Compression(const ProtonConfig & proton)
 {
     FS4PersistentPacketStreamer & fs4(FS4PersistentPacketStreamer::Instance);
@@ -241,6 +254,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
     const ProtonConfig &protonConfig = configSnapshot->getProtonConfig();
     const HwInfo & hwInfo = configSnapshot->getHwInfo();
+    setBucketCheckSumType(protonConfig);
     setFS4Compression(protonConfig);
     _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir,
                                                                  diskMemUsageSamplerConfig(protonConfig, hwInfo)); |