summaryrefslogtreecommitdiffstats
path: root/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h')
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h45
1 files changed, 26 insertions, 19 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
index e49ad378b90..7a3675c3001 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
@@ -6,55 +6,62 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/bucket/bucketidfactory.h>
#include <vespa/messagebus/routing/hop.h>
+#include <shared_mutex>
namespace config {
class ICallback;
class ConfigFetcher;
}
-namespace storage {
-namespace lib {
+namespace storage::lib {
class Distribution;
class ClusterState;
}
-}
namespace documentapi {
class ContentPolicy : public ExternSlobrokPolicy
{
private:
- document::BucketIdFactory _bucketIdFactory;
- std::unique_ptr<storage::lib::ClusterState> _state;
- string _clusterName;
- string _clusterConfigId;
- std::unique_ptr<config::ICallback> _callBack;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
- std::unique_ptr<storage::lib::Distribution> _distribution;
- std::unique_ptr<storage::lib::Distribution> _nextDistribution;
+ document::BucketIdFactory _bucketIdFactory;
+ mutable std::shared_mutex _rw_lock;
+ std::shared_ptr<const storage::lib::ClusterState> _state;
+ string _clusterName;
+ string _clusterConfigId;
+ std::unique_ptr<config::ICallback> _callBack;
+ std::unique_ptr<config::ConfigFetcher> _configFetcher;
+ std::shared_ptr<const storage::lib::Distribution> _distribution;
+
+ using StateSnapshot = std::pair<std::shared_ptr<const storage::lib::ClusterState>,
+ std::shared_ptr<const storage::lib::Distribution>>;
+
+ // Acquires _lock
+ [[nodiscard]] StateSnapshot internal_state_snapshot();
mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor);
+ // Acquires _lock
+ void updateStateFromReply(WrongDistributionReply& reply);
+ // Acquires _lock
+ void reset_state();
public:
- ContentPolicy(const string& param);
- ~ContentPolicy();
+ explicit ContentPolicy(const string& param);
+ ~ContentPolicy() override;
void doSelect(mbus::RoutingContext &context) override;
void merge(mbus::RoutingContext &context) override;
- void updateStateFromReply(WrongDistributionReply& reply);
-
/**
* @return a pointer to the system state registered with this policy. If
- * we haven't received a system state yet, returns NULL.
+ * we haven't received a system state yet, returns nullptr.
*/
- const storage::lib::ClusterState* getSystemState() const { return _state.get(); }
+ std::shared_ptr<const storage::lib::ClusterState> getSystemState() const noexcept;
virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config);
string init() override;
private:
- string createConfigId(const string & clusterName) const;
- string createPattern(const string & clusterName, int distributor) const;
+ static string createConfigId(const string & clusterName);
+ static string createPattern(const string & clusterName, int distributor);
};
}