aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/vespa/documentapi/messagebus/routablefactories60.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/vespa/documentapi/messagebus/routablefactories60.cpp')
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routablefactories60.cpp932
1 files changed, 920 insertions, 12 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/routablefactories60.cpp b/documentapi/src/vespa/documentapi/messagebus/routablefactories60.cpp
index 5823cc6720e..c34291c634f 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routablefactories60.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routablefactories60.cpp
@@ -1,10 +1,140 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// @author Vegard Sjonfjell
#include "routablefactories60.h"
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/document/select/parser.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+#include <vespa/vespalib/objects/nbostream.h>
+
+using document::FixedBucketSpaces;
+using vespalib::nbostream;
+using std::make_shared;
+using std::make_unique;
namespace documentapi {
-// TODO dedupe
+bool
+RoutableFactories60::DocumentMessageFactory::encode(const mbus::Routable &obj, vespalib::GrowableByteBuffer &out) const
+{
+ const DocumentMessage &msg = static_cast<const DocumentMessage&>(obj);
+ out.putByte(msg.getPriority());
+ out.putInt(msg.getLoadType().getId());
+ return doEncode(msg, out);
+}
+
+mbus::Routable::UP
+RoutableFactories60::DocumentMessageFactory::decode(document::ByteBuffer &in, const LoadTypeSet& loadTypes) const
+{
+ uint8_t pri;
+ in.getByte(pri);
+ uint32_t loadClass = decodeInt(in);
+
+ DocumentMessage::UP msg = doDecode(in);
+ if (msg) {
+ msg->setPriority((Priority::Value)pri);
+ msg->setLoadType(loadTypes[loadClass]);
+ }
+
+ return mbus::Routable::UP(msg.release());
+}
+
+bool
+RoutableFactories60::DocumentReplyFactory::encode(const mbus::Routable &obj, vespalib::GrowableByteBuffer &out) const
+{
+ const DocumentReply &msg = static_cast<const DocumentReply&>(obj);
+ out.putByte(msg.getPriority());
+ return doEncode(msg, out);
+}
+
+mbus::Routable::UP
+RoutableFactories60::DocumentReplyFactory::decode(document::ByteBuffer &in, const LoadTypeSet&) const
+{
+ uint8_t pri;
+ in.getByte(pri);
+ DocumentReply::UP reply = doDecode(in);
+ if (reply) {
+ reply->setPriority((Priority::Value)pri);
+ }
+ return mbus::Routable::UP(reply.release());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+//
+// Factories
+//
+////////////////////////////////////////////////////////////////////////////////
+
+DocumentMessage::UP
+RoutableFactories60::CreateVisitorMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<CreateVisitorMessage>();
+
+ msg->setLibraryName(decodeString(buf));
+ msg->setInstanceId(decodeString(buf));
+ msg->setControlDestination(decodeString(buf));
+ msg->setDataDestination(decodeString(buf));
+ msg->setDocumentSelection(decodeString(buf));
+ msg->setMaximumPendingReplyCount(decodeInt(buf));
+
+ int32_t len = decodeInt(buf);
+ msg->getBuckets().reserve(len);
+ for (int32_t i = 0; i < len; i++) {
+ int64_t val;
+ buf.getLong(val); // NOT using getLongNetwork
+ msg->getBuckets().push_back(document::BucketId(val));
+ }
+
+ msg->setFromTimestamp(decodeLong(buf));
+ msg->setToTimestamp(decodeLong(buf));
+ msg->setVisitRemoves(decodeBoolean(buf));
+ msg->setFieldSet(decodeString(buf));
+ msg->setVisitInconsistentBuckets(decodeBoolean(buf));
+ msg->getParameters().deserialize(_repo, buf);
+ msg->setVisitorDispatcherVersion(50);
+ msg->setVisitorOrdering((document::OrderingSpecification::Order)decodeInt(buf));
+ msg->setMaxBucketsPerVisitor(decodeInt(buf));
+ msg->setBucketSpace(decodeBucketSpace(buf));
+
+ return msg;
+}
+
+bool
+RoutableFactories60::CreateVisitorMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const CreateVisitorMessage &msg = static_cast<const CreateVisitorMessage&>(obj);
+
+ buf.putString(msg.getLibraryName());
+ buf.putString(msg.getInstanceId());
+ buf.putString(msg.getControlDestination());
+ buf.putString(msg.getDataDestination());
+ buf.putString(msg.getDocumentSelection());
+ buf.putInt(msg.getMaximumPendingReplyCount());
+ buf.putInt(msg.getBuckets().size());
+
+ for (const auto & bucketId : msg.getBuckets()) {
+ uint64_t val = bucketId.getRawId();
+ buf.putBytes((const char*)&val, 8);
+ }
+
+ buf.putLong(msg.getFromTimestamp());
+ buf.putLong(msg.getToTimestamp());
+ buf.putBoolean(msg.visitRemoves());
+ buf.putString(msg.getFieldSet());
+ buf.putBoolean(msg.visitInconsistentBuckets());
+
+ int len = msg.getParameters().getSerializedSize();
+ char *tmp = buf.allocate(len);
+ document::ByteBuffer dbuf(tmp, len);
+ msg.getParameters().serialize(dbuf);
+
+ buf.putInt(msg.getVisitorOrdering());
+ buf.putInt(msg.getMaxBucketsPerVisitor());
+ return encodeBucketSpace(msg.getBucketSpace(), buf);
+}
bool RoutableFactories60::CreateVisitorMessageFactory::encodeBucketSpace(
vespalib::stringref bucketSpace,
@@ -13,32 +143,810 @@ bool RoutableFactories60::CreateVisitorMessageFactory::encodeBucketSpace(
return true;
}
-string RoutableFactories60::CreateVisitorMessageFactory::decodeBucketSpace(document::ByteBuffer& buf) const {
+string RoutableFactories60::CreateVisitorMessageFactory::decodeBucketSpace(document::ByteBuffer &buf) const {
return doDecodeBucketSpace(buf);
}
-bool RoutableFactories60::StatBucketMessageFactory::encodeBucketSpace(
- vespalib::stringref bucketSpace,
- vespalib::GrowableByteBuffer& buf) const {
+DocumentMessage::UP
+RoutableFactories60::DestroyVisitorMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<DestroyVisitorMessage>();
+ msg->setInstanceId(decodeString(buf));
+ return msg;
+}
+
+bool
+RoutableFactories60::DestroyVisitorMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const DestroyVisitorMessage &msg = static_cast<const DestroyVisitorMessage&>(obj);
+ buf.putString(msg.getInstanceId());
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::CreateVisitorReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = std::make_unique<CreateVisitorReply>(DocumentProtocol::REPLY_CREATEVISITOR);
+ reply->setLastBucket(document::BucketId((uint64_t)decodeLong(buf)));
+ vdslib::VisitorStatistics vs;
+ vs.setBucketsVisited(decodeInt(buf));
+ vs.setDocumentsVisited(decodeLong(buf));
+ vs.setBytesVisited(decodeLong(buf));
+ vs.setDocumentsReturned(decodeLong(buf));
+ vs.setBytesReturned(decodeLong(buf));
+ vs.setSecondPassDocumentsReturned(decodeLong(buf));
+ vs.setSecondPassBytesReturned(decodeLong(buf));
+ reply->setVisitorStatistics(vs);
+
+ return reply;
+}
+
+bool
+RoutableFactories60::CreateVisitorReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const CreateVisitorReply &reply = static_cast<const CreateVisitorReply&>(obj);
+ buf.putLong(reply.getLastBucket().getRawId());
+ buf.putInt(reply.getVisitorStatistics().getBucketsVisited());
+ buf.putLong(reply.getVisitorStatistics().getDocumentsVisited());
+ buf.putLong(reply.getVisitorStatistics().getBytesVisited());
+ buf.putLong(reply.getVisitorStatistics().getDocumentsReturned());
+ buf.putLong(reply.getVisitorStatistics().getBytesReturned());
+ buf.putLong(reply.getVisitorStatistics().getSecondPassDocumentsReturned());
+ buf.putLong(reply.getVisitorStatistics().getSecondPassBytesReturned());
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::DestroyVisitorReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DESTROYVISITOR);
+}
+
+bool
+RoutableFactories60::DestroyVisitorReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::DocumentIgnoredReplyFactory::doDecode(document::ByteBuffer& buf) const
+{
+ (void) buf;
+ return DocumentReply::UP(new DocumentIgnoredReply());
+}
+
+bool
+RoutableFactories60::DocumentIgnoredReplyFactory::doEncode(
+ const DocumentReply& obj,
+ vespalib::GrowableByteBuffer& buf) const
+{
+ (void) obj;
+ (void) buf;
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::DocumentListMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<DocumentListMessage>();
+ msg->setBucketId(document::BucketId(decodeLong(buf)));
+
+ int32_t len = decodeInt(buf);
+ for (int32_t i = 0; i < len; i++) {
+ DocumentListMessage::Entry entry(_repo, buf);
+ msg->getDocuments().push_back(entry);
+ }
+
+ return msg;
+}
+
+bool
+RoutableFactories60::DocumentListMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const DocumentListMessage &msg = static_cast<const DocumentListMessage&>(obj);
+
+ buf.putLong(msg.getBucketId().getRawId());
+ buf.putInt(msg.getDocuments().size());
+ for (const auto & document : msg.getDocuments()) {
+ int len = document.getSerializedSize();
+ char *tmp = buf.allocate(len);
+ document::ByteBuffer dbuf(tmp, len);
+ document.serialize(dbuf);
+ }
+
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::DocumentListReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DOCUMENTLIST);
+}
+
+bool
+RoutableFactories60::DocumentListReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::DocumentSummaryMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<DocumentSummaryMessage>();
+
+ msg->deserialize(buf);
+
+ return msg;
+}
+
+bool
+RoutableFactories60::DocumentSummaryMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const DocumentSummaryMessage &msg = static_cast<const DocumentSummaryMessage&>(obj);
+
+ int32_t len = msg.getSerializedSize();
+ char *tmp = buf.allocate(len);
+ document::ByteBuffer dbuf(tmp, len);
+ msg.serialize(dbuf);
+
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::DocumentSummaryReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DOCUMENTSUMMARY);
+}
+
+bool
+RoutableFactories60::DocumentSummaryReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::EmptyBucketsMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<EmptyBucketsMessage>();
+
+ int32_t len = decodeInt(buf);
+ std::vector<document::BucketId> buckets(len);
+ for (int32_t i = 0; i < len; ++i) {
+ buckets[i] = document::BucketId(decodeLong(buf));
+ }
+ msg->getBucketIds().swap(buckets);
+
+ return msg;
+}
+
+bool
+RoutableFactories60::EmptyBucketsMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const EmptyBucketsMessage &msg = static_cast<const EmptyBucketsMessage&>(obj);
+
+ buf.putInt(msg.getBucketIds().size());
+ for (const auto & bucketId : msg.getBucketIds()) {
+ buf.putLong(bucketId.getRawId());
+ }
+
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::EmptyBucketsReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_EMPTYBUCKETS);
+}
+
+bool
+RoutableFactories60::EmptyBucketsReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+bool RoutableFactories60::GetBucketListMessageFactory::encodeBucketSpace(vespalib::stringref bucketSpace,
+ vespalib::GrowableByteBuffer &buf) const
+{
doEncodeBucketSpace(bucketSpace, buf);
return true;
}
-string RoutableFactories60::StatBucketMessageFactory::decodeBucketSpace(document::ByteBuffer& buf) const {
+string RoutableFactories60::GetBucketListMessageFactory::decodeBucketSpace(document::ByteBuffer &buf) const {
return doDecodeBucketSpace(buf);
}
-bool RoutableFactories60::GetBucketListMessageFactory::encodeBucketSpace(
- vespalib::stringref bucketSpace,
- vespalib::GrowableByteBuffer& buf) const {
+DocumentMessage::UP
+RoutableFactories60::GetBucketListMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ document::BucketId bucketId(decodeLong(buf));
+ auto msg = std::make_unique<GetBucketListMessage>(bucketId);
+ msg->setBucketSpace(decodeBucketSpace(buf));
+ return msg;
+}
+
+bool
+RoutableFactories60::GetBucketListMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const GetBucketListMessage &msg = static_cast<const GetBucketListMessage&>(obj);
+ buf.putLong(msg.getBucketId().getRawId());
+ return encodeBucketSpace(msg.getBucketSpace(), buf);
+}
+
+DocumentReply::UP
+RoutableFactories60::GetBucketListReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = std::make_unique<GetBucketListReply>();
+
+ int32_t len = decodeInt(buf);
+ reply->getBuckets().reserve(len);
+ for (int32_t i = 0; i < len; i++) {
+ GetBucketListReply::BucketInfo info;
+ info._bucket = document::BucketId((uint64_t)decodeLong(buf));
+ info._bucketInformation = decodeString(buf);
+ reply->getBuckets().push_back(info);
+ }
+
+ return reply;
+}
+
+bool
+RoutableFactories60::GetBucketListReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const GetBucketListReply &reply = static_cast<const GetBucketListReply&>(obj);
+
+ const std::vector<GetBucketListReply::BucketInfo> &buckets = reply.getBuckets();
+ buf.putInt(buckets.size());
+ for (const auto & bucketInfo : buckets) {
+ buf.putLong(bucketInfo._bucket.getRawId());
+ buf.putString(bucketInfo._bucketInformation);
+ }
+
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::GetBucketStateMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<GetBucketStateMessage>();
+
+ msg->setBucketId(document::BucketId((uint64_t)decodeLong(buf)));
+
+ return msg;
+}
+
+bool
+RoutableFactories60::GetBucketStateMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const GetBucketStateMessage &msg = static_cast<const GetBucketStateMessage&>(obj);
+ buf.putLong(msg.getBucketId().getRawId());
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::GetBucketStateReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = std::make_unique<GetBucketStateReply>();
+
+ int32_t len = decodeInt(buf);
+ reply->getBucketState().reserve(len);
+ for (int32_t i = 0; i < len; i++) {
+ reply->getBucketState().emplace_back(buf);
+ }
+
+ return reply;
+}
+
+bool
+RoutableFactories60::GetBucketStateReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const GetBucketStateReply &reply = static_cast<const GetBucketStateReply&>(obj);
+
+ buf.putInt(reply.getBucketState().size());
+ for (const auto & state : reply.getBucketState()) {
+ state.serialize(buf);
+ }
+
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::GetDocumentMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ return DocumentMessage::UP(
+ new GetDocumentMessage(decodeDocumentId(buf),
+ decodeString(buf)));
+}
+
+bool
+RoutableFactories60::GetDocumentMessageFactory::doEncode(const DocumentMessage &obj,
+ vespalib::GrowableByteBuffer &buf) const
+{
+ const GetDocumentMessage &msg = static_cast<const GetDocumentMessage&>(obj);
+
+ encodeDocumentId(msg.getDocumentId(), buf);
+ buf.putString(msg.getFieldSet());
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::GetDocumentReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = std::make_unique<GetDocumentReply>();
+
+ bool hasDocument = decodeBoolean(buf);
+ document::Document * document = nullptr;
+ if (hasDocument) {
+ auto doc = std::make_shared<document::Document>(_repo, buf);
+ document = doc.get();
+ reply->setDocument(std::move(doc));
+ }
+ int64_t lastModified = decodeLong(buf);
+ reply->setLastModified(lastModified);
+ if (hasDocument) {
+ document->setLastModified(lastModified);
+ }
+
+ return reply;
+}
+
+bool
+RoutableFactories60::GetDocumentReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const GetDocumentReply &reply = static_cast<const GetDocumentReply&>(obj);
+
+ buf.putByte(reply.hasDocument() ? 1 : 0);
+ if (reply.hasDocument()) {
+ nbostream stream;
+ reply.getDocument().serialize(stream);
+ buf.putBytes(stream.peek(), stream.size());
+ }
+ buf.putLong(reply.getLastModified());
+
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::MapVisitorMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<MapVisitorMessage>();
+ msg->getData().deserialize(_repo, buf);
+ return msg;
+}
+
+bool
+RoutableFactories60::MapVisitorMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const MapVisitorMessage &msg = static_cast<const MapVisitorMessage&>(obj);
+
+ int32_t len = msg.getData().getSerializedSize();
+ char *tmp = buf.allocate(len);
+ document::ByteBuffer dbuf(tmp, len);
+ msg.getData().serialize(dbuf);
+
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::MapVisitorReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_MAPVISITOR);
+}
+
+bool
+RoutableFactories60::MapVisitorReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+void
+RoutableFactories60::PutDocumentMessageFactory::decodeInto(PutDocumentMessage & msg, document::ByteBuffer & buf) const {
+ msg.setDocument(make_shared<document::Document>(_repo, buf));
+ msg.setTimestamp(static_cast<uint64_t>(decodeLong(buf)));
+ decodeTasCondition(msg, buf);
+}
+
+bool
+RoutableFactories60::PutDocumentMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ auto & msg = static_cast<const PutDocumentMessage &>(obj);
+ nbostream stream;
+
+ msg.getDocument().serialize(stream);
+ buf.putBytes(stream.peek(), stream.size());
+ buf.putLong(static_cast<int64_t>(msg.getTimestamp()));
+ encodeTasCondition(buf, msg);
+
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::PutDocumentReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = make_unique<WriteDocumentReply>(DocumentProtocol::REPLY_PUTDOCUMENT);
+ reply->setHighestModificationTimestamp(decodeLong(buf));
+
+ // Doing an explicit move here to force converting result to an rvalue.
+ // This is done automatically in GCC >= 5.
+ return std::move(reply);
+}
+
+bool
+RoutableFactories60::PutDocumentReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const WriteDocumentReply& reply = (const WriteDocumentReply&)obj;
+ buf.putLong(reply.getHighestModificationTimestamp());
+ return true;
+}
+
+void
+RoutableFactories60::RemoveDocumentMessageFactory::decodeInto(RemoveDocumentMessage & msg, document::ByteBuffer & buf) const {
+ msg.setDocumentId(decodeDocumentId(buf));
+ decodeTasCondition(msg, buf);
+}
+
+bool
+RoutableFactories60::RemoveDocumentMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const RemoveDocumentMessage &msg = static_cast<const RemoveDocumentMessage&>(obj);
+ encodeDocumentId(msg.getDocumentId(), buf);
+ encodeTasCondition(buf, msg);
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::RemoveDocumentReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = std::make_unique<RemoveDocumentReply>();
+ reply->setWasFound(decodeBoolean(buf));
+ reply->setHighestModificationTimestamp(decodeLong(buf));
+ return reply;
+}
+
+bool
+RoutableFactories60::RemoveDocumentReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const RemoveDocumentReply &reply = static_cast<const RemoveDocumentReply&>(obj);
+ buf.putBoolean(reply.getWasFound());
+ buf.putLong(reply.getHighestModificationTimestamp());
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::RemoveLocationMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ string selection = decodeString(buf);
+
+ document::BucketIdFactory factory;
+ document::select::Parser parser(_repo, factory);
+
+ auto msg = std::make_unique<RemoveLocationMessage>(factory, parser, selection);
+ // FIXME bucket space not part of wire format, implicitly limiting to only default space for now.
+ msg->setBucketSpace(document::FixedBucketSpaces::default_space_name());
+ return msg;
+}
+
+bool
+RoutableFactories60::RemoveLocationMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const RemoveLocationMessage &msg = static_cast<const RemoveLocationMessage&>(obj);
+ buf.putString(msg.getDocumentSelection());
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::RemoveLocationReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<DocumentReply>(DocumentProtocol::REPLY_REMOVELOCATION);
+}
+
+bool
+RoutableFactories60::RemoveLocationReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::SearchResultMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<SearchResultMessage>();
+ msg->deserialize(buf);
+ return msg;
+}
+
+bool
+RoutableFactories60::SearchResultMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const SearchResultMessage &msg = static_cast<const SearchResultMessage&>(obj);
+
+ int len = msg.getSerializedSize();
+ char *tmp = buf.allocate(len);
+ document::ByteBuffer dbuf(tmp, len);
+ msg.serialize(dbuf);
+
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::QueryResultMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<QueryResultMessage>();
+ msg->getSearchResult().deserialize(buf);
+ msg->getDocumentSummary().deserialize(buf);
+
+ return msg;
+}
+
+bool
+RoutableFactories60::QueryResultMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const QueryResultMessage &msg = static_cast<const QueryResultMessage&>(obj);
+
+ int len = msg.getSearchResult().getSerializedSize() + msg.getDocumentSummary().getSerializedSize();
+ char *tmp = buf.allocate(len);
+ document::ByteBuffer dbuf(tmp, len);
+ msg.getSearchResult().serialize(dbuf);
+ msg.getDocumentSummary().serialize(dbuf);
+
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::SearchResultReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_SEARCHRESULT);
+}
+
+bool
+RoutableFactories60::SearchResultReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::QueryResultReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_QUERYRESULT);
+}
+
+bool
+RoutableFactories60::QueryResultReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+bool RoutableFactories60::StatBucketMessageFactory::encodeBucketSpace(vespalib::stringref bucketSpace,
+ vespalib::GrowableByteBuffer &buf) const
+{
doEncodeBucketSpace(bucketSpace, buf);
return true;
}
-string RoutableFactories60::GetBucketListMessageFactory::decodeBucketSpace(document::ByteBuffer& buf) const {
+string RoutableFactories60::StatBucketMessageFactory::decodeBucketSpace(document::ByteBuffer &buf) const {
return doDecodeBucketSpace(buf);
}
+DocumentMessage::UP
+RoutableFactories60::StatBucketMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<StatBucketMessage>();
+
+ msg->setBucketId(document::BucketId(decodeLong(buf)));
+ msg->setDocumentSelection(decodeString(buf));
+ msg->setBucketSpace(decodeBucketSpace(buf));
+
+ return msg;
+}
+
+bool
+RoutableFactories60::StatBucketMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const StatBucketMessage &msg = static_cast<const StatBucketMessage&>(obj);
+
+ buf.putLong(msg.getBucketId().getRawId());
+ buf.putString(msg.getDocumentSelection());
+ return encodeBucketSpace(msg.getBucketSpace(), buf);
+}
+
+DocumentReply::UP
+RoutableFactories60::StatBucketReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = std::make_unique<StatBucketReply>();
+ reply->setResults(decodeString(buf));
+ return reply;
+}
+
+bool
+RoutableFactories60::StatBucketReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const StatBucketReply &reply = static_cast<const StatBucketReply&>(obj);
+ buf.putString(reply.getResults());
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::StatDocumentMessageFactory::doDecode(document::ByteBuffer &) const
+{
+ return DocumentMessage::UP(); // TODO: remove message type
+}
+
+bool
+RoutableFactories60::StatDocumentMessageFactory::doEncode(const DocumentMessage &, vespalib::GrowableByteBuffer &) const
+{
+ return false;
+}
+
+DocumentReply::UP
+RoutableFactories60::StatDocumentReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return DocumentReply::UP(); // TODO: remove reply type
+}
+
+bool
+RoutableFactories60::StatDocumentReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return false;
+}
+
+void
+RoutableFactories60::UpdateDocumentMessageFactory::decodeInto(UpdateDocumentMessage & msg, document::ByteBuffer & buf) const {
+ msg.setDocumentUpdate(document::DocumentUpdate::createHEAD(_repo, buf));
+ msg.setOldTimestamp(static_cast<uint64_t>(decodeLong(buf)));
+ msg.setNewTimestamp(static_cast<uint64_t>(decodeLong(buf)));
+ decodeTasCondition(msg, buf);
+}
+
+bool
+RoutableFactories60::UpdateDocumentMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const UpdateDocumentMessage &msg = static_cast<const UpdateDocumentMessage&>(obj);
+
+ vespalib::nbostream stream;
+ msg.getDocumentUpdate().serializeHEAD(stream);
+ buf.putBytes(stream.peek(), stream.size());
+ buf.putLong((int64_t)msg.getOldTimestamp());
+ buf.putLong((int64_t)msg.getNewTimestamp());
+ encodeTasCondition(buf, msg);
+
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::UpdateDocumentReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = std::make_unique<UpdateDocumentReply>();
+ reply->setWasFound(decodeBoolean(buf));
+ reply->setHighestModificationTimestamp(decodeLong(buf));
+ return reply;
+}
+
+bool
+RoutableFactories60::UpdateDocumentReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const UpdateDocumentReply &reply = static_cast<const UpdateDocumentReply&>(obj);
+ buf.putBoolean(reply.getWasFound());
+ buf.putLong(reply.getHighestModificationTimestamp());
+ return true;
+}
+
+DocumentMessage::UP
+RoutableFactories60::VisitorInfoMessageFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto msg = std::make_unique<VisitorInfoMessage>();
+
+ int32_t len = decodeInt(buf);
+ msg->getFinishedBuckets().reserve(len);
+ for (int32_t i = 0; i < len; i++) {
+ int64_t val;
+ buf.getLong(val); // NOT using getLongNetwork
+ msg->getFinishedBuckets().emplace_back(val);
+ }
+ msg->setErrorMessage(decodeString(buf));
+
+ return msg;
+}
+
+bool
+RoutableFactories60::VisitorInfoMessageFactory::doEncode(const DocumentMessage &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const VisitorInfoMessage &msg = static_cast<const VisitorInfoMessage&>(obj);
+
+ buf.putInt(msg.getFinishedBuckets().size());
+ for (const auto & bucketId : msg.getFinishedBuckets()) {
+ uint64_t val = bucketId.getRawId();
+ buf.putBytes((const char*)&val, 8);
+ }
+ buf.putString(msg.getErrorMessage());
+
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::VisitorInfoReplyFactory::doDecode(document::ByteBuffer &) const
+{
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_VISITORINFO);
+}
+
+bool
+RoutableFactories60::VisitorInfoReplyFactory::doEncode(const DocumentReply &, vespalib::GrowableByteBuffer &) const
+{
+ return true;
+}
+
+DocumentReply::UP
+RoutableFactories60::WrongDistributionReplyFactory::doDecode(document::ByteBuffer &buf) const
+{
+ auto reply = std::make_unique<WrongDistributionReply>();
+ reply->setSystemState(decodeString(buf));
+ return reply;
+}
+
+bool
+RoutableFactories60::WrongDistributionReplyFactory::doEncode(const DocumentReply &obj, vespalib::GrowableByteBuffer &buf) const
+{
+ const WrongDistributionReply &reply = static_cast<const WrongDistributionReply&>(obj);
+ buf.putString(reply.getSystemState());
+ return true;
+}
+
+string
+RoutableFactories60::decodeString(document::ByteBuffer &in)
+{
+ int32_t len = decodeInt(in);
+ string ret = string(in.getBufferAtPos(), len);
+ in.incPos(len);
+ return ret;
+}
+
+bool
+RoutableFactories60::decodeBoolean(document::ByteBuffer &in)
+{
+ char ret;
+ in.getBytes(&ret, 1);
+ return (bool)ret;
+}
+
+int32_t
+RoutableFactories60::decodeInt(document::ByteBuffer &in)
+{
+ int32_t ret;
+ in.getIntNetwork(ret);
+ return ret;
+}
+
+int64_t
+RoutableFactories60::decodeLong(document::ByteBuffer &in)
+{
+ int64_t ret;
+ in.getLongNetwork(ret);
+ return ret;
+}
+
+document::DocumentId
+RoutableFactories60::decodeDocumentId(document::ByteBuffer &in)
+{
+ nbostream stream(in.getBufferAtPos(), in.getRemaining());
+ document::DocumentId ret(stream);
+ in.incPos(stream.rp());
+ return ret;
+}
+
+void
+RoutableFactories60::encodeDocumentId(const document::DocumentId &id, vespalib::GrowableByteBuffer &out)
+{
+ string str = id.toString();
+ out.putBytes(str.c_str(), str.size() + 1);
+}
+
+void RoutableFactories60::decodeTasCondition(DocumentMessage & docMsg, document::ByteBuffer & buf) {
+ auto & msg = static_cast<TestAndSetMessage &>(docMsg);
+ msg.setCondition(TestAndSetCondition(decodeString(buf)));
+}
+
+void RoutableFactories60::encodeTasCondition(vespalib::GrowableByteBuffer & buf, const DocumentMessage & docMsg) {
+ auto & msg = static_cast<const TestAndSetMessage &>(docMsg);
+ buf.putString(msg.getCondition().getSelection());
+}
+
void RoutableFactories60::doEncodeBucketSpace(
vespalib::stringref bucketSpace,
vespalib::GrowableByteBuffer& buf) {
@@ -48,4 +956,4 @@ string RoutableFactories60::doDecodeBucketSpace(document::ByteBuffer& buf) {
return decodeString(buf);
}
-} \ No newline at end of file
+}