aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/persistence/provider_error_wrapper.h
blob: a4748d9479bc728958017606d1cd60b60a1eb785 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * \class storage::ProviderErrorWrapper
 *
 * \brief Utility class which forwards all calls to the real persistence
 * provider implementation, transparently checking the result of each
 * operation to see if the result is FATAL_ERROR or RESOURCE_EXHAUSTED.
 *
 * If FATAL_ERROR or RESOURCE_EXHAUSTED is observed, the wrapper will invoke any
 * and all resource exhaustion listeners synchronously, before returning the response
 * to the caller as usual.
 */
#pragma once

#include <vespa/persistence/spi/persistenceprovider.h>
#include <mutex>

namespace storage {

class ServiceLayerComponent;

class ProviderErrorListener {
public:
    virtual ~ProviderErrorListener() = default;
    virtual void on_fatal_error(vespalib::stringref message) {
        (void)message;
    }
    virtual void on_resource_exhaustion_error(vespalib::stringref message) {
        (void)message;
    }
};

class ProviderErrorWrapper : public spi::PersistenceProvider, public spi::ResultHandler {
public:
    explicit ProviderErrorWrapper(spi::PersistenceProvider& impl) noexcept;
    ~ProviderErrorWrapper() override;

    spi::Result initialize() override;
    spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
    spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&)  override;

    spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
    spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override;
    spi::CreateIteratorResult
    createIterator(const spi::Bucket &bucket, FieldSetSP, const spi::Selection &, spi::IncludedVersions versions,
                   spi::Context &context) override;
    spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize) const override;
    spi::Result destroyIterator(spi::IteratorId) override;
    spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
    spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2) override;
    spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target) override;
    std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override;
    spi::Result removeEntry(const spi::Bucket&, spi::Timestamp) override;

    void register_error_listener(std::shared_ptr<ProviderErrorListener> listener);

    void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::OperationComplete::UP) override;
    void removeAsync(const spi::Bucket&, std::vector<spi::IdAndTimestamp>, spi::OperationComplete::UP) override;
    void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::OperationComplete::UP) override;
    void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::OperationComplete::UP) override;
    void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
    void createBucketAsync(const spi::Bucket&, spi::OperationComplete::UP) noexcept override;
    void deleteBucketAsync(const spi::Bucket&, spi::OperationComplete::UP) noexcept override;
    std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
    template <typename ResultType>
    ResultType checkResult(ResultType&& result) const;
    void handle(const spi::Result &) const override;

    void trigger_shutdown_listeners(vespalib::stringref reason) const;
    void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const;

    spi::PersistenceProvider& _impl;
    std::vector<std::shared_ptr<ProviderErrorListener>> _listeners;
    mutable std::mutex _mutex;
};

} // storage