aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp
blob: ea049493348601a41304c7ef08c5647be07e061f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "slime_cluster_state_bundle_codec.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/size_literals.h>

using document::FixedBucketSpaces;
using vespalib::slime::Cursor;
using vespalib::slime::BinaryFormat;
using vespalib::DataBuffer;
using vespalib::ConstBufferRef;
using vespalib::compression::CompressionConfig;
using vespalib::compression::decompress;
using vespalib::compression::compress;
using vespalib::Memory;
using namespace vespalib::slime;

namespace storage::rpc {

// TODO find a suitable home for this class to avoid dupes with rpcsendv2.cpp
namespace {
class OutputBuf : public vespalib::Output {
public:
    explicit OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { }
    ~OutputBuf() override;
    vespalib::DataBuffer & getBuf() { return _buf; }
private:
    vespalib::WritableMemory reserve(size_t bytes) override {
        _buf.ensureFree(bytes);
        return {_buf.getFree(), _buf.getFreeLen()};
    }
    Output &commit(size_t bytes) override {
        _buf.moveFreeToData(bytes);
        return *this;
    }
    vespalib::DataBuffer _buf;
};

OutputBuf::~OutputBuf() = default;

// Returns the text that lib::ClusterState::serialize() emits for `state`.
vespalib::string serialize_state(const lib::ClusterState& state) {
    vespalib::asciistream text;
    state.serialize(text);
    return text.str();
}

// Field names of the Slime object that carries a cluster state bundle:
//
//   {
//     "deferred-activation": <bool>,
//     "states": {
//       "baseline": <serialized cluster state>,
//       "spaces": { <bucket space name>: <serialized cluster state>, ... }
//     },
//     "feed-block": {
//       "block-feed-in-cluster": <bool>,
//       "description": <string>
//     }
//   }
const Memory StatesField{"states"};
const Memory BaselineField{"baseline"};
const Memory SpacesField{"spaces"};
const Memory DeferredActivationField{"deferred-activation"};
const Memory FeedBlockField{"feed-block"};
const Memory BlockFeedInClusterField{"block-feed-in-cluster"};
const Memory DescriptionField{"description"};

}

// Only used from unit tests; the cluster controller encodes all bundles
// we decode in practice.
//
// Builds a Slime object from `bundle`, encodes it in the Slime binary format
// and compresses the result. LZ4 is requested; the compression type reported
// back by compress() is the one recorded in the returned bundle, together
// with the size of the uncompressed binary Slime data.
EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode(
        const lib::ClusterStateBundle& bundle) const
{
    vespalib::Slime slime;
    Cursor& root = slime.setObject();

    // The flag is only written when set; decode() reads a missing field as false.
    if (bundle.deferredActivation()) {
        root.setBool(DeferredActivationField, true);
    }

    Cursor& states = root.setObject(StatesField);
    states.setString(BaselineField, serialize_state(*bundle.getBaselineClusterState()));
    Cursor& per_space = states.setObject(SpacesField);
    for (const auto& [space, derived_state] : bundle.getDerivedClusterStates()) {
        per_space.setString(FixedBucketSpaces::to_string(space), serialize_state(*derived_state));
    }

    // We only encode feed block state if the cluster is actually blocked.
    if (bundle.block_feed_in_cluster()) {
        Cursor& block_obj = root.setObject(FeedBlockField);
        block_obj.setBool(BlockFeedInClusterField, true);
        block_obj.setString(DescriptionField, bundle.feed_block()->description());
    }

    // Slime -> binary form.
    OutputBuf serialized(4_Ki);
    BinaryFormat::encode(slime, serialized);
    vespalib::DataBuffer& raw = serialized.getBuf();
    const auto raw_len = raw.getDataLen();

    // Binary form -> compressed payload.
    ConstBufferRef to_compress(raw.getData(), raw_len);
    auto compressed = std::make_unique<DataBuffer>(vespalib::roundUp2inN(raw_len));
    const auto actual_type = compress(CompressionConfig::LZ4, to_compress, *compressed, false);

    assert(to_compress.size() <= INT32_MAX);
    EncodedClusterStateBundle encoded_bundle;
    encoded_bundle._compression_type = actual_type;
    encoded_bundle._uncompressed_length = to_compress.size();
    encoded_bundle._buffer = std::move(compressed);
    return encoded_bundle;
}

namespace {


// Slime object traverser that fills a bucket space -> cluster state mapping.
// Each visited field is read as (bucket space name, serialized cluster state).
struct StateInserter : vespalib::slime::ObjectTraverser {
    lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states;

    explicit StateInserter(lib::ClusterStateBundle::BucketSpaceStateMapping& space_states)
        : _space_states(space_states) {}

    // Maps the field name to a bucket space, parses the field value as a
    // cluster state and adds the pair to the mapping.
    void field(const Memory& symbol, const Inspector& inspector) override {
        const auto space = FixedBucketSpaces::from_string(symbol.make_stringref());
        auto state = std::make_shared<const lib::ClusterState>(inspector.asString().make_string());
        _space_states.emplace(space, std::move(state));
    }
};

}

// Decodes a cluster state bundle from its compressed binary Slime form.
//
// The payload in `encoded_bundle` is decompressed using the compression type
// and uncompressed length carried alongside it, and then parsed as binary
// Slime. The baseline state, the per bucket space states, the deferred
// activation flag and the optional feed block are read from the result.
//
// Throws std::range_error if the decompressed size differs from the size the
// bundle claims, or if the decompressed payload is not well-formed binary
// Slime.
std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::decode(
        const EncodedClusterStateBundle& encoded_bundle) const
{
    ConstBufferRef blob(encoded_bundle._buffer->getData(), encoded_bundle._buffer->getDataLen());
    DataBuffer uncompressed;
    decompress(encoded_bundle._compression_type, encoded_bundle._uncompressed_length,
               blob, uncompressed, false);
    if (encoded_bundle._uncompressed_length != uncompressed.getDataLen()) {
        throw std::range_error(vespalib::make_string("ClusterStateBundle indicated uncompressed size (%u) is "
                                                     "not equal to actual uncompressed size (%zu)",
                                                     encoded_bundle._uncompressed_length,
                                                     uncompressed.getDataLen()));
    }

    vespalib::Slime slime;
    // BinaryFormat::decode() reports failure by returning 0. Without this check
    // a corrupt payload would silently be turned into a bundle built from
    // empty/missing fields. The same exception type as the size check above is
    // used so that existing handlers for malformed bundles also cover this case.
    const auto consumed = BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime);
    if (consumed == 0) {
        throw std::range_error("ClusterStateBundle payload could not be decoded as binary Slime");
    }
    Inspector& root = slime.get();
    Inspector& states = root[StatesField];
    lib::ClusterState baseline(states[BaselineField].asString().make_string());

    // One derived cluster state per field of the "spaces" object.
    Inspector& spaces = states[SpacesField];
    lib::ClusterStateBundle::BucketSpaceStateMapping space_states;
    StateInserter inserter(space_states);
    spaces.traverse(inserter);

    const bool deferred_activation = root[DeferredActivationField].asBool(); // Defaults to false if not set.

    // The feed block object is optional; it is only used if present.
    Inspector& fb = root[FeedBlockField];
    if (fb.valid()) {
        lib::ClusterStateBundle::FeedBlock feed_block(fb[BlockFeedInClusterField].asBool(),
                                                      fb[DescriptionField].asString().make_string());
        return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states), feed_block, deferred_activation);
    }

    // TODO add shared_ptr constructor for baseline?
    return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states), deferred_activation);
}

}