// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include "sketch.h" #include #include #include #include #include #include #include namespace search { // How many elements are required before we use a normal sketch representation. const uint32_t SPARSE_SKETCH_LIMIT = 255; /** * Decorator to SparseSketch handling the switch to NormalSketch * representation. It holds a reference to HyperLogLog::_sketch, which * is a unique pointer initially pointing to this class. By resetting * that pointer to a new sketch class, this class is deleted. By * having the logic for exchanging the sketch class here, we remove it * along with the sparse representation once the switch is made. */ template class ExchangerSketch : public SparseSketch { typename Sketch::UP &_sketch_ptr; int aggregate(HashT hash) override { if (this->getSize() > SPARSE_SKETCH_LIMIT) { NormalSketch *normal_sketch = new NormalSketch; normal_sketch->merge(*this); _sketch_ptr.reset(normal_sketch); // deletes this return normal_sketch->aggregate(hash); } return SparseSketch::aggregate(hash); } public: ExchangerSketch(typename Sketch::UP &sketch_ptr) : _sketch_ptr(sketch_ptr) {} }; /** * HyperLogLog is used to estimate the number of unique hashes seen. */ template class HyperLogLog { typename Sketch::UP _sketch; public: using hash_type = HashT; enum { bucketBits = BucketBits }; // Initialize ExchangerSketch with a reference to _sketch. HyperLogLog() : _sketch(new ExchangerSketch(_sketch)) {} HyperLogLog(const HyperLogLog &other) : HyperLogLog() { merge(other); } HyperLogLog &operator=( const HyperLogLog &other) { _sketch.reset(new ExchangerSketch(_sketch)); merge(other); return *this; } // Aggregates a hash value into the sketch. int aggregate(HashT hash) { return _sketch->aggregate(hash); } void merge(const HyperLogLog &other); void serialize(vespalib::Serializer &os) const; void deserialize(vespalib::Deserializer &is); const Sketch &getSketch() const { return *_sketch; } }; template void HyperLogLog:: merge(const HyperLogLog &other) { using Sparse = SparseSketch; using Normal = NormalSketch; if (_sketch->getClassId() == Sparse::classId) { Sparse &sparse = static_cast(*_sketch); if (other.getSketch().getClassId() == Sparse::classId) { const Sparse &other_sparse = static_cast(other.getSketch()); sparse.merge(other_sparse); if (sparse.getSize() > SPARSE_SKETCH_LIMIT) { typename Normal::UP new_sketch(new Normal); new_sketch->merge(sparse); _sketch.reset(new_sketch.release()); } } else { // other is NormalSketch const Normal &other_normal = static_cast(other.getSketch()); typename Normal::UP new_sketch(new Normal(other_normal)); new_sketch->merge(sparse); _sketch.reset(new_sketch.release()); } } else { // NormalSketch Normal &normal = static_cast(*_sketch); if (other.getSketch().getClassId() == Sparse::classId) { const Sparse &other_sparse = static_cast(other.getSketch()); normal.merge(other_sparse); } else { // other is NormalSketch const Normal &other_normal = static_cast(other.getSketch()); normal.merge(other_normal); } } } template void HyperLogLog:: serialize(vespalib::Serializer &os) const { os << _sketch->getClassId(); _sketch->serialize(os); } template void HyperLogLog:: deserialize(vespalib::Deserializer &is) { uint32_t type; is >> type; if (type == SparseSketch::classId) { _sketch.reset(new ExchangerSketch(_sketch)); _sketch->deserialize(is); } else if (type == NormalSketch::classId) { _sketch.reset(new NormalSketch); _sketch->deserialize(is); } } } // namespace search