summaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/persistence/processallhandler.cpp
blob: 9cf570ef7a417faa00ab8bee60e0a95315459147 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "processallhandler.h"
#include "bucketprocessor.h"
#include "persistenceutil.h"
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/stllike/hash_map.hpp>

#include <vespa/log/log.h>
LOG_SETUP(".persistence.processall");

namespace storage {

ProcessAllHandler::ProcessAllHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi)
    : _env(env),
      _spi(spi)
{
}

namespace {

class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor {
public:
    spi::PersistenceProvider& _provider;
    const spi::Bucket& _bucket;
    spi::Context& _context;
    uint32_t _n_removed;

    UnrevertableRemoveEntryProcessor(
            spi::PersistenceProvider& provider,
            const spi::Bucket& bucket,
            spi::Context& context)
        : _provider(provider),
          _bucket(bucket),
          _context(context),
          _n_removed(0)
    {}

    void process(spi::DocEntry& entry) override {
        spi::RemoveResult removeResult = _provider.remove(_bucket, entry.getTimestamp(), *entry.getDocumentId(),_context);
        if (removeResult.getErrorCode() != spi::Result::ErrorType::NONE) {
            throw std::runtime_error(vespalib::make_string("Failed to do remove for removelocation: %s", removeResult.getErrorMessage().c_str()));
        }
        ++_n_removed;
    }
};

class StatEntryProcessor : public BucketProcessor::EntryProcessor {
public:
    std::ostream& ost;
    explicit StatEntryProcessor(std::ostream& o)
        : ost(o) {};

    void process(spi::DocEntry& e) override {
        ost << "  Timestamp: " << e.getTimestamp() << ", ";
        if (e.getDocument() != nullptr) {
            ost << "Doc(" << e.getDocument()->getId() << ")"
                << ", " << e.getDocument()->getId().getGlobalId().toString()
                << ", size: " << e.getPersistedDocumentSize();
        } else if (e.getDocumentId() != nullptr) {
            ost << *e.getDocumentId()
                << ", " << e.getDocumentId()->getGlobalId().toString();
        } else {
            ost << "metadata only";
        }
        if (e.isRemove()) {
            ost << " (remove)";
        }
        ost << "\n";
    }
};

}

MessageTracker::UP
ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) const
{
    tracker->setMetric(_env._metrics.removeLocation);

    LOG(debug, "RemoveLocation(%s): using selection '%s'",
        cmd.getBucketId().toString().c_str(),
        cmd.getDocumentSelection().c_str());

    spi::Bucket bucket(cmd.getBucket());
    UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context());
    BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
                                processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context());

    tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed));

    return tracker;
}

MessageTracker::UP
ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker) const
{
    tracker->setMetric(_env._metrics.statBucket);
    std::ostringstream ost;
    ost << "Persistence bucket " << cmd.getBucketId() << "\n";

    spi::Bucket bucket(cmd.getBucket());
    StatEntryProcessor processor(ost);
    BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
                                processor, spi::ALL_VERSIONS,tracker->context());

    tracker->setReply(std::make_shared<api::StatBucketReply>(cmd, ost.str()));
    return tracker;
}

} // storage