1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include "filechunk.h"
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/searchlib/transactionlog/syncproxy.h>
#include <vespa/fastos/file.h>
#include <map>
#include <deque>
namespace search {
// Forward declarations. In this header these types appear only behind smart
// pointers or references (std::shared_ptr<PendingChunk>,
// std::unique_ptr<ProcessedChunk>, const ProcessedChunk &,
// const common::FileHeaderContext &), so declaring the names is sufficient here.
class PendingChunk;
class ProcessedChunk;
namespace common { class FileHeaderContext; }
class WriteableFileChunk : public FileChunk
{
public:
class Config
{
public:
Config()
: _compression(document::CompressionConfig::LZ4, 9, 60),
_maxChunkBytes(0x10000)
{ }
Config(const document::CompressionConfig &compression, size_t maxChunkBytes)
: _compression(compression),
_maxChunkBytes(maxChunkBytes)
{ }
const document::CompressionConfig & getCompression() const { return _compression; }
size_t getMaxChunkBytes() const { return _maxChunkBytes; }
private:
document::CompressionConfig _compression;
size_t _maxChunkBytes;
};
public:
typedef std::unique_ptr<WriteableFileChunk> UP;
WriteableFileChunk(vespalib::ThreadExecutor & executor,
FileId fileId, NameId nameId,
const vespalib::string & baseName,
uint64_t initialSerialNum,
uint32_t docIdLimit,
const Config & config,
const TuneFileSummary &tune,
const common::FileHeaderContext &fileHeaderContext,
const IBucketizer * bucketizer,
bool crcOnReadDisabled);
~WriteableFileChunk();
ssize_t read(uint32_t lid, SubChunkId chunk, vespalib::DataBuffer & buffer) const override;
void read(LidInfoWithLidV::const_iterator begin, size_t count, IBufferVisitor & visitor) const override;
LidInfo append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len);
void flush(bool block, uint64_t syncToken);
uint64_t getSerialNum() const { return _serialNum; }
void setSerialNum(uint64_t serialNum) { _serialNum = std::max(_serialNum, serialNum); }
fastos::TimeStamp getModificationTime() const override;
void freeze();
size_t getDiskFootprint() const override;
size_t getMemoryFootprint() const override;
size_t getMemoryMetaFootprint() const override;
MemoryUsage getMemoryUsage() const override;
size_t updateLidMap(const LockGuard &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit) override;
void waitForDiskToCatchUpToNow() const;
void flushPendingChunks(uint64_t serialNum);
DataStoreFileChunkStats getStats() const override;
static uint64_t writeIdxHeader(const common::FileHeaderContext &fileHeaderContext, uint32_t docIdLimit, FastOS_FileInterface &file);
private:
using ProcessedChunkUP = std::unique_ptr<ProcessedChunk>;
typedef std::map<uint32_t, ProcessedChunkUP > ProcessedChunkMap;
typedef std::vector<ProcessedChunkUP> ProcessedChunkQ;
bool frozen() const override { return _frozen; }
void waitForChunkFlushedToDisk(uint32_t chunkId) const;
void waitForAllChunksFlushedToDisk() const;
void fileWriter(const uint32_t firstChunkId);
void internalFlush(uint32_t, uint64_t serialNum);
void enque(ProcessedChunkUP);
int32_t flushLastIfNonEmpty(bool force);
// _writeMonitor should not be held when calling restart
void restart(uint32_t nextChunkId);
ProcessedChunkQ drainQ();
void readDataHeader();
void readIdxHeader();
void writeDataHeader(const common::FileHeaderContext &fileHeaderContext);
bool needFlushPendingChunks(uint64_t serialNum, uint64_t datFileLen);
bool needFlushPendingChunks(const vespalib::MonitorGuard & guard, uint64_t serialNum, uint64_t datFileLen);
fastos::TimeStamp unconditionallyFlushPendingChunks(const vespalib::LockGuard & flushGuard, uint64_t serialNum, uint64_t datFileLen);
static void insertChunks(ProcessedChunkMap & orderedChunks, ProcessedChunkQ & newChunks, const uint32_t nextChunkId);
static ProcessedChunkQ fetchNextChain(ProcessedChunkMap & orderedChunks, const uint32_t firstChunkId);
ChunkMeta computeChunkMeta(const vespalib::LockGuard & guard,
const vespalib::GenerationHandler::Guard & bucketizerGuard,
size_t offset, const ProcessedChunk & tmp, const Chunk & active);
ChunkMetaV computeChunkMeta(ProcessedChunkQ & chunks, size_t startPos, size_t & sz, bool & done);
void writeData(const ProcessedChunkQ & chunks, size_t sz);
void updateChunkInfo(const ProcessedChunkQ & chunks, const ChunkMetaV & cmetaV, size_t sz);
void updateCurrentDiskFootprint();
size_t getDiskFootprint(const vespalib::MonitorGuard & guard) const;
Config _config;
SerialNum _serialNum;
bool _frozen;
// Lock order is _writeLock, _flushLock, _lock
vespalib::Monitor _lock;
vespalib::Lock _writeLock;
vespalib::Lock _flushLock;
FastOS_File _dataFile;
FastOS_File _idxFile;
using ChunkMap = std::map<uint32_t, Chunk::UP>;
ChunkMap _chunkMap;
using PendingChunks = std::deque<std::shared_ptr<PendingChunk>>;
PendingChunks _pendingChunks;
uint64_t _pendingIdx;
uint64_t _pendingDat;
uint64_t _currentDiskFootprint;
uint32_t _nextChunkId;
Chunk::UP _active;
size_t _alignment;
size_t _granularity;
size_t _maxChunkSize;
uint32_t _firstChunkIdToBeWritten;
bool _writeTaskIsRunning;
vespalib::Monitor _writeMonitor;
ProcessedChunkQ _writeQ;
vespalib::ThreadExecutor & _executor;
ProcessedChunkMap _orderedChunks;
BucketDensityComputer _bucketMap;
};
} // namespace search
|