aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src/vespa/searchlib/transactionlog/common.cpp
blob: 9bf43c8e24478354ab7a8f7fe85f6eb52cdafea3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "common.h"
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fastos/file.h>

namespace search::transactionlog {

using vespalib::nbostream;
using vespalib::nbostream_longlivedbuf;
using vespalib::make_string;
using std::runtime_error;

namespace {

void throwRangeError(SerialNum prev, SerialNum next) __attribute__((noinline));

void throwRangeError(SerialNum prev, SerialNum next) {
    if (prev < next) return;
    throw runtime_error(make_string("The new serialnum %zu is not higher than the old one %zu", next, prev));
}

}

int
makeDirectory(const char * dir)
{
    int retval(-1);

    FastOS_StatInfo st;
    if ( FastOS_File::Stat(dir, &st) ) {
        retval = st._isDirectory ? 0 : -2;
    } else {
        retval = FastOS_File::MakeDirectory(dir) ? 0 : -3;
    }

    return retval;
}

int64_t
SerialNumRange::cmp(const SerialNumRange & b) const
{
    int64_t diff(0);
    if ( ! (contains(b) || b.contains(*this)) ) {
        diff = _from - b._from;
    }
    return diff;
}

Packet::Packet(const void * buf, size_t sz) :
     _count(0),
     _range(),
     _buf(static_cast<const char *>(buf), sz)
{
    nbostream_longlivedbuf os(_buf.c_str(), sz);
    while ( os.size() > 0 ) {
        Entry e;
        e.deserialize(os);
        if (_range.to() == 0) {
            _range.from(e.serial());
        }
        _range.to(e.serial());
        _count++;
    }
}

void
Packet::merge(const Packet & packet)
{
    if (_range.to() >= packet.range().from()) {
        throwRangeError(_range.to(), packet.range().from());
    }
    if (_buf.empty()) {
        _range.from(packet.range().from());
    }
    _count += packet._count;
    _range.to(packet._range.to());
    _buf.write(packet.getHandle().c_str(), packet.getHandle().size());
}

nbostream &
Packet::Entry::deserialize(nbostream & os)
{
    _valid = false;
    int32_t len(0);
    os >> _unique >> _type >> len;
    _data = vespalib::ConstBufferRef(os.peek(), len);
    os.adjustReadPos(len);
    _valid = true;
    return os;
}

nbostream &
Packet::Entry::serialize(nbostream & os) const
{
    os << _unique << _type << static_cast<uint32_t>(_data.size());
    os.write(_data.c_str(), _data.size());
    return os;
}

Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) :
    _unique(u),
    _type(t),
    _valid(true),
    _data(d)
{ }

void
Packet::add(const Packet::Entry & e)
{
    if (_range.to() >= e.serial()) {
        throwRangeError(_range.to(), e.serial());
    }

    if (_buf.empty()) {
        _range.from(e.serial());
    }
    e.serialize(_buf);
    _count++;
    _range.to(e.serial());
}

}