aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-28 19:01:12 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-28 19:01:12 +0200
commit29f45f8cfdb15e87d3b0dfe312fdcdc39e84cf41 (patch)
tree8f6a866482dbdf40e3afb9c381fe2bb57e062c1c /vespaclient-container-plugin
parent7fac15f41005fd65e44a5699f67c1a576d8eb3e7 (diff)
Tests and the fixes they entailed
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java36
1 files changed, 22 insertions, 14 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
index 36e92c776a1..b675af3b564 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
@@ -30,6 +30,7 @@ import com.yahoo.yolean.Exceptions;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,7 @@ import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -115,7 +117,7 @@ public class DocumentOperationExecutor {
visits.values().forEach(VisitorSession::destroy);
try {
throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
}
catch (Exception e) {
log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
@@ -140,6 +142,7 @@ public class DocumentOperationExecutor {
public void visit(VisitorOptions options, VisitOperationsContext context) {
try {
+ AtomicBoolean done = new AtomicBoolean(false);
VisitorParameters parameters = options.asParameters(clusters, visitTimeout);
parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
@Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
@@ -161,10 +164,13 @@ public class DocumentOperationExecutor {
default:
context.error(ERROR, message != null ? message : "Visiting failed");
}
- visits.remove(this).destroy();
+ done.set(true); // This may be reached before dispatching thread is done putting us in the map.
+ visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; });
}
});
visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
+ if (done.get())
+ visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; });
}
catch (IllegalArgumentException | ParseException e) {
context.error(BAD_REQUEST, Exceptions.toMessageString(e));
@@ -230,7 +236,7 @@ public class DocumentOperationExecutor {
private final Optional<Integer> wantedDocumentCount;
private final Optional<Integer> concurrency;
- private VisitorOptions(Optional<String> cluster, Optional<String> namespace, Optional<String> documentType,
+ private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
Optional<String> continuation,Optional<String> bucketSpace,
Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
@@ -247,12 +253,12 @@ public class DocumentOperationExecutor {
}
private VisitorParameters asParameters(Map<String, StorageCluster> clusters, Duration visitTimeout) {
- if (cluster.isEmpty() && namespace.isEmpty())
+ if (cluster.isEmpty() && documentType.isEmpty())
throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
VisitorParameters parameters = new VisitorParameters(Stream.of(selection,
documentType,
- namespace.map("id.namespace=="::concat),
+ namespace.map(value -> "id.namespace=='" + value + "'"),
group.map(Group::selection))
.flatMap(Optional::stream)
.reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
@@ -373,8 +379,8 @@ public class DocumentOperationExecutor {
this.selection = selection;
}
- public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user=" + value); }
- public static Group of(String value) { return new Group(value, "g=" + value, "id.group='" + value.replaceAll("'", "\\'") + "'"); }
+ public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
+ public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
public String value() { return value; }
public String docIdPart() { return docIdPart; }
@@ -581,13 +587,14 @@ public class DocumentOperationExecutor {
// Not yet ready for action: keep time to wake up again.
waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
}
-
long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
- while (clock.millis() < waitUntilMillis)
- synchronized (this) {
+ synchronized (this) {
+ do {
notify();
wait(Math.max(0, waitUntilMillis - clock.millis()));
}
+ while (clock.millis() < waitUntilMillis);
+ }
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -645,14 +652,14 @@ public class DocumentOperationExecutor {
return wanted.map(cluster -> {
if ( ! clusters.containsKey(cluster))
- throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only " +
- String.join(", ", clusters.keySet()));
+ throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
+ String.join("', '", clusters.keySet()) + "'");
return clusters.get(cluster);
}).orElseGet(() -> {
if (clusters.size() > 1)
- throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: " +
- String.join(", ", clusters.keySet()));
+ throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
+ String.join("', '", clusters.keySet()) + "'");
return clusters.values().iterator().next();
});
@@ -688,6 +695,7 @@ public class DocumentOperationExecutor {
// Visible for testing.
AsyncSession asyncSession() { return asyncSession; }
+ Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
void notifyMaintainers() throws InterruptedException {
synchronized (throttled) { throttled.notify(); throttled.wait(); }
synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }