aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/server/replaypacketdispatcher.cpp
blob: 790b70c6296021c0fac7f0aea3df339bf28516bd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "replaypacketdispatcher.h"
#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/document/util/serializableexceptions.h>

using vespalib::make_string;
using vespalib::IllegalStateException;

namespace proton {

template <typename OperationType>
void
ReplayPacketDispatcher::replay(OperationType &op, vespalib::nbostream &is, const Packet::Entry &entry)
{
    op.deserialize(is, _handler.getDeserializeRepo());
    op.setSerialNum(entry.serial());
    store(op);
    _handler.replay(op);
}


ReplayPacketDispatcher::ReplayPacketDispatcher(IReplayPacketHandler &handler)
    : _handler(handler)
{
}


void
ReplayPacketDispatcher::replayEntry(const Packet::Entry &entry)
{
    vespalib::nbostream is(entry.data().c_str(), entry.data().size());
    switch (entry.type()) {
    case FeedOperation::PUT: {
        PutOperation op;
        replay(op, is, entry);
        break;
    } case FeedOperation::REMOVE: {
        RemoveOperationWithDocId op;
        replay(op, is, entry);
        break;
    } case FeedOperation::REMOVE_GID: {
        RemoveOperationWithGid op;
        replay(op, is, entry);
        break;
    } case FeedOperation::UPDATE: {
        UpdateOperation op(static_cast<FeedOperation::Type>(entry.type()));
        replay(op, is, entry);
        break;
    } case FeedOperation::NOOP: {
        NoopOperation op;
        replay(op, is, entry);
        break;
    } case FeedOperation::NEW_CONFIG: {
        NewConfigOperation op(entry.serial(), _handler.getNewConfigStreamHandler());
        op.deserialize(is, _handler.getDeserializeRepo());
        _handler.replay(op);
        break;
    } case FeedOperation::DELETE_BUCKET: {
        DeleteBucketOperation op;
        replay(op, is, entry);
        break;
    } case FeedOperation::SPLIT_BUCKET: {
        SplitBucketOperation op;
        replay(op, is, entry);
        break;
    } case FeedOperation::JOIN_BUCKETS: {
        JoinBucketsOperation op;
        replay(op, is, entry);
        break;
    } case FeedOperation::PRUNE_REMOVED_DOCUMENTS: {
        PruneRemovedDocumentsOperation op;
        replay(op, is, entry);
        break;
    } case FeedOperation::MOVE: {
        MoveOperation op;
        replay(op, is, entry);
        break;
    } case FeedOperation::CREATE_BUCKET: {
        CreateBucketOperation op;
        replay(op, is, entry);
        break;
    } case FeedOperation::COMPACT_LID_SPACE: {
        CompactLidSpaceOperation op;
        replay(op, is, entry);
        break;
    } default:
        throw IllegalStateException
            (make_string("Got packet entry with unknown type id '%u' from TLS", entry.type()));
    }
    if ( ! is.empty()) {
        throw document::DeserializeException
            (make_string("Too much data in packet entry (type id '%u', %ld bytes)",
                         entry.type(), is.size()));
    }
}


ReplayPacketDispatcher::~ReplayPacketDispatcher() = default;


void
ReplayPacketDispatcher::store(const FeedOperation &)
{
}

} // namespace proton