diff options
Diffstat (limited to 'searchlib/src/vespa/searchlib/docstore/filechunk.cpp')
-rw-r--r-- | searchlib/src/vespa/searchlib/docstore/filechunk.cpp | 192 |
1 files changed, 82 insertions, 110 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp index 71dfed86fdb..c57650bb16f 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "filechunk.h" #include "data_store_file_chunk_stats.h" @@ -6,14 +6,12 @@ #include "randreaders.h" #include <vespa/searchlib/util/filekit.h> #include <vespa/vespalib/util/lambdatask.h> -#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/data/fileheader.h> #include <vespa/vespalib/data/databuffer.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/arrayqueue.hpp> -#include <vespa/vespalib/util/array.hpp> #include <vespa/fastos/file.h> #include <filesystem> #include <future> @@ -117,33 +115,14 @@ FileChunk::addNumBuckets(size_t numBucketsInChunk) } } -class TmpChunkMeta : public ChunkMeta, - public std::vector<LidMeta> -{ -public: - void fill(vespalib::nbostream & is) { - resize(getNumEntries()); - for (LidMeta & lm : *this) { - lm.deserialize(is); - } - } -}; - -using TmpChunkMetaV = std::vector<TmpChunkMeta>; - -namespace { - void -verifyOrAssert(const TmpChunkMetaV & v) -{ - for (auto prev(v.begin()), it(prev); it != v.end(); ++it) { - assert(prev->getLastSerial() <= it->getLastSerial()); - prev = it; +FileChunk::TmpChunkMeta::fill(vespalib::nbostream & is) { + resize(getNumEntries()); + for (LidMeta & lm : *this) { + lm.deserialize(is); } } -} - void FileChunk::erase() { @@ -152,98 +131,92 @@ FileChunk::erase() std::filesystem::remove(std::filesystem::path(_dataFileName)); } -size_t +void FileChunk::updateLidMap(const unique_lock &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit) { - size_t sz(0); assert(_chunkInfo.empty()); FastOS_File idxFile(_idxFileName.c_str()); idxFile.enableMemoryMap(0); - if (idxFile.OpenReadOnly()) { - if (idxFile.IsMemoryMapped()) { - const int64_t fileSize = idxFile.getSize(); - if (_idxHeaderLen == 0) { - _idxHeaderLen = readIdxHeader(idxFile, _docIdLimit); + if ( ! idxFile.OpenReadOnly()) { + LOG_ABORT("should not reach here"); + } + if ( ! idxFile.IsMemoryMapped()) { + assert(idxFile.getSize() == 0); + return; + } + const int64_t fileSize = idxFile.getSize(); + if (_idxHeaderLen == 0) { + _idxHeaderLen = readIdxHeader(idxFile, _docIdLimit); + } + BucketDensityComputer globalBucketMap(_bucketizer); + // Guard comes from the same bucketizer so the same guard can be used + // for both local and global BucketDensityComputer + vespalib::GenerationHandler::Guard bucketizerGuard = globalBucketMap.getGuard(); + vespalib::nbostream is(static_cast<const char *>(idxFile.MemoryMapPtr(0)) + _idxHeaderLen, + fileSize - _idxHeaderLen); + for (size_t count=0; ! is.empty() && is.good(); count++) { + const int64_t lastKnownGoodPos = _idxHeaderLen + is.rp(); + TmpChunkMeta chunkMeta; + try { + chunkMeta.deserialize(is); + chunkMeta.fill(is); + if ((count == 0) && (chunkMeta.getLastSerial() < serialNum)) { + LOG(warning, "last serial num(%" PRIu64 ") from previous file is bigger than my first(%" PRIu64 + "). That is odd.Current filename is '%s'", + serialNum, chunkMeta.getLastSerial(), _idxFileName.c_str()); + serialNum = chunkMeta.getLastSerial(); } - vespalib::nbostream is(static_cast<const char *>(idxFile.MemoryMapPtr(0)) + _idxHeaderLen, - fileSize - _idxHeaderLen); - TmpChunkMetaV tempVector; - tempVector.reserve(fileSize/(sizeof(ChunkMeta)+sizeof(LidMeta))); - while ( ! is.empty() && is.good()) { - const int64_t lastKnownGoodPos = _idxHeaderLen + is.rp(); - tempVector.emplace_back(); - TmpChunkMeta & chunkMeta(tempVector.back()); - try { - chunkMeta.deserialize(is); - chunkMeta.fill(is); - } catch (const vespalib::IllegalStateException & e) { - LOG(warning, "Exception deserializing idx file : %s", e.what()); - LOG(warning, "File '%s' seems to be partially truncated. Will truncate from size=%" PRId64 " to %" PRId64, - _idxFileName.c_str(), fileSize, lastKnownGoodPos); - FastOS_File toTruncate(_idxFileName.c_str()); - if ( toTruncate.OpenReadWrite()) { - if (toTruncate.SetSize(lastKnownGoodPos)) { - tempVector.resize(tempVector.size() - 1); - } else { - throw SummaryException("SetSize() failed.", toTruncate, VESPA_STRLOC); - } - } else { - throw SummaryException("Open for truncation failed.", toTruncate, VESPA_STRLOC); - } - break; + assert(serialNum <= chunkMeta.getLastSerial()); + serialNum = handleChunk(guard, ds, docIdLimit, bucketizerGuard, globalBucketMap, chunkMeta); + assert(serialNum >= _lastPersistedSerialNum.load(std::memory_order_relaxed)); + _lastPersistedSerialNum.store(serialNum, std::memory_order_relaxed); + } catch (const vespalib::IllegalStateException & e) { + LOG(warning, "Exception deserializing idx file : %s", e.what()); + LOG(warning, "File '%s' seems to be partially truncated. Will truncate from size=%" PRId64 " to %" PRId64, + _idxFileName.c_str(), fileSize, lastKnownGoodPos); + FastOS_File toTruncate(_idxFileName.c_str()); + if ( toTruncate.OpenReadWrite()) { + if (toTruncate.SetSize(lastKnownGoodPos)) { + } else { + throw SummaryException("SetSize() failed.", toTruncate, VESPA_STRLOC); } + } else { + throw SummaryException("Open for truncation failed.", toTruncate, VESPA_STRLOC); } - if ( ! tempVector.empty()) { - verifyOrAssert(tempVector); - if (tempVector[0].getLastSerial() < serialNum) { - LOG(warning, - "last serial num(%" PRIu64 ") from previous file is " - "bigger than my first(%" PRIu64 "). That is odd." - "Current filename is '%s'", - serialNum, tempVector[0].getLastSerial(), - _idxFileName.c_str()); - serialNum = tempVector[0].getLastSerial(); - } - BucketDensityComputer globalBucketMap(_bucketizer); - // Guard comes from the same bucketizer so the same guard can be used - // for both local and global BucketDensityComputer - vespalib::GenerationHandler::Guard bucketizerGuard = globalBucketMap.getGuard(); - for (const TmpChunkMeta & chunkMeta : tempVector) { - assert(serialNum <= chunkMeta.getLastSerial()); - BucketDensityComputer bucketMap(_bucketizer); - for (size_t i(0), m(chunkMeta.getNumEntries()); i < m; i++) { - const LidMeta & lidMeta(chunkMeta[i]); - if (lidMeta.getLid() < docIdLimit) { - if (_bucketizer && (lidMeta.size() > 0)) { - document::BucketId bucketId = _bucketizer->getBucketOf(bucketizerGuard, lidMeta.getLid()); - bucketMap.recordLid(bucketId); - globalBucketMap.recordLid(bucketId); - } - ds.setLid(guard, lidMeta.getLid(), LidInfo(getFileId().getId(), _chunkInfo.size(), lidMeta.size())); - _numLids++; - } else { - remove(lidMeta.getLid(), lidMeta.size()); - } - _addedBytes += adjustSize(lidMeta.size()); - } - serialNum = chunkMeta.getLastSerial(); - addNumBuckets(bucketMap.getNumBuckets()); - _chunkInfo.emplace_back(chunkMeta.getOffset(), chunkMeta.getSize(), chunkMeta.getLastSerial()); - assert(serialNum >= _lastPersistedSerialNum.load(std::memory_order_relaxed)); - _lastPersistedSerialNum.store(serialNum, std::memory_order_relaxed); - } - _numUniqueBuckets = globalBucketMap.getNumBuckets(); + break; + } + } + _numUniqueBuckets = globalBucketMap.getNumBuckets(); +} + +uint64_t +FileChunk::handleChunk(const unique_lock &guard, ISetLid &ds, uint32_t docIdLimit, + const vespalib::GenerationHandler::Guard & bucketizerGuard, BucketDensityComputer &globalBucketMap, + const TmpChunkMeta & chunkMeta) { + BucketDensityComputer bucketMap(_bucketizer); + for (size_t i(0), m(chunkMeta.getNumEntries()); i < m; i++) { + const LidMeta & lidMeta(chunkMeta[i]); + if (lidMeta.getLid() < docIdLimit) { + if (_bucketizer && (lidMeta.size() > 0)) { + document::BucketId bucketId = _bucketizer->getBucketOf(bucketizerGuard, lidMeta.getLid()); + bucketMap.recordLid(bucketId); + globalBucketMap.recordLid(bucketId); } + ds.setLid(guard, lidMeta.getLid(), LidInfo(getFileId().getId(), _chunkInfo.size(), lidMeta.size())); + _numLids++; } else { - assert(idxFile.getSize() == 0); + remove(lidMeta.getLid(), lidMeta.size()); } - } else { - LOG_ABORT("should not reach here"); + _addedBytes.store(getAddedBytes() + adjustSize(lidMeta.size()), std::memory_order_relaxed); } - return sz; + uint64_t serialNum = chunkMeta.getLastSerial(); + addNumBuckets(bucketMap.getNumBuckets()); + _chunkInfo.emplace_back(chunkMeta.getOffset(), chunkMeta.getSize(), chunkMeta.getLastSerial()); + return serialNum; } + void FileChunk::enableRead() { @@ -277,8 +250,8 @@ void FileChunk::remove(uint32_t lid, uint32_t size) { (void) lid; - _erasedCount++; - _erasedBytes += adjustSize(size); + _erasedCount.store(getErasedCount() + 1, std::memory_order_relaxed); + _erasedBytes.store(getErasedBytes() + adjustSize(size), std::memory_order_relaxed); } uint64_t @@ -312,9 +285,9 @@ appendChunks(FixedParams * args, Chunk::UP chunk) if (args->db.getLid(args->lidReadGuard, e.getLid()) == lidInfo) { auto guard(args->db.getLidGuard(e.getLid())); if (args->db.getLid(args->lidReadGuard, e.getLid()) == lidInfo) { - // I am still in use so I need to taken care of. + // I am still in use, so I need to be taken care of. vespalib::ConstBufferRef data(chunk->getLid(e.getLid())); - args->dest.write(std::move(guard), chunk->getId(), e.getLid(), data.c_str(), data.size()); + args->dest.write(std::move(guard), chunk->getId(), e.getLid(), data); } } } @@ -478,7 +451,7 @@ FileChunk::verify(bool reportOnly) const (void) reportOnly; LOG(info, "Verifying file '%s' with fileid '%u'. erased-count='%zu' and erased-bytes='%zu'. diskFootprint='%zu'", - _name.c_str(), _fileId.getId(), _erasedCount, _erasedBytes, _diskFootprint.load(std::memory_order_relaxed)); + _name.c_str(), _fileId.getId(), getErasedCount(), getErasedBytes(), _diskFootprint.load(std::memory_order_relaxed)); uint64_t lastSerial(0); size_t chunkId(0); bool errorInPrev(false); @@ -581,8 +554,7 @@ FileChunk::getStats() const uint64_t serialNum = getLastPersistedSerialNum(); uint32_t docIdLimit = getDocIdLimit(); uint64_t nameId = getNameId().getId(); - return DataStoreFileChunkStats(diskFootprint, diskBloat, bucketSpread, - serialNum, serialNum, docIdLimit, nameId); + return {diskFootprint, diskBloat, bucketSpread, serialNum, serialNum, docIdLimit, nameId}; } } // namespace search |