diff options
4 files changed, 1 insertions, 174 deletions
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java index 4d887616093..d1d5256efe3 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java @@ -19,14 +19,13 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; /** * @author Einar M R Rosenvinge */ -public class DocumentProcessingTask implements Comparable<DocumentProcessingTask>, Runnable { +public class DocumentProcessingTask implements Runnable { private static Logger log = Logger.getLogger(DocumentProcessingTask.class.getName()); private final List<Processing> processings = new ArrayList<>(); @@ -35,14 +34,11 @@ public class DocumentProcessingTask implements Comparable<DocumentProcessingTask private final DocumentProcessingHandler docprocHandler; private RequestContext requestContext; - private final static AtomicLong seq = new AtomicLong(); - private final long seqNum; private final DocprocService service; private final ThreadPoolExecutor executor; public DocumentProcessingTask(RequestContext requestContext, DocumentProcessingHandler docprocHandler, DocprocService service, ThreadPoolExecutor executor) { - seqNum = seq.getAndIncrement(); this.requestContext = requestContext; this.docprocHandler = docprocHandler; this.service = service; @@ -180,34 +176,15 @@ public class DocumentProcessingTask implements Comparable<DocumentProcessingTask ". Will be automatically resent."); } - public int compareTo(DocumentProcessingTask other) { - int ourPriority = requestContext.getPriority(); - int otherPriority = other.requestContext.getPriority(); - int res = (ourPriority == otherPriority) ? 0 : ((ourPriority < otherPriority) ? -1 : 1); - if (res == 0) { - res = (seqNum == other.seqNum) ? 0 : ((seqNum < other.seqNum) ? -1 : 1); - } - return res; - } - @Override public String toString() { return "ProcessingTask{" + "processings=" + processings + ", processingsDone=" + processingsDone + ", requestContext=" + requestContext + - ", seqNum=" + seqNum + '}'; } - public int getApproxSize() { - return requestContext.getApproxSize(); - } - - final long getSeqNum() { - return seqNum; - } - private static void logProcessingFailure(Processing processing, Exception exception) { //LOGGING ONLY: String errorMsg = processing + " failed at " + processing.callStack().getLastPopped(); diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java index 8f021546ac8..48b58c12456 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java @@ -21,10 +21,6 @@ public interface RequestContext { boolean isProcessable(); - int getApproxSize(); - - int getPriority(); - void processingDone(List<Processing> processing); void processingFailed(ErrorCode error, String msg); diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java index 187557791f3..2e63e2defff 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java @@ -47,7 +47,6 @@ public class MbusRequestContext implements RequestContext, ResponseHandler { private final MbusRequest request; private final DocumentMessage requestMsg; private final ResponseHandler responseHandler; - private volatile int cachedApproxSize; // When spawning off new documents inside document processor, we do not want // throttling since this can lead to live locks. This is because the // document being processed is a resource and is then grabbing more resources of @@ -137,20 +136,6 @@ public class MbusRequestContext implements RequestContext, ResponseHandler { } @Override - public int getApproxSize() { - if (cachedApproxSize > 0) { - return cachedApproxSize; - } - cachedApproxSize = requestMsg.getApproxSize(); - return cachedApproxSize; - } - - @Override - public int getPriority() { - return requestMsg.getPriority().getValue(); - } - - @Override public URI getUri() { return request.getUri(); } diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java deleted file mode 100644 index f050b6c5450..00000000000 --- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc.jdisc; - -import com.yahoo.docproc.Processing; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import org.junit.Test; - -import java.net.URI; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.PriorityBlockingQueue; - -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.CoreMatchers.sameInstance; -import static org.junit.Assert.assertThat; - -/** - * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> - */ -public class DocumentProcessingTaskPrioritizationTestCase { - - @Test - public void proritization() { - Queue<DocumentProcessingTask> queue = new PriorityBlockingQueue<>(); - - DocumentProcessingTask highest = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST); - DocumentProcessingTask veryhigh = new TestDocumentProcessingTask(DocumentProtocol.Priority.VERY_HIGH); - DocumentProcessingTask high1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGH_1); - DocumentProcessingTask normal_1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1); - DocumentProcessingTask low_1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.LOW_1); - DocumentProcessingTask verylow = new TestDocumentProcessingTask(DocumentProtocol.Priority.VERY_LOW); - DocumentProcessingTask lowest = new TestDocumentProcessingTask(DocumentProtocol.Priority.LOWEST); - - DocumentProcessingTask normal_2 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1); - DocumentProcessingTask normal_3 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1); - DocumentProcessingTask normal_4 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1); - - DocumentProcessingTask highest_2 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST); - DocumentProcessingTask highest_3 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST); - - - queue.add(highest); - queue.add(veryhigh); - queue.add(high1); - queue.add(normal_1); - queue.add(low_1); - queue.add(verylow); - queue.add(lowest); - - queue.add(normal_2); - queue.add(normal_3); - queue.add(normal_4); - - queue.add(highest_2); - queue.add(highest_3); - - assertThat(queue.poll(), sameInstance(highest)); - assertThat(queue.poll(), sameInstance(highest_2)); - assertThat(queue.poll(), sameInstance(highest_3)); - assertThat(queue.poll(), sameInstance(veryhigh)); - assertThat(queue.poll(), sameInstance(high1)); - assertThat(queue.poll(), sameInstance(normal_1)); - assertThat(queue.poll(), sameInstance(normal_2)); - assertThat(queue.poll(), sameInstance(normal_3)); - assertThat(queue.poll(), sameInstance(normal_4)); - assertThat(queue.poll(), sameInstance(low_1)); - assertThat(queue.poll(), sameInstance(verylow)); - assertThat(queue.poll(), sameInstance(lowest)); - assertThat(queue.poll(), nullValue()); - } - - private class TestDocumentProcessingTask extends DocumentProcessingTask { - private TestDocumentProcessingTask(DocumentProtocol.Priority priority) { - super(new TestRequestContext(priority), null, null, null); - } - } - - private class TestRequestContext implements RequestContext { - private final DocumentProtocol.Priority priority; - - public TestRequestContext(DocumentProtocol.Priority priority) { - this.priority = priority; - } - - @Override - public List<Processing> getProcessings() { - return null; - } - - @Override - public void skip() { - } - - @Override - public void processingDone(List<Processing> processing) { - } - - @Override - public void processingFailed(ErrorCode error, String msg) { - } - - @Override - public void processingFailed(Exception exception) { - } - - @Override - public int getApproxSize() { - return 0; - } - - @Override - public int getPriority() { - return priority.getValue(); - } - - @Override - public boolean isProcessable() { - return true; - } - - @Override - public URI getUri() { - return null; - } - - @Override - public String getServiceName() { - return null; - } - } -} |