diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-14 13:51:39 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-14 13:51:39 +0200 |
commit | beb641c2a972af3c992ce46407297e87c3d8e193 (patch) | |
tree | fd095454748a433b28ce5b2a86e2c2874e87fbe2 | |
parent | e165d6b5512922999b91f78054d3070e33307c8b (diff) |
Use multiple threads in decompress due to more expensive zstd
5 files changed, 63 insertions, 28 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp index 2a748a302c6..c823bacb10c 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp @@ -10,8 +10,11 @@ #include <vespa/vespalib/util/array.hpp> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/searchlib/util/filekit.h> +#include <vespa/searchlib/common/lambdatask.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/fastos/file.h> +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <future> #include <vespa/log/log.h> LOG_SETUP(".search.filechunk"); @@ -296,35 +299,65 @@ FileChunk::getModificationTime() const return _modificationTime; } +namespace { + +using FutureChunk = std::future<Chunk::UP>; + +struct FixedParams { + const IGetLid & db; + IWriteData & dest; + const vespalib::GenerationHandler::Guard & lidReadGuard; + uint32_t fileId; + IFileChunkVisitorProgress *visitorProgress; +}; + void -FileChunk::appendTo(const IGetLid & db, IWriteData & dest, - uint32_t numChunks, - IFileChunkVisitorProgress *visitorProgress) +appendChunks(FixedParams * args, Chunk::UP chunk) +{ + const Chunk::LidList ll(chunk->getUniqueLids()); + for (const Chunk::Entry & e : ll) { + LidInfo lidInfo(args->fileId, chunk->getId(), e.netSize()); + if (args->db.getLid(args->lidReadGuard, e.getLid()) == lidInfo) { + vespalib::LockGuard guard(args->db.getLidGuard(e.getLid())); + if (args->db.getLid(args->lidReadGuard, e.getLid()) == lidInfo) { + // I am still in use so I need to taken care of. + vespalib::ConstBufferRef data(chunk->getLid(e.getLid())); + args->dest.write(guard, chunk->getId(), e.getLid(), data.c_str(), data.size()); + } + } + } + if (args->visitorProgress != NULL) { + args->visitorProgress->updateProgress(); + } +} + +} + + +void +FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest, + uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress) { assert(frozen() || visitorProgress); vespalib::GenerationHandler::Guard lidReadGuard(db.getLidReadGuard()); assert(numChunks <= getNumChunks()); + FixedParams fixedParams = {db, dest, lidReadGuard, getFileId().getId(), visitorProgress}; + vespalib::ThreadStackExecutor singleExecutor(1, 64*1024); for (size_t chunkId(0); chunkId < numChunks; chunkId++) { - const ChunkInfo & cInfo(_chunkInfo[chunkId]); - vespalib::DataBuffer whole(0ul, ALIGNMENT); - FileRandRead::FSP keepAlive(_file->read(cInfo.getOffset(), whole, cInfo.getSize())); - Chunk chunk(chunkId, whole.getData(), whole.getDataLen()); - const Chunk::LidList ll(chunk.getUniqueLids()); - for (const Chunk::Entry & e : ll) { - LidInfo lidInfo(getFileId().getId(), chunk.getId(), e.netSize()); - if (db.getLid(lidReadGuard, e.getLid()) == lidInfo) { - vespalib::LockGuard guard(db.getLidGuard(e.getLid())); - if (db.getLid(lidReadGuard, e.getLid()) == lidInfo) { - // I am still in use so I need to taken care of. - vespalib::ConstBufferRef data(chunk.getLid(e.getLid())); - dest.write(guard, chunk.getId(), e.getLid(), data.c_str(), data.size()); - } - } - } - if (visitorProgress != NULL) { - visitorProgress->updateProgress(); - } + std::promise<Chunk::UP> promisedChunk; + std::future<Chunk::UP> futureChunk = promisedChunk.get_future(); + executor.execute(makeLambdaTask([promise = std::move(promisedChunk), chunkId, this]() mutable { + const ChunkInfo & cInfo(_chunkInfo[chunkId]); + vespalib::DataBuffer whole(0ul, ALIGNMENT); + FileRandRead::FSP keepAlive(_file->read(cInfo.getOffset(), whole, cInfo.getSize())); + promise.set_value(std::make_unique<Chunk>(chunkId, whole.getData(), whole.getDataLen())); + })); + + singleExecutor.execute(makeLambdaTask([args = &fixedParams, chunk = std::move(futureChunk)]() mutable { + appendChunks(args, chunk.get()); + })); } + singleExecutor.sync(); dest.close(); } diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.h b/searchlib/src/vespa/searchlib/docstore/filechunk.h index 326a185f9de..a7b6556a0a3 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.h +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.h @@ -18,6 +18,7 @@ class FastOS_FileInterface; namespace vespalib { class DataBuffer; class GenericHeader; + class Executor; } namespace search { @@ -161,7 +162,8 @@ public: virtual bool frozen() const { return true; } const vespalib::string & getName() const { return _name; } void compact(const IGetLid & iGetLid); - void appendTo(const IGetLid & db, IWriteData & dest, uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress); + void appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest, + uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress); /** * Must be called after chunk has been created to allow correct * underlying file object to be created. Must be called before diff --git a/searchlib/src/vespa/searchlib/docstore/lid_info.h b/searchlib/src/vespa/searchlib/docstore/lid_info.h index e86a86bce49..10ddd868c41 100644 --- a/searchlib/src/vespa/searchlib/docstore/lid_info.h +++ b/searchlib/src/vespa/searchlib/docstore/lid_info.h @@ -79,7 +79,7 @@ public: using Guard = vespalib::GenerationHandler::Guard; virtual ~IGetLid() { } - virtual LidInfo getLid(Guard & guard, uint32_t lid) const = 0; + virtual LidInfo getLid(const Guard & guard, uint32_t lid) const = 0; virtual vespalib::LockGuard getLidGuard(uint32_t lid) const = 0; virtual Guard getLidReadGuard() const = 0; }; diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp index 024e64c0bdd..4fa4142813c 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp @@ -461,7 +461,7 @@ void LogDataStore::compactFile(FileId fileId) compacter.reset(new docstore::Compacter(*this)); } - fc->appendTo(*this, *compacter, fc->getNumChunks(), nullptr); + fc->appendTo(_executor, *this, *compacter, fc->getNumChunks(), nullptr); if (destinationFileId.isActive()) { flushActiveAndWait(0); @@ -1067,7 +1067,7 @@ LogDataStore::accept(IDataStoreVisitor &visitor, WrapVisitorProgress wrapProgress(visitorProgress, totalChunks); for (FileId fcId : fileChunks) { FileChunk & fc = *_fileChunks[fcId.getId()]; - fc.appendTo(*this, wrap, fc.getNumChunks(), &wrapProgress); + fc.appendTo(_executor, *this, wrap, fc.getNumChunks(), &wrapProgress); if (prune) { internalFlushAll(); FileChunk::UP toDie; @@ -1078,7 +1078,7 @@ LogDataStore::accept(IDataStoreVisitor &visitor, toDie->erase(); } } - lfc.appendTo(*this, wrap, lastChunks, &wrapProgress); + lfc.appendTo(_executor, *this, wrap, lastChunks, &wrapProgress); if (prune) { internalFlushAll(); } diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.h b/searchlib/src/vespa/searchlib/docstore/logdatastore.h index 080e6f80503..eb46e5438a9 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.h +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.h @@ -176,7 +176,7 @@ public: } // Implements IGetLid API - LidInfo getLid(Guard & guard, uint32_t lid) const override { + LidInfo getLid(const Guard & guard, uint32_t lid) const override { (void) guard; if (lid < getDocIdLimit()) { return _lidInfo[lid]; |