diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-28 19:01:12 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-28 19:01:12 +0200 |
commit | 29f45f8cfdb15e87d3b0dfe312fdcdc39e84cf41 (patch) | |
tree | 8f6a866482dbdf40e3afb9c381fe2bb57e062c1c /vespaclient-container-plugin | |
parent | 7fac15f41005fd65e44a5699f67c1a576d8eb3e7 (diff) |
Tests and the fixes they entailed
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java | 36 |
1 files changed, 22 insertions, 14 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java index 36e92c776a1..b675af3b564 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java @@ -30,6 +30,7 @@ import com.yahoo.yolean.Exceptions; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -37,6 +38,7 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -115,7 +117,7 @@ public class DocumentOperationExecutor { visits.values().forEach(VisitorSession::destroy); try { throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); - throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); + timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); } catch (Exception e) { log.log(WARNING, "Exception shutting down " + getClass().getName(), e); @@ -140,6 +142,7 @@ public class DocumentOperationExecutor { public void visit(VisitorOptions options, VisitOperationsContext context) { try { + AtomicBoolean done = new AtomicBoolean(false); VisitorParameters parameters = options.asParameters(clusters, visitTimeout); parameters.setLocalDataHandler(new DumpVisitorDataHandler() { @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); } @@ -161,10 +164,13 @@ public class DocumentOperationExecutor { default: context.error(ERROR, message != null ? message : "Visiting failed"); } - visits.remove(this).destroy(); + done.set(true); // This may be reached before dispatching thread is done putting us in the map. + visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; }); } }); visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); + if (done.get()) + visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; }); } catch (IllegalArgumentException | ParseException e) { context.error(BAD_REQUEST, Exceptions.toMessageString(e)); @@ -230,7 +236,7 @@ public class DocumentOperationExecutor { private final Optional<Integer> wantedDocumentCount; private final Optional<Integer> concurrency; - private VisitorOptions(Optional<String> cluster, Optional<String> namespace, Optional<String> documentType, + private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace, Optional<Group> group, Optional<String> selection, Optional<String> fieldSet, Optional<String> continuation,Optional<String> bucketSpace, Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) { @@ -247,12 +253,12 @@ public class DocumentOperationExecutor { } private VisitorParameters asParameters(Map<String, StorageCluster> clusters, Duration visitTimeout) { - if (cluster.isEmpty() && namespace.isEmpty()) + if (cluster.isEmpty() && documentType.isEmpty()) throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); VisitorParameters parameters = new VisitorParameters(Stream.of(selection, documentType, - namespace.map("id.namespace=="::concat), + namespace.map(value -> "id.namespace=='" + value + "'"), group.map(Group::selection)) .flatMap(Optional::stream) .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right @@ -373,8 +379,8 @@ public class DocumentOperationExecutor { this.selection = selection; } - public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user=" + value); } - public static Group of(String value) { return new Group(value, "g=" + value, "id.group='" + value.replaceAll("'", "\\'") + "'"); } + public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); } + public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); } public String value() { return value; } public String docIdPart() { return docIdPart; } @@ -581,13 +587,14 @@ public class DocumentOperationExecutor { // Not yet ready for action: keep time to wake up again. waitUntil = waitUntil != null ? waitUntil : delayed.readyAt(); } - long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis; - while (clock.millis() < waitUntilMillis) - synchronized (this) { + synchronized (this) { + do { notify(); wait(Math.max(0, waitUntilMillis - clock.millis())); } + while (clock.millis() < waitUntilMillis); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -645,14 +652,14 @@ public class DocumentOperationExecutor { return wanted.map(cluster -> { if ( ! clusters.containsKey(cluster)) - throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only " + - String.join(", ", clusters.keySet())); + throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" + + String.join("', '", clusters.keySet()) + "'"); return clusters.get(cluster); }).orElseGet(() -> { if (clusters.size() > 1) - throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: " + - String.join(", ", clusters.keySet())); + throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" + + String.join("', '", clusters.keySet()) + "'"); return clusters.values().iterator().next(); }); @@ -688,6 +695,7 @@ public class DocumentOperationExecutor { // Visible for testing. AsyncSession asyncSession() { return asyncSession; } + Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); } void notifyMaintainers() throws InterruptedException { synchronized (throttled) { throttled.notify(); throttled.wait(); } synchronized (timeouts) { timeouts.notify(); timeouts.wait(); } |