From 651306b9172bf91ed278fb717f2ab78f0e331022 Mon Sep 17 00:00:00 2001 From: gjoranv Date: Tue, 7 Jun 2022 12:33:32 +0200 Subject: Revoke deprecated implementation classes in c.y.docproc from public --- docproc/abi-spec.json | 112 +----- docproc/src/main/java/com/yahoo/docproc/Call.java | 2 +- .../java/com/yahoo/docproc/DocprocExecutor.java | 191 ---------- .../java/com/yahoo/docproc/DocprocService.java | 404 --------------------- .../yahoo/docproc/DocumentOperationWrapper.java | 15 - .../java/com/yahoo/docproc/DocumentProcessor.java | 2 +- .../yahoo/docproc/HandledProcessingException.java | 18 - .../main/java/com/yahoo/docproc/Processing.java | 85 +---- .../java/com/yahoo/docproc/ProcessingEndpoint.java | 15 - .../yahoo/docproc/TransientFailureException.java | 18 - .../com/yahoo/docproc/impl/DocprocExecutor.java | 193 ++++++++++ .../com/yahoo/docproc/impl/DocprocService.java | 404 +++++++++++++++++++++ .../docproc/impl/DocumentOperationWrapper.java | 13 + .../docproc/impl/HandledProcessingException.java | 16 + .../com/yahoo/docproc/impl/ProcessingAccess.java | 31 ++ .../com/yahoo/docproc/impl/ProcessingEndpoint.java | 15 + .../docproc/impl/TransientFailureException.java | 16 + .../java/com/yahoo/docproc/impl/package-info.java | 5 + .../docproc/jdisc/DocumentProcessingHandler.java | 10 +- .../docproc/jdisc/DocumentProcessingTask.java | 12 +- .../jdisc/messagebus/MbusRequestContext.java | 10 +- .../jdisc/messagebus/ProcessingFactory.java | 12 +- .../com/yahoo/docproc/proxy/ProxyDocument.java | 4 +- .../yahoo/docproc/proxy/ProxyDocumentUpdate.java | 3 +- .../java/com/yahoo/docproc/CallbackTestCase.java | 2 + .../DocumentProcessingAbstractTestCase.java | 1 + .../com/yahoo/docproc/EmptyProcessingTestCase.java | 2 +- .../docproc/FailingDocumentProcessingTestCase.java | 2 + ...DocumentProcessingWithoutExceptionTestCase.java | 1 + ...ilingPermanentlyDocumentProcessingTestCase.java | 2 +- .../yahoo/docproc/FailingWithErrorTestCase.java | 3 +- .../NotAcceptingNewProcessingsTestCase.java | 1 + .../yahoo/docproc/ProcessingUpdateTestCase.java | 1 + .../docproc/SimpleDocumentProcessingTestCase.java | 1 + .../docproc/SimpleDocumentProcessorTestCase.java | 1 + .../yahoo/docproc/TransientFailureTestCase.java | 3 + .../jdisc/DocumentProcessingHandlerTestBase.java | 5 +- 37 files changed, 743 insertions(+), 888 deletions(-) delete mode 100644 docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java delete mode 100644 docproc/src/main/java/com/yahoo/docproc/DocprocService.java delete mode 100644 docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java delete mode 100644 docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java delete mode 100644 docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java delete mode 100644 docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java create mode 100644 docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java create mode 100644 docproc/src/main/java/com/yahoo/docproc/impl/DocprocService.java create mode 100644 docproc/src/main/java/com/yahoo/docproc/impl/DocumentOperationWrapper.java create mode 100644 docproc/src/main/java/com/yahoo/docproc/impl/HandledProcessingException.java create mode 100644 docproc/src/main/java/com/yahoo/docproc/impl/ProcessingAccess.java create mode 100644 docproc/src/main/java/com/yahoo/docproc/impl/ProcessingEndpoint.java create mode 100644 docproc/src/main/java/com/yahoo/docproc/impl/TransientFailureException.java create mode 100644 docproc/src/main/java/com/yahoo/docproc/impl/package-info.java (limited to 'docproc') diff --git a/docproc/abi-spec.json b/docproc/abi-spec.json index f90d31023d3..41c76cc3a17 100644 --- a/docproc/abi-spec.json +++ b/docproc/abi-spec.json @@ -132,72 +132,6 @@ ], "fields": [] }, - "com.yahoo.docproc.DocprocExecutor": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void (java.lang.String, com.yahoo.docproc.CallStack)", - "public void (com.yahoo.docproc.DocprocExecutor, com.yahoo.docproc.CallStack)", - "public com.yahoo.docproc.CallStack getCallStack()", - "public java.lang.String getName()", - "public com.yahoo.docproc.DocumentProcessor$Progress process(com.yahoo.docproc.Processing)", - "public com.yahoo.docproc.DocumentProcessor$Progress processUntilDone(com.yahoo.docproc.Processing)" - ], - "fields": [] - }, - "com.yahoo.docproc.DocprocService": { - "superClass": "com.yahoo.component.AbstractComponent", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void (com.yahoo.component.ComponentId)", - "public void (com.yahoo.component.ComponentId, com.yahoo.docproc.CallStack, com.yahoo.document.DocumentTypeManager, int)", - "public void (com.yahoo.component.ComponentId, com.yahoo.docproc.CallStack, com.yahoo.document.DocumentTypeManager)", - "public void (java.lang.String)", - "public void deconstruct()", - "public com.yahoo.document.DocumentTypeManager getDocumentTypeManager()", - "public void setDocumentTypeManager(com.yahoo.document.DocumentTypeManager)", - "public int getQueueSize()", - "public com.yahoo.docproc.DocprocExecutor getExecutor()", - "public java.util.concurrent.ThreadPoolExecutor getThreadPoolExecutor()", - "public void setInService(boolean)", - "public boolean isInService()", - "public boolean isAcceptingNewProcessings()", - "public void setAcceptingNewProcessings(boolean)", - "public java.lang.String getName()", - "public com.yahoo.docproc.CallStack getCallStack()", - "public void setCallStack(com.yahoo.docproc.CallStack)", - "public void process(com.yahoo.docproc.Processing, com.yahoo.docproc.ProcessingEndpoint)", - "public void process(com.yahoo.docproc.Processing)", - "public void process(com.yahoo.document.DocumentOperation, com.yahoo.docproc.ProcessingEndpoint)", - "public void process(com.yahoo.document.DocumentOperation)", - "public void processDocumentOperations(java.util.List, com.yahoo.docproc.ProcessingEndpoint)", - "public void processDocumentOperations(java.util.List)", - "public boolean doWork()", - "public boolean doWorkBlocking()" - ], - "fields": [ - "public static com.yahoo.docproc.proxy.SchemaMap schemaMap" - ] - }, - "com.yahoo.docproc.DocumentOperationWrapper": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public", - "interface", - "abstract" - ], - "methods": [ - "public abstract com.yahoo.document.DocumentOperation getWrappedDocumentOperation()" - ], - "fields": [] - }, "com.yahoo.docproc.DocumentProcessor$LaterProgress": { "superClass": "com.yahoo.docproc.DocumentProcessor$Progress", "interfaces": [], @@ -252,19 +186,8 @@ ], "fields": [] }, - "com.yahoo.docproc.HandledProcessingException": { - "superClass": "java.lang.RuntimeException", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void (java.lang.String)" - ], - "fields": [] - }, "com.yahoo.docproc.Processing": { - "superClass": "java.lang.Object", + "superClass": "com.yahoo.docproc.impl.ProcessingAccess", "interfaces": [], "attributes": [ "public" @@ -273,39 +196,27 @@ "public void ()", "public static com.yahoo.docproc.Processing of(com.yahoo.document.DocumentOperation)", "public void (java.lang.String, com.yahoo.document.DocumentOperation, com.yahoo.docproc.CallStack)", + "protected void (java.lang.String, java.util.List, com.yahoo.docproc.CallStack, com.yahoo.docproc.impl.ProcessingEndpoint, boolean)", "public static com.yahoo.docproc.Processing createProcessingFromDocumentOperations(java.lang.String, java.util.List, com.yahoo.docproc.CallStack)", - "public com.yahoo.component.provider.ComponentRegistry getDocprocServiceRegistry()", - "public void setDocprocServiceRegistry(com.yahoo.component.provider.ComponentRegistry)", "public java.lang.String getServiceName()", "public void setServiceName(java.lang.String)", - "public com.yahoo.docproc.DocprocService getService()", "public java.lang.Object getVariable(java.lang.String)", "public java.util.Iterator getVariableAndNameIterator()", "public void clearVariables()", "public void setVariable(java.lang.String, java.lang.Object)", "public java.lang.Object removeVariable(java.lang.String)", "public boolean hasVariable(java.lang.String)", + "protected com.yahoo.docproc.impl.ProcessingEndpoint getEndpoint()", + "protected void setEndpoint(com.yahoo.docproc.impl.ProcessingEndpoint)", "public void addDocumentOperation(com.yahoo.document.DocumentOperation)", "public java.util.List getDocumentOperations()", "public com.yahoo.docproc.CallStack callStack()", + "protected void setCallStack(com.yahoo.docproc.CallStack)", + "protected java.util.List getOnceOperationsToBeProcessed()", "public java.lang.String toString()" ], "fields": [] }, - "com.yahoo.docproc.ProcessingEndpoint": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public", - "interface", - "abstract" - ], - "methods": [ - "public abstract void processingDone(com.yahoo.docproc.Processing)", - "public abstract void processingFailed(com.yahoo.docproc.Processing, java.lang.Exception)" - ], - "fields": [] - }, "com.yahoo.docproc.SimpleDocumentProcessor": { "superClass": "com.yahoo.docproc.DocumentProcessor", "interfaces": [], @@ -320,16 +231,5 @@ "public final com.yahoo.docproc.DocumentProcessor$Progress process(com.yahoo.docproc.Processing)" ], "fields": [] - }, - "com.yahoo.docproc.TransientFailureException": { - "superClass": "java.lang.RuntimeException", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void (java.lang.String)" - ], - "fields": [] } } \ No newline at end of file diff --git a/docproc/src/main/java/com/yahoo/docproc/Call.java b/docproc/src/main/java/com/yahoo/docproc/Call.java index 75670badf91..440935ef494 100644 --- a/docproc/src/main/java/com/yahoo/docproc/Call.java +++ b/docproc/src/main/java/com/yahoo/docproc/Call.java @@ -2,6 +2,7 @@ package com.yahoo.docproc; import com.yahoo.component.ComponentId; +import com.yahoo.docproc.impl.DocumentOperationWrapper; import com.yahoo.docproc.jdisc.metric.NullMetric; import com.yahoo.docproc.proxy.ProxyDocument; import com.yahoo.docproc.proxy.ProxyDocumentUpdate; @@ -122,7 +123,6 @@ public class Call implements Cloneable { } - @SuppressWarnings("removal") // TODO Vespa 8: remove private void unwrapSchemaMapping(Processing processing) { final List documentOperations = processing.getDocumentOperations(); diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java b/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java deleted file mode 100644 index 0e7a217efbe..00000000000 --- a/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc; - -import com.yahoo.document.DocumentOperation; -import com.yahoo.document.DocumentPut; -import com.yahoo.document.json.JsonWriter; -import com.yahoo.jdisc.Metric; -import com.yahoo.text.Utf8; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static java.util.stream.Collectors.counting; -import static java.util.stream.Collectors.groupingBy; - -/** - * An executor executed incoming processings on its CallStack - * - * @author Einar M R Rosenvinge - * @deprecated Will be removed in Vespa 8. Only for internal use. - */ -@Deprecated(forRemoval = true, since = "7") -public class DocprocExecutor { - - private final static String METRIC_NAME_DOCUMENTS_PROCESSED = "documents_processed"; - - private static final Logger log = Logger.getLogger(DocprocExecutor.class.getName()); - - private final String name; - private final String docCounterName; - private final Metric metric; - private final Function contexts; - private final CallStack callStack; - - /** - * Creates a new named DocprocExecutor with the given CallStack. - * - * @param name the name of this executor - * @param callStack the chain of document processors this executor shall execute on processings - */ - public DocprocExecutor(String name, CallStack callStack) { - this.name = name; - String chainDimension = name != null ? name.replaceAll("[^\\p{Alnum}]", "_") : name; - docCounterName = "chain_" + chainDimension + "_documents"; - this.metric = callStack.getMetric(); - this.callStack = callStack; - this.callStack.setName(name); - this.contexts = cachedContexts(chainDimension); - } - - /** - * Creates a new named DocprocExecutor, with the same instance variables as the given executor, - * but a new call stack. - * - * @param oldExecutor the executor to inherit the instance variables from, sans call stack. - * @param callStack the call stack to use. - */ - public DocprocExecutor(DocprocExecutor oldExecutor, CallStack callStack) { - this.name = oldExecutor.name; - this.docCounterName = oldExecutor.docCounterName; - this.metric = oldExecutor.metric; - this.contexts = oldExecutor.contexts; - this.callStack = callStack; - } - - public CallStack getCallStack() { - return callStack; - } - - public String getName() { - return name; - } - - private void incrementNumDocsProcessed(Processing processing) { - List operations = processing.getOnceOperationsToBeProcessed(); - if ( ! operations.isEmpty()) { - metric.add(docCounterName, operations.size(), null); - operations.stream() - .collect(groupingBy(operation -> operation.getId().getDocType(), counting())) - .forEach((type, count) -> metric.add(METRIC_NAME_DOCUMENTS_PROCESSED, count, contexts.apply(type))); - } - } - - /** - * Processes a given Processing through the CallStack of this executor. - * - * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null. - * @return a Progress; if this is LATER, the Processing is not done and must be reprocessed later. - * @throws RuntimeException if a document processor throws an exception during processing. - * @see com.yahoo.docproc.Processing - */ - public DocumentProcessor.Progress process(Processing processing) { - processing.setServiceName(getName()); - if (processing.callStack() == null) { - processing.setCallStack(new CallStack(getCallStack())); - } - - DocumentProcessor.Progress progress = DocumentProcessor.Progress.DONE; - //metrics stuff: - //TODO: Note that this is *wrong* in case of Progress.LATER, documents are then counted several times until the Processing is DONE or FAILED. - incrementNumDocsProcessed(processing); - do { - Call call = processing.callStack().pop(); - if (call == null) { - // No more processors - done - return progress; - } - - //might throw exception, which is OK: - progress = call.call(processing); - - if (log.isLoggable(Level.FINEST)) { - logProgress(processing, progress, call); - } - - if (DocumentProcessor.Progress.LATER.equals(progress)) { - processing.callStack().addNext(call); - return progress; - } - } while (DocumentProcessor.Progress.DONE.equals(progress)); - return progress; - } - - private void logProgress(Processing processing, DocumentProcessor.Progress progress, Call call) { - StringBuilder message = new StringBuilder(); - boolean first = true; - message.append(call.getDocumentProcessorId()).append(" of class ") - .append(call.getDocumentProcessor().getClass().getSimpleName()).append(" returned ").append(progress) - .append(" for the documents: ["); - for (DocumentOperation op : processing.getDocumentOperations()) { - if (first) { - first = false; - } else { - message.append(", "); - } - if (op instanceof DocumentPut) { - message.append(Utf8.toString(JsonWriter.toByteArray(((DocumentPut) op).getDocument()))); - } else { - message.append(op.toString()); - } - } - message.append("]"); - log.log(Level.FINEST, message.toString()); - } - - /** - * Processes a given Processing through the CallStack of this executor. Note that if a DocumentProcessor - * returns a LaterProgress for this processing, it will be re-processed (after waiting the specified delay given - * by the LaterProgress), until done or failed. - * - * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null. - * @return a Progress; this is never a LaterProgress. - * @throws RuntimeException if a document processor throws an exception during processing, or this thread is interrupted while waiting. - * @see com.yahoo.docproc.Processing - * @see com.yahoo.docproc.DocumentProcessor.Progress - * @see com.yahoo.docproc.DocumentProcessor.LaterProgress - */ - public DocumentProcessor.Progress processUntilDone(Processing processing) { - DocumentProcessor.Progress progress; - while (true) { - progress = process(processing); - if (!(progress instanceof DocumentProcessor.LaterProgress)) { - break; - } - DocumentProcessor.LaterProgress later = (DocumentProcessor.LaterProgress) progress; - try { - Thread.sleep(later.getDelay()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - return progress; - } - - private Function cachedContexts(String chainDimension) { - Map contextCache = new ConcurrentHashMap<>(); - return documentType -> contextCache.computeIfAbsent(documentType, type -> { - Map dimensions = new HashMap<>(2); - dimensions.put("chain", chainDimension); - dimensions.put("documenttype", type); - return metric.createContext(dimensions); - }); - } - -} diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java deleted file mode 100644 index c6956f44f52..00000000000 --- a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java +++ /dev/null @@ -1,404 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc; - -import com.yahoo.component.AbstractComponent; -import com.yahoo.component.ComponentId; -import com.yahoo.concurrent.DaemonThreadFactory; -import com.yahoo.docproc.proxy.SchemaMap; -import com.yahoo.document.DocumentOperation; -import com.yahoo.document.DocumentTypeManager; - -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - *

The document processing service. - * Use this to set up a document processing chain and to - * process documents using that chain. Note that there may - * be multiple named instances of this service in the same - * runtime. The default service is called "default" and is always present.

- * - *

To create a server which receives documents from the network - * and processes them, have a look at com.yahoo.docproc.server.Server.

- * - *

This class is thread safe.

- * - * @author bratseth - * @deprecated Will be removed in Vespa 8. Only for internal use. - */ -@SuppressWarnings("removal") // TODO Vespa 8: remove -@Deprecated(forRemoval = true, since = "7") -public class DocprocService extends AbstractComponent { - - private static final Logger log = Logger.getLogger(DocprocService.class.getName()); - private volatile DocprocExecutor executor; - - /** The processings currently in progress at this service */ - private final LinkedBlockingQueue queue; - private final ThreadPoolExecutor threadPool; - /** The current state of this service */ - private boolean inService = false; - /** The current state of this service */ - private boolean acceptingNewProcessings = true; - public static SchemaMap schemaMap = new SchemaMap(); - private DocumentTypeManager documentTypeManager = null; - - private DocprocService(ComponentId id, int numThreads) { - super(id); - queue = new LinkedBlockingQueue<>(); - threadPool = new ThreadPoolExecutor(numThreads, - numThreads, - 0, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new DaemonThreadFactory("docproc-" + id.stringValue() + "-")); - } - - public DocprocService(ComponentId id) { - this(id, Runtime.getRuntime().availableProcessors()); - } - - /** - * Creates a new docproc service, which is set to be in service. - * - * @param id the component id of the new service. - * @param stack the call stack to use. - * @param mgr the document type manager to use. - * @param numThreads to have in the thread pool - */ - public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr, int numThreads) { - this(id, numThreads); - setCallStack(stack); - setDocumentTypeManager(mgr); - setInService(true); - } - - @Deprecated - public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) { - this(id, stack, mgr, Runtime.getRuntime().availableProcessors()); - } - - /** - * Creates a service with a name with an unbounded input queue. If the given name is null or the empty string, - * it will become the name "default". - * Testing only - */ - public DocprocService(String name) { - this(new ComponentId(name, null), 1); - } - - @Override - public void deconstruct() { - threadPool.shutdown(); - } - - public DocumentTypeManager getDocumentTypeManager() { - return documentTypeManager; - } - - public void setDocumentTypeManager(DocumentTypeManager documentTypeManager) { - this.documentTypeManager = documentTypeManager; - } - - public int getQueueSize() { - return queue.size(); - } - - /** - * Returns the DocprocExecutor of this DocprocService. This can be used to - * synchronously process one Processing. - * - * @return the DocprocExecutor of this DocprocService, or null if a CallStack has not yet been set. - */ - public DocprocExecutor getExecutor() { - return executor; - } - - public ThreadPoolExecutor getThreadPoolExecutor() { - return threadPool; - } - - private void setExecutor(DocprocExecutor executor) { - this.executor = executor; - } - - /** - * Sets whether this should currently perform any processing. - * New processings will be accepted also when this is out of service, - * but no processing will happen when it is out of service. - */ - public void setInService(boolean inService) { - this.inService = inService; - } - - /** - * Returns true if this is currently processing incoming processings - * (in service), or false if they are just queued up (out of service). - * By default, this is out of service. - */ - public boolean isInService() { - return inService; - } - - /** - * Returns true if this service currently accepts new incoming processings via process(...). Default is true. - * - * @return true if accepting new incoming processings - */ - public boolean isAcceptingNewProcessings() { - return acceptingNewProcessings; - } - - /** - * Sets whether this service should accept new incoming processings via process(...). - * - * @param acceptingNewProcessings true if service should accept new incoming processings - */ - public void setAcceptingNewProcessings(boolean acceptingNewProcessings) { - this.acceptingNewProcessings = acceptingNewProcessings; - } - - public String getName() { - return getId().stringValue(); - } - - /** - * Returns the processing chain of this service. This stack can not be modified. - * To change the stack, set a new one. - */ - // TODO: Enforce unmodifiability - public CallStack getCallStack() { - DocprocExecutor ex = getExecutor(); - return (ex == null) ? null : ex.getCallStack(); - } - - /** - * Sets a new processing stack for this service. This will be the Prototype - * for the call stacks of individual processings in this service - */ - public void setCallStack(CallStack stack) { - DocprocExecutor ex = ((getExecutor() == null) ? new DocprocExecutor(getName(), stack) : new DocprocExecutor(getExecutor(), stack)); - setExecutor(ex); - } - - /** - * Asynchronously process the given Processing using the processing - * chain of this service, and call the specified ProcessingEndpoint when done. - * - * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full - * @throws IllegalStateException if this DocprocService is not accepting new incoming processings - */ - public void process(Processing processing, ProcessingEndpoint endp) { - processing.setServiceName(getName()); - processing.setCallStack(new CallStack(getCallStack())); - processing.setEndpoint(endp); - addProcessing(processing); - } - - /** - * Asynchronously process the given Processing using the processing - * chain of this service - * - * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full - * @throws IllegalStateException if this DocprocService is not accepting new incoming processings - */ - public void process(Processing processing) { - process(processing, null); - } - - /** - * Asynchronously process the given document put or document update using the processing - * chain of this service, and call the specified ProcessingEndpoint when done. - * - * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full - * @throws IllegalStateException if this DocprocService is not accepting new incoming processings - */ - public void process(DocumentOperation documentOperation, ProcessingEndpoint endp) { - addProcessing(new Processing(getName(), documentOperation, new CallStack(getCallStack()), endp)); - } - - /** - * Asynchronously process the given document operation using the processing - * chain of this service. - * - * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full - * @throws IllegalStateException if this DocprocService is not accepting new incoming processings - */ - public void process(DocumentOperation documentOperation) { - process(documentOperation, null); - } - - /** - * Asynchronously process the given document operations as one unit - * using the processing chain of this service, - * and call the specified ProcessingEndpoint when done. - * - * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full - * @throws IllegalStateException if this DocprocService is not accepting new incoming processings - */ - public void processDocumentOperations(List documentOperations, ProcessingEndpoint endp) { - addProcessing(Processing.createProcessingFromDocumentOperations(getName(), documentOperations, new CallStack(getCallStack()), endp)); - } - - /** - * Asynchronously process the given document operations as one unit - * using the processing chain of this service. - * - * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full - * @throws IllegalStateException if this DocprocService is not accepting new incoming processings - */ - public void processDocumentOperations(List documentOperations) { - processDocumentOperations(documentOperations, null); - } - - private void addProcessing(Processing processing) { - if ( ! isAcceptingNewProcessings()) - throw new IllegalStateException("Docproc service " + getName() + - " is not accepting new incoming processings. Cannot add " + processing + " "); - - if ( ! queue.offer(processing)) - throw new RejectedExecutionException("Docproc service " + getName() + " is busy, please try later"); - } - - /** - *

Do some work in this service. This will perform some processing and return - * in a "short" while, as long as individual processors also returns.

- * - *

This method is thread safe - multiple threads may call doWork at any time. - * Note that processors - * should be non-blocking, so multiple threads should be used primarily to - * utilize multiple processors.

- * - * @return true if some work was performed, false if no work could be performed - * at this time, because there are no outstanding processings, or because - * this is out of service. Note that since processings may arrive or be put - * back by another thread at any time, this return value does not mean - * that no work will be available if doWork as called again immediately. - */ - public boolean doWork() { - try { - return doWork(false); - } catch (InterruptedException e) { - //will never happen because we are not blocking - throw new RuntimeException(e); - } - } - - private boolean doWork(boolean blocking) throws InterruptedException { - Processing processing; - if (blocking) { - processing = queue.take(); - } else { - processing = queue.poll(); - } - - if (processing == null) { - //did no work, returning nothing to queue - return false; - } - if (!isInService()) { - //did no work, returning processing (because it's not empty) - queue.add(processing); //always successful, since queue is unbounded - return false; - } - - boolean remove = workOn(processing); //NOTE: Exceptions are handled in here, but not Errors - if (!remove) { - queue.add(processing); //always successful, since queue is unbounded - } - return true; - //NOTE: We *could* have called returnProcessing() in a finally block here, but we don't - //want that, since the only thing being thrown out here is Errors, and then the Processing - //can just disappear instead - } - - /** - *

Do some work in this service. This will perform some processing and return - * in a "short" while, as long as individual processors also returns. Note that - * if the internal queue is empty when this method is called, it will block until - * some work is submitted by a call to process() by another thread.

- * - *

This method is thread safe - multiple threads may call doWorkBlocking at any time. - * Note that processors - * should be non-blocking, so multiple threads should be used primarily to - * utilize multiple processors.

- * - * @return always true, since if the internal queue is empty when this method is - * called, it will block until some work is submitted by a call to - * process() by another thread. - * @throws InterruptedException if a call to this method is interrupted while waiting for data to become available - */ - public boolean doWorkBlocking() throws InterruptedException { - return doWork(true); - } - - /** - * Do some work on this processing. Must only be called from the worker thread. - * - * @return true if this processing should be removed, false if there is more work to do on it later - * @throws NoCallStackException if no CallStack has been set on this executor. - */ - boolean workOn(Processing processing) { - DocprocExecutor ex = getExecutor(); - if (ex == null) { - throw new NoCallStackException(); - } - - DocumentProcessor.Progress progress; - - try { - progress = ex.process(processing); - } catch (Exception e) { - processingFailed(processing, processing + " failed", e); - return true; - } - - if (DocumentProcessor.Progress.DONE.equals(progress)) { - //notify endpoint - ProcessingEndpoint recv = processing.getEndpoint(); - if (recv != null) { - recv.processingDone(processing); - } - return true; - } else if (DocumentProcessor.Progress.FAILED.equals(progress)) { - processingFailed(processing, processing + " failed at " + processing.callStack().getLastPopped(), null); - return true; - } else if (DocumentProcessor.Progress.PERMANENT_FAILURE.equals(progress)) { - processingFailed(processing, - processing + " failed PERMANENTLY at " + processing.callStack().getLastPopped() + ", disabling processing service.", null); - setInService(false); - return true; - } else { - //LATER: - return false; - } - } - - private void processingFailed(Processing processing, String errorMsg, Exception e) { - if (e != null) { - if (e instanceof HandledProcessingException) { - errorMsg += ". Error message: " + e.getMessage(); - log.log(Level.WARNING, errorMsg); - log.log(Level.FINE, "Chained exception:", e); - } else { - log.log(Level.WARNING, errorMsg, e); - } - } else { - log.log(Level.WARNING, errorMsg); - } - - //notify endpoint - ProcessingEndpoint recv = processing.getEndpoint(); - if (recv != null) { - recv.processingFailed(processing, e); - } - } - - private class NoCallStackException extends RuntimeException { - } - -} diff --git a/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java b/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java deleted file mode 100644 index 4530af29600..00000000000 --- a/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc; - -import com.yahoo.document.DocumentOperation; - -/** - * @author Einar M R Rosenvinge - * @deprecated Will be removed in Vespa 8. Only for internal use. - */ -@Deprecated(forRemoval = true, since = "7") -public interface DocumentOperationWrapper { - - DocumentOperation getWrappedDocumentOperation(); - -} diff --git a/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java index ae7c1358a4f..7b8f07373a5 100644 --- a/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java +++ b/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java @@ -3,6 +3,7 @@ package com.yahoo.docproc; import com.yahoo.collections.Pair; import com.yahoo.component.chain.ChainedComponent; +import com.yahoo.docproc.impl.DocprocService; import java.util.HashMap; import java.util.Map; @@ -42,7 +43,6 @@ import java.util.logging.Logger; */ public abstract class DocumentProcessor extends ChainedComponent { - @SuppressWarnings("removal") // TODO Vespa 8: remove static Logger log = Logger.getLogger(DocprocService.class.getName()); /** Schema map for doctype-fieldnames */ diff --git a/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java b/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java deleted file mode 100644 index 2872d53f558..00000000000 --- a/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc; - -/** - * Exception generated by known bad input in a docproc. Will cause only message to be logged, - * not stacktrace. - * - * @author Mathias Mølster Lidal - * @deprecated Will be removed in Vespa 8. Only for internal use. - */ -@Deprecated(forRemoval = true, since = "7") -public class HandledProcessingException extends RuntimeException { - - public HandledProcessingException(String message) { - super(message); - } - -} diff --git a/docproc/src/main/java/com/yahoo/docproc/Processing.java b/docproc/src/main/java/com/yahoo/docproc/Processing.java index 834d63c5a86..7579304c8cb 100644 --- a/docproc/src/main/java/com/yahoo/docproc/Processing.java +++ b/docproc/src/main/java/com/yahoo/docproc/Processing.java @@ -1,7 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; -import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.docproc.impl.ProcessingAccess; +import com.yahoo.docproc.impl.ProcessingEndpoint; import com.yahoo.document.DocumentOperation; import java.util.ArrayList; @@ -18,7 +19,7 @@ import java.util.Map; * * @author bratseth */ -public class Processing { +public class Processing extends ProcessingAccess { /** The name of the service which owns this processing. Null is the same as "default". */ private String service = null; @@ -40,13 +41,8 @@ public class Processing { private Map context = null; /** The endpoint of this processing. */ - @SuppressWarnings("removal") // TODO Vespa 8: remove private ProcessingEndpoint endpoint = null; - /** The registry of docproc services. */ - @SuppressWarnings("removal") // TODO Vespa 8: remove - private ComponentRegistry docprocServiceRegistry = null; - private boolean operationsGotten = false; /** @@ -79,8 +75,7 @@ public class Processing { * @param callStack the document processors to call in this processing. * @param endp the endpoint of this processing */ - @SuppressWarnings("removal") // TODO Vespa 8: remove - Processing(String service, DocumentOperation documentOperation, CallStack callStack, ProcessingEndpoint endp) { + private Processing(String service, DocumentOperation documentOperation, CallStack callStack, ProcessingEndpoint endp) { this.service = service; this.documentOperations = new ArrayList<>(1); documentOperations.add(documentOperation); @@ -102,9 +97,8 @@ public class Processing { this(service, documentOperation, callStack, null); } - // TODO Vespa 8: remove "removal" - @SuppressWarnings({"unused", "removal"}) - private Processing(String service, List documentOpsAndUpdates, CallStack callStack, ProcessingEndpoint endp, boolean unused) { + @SuppressWarnings({"unused"}) + protected Processing(String service, List documentOpsAndUpdates, CallStack callStack, ProcessingEndpoint endp, boolean unused) { this.service = service; this.documentOperations = new ArrayList<>(documentOpsAndUpdates.size()); documentOperations.addAll(documentOpsAndUpdates); @@ -112,11 +106,6 @@ public class Processing { this.endpoint = endp; } - @SuppressWarnings("removal") // TODO Vespa 8: remove - static Processing createProcessingFromDocumentOperations(String service, List documentOpsAndUpdates, CallStack callStack, ProcessingEndpoint endp) { - return new Processing(service, documentOpsAndUpdates, callStack, endp, false); - } - /** * Creates a Processing from a list of operations. * @@ -133,21 +122,6 @@ public class Processing { return new Processing(service, documentsAndUpdates, callStack, null, false); } - @Deprecated(forRemoval = true, since="7") - @SuppressWarnings("removal") // TODO Vespa 8: remove - public ComponentRegistry getDocprocServiceRegistry() { - return docprocServiceRegistry; - } - - /** - * @deprecated This method will be removed without replacement in Vespa 8. - */ - @Deprecated(forRemoval = true, since="7") - @SuppressWarnings("removal") // TODO Vespa 8: remove - public void setDocprocServiceRegistry(ComponentRegistry docprocServiceRegistry) { - this.docprocServiceRegistry = docprocServiceRegistry; - } - /** Returns the name of the service processing this. This will never return null */ public String getServiceName() { if (service == null) return "default"; @@ -159,23 +133,6 @@ public class Processing { this.service = service; } - /** - * Convenience method for looking up and returning the service processing this. This might return null - * if #getServiceName returns a name that is not registered in {@link com.yahoo.docproc.DocprocService}. - * - * @return the service processing this, or null if unknown. - * @deprecated Formerly used to retrieve the {@link com.yahoo.document.DocumentTypeManager}, - * which can now be directly injected via your component constructor. - */ - @Deprecated(forRemoval = true, since="7") - @SuppressWarnings("removal") // TODO Vespa 8: remove - public DocprocService getService() { - if (docprocServiceRegistry != null) { - return docprocServiceRegistry.getComponent(getServiceName()); - } - return null; - } - /** Returns a context variable, or null if it is not set */ public Object getVariable(String name) { if (context == null) return null; @@ -214,23 +171,13 @@ public class Processing { return context != null && context.containsKey(name); } - /** - * Returns the ProcessingEndpoint that is called when this Processing is complete, if any. - * - * @return the ProcessingEndpoint, or null - */ - @SuppressWarnings("removal") // TODO Vespa 8: remove - ProcessingEndpoint getEndpoint() { + @Override + protected ProcessingEndpoint getEndpoint() { return endpoint; } - /** - * Sets the ProcessingEndpoint to be called when this Processing is complete. - * - * @param endpoint the ProcessingEndpoint to use - */ - @SuppressWarnings("removal") // TODO Vespa 8: remove - void setEndpoint(ProcessingEndpoint endpoint) { + @Override + protected void setEndpoint(ProcessingEndpoint endpoint) { this.endpoint = endpoint; } @@ -256,17 +203,13 @@ public class Processing { return callStack; } - /** - * Package-private method to set the callstack of this processing. Only to be used - * by DocprocService.process(Processing). - * - * @param callStack the callstack to set - */ - void setCallStack(CallStack callStack) { + @Override + protected void setCallStack(CallStack callStack) { this.callStack = callStack; } - List getOnceOperationsToBeProcessed() { + @Override + protected List getOnceOperationsToBeProcessed() { if (operationsGotten) return Collections.emptyList(); diff --git a/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java b/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java deleted file mode 100644 index 00b0bc7c7c7..00000000000 --- a/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc; - -/** - * @author Einar M R Rosenvinge - * @deprecated Will be removed in Vespa 8. Only for internal use. - */ -@Deprecated(forRemoval = true, since = "7") -public interface ProcessingEndpoint { - - void processingDone(Processing processing); - - void processingFailed(Processing processing, Exception exception); - -} diff --git a/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java b/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java deleted file mode 100644 index a88158cc367..00000000000 --- a/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc; - -/** - * Exception to be thrown by a document processor on transient failures. - * Caller is welcome to try the call again later. - * - * @author Einar M R Rosenvinge - * @deprecated Will be removed in Vespa 8. Only for internal use. - */ -@Deprecated(forRemoval = true, since = "7") -public class TransientFailureException extends RuntimeException { - - public TransientFailureException(String s) { - super(s); - } - -} diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java b/docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java new file mode 100644 index 00000000000..cfd43099cc8 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java @@ -0,0 +1,193 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.impl; + +import com.yahoo.docproc.Call; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.json.JsonWriter; +import com.yahoo.jdisc.Metric; +import com.yahoo.text.Utf8; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static java.util.stream.Collectors.counting; +import static java.util.stream.Collectors.groupingBy; + +/** + * An executor executed incoming processings on its CallStack + * + * @author Einar M R Rosenvinge + */ +public class DocprocExecutor { + + private final static String METRIC_NAME_DOCUMENTS_PROCESSED = "documents_processed"; + + private static final Logger log = Logger.getLogger(DocprocExecutor.class.getName()); + + private final String name; + private final String docCounterName; + private final Metric metric; + private final Function contexts; + private final CallStack callStack; + + /** + * Creates a new named DocprocExecutor with the given CallStack. + * + * @param name the name of this executor + * @param callStack the chain of document processors this executor shall execute on processings + */ + public DocprocExecutor(String name, CallStack callStack) { + this.name = name; + String chainDimension = name != null ? name.replaceAll("[^\\p{Alnum}]", "_") : name; + docCounterName = "chain_" + chainDimension + "_documents"; + this.metric = callStack.getMetric(); + this.callStack = callStack; + this.callStack.setName(name); + this.contexts = cachedContexts(chainDimension); + } + + /** + * Creates a new named DocprocExecutor, with the same instance variables as the given executor, + * but a new call stack. + * + * @param oldExecutor the executor to inherit the instance variables from, sans call stack. + * @param callStack the call stack to use. + */ + public DocprocExecutor(DocprocExecutor oldExecutor, CallStack callStack) { + this.name = oldExecutor.name; + this.docCounterName = oldExecutor.docCounterName; + this.metric = oldExecutor.metric; + this.contexts = oldExecutor.contexts; + this.callStack = callStack; + } + + public CallStack getCallStack() { + return callStack; + } + + public String getName() { + return name; + } + + private void incrementNumDocsProcessed(Processing processing) { + List operations = ((ProcessingAccess)processing).getOnceOperationsToBeProcessed(); + if ( ! operations.isEmpty()) { + metric.add(docCounterName, operations.size(), null); + operations.stream() + .collect(groupingBy(operation -> operation.getId().getDocType(), counting())) + .forEach((type, count) -> metric.add(METRIC_NAME_DOCUMENTS_PROCESSED, count, contexts.apply(type))); + } + } + + /** + * Processes a given Processing through the CallStack of this executor. + * + * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null. + * @return a Progress; if this is LATER, the Processing is not done and must be reprocessed later. + * @throws RuntimeException if a document processor throws an exception during processing. + * @see com.yahoo.docproc.Processing + */ + public DocumentProcessor.Progress process(Processing processing) { + processing.setServiceName(getName()); + if (processing.callStack() == null) { + ((ProcessingAccess)processing).setCallStack(new CallStack(getCallStack())); + } + + DocumentProcessor.Progress progress = DocumentProcessor.Progress.DONE; + //metrics stuff: + //TODO: Note that this is *wrong* in case of Progress.LATER, documents are then counted several times until the Processing is DONE or FAILED. + incrementNumDocsProcessed(processing); + do { + Call call = processing.callStack().pop(); + if (call == null) { + // No more processors - done + return progress; + } + + //might throw exception, which is OK: + progress = call.call(processing); + + if (log.isLoggable(Level.FINEST)) { + logProgress(processing, progress, call); + } + + if (DocumentProcessor.Progress.LATER.equals(progress)) { + processing.callStack().addNext(call); + return progress; + } + } while (DocumentProcessor.Progress.DONE.equals(progress)); + return progress; + } + + private void logProgress(Processing processing, DocumentProcessor.Progress progress, Call call) { + StringBuilder message = new StringBuilder(); + boolean first = true; + message.append(call.getDocumentProcessorId()).append(" of class ") + .append(call.getDocumentProcessor().getClass().getSimpleName()).append(" returned ").append(progress) + .append(" for the documents: ["); + for (DocumentOperation op : processing.getDocumentOperations()) { + if (first) { + first = false; + } else { + message.append(", "); + } + if (op instanceof DocumentPut) { + message.append(Utf8.toString(JsonWriter.toByteArray(((DocumentPut) op).getDocument()))); + } else { + message.append(op.toString()); + } + } + message.append("]"); + log.log(Level.FINEST, message.toString()); + } + + /** + * Processes a given Processing through the CallStack of this executor. Note that if a DocumentProcessor + * returns a LaterProgress for this processing, it will be re-processed (after waiting the specified delay given + * by the LaterProgress), until done or failed. + * + * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null. + * @return a Progress; this is never a LaterProgress. + * @throws RuntimeException if a document processor throws an exception during processing, or this thread is interrupted while waiting. + * @see com.yahoo.docproc.Processing + * @see com.yahoo.docproc.DocumentProcessor.Progress + * @see com.yahoo.docproc.DocumentProcessor.LaterProgress + */ + public DocumentProcessor.Progress processUntilDone(Processing processing) { + DocumentProcessor.Progress progress; + while (true) { + progress = process(processing); + if (!(progress instanceof DocumentProcessor.LaterProgress)) { + break; + } + DocumentProcessor.LaterProgress later = (DocumentProcessor.LaterProgress) progress; + try { + Thread.sleep(later.getDelay()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return progress; + } + + private Function cachedContexts(String chainDimension) { + Map contextCache = new ConcurrentHashMap<>(); + return documentType -> contextCache.computeIfAbsent(documentType, type -> { + Map dimensions = new HashMap<>(2); + dimensions.put("chain", chainDimension); + dimensions.put("documenttype", type); + return metric.createContext(dimensions); + }); + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/DocprocService.java b/docproc/src/main/java/com/yahoo/docproc/impl/DocprocService.java new file mode 100644 index 00000000000..62a9a31f47a --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/DocprocService.java @@ -0,0 +1,404 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.impl; + +import com.yahoo.component.AbstractComponent; +import com.yahoo.component.ComponentId; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.docproc.proxy.SchemaMap; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentTypeManager; + +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + *

The document processing service. + * Use this to set up a document processing chain and to + * process documents using that chain. Note that there may + * be multiple named instances of this service in the same + * runtime. The default service is called "default" and is always present.

+ * + *

To create a server which receives documents from the network + * and processes them, have a look at com.yahoo.docproc.server.Server.

+ * + *

This class is thread safe.

+ * + * @author bratseth + */ +public class DocprocService extends AbstractComponent { + + private static final Logger log = Logger.getLogger(DocprocService.class.getName()); + private volatile DocprocExecutor executor; + + /** The processings currently in progress at this service */ + private final LinkedBlockingQueue queue; + private final ThreadPoolExecutor threadPool; + /** The current state of this service */ + private boolean inService = false; + /** The current state of this service */ + private boolean acceptingNewProcessings = true; + public static SchemaMap schemaMap = new SchemaMap(); + private DocumentTypeManager documentTypeManager = null; + + private DocprocService(ComponentId id, int numThreads) { + super(id); + queue = new LinkedBlockingQueue<>(); + threadPool = new ThreadPoolExecutor(numThreads, + numThreads, + 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new DaemonThreadFactory("docproc-" + id.stringValue() + "-")); + } + + public DocprocService(ComponentId id) { + this(id, Runtime.getRuntime().availableProcessors()); + } + + /** + * Creates a new docproc service, which is set to be in service. + * + * @param id the component id of the new service. + * @param stack the call stack to use. + * @param mgr the document type manager to use. + * @param numThreads to have in the thread pool + */ + public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr, int numThreads) { + this(id, numThreads); + setCallStack(stack); + setDocumentTypeManager(mgr); + setInService(true); + } + + /** + * Creates a service with a name with an unbounded input queue. If the given name is null or the empty string, + * it will become the name "default". + * Testing only + */ + public DocprocService(String name) { + this(new ComponentId(name, null), 1); + } + + @Override + public void deconstruct() { + threadPool.shutdown(); + } + + public DocumentTypeManager getDocumentTypeManager() { + return documentTypeManager; + } + + public void setDocumentTypeManager(DocumentTypeManager documentTypeManager) { + this.documentTypeManager = documentTypeManager; + } + + public int getQueueSize() { + return queue.size(); + } + + /** + * Returns the DocprocExecutor of this DocprocService. This can be used to + * synchronously process one Processing. + * + * @return the DocprocExecutor of this DocprocService, or null if a CallStack has not yet been set. + */ + public DocprocExecutor getExecutor() { + return executor; + } + + public ThreadPoolExecutor getThreadPoolExecutor() { + return threadPool; + } + + private void setExecutor(DocprocExecutor executor) { + this.executor = executor; + } + + /** + * Sets whether this should currently perform any processing. + * New processings will be accepted also when this is out of service, + * but no processing will happen when it is out of service. + */ + public void setInService(boolean inService) { + this.inService = inService; + } + + /** + * Returns true if this is currently processing incoming processings + * (in service), or false if they are just queued up (out of service). + * By default, this is out of service. + */ + public boolean isInService() { + return inService; + } + + /** + * Returns true if this service currently accepts new incoming processings via process(...). Default is true. + * + * @return true if accepting new incoming processings + */ + public boolean isAcceptingNewProcessings() { + return acceptingNewProcessings; + } + + /** + * Sets whether this service should accept new incoming processings via process(...). + * + * @param acceptingNewProcessings true if service should accept new incoming processings + */ + public void setAcceptingNewProcessings(boolean acceptingNewProcessings) { + this.acceptingNewProcessings = acceptingNewProcessings; + } + + public String getName() { + return getId().stringValue(); + } + + /** + * Returns the processing chain of this service. This stack can not be modified. + * To change the stack, set a new one. + */ + // TODO: Enforce unmodifiability + public CallStack getCallStack() { + DocprocExecutor ex = getExecutor(); + return (ex == null) ? null : ex.getCallStack(); + } + + /** + * Sets a new processing stack for this service. This will be the Prototype + * for the call stacks of individual processings in this service + */ + public void setCallStack(CallStack stack) { + DocprocExecutor ex = ((getExecutor() == null) ? new DocprocExecutor(getName(), stack) : new DocprocExecutor(getExecutor(), stack)); + setExecutor(ex); + } + + /** + * Asynchronously process the given Processing using the processing + * chain of this service, and call the specified ProcessingEndpoint when done. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void process(Processing processing, ProcessingEndpoint endp) { + processing.setServiceName(getName()); + ((ProcessingAccess)processing).setCallStack(new CallStack(getCallStack())); + ((ProcessingAccess)processing).setEndpoint(endp); + addProcessing(processing); + } + + /** + * Asynchronously process the given Processing using the processing + * chain of this service + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void process(Processing processing) { + process(processing, null); + } + + /** + * Asynchronously process the given document put or document update using the processing + * chain of this service, and call the specified ProcessingEndpoint when done. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void process(DocumentOperation documentOperation, ProcessingEndpoint endp) { + Processing processing = new Processing(getName(), documentOperation, new CallStack(getCallStack())); + ((ProcessingAccess)processing).setEndpoint(endp); + addProcessing(processing); + } + + /** + * Asynchronously process the given document operation using the processing + * chain of this service. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void process(DocumentOperation documentOperation) { + process(documentOperation, null); + } + + /** + * Asynchronously process the given document operations as one unit + * using the processing chain of this service, + * and call the specified ProcessingEndpoint when done. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void processDocumentOperations(List documentOperations, ProcessingEndpoint endp) { + Processing processing = Processing.createProcessingFromDocumentOperations(getName(), documentOperations, new CallStack(getCallStack())); + ((ProcessingAccess)processing).setEndpoint(endp); + addProcessing(processing); + + } + + /** + * Asynchronously process the given document operations as one unit + * using the processing chain of this service. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void processDocumentOperations(List documentOperations) { + processDocumentOperations(documentOperations, null); + } + + private void addProcessing(Processing processing) { + if ( ! isAcceptingNewProcessings()) + throw new IllegalStateException("Docproc service " + getName() + + " is not accepting new incoming processings. Cannot add " + processing + " "); + + if ( ! queue.offer(processing)) + throw new RejectedExecutionException("Docproc service " + getName() + " is busy, please try later"); + } + + /** + *

Do some work in this service. This will perform some processing and return + * in a "short" while, as long as individual processors also returns.

+ * + *

This method is thread safe - multiple threads may call doWork at any time. + * Note that processors + * should be non-blocking, so multiple threads should be used primarily to + * utilize multiple processors.

+ * + * @return true if some work was performed, false if no work could be performed + * at this time, because there are no outstanding processings, or because + * this is out of service. Note that since processings may arrive or be put + * back by another thread at any time, this return value does not mean + * that no work will be available if doWork as called again immediately. + */ + public boolean doWork() { + try { + return doWork(false); + } catch (InterruptedException e) { + //will never happen because we are not blocking + throw new RuntimeException(e); + } + } + + private boolean doWork(boolean blocking) throws InterruptedException { + Processing processing; + if (blocking) { + processing = queue.take(); + } else { + processing = queue.poll(); + } + + if (processing == null) { + //did no work, returning nothing to queue + return false; + } + if (!isInService()) { + //did no work, returning processing (because it's not empty) + queue.add(processing); //always successful, since queue is unbounded + return false; + } + + boolean remove = workOn(processing); //NOTE: Exceptions are handled in here, but not Errors + if (!remove) { + queue.add(processing); //always successful, since queue is unbounded + } + return true; + //NOTE: We *could* have called returnProcessing() in a finally block here, but we don't + //want that, since the only thing being thrown out here is Errors, and then the Processing + //can just disappear instead + } + + /** + *

Do some work in this service. This will perform some processing and return + * in a "short" while, as long as individual processors also returns. Note that + * if the internal queue is empty when this method is called, it will block until + * some work is submitted by a call to process() by another thread.

+ * + *

This method is thread safe - multiple threads may call doWorkBlocking at any time. + * Note that processors + * should be non-blocking, so multiple threads should be used primarily to + * utilize multiple processors.

+ * + * @return always true, since if the internal queue is empty when this method is + * called, it will block until some work is submitted by a call to + * process() by another thread. + * @throws InterruptedException if a call to this method is interrupted while waiting for data to become available + */ + public boolean doWorkBlocking() throws InterruptedException { + return doWork(true); + } + + /** + * Do some work on this processing. Must only be called from the worker thread. + * + * @return true if this processing should be removed, false if there is more work to do on it later + * @throws NoCallStackException if no CallStack has been set on this executor. + */ + boolean workOn(Processing processing) { + DocprocExecutor ex = getExecutor(); + if (ex == null) { + throw new NoCallStackException(); + } + + DocumentProcessor.Progress progress; + + try { + progress = ex.process(processing); + } catch (Exception e) { + processingFailed(processing, processing + " failed", e); + return true; + } + + if (DocumentProcessor.Progress.DONE.equals(progress)) { + //notify endpoint + ProcessingEndpoint recv = ((ProcessingAccess)processing).getEndpoint(); + if (recv != null) { + recv.processingDone(processing); + } + return true; + } else if (DocumentProcessor.Progress.FAILED.equals(progress)) { + processingFailed(processing, processing + " failed at " + processing.callStack().getLastPopped(), null); + return true; + } else if (DocumentProcessor.Progress.PERMANENT_FAILURE.equals(progress)) { + processingFailed(processing, + processing + " failed PERMANENTLY at " + processing.callStack().getLastPopped() + ", disabling processing service.", null); + setInService(false); + return true; + } else { + //LATER: + return false; + } + } + + private void processingFailed(Processing processing, String errorMsg, Exception e) { + if (e != null) { + if (e instanceof HandledProcessingException) { + errorMsg += ". Error message: " + e.getMessage(); + log.log(Level.WARNING, errorMsg); + log.log(Level.FINE, "Chained exception:", e); + } else { + log.log(Level.WARNING, errorMsg, e); + } + } else { + log.log(Level.WARNING, errorMsg); + } + + //notify endpoint + ProcessingEndpoint recv = ((ProcessingAccess)processing).getEndpoint(); + if (recv != null) { + recv.processingFailed(processing, e); + } + } + + private class NoCallStackException extends RuntimeException { + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/DocumentOperationWrapper.java b/docproc/src/main/java/com/yahoo/docproc/impl/DocumentOperationWrapper.java new file mode 100644 index 00000000000..32ccc11c1ab --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/DocumentOperationWrapper.java @@ -0,0 +1,13 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.impl; + +import com.yahoo.document.DocumentOperation; + +/** + * @author Einar M R Rosenvinge + */ +public interface DocumentOperationWrapper { + + DocumentOperation getWrappedDocumentOperation(); + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/HandledProcessingException.java b/docproc/src/main/java/com/yahoo/docproc/impl/HandledProcessingException.java new file mode 100644 index 00000000000..7bf706a0b56 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/HandledProcessingException.java @@ -0,0 +1,16 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.impl; + +/** + * Exception generated by known bad input in a docproc. Will cause only message to be logged, + * not stacktrace. + * + * @author Mathias Mølster Lidal + */ +public class HandledProcessingException extends RuntimeException { + + public HandledProcessingException(String message) { + super(message); + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/ProcessingAccess.java b/docproc/src/main/java/com/yahoo/docproc/impl/ProcessingAccess.java new file mode 100644 index 00000000000..a18257f61ec --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/ProcessingAccess.java @@ -0,0 +1,31 @@ +package com.yahoo.docproc.impl; + +import com.yahoo.docproc.CallStack; +import com.yahoo.document.DocumentOperation; + +import java.util.List; + +/** + * Bridge to access protected (originally package private) methods in {@link com.yahoo.docproc.Processing}. + * + * @author gjoranv + */ +public abstract class ProcessingAccess { + + protected ProcessingEndpoint getEndpoint() { + throw new UnsupportedOperationException("docproc.Processing must override this method!"); + } + + protected void setEndpoint(ProcessingEndpoint endpoint) { + throw new UnsupportedOperationException("docproc.Processing must override this method!"); + } + + protected void setCallStack(CallStack callStack) { + throw new UnsupportedOperationException("docproc.Processing must override this method!"); + } + + protected List getOnceOperationsToBeProcessed() { + throw new UnsupportedOperationException("docproc.Processing must override this method!"); + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/ProcessingEndpoint.java b/docproc/src/main/java/com/yahoo/docproc/impl/ProcessingEndpoint.java new file mode 100644 index 00000000000..e88c2ef444b --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/ProcessingEndpoint.java @@ -0,0 +1,15 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.impl; + +import com.yahoo.docproc.Processing; + +/** + * @author Einar M R Rosenvinge + */ +public interface ProcessingEndpoint { + + void processingDone(Processing processing); + + void processingFailed(Processing processing, Exception exception); + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/TransientFailureException.java b/docproc/src/main/java/com/yahoo/docproc/impl/TransientFailureException.java new file mode 100644 index 00000000000..b470ba4b806 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/TransientFailureException.java @@ -0,0 +1,16 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.impl; + +/** + * Exception to be thrown by a document processor on transient failures. + * Caller is welcome to try the call again later. + * + * @author Einar M R Rosenvinge + */ +public class TransientFailureException extends RuntimeException { + + public TransientFailureException(String s) { + super(s); + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/package-info.java b/docproc/src/main/java/com/yahoo/docproc/impl/package-info.java new file mode 100644 index 00000000000..f77ab5d9103 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/package-info.java @@ -0,0 +1,5 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.docproc.impl; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java index 3b31b0447a3..9eda0b12069 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java @@ -12,12 +12,11 @@ import com.yahoo.container.core.ChainsConfig; import com.yahoo.container.core.document.ContainerDocumentConfig; import com.yahoo.docproc.AbstractConcreteDocumentFactory; import com.yahoo.docproc.CallStack; -import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.docproc.DocumentProcessor; import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext; import com.yahoo.docproc.proxy.SchemaMap; import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.handler.AbstractRequestHandler; @@ -43,7 +42,6 @@ import static com.yahoo.component.chain.model.ChainsModelBuilder.buildFromConfig public class DocumentProcessingHandler extends AbstractRequestHandler { private static final Logger log = Logger.getLogger(DocumentProcessingHandler.class.getName()); - @SuppressWarnings("removal") // TODO Vespa 8: remove private final ComponentRegistry docprocServiceRegistry; private final ComponentRegistry docFactoryRegistry; private final ChainRegistry chainRegistry = new ChainRegistry<>(); @@ -52,7 +50,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { private final ContainerDocumentConfig containerDocConfig; private final DocumentTypeManager documentTypeManager; - @SuppressWarnings("removal") // TODO Vespa 8: remove private DocumentProcessingHandler(ComponentRegistry docprocServiceRegistry, ComponentRegistry documentProcessorComponentRegistry, ComponentRegistry docFactoryRegistry, @@ -85,7 +82,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { return (maxThreads > 0) ? maxThreads : Runtime.getRuntime().availableProcessors(); } - @SuppressWarnings("removal") // TODO Vespa 8: remove DocumentProcessingHandler(ComponentRegistry docprocServiceRegistry, ComponentRegistry documentProcessorComponentRegistry, ComponentRegistry docFactoryRegistry, @@ -118,13 +114,11 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { } @Override - @SuppressWarnings("removal") // TODO Vespa 8: remove protected void destroy() { laterExecutor.shutdown(); docprocServiceRegistry.allComponents().forEach(docprocService -> docprocService.deconstruct()); } - @SuppressWarnings("removal") // TODO Vespa 8: remove public ComponentRegistry getDocprocServiceRegistry() { return docprocServiceRegistry; } @@ -140,7 +134,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { } - @SuppressWarnings("removal") // TODO Vespa 8: remove private static CallStack convertToCallStack(Chain chain, Metric metric) { CallStack stack = new CallStack(chain.getId().stringValue(), metric); for (DocumentProcessor processor : chain.components()) { @@ -151,7 +144,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { } @Override - @SuppressWarnings("removal") // TODO Vespa 8: remove public ContentChannel handleRequest(Request request, ResponseHandler handler) { RequestContext requestContext; if (request instanceof MbusRequest) { diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java index e629953ccff..f7f8d0e6a10 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java @@ -4,10 +4,10 @@ package com.yahoo.docproc.jdisc; import com.yahoo.collections.Tuple2; import com.yahoo.docproc.Call; import com.yahoo.docproc.CallStack; -import com.yahoo.docproc.DocprocExecutor; -import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.impl.DocprocExecutor; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.docproc.DocumentProcessor; -import com.yahoo.docproc.HandledProcessingException; +import com.yahoo.docproc.impl.HandledProcessingException; import com.yahoo.docproc.Processing; import java.util.logging.Level; import com.yahoo.yolean.Exceptions; @@ -19,7 +19,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -34,11 +33,9 @@ public class DocumentProcessingTask implements Runnable { private final DocumentProcessingHandler docprocHandler; private final RequestContext requestContext; - @SuppressWarnings("removal") // TODO Vespa 8: remove private final DocprocService service; private final ThreadPoolExecutor executor; - @SuppressWarnings("removal") // TODO Vespa 8: remove public DocumentProcessingTask(RequestContext requestContext, DocumentProcessingHandler docprocHandler, DocprocService service, ThreadPoolExecutor executor) { this.requestContext = requestContext; @@ -67,7 +64,6 @@ public class DocumentProcessingTask implements Runnable { return; } - @SuppressWarnings("removal") // TODO Vespa 8: remove DocprocExecutor executor = service.getExecutor(); DocumentProcessor.Progress progress = process(executor); @@ -91,7 +87,6 @@ public class DocumentProcessingTask implements Runnable { * * @param executor the DocprocService to use for processing */ - @SuppressWarnings("removal") // TODO Vespa 8: remove private DocumentProcessor.Progress process(DocprocExecutor executor) { Iterator iterator = processings.iterator(); List> later = new ArrayList<>(); @@ -188,7 +183,6 @@ public class DocumentProcessingTask implements Runnable { '}'; } - @SuppressWarnings("removal") // TODO Vespa 8: remove private static void logProcessingFailure(Processing processing, Exception exception) { //LOGGING ONLY: String errorMsg = processing + " failed at " + processing.callStack().getLastPopped(); diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java index 50d7acb4064..caaff318cdd 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java @@ -5,10 +5,10 @@ import com.yahoo.component.provider.ComponentRegistry; import com.yahoo.concurrent.CopyOnWriteHashMap; import com.yahoo.container.core.document.ContainerDocumentConfig; import com.yahoo.docproc.AbstractConcreteDocumentFactory; -import com.yahoo.docproc.DocprocService; -import com.yahoo.docproc.HandledProcessingException; +import com.yahoo.docproc.impl.DocprocService; +import com.yahoo.docproc.impl.HandledProcessingException; import com.yahoo.docproc.Processing; -import com.yahoo.docproc.TransientFailureException; +import com.yahoo.docproc.impl.TransientFailureException; import com.yahoo.docproc.jdisc.RequestContext; import com.yahoo.document.DocumentOperation; import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; @@ -53,7 +53,6 @@ public class MbusRequestContext implements RequestContext, ResponseHandler { public final static String internalNoThrottledSource = "internalNoThrottledSource"; private final static String internalNoThrottledSourcePath = "/" + internalNoThrottledSource; - @SuppressWarnings("removal") // TODO Vespa 8: remove public MbusRequestContext(MbusRequest request, ResponseHandler responseHandler, ComponentRegistry docprocServiceComponentRegistry, ComponentRegistry docFactoryRegistry, @@ -61,7 +60,7 @@ public class MbusRequestContext implements RequestContext, ResponseHandler { this.request = request; this.requestMsg = (DocumentMessage)request.getMessage(); this.responseHandler = responseHandler; - this.processingFactory = new ProcessingFactory(docprocServiceComponentRegistry, docFactoryRegistry, + this.processingFactory = new ProcessingFactory(docFactoryRegistry, containerDocConfig, getServiceName()); this.messageFactory = newMessageFactory(requestMsg); } @@ -112,7 +111,6 @@ public class MbusRequestContext implements RequestContext, ResponseHandler { } @Override - @SuppressWarnings("removal") // TODO Vespa 8: remove public void processingFailed(Exception exception) { ErrorCode errorCode; if (exception instanceof TransientFailureException) { diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java index 33cd6647ede..52f112f9b43 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java @@ -8,7 +8,6 @@ import com.yahoo.component.ComponentId; import com.yahoo.component.provider.ComponentRegistry; import com.yahoo.container.core.document.ContainerDocumentConfig; import com.yahoo.docproc.AbstractConcreteDocumentFactory; -import com.yahoo.docproc.DocprocService; import com.yahoo.docproc.Processing; import com.yahoo.document.Document; import com.yahoo.document.DocumentOperation; @@ -27,18 +26,13 @@ import com.yahoo.messagebus.Message; class ProcessingFactory { private final static Logger log = Logger.getLogger(ProcessingFactory.class.getName()); - @SuppressWarnings("removal") // TODO Vespa 8: remove - private final ComponentRegistry docprocServiceComponentRegistry; private final ComponentRegistry docFactoryRegistry; private final ContainerDocumentConfig containerDocConfig; private final String serviceName; - @SuppressWarnings("removal") // TODO Vespa 8: remove - public ProcessingFactory(ComponentRegistry docprocServiceComponentRegistry, - ComponentRegistry docFactoryRegistry, + public ProcessingFactory(ComponentRegistry docFactoryRegistry, ContainerDocumentConfig containerDocConfig, String serviceName) { - this.docprocServiceComponentRegistry = docprocServiceComponentRegistry; this.docFactoryRegistry = docFactoryRegistry; this.containerDocConfig = containerDocConfig; this.serviceName = serviceName; @@ -100,15 +94,11 @@ class ProcessingFactory { return null; } - @SuppressWarnings("removal") // TODO Vespa 8: remove private Processing createProcessing(DocumentOperation documentOperation, Message message) { Processing processing = new Processing(); processing.addDocumentOperation(documentOperation); processing.setServiceName(serviceName); - // TODO Vespa 8: Remove statement (registry will be removed from Processing) - processing.setDocprocServiceRegistry(docprocServiceComponentRegistry); - processing.setVariable("route", message.getRoute()); processing.setVariable("timeout", message.getTimeRemaining()); return processing; diff --git a/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java index e2432dd71c9..975d396a5cb 100644 --- a/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java +++ b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java @@ -2,7 +2,7 @@ package com.yahoo.docproc.proxy; import com.yahoo.docproc.Accesses; -import com.yahoo.docproc.DocumentOperationWrapper; +import com.yahoo.docproc.impl.DocumentOperationWrapper; import com.yahoo.docproc.DocumentProcessor; import com.yahoo.document.DataType; import com.yahoo.document.Document; @@ -15,7 +15,6 @@ import com.yahoo.document.FieldPath; import com.yahoo.document.datatypes.FieldPathIteratorHandler; import com.yahoo.document.datatypes.FieldPathIteratorHandler.ModificationStatus; import com.yahoo.document.datatypes.FieldValue; -import com.yahoo.document.datatypes.Struct; import com.yahoo.document.serialization.DocumentReader; import com.yahoo.document.serialization.DocumentWriter; import com.yahoo.document.serialization.FieldReader; @@ -42,7 +41,6 @@ import java.util.Set; * * @author Vegard Havdal */ -@SuppressWarnings("removal") // TODO Vespa 8: remove public class ProxyDocument extends Document implements DocumentOperationWrapper { private final Map fieldMap; diff --git a/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java index 68af456829d..642ae216687 100644 --- a/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java +++ b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java @@ -1,6 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc.proxy; -import com.yahoo.docproc.DocumentOperationWrapper; +import com.yahoo.docproc.impl.DocumentOperationWrapper; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; @@ -18,7 +18,6 @@ import java.util.Map; * * @author vegardh */ -@SuppressWarnings("removal") // TODO Vespa 8: remove public class ProxyDocumentUpdate extends DocumentUpdate implements DocumentOperationWrapper { private DocumentUpdate docU; diff --git a/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java b/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java index 93ba8089395..c5ff24741a9 100644 --- a/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; +import com.yahoo.docproc.impl.ProcessingEndpoint; import com.yahoo.document.DataType; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; diff --git a/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java b/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java index 6a0d7727393..bce32936aab 100644 --- a/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.document.DataType; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; diff --git a/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java index 14a6dc3c2d5..fd312acd3ff 100644 --- a/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java @@ -1,12 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; import org.junit.Test; /** * @author Einar M R Rosenvinge */ -@SuppressWarnings("removal") // TODO Vespa 8: remove public class EmptyProcessingTestCase { @Test diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java index 669621bb926..7c057b43030 100644 --- a/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; +import com.yahoo.docproc.impl.HandledProcessingException; import com.yahoo.document.DataType; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentPut; diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java index 6293ac8e9f4..b3c631bdf46 100644 --- a/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.document.DataType; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java index 4da76484c1f..b19187e37ef 100644 --- a/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java @@ -1,8 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.document.DataType; -import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentPut; diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java index c4442fadad3..59ba6c43e93 100644 --- a/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java @@ -1,10 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.document.DataType; -import com.yahoo.document.Document; import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentType; import org.junit.Test; diff --git a/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java b/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java index 7b1cee20d17..2f5b859d699 100644 --- a/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; import org.junit.Test; import static org.junit.Assert.assertEquals; diff --git a/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java b/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java index 9afce98dda1..5ee593cfbf7 100644 --- a/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.document.DataType; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; diff --git a/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java index 84d0cad7a4e..f1e0162080f 100644 --- a/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java @@ -3,6 +3,7 @@ package com.yahoo.docproc; import com.yahoo.component.chain.dependencies.After; import com.yahoo.docproc.Accesses.Field.Tree; +import com.yahoo.docproc.impl.DocprocService; import org.junit.Test; import static org.junit.Assert.assertEquals; diff --git a/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java index e23c1d2e7fd..1b591f89942 100644 --- a/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.jdisc.test.MockMetric; import com.yahoo.document.DataType; import com.yahoo.document.DocumentId; diff --git a/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java b/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java index 44c0347aa1f..2dd2bfb9cfc 100644 --- a/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java @@ -1,6 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc; +import com.yahoo.docproc.impl.DocprocService; +import com.yahoo.docproc.impl.ProcessingEndpoint; +import com.yahoo.docproc.impl.TransientFailureException; import com.yahoo.document.DataType; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java index 5e07b26accd..499eb93fe80 100644 --- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc.jdisc; -import com.yahoo.cloud.config.SlobroksConfig; import com.yahoo.collections.Pair; import com.yahoo.component.ComponentId; import com.yahoo.component.provider.ComponentRegistry; @@ -11,15 +10,13 @@ import com.yahoo.container.jdisc.messagebus.MbusServerProvider; import com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider; import com.yahoo.container.jdisc.messagebus.SessionCache; import com.yahoo.docproc.CallStack; -import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.impl.DocprocService; import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext; import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig; import com.yahoo.jdisc.AbstractResource; import com.yahoo.jdisc.ReferencedResource; import com.yahoo.jdisc.application.ContainerBuilder; -- cgit v1.2.3