diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-06-11 15:04:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-11 15:04:42 +0200 |
commit | e4e999fb5a70cad3f9f869ab0c915884f88e52c0 (patch) | |
tree | 36b0f870a1e061cba45e915083c1adf454e8c7dd | |
parent | 37893ddd1a4f68877c3be150cbeacf7336078f37 (diff) | |
parent | b8816865133cdecfe1f43f75e54cdfb365860dad (diff) |
Merge pull request #9743 from vespa-engine/balder/remove-sync-not-necessary
Balder/remove sync not necessary
4 files changed, 39 insertions, 52 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp index f17f9459ff9..46fcdafc585 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp @@ -91,7 +91,7 @@ void LogDataStore::updateSerialNum() { LockGuard guard(_updateLock); - if (getPrevActive(guard) != NULL) { + if (getPrevActive(guard) != nullptr) { if (getActive(guard).getSerialNum() < getPrevActive(guard)->getLastPersistedSerialNum()) { getActive(guard).setSerialNum(getPrevActive(guard)->getLastPersistedSerialNum()); @@ -234,7 +234,7 @@ LogDataStore::lastSyncToken() const uint64_t lastSerial(getActive(guard).getLastPersistedSerialNum()); if (lastSerial == 0) { const FileChunk * prev = getPrevActive(guard); - if (prev != NULL) { + if (prev != nullptr) { lastSerial = prev->getLastPersistedSerialNum(); } } @@ -274,7 +274,7 @@ LogDataStore::remove(uint64_t serialNum, uint32_t lid) if (lm.valid()) { _fileChunks[lm.getFileId()]->remove(lid, lm.size()); } - lm = getActive(guard).append(serialNum, lid, NULL, 0); + lm = getActive(guard).append(serialNum, lid, nullptr, 0); assert( lm.empty() ); _lidInfo[lid] = lm; } @@ -327,7 +327,7 @@ LogDataStore::getMaxCompactGain() const void LogDataStore::flush(uint64_t syncToken) { - WriteableFileChunk * active = NULL; + WriteableFileChunk * active = nullptr; std::unique_ptr<FileChunkHolder> activeHolder; assert(syncToken == _initFlushSyncToken); { @@ -604,7 +604,7 @@ LogDataStore::getDiskBloat() const /// Do not count the holes in the last file as bloat if (i != _active) { const FileChunk * chunk = _fileChunks[i.getId()].get(); - if (chunk != NULL) { + if (chunk != nullptr) { sz += chunk->getDiskBloat(); } } @@ -916,7 +916,7 @@ LogDataStore::scanDir(const vespalib::string &dir, const vespalib::string &suffi if (file.size() > suffix.size() && file.find(suffix.c_str()) == file.size() - suffix.size()) { vespalib::string base(file.substr(0, file.find(suffix.c_str()))); - char *err(NULL); + char *err(nullptr); errno = 0; NameId baseId(strtoul(base.c_str(), &err, 10)); if ((errno == 0) && (err[0] == '\0')) { diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.h b/searchlib/src/vespa/searchlib/docstore/logdatastore.h index c4d1e8bbdb4..4ab747d115d 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.h +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.h @@ -89,7 +89,7 @@ public: const search::common::FileHeaderContext &fileHeaderContext, transactionlog::SyncProxy &tlSyncer, const IBucketizer::SP & bucketizer, bool readOnly = false); - ~LogDataStore(); + ~LogDataStore() override; // Implements IDataStore API ssize_t read(uint32_t lid, vespalib::DataBuffer & buffer) const override; @@ -220,7 +220,7 @@ private: const FileChunk * getPrevActive(const LockGuard & guard) const { assert(guard.locks(_updateLock)); (void) guard; - return ( !_prevActive.isActive() ) ? _fileChunks[_prevActive.getId()].get() : NULL; + return ( !_prevActive.isActive() ) ? _fileChunks[_prevActive.getId()].get() : nullptr; } void setActive(const LockGuard & guard, FileId fileId) { assert(guard.locks(_updateLock)); diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp index 91f5c37b817..50517cf09e2 100644 --- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp @@ -3,7 +3,7 @@ #include "writeablefilechunk.h" #include "data_store_file_chunk_stats.h" #include "summaryexceptions.h" -#include <vespa/vespalib/util/closuretask.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/array.hpp> #include <vespa/vespalib/data/fileheader.h> #include <vespa/vespalib/data/databuffer.h> @@ -14,8 +14,7 @@ #include <vespa/log/log.h> LOG_SETUP(".search.writeablefilechunk"); -using vespalib::makeTask; -using vespalib::makeClosure; +using vespalib::makeLambdaTask; using vespalib::FileHeader; using vespalib::make_string; using vespalib::LockGuard; @@ -45,7 +44,6 @@ class PendingChunk uint64_t _dataOffset; uint32_t _dataLen; public: - typedef std::shared_ptr<PendingChunk> SP; PendingChunk(uint64_t lastSerial, uint64_t dataOffset, uint32_t dataLen); ~PendingChunk(); vespalib::nbostream & getSerializedIdx() { return _idx; } @@ -59,7 +57,6 @@ public: class ProcessedChunk { public: - typedef std::unique_ptr<ProcessedChunk> UP; ProcessedChunk(uint32_t chunkId, uint32_t alignment) : _chunkId(chunkId), _payLoad(0), @@ -77,7 +74,7 @@ private: }; WriteableFileChunk:: -WriteableFileChunk(vespalib::ThreadExecutor &executor, +WriteableFileChunk(vespalib::Executor &executor, FileId fileId, NameId nameId, const vespalib::string &baseName, SerialNum initialSerialNum, @@ -155,6 +152,7 @@ WriteableFileChunk::openIdx() { } return file; } + WriteableFileChunk::~WriteableFileChunk() { if (!frozen()) { @@ -177,7 +175,7 @@ WriteableFileChunk::updateLidMap(const LockGuard &guard, ISetLid &ds, uint64_t s { size_t sz = FileChunk::updateLidMap(guard, ds, serialNum, docIdLimit); _nextChunkId = _chunkInfo.size(); - _active.reset( new Chunk(_nextChunkId++, Chunk::Config(_config.getMaxChunkBytes()))); + _active = std::make_unique<Chunk>(_nextChunkId++, Chunk::Config(_config.getMaxChunkBytes())); _serialNum = getLastPersistedSerialNum(); _firstChunkIdToBeWritten = _active->getId(); setDiskFootprint(0); @@ -188,7 +186,7 @@ WriteableFileChunk::updateLidMap(const LockGuard &guard, ISetLid &ds, uint64_t s void WriteableFileChunk::restart(uint32_t nextChunkId) { - _executor.execute(makeTask(makeClosure(this, &WriteableFileChunk::fileWriter, nextChunkId))); + _executor.execute(makeLambdaTask([this, nextChunkId] {fileWriter(nextChunkId);})); } namespace { @@ -219,7 +217,7 @@ WriteableFileChunk::read(LidInfoWithLidV::const_iterator begin, size_t count, IB const LidInfoWithLid & li = *(begin + i); uint32_t chunk = li.getChunkId(); if ((chunk >= _chunkInfo.size()) || !_chunkInfo[chunk].valid()) { - ChunkMap::const_iterator found = _chunkMap.find(chunk); + auto found = _chunkMap.find(chunk); vespalib::ConstBufferRef buffer; if (found != _chunkMap.end()) { buffer = found->second->getLid(li.getLid()); @@ -234,8 +232,8 @@ WriteableFileChunk::read(LidInfoWithLidV::const_iterator begin, size_t count, IB } } for (auto & it : chunksOnFile) { - LidInfoWithLidV::const_iterator first = find_first(begin, it.first); - LidInfoWithLidV::const_iterator last = seek_past(first, begin + count, it.first); + auto first = find_first(begin, it.first); + auto last = seek_past(first, begin + count, it.first); FileChunk::read(first, last - first, it.second, visitor); } } else { @@ -250,7 +248,7 @@ WriteableFileChunk::read(uint32_t lid, SubChunkId chunkId, vespalib::DataBuffer if (!frozen()) { LockGuard guard(_lock); if ((chunkId >= _chunkInfo.size()) || !_chunkInfo[chunkId].valid()) { - ChunkMap::const_iterator found = _chunkMap.find(chunkId); + auto found = _chunkMap.find(chunkId); if (found != _chunkMap.end()) { return found->second->read(lid, buffer); } else { @@ -268,13 +266,13 @@ WriteableFileChunk::read(uint32_t lid, SubChunkId chunkId, vespalib::DataBuffer void WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum) { - Chunk * active(NULL); + Chunk * active(nullptr); { LockGuard guard(_lock); active = _chunkMap[chunkId].get(); } - ProcessedChunk::UP tmp(new ProcessedChunk(chunkId, _alignment)); + auto tmp = std::make_unique<ProcessedChunk>(chunkId, _alignment); if (_alignment > 1) { tmp->getBuf().ensureFree(active->getMaxPackSize(_config.getCompression()) + _alignment - 1); } @@ -293,12 +291,12 @@ WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum) } void -WriteableFileChunk::enque(ProcessedChunk::UP tmp) +WriteableFileChunk::enque(ProcessedChunkUP tmp) { LOG(debug, "enqueing %p", tmp.get()); MonitorGuard guard(_writeMonitor); _writeQ.push_back(std::move(tmp)); - if (_writeTaskIsRunning == false) { + if ( ! _writeTaskIsRunning) { _writeTaskIsRunning = true; uint32_t nextChunkId = _firstChunkIdToBeWritten; guard.signal(); @@ -359,12 +357,12 @@ WriteableFileChunk::insertChunks(ProcessedChunkMap & orderedChunks, ProcessedChu { (void) nextChunkId; for (auto &chunk : newChunks) { - if (chunk.get() != 0) { + if (chunk) { assert(chunk->getChunkId() >= nextChunkId); assert(orderedChunks.find(chunk->getChunkId()) == orderedChunks.end()); orderedChunks[chunk->getChunkId()] = std::move(chunk); } else { - orderedChunks[std::numeric_limits<uint32_t>::max()] = ProcessedChunk::UP(); + orderedChunks[std::numeric_limits<uint32_t>::max()] = ProcessedChunkUP(); } } } @@ -375,7 +373,7 @@ WriteableFileChunk::fetchNextChain(ProcessedChunkMap & orderedChunks, const uint ProcessedChunkQ chunks; while (!orderedChunks.empty() && ((orderedChunks.begin()->first == (firstChunkId+chunks.size())) || - (orderedChunks.begin()->second.get() == NULL))) + !orderedChunks.begin()->second)) { chunks.push_back(std::move(orderedChunks.begin()->second)); orderedChunks.erase(orderedChunks.begin()); @@ -393,8 +391,7 @@ WriteableFileChunk::computeChunkMeta(const LockGuard & guard, const ChunkMeta cmeta(offset, tmp.getPayLoad(), active.getLastSerial(), active.count()); assert((size_t(tmp.getBuf().getData())%_alignment) == 0); assert((dataLen%_alignment) == 0); - PendingChunk::SP pcsp; - pcsp.reset(new PendingChunk(active.getLastSerial(), offset, dataLen)); + auto pcsp = std::make_shared<PendingChunk>(active.getLastSerial(), offset, dataLen); PendingChunk &pc(*pcsp.get()); nbostream &os(pc.getSerializedIdx()); cmeta.serialize(os); @@ -424,8 +421,7 @@ WriteableFileChunk::computeChunkMeta(ProcessedChunkQ & chunks, size_t startPos, LockGuard guard(_lock); if (!_pendingChunks.empty()) { - const PendingChunk::SP pcsp(_pendingChunks.back()); - const PendingChunk &pc(*pcsp.get()); + const PendingChunk & pc = *_pendingChunks.back(); assert(pc.getLastSerial() >= lastSerial); lastSerial = pc.getLastSerial(); } @@ -454,7 +450,7 @@ WriteableFileChunk::writeData(const ProcessedChunkQ & chunks, size_t sz) { vespalib::DataBuffer buf(0ul, _alignment); buf.ensureFree(sz); - for (const ProcessedChunk::UP & chunk : chunks) { + for (const auto & chunk : chunks) { buf.writeBytes(chunk->getBuf().getData(), chunk->getBuf().getDataLen()); } @@ -540,15 +536,15 @@ WriteableFileChunk::freeze() { if (!frozen()) { waitForAllChunksFlushedToDisk(); - enque(ProcessedChunk::UP()); - _executor.sync(); + enque(ProcessedChunkUP()); { MonitorGuard guard(_writeMonitor); while (_writeTaskIsRunning) { guard.wait(10); } - assert(_writeQ.empty()); } + assert(_writeQ.empty()); + assert(_chunkMap.empty()); { MonitorGuard guard(_lock); setDiskFootprint(getDiskFootprint(guard)); @@ -632,7 +628,7 @@ int32_t WriteableFileChunk::flushLastIfNonEmpty(bool force) chunkId = _active->getId(); _chunkMap[chunkId] = std::move(_active); assert(_nextChunkId < LidInfo::getChunkIdLimit()); - _active.reset(new Chunk(_nextChunkId++, Chunk::Config(_config.getMaxChunkBytes()))); + _active = std::make_unique<Chunk>(_nextChunkId++, Chunk::Config(_config.getMaxChunkBytes())); } return chunkId; } @@ -643,10 +639,7 @@ WriteableFileChunk::flush(bool block, uint64_t syncToken) int32_t chunkId = flushLastIfNonEmpty(syncToken > _serialNum); if (chunkId >= 0) { setSerialNum(syncToken); - _executor.execute(makeTask(makeClosure(this, - &WriteableFileChunk::internalFlush, - static_cast<uint32_t>(chunkId), - _serialNum))); + _executor.execute(makeLambdaTask([this, chunkId, serialNum=_serialNum] { internalFlush(chunkId, serialNum); })); } else { if (block) { MonitorGuard guard(_lock); @@ -656,7 +649,6 @@ WriteableFileChunk::flush(bool block, uint64_t syncToken) } } if (block) { - _executor.sync(); waitForChunkFlushedToDisk(chunkId); } } @@ -693,10 +685,7 @@ WriteableFileChunk::waitForAllChunksFlushedToDisk() const } LidInfo -WriteableFileChunk::append(uint64_t serialNum, - uint32_t lid, - const void * buffer, - size_t len) +WriteableFileChunk::append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len) { assert( !frozen() ); if ( ! _active->hasRoom(len)) { @@ -818,8 +807,7 @@ WriteableFileChunk::needFlushPendingChunks(const MonitorGuard & guard, uint64_t assert(guard.monitors(_lock)); if (_pendingChunks.empty()) return false; - const PendingChunk::SP pcsp(_pendingChunks.front()); - const PendingChunk &pc(*pcsp.get()); + const PendingChunk & pc = *_pendingChunks.front(); if (pc.getLastSerial() > serialNum) return false; bool datWritten = datFileLen >= pc.getDataOffset() + pc.getDataLen(); @@ -868,8 +856,7 @@ WriteableFileChunk::unconditionallyFlushPendingChunks(const vespalib::LockGuard for (;;) { if (!needFlushPendingChunks(guard, serialNum, datFileLen)) break; - PendingChunk::SP pcsp; - pcsp.swap(_pendingChunks.front()); + std::shared_ptr<PendingChunk> pcsp = std::move(_pendingChunks.front()); _pendingChunks.pop_front(); const PendingChunk &pc(*pcsp.get()); assert(_pendingIdx >= pc.getIdxLen()); diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h index 4a2ebfc42df..2c300bc9035 100644 --- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h +++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h @@ -3,7 +3,7 @@ #pragma once #include "filechunk.h" -#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/executor.h> #include <vespa/searchlib/transactionlog/syncproxy.h> #include <vespa/fastos/file.h> #include <map> @@ -42,7 +42,7 @@ public: public: typedef std::unique_ptr<WriteableFileChunk> UP; - WriteableFileChunk(vespalib::ThreadExecutor & executor, FileId fileId, NameId nameId, + WriteableFileChunk(vespalib::Executor & executor, FileId fileId, NameId nameId, const vespalib::string & baseName, uint64_t initialSerialNum, uint32_t docIdLimit, const Config & config, const TuneFileSummary &tune, const common::FileHeaderContext &fileHeaderContext, @@ -128,7 +128,7 @@ private: bool _writeTaskIsRunning; vespalib::Monitor _writeMonitor; ProcessedChunkQ _writeQ; - vespalib::ThreadExecutor & _executor; + vespalib::Executor & _executor; ProcessedChunkMap _orderedChunks; BucketDensityComputer _bucketMap; }; |