summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@oath.com>2021-06-30 21:24:03 +0200
committerGitHub <noreply@github.com>2021-06-30 21:24:03 +0200
commit3b17f037ee15828b45f7e271efae102200afa334 (patch)
tree5da54dbaf9452414240afd0e0e1fc99a09b36eca
parent1991465dbd06bee5377df35dba9dd87bac787e4a (diff)
parent9ceacdab281de851c841d395b1e1fd42c25d4f18 (diff)
Merge pull request #15875 from vespa-engine/jonmv/use-injected-document-access-in-streaming-search
Use an injected VespaDocumentAccess for streaming search
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java36
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java16
-rw-r--r--container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java44
-rw-r--r--container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java56
-rw-r--r--container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java3
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java23
6 files changed, 72 insertions, 106 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
index d55df15b2fd..d01cd994fa5 100644
--- a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
+++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
@@ -24,8 +24,10 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.vespa.config.content.DistributionConfig;
import com.yahoo.vespa.config.content.LoadTypeConfig;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
- * Wraps a lazily initialised MessageBusDocumentAccess. Lazy to allow it to always be set up.
+ * Wraps a lazily initialised {@link DocumentAccess}. Lazy to allow it to always be set up.
* Inject this class directly (instead of DocumentAccess) for use in internal code.
*
* @author jonmv
@@ -33,9 +35,8 @@ import com.yahoo.vespa.config.content.LoadTypeConfig;
public class VespaDocumentAccess extends DocumentAccess {
private final MessageBusParams parameters;
- private final Object monitor = new Object();
- private DocumentAccess delegate = null;
+ private final AtomicReference<DocumentAccess> delegate = new AtomicReference<>();
private boolean shutDown = false;
VespaDocumentAccess(DocumentmanagerConfig documentmanagerConfig,
@@ -52,26 +53,29 @@ public class VespaDocumentAccess extends DocumentAccess {
this.parameters.getMessageBusParams().setMessageBusConfig(messagebusConfig);
}
- private DocumentAccess delegate() {
- synchronized (monitor) {
- if (delegate == null) {
- if (shutDown)
- throw new IllegalStateException("This document access has been shut down");
+ public DocumentAccess delegate() {
+ DocumentAccess access = delegate.getAcquire();
+ return access != null ? access : delegate.updateAndGet(value -> {
+ if (value != null)
+ return value;
+
+ if (shutDown)
+ throw new IllegalStateException("This document access has been shut down");
- delegate = new MessageBusDocumentAccess(parameters);
- }
- return delegate;
- }
+ return new MessageBusDocumentAccess(parameters);
+ });
}
@Override
public void shutdown() {
- synchronized (monitor) {
+ delegate.updateAndGet(access -> {
super.shutdown();
shutDown = true;
- if (delegate != null)
- delegate.shutdown();
- }
+ if (access != null)
+ access.shutdown();
+
+ return null;
+ });
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
index ccb6e1248b4..8685400b5c1 100644
--- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
@@ -1,12 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.cluster;
+import com.google.inject.Inject;
import com.yahoo.component.ComponentId;
import com.yahoo.component.chain.dependencies.After;
import com.yahoo.component.provider.ComponentRegistry;
import com.yahoo.container.QrConfig;
import com.yahoo.container.QrSearchersConfig;
+import com.yahoo.container.core.documentapi.VespaDocumentAccess;
import com.yahoo.container.handler.VipStatus;
+import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.prelude.IndexFacts;
import com.yahoo.prelude.fastsearch.ClusterParams;
import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig;
@@ -61,13 +64,15 @@ public class ClusterSearcher extends Searcher {
private VespaBackEndSearcher server = null;
+ @Inject
public ClusterSearcher(ComponentId id,
QrSearchersConfig qrsConfig,
ClusterConfig clusterConfig,
DocumentdbInfoConfig documentDbConfig,
ComponentRegistry<Dispatcher> dispatchers,
QrConfig qrConfig,
- VipStatus vipStatus) {
+ VipStatus vipStatus,
+ VespaDocumentAccess access) {
super(id);
int searchClusterIndex = clusterConfig.clusterId();
@@ -93,7 +98,7 @@ public class ClusterSearcher extends Searcher {
if (searchClusterConfig.indexingmode() == STREAMING) {
VdsStreamingSearcher searcher = vdsCluster(qrConfig.discriminator(), searchClusterIndex,
- searchClusterConfig, docSumParams, documentDbConfig);
+ searchClusterConfig, docSumParams, documentDbConfig, access);
addBackendSearcher(searcher);
vipStatus.addToRotation(searcher.getName());
} else {
@@ -139,13 +144,14 @@ public class ClusterSearcher extends Searcher {
int searchclusterIndex,
QrSearchersConfig.Searchcluster searchClusterConfig,
SummaryParameters docSumParams,
- DocumentdbInfoConfig documentdbInfoConfig) {
+ DocumentdbInfoConfig documentdbInfoConfig,
+ VespaDocumentAccess access) {
if (searchClusterConfig.searchdef().size() != 1) {
throw new IllegalArgumentException("Search clusters in streaming search shall only contain a single searchdefinition : " + searchClusterConfig.searchdef());
}
ClusterParams clusterParams = makeClusterParams(searchclusterIndex);
- VdsStreamingSearcher searcher = new VdsStreamingSearcher();
- searcher.setSearchClusterConfigId(searchClusterConfig.rankprofiles().configid());
+ VdsStreamingSearcher searcher = new VdsStreamingSearcher(access);
+ searcher.setSearchClusterName(searchClusterConfig.rankprofiles().configid());
searcher.setDocumentType(searchClusterConfig.searchdef(0));
searcher.setStorageClusterRouteSpec(searchClusterConfig.storagecluster().routespec());
searcher.init(serverId, docSumParams, clusterParams, documentdbInfoConfig);
diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java
index 3528da17dfe..24dd25c5182 100644
--- a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java
+++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java
@@ -1,9 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.streamingvisitors;
+import com.yahoo.container.core.documentapi.VespaDocumentAccess;
import com.yahoo.document.DocumentId;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.document.select.parser.TokenMgrException;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
+import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.fs4.DocsumPacket;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.prelude.Ping;
@@ -53,15 +58,15 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher {
private Route route;
/** The configId used to access the searchcluster. */
- private String searchClusterConfigId = null;
+ private String searchClusterName = null;
private String documentType;
/** The route to the storage cluster. */
private String storageClusterRouteSpec = null;
- private String getSearchClusterConfigId() { return searchClusterConfigId; }
+ private String getSearchClusterName() { return searchClusterName; }
private String getStorageClusterRouteSpec() { return storageClusterRouteSpec; }
- public final void setSearchClusterConfigId(String clusterName) {
- this.searchClusterConfigId = clusterName;
+ public final void setSearchClusterName(String clusterName) {
+ this.searchClusterName = clusterName;
}
public final void setDocumentType(String documentType) {
this.documentType = documentType;
@@ -71,16 +76,35 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher {
this.storageClusterRouteSpec = storageClusterRouteSpec;
}
- private static class VdsVisitorFactory implements VisitorFactory {
+ private static class VespaVisitorFactory implements VdsVisitor.VisitorSessionFactory, VisitorFactory {
+
+ private final VespaDocumentAccess access;
+
+ private VespaVisitorFactory(VespaDocumentAccess access) {
+ this.access = access;
+ }
+
+ @Override
+ public VisitorSession createVisitorSession(VisitorParameters params) throws ParseException {
+ return access.createVisitorSession(params);
+ }
+
+ @Override
+ public LoadTypeSet getLoadTypeSet() {
+ return ((MessageBusDocumentAccess) access.delegate()).getParams().getLoadTypes();
+ }
+
@Override
public Visitor createVisitor(Query query, String searchCluster, Route route, String documentType, int traceLevelOverride) {
- return new VdsVisitor(query, searchCluster, route, documentType, traceLevelOverride);
+ return new VdsVisitor(query, searchCluster, route, documentType, this, traceLevelOverride);
}
+
}
- public VdsStreamingSearcher() {
- this(new VdsVisitorFactory());
+ public VdsStreamingSearcher(VespaDocumentAccess access) {
+ this(new VespaVisitorFactory(access));
}
+
VdsStreamingSearcher(VisitorFactory visitorFactory) {
this.visitorFactory = visitorFactory;
tracingOptions = TracingOptions.DEFAULT;
@@ -146,11 +170,11 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher {
"only one of these query parameters to be set: streaming.userid, streaming.groupname, " +
"streaming.selection"));
}
- query.trace("Routing to search cluster " + getSearchClusterConfigId() + " and document type " + documentType, 4);
+ query.trace("Routing to search cluster " + getSearchClusterName() + " and document type " + documentType, 4);
long timeStartedNanos = tracingOptions.getClock().nanoTimeNow();
int effectiveTraceLevel = inferEffectiveQueryTraceLevel(query);
- Visitor visitor = visitorFactory.createVisitor(query, getSearchClusterConfigId(), route, documentType, effectiveTraceLevel);
+ Visitor visitor = visitorFactory.createVisitor(query, getSearchClusterName(), route, documentType, effectiveTraceLevel);
try {
visitor.doSearch();
} catch (ParseException e) {
diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java
index 7d7b78f7153..49fda880b44 100644
--- a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java
+++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java
@@ -83,55 +83,6 @@ class VdsVisitor extends VisitorDataHandler implements Visitor {
LoadTypeSet getLoadTypeSet();
}
- private static class MessageBusVisitorSessionFactory implements VisitorSessionFactory {
- private static final Object initMonitor = new Object();
- private static final AtomicReference<MessageBusVisitorSessionFactory> instance = new AtomicReference<>();
-
- private final LoadTypeSet loadTypes;
- private final DocumentAccess access;
-
- private MessageBusVisitorSessionFactory() {
- loadTypes = new LoadTypeSet("client");
- access = new MessageBusDocumentAccess(new MessageBusParams(loadTypes));
- }
-
- @Override
- public VisitorSession createVisitorSession(VisitorParameters params) throws ParseException {
- return access.createVisitorSession(params);
- }
-
- @Override
- public LoadTypeSet getLoadTypeSet() {
- return loadTypes;
- }
-
- /**
- * Returns a single, shared instance of this class which is lazily created in a thread-safe
- * manner the first time this method is invoked.
- *
- * May throw any config-related exception if subscription fails.
- */
- static MessageBusVisitorSessionFactory sharedInstance() {
- var ref = instance.getAcquire();
- if (ref != null) {
- return ref;
- }
- synchronized (initMonitor) {
- ref = instance.getAcquire();
- if (ref != null) {
- return ref;
- }
- ref = new MessageBusVisitorSessionFactory();
- instance.setRelease(ref);
- }
- return ref;
- }
- }
-
- public VdsVisitor(Query query, String searchCluster, Route route, String documentType, int traceLevelOverride) {
- this(query, searchCluster, route, documentType, MessageBusVisitorSessionFactory.sharedInstance(), traceLevelOverride);
- }
-
public VdsVisitor(Query query, String searchCluster, Route route,
String documentType, VisitorSessionFactory visitorSessionFactory,
int traceLevelOverride)
@@ -264,9 +215,8 @@ class VdsVisitor extends VisitorDataHandler implements Visitor {
static int getQueryFlags(Query query) {
int flags = 0;
- boolean requestCoverage=true; // Always request coverage information
+ boolean requestCoverage = true; // Always request coverage information
- flags |= 0; // was collapse
flags |= query.properties().getBoolean(Model.ESTIMATE) ? 0x00000080 : 0;
flags |= (query.getRanking().getFreshness() != null) ? 0x00002000 : 0;
flags |= requestCoverage ? 0x00008000 : 0;
@@ -344,9 +294,7 @@ class VdsVisitor extends VisitorDataHandler implements Visitor {
}
if (params.getControlHandler().getResult().code == VisitorControlHandler.CompletionCode.SUCCESS) {
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "VdsVisitor completed successfully for " + query + " with selection " + params.getDocumentSelection());
- }
+ log.log(Level.FINE, () -> "VdsVisitor completed successfully for " + query + " with selection " + params.getDocumentSelection());
} else {
throw new IllegalArgumentException("Query failed: " +
params.getControlHandler().getResult().code + ": " +
diff --git a/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java
index f4608f1c991..ca98b6a1a77 100644
--- a/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java
+++ b/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java
@@ -530,7 +530,8 @@ public class ClusterSearcherTestCase {
documentDbConfig.build(),
dispatchers,
new QrConfig.Builder().build(),
- vipStatus);
+ vipStatus,
+ null);
}
private static ClusterInfoConfig createClusterInfoConfig() {
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
index 0a070c9fe35..113f82e1566 100644
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
@@ -4,17 +4,16 @@ package com.yahoo.feedapi;
import com.yahoo.concurrent.SystemTimer;
import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.vespa.config.content.LoadTypeConfig;
-import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.messagebus.loadtypes.LoadType;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import java.util.logging.Level;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
import com.yahoo.vespaclient.config.FeederConfig;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -247,22 +246,6 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
}
}
- public void process(VisitorParameters params) {
- if (route != null) {
- params.setRoute(route);
- }
- params.setTimeoutMs(timeout);
-
- params.setTraceLevel(Math.max(getFeederOptions().getTraceLevel(), traceLevel));
-
- if (loadType != null) {
- params.setLoadType(loadType);
- params.setPriority(loadType.getPriority());
- }
-
- if (priority != null) {
- params.setPriority(priority);
- }
- }
}
+
}