summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-01 18:27:20 +0300
committerGitHub <noreply@github.com>2019-04-01 18:27:20 +0300
commitf918e20fb06d98e552e62a18a50b110ff4530c7a (patch)
tree9015ba2b3df04c7aa72c60e4c3b53ea929aab694 /searchcore
parentca1090424ac25e0ffb6abd7e19edbf1e32a00c33 (diff)
parentdb4835b4dc6688e23b77e673dbd95afb861e9f5e (diff)
Merge pull request #8922 from vespa-engine/balder/add-config-control-for-bucket-checksum
Balder/add config control for bucket checksum.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp81
-rw-r--r--searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp29
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def3
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt2
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp51
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h36
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp74
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h40
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h30
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp111
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h42
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h45
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp14
16 files changed, 416 insertions, 181 deletions
diff --git a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp
index a5f8b1ff6c9..998b1626aeb 100644
--- a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp
+++ b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp
@@ -1,12 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/log/log.h>
-LOG_SETUP("bucketdb_test");
-
#include <vespa/searchcore/proton/bucketdb/bucket_db_explorer.h>
#include <vespa/searchcore/proton/bucketdb/bucketdb.h>
#include <vespa/vespalib/data/slime/slime.h>
+#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/log/log.h>
+LOG_SETUP("bucketdb_test");
+
using namespace document;
using namespace proton;
using namespace proton::bucketdb;
@@ -128,8 +129,8 @@ TEST_F("require that bucket checksum is a combination of sub db types", Fixture)
EXPECT_EQUAL(zero, f.get().getChecksum());
EXPECT_EQUAL(ready, f.add(TIME_1, SDT::READY).getChecksum());
- EXPECT_EQUAL(ready + notReady, f.add(TIME_2, SDT::NOTREADY).getChecksum());
- EXPECT_EQUAL(ready + notReady, f.add(TIME_3, SDT::REMOVED).getChecksum());
+ EXPECT_EQUAL(BucketState::addChecksum(ready, notReady), f.add(TIME_2, SDT::NOTREADY).getChecksum());
+ EXPECT_EQUAL(BucketState::addChecksum(ready, notReady), f.add(TIME_3, SDT::REMOVED).getChecksum());
EXPECT_EQUAL(notReady, f.remove(TIME_1, SDT::READY).getChecksum());
EXPECT_EQUAL(zero, f.remove(TIME_2, SDT::NOTREADY).getChecksum());
EXPECT_EQUAL(zero, f.remove(TIME_3, SDT::REMOVED).getChecksum());
@@ -179,17 +180,20 @@ TEST_F("require that bucket checksum ignores document sizes", Fixture)
TEST("require that bucket db can be explored")
{
BucketDBOwner db;
- db.takeGuard()->add(GID_1, BUCKET_1, TIME_1, DOCSIZE_1, SDT::READY);
+ const BucketState & expectedState = db.takeGuard()->add(GID_1, BUCKET_1, TIME_1, DOCSIZE_1, SDT::READY);
{
BucketDBExplorer explorer(db.takeGuard());
Slime expectSlime;
- vespalib::string expectJson =
+ vespalib::asciistream expectJson;
+ expectJson <<
"{"
" numBuckets: 1,"
" buckets: ["
" {"
" id: '0x2000000000000031',"
- " checksum: '0x93939394',"
+ " checksum: '0x"
+ << vespalib::hex << expectedState.getChecksum() <<
+ "',"
" readyCount: 1,"
" notReadyCount: 0,"
" removedCount: 0,"
@@ -200,7 +204,7 @@ TEST("require that bucket db can be explored")
" }"
" ]"
"}";
- EXPECT_TRUE(JsonFormat::decode(expectJson, expectSlime) > 0);
+ EXPECT_TRUE(JsonFormat::decode(expectJson.str(), expectSlime) > 0);
Slime actualSlime;
SlimeInserter inserter(actualSlime);
explorer.get_state(inserter, true);
@@ -212,4 +216,63 @@ TEST("require that bucket db can be explored")
db.takeGuard()->remove(GID_1, BUCKET_1, TIME_1, DOCSIZE_1, SDT::READY);
}
+BucketChecksum
+verifyChecksumCompliance(ChecksumAggregator::ChecksumType type) {
+ GlobalId gid1("aaaaaaaaaaaa");
+ GlobalId gid2("bbbbbbbbbbbb");
+ Timestamp t1(0);
+ Timestamp t2(1);
+ auto ckaggr = ChecksumAggregator::create(type, BucketChecksum(0));
+
+ EXPECT_EQUAL(0u, ckaggr->getChecksum());
+ ckaggr->addDoc(gid1, t1);
+ BucketChecksum afterAdd = ckaggr->getChecksum();
+ EXPECT_NOT_EQUAL(0u, afterAdd); // add Changes checksum
+ ckaggr->removeDoc(gid1, t1);
+ EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical
+ ckaggr->addDoc(gid1, t2);
+ EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // timestamp changes checksum
+ ckaggr->removeDoc(gid1, t2);
+ EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical
+ ckaggr->addDoc(gid2, t1);
+ EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // gid changes checksum
+ ckaggr->removeDoc(gid2, t1);
+ EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical
+
+ {
+ // Verify order does not matter, only current content. A,B == B,A
+ ckaggr->addDoc(gid1, t1);
+ BucketChecksum after1AddOfGid1 = ckaggr->getChecksum();
+ ckaggr->addDoc(gid2, t2);
+ BucketChecksum after2Add1 = ckaggr->getChecksum();
+ ckaggr->removeDoc(gid2, t2);
+ EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum());
+ ckaggr->removeDoc(gid1, t1);
+ EXPECT_EQUAL(0u, ckaggr->getChecksum());
+
+ ckaggr->addDoc(gid2, t2);
+ EXPECT_NOT_EQUAL(after1AddOfGid1, ckaggr->getChecksum());
+ ckaggr->addDoc(gid1, t1);
+ EXPECT_EQUAL(after2Add1, ckaggr->getChecksum());
+ ckaggr->removeDoc(gid2, t2);
+ EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum());
+ ckaggr->removeDoc(gid1, t1);
+ EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical
+ }
+
+ ckaggr->addDoc(gid1, t1); // Add something so we can verify it does not change between releases.
+
+ return ckaggr->getChecksum();
+}
+
+TEST("test that legacy checksum complies") {
+ BucketChecksum cksum = verifyChecksumCompliance(ChecksumAggregator::ChecksumType::LEGACY);
+ EXPECT_EQUAL(0x24242423u, cksum);
+}
+
+TEST("test that xxhash64 checksum complies") {
+ BucketChecksum cksum = verifyChecksumCompliance(ChecksumAggregator::ChecksumType::XXHASH64);
+ EXPECT_EQUAL(0xd26fca9au, cksum);
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
index d436c63ae2e..42d038e6b88 100644
--- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
+++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/searchcore/proton/documentmetastore/documentmetastore.h>
#include <vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h>
#include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h>
+#include <vespa/searchcore/proton/bucketdb/checksumaggregators.h>
#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h>
#include <vespa/searchlib/attribute/attributefilesavetarget.h>
#include <vespa/searchlib/fef/matchdatalayout.h>
@@ -22,6 +23,8 @@ LOG_SETUP("documentmetastore_test");
using namespace document;
using proton::bucketdb::BucketState;
+using proton::bucketdb::LegacyChecksumAggregator;
+using proton::bucketdb::XXH64ChecksumAggregator;
using proton::bucketdb::IBucketCreateListener;
using search::AttributeFileSaveTarget;
using search::AttributeGuard;
@@ -771,7 +774,9 @@ TEST("requireThatWeCanSortGids")
}
}
-TEST("requireThatBasicBucketInfoWorks")
+template <typename ChecksumType>
+void
+requireThatBasicBucketInfoWorks()
{
DocumentMetaStore dms(createBucketDB());
typedef std::pair<BucketId, GlobalId> Elem;
@@ -808,24 +813,22 @@ TEST("requireThatBasicBucketInfoWorks")
m.erase(std::make_pair(bucketId, gid));
}
assert(!m.empty());
- BucketChecksum cksum;
+ ChecksumType cksum(BucketChecksum(0));
BucketId prevBucket = m.begin()->first.first;
uint32_t cnt = 0u;
uint32_t maxcnt = 0u;
BucketDBOwner::Guard bucketDB = dms.getBucketDB().takeGuard();
for (Map::const_iterator i = m.begin(), ie = m.end(); i != ie; ++i) {
if (i->first.first == prevBucket) {
- cksum = BucketChecksum(cksum +
- BucketState::calcChecksum(i->first.second,
- i->second));
+ cksum.addDoc(i->first.second, i->second);
++cnt;
} else {
BucketInfo bi = bucketDB->get(prevBucket);
EXPECT_EQUAL(cnt, bi.getDocumentCount());
- EXPECT_EQUAL(cksum, bi.getChecksum());
+ EXPECT_EQUAL(cksum.getChecksum(), bi.getChecksum());
prevBucket = i->first.first;
- cksum = BucketState::calcChecksum(i->first.second,
- i->second);
+ cksum = T(BucketChecksum(0));
+ cksum.addDoc(i->first.second, i->second);
maxcnt = std::max(maxcnt, cnt);
cnt = 1u;
}
@@ -833,10 +836,18 @@ TEST("requireThatBasicBucketInfoWorks")
maxcnt = std::max(maxcnt, cnt);
BucketInfo bi = bucketDB->get(prevBucket);
EXPECT_EQUAL(cnt, bi.getDocumentCount());
- EXPECT_EQUAL(cksum, bi.getChecksum());
+ EXPECT_EQUAL(cksum.getChecksum(), bi.getChecksum());
LOG(info, "Largest bucket: %u elements", maxcnt);
}
+TEST("requireThatBasicBucketInfoWorks")
+{
+ BucketState::setChecksumType(BucketState::ChecksumType::LEGACY);
+ requireThatBasicBucketInfoWorks<LegacyChecksumAggregator>();
+ BucketState::setChecksumType(BucketState::ChecksumType::XXHASH64);
+ requireThatBasicBucketInfoWorks<XXH64ChecksumAggregator>();
+}
+
TEST("requireThatWeCanRetrieveListOfLidsFromBucketId")
{
typedef std::vector<uint32_t> LidVector;
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 6cf99348911..a88239a41f6 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -452,3 +452,6 @@ maintenancejobs.resourcelimitfactor double default = 1.05
## Currently used by 'lid_space_compaction' job.
maintenancejobs.maxoutstandingmoveops int default=10
+## Controls the type of bucket checksum used. Do not change unless
+## in depth understanding is present.
+bucketdb.checksumtype enum {LEGACY, XXHASH64} default = LEGACY restart
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt
index 6cb22e6d1e8..375515938b3 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt
@@ -8,6 +8,8 @@ vespa_add_library(searchcore_bucketdb STATIC
bucketdbhandler.cpp
bucketsessionbase.cpp
bucketstate.cpp
+ checksumaggregator.cpp
+ checksumaggregators.cpp
joinbucketssession.cpp
splitbucketsession.cpp
DEPENDS
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp
index 06aca3381f7..3ced0f509b5 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.cpp
@@ -22,10 +22,15 @@ BucketDB::~BucketDB()
clear();
}
+void
+BucketDB::add(const BucketId &bucketId, const BucketState & state) {
+ _map[bucketId] += state;
+}
+
bucketdb::BucketState *
BucketDB::getBucketStatePtr(const BucketId &bucket)
{
- MapIterator it(_map.find(bucket));
+ auto it(_map.find(bucket));
if (it != _map.end()) {
return &it->second;
}
@@ -42,9 +47,7 @@ BucketDB::unloadBucket(const BucketId &bucket, const BucketState &delta)
const bucketdb::BucketState &
BucketDB::add(const GlobalId &gid,
- const BucketId &bucketId,
- const Timestamp &timestamp,
- uint32_t docSize,
+ const BucketId &bucketId, const Timestamp &timestamp, uint32_t docSize,
SubDbType subDbType)
{
BucketState &state = _map[bucketId];
@@ -54,9 +57,7 @@ BucketDB::add(const GlobalId &gid,
void
BucketDB::remove(const GlobalId &gid,
- const BucketId &bucketId,
- const Timestamp &timestamp,
- uint32_t docSize,
+ const BucketId &bucketId, const Timestamp &timestamp, uint32_t docSize,
SubDbType subDbType)
{
BucketState &state = _map[bucketId];
@@ -65,17 +66,13 @@ BucketDB::remove(const GlobalId &gid,
void
BucketDB::modify(const GlobalId &gid,
- const BucketId &oldBucketId,
- const Timestamp &oldTimestamp,
- uint32_t oldDocSize,
- const BucketId &newBucketId,
- const Timestamp &newTimestamp,
- uint32_t newDocSize,
+ const BucketId &oldBucketId, const Timestamp &oldTimestamp, uint32_t oldDocSize,
+ const BucketId &newBucketId, const Timestamp &newTimestamp, uint32_t newDocSize,
SubDbType subDbType)
{
if (oldBucketId == newBucketId) {
BucketState &state = _map[oldBucketId];
- state.modify(oldTimestamp, oldDocSize, newTimestamp, newDocSize, subDbType);
+ state.modify(gid, oldTimestamp, oldDocSize, newTimestamp, newDocSize, subDbType);
} else {
remove(gid, oldBucketId, oldTimestamp, oldDocSize, subDbType);
add(gid, newBucketId, newTimestamp, newDocSize, subDbType);
@@ -86,7 +83,7 @@ BucketDB::modify(const GlobalId &gid,
bucketdb::BucketState
BucketDB::get(const BucketId &bucketId) const
{
- Map::const_iterator itr = _map.find(bucketId);
+ auto itr = _map.find(bucketId);
if (itr != _map.end()) {
return itr->second;
}
@@ -125,22 +122,15 @@ BucketDB::cachedGet(const BucketId &bucketId) const
bool
BucketDB::hasBucket(const BucketId &bucketId) const
{
- Map::const_iterator itr = _map.find(bucketId);
- if (itr != _map.end()) {
- return true;
- }
- return false;
+ return (_map.find(bucketId) != _map.end());
}
bool
BucketDB::isActiveBucket(const BucketId &bucketId) const
{
- Map::const_iterator itr = _map.find(bucketId);
- if (itr != _map.end()) {
- return itr->second.isActive();
- }
- return false;
+ auto itr = _map.find(bucketId);
+ return (itr != _map.end()) && itr->second.isActive();
}
void
@@ -194,7 +184,7 @@ BucketDB::createBucket(const BucketId &bucketId)
void
BucketDB::deleteEmptyBucket(const BucketId &bucketId)
{
- Map::iterator itr = _map.find(bucketId);
+ auto itr = _map.find(bucketId);
if (itr == _map.end()) {
return;
}
@@ -215,8 +205,7 @@ BucketDB::getActiveBuckets(BucketId::List &buckets) const
}
void
-BucketDB::populateActiveBuckets(const BucketId::List &buckets,
- BucketId::List &fixupBuckets)
+BucketDB::populateActiveBuckets(const BucketId::List &buckets, BucketId::List &fixupBuckets)
{
typedef BucketId::List BIV;
BIV sorted(buckets);
@@ -237,12 +226,10 @@ BucketDB::populateActiveBuckets(const BucketId::List &buckets,
for (; si != se; ++si) {
toAdd.push_back(*si);
}
- BIV::const_iterator ai(toAdd.begin());
- BIV::const_iterator ae(toAdd.end());
BucketState activeState;
activeState.setActive(true);
- for (; ai != ae; ++ai) {
- InsertResult ins(_map.insert(std::make_pair(*ai, activeState)));
+ for (const BucketId & bucketId : toAdd) {
+ InsertResult ins(_map.emplace(bucketId, activeState));
assert(ins.second);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h
index 9e2c7a17b8f..e64cc53d4a2 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h
@@ -33,33 +33,19 @@ public:
BucketDB();
virtual ~BucketDB();
- const BucketState &
- add(const GlobalId &gid,
- const BucketId &bucketId,
- const Timestamp &timestamp,
- uint32_t docSize,
- SubDbType subDbType);
+ const BucketState & add(const GlobalId &gid,
+ const BucketId &bucketId, const Timestamp &timestamp, uint32_t docSize,
+ SubDbType subDbType);
- void add(const BucketId &bucketId, const BucketState & state) {
- _map[bucketId] += state;
- }
+ void add(const BucketId &bucketId, const BucketState & state);
+ void remove(const GlobalId &gid,
+ const BucketId &bucketId, const Timestamp &timestamp, uint32_t docSize,
+ SubDbType subDbType);
- void
- remove(const GlobalId &gid,
- const BucketId &bucketId,
- const Timestamp &timestamp,
- uint32_t docSize,
- SubDbType subDbType);
-
- void
- modify(const GlobalId &gid,
- const BucketId &oldBucketId,
- const Timestamp &oldTimestamp,
- uint32_t oldDocSize,
- const BucketId &newBucketId,
- const Timestamp &newTimestamp,
- uint32_t newDocSize,
- SubDbType subDbType);
+ void modify(const GlobalId &gid,
+ const BucketId &oldBucketId, const Timestamp &oldTimestamp, uint32_t oldDocSize,
+ const BucketId &newBucketId, const Timestamp &newTimestamp, uint32_t newDocSize,
+ SubDbType subDbType);
BucketState get(const BucketId &bucketId) const;
void cacheBucket(const BucketId &bucketId);
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp
index 4219835778f..cea30680faf 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp
@@ -1,42 +1,42 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketstate.h"
+#include "checksumaggregators.h"
#include <cassert>
+#include <xxhash.h>
namespace proton::bucketdb {
namespace {
uint32_t
-gidChecksum(const document::GlobalId &gid)
-{
- union {
- const unsigned char *_c;
- const uint32_t *_i;
- } u;
- u._c = gid.get();
- const uint32_t *i = u._i;
- return i[0] + i[1] + i[2];
+toIdx(SubDbType subDbType) {
+ return static_cast<uint32_t>(subDbType);
}
-
-uint32_t
-timestampChecksum(const storage::spi::Timestamp &timestamp)
-{
- return (timestamp >> 32) + timestamp;
}
-inline uint32_t toIdx(SubDbType subDbType)
-{
- return static_cast<uint32_t>(subDbType);
+BucketState::ChecksumType BucketState::_checksumType = BucketState::ChecksumType::LEGACY;
+
+std::unique_ptr<ChecksumAggregator>
+BucketState::createChecksum(BucketChecksum seed) {
+ return ChecksumAggregator::create(_checksumType, seed);
}
+void
+BucketState::setChecksumType(ChecksumType type) {
+ _checksumType = type;
}
+BucketState::~BucketState() = default;
+BucketState::BucketState(const BucketState & rhs) = default;
+BucketState::BucketState(BucketState && rhs) noexcept = default;
+BucketState & BucketState::operator=(BucketState && rhs) noexcept = default;
+
BucketState::BucketState()
: _docCount(),
- _checksum(0),
_docSizes(),
+ _checksum(createChecksum(BucketChecksum(0))),
_active(false)
{
for (uint32_t i = 0; i < COUNTS; ++i) {
@@ -45,26 +45,23 @@ BucketState::BucketState()
}
}
-uint32_t
-BucketState::calcChecksum(const GlobalId &gid, const Timestamp &timestamp)
-{
- return gidChecksum(gid) + timestampChecksum(timestamp);
+BucketState::BucketChecksum
+BucketState::addChecksum(BucketChecksum a, BucketChecksum b) {
+ return createChecksum(a)->addChecksum(*createChecksum(b)).getChecksum();
}
-
void
BucketState::add(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType)
{
assert(subDbType < SubDbType::COUNT);
if (subDbType != SubDbType::REMOVED) {
- _checksum += calcChecksum(gid, timestamp);
+ _checksum->addDoc(gid, timestamp);
}
uint32_t subDbTypeIdx = toIdx(subDbType);
++_docCount[subDbTypeIdx];
_docSizes[subDbTypeIdx] += docSize;
}
-
void
BucketState::remove(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType)
{
@@ -73,15 +70,15 @@ BucketState::remove(const GlobalId &gid, const Timestamp &timestamp, uint32_t do
assert(_docCount[subDbTypeIdx] > 0);
assert(_docSizes[subDbTypeIdx] >= docSize);
if (subDbType != SubDbType::REMOVED) {
- _checksum -= calcChecksum(gid, timestamp);
+ _checksum->removeDoc(gid, timestamp);
}
--_docCount[subDbTypeIdx];
_docSizes[subDbTypeIdx] -= docSize;
}
-
void
-BucketState::modify(const Timestamp &oldTimestamp, uint32_t oldDocSize,
+BucketState::modify(const GlobalId &gid,
+ const Timestamp &oldTimestamp, uint32_t oldDocSize,
const Timestamp &newTimestamp, uint32_t newDocSize,
SubDbType subDbType)
{
@@ -90,27 +87,25 @@ BucketState::modify(const Timestamp &oldTimestamp, uint32_t oldDocSize,
assert(_docCount[subDbTypeIdx] > 0);
assert(_docSizes[subDbTypeIdx] >= oldDocSize);
if (subDbType != SubDbType::REMOVED) {
- _checksum = _checksum - timestampChecksum(oldTimestamp) +
- timestampChecksum(newTimestamp);
+ _checksum->removeDoc(gid, oldTimestamp);
+ _checksum->addDoc(gid, newTimestamp);
}
_docSizes[subDbTypeIdx] = _docSizes[subDbTypeIdx] + newDocSize - oldDocSize;
}
-
bool
BucketState::empty() const
{
if (getReadyCount() != 0 || getRemovedCount() != 0 ||
getNotReadyCount() != 0)
return false;
- assert(_checksum == 0);
+ assert(_checksum->empty());
for (uint32_t i = 0; i < COUNTS; ++i) {
assert(_docSizes[i] == 0);
}
return true;
}
-
BucketState &
BucketState::operator+=(const BucketState &rhs)
{
@@ -120,11 +115,10 @@ BucketState::operator+=(const BucketState &rhs)
for (uint32_t i = 0; i < COUNTS; ++i) {
_docSizes[i] += rhs._docSizes[i];
}
- _checksum += rhs._checksum;
+ _checksum->addChecksum(*rhs._checksum);
return *this;
}
-
BucketState &
BucketState::operator-=(const BucketState &rhs)
{
@@ -140,7 +134,7 @@ BucketState::operator-=(const BucketState &rhs)
for (uint32_t i = 0; i < COUNTS; ++i) {
_docSizes[i] -= rhs._docSizes[i];
}
- _checksum -= rhs._checksum;
+ _checksum->removeChecksum(*rhs._checksum);
return *this;
}
@@ -166,11 +160,9 @@ BucketState::operator storage::spi::BucketInfo() const
using BucketInfo = storage::spi::BucketInfo;
- return BucketInfo(storage::spi::BucketChecksum(_checksum),
- documentCount,
- docSizes,
- entryCount,
- entrySizes,
+ return BucketInfo(getChecksum(),
+ documentCount, docSizes,
+ entryCount, entrySizes,
notReady > 0 ? BucketInfo::NOT_READY : BucketInfo::READY,
_active ? BucketInfo::ACTIVE : BucketInfo::NOT_ACTIVE);
}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h
index 1fd5289da0b..8c390b288ae 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h
@@ -2,46 +2,58 @@
#pragma once
+#include "checksumaggregator.h"
#include <vespa/searchcore/proton/common/subdbtype.h>
-#include <vespa/document/base/globalid.h>
-#include <vespa/persistence/spi/bucketinfo.h>
+#include <vespa/vespalib/util/memory.h>
namespace proton::bucketdb {
+class ChecksumAggregator;
+
/**
* Class BucketState represent the known state of a bucket in raw form.
*/
class BucketState
{
public:
- typedef document::GlobalId GlobalId;
- typedef storage::spi::Timestamp Timestamp;
+ using ChecksumType = ChecksumAggregator::ChecksumType;
+ using GlobalId = document::GlobalId;
+ using Timestamp = storage::spi::Timestamp;
+ using BucketChecksum = storage::spi::BucketChecksum;
private:
static constexpr uint32_t READY = static_cast<uint32_t>(SubDbType::READY);
- static constexpr uint32_t REMOVED =
- static_cast<uint32_t>(SubDbType::REMOVED);
- static constexpr uint32_t NOTREADY =
- static_cast<uint32_t>(SubDbType::NOTREADY);
+ static constexpr uint32_t REMOVED = static_cast<uint32_t>(SubDbType::REMOVED);
+ static constexpr uint32_t NOTREADY = static_cast<uint32_t>(SubDbType::NOTREADY);
static constexpr uint32_t COUNTS = static_cast<uint32_t>(SubDbType::COUNT);
uint32_t _docCount[COUNTS];
- uint32_t _checksum;
size_t _docSizes[COUNTS];
- bool _active;
+ vespalib::CloneablePtr<ChecksumAggregator> _checksum;
+ bool _active;
+
+ static ChecksumType _checksumType;
+ static std::unique_ptr<ChecksumAggregator> createChecksum(BucketChecksum seed);
public:
+ static void setChecksumType(ChecksumType checksum);
BucketState();
+ BucketState(const BucketState & rhs);
+ BucketState(BucketState && rhs) noexcept;
+ BucketState & operator=(BucketState && rhs) noexcept;
+ ~BucketState();
- static uint32_t calcChecksum(const GlobalId &gid, const Timestamp &timestamp);
+ static BucketChecksum addChecksum(BucketChecksum a, BucketChecksum b);
void add(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType);
void remove(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSize, SubDbType subDbType);
- void modify(const Timestamp &oldTimestamp, uint32_t oldDocSize,
+ void modify(const GlobalId &gid,
+ const Timestamp &oldTimestamp, uint32_t oldDocSize,
const Timestamp &newTimestamp, uint32_t newDocSize,
SubDbType subDbType);
- BucketState &setActive(bool active) {
+ BucketState &
+ setActive(bool active) {
_active = active;
return *this;
}
@@ -55,7 +67,7 @@ public:
size_t getNotReadyDocSizes() const { return _docSizes[NOTREADY]; }
uint32_t getDocumentCount() const { return getReadyCount() + getNotReadyCount(); }
uint32_t getEntryCount() const { return getDocumentCount() + getRemovedCount(); }
- storage::spi::BucketChecksum getChecksum() const { return storage::spi::BucketChecksum(_checksum); }
+ BucketChecksum getChecksum() const { return _checksum->getChecksum(); }
bool empty() const;
BucketState &operator+=(const BucketState &rhs);
BucketState &operator-=(const BucketState &rhs);
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp
new file mode 100644
index 00000000000..795d88ab106
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp
@@ -0,0 +1,18 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "checksumaggregators.h"
+
+namespace proton::bucketdb {
+
+std::unique_ptr<ChecksumAggregator>
+ChecksumAggregator::create(ChecksumType type, BucketChecksum seed) {
+ switch (type) {
+ case ChecksumType::LEGACY:
+ return std::make_unique<LegacyChecksumAggregator>(seed);
+ case ChecksumType::XXHASH64:
+ return std::make_unique<XXH64ChecksumAggregator>(seed);
+ }
+ abort();
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h
new file mode 100644
index 00000000000..421cca5c6b5
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h
@@ -0,0 +1,30 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/document/base/globalid.h>
+#include <vespa/persistence/spi/bucketinfo.h>
+
+namespace proton::bucketdb {
+
+/**
+ * Interface for aggregating bucket checksums.
+ **/
+class ChecksumAggregator {
+public:
+ enum class ChecksumType {LEGACY, XXHASH64};
+ using GlobalId = document::GlobalId;
+ using Timestamp = storage::spi::Timestamp;
+ using BucketChecksum = storage::spi::BucketChecksum;
+ virtual ~ChecksumAggregator() = default;
+ virtual ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) = 0;
+ virtual ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) = 0;
+ virtual ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) = 0;
+ virtual ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) = 0;
+ virtual BucketChecksum getChecksum() const = 0;
+ virtual bool empty() const = 0;;
+ virtual ChecksumAggregator * clone() const = 0;
+ static std::unique_ptr<ChecksumAggregator> create(ChecksumType type, BucketChecksum seed);
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp
new file mode 100644
index 00000000000..a118f416b6b
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp
@@ -0,0 +1,111 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "checksumaggregators.h"
+#include <xxhash.h>
+
+namespace proton::bucketdb {
+
+using BucketChecksum = ChecksumAggregator::BucketChecksum;
+using Timestamp = ChecksumAggregator::Timestamp;
+using GlobalId = ChecksumAggregator::GlobalId;
+
+namespace {
+
+uint32_t
+gidChecksum(const GlobalId &gid)
+{
+ uint32_t i[3];
+ memcpy(i, gid.get(), GlobalId::LENGTH);
+ return i[0] + i[1] + i[2];
+}
+
+
+uint32_t
+timestampChecksum(const Timestamp &timestamp)
+{
+ return (timestamp >> 32) + timestamp;
+}
+
+
+uint32_t
+calcChecksum(const GlobalId &gid, const Timestamp &timestamp)
+{
+ return gidChecksum(gid) + timestampChecksum(timestamp);
+}
+
+}
+
+LegacyChecksumAggregator *
+LegacyChecksumAggregator::clone() const {
+ return new LegacyChecksumAggregator(*this);
+}
+LegacyChecksumAggregator &
+LegacyChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp &timestamp) {
+ _checksum += calcChecksum(gid, timestamp);
+ return *this;
+}
+LegacyChecksumAggregator &
+LegacyChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp &timestamp) {
+ _checksum -= calcChecksum(gid, timestamp);
+ return *this;
+}
+LegacyChecksumAggregator &
+LegacyChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) {
+ _checksum += dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum;
+ return *this;
+}
+LegacyChecksumAggregator &
+LegacyChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) {
+ _checksum -= dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum;
+ return *this;
+}
+BucketChecksum
+LegacyChecksumAggregator::getChecksum() const {
+ return BucketChecksum(_checksum);
+}
+bool
+LegacyChecksumAggregator::empty() const { return _checksum == 0; }
+
+
+
+XXH64ChecksumAggregator *
+XXH64ChecksumAggregator::clone() const {
+ return new XXH64ChecksumAggregator(*this);
+}
+XXH64ChecksumAggregator &
+XXH64ChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp &timestamp) {
+ _checksum ^= compute(gid, timestamp);
+ return *this;
+}
+XXH64ChecksumAggregator &
+XXH64ChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp &timestamp) {
+ _checksum ^= compute(gid, timestamp);
+ return *this;
+}
+XXH64ChecksumAggregator &
+XXH64ChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) {
+ _checksum ^= dynamic_cast<const XXH64ChecksumAggregator &>(rhs)._checksum;
+ return *this;
+}
+XXH64ChecksumAggregator &
+XXH64ChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) {
+ _checksum ^= dynamic_cast<const XXH64ChecksumAggregator &>(rhs)._checksum;
+ return *this;
+}
+BucketChecksum
+XXH64ChecksumAggregator::getChecksum() const {
+ return BucketChecksum((_checksum >> 32) ^ (_checksum & 0xffffffffL));
+}
+bool
+XXH64ChecksumAggregator::empty() const { return _checksum == 0; }
+
+uint64_t
+XXH64ChecksumAggregator::compute(const GlobalId &gid, const Timestamp &timestamp) {
+ char buffer[20];
+ memcpy(&buffer[0], gid.get(), GlobalId::LENGTH);
+ uint64_t tmp = timestamp.getValue();
+ memcpy(&buffer[GlobalId::LENGTH], &tmp, sizeof(tmp));
+ return XXH64(buffer, sizeof(buffer), 0);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h
new file mode 100644
index 00000000000..49762ad107f
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h
@@ -0,0 +1,42 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "checksumaggregator.h"
+
+namespace proton::bucketdb {
+
+/**
+ * Implementations of the legacy bucket checksums.
+ **/
+class LegacyChecksumAggregator : public ChecksumAggregator {
+public:
+ explicit LegacyChecksumAggregator(BucketChecksum seed) : _checksum(seed) { }
+ LegacyChecksumAggregator * clone() const override;
+ LegacyChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) override;
+ LegacyChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) override;
+ LegacyChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override;
+ LegacyChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override;
+ BucketChecksum getChecksum() const override;
+ bool empty() const override;
+private:
+ uint32_t _checksum;
+};
+
+/**
+ * Implementations of the bucket checksums based on XXHASH64.
+ **/
+class XXH64ChecksumAggregator : public ChecksumAggregator {
+public:
+ explicit XXH64ChecksumAggregator(BucketChecksum seed) : _checksum(seed) { }
+ XXH64ChecksumAggregator * clone() const override;
+ XXH64ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) override;
+ XXH64ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) override;
+ XXH64ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override;
+ XXH64ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override;
+ BucketChecksum getChecksum() const override;
+ bool empty() const override;
+private:
+ static uint64_t compute(const GlobalId &gid, const Timestamp &timestamp);
+ uint64_t _checksum;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp
index f4c88402409..f8f707dbfc3 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp
@@ -38,20 +38,16 @@ JoinBucketsSession::setup()
bool source2Active = extractInfo(_source2, source2State);
_wantTargetActive = source1Active || source2Active;
- _adjustSource1ActiveLids = calcFixupNeed(source1State, _wantTargetActive,
- false);
- _adjustSource2ActiveLids = calcFixupNeed(source2State, _wantTargetActive,
- false);
+ _adjustSource1ActiveLids = calcFixupNeed(source1State, _wantTargetActive, false);
+ _adjustSource2ActiveLids = calcFixupNeed(source2State, _wantTargetActive, false);
BucketState *targetState = nullptr;
(void) extractInfo(_target, targetState);
- _adjustTargetActiveLids = calcFixupNeed(targetState, _wantTargetActive,
- true);
+ _adjustTargetActiveLids = calcFixupNeed(targetState, _wantTargetActive, true);
}
bool
-JoinBucketsSession::mustFixupTargetActiveLids(bool movedSource1Docs,
- bool movedSource2Docs) const
+JoinBucketsSession::mustFixupTargetActiveLids(bool movedSource1Docs, bool movedSource2Docs) const
{
return _adjustTargetActiveLids ||
(_adjustSource1ActiveLids && movedSource1Docs) ||
@@ -68,8 +64,7 @@ JoinBucketsSession::applyDeltas(const BucketDeltaPair &deltas)
bool
-JoinBucketsSession::applyDelta(const BucketState &delta, BucketId &srcBucket,
- BucketState *dst)
+JoinBucketsSession::applyDelta(const BucketState &delta, BucketId &srcBucket, BucketState *dst)
{
if (!srcBucket.valid()) {
assert(delta.empty());
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h
index 1029379dfb1..9ef89992a9c 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h
@@ -41,43 +41,14 @@ public:
const BucketId &source2,
const BucketId &target);
- void
- applyDeltas(const BucketDeltaPair &deltas);
-
- bool
- getWantTargetActive() const
- {
- return _wantTargetActive;
- }
-
- bool
- mustFixupTargetActiveLids(bool movedSource1Docs,
- bool movedSource2Docs) const;
-
-
- void
- setup();
-
- void
- finish();
-
- const BucketId &
- getSource1() const
- {
- return _source1;
- }
-
- const BucketId &
- getSource2() const
- {
- return _source2;
- }
-
- const BucketId &
- getTarget() const
- {
- return _target;
- }
+ void applyDeltas(const BucketDeltaPair &deltas);
+ bool getWantTargetActive() const { return _wantTargetActive; }
+ bool mustFixupTargetActiveLids(bool movedSource1Docs, bool movedSource2Docs) const;
+ void setup();
+ void finish();
+ const BucketId & getSource1() const { return _source1; }
+ const BucketId & getSource2() const { return _source2; }
+ const BucketId & getTarget() const { return _target;}
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp
index 0e085507a95..d03b630b822 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp
@@ -41,13 +41,11 @@ SplitBucketSession::setup()
if (_target1.valid()) {
BucketState *target1State = _bucketDB->getBucketStatePtr(_target1);
- _adjustTarget1ActiveLids = calcFixupNeed(target1State, _sourceActive,
- true);
+ _adjustTarget1ActiveLids = calcFixupNeed(target1State, _sourceActive, true);
}
if (_target2.valid()) {
BucketState *target2State = _bucketDB->getBucketStatePtr(_target2);
- _adjustTarget2ActiveLids = calcFixupNeed(target2State, _sourceActive,
- true);
+ _adjustTarget2ActiveLids = calcFixupNeed(target2State, _sourceActive, true);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 8333911d7d9..7b27ba2fa3a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -70,6 +70,19 @@ convert(InternalProtonType::Packetcompresstype type)
}
void
+setBucketCheckSumType(const ProtonConfig & proton)
+{
+ switch (proton.bucketdb.checksumtype) {
+ case InternalProtonType::Bucketdb::LEGACY:
+ bucketdb::BucketState::setChecksumType(bucketdb::BucketState::ChecksumType::LEGACY);
+ break;
+ case InternalProtonType::Bucketdb::XXHASH64:
+ bucketdb::BucketState::setChecksumType(bucketdb::BucketState::ChecksumType::XXHASH64);
+ break;
+ }
+}
+
+void
setFS4Compression(const ProtonConfig & proton)
{
FS4PersistentPacketStreamer & fs4(FS4PersistentPacketStreamer::Instance);
@@ -241,6 +254,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
const ProtonConfig &protonConfig = configSnapshot->getProtonConfig();
const HwInfo & hwInfo = configSnapshot->getHwInfo();
+ setBucketCheckSumType(protonConfig);
setFS4Compression(protonConfig);
_diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir,
diskMemUsageSamplerConfig(protonConfig, hwInfo));