summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-02-20 12:54:17 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-02-20 12:54:17 +0000
commit11c0b2581dfc9f0e995a8171eb99a9793f7c4b38 (patch)
treef5cc57b181936a2e385a1aa04d25599eee5b8c76 /documentapi
parente0195ce27f47717ad5ba59ea59ab027de31d703f (diff)
Improve codec error reporting and use Protobuf type parsers directly
Errors during (de-)serialization will now be logged, alongside the message type in question. Due to differences in internal wiring, the Java version logs the Document API type whereas C++ logs the protobuf message type (and also has throttled logging).
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java99
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp43
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h3
3 files changed, 93 insertions, 52 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
index e03a7a05a4b..a4dcd660ab8 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
@@ -7,6 +7,7 @@ import ai.vespa.documentapi.protobuf.DocapiVisiting;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Parser;
import com.yahoo.document.BucketId;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
@@ -31,12 +32,15 @@ import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.logging.Logger;
/**
* Implementation of MessageBus message request/response serialization built around Protocol Buffers.
*/
abstract class RoutableFactories80 {
+ private static final Logger log = Logger.getLogger(RoutableFactories80.class.getName());
+
private static class ProtobufCodec<DocApiT extends Routable, ProtoT extends AbstractMessage> implements RoutableFactory {
private final Class<DocApiT> apiClass;
@@ -62,6 +66,7 @@ abstract class RoutableFactories80 {
protoStream.flush();
}
} catch (IOException | UnsupportedOperationException e) {
+ logCodecError("encoding", e);
return false;
}
return true;
@@ -69,7 +74,19 @@ abstract class RoutableFactories80 {
@Override
public Routable decode(DocumentDeserializer in) {
- return decoderFn.apply(in);
+ try {
+ return decoderFn.apply(in);
+ } catch (RuntimeException e) {
+ logCodecError("decoding", e);
+ return null;
+ }
+ }
+
+ private void logCodecError(String op, Throwable t) {
+ // TODO ideally we'd print the peer address here, but that information is Abstracted Away(tm)
+ // and we can't safely propagate the exception to a lower-level component which _does_ know this
+ // information, as it's not necessarily exception safe.
+ log.severe("Error during Protobuf %s for message type %s: %s".formatted(op, apiClass.getSimpleName(), t.getMessage()));
}
}
@@ -98,31 +115,31 @@ abstract class RoutableFactories80 {
return this;
}
- ProtobufCodecBuilder<DocApiT, ProtoT> decoder(ProtoT messagePrototype, Function<ProtoT, DocApiT> fn) {
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoder(Parser<ProtoT> parser, Function<ProtoT, DocApiT> fn) {
if (decoderFn != null) {
throw new IllegalArgumentException("Decoder already set");
}
decoderFn = (buf) -> {
try {
- var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
- return fn.apply(protoClass.cast(protoObj));
+ var protoObj = parser.parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoObj);
} catch (IOException e) {
- return null;
+ throw new RuntimeException(e);
}
};
return this;
}
- ProtobufCodecBuilder<DocApiT, ProtoT> decoderWithRepo(ProtoT messagePrototype, BiFunction<ProtoT, DocumentTypeManager, DocApiT> fn) {
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoderWithRepo(Parser<ProtoT> parser, BiFunction<ProtoT, DocumentTypeManager, DocApiT> fn) {
if (decoderFn != null) {
throw new IllegalArgumentException("Decoder already set");
}
decoderFn = (buf) -> {
try {
- var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
- return fn.apply(protoClass.cast(protoObj), buf.getTypeRepo());
+ var protoObj = parser.parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoObj, buf.getTypeRepo());
} catch (IOException e) {
- return null;
+ throw new RuntimeException(e);
}
};
return this;
@@ -270,7 +287,7 @@ abstract class RoutableFactories80 {
.setDocumentId(toProtoDocId(apiMsg.getDocumentId()))
.setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
.build())
- .decoder(DocapiFeed.GetDocumentRequest.getDefaultInstance(), (protoMsg) ->
+ .decoder(DocapiFeed.GetDocumentRequest.parser(), (protoMsg) ->
new GetDocumentMessage(
fromProtoDocId(protoMsg.getDocumentId()),
fromProtoFieldSet(protoMsg.getFieldSet())))
@@ -289,7 +306,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoderWithRepo(DocapiFeed.GetDocumentResponse.getDefaultInstance(), (protoReply, repo) -> {
+ .decoderWithRepo(DocapiFeed.GetDocumentResponse.parser(), (protoReply, repo) -> {
GetDocumentReply reply;
if (protoReply.hasDocument()) {
var doc = fromProtoDocument(protoReply.getDocument(), repo);
@@ -321,7 +338,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoderWithRepo(DocapiFeed.PutDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ .decoderWithRepo(DocapiFeed.PutDocumentRequest.parser(), (protoMsg, repo) -> {
var doc = fromProtoDocument(protoMsg.getDocument(), repo);
var msg = new PutDocumentMessage(new DocumentPut(doc));
if (protoMsg.hasCondition()) {
@@ -341,7 +358,7 @@ abstract class RoutableFactories80 {
DocapiFeed.PutDocumentResponse.newBuilder()
.setModificationTimestamp(apiReply.getHighestModificationTimestamp())
.build())
- .decoder(DocapiFeed.PutDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ .decoder(DocapiFeed.PutDocumentResponse.parser(), (protoReply) -> {
var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT);
reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
return reply;
@@ -366,7 +383,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoderWithRepo(DocapiFeed.UpdateDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ .decoderWithRepo(DocapiFeed.UpdateDocumentRequest.parser(), (protoMsg, repo) -> {
var msg = new UpdateDocumentMessage(fromProtoUpdate(protoMsg.getUpdate(), repo));
msg.setOldTimestamp(protoMsg.getExpectedOldTimestamp());
msg.setNewTimestamp(protoMsg.getForceAssignTimestamp());
@@ -386,7 +403,7 @@ abstract class RoutableFactories80 {
.setModificationTimestamp(apiReply.getHighestModificationTimestamp())
.setWasFound(apiReply.wasFound())
.build())
- .decoder(DocapiFeed.UpdateDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ .decoder(DocapiFeed.UpdateDocumentResponse.parser(), (protoReply) -> {
var reply = new UpdateDocumentReply();
reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
reply.setWasFound(protoReply.getWasFound());
@@ -410,7 +427,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoder(DocapiFeed.RemoveDocumentRequest.getDefaultInstance(), (protoMsg) -> {
+ .decoder(DocapiFeed.RemoveDocumentRequest.parser(), (protoMsg) -> {
var msg = new RemoveDocumentMessage(fromProtoDocId(protoMsg.getDocumentId()));
if (protoMsg.hasCondition()) {
msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
@@ -428,7 +445,7 @@ abstract class RoutableFactories80 {
.setWasFound(apiReply.wasFound())
.setModificationTimestamp(apiReply.getHighestModificationTimestamp())
.build())
- .decoder(DocapiFeed.RemoveDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ .decoder(DocapiFeed.RemoveDocumentResponse.parser(), (protoReply) -> {
var reply = new RemoveDocumentReply();
reply.setWasFound(protoReply.getWasFound());
reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
@@ -449,7 +466,7 @@ abstract class RoutableFactories80 {
.setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
.setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
.build())
- .decoder(DocapiFeed.RemoveLocationRequest.getDefaultInstance(), (protoMsg) ->
+ .decoder(DocapiFeed.RemoveLocationRequest.parser(), (protoMsg) ->
new RemoveLocationMessage(
fromProtoDocumentSelection(protoMsg.getSelection()),
fromProtoBucketSpace(protoMsg.getBucketSpace())))
@@ -460,7 +477,7 @@ abstract class RoutableFactories80 {
return ProtobufCodecBuilder
.of(DocumentReply.class, DocapiFeed.RemoveLocationResponse.class)
.encoder((apiReply) -> DocapiFeed.RemoveLocationResponse.newBuilder().build())
- .decoder(DocapiFeed.RemoveLocationResponse.getDefaultInstance(),
+ .decoder(DocapiFeed.RemoveLocationResponse.parser(),
(protoReply) -> new DocumentReply(DocumentProtocol.REPLY_REMOVELOCATION))
.build();
}
@@ -502,7 +519,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoder(DocapiVisiting.CreateVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ .decoder(DocapiVisiting.CreateVisitorRequest.parser(), (protoMsg) -> {
var msg = new CreateVisitorMessage();
msg.setBucketSpace(fromProtoBucketSpace(protoMsg.getBucketSpace()));
msg.setLibraryName(protoMsg.getVisitorLibraryName());
@@ -544,7 +561,7 @@ abstract class RoutableFactories80 {
.build())
.build();
})
- .decoder(DocapiVisiting.CreateVisitorResponse.getDefaultInstance(), (protoReply) -> {
+ .decoder(DocapiVisiting.CreateVisitorResponse.parser(), (protoReply) -> {
var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR);
reply.setLastBucket(fromProtoBucketId(protoReply.getLastBucket()));
var protoVs = protoReply.getStatistics();
@@ -571,7 +588,7 @@ abstract class RoutableFactories80 {
DocapiVisiting.DestroyVisitorRequest.newBuilder()
.setInstanceId(apiMsg.getInstanceId())
.build())
- .decoder(DocapiVisiting.DestroyVisitorRequest.getDefaultInstance(),
+ .decoder(DocapiVisiting.DestroyVisitorRequest.parser(),
(protoMsg) -> new DestroyVisitorMessage(protoMsg.getInstanceId()))
.build();
}
@@ -580,7 +597,7 @@ abstract class RoutableFactories80 {
return ProtobufCodecBuilder
.of(VisitorReply.class, DocapiVisiting.DestroyVisitorResponse.class)
.encoder((apiReply) -> DocapiVisiting.DestroyVisitorResponse.newBuilder().build())
- .decoder(DocapiVisiting.DestroyVisitorResponse.getDefaultInstance(),
+ .decoder(DocapiVisiting.DestroyVisitorResponse.parser(),
(protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DESTROYVISITOR))
.build();
}
@@ -601,7 +618,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoder(DocapiVisiting.MapVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ .decoder(DocapiVisiting.MapVisitorRequest.parser(), (protoMsg) -> {
var msg = new MapVisitorMessage();
for (var param : protoMsg.getDataList()) {
msg.getData().put(param.getKey(), param.getValue().toStringUtf8());
@@ -615,7 +632,7 @@ abstract class RoutableFactories80 {
return ProtobufCodecBuilder
.of(VisitorReply.class, DocapiVisiting.MapVisitorResponse.class)
.encoder((apiReply) -> DocapiVisiting.MapVisitorResponse.newBuilder().build())
- .decoder(DocapiVisiting.MapVisitorResponse.getDefaultInstance(),
+ .decoder(DocapiVisiting.MapVisitorResponse.parser(),
(protoReply) -> new VisitorReply(DocumentProtocol.REPLY_MAPVISITOR))
.build();
}
@@ -631,7 +648,7 @@ abstract class RoutableFactories80 {
// Serialization of QueryResultMessages is not implemented in Java (receive only)
throw new UnsupportedOperationException("Serialization of QueryResultMessage instances is not supported");
})
- .decoder(DocapiVisiting.QueryResultRequest.getDefaultInstance(), (protoMsg) -> {
+ .decoder(DocapiVisiting.QueryResultRequest.parser(), (protoMsg) -> {
var msg = new QueryResultMessage();
// Explicitly enforce presence of result/summary fields, as our object is not necessarily
// well-defined if these have not been initialized.
@@ -653,7 +670,7 @@ abstract class RoutableFactories80 {
return ProtobufCodecBuilder
.of(VisitorReply.class, DocapiVisiting.QueryResultResponse.class)
.encoder((apiReply) -> DocapiVisiting.QueryResultResponse.newBuilder().build())
- .decoder(DocapiVisiting.QueryResultResponse.getDefaultInstance(),
+ .decoder(DocapiVisiting.QueryResultResponse.parser(),
(protoReply) -> new VisitorReply(DocumentProtocol.REPLY_QUERYRESULT))
.build();
}
@@ -673,7 +690,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoder(DocapiVisiting.VisitorInfoRequest.getDefaultInstance(), (protoMsg) -> {
+ .decoder(DocapiVisiting.VisitorInfoRequest.parser(), (protoMsg) -> {
var msg = new VisitorInfoMessage();
msg.setErrorMessage(protoMsg.getErrorMessage());
for (var protoId : protoMsg.getFinishedBucketsList()) {
@@ -688,7 +705,7 @@ abstract class RoutableFactories80 {
return ProtobufCodecBuilder
.of(VisitorReply.class, DocapiVisiting.VisitorInfoResponse.class)
.encoder((apiReply) -> DocapiVisiting.VisitorInfoResponse.newBuilder().build())
- .decoder(DocapiVisiting.VisitorInfoResponse.getDefaultInstance(),
+ .decoder(DocapiVisiting.VisitorInfoResponse.parser(),
(protoReply) -> new VisitorReply(DocumentProtocol.REPLY_VISITORINFO))
.build();
}
@@ -712,7 +729,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoderWithRepo(DocapiVisiting.DocumentListRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ .decoderWithRepo(DocapiVisiting.DocumentListRequest.parser(), (protoMsg, repo) -> {
var msg = new DocumentListMessage();
msg.setBucketId(fromProtoBucketId(protoMsg.getBucketId()));
for (var entry : protoMsg.getEntriesList()) {
@@ -730,7 +747,7 @@ abstract class RoutableFactories80 {
return ProtobufCodecBuilder
.of(VisitorReply.class, DocapiVisiting.DocumentListResponse.class)
.encoder((apiReply) -> DocapiVisiting.DocumentListResponse.newBuilder().build())
- .decoder(DocapiVisiting.DocumentListResponse.getDefaultInstance(),
+ .decoder(DocapiVisiting.DocumentListResponse.parser(),
(protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DOCUMENTLIST))
.build();
}
@@ -750,7 +767,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoder(DocapiVisiting.EmptyBucketsRequest.getDefaultInstance(), (protoMsg) -> {
+ .decoder(DocapiVisiting.EmptyBucketsRequest.parser(), (protoMsg) -> {
var msg = new EmptyBucketsMessage();
for (var protoId : protoMsg.getBucketIdsList()) {
msg.getBucketIds().add(fromProtoBucketId(protoId));
@@ -764,7 +781,7 @@ abstract class RoutableFactories80 {
return ProtobufCodecBuilder
.of(VisitorReply.class, DocapiVisiting.EmptyBucketsResponse.class)
.encoder((apiReply) -> DocapiVisiting.EmptyBucketsResponse.newBuilder().build())
- .decoder(DocapiVisiting.EmptyBucketsResponse.getDefaultInstance(),
+ .decoder(DocapiVisiting.EmptyBucketsResponse.parser(),
(protoReply) -> new VisitorReply(DocumentProtocol.REPLY_EMPTYBUCKETS))
.build();
}
@@ -781,7 +798,7 @@ abstract class RoutableFactories80 {
.setBucketId(toProtoBucketId(apiMsg.getBucketId()))
.setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
.build())
- .decoder(DocapiInspect.GetBucketListRequest.getDefaultInstance(), (protoMsg) ->
+ .decoder(DocapiInspect.GetBucketListRequest.parser(), (protoMsg) ->
new GetBucketListMessage(
fromProtoBucketId(protoMsg.getBucketId()),
fromProtoBucketSpace(protoMsg.getBucketSpace())))
@@ -800,7 +817,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoder(DocapiInspect.GetBucketListResponse.getDefaultInstance(), (protoReply) -> {
+ .decoder(DocapiInspect.GetBucketListResponse.parser(), (protoReply) -> {
var reply = new GetBucketListReply();
for (var info : protoReply.getBucketInfoList()) {
reply.getBuckets().add(new GetBucketListReply.BucketInfo(
@@ -823,7 +840,7 @@ abstract class RoutableFactories80 {
DocapiInspect.GetBucketStateRequest.newBuilder()
.setBucketId(toProtoBucketId(apiMsg.getBucketId()))
.build())
- .decoder(DocapiInspect.GetBucketStateRequest.getDefaultInstance(), (protoMsg) ->
+ .decoder(DocapiInspect.GetBucketStateRequest.parser(), (protoMsg) ->
new GetBucketStateMessage(fromProtoBucketId(protoMsg.getBucketId())))
.build();
}
@@ -846,7 +863,7 @@ abstract class RoutableFactories80 {
}
return builder.build();
})
- .decoder(DocapiInspect.GetBucketStateResponse.getDefaultInstance(), (protoReply) -> {
+ .decoder(DocapiInspect.GetBucketStateResponse.parser(), (protoReply) -> {
var reply = new GetBucketStateReply();
for (var state : protoReply.getStatesList()) {
if (state.hasDocumentId()) {
@@ -879,7 +896,7 @@ abstract class RoutableFactories80 {
.setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
.setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
.build())
- .decoder(DocapiInspect.StatBucketRequest.getDefaultInstance(), (protoMsg) ->
+ .decoder(DocapiInspect.StatBucketRequest.parser(), (protoMsg) ->
new StatBucketMessage(
fromProtoBucketId(protoMsg.getBucketId()),
fromProtoBucketSpace(protoMsg.getBucketSpace()),
@@ -894,7 +911,7 @@ abstract class RoutableFactories80 {
DocapiInspect.StatBucketResponse.newBuilder()
.setResults(apiReply.getResults())
.build())
- .decoder(DocapiInspect.StatBucketResponse.getDefaultInstance(), (protoReply) ->
+ .decoder(DocapiInspect.StatBucketResponse.parser(), (protoReply) ->
new StatBucketReply(protoReply.getResults()))
.build();
}
@@ -910,7 +927,7 @@ abstract class RoutableFactories80 {
DocapiCommon.WrongDistributionResponse.newBuilder()
.setClusterState(toProtoClusterState(apiReply.getSystemState()))
.build())
- .decoder(DocapiCommon.WrongDistributionResponse.getDefaultInstance(), (protoReply) ->
+ .decoder(DocapiCommon.WrongDistributionResponse.parser(), (protoReply) ->
new WrongDistributionReply(fromProtoClusterState(protoReply.getClusterState())))
.build();
}
@@ -923,7 +940,7 @@ abstract class RoutableFactories80 {
return ProtobufCodecBuilder
.of(DocumentIgnoredReply.class, DocapiCommon.DocumentIgnoredResponse.class)
.encoder((apiReply) -> DocapiCommon.DocumentIgnoredResponse.newBuilder().build())
- .decoder(DocapiCommon.DocumentIgnoredResponse.getDefaultInstance(),
+ .decoder(DocapiCommon.DocumentIgnoredResponse.parser(),
(protoReply) -> new DocumentIgnoredReply())
.build();
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
index 399801526ba..c707d8bb9fc 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
@@ -11,6 +11,10 @@
#include <vespa/documentapi/messagebus/docapi_visiting.pb.h>
#include <vespa/documentapi/messagebus/docapi_inspect.pb.h>
#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/util/stringfmt.h>
+
+#include <vespa/log/bufferedlogger.h>
+LOG_SETUP(".documentapi.messagebus.routable_factories_8");
namespace documentapi::messagebus {
@@ -70,8 +74,10 @@ void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) {
}
document::GlobalId get_global_id(const protobuf::GlobalId& src) {
- if (src.raw_gid().size() != document::GlobalId::LENGTH) {
- throw document::DeserializeException("Unexpected serialized protobuf GlobalId size");
+ if (src.raw_gid().size() != document::GlobalId::LENGTH) [[unlikely]] {
+ throw document::DeserializeException(
+ vespalib::make_string("Unexpected serialized protobuf GlobalId size (expected %u, was %zu)",
+ document::GlobalId::LENGTH, src.raw_gid().size()));
}
return document::GlobalId(src.raw_gid().data()); // By copy
}
@@ -136,6 +142,11 @@ std::shared_ptr<document::DocumentUpdate> get_update_or_throw(const protobuf::Do
return upd;
}
+void log_codec_error(const char* op, const char* type, const char* msg) noexcept __attribute__((noinline));
+void log_codec_error(const char* op, const char* type, const char* msg) noexcept {
+ LOGBM(error, "Error during Protobuf %s for message type %s: %s", op, type, msg);
+}
+
template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn>
requires std::is_invocable_r_v<void, EncodeFn, const DocApiType&, ProtobufType&> &&
std::is_invocable_r_v<std::unique_ptr<DocApiType>, DecodeFn, const ProtobufType&>
@@ -154,7 +165,12 @@ public:
::google::protobuf::Arena arena;
auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena);
- _encode_fn(dynamic_cast<const DocApiType&>(obj), *proto_obj);
+ try {
+ _encode_fn(dynamic_cast<const DocApiType&>(obj), *proto_obj);
+ } catch (std::exception& e) {
+ log_codec_error("encode", ProtobufType::descriptor()->name().c_str(), e.what());
+ return false;
+ }
const auto sz = proto_obj->ByteSizeLong();
assert(sz <= INT32_MAX);
@@ -168,14 +184,19 @@ public:
const auto buf_size = in.getRemaining();
assert(buf_size <= INT_MAX);
bool ok = proto_obj->ParseFromArray(in.getBufferAtPos(), buf_size);
- if (!ok) {
- return {}; // Malformed protobuf payload
+ if (!ok) [[unlikely]] {
+ return {}; // Malformed protobuf payload. Caller is expected to log an error.
}
- auto msg = _decode_fn(*proto_obj);
- if constexpr (std::is_base_of_v<DocumentMessage, DocApiType>) {
- msg->setApproxSize(buf_size); // Wire size is a proxy for in-memory size
+ try {
+ auto msg = _decode_fn(*proto_obj);
+ if constexpr (std::is_base_of_v<DocumentMessage, DocApiType>) {
+ msg->setApproxSize(buf_size); // Wire size is a proxy for in-memory size
+ }
+ return msg;
+ } catch (std::exception& e) {
+ log_codec_error("decode", ProtobufType::descriptor()->name().c_str(), e.what());
+ return {};
}
- return msg;
}
};
@@ -235,7 +256,7 @@ std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_message_fact
if (src.getCondition().isPresent()) {
set_tas_condition(*dest.mutable_condition(), src.getCondition());
}
- if (src.getDocumentSP()) { // This should always be present in practice
+ if (src.getDocumentSP()) [[likely]] { // This should always be present in practice
set_document(*dest.mutable_document(), src.getDocument());
}
dest.set_create_if_missing(src.get_create_if_non_existent());
@@ -594,7 +615,7 @@ std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_message_fact
auto msg = std::make_unique<QueryResultMessage>();
// Explicitly enforce presence of result/summary fields, as our object is not necessarily
// well-defined if these have not been initialized.
- if (!src.has_search_result() || !src.has_document_summary()) {
+ if (!src.has_search_result() || !src.has_document_summary()) [[unlikely]] {
throw document::DeserializeException("Query result does not have all required fields set", VESPA_STRLOC);
}
{
diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
index 76da2f7dc9f..7bd275800c9 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
+++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
@@ -7,6 +7,9 @@ namespace document { class DocumentTypeRepo; }
namespace documentapi::messagebus {
+/**
+ * Implementation of MessageBus message request/response serialization built around Protocol Buffers.
+ */
class RoutableFactories80 {
public:
RoutableFactories80() = delete;