summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-06-11 15:04:42 +0200
committerGitHub <noreply@github.com>2019-06-11 15:04:42 +0200
commite4e999fb5a70cad3f9f869ab0c915884f88e52c0 (patch)
tree36b0f870a1e061cba45e915083c1adf454e8c7dd
parent37893ddd1a4f68877c3be150cbeacf7336078f37 (diff)
parentb8816865133cdecfe1f43f75e54cdfb365860dad (diff)
Merge pull request #9743 from vespa-engine/balder/remove-sync-not-necessary
Balder/remove sync not necessary
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.h4
-rw-r--r--searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp69
-rw-r--r--searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h6
4 files changed, 39 insertions, 52 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
index f17f9459ff9..46fcdafc585 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
@@ -91,7 +91,7 @@ void
LogDataStore::updateSerialNum()
{
LockGuard guard(_updateLock);
- if (getPrevActive(guard) != NULL) {
+ if (getPrevActive(guard) != nullptr) {
if (getActive(guard).getSerialNum() <
getPrevActive(guard)->getLastPersistedSerialNum()) {
getActive(guard).setSerialNum(getPrevActive(guard)->getLastPersistedSerialNum());
@@ -234,7 +234,7 @@ LogDataStore::lastSyncToken() const
uint64_t lastSerial(getActive(guard).getLastPersistedSerialNum());
if (lastSerial == 0) {
const FileChunk * prev = getPrevActive(guard);
- if (prev != NULL) {
+ if (prev != nullptr) {
lastSerial = prev->getLastPersistedSerialNum();
}
}
@@ -274,7 +274,7 @@ LogDataStore::remove(uint64_t serialNum, uint32_t lid)
if (lm.valid()) {
_fileChunks[lm.getFileId()]->remove(lid, lm.size());
}
- lm = getActive(guard).append(serialNum, lid, NULL, 0);
+ lm = getActive(guard).append(serialNum, lid, nullptr, 0);
assert( lm.empty() );
_lidInfo[lid] = lm;
}
@@ -327,7 +327,7 @@ LogDataStore::getMaxCompactGain() const
void
LogDataStore::flush(uint64_t syncToken)
{
- WriteableFileChunk * active = NULL;
+ WriteableFileChunk * active = nullptr;
std::unique_ptr<FileChunkHolder> activeHolder;
assert(syncToken == _initFlushSyncToken);
{
@@ -604,7 +604,7 @@ LogDataStore::getDiskBloat() const
/// Do not count the holes in the last file as bloat
if (i != _active) {
const FileChunk * chunk = _fileChunks[i.getId()].get();
- if (chunk != NULL) {
+ if (chunk != nullptr) {
sz += chunk->getDiskBloat();
}
}
@@ -916,7 +916,7 @@ LogDataStore::scanDir(const vespalib::string &dir, const vespalib::string &suffi
if (file.size() > suffix.size() &&
file.find(suffix.c_str()) == file.size() - suffix.size()) {
vespalib::string base(file.substr(0, file.find(suffix.c_str())));
- char *err(NULL);
+ char *err(nullptr);
errno = 0;
NameId baseId(strtoul(base.c_str(), &err, 10));
if ((errno == 0) && (err[0] == '\0')) {
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.h b/searchlib/src/vespa/searchlib/docstore/logdatastore.h
index c4d1e8bbdb4..4ab747d115d 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.h
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.h
@@ -89,7 +89,7 @@ public:
const search::common::FileHeaderContext &fileHeaderContext,
transactionlog::SyncProxy &tlSyncer, const IBucketizer::SP & bucketizer, bool readOnly = false);
- ~LogDataStore();
+ ~LogDataStore() override;
// Implements IDataStore API
ssize_t read(uint32_t lid, vespalib::DataBuffer & buffer) const override;
@@ -220,7 +220,7 @@ private:
const FileChunk * getPrevActive(const LockGuard & guard) const {
assert(guard.locks(_updateLock));
(void) guard;
- return ( !_prevActive.isActive() ) ? _fileChunks[_prevActive.getId()].get() : NULL;
+ return ( !_prevActive.isActive() ) ? _fileChunks[_prevActive.getId()].get() : nullptr;
}
void setActive(const LockGuard & guard, FileId fileId) {
assert(guard.locks(_updateLock));
diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
index 91f5c37b817..50517cf09e2 100644
--- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
@@ -3,7 +3,7 @@
#include "writeablefilechunk.h"
#include "data_store_file_chunk_stats.h"
#include "summaryexceptions.h"
-#include <vespa/vespalib/util/closuretask.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/array.hpp>
#include <vespa/vespalib/data/fileheader.h>
#include <vespa/vespalib/data/databuffer.h>
@@ -14,8 +14,7 @@
#include <vespa/log/log.h>
LOG_SETUP(".search.writeablefilechunk");
-using vespalib::makeTask;
-using vespalib::makeClosure;
+using vespalib::makeLambdaTask;
using vespalib::FileHeader;
using vespalib::make_string;
using vespalib::LockGuard;
@@ -45,7 +44,6 @@ class PendingChunk
uint64_t _dataOffset;
uint32_t _dataLen;
public:
- typedef std::shared_ptr<PendingChunk> SP;
PendingChunk(uint64_t lastSerial, uint64_t dataOffset, uint32_t dataLen);
~PendingChunk();
vespalib::nbostream & getSerializedIdx() { return _idx; }
@@ -59,7 +57,6 @@ public:
class ProcessedChunk
{
public:
- typedef std::unique_ptr<ProcessedChunk> UP;
ProcessedChunk(uint32_t chunkId, uint32_t alignment)
: _chunkId(chunkId),
_payLoad(0),
@@ -77,7 +74,7 @@ private:
};
WriteableFileChunk::
-WriteableFileChunk(vespalib::ThreadExecutor &executor,
+WriteableFileChunk(vespalib::Executor &executor,
FileId fileId, NameId nameId,
const vespalib::string &baseName,
SerialNum initialSerialNum,
@@ -155,6 +152,7 @@ WriteableFileChunk::openIdx() {
}
return file;
}
+
WriteableFileChunk::~WriteableFileChunk()
{
if (!frozen()) {
@@ -177,7 +175,7 @@ WriteableFileChunk::updateLidMap(const LockGuard &guard, ISetLid &ds, uint64_t s
{
size_t sz = FileChunk::updateLidMap(guard, ds, serialNum, docIdLimit);
_nextChunkId = _chunkInfo.size();
- _active.reset( new Chunk(_nextChunkId++, Chunk::Config(_config.getMaxChunkBytes())));
+ _active = std::make_unique<Chunk>(_nextChunkId++, Chunk::Config(_config.getMaxChunkBytes()));
_serialNum = getLastPersistedSerialNum();
_firstChunkIdToBeWritten = _active->getId();
setDiskFootprint(0);
@@ -188,7 +186,7 @@ WriteableFileChunk::updateLidMap(const LockGuard &guard, ISetLid &ds, uint64_t s
void
WriteableFileChunk::restart(uint32_t nextChunkId)
{
- _executor.execute(makeTask(makeClosure(this, &WriteableFileChunk::fileWriter, nextChunkId)));
+ _executor.execute(makeLambdaTask([this, nextChunkId] {fileWriter(nextChunkId);}));
}
namespace {
@@ -219,7 +217,7 @@ WriteableFileChunk::read(LidInfoWithLidV::const_iterator begin, size_t count, IB
const LidInfoWithLid & li = *(begin + i);
uint32_t chunk = li.getChunkId();
if ((chunk >= _chunkInfo.size()) || !_chunkInfo[chunk].valid()) {
- ChunkMap::const_iterator found = _chunkMap.find(chunk);
+ auto found = _chunkMap.find(chunk);
vespalib::ConstBufferRef buffer;
if (found != _chunkMap.end()) {
buffer = found->second->getLid(li.getLid());
@@ -234,8 +232,8 @@ WriteableFileChunk::read(LidInfoWithLidV::const_iterator begin, size_t count, IB
}
}
for (auto & it : chunksOnFile) {
- LidInfoWithLidV::const_iterator first = find_first(begin, it.first);
- LidInfoWithLidV::const_iterator last = seek_past(first, begin + count, it.first);
+ auto first = find_first(begin, it.first);
+ auto last = seek_past(first, begin + count, it.first);
FileChunk::read(first, last - first, it.second, visitor);
}
} else {
@@ -250,7 +248,7 @@ WriteableFileChunk::read(uint32_t lid, SubChunkId chunkId, vespalib::DataBuffer
if (!frozen()) {
LockGuard guard(_lock);
if ((chunkId >= _chunkInfo.size()) || !_chunkInfo[chunkId].valid()) {
- ChunkMap::const_iterator found = _chunkMap.find(chunkId);
+ auto found = _chunkMap.find(chunkId);
if (found != _chunkMap.end()) {
return found->second->read(lid, buffer);
} else {
@@ -268,13 +266,13 @@ WriteableFileChunk::read(uint32_t lid, SubChunkId chunkId, vespalib::DataBuffer
void
WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum)
{
- Chunk * active(NULL);
+ Chunk * active(nullptr);
{
LockGuard guard(_lock);
active = _chunkMap[chunkId].get();
}
- ProcessedChunk::UP tmp(new ProcessedChunk(chunkId, _alignment));
+ auto tmp = std::make_unique<ProcessedChunk>(chunkId, _alignment);
if (_alignment > 1) {
tmp->getBuf().ensureFree(active->getMaxPackSize(_config.getCompression()) + _alignment - 1);
}
@@ -293,12 +291,12 @@ WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum)
}
void
-WriteableFileChunk::enque(ProcessedChunk::UP tmp)
+WriteableFileChunk::enque(ProcessedChunkUP tmp)
{
LOG(debug, "enqueing %p", tmp.get());
MonitorGuard guard(_writeMonitor);
_writeQ.push_back(std::move(tmp));
- if (_writeTaskIsRunning == false) {
+ if ( ! _writeTaskIsRunning) {
_writeTaskIsRunning = true;
uint32_t nextChunkId = _firstChunkIdToBeWritten;
guard.signal();
@@ -359,12 +357,12 @@ WriteableFileChunk::insertChunks(ProcessedChunkMap & orderedChunks, ProcessedChu
{
(void) nextChunkId;
for (auto &chunk : newChunks) {
- if (chunk.get() != 0) {
+ if (chunk) {
assert(chunk->getChunkId() >= nextChunkId);
assert(orderedChunks.find(chunk->getChunkId()) == orderedChunks.end());
orderedChunks[chunk->getChunkId()] = std::move(chunk);
} else {
- orderedChunks[std::numeric_limits<uint32_t>::max()] = ProcessedChunk::UP();
+ orderedChunks[std::numeric_limits<uint32_t>::max()] = ProcessedChunkUP();
}
}
}
@@ -375,7 +373,7 @@ WriteableFileChunk::fetchNextChain(ProcessedChunkMap & orderedChunks, const uint
ProcessedChunkQ chunks;
while (!orderedChunks.empty() &&
((orderedChunks.begin()->first == (firstChunkId+chunks.size())) ||
- (orderedChunks.begin()->second.get() == NULL)))
+ !orderedChunks.begin()->second))
{
chunks.push_back(std::move(orderedChunks.begin()->second));
orderedChunks.erase(orderedChunks.begin());
@@ -393,8 +391,7 @@ WriteableFileChunk::computeChunkMeta(const LockGuard & guard,
const ChunkMeta cmeta(offset, tmp.getPayLoad(), active.getLastSerial(), active.count());
assert((size_t(tmp.getBuf().getData())%_alignment) == 0);
assert((dataLen%_alignment) == 0);
- PendingChunk::SP pcsp;
- pcsp.reset(new PendingChunk(active.getLastSerial(), offset, dataLen));
+ auto pcsp = std::make_shared<PendingChunk>(active.getLastSerial(), offset, dataLen);
PendingChunk &pc(*pcsp.get());
nbostream &os(pc.getSerializedIdx());
cmeta.serialize(os);
@@ -424,8 +421,7 @@ WriteableFileChunk::computeChunkMeta(ProcessedChunkQ & chunks, size_t startPos,
LockGuard guard(_lock);
if (!_pendingChunks.empty()) {
- const PendingChunk::SP pcsp(_pendingChunks.back());
- const PendingChunk &pc(*pcsp.get());
+ const PendingChunk & pc = *_pendingChunks.back();
assert(pc.getLastSerial() >= lastSerial);
lastSerial = pc.getLastSerial();
}
@@ -454,7 +450,7 @@ WriteableFileChunk::writeData(const ProcessedChunkQ & chunks, size_t sz)
{
vespalib::DataBuffer buf(0ul, _alignment);
buf.ensureFree(sz);
- for (const ProcessedChunk::UP & chunk : chunks) {
+ for (const auto & chunk : chunks) {
buf.writeBytes(chunk->getBuf().getData(), chunk->getBuf().getDataLen());
}
@@ -540,15 +536,15 @@ WriteableFileChunk::freeze()
{
if (!frozen()) {
waitForAllChunksFlushedToDisk();
- enque(ProcessedChunk::UP());
- _executor.sync();
+ enque(ProcessedChunkUP());
{
MonitorGuard guard(_writeMonitor);
while (_writeTaskIsRunning) {
guard.wait(10);
}
- assert(_writeQ.empty());
}
+ assert(_writeQ.empty());
+ assert(_chunkMap.empty());
{
MonitorGuard guard(_lock);
setDiskFootprint(getDiskFootprint(guard));
@@ -632,7 +628,7 @@ int32_t WriteableFileChunk::flushLastIfNonEmpty(bool force)
chunkId = _active->getId();
_chunkMap[chunkId] = std::move(_active);
assert(_nextChunkId < LidInfo::getChunkIdLimit());
- _active.reset(new Chunk(_nextChunkId++, Chunk::Config(_config.getMaxChunkBytes())));
+ _active = std::make_unique<Chunk>(_nextChunkId++, Chunk::Config(_config.getMaxChunkBytes()));
}
return chunkId;
}
@@ -643,10 +639,7 @@ WriteableFileChunk::flush(bool block, uint64_t syncToken)
int32_t chunkId = flushLastIfNonEmpty(syncToken > _serialNum);
if (chunkId >= 0) {
setSerialNum(syncToken);
- _executor.execute(makeTask(makeClosure(this,
- &WriteableFileChunk::internalFlush,
- static_cast<uint32_t>(chunkId),
- _serialNum)));
+ _executor.execute(makeLambdaTask([this, chunkId, serialNum=_serialNum] { internalFlush(chunkId, serialNum); }));
} else {
if (block) {
MonitorGuard guard(_lock);
@@ -656,7 +649,6 @@ WriteableFileChunk::flush(bool block, uint64_t syncToken)
}
}
if (block) {
- _executor.sync();
waitForChunkFlushedToDisk(chunkId);
}
}
@@ -693,10 +685,7 @@ WriteableFileChunk::waitForAllChunksFlushedToDisk() const
}
LidInfo
-WriteableFileChunk::append(uint64_t serialNum,
- uint32_t lid,
- const void * buffer,
- size_t len)
+WriteableFileChunk::append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len)
{
assert( !frozen() );
if ( ! _active->hasRoom(len)) {
@@ -818,8 +807,7 @@ WriteableFileChunk::needFlushPendingChunks(const MonitorGuard & guard, uint64_t
assert(guard.monitors(_lock));
if (_pendingChunks.empty())
return false;
- const PendingChunk::SP pcsp(_pendingChunks.front());
- const PendingChunk &pc(*pcsp.get());
+ const PendingChunk & pc = *_pendingChunks.front();
if (pc.getLastSerial() > serialNum)
return false;
bool datWritten = datFileLen >= pc.getDataOffset() + pc.getDataLen();
@@ -868,8 +856,7 @@ WriteableFileChunk::unconditionallyFlushPendingChunks(const vespalib::LockGuard
for (;;) {
if (!needFlushPendingChunks(guard, serialNum, datFileLen))
break;
- PendingChunk::SP pcsp;
- pcsp.swap(_pendingChunks.front());
+ std::shared_ptr<PendingChunk> pcsp = std::move(_pendingChunks.front());
_pendingChunks.pop_front();
const PendingChunk &pc(*pcsp.get());
assert(_pendingIdx >= pc.getIdxLen());
diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
index 4a2ebfc42df..2c300bc9035 100644
--- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
+++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
@@ -3,7 +3,7 @@
#pragma once
#include "filechunk.h"
-#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/vespalib/util/executor.h>
#include <vespa/searchlib/transactionlog/syncproxy.h>
#include <vespa/fastos/file.h>
#include <map>
@@ -42,7 +42,7 @@ public:
public:
typedef std::unique_ptr<WriteableFileChunk> UP;
- WriteableFileChunk(vespalib::ThreadExecutor & executor, FileId fileId, NameId nameId,
+ WriteableFileChunk(vespalib::Executor & executor, FileId fileId, NameId nameId,
const vespalib::string & baseName, uint64_t initialSerialNum,
uint32_t docIdLimit, const Config & config,
const TuneFileSummary &tune, const common::FileHeaderContext &fileHeaderContext,
@@ -128,7 +128,7 @@ private:
bool _writeTaskIsRunning;
vespalib::Monitor _writeMonitor;
ProcessedChunkQ _writeQ;
- vespalib::ThreadExecutor & _executor;
+ vespalib::Executor & _executor;
ProcessedChunkMap _orderedChunks;
BucketDensityComputer _bucketMap;
};