diff options
37 files changed, 4 insertions, 2441 deletions
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml index 0c724c95839..1dde2d58610 100644 --- a/vespa-hadoop/pom.xml +++ b/vespa-hadoop/pom.xml @@ -136,11 +136,6 @@ <!-- Vespa feeding dependencies --> <dependency> <groupId>com.yahoo.vespa</groupId> - <artifactId>vespa-http-client</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>com.yahoo.vespa</groupId> <artifactId>vespa-feed-client</artifactId> <version>${project.version}</version> </dependency> diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java deleted file mode 100644 index 6900c7dc82f..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; -import com.yahoo.vespa.http.client.config.SessionParams; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import javax.xml.namespace.QName; -import javax.xml.stream.FactoryConfigurationError; -import javax.xml.stream.XMLEventReader; -import javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.events.StartElement; -import javax.xml.stream.events.XMLEvent; -import java.io.IOException; -import java.io.StringReader; -import java.time.Duration; -import java.util.List; -import java.util.StringTokenizer; -import java.util.concurrent.ThreadLocalRandom; -import java.util.logging.Logger; - -/** - * {@link LegacyVespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-http-client. - * - * @author lesters - * @deprecated Replaced by {@link VespaRecordWriter} - */ -@Deprecated -public class LegacyVespaRecordWriter extends RecordWriter<Object, Object> { - - private final Logger log = Logger.getLogger(getClass().getCanonicalName()); - - private boolean initialized = false; - private com.yahoo.vespa.http.client.FeedClient feedClient; - private final VespaCounters counters; - private final int progressInterval; - - final VespaConfiguration configuration; - - LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { - this.counters = counters; - this.configuration = configuration; - this.progressInterval = configuration.progressInterval(); - } - - - @Override - public void write(Object key, Object data) throws IOException, InterruptedException { - if (!initialized) { - initialize(); - } - - String doc = data.toString().trim(); - - // Parse data to find document id - if none found, skip this write - String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) - : findDocIdFromXml(doc); - if (docId != null && docId.length() >= 0) { - feedClient.stream(docId, doc); - counters.incrementDocumentsSent(1); - } else { - counters.incrementDocumentsSkipped(1); - } - - if (counters.getDocumentsSent() % progressInterval == 0) { - String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", - counters.getDocumentsSent(), - counters.getDocumentsOk(), - counters.getDocumentsFailed(), - counters.getDocumentsSkipped()); - log.info(progress); - } - - } - - - @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - if (feedClient != null) { - feedClient.close(); - } - } - - protected ConnectionParams.Builder configureConnectionParams() { - ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); - connParamsBuilder.setDryRun(configuration.dryrun()); - connParamsBuilder.setUseCompression(configuration.useCompression()); - connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); - connParamsBuilder.setMaxRetries(configuration.numRetries()); - if (configuration.proxyHost() != null) { - connParamsBuilder.setProxyHost(configuration.proxyHost()); - } - if (configuration.proxyPort() >= 0) { - connParamsBuilder.setProxyPort(configuration.proxyPort()); - } - return connParamsBuilder; - } - - protected FeedParams.Builder configureFeedParams() { - FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); - feedParamsBuilder.setDataFormat(configuration.dataFormat()); - feedParamsBuilder.setRoute(configuration.route()); - feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); - feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); - feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis()); - return feedParamsBuilder; - } - - protected SessionParams.Builder configureSessionParams() { - SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder(); - sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize()); - sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2); - return sessionParamsBuilder; - } - - private void initialize() { - if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) { - int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs()); - log.info("VespaStorage: Delaying startup by " + delay + " ms"); - try { - Thread.sleep(delay); - } catch (Exception e) {} - } - - ConnectionParams.Builder connParamsBuilder = configureConnectionParams(); - FeedParams.Builder feedParamsBuilder = configureFeedParams(); - SessionParams.Builder sessionParams = configureSessionParams(); - - sessionParams.setConnectionParams(connParamsBuilder.build()); - sessionParams.setFeedParams(feedParamsBuilder.build()); - - String endpoints = configuration.endpoint(); - StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); - while (tokenizer.hasMoreTokens()) { - String endpoint = tokenizer.nextToken().trim(); - sessionParams.addCluster(new Cluster.Builder().addEndpoint( - Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL().orElse(false)) - ).build()); - } - - ResultCallback resultCallback = new ResultCallback(counters); - feedClient = com.yahoo.vespa.http.client.FeedClientFactory.create(sessionParams.build(), resultCallback); - - initialized = true; - log.info("VespaStorage configuration:\n" + configuration.toString()); - log.info(feedClient.getStatsAsJson()); - } - - private String findDocIdFromXml(String xml) { - try { - XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); - while (eventReader.hasNext()) { - XMLEvent event = eventReader.nextEvent(); - if (event.getEventType() == XMLEvent.START_ELEMENT) { - StartElement element = event.asStartElement(); - String elementName = element.getName().getLocalPart(); - if (VespaDocumentOperation.Operation.valid(elementName)) { - return element.getAttributeByName(QName.valueOf("documentid")).getValue(); - } - } - } - } catch (XMLStreamException | FactoryConfigurationError e) { - // as json dude does - return null; - } - return null; - } - - private String findDocId(String json) throws IOException { - JsonFactory factory = new JsonFactory(); - try(JsonParser parser = factory.createParser(json)) { - if (parser.nextToken() != JsonToken.START_OBJECT) { - return null; - } - while (parser.nextToken() != JsonToken.END_OBJECT) { - String fieldName = parser.getCurrentName(); - parser.nextToken(); - if (VespaDocumentOperation.Operation.valid(fieldName)) { - String docId = parser.getText(); - return docId; - } else { - parser.skipChildren(); - } - } - } catch (JsonParseException ex) { - return null; - } - return null; - } - - - class ResultCallback implements com.yahoo.vespa.http.client.FeedClient.ResultCallback { - final VespaCounters counters; - - public ResultCallback(VespaCounters counters) { - this.counters = counters; - } - - @Override - public void onCompletion(String docId, com.yahoo.vespa.http.client.Result documentResult) { - if (!documentResult.isSuccess()) { - counters.incrementDocumentsFailed(1); - StringBuilder sb = new StringBuilder(); - sb.append("Problems with docid "); - sb.append(docId); - sb.append(": "); - List<com.yahoo.vespa.http.client.Result.Detail> details = documentResult.getDetails(); - for (com.yahoo.vespa.http.client.Result.Detail detail : details) { - sb.append(detail.toString()); - sb.append(" "); - } - log.warning(sb.toString()); - return; - } - counters.incrementDocumentsOk(1); - } - - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java index 66ab94574d9..42eb6293eee 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java @@ -10,24 +10,16 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; -import java.util.Objects; import java.util.Properties; -import java.util.logging.Logger; - -import static com.yahoo.vespa.http.client.config.FeedParams.DataFormat.XML_UTF8; /** * An output specification for writing to Vespa instances in a Map-Reduce job. - * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the - * actual feeding to Vespa. * * @author lesters */ @SuppressWarnings("rawtypes") public class VespaOutputFormat extends OutputFormat { - private static final Logger log = Logger.getLogger(VespaOutputFormat.class.getName()); - final Properties configOverride; public VespaOutputFormat() { @@ -42,18 +34,10 @@ public class VespaOutputFormat extends OutputFormat { @Override - @SuppressWarnings("deprecation") public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { VespaCounters counters = VespaCounters.get(context); VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride); - Boolean useLegacyClient = configuration.useLegacyClient().orElse(null); - if (Objects.equals(useLegacyClient, Boolean.TRUE) || configuration.dataFormat() == XML_UTF8) { - log.warning("Feeding with legacy client or XML will no longer be supported on Vespa 8. " + - "See https://docs.vespa.ai/en/vespa8-release-notes.html"); - return new LegacyVespaRecordWriter(configuration, counters); - } else { - return new VespaRecordWriter(configuration, counters); - } + return new VespaRecordWriter(configuration, counters); } diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 6d6c3789835..c450d7cdeef 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -7,7 +7,6 @@ import ai.vespa.feed.client.JsonFeeder; import ai.vespa.feed.client.OperationParseException; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.http.client.config.FeedParams; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -85,18 +84,11 @@ public class VespaRecordWriter extends RecordWriter<Object, Object> { private void initializeOnFirstWrite() { if (initialized) return; - validateConfig(); useRandomizedStartupDelayIfEnabled(); feeder = createJsonStreamFeeder(); initialized = true; } - private void validateConfig() { - if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) { - throw new IllegalArgumentException("Only JSON is support by this feed client implementation"); - } - } - private void useRandomizedStartupDelayIfEnabled() { if (!config.dryrun() && config.randomStartupSleepMs() > 0) { int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java index 715546fe6fe..54be261fbe7 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce.util; -import com.yahoo.vespa.http.client.config.FeedParams; import org.apache.hadoop.conf.Configuration; import java.io.IOException; @@ -19,7 +18,6 @@ public class VespaConfiguration { public static final String PROXY_SCHEME = "vespa.feed.proxy.scheme"; public static final String DRYRUN = "vespa.feed.dryrun"; public static final String USE_COMPRESSION = "vespa.feed.usecompression"; - public static final String DATA_FORMAT = "vespa.feed.data.format"; public static final String PROGRESS_REPORT = "vespa.feed.progress.interval"; public static final String CONNECTIONS = "vespa.feed.connections"; public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size"; @@ -29,7 +27,6 @@ public class VespaConfiguration { public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; public static final String NUM_RETRIES = "vespa.feed.num.retries"; - public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient"; private final Configuration conf; private final Properties override; @@ -39,115 +36,82 @@ public class VespaConfiguration { this.override = override; } - public static VespaConfiguration get(Configuration conf, Properties override) { return new VespaConfiguration(conf, override); } - public String endpoint() { return getString(ENDPOINT); } - public int defaultPort() { return getInt(DEFAULT_PORT, 4080); } - public Optional<Boolean> useSSL() { String raw = getString(USE_SSL); if (raw == null || raw.trim().isEmpty()) return Optional.empty(); return Optional.of(Boolean.parseBoolean(raw)); } - public String proxyHost() { return getString(PROXY_HOST); } - public int proxyPort() { return getInt(PROXY_PORT, 4080); } - public String proxyScheme() { String raw = getString(PROXY_SCHEME); if (raw == null) return "http"; return raw; } - public boolean dryrun() { return getBoolean(DRYRUN, false); } - public boolean useCompression() { return getBoolean(USE_COMPRESSION, true); } - public int numConnections() { return getInt(CONNECTIONS, 1); } - public int throttlerMinSize() { return getInt(THROTTLER_MIN_SIZE, 0); } - public int queryConnectionTimeout() { return getInt(QUERY_CONNECTION_TIMEOUT, 10000); } - public String route() { return getString(ROUTE); } - public int maxSleepTimeMs() { return getInt(MAX_SLEEP_TIME_MS, 10000); } - public int maxInFlightRequests() { return getInt(MAX_IN_FLIGHT_REQUESTS, 500); } - public int randomStartupSleepMs() { return getInt(RANDOM_STARTUP_SLEEP, 30000); } - public int numRetries() { return getInt(NUM_RETRIES, 100); } - - public FeedParams.DataFormat dataFormat() { - String format = getString(DATA_FORMAT); - if ("xml".equalsIgnoreCase(format)) { - return FeedParams.DataFormat.XML_UTF8; - } - return FeedParams.DataFormat.JSON_UTF8; - } - - public int progressInterval() { return getInt(PROGRESS_REPORT, 1000); } - public Optional<Boolean> useLegacyClient() { - String raw = getString(USE_LEGACY_CLIENT); - if (raw == null || raw.trim().isEmpty()) return Optional.empty(); - return Optional.of(Boolean.parseBoolean(raw)); - } - public String getString(String name) { if (override != null && override.containsKey(name)) { return override.getProperty(name); @@ -197,7 +161,6 @@ public class VespaConfiguration { sb.append(PROXY_SCHEME + ": " + proxyScheme() + "\n"); sb.append(DRYRUN + ": " + dryrun() +"\n"); sb.append(USE_COMPRESSION + ": " + useCompression() +"\n"); - sb.append(DATA_FORMAT + ": " + dataFormat() +"\n"); sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n"); sb.append(CONNECTIONS + ": " + numConnections() +"\n"); sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n"); @@ -207,7 +170,6 @@ public class VespaConfiguration { sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n"); sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n"); sb.append(NUM_RETRIES + ": " + numRetries() +"\n"); - sb.append(USE_LEGACY_CLIENT + ": " + useLegacyClient().map(Object::toString).orElse("<empty>") +"\n"); return sb.toString(); } diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java index f690e767194..39b24799002 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -24,35 +24,24 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; public class VespaStorageTest { @Test - public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set(VespaConfiguration.DATA_FORMAT, "xml"); - assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf); - } - - - @Test public void requireThatPremadeOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_operations.pig"); } - @Test public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig"); } - @Test public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig"); } @Test - public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception { + public void requireThatPremadeOperationsWithJsonLoaderFeedWithSllSucceeds() throws Exception { Configuration conf = new HdfsConfiguration(); conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString()); - conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString()); assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf); } @@ -61,19 +50,16 @@ public class VespaStorageTest { assertAllDocumentsOk("src/test/pig/feed_create_operations.pig"); } - @Test public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig"); } - @Test public void requireThatFeedVisitDataSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_visit_data.pig"); } - private PigServer setup(String script, Configuration conf) throws Exception { if (conf == null) { conf = new HdfsConfiguration(); @@ -92,12 +78,10 @@ public class VespaStorageTest { return ps; } - private void assertAllDocumentsOk(String script) throws Exception { assertAllDocumentsOk(script, null); } - private void assertAllDocumentsOk(String script, Configuration conf) throws Exception { PigServer ps = setup(script, conf); List<ExecJob> jobs = ps.executeBatch(); diff --git a/vespaclient-container-plugin/pom.xml b/vespaclient-container-plugin/pom.xml index c960c2cca44..d7b36e39c94 100644 --- a/vespaclient-container-plugin/pom.xml +++ b/vespaclient-container-plugin/pom.xml @@ -38,42 +38,6 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>com.yahoo.vespa</groupId> - <artifactId>vespa-http-client</artifactId> - <version>${project.version}</version> - <exclusions> - <!-- Exclude artifacts that are provided by Jdisc container --> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.datatype</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.module</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - <exclusion> - <groupId>com.yahoo.vespa</groupId> - <artifactId>security-utils</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <scope>test</scope> diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 8c2e39d595e..ed068c77e11 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -20,7 +20,6 @@ import com.yahoo.document.DocumentUpdate; import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.document.fieldset.AllFields; import com.yahoo.document.fieldset.DocIdOnly; import com.yahoo.document.fieldset.DocumentOnly; import com.yahoo.document.idstring.IdIdString; @@ -46,6 +45,7 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; +import com.yahoo.documentapi.metrics.MetricNames; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.Response; @@ -68,7 +68,6 @@ import com.yahoo.restapi.Path; import com.yahoo.search.query.ParameterParser; import com.yahoo.text.Text; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; -import com.yahoo.vespa.http.server.MetricNames; import com.yahoo.yolean.Exceptions; import com.yahoo.yolean.Exceptions.RunnableThrowingIOException; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java b/vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java index a5987f2398e..c2d9f0b292e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; +package com.yahoo.documentapi.metrics; /** * Place to store the metric names so where the metrics are logged can be found diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java deleted file mode 100644 index 875ff3e5bf0..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java +++ /dev/null @@ -1,287 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.jdisc.Metric; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.jdisc.ResourceReference; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.Result; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.net.HostName; -import com.yahoo.vespa.http.client.core.ErrorCode; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.OperationStatus; -import com.yahoo.vespaxmlparser.FeedOperation; -import com.yahoo.yolean.Exceptions; - -import java.io.IOException; -import java.io.InputStream; -import java.time.Duration; -import java.time.Instant; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * An instance of this class handles all requests from one client using VespaHttpClient. - * - * The implementation is based on the code from V2, but the object model is rewritten to simplify the logic and - * avoid using a threadpool that has no effect with all the extra that comes with it. V2 has one instance per thread - * on the client, while this is one instance for all threads. - * - * @author dybis - */ -class ClientFeederV3 { - - protected static final Logger log = Logger.getLogger(ClientFeederV3.class.getName()); - // This is for all clients on this gateway, for load balancing from client. - private final static AtomicInteger outstandingOperations = new AtomicInteger(0); - private final BlockingQueue<OperationStatus> feedReplies = new LinkedBlockingQueue<>(); - private final ReferencedResource<SharedSourceSession> sourceSession; - private final String clientId; - private final ReplyHandler feedReplyHandler; - private final Metric metric; - private Instant prevOpsPerSecTime = Instant.now(); - private double operationsForOpsPerSec = 0d; - private final Object monitor = new Object(); - private final StreamReaderV3 streamReaderV3; - private final AtomicInteger ongoingRequests = new AtomicInteger(0); - private final String hostName; - - ClientFeederV3( - ReferencedResource<SharedSourceSession> sourceSession, - FeedReaderFactory feedReaderFactory, - DocumentTypeManager docTypeManager, - String clientId, - Metric metric, - ReplyHandler feedReplyHandler) { - this.sourceSession = sourceSession; - this.clientId = clientId; - this.feedReplyHandler = feedReplyHandler; - this.metric = metric; - this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, docTypeManager); - this.hostName = HostName.getLocalhost(); - } - - boolean timedOut() { - synchronized (monitor) { - return Instant.now().isAfter(prevOpsPerSecTime.plusSeconds(6000)) && ongoingRequests.get() == 0; - } - } - - void kill() { - try (ResourceReference ignored = sourceSession.getReference()) { - // No new requests should be sent to this object, but there can be old one, even though this is very unlikely. - while (ongoingRequests.get() > 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - return; - } - } - } catch (Exception e) { - log.log(Level.WARNING, "Failed to close reference to source session", e); - } - } - - private void transferPreviousRepliesToResponse(BlockingQueue<OperationStatus> operations) throws InterruptedException { - OperationStatus status = feedReplies.poll(); - while (status != null) { - outstandingOperations.decrementAndGet(); - operations.put(status); - status = feedReplies.poll(); - } - } - - HttpResponse handleRequest(HttpRequest request) throws IOException { - ongoingRequests.incrementAndGet(); - try { - FeederSettings feederSettings = new FeederSettings(request); - InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(request); - BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>(); - try { - feed(feederSettings, inputStream, replies); - synchronized (monitor) { - // Handshake requests do not have DATA_FORMAT, we do not want to give responses to - // handshakes as it won't be processed by the client. - if (request.getJDiscRequest().headers().get(Headers.DATA_FORMAT) != null) { - transferPreviousRepliesToResponse(replies); - } - } - } catch (InterruptedException e) { - log.log(Level.FINE, e, () -> "Feed handler was interrupted: " + e.getMessage()); - // NOP, just terminate - } catch (Throwable e) { - log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e); - } finally { - replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, null)); - } - return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName); - } finally { - ongoingRequests.decrementAndGet(); - } - } - - private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings settings, - InputStream requestInputStream, - BlockingQueue<OperationStatus> repliesFromOldMessages) { - while (true) { - Optional<String> operationId; - try { - operationId = streamReaderV3.getNextOperationId(requestInputStream); - if (operationId.isEmpty()) return Optional.empty(); - } catch (IOException ioe) { - log.log(Level.FINE, () -> Exceptions.toMessageString(ioe)); - return Optional.empty(); - } - - try { - DocumentOperationMessageV3 message = getNextMessage(operationId.get(), requestInputStream, settings); - if (message != null) - setRoute(message, settings); - return Optional.ofNullable(message); - } catch (Exception e) { - log.log(Level.WARNING, () -> Exceptions.toMessageString(e)); - metric.add(MetricNames.PARSE_ERROR, 1, null); - - repliesFromOldMessages.add(new OperationStatus(Exceptions.toMessageString(e), - operationId.get(), - ErrorCode.ERROR, - false, - "")); - } - } - } - - private Result sendMessage(DocumentOperationMessageV3 msg) throws InterruptedException { - msg.getMessage().pushHandler(feedReplyHandler); - return sourceSession.getResource().sendMessageBlocking(msg.getMessage()); - } - - private void feed(FeederSettings settings, - InputStream requestInputStream, - BlockingQueue<OperationStatus> repliesFromOldMessages) throws InterruptedException { - while (true) { - Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings, - requestInputStream, - repliesFromOldMessages); - - if (message.isEmpty()) break; - setMessageParameters(message.get(), settings); - - Result result; - try { - result = sendMessage(message.get()); - - } catch (RuntimeException e) { - repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), - Exceptions.toMessageString(e), - ErrorCode.ERROR, - message.get().getMessage())); - continue; - } - - if (result.isAccepted()) { - outstandingOperations.incrementAndGet(); - updateOpsPerSec(); - log(Level.FINE, "Sent message successfully, document id: ", message.get().getOperationId()); - } else if (!result.getError().isFatal()) { - repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), - result.getError().getMessage(), - ErrorCode.TRANSIENT_ERROR, - message.get().getMessage())); - } else { - repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), - result.getError().getMessage(), - ErrorCode.ERROR, - message.get().getMessage())); - } - } - } - - private OperationStatus createOperationStatus(String id, String message, ErrorCode code, Message msg) { - String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0 - ? msg.getTrace().toString() - : ""; - return new OperationStatus(message, id, code, false, traceMessage); - } - - // protected for mocking - /** Returns the next message in the stream, or null if none */ - protected DocumentOperationMessageV3 getNextMessage(String operationId, - InputStream requestInputStream, - FeederSettings settings) throws Exception { - FeedOperation operation = streamReaderV3.getNextOperation(requestInputStream, settings); - - // This is a bit hard to set up while testing, so we accept that things are not perfect. - if (sourceSession.getResource().session() != null) { - metric.set( - MetricNames.PENDING, - Double.valueOf(sourceSession.getResource().session().getPendingCount()), - null); - } - - DocumentOperationMessageV3 message = DocumentOperationMessageV3.create(operation, operationId, metric); - if (message == null) { - // typical end of feed - return null; - } - metric.add(MetricNames.NUM_OPERATIONS, 1, null /*metricContext*/); - log(Level.FINE, "Successfully deserialized document id: ", message.getOperationId()); - return message; - } - - private void setMessageParameters(DocumentOperationMessageV3 msg, FeederSettings settings) { - msg.getMessage().setContext(new ReplyContext(msg.getOperationId(), feedReplies)); - if (settings.traceLevel != null) { - msg.getMessage().getTrace().setLevel(settings.traceLevel); - } - if (settings.priority != null) { - try { - DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf(settings.priority); - } - catch (IllegalArgumentException i) { - log.severe(i.getMessage()); - } - } - } - - private void setRoute(DocumentOperationMessageV3 msg, FeederSettings settings) { - if (settings.route != null) { - msg.getMessage().setRoute(settings.route); - } - } - - protected final void log(Level level, Object... msgParts) { - if (!log.isLoggable(level)) return; - - StringBuilder s = new StringBuilder(); - for (Object part : msgParts) - s.append(part.toString()); - log.log(level, s.toString()); - } - - private void updateOpsPerSec() { - Instant now = Instant.now(); - synchronized (monitor) { - if (now.plusSeconds(1).isAfter(prevOpsPerSecTime)) { - Duration duration = Duration.between(now, prevOpsPerSecTime); - double opsPerSec = operationsForOpsPerSec / (duration.toMillis() / 1000.); - metric.set(MetricNames.OPERATIONS_PER_SEC, opsPerSec, null /*metricContext*/); - operationsForOpsPerSec = 1.0d; - prevOpsPerSecTime = now; - } else { - operationsForOpsPerSec += 1.0d; - } - } - } -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java deleted file mode 100644 index 13a12f707d9..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.jdisc.Metric; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.vespa.http.client.core.OperationStatus; - -import java.util.concurrent.BlockingQueue; - -/** - * The state of a client session, used to save replies when client disconnects. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -public class ClientState { - - public final int pending; - public final long creationTime; - public final BlockingQueue<OperationStatus> feedReplies; - public final ReferencedResource<SharedSourceSession> sourceSession; - public final Metric.Context metricContext; - - public final long prevOpsPerSecTime; // previous measurement time of OPS - public final double operationsForOpsPerSec; - - public ClientState(int pending, BlockingQueue<OperationStatus> feedReplies, - ReferencedResource<SharedSourceSession> sourceSession, Metric.Context metricContext, - long prevOpsPerSecTime, double operationsForOpsPerSec) { - super(); - this.pending = pending; - this.feedReplies = feedReplies; - this.sourceSession = sourceSession; - this.metricContext = metricContext; - creationTime = System.currentTimeMillis(); - this.prevOpsPerSecTime = prevOpsPerSecTime; - this.operationsForOpsPerSec = operationsForOpsPerSec; - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java deleted file mode 100644 index 25bf5815907..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.document.DocumentPut; -import com.yahoo.document.DocumentRemove; -import com.yahoo.document.DocumentUpdate; -import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; -import com.yahoo.jdisc.Metric; -import com.yahoo.messagebus.Message; -import com.yahoo.vespaxmlparser.FeedOperation; - -/** - * Keeps an operation with its message. - * - * This implementation is based on V2, but the code is restructured. - * - * @author dybis - */ -class DocumentOperationMessageV3 { - - private final String operationId; - private final Message message; - - private DocumentOperationMessageV3(String operationId, Message message) { - this.operationId = operationId; - this.message = message; - } - - Message getMessage() { - return message; - } - - String getOperationId() { - return operationId; - } - - private static DocumentOperationMessageV3 newUpdateMessage(FeedOperation op, String operationId) { - DocumentUpdate update = op.getDocumentUpdate(); - update.setCondition(op.getCondition()); - Message msg = new UpdateDocumentMessage(update); - - String id = (operationId == null) ? update.getId().toString() : operationId; - return new DocumentOperationMessageV3(id, msg); - } - - static DocumentOperationMessageV3 newRemoveMessage(FeedOperation op, String operationId) { - DocumentRemove remove = new DocumentRemove(op.getRemove()); - remove.setCondition(op.getCondition()); - Message msg = new RemoveDocumentMessage(remove); - - String id = (operationId == null) ? remove.getId().toString() : operationId; - return new DocumentOperationMessageV3(id, msg); - } - - private static DocumentOperationMessageV3 newPutMessage(FeedOperation op, String operationId) { - DocumentPut put = new DocumentPut(op.getDocument()); - put.setCondition(op.getCondition()); - Message msg = new PutDocumentMessage(put); - - String id = (operationId == null) ? put.getId().toString() : operationId; - return new DocumentOperationMessageV3(id, msg); - } - - static DocumentOperationMessageV3 create(FeedOperation operation, String operationId, Metric metric) { - switch (operation.getType()) { - case DOCUMENT: - metric.add(MetricNames.NUM_PUTS, 1, null /*metricContext*/); - return newPutMessage(operation, operationId); - case REMOVE: - metric.add(MetricNames.NUM_REMOVES, 1, null /*metricContext*/); - return newRemoveMessage(operation, operationId); - case UPDATE: - metric.add(MetricNames.NUM_UPDATES, 1, null /*metricContext*/); - return newUpdateMessage(operation, operationId); - default: - // typical end of feed - return null; - } - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java deleted file mode 100644 index a12cd1ec089..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.text.Utf8; - -import java.io.IOException; -import java.io.OutputStream; - -public class ErrorHttpResponse extends HttpResponse { - - private final String msg; - - public ErrorHttpResponse(final int statusCode, final String msg) { - super(statusCode); - this.msg = msg; - } - - @Override - public void render(OutputStream outputStream) throws IOException { - outputStream.write(Utf8.toBytes(msg)); - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java deleted file mode 100644 index f99274d3f2b..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.collections.Tuple2; -import com.yahoo.container.handler.threadpool.ContainerThreadPool; -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; -import com.yahoo.container.jdisc.messagebus.SessionCache; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.metrics.DocumentApiMetrics; -import com.yahoo.jdisc.Metric; -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.Response; -import com.yahoo.jdisc.handler.ResponseHandler; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.vespa.http.client.core.Headers; - -import javax.inject.Inject; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.zip.GZIPInputStream; - -/** - * Accept feeds from outside of the Vespa cluster. - * - * @author Steinar Knutsen - */ -public class FeedHandler extends ThreadedHttpRequestHandler { - - protected final ReplyHandler feedReplyHandler; - private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3)); - private static final Pattern USER_AGENT_PATTERN = Pattern.compile("vespa-http-client \\((.+)\\)"); - private final FeedHandlerV3 feedHandlerV3; - private final DocumentApiMetrics metricsHelper; - - @Inject - public FeedHandler(ContainerThreadPool threadpool, - Metric metric, - DocumentTypeManager documentTypeManager, - SessionCache sessionCache, - MetricReceiver metricReceiver) { - super(threadpool.executor(), metric); - metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server"); - feedHandlerV3 = new FeedHandlerV3(threadpool.executor(), metric, documentTypeManager, sessionCache, metricsHelper); - feedReplyHandler = new FeedReplyReader(metric, metricsHelper); - } - - private Tuple2<HttpResponse, Integer> checkProtocolVersion(HttpRequest request) { - return doCheckProtocolVersion(request.getJDiscRequest().headers().get(Headers.VERSION)); - } - - static Tuple2<HttpResponse, Integer> doCheckProtocolVersion(List<String> clientSupportedVersions) { - List<String> washedClientVersions = splitVersions(clientSupportedVersions); - - if (washedClientVersions == null || washedClientVersions.isEmpty()) { - return new Tuple2<>(new ErrorHttpResponse( - Headers.HTTP_NOT_ACCEPTABLE, - "Request did not contain " + Headers.VERSION - + "header. Server supports protocol versions " - + serverSupportedVersions), -1); - } - - //select the highest version supported by both parties - //this could be extended when we support a gazillion versions - but right now: keep it simple. - int version; - if (washedClientVersions.contains("3")) { - version = 3; - } else { - return new Tuple2<>(new ErrorHttpResponse( - Headers.HTTP_NOT_ACCEPTABLE, - "Could not parse " + Headers.VERSION - + "header of request (values: " + washedClientVersions + - "). Server supports protocol versions " - + serverSupportedVersions), -1); - } - return new Tuple2<>(null, version); - } - - private static List<String> splitVersions(List<String> clientSupportedVersions) { - List<String> splittedVersions = new ArrayList<>(); - for (String v : clientSupportedVersions) { - if (v == null || v.trim().isEmpty()) { - continue; - } - if (!v.contains(",")) { - splittedVersions.add(v.trim()); - continue; - } - for (String part : v.split(",")) { - part = part.trim(); - if (!part.isEmpty()) { - splittedVersions.add(part); - } - } - } - return splittedVersions; - } - - @Override - public HttpResponse handle(HttpRequest request) { - metricsHelper.reportHttpRequest(findClientVersion(request).orElse(null)); - Tuple2<HttpResponse, Integer> protocolVersion = checkProtocolVersion(request); - - if (protocolVersion.first != null) { - return protocolVersion.first; - } - return feedHandlerV3.handle(request); - } - - @Override - protected void writeErrorResponseOnOverload(Request request, ResponseHandler responseHandler) { - int responseCode = request.headers().getFirst(Headers.SILENTUPGRADE) != null ? 299 : 429; - responseHandler.handleResponse(new Response(responseCode)).close(null); - } - - private static Optional<String> findClientVersion(HttpRequest request) { - String versionHeader = request.getHeader(Headers.CLIENT_VERSION); - if (versionHeader != null) { - return Optional.of(versionHeader); - } - return Optional.ofNullable(request.getHeader("User-Agent")) - .map(USER_AGENT_PATTERN::matcher) - .filter(Matcher::matches) - .map(matcher -> matcher.group(1)); - } - - // Protected for testing - protected static InputStream unzipStreamIfNeeded(InputStream inputStream, HttpRequest httpRequest) - throws IOException { - String contentEncodingHeader = httpRequest.getHeader("content-encoding"); - if ("gzip".equals(contentEncodingHeader)) { - return new GZIPInputStream(inputStream); - } else { - return inputStream; - } - } - - @Override protected void destroy() { feedHandlerV3.destroy(); } -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java deleted file mode 100644 index c8828df6d54..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.concurrent.ThreadFactoryFactory; -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; -import com.yahoo.container.jdisc.messagebus.SessionCache; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.metrics.DocumentApiMetrics; -import com.yahoo.jdisc.Metric; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.yolean.Exceptions; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * This code is based on v2 code, however, in v3, one client has one ClientFeederV3 shared between all client threads. - * The new API has more logic for shutting down cleanly as the server is more likely to be upgraded. - * The code is restructured a bit. - * - * @author dybis - */ -public class FeedHandlerV3 extends ThreadedHttpRequestHandler { - - private DocumentTypeManager docTypeManager; - private final Map<String, ClientFeederV3> clientFeederByClientId = new HashMap<>(); - private final ScheduledThreadPoolExecutor cron; - private final SessionCache sessionCache; - protected final ReplyHandler feedReplyHandler; - private final Metric metric; - private final Object monitor = new Object(); - private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName()); - - public FeedHandlerV3(Executor executor, - Metric metric, - DocumentTypeManager documentTypeManager, - SessionCache sessionCache, - DocumentApiMetrics metricsHelper) { - super(executor, metric); - docTypeManager = documentTypeManager; - this.sessionCache = sessionCache; - feedReplyHandler = new FeedReplyReader(metric, metricsHelper); - cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feed-handler-v3-janitor")); - cron.scheduleWithFixedDelay(this::removeOldClients, 3, 3, TimeUnit.SECONDS); - this.metric = metric; - } - - public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) { - this.docTypeManager = docTypeManager; - } - - // TODO: If this is set up to run without first invoking the old FeedHandler code, we should - // verify the version header first. This is done in the old code. - @Override - public HttpResponse handle(HttpRequest request) { - String clientId = clientId(request); - ClientFeederV3 clientFeederV3; - synchronized (monitor) { - if (! clientFeederByClientId.containsKey(clientId)) { - SourceSessionParams sourceSessionParams = sourceSessionParams(request); - clientFeederByClientId.put(clientId, - new ClientFeederV3(retainSource(sessionCache, sourceSessionParams), - new FeedReaderFactory(true), //TODO make error debugging configurable - docTypeManager, - clientId, - metric, - feedReplyHandler)); - } - clientFeederV3 = clientFeederByClientId.get(clientId); - } - try { - return clientFeederV3.handleRequest(request); - } catch (UnknownClientException uce) { - String msg = Exceptions.toMessageString(uce); - log.log(Level.WARNING, msg); - return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.BAD_REQUEST, msg); - } catch (Exception e) { - String msg = "Could not initialize document parsing: " + Exceptions.toMessageString(e); - log.log(Level.WARNING, msg); - return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.INTERNAL_SERVER_ERROR, msg); - } - } - - // SessionCache is final and no easy way to mock it so we need this to be able to do testing. - protected ReferencedResource<SharedSourceSession> retainSource(SessionCache sessionCache, SourceSessionParams params) { - return sessionCache.retainSource(params); - } - - @Override - protected void destroy() { - // We are forking this to avoid that accidental de-referencing causes any random thread doing destruction. - // This caused a deadlock when the single Messenger thread in MessageBus was the last one referring this - // and started destructing something that required something only the messenger thread could provide. - Thread destroyer = new Thread(() -> { - cron.shutdown(); - synchronized (monitor) { - for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) { - iterator.next().kill(); - iterator.remove(); - } - } - }, "feed-handler-v3-adhoc-destroyer"); - destroyer.setDaemon(true); - destroyer.start(); - } - - private String clientId(HttpRequest request) { - String clientDictatedId = request.getHeader(Headers.CLIENT_ID); - if (clientDictatedId == null || clientDictatedId.isEmpty()) { - throw new IllegalArgumentException("Did not get any CLIENT_ID header (" + Headers.CLIENT_ID + ")"); - } - return clientDictatedId; - } - - private SourceSessionParams sourceSessionParams(HttpRequest request) { - SourceSessionParams params = new SourceSessionParams(); - String timeout = request.getHeader(Headers.TIMEOUT); - - if (timeout != null) { - try { - params.setTimeout(Double.parseDouble(timeout)); - } catch (NumberFormatException e) { - // NOP - } - } - return params; - } - - private void removeOldClients() { - synchronized (monitor) { - for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) { - ClientFeederV3 client = iterator.next(); - if (client.timedOut()) { - client.kill(); - iterator.remove(); - } - } - } - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java deleted file mode 100644 index 069ccfd84f0..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.json.JsonFeedReader; -import com.yahoo.text.Utf8; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespaxmlparser.FeedReader; -import com.yahoo.vespaxmlparser.VespaXMLFeedReader; - -import java.io.InputStream; - -/** - * Class for creating FeedReader based on dataFormat. - * @author dybis - */ -public class FeedReaderFactory { - private static final int MARK_READLIMIT = 200; - - private final boolean debug; - public FeedReaderFactory(boolean debug) { - this.debug = debug; - } - - /** - * Creates FeedReader - * @param inputStream source of feed data - * @param docTypeManager handles the parsing of the document - * @param dataFormat specifies the format - * @return a feedreader - */ - public FeedReader createReader( - InputStream inputStream, - DocumentTypeManager docTypeManager, - FeedParams.DataFormat dataFormat) { - switch (dataFormat) { - case XML_UTF8: - byte [] peek = null; - int bytesPeeked = 0; - try { - if (debug && inputStream.markSupported()) { - peek = new byte[MARK_READLIMIT]; - inputStream.mark(MARK_READLIMIT); - bytesPeeked = inputStream.read(peek); - inputStream.reset(); - } - return new VespaXMLFeedReader(inputStream, docTypeManager); - } catch (Exception e) { - if (bytesPeeked > 0) { - throw new RuntimeException("Could not create VespaXMLFeedReader. First characters are: '" + Utf8.toString(peek, 0, bytesPeeked) + "'", e); - } else { - throw new RuntimeException("Could not create VespaXMLFeedReader.", e); - } - } - case JSON_UTF8: - return new JsonFeedReader(inputStream, docTypeManager); - default: - throw new IllegalStateException("Can not create feed reader for format: " + dataFormat); - } - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java deleted file mode 100644 index 2fbb80d9fcc..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply; -import com.yahoo.documentapi.metrics.DocumentApiMetrics; -import com.yahoo.documentapi.metrics.DocumentOperationStatus; -import com.yahoo.documentapi.metrics.DocumentOperationType; -import com.yahoo.jdisc.Metric; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.Trace; -import com.yahoo.vespa.http.client.core.ErrorCode; -import com.yahoo.vespa.http.client.core.OperationStatus; - -import java.util.Map; -import java.util.Optional; -import java.util.function.Predicate; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static java.util.function.Predicate.not; - -/** - * Catch message bus replies and make the available to a given session. - * - * @author Steinar Knutsen - */ -public class FeedReplyReader implements ReplyHandler { - - private static final Logger log = Logger.getLogger(FeedReplyReader.class.getName()); - private final Metric metric; - private final DocumentApiMetrics metricsHelper; - private final Metric.Context testAndSetMetricCtx; - - public FeedReplyReader(Metric metric, DocumentApiMetrics metricsHelper) { - this.metric = metric; - this.metricsHelper = metricsHelper; - this.testAndSetMetricCtx = metric.createContext(Map.of("operationType", "testAndSet")); - } - - @Override - public void handleReply(Reply reply) { - Object o = reply.getContext(); - if (!(o instanceof ReplyContext)) { - return; - } - ReplyContext context = (ReplyContext) o; - final double latencyInSeconds = (System.currentTimeMillis() - context.creationTime) / 1000.0d; - metric.set(MetricNames.LATENCY, latencyInSeconds, null); - - DocumentOperationType type = DocumentOperationType.fromMessage(reply.getMessage()); - boolean conditionMet = conditionMet(reply); - if (reply.hasErrors() && conditionMet) { - DocumentOperationStatus status = DocumentOperationStatus.fromMessageBusErrorCodes(reply.getErrorCodes()); - metricsHelper.reportFailure(type, status); - metric.add(MetricNames.FAILED, 1, null); - enqueue(context, reply.getError(0).getMessage(), ErrorCode.ERROR, false, reply.getTrace()); - } else { - metricsHelper.reportSuccessful(type, latencyInSeconds); - metric.add(MetricNames.SUCCEEDED, 1, null); - if ( ! conditionMet) - metric.add(MetricNames.CONDITION_NOT_MET, 1, testAndSetMetricCtx); - if ( ! updateNotFound(reply)) - metric.add(MetricNames.NOT_FOUND, 1, null); - enqueue(context, "Document processed.", ErrorCode.OK, !conditionMet, reply.getTrace()); - } - } - - private static boolean conditionMet(Reply reply) { - return ! reply.hasErrors() || reply.getError(0).getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED; - } - - private static boolean updateNotFound(Reply reply) { - return reply instanceof UpdateDocumentReply - && ! ((UpdateDocumentReply) reply).wasFound() - && reply.getMessage() instanceof UpdateDocumentMessage - && ((UpdateDocumentMessage) reply.getMessage()).getDocumentUpdate() != null - && ! ((UpdateDocumentMessage) reply.getMessage()).getDocumentUpdate().getCreateIfNonExistent(); - } - - private void enqueue(ReplyContext context, String message, ErrorCode status, boolean isConditionNotMet, Trace trace) { - try { - String traceMessage = (trace != null && trace.getLevel() > 0) ? trace.toString() : ""; - - context.feedReplies.put(new OperationStatus(message, context.docId, status, isConditionNotMet, traceMessage)); - } catch (InterruptedException e) { - log.log(Level.WARNING, - "Interrupted while enqueueing result from putting document with id: " + context.docId); - Thread.currentThread().interrupt(); - } - } -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java deleted file mode 100644 index 3e2a4a8795f..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.ErrorCode; -import com.yahoo.vespa.http.client.core.OperationStatus; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.BlockingQueue; - -/** - * Reads feed responses from a queue and renders them continuously to the - * feeder. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - * @since 5.1 - */ -public class FeedResponse extends HttpResponse { - - BlockingQueue<OperationStatus> operations; - - public FeedResponse( - int status, - BlockingQueue<OperationStatus> operations, - int protocolVersion, - String sessionId) { - super(status); - this.operations = operations; - headers().add(Headers.SESSION_ID, sessionId); - headers().add(Headers.VERSION, Integer.toString(protocolVersion)); - } - - // This is used by the V3 protocol. - public FeedResponse( - int status, - BlockingQueue<OperationStatus> operations, - int protocolVersion, - String sessionId, - int outstandingClientOperations, - String hostName) { - super(status); - this.operations = operations; - headers().add(Headers.SESSION_ID, sessionId); - headers().add(Headers.VERSION, Integer.toString(protocolVersion)); - headers().add(Headers.OUTSTANDING_REQUESTS, Integer.toString(outstandingClientOperations)); - headers().add(Headers.HOSTNAME, hostName); - } - - @Override - public void render(OutputStream output) throws IOException { - int i = 0; - OperationStatus status; - try { - status = operations.take(); - while (status.errorCode != ErrorCode.END_OF_FEED) { - output.write(toBytes(status.render())); - if (++i % 5 == 0) { - output.flush(); - } - status = operations.take(); - } - } catch (InterruptedException e) { - output.flush(); - } - } - - private byte[] toBytes(String s) { - byte[] b = new byte[s.length()]; - for (int i = 0; i < b.length; ++i) { - b[i] = (byte) s.charAt(i); // renderSingleStatus ensures ASCII only - } - return b; - } - - @Override - public String getContentType() { - return "text/plain"; - } - - @Override - public String getCharacterEncoding() { - return StandardCharsets.US_ASCII.name(); - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java deleted file mode 100644 index 725349f6ebe..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; -import com.yahoo.vespa.http.client.core.Headers; - -import java.util.Optional; - -/** - * Wrapper for the feed feederSettings read from HTTP request. - * - * @author Steinar Knutsen - */ -public class FeederSettings { - - private static final Route DEFAULT_ROUTE = Route.parse("default"); - public final boolean drain; // TODO: Implement drain=true - public final Route route; - public final DataFormat dataFormat; - public final String priority; - public final Integer traceLevel; - - public FeederSettings(HttpRequest request) { - this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false); - this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE); - this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(DataFormat::valueOf).orElse(DataFormat.JSON_UTF8); - this.priority = request.getHeader(Headers.PRIORITY); - this.traceLevel = Optional.ofNullable(request.getHeader(Headers.TRACE_LEVEL)).map(Integer::valueOf).orElse(null); - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java deleted file mode 100644 index aa2651595ef..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.vespa.http.client.core.OperationStatus; - -import java.util.concurrent.BlockingQueue; - -/** - * Mapping between document ID and client session. - * - * @author Steinar Knutsen - */ -public class ReplyContext { - - public final String docId; - public final BlockingQueue<OperationStatus> feedReplies; - public final long creationTime; - - public ReplyContext(String docId, BlockingQueue<OperationStatus> feedReplies) { - this.docId = docId; - this.feedReplies = feedReplies; - this.creationTime = System.currentTimeMillis(); - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java deleted file mode 100644 index 4ddc430b35f..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.vespa.http.client.core.Encoder; -import com.yahoo.vespa.http.server.util.ByteLimitedInputStream; -import com.yahoo.vespaxmlparser.FeedOperation; -import com.yahoo.vespaxmlparser.FeedReader; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Optional; -import java.util.logging.Logger; -import java.util.zip.GZIPInputStream; - -/** - * This code is based on v2 code, but restructured so stream reading code is in one dedicated class. - * @author dybis - */ -public class StreamReaderV3 { - - protected static final Logger log = Logger.getLogger(StreamReaderV3.class.getName()); - - private final FeedReaderFactory feedReaderFactory; - private final DocumentTypeManager docTypeManager; - - public StreamReaderV3(FeedReaderFactory feedReaderFactory, DocumentTypeManager docTypeManager) { - this.feedReaderFactory = feedReaderFactory; - this.docTypeManager = docTypeManager; - } - - public FeedOperation getNextOperation(InputStream requestInputStream, FeederSettings settings) throws Exception { - FeedOperation op = null; - - int length = readByteLength(requestInputStream); - - try (InputStream limitedInputStream = new ByteLimitedInputStream(requestInputStream, length)){ - FeedReader reader = feedReaderFactory.createReader(limitedInputStream, docTypeManager, settings.dataFormat); - op = reader.read(); - } - return op; - } - - public Optional<String> getNextOperationId(InputStream requestInputStream) throws IOException { - StringBuilder idBuf = new StringBuilder(100); - int c; - while ((c = requestInputStream.read()) != -1) { - if (c == 32) { - break; - } - idBuf.append((char) c); //it's ASCII - } - if (idBuf.length() == 0) { - return Optional.empty(); - } - return Optional.of(Encoder.decode(idBuf.toString(), new StringBuilder(idBuf.length())).toString()); - } - - private int readByteLength(InputStream requestInputStream) throws IOException { - StringBuilder lenBuf = new StringBuilder(8); - int c; - while ((c = requestInputStream.read()) != -1) { - if (c == 10) { - break; - } - lenBuf.append((char) c); //it's ASCII - } - if (lenBuf.length() == 0) { - throw new IllegalStateException("Operation length missing."); - } - return Integer.valueOf(lenBuf.toString(), 16); - } - - public static InputStream unzipStreamIfNeeded(final HttpRequest httpRequest) - throws IOException { - final String contentEncodingHeader = httpRequest.getHeader("content-encoding"); - if ("gzip".equals(contentEncodingHeader)) { - return new GZIPInputStream(httpRequest.getData()); - } else { - return httpRequest.getData(); - } - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java deleted file mode 100644 index 5324b86a98a..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -/** - * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> - * @since 5.5.0 - */ -public class UnknownClientException extends RuntimeException { - - public UnknownClientException(String message) { - super(message); - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java deleted file mode 100644 index ea01137d9af..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * Server side of programmatic API for feeding into Vespa from outside of the - * clusters. Not a public API, not meant for direct use. - */ -@com.yahoo.api.annotations.PackageMarker -package com.yahoo.vespa.http.server; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java deleted file mode 100644 index 270ebe7796b..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server.util; - -import java.io.IOException; -import java.io.InputStream; - -/** - * @author Einar M R Rosenvinge - * - * @since 5.1.23 - */ -public class ByteLimitedInputStream extends InputStream { - - private final InputStream wrappedStream; - private int remaining; - private int remainingWhenMarked; - - public ByteLimitedInputStream(InputStream wrappedStream, int limit) { - this.wrappedStream = wrappedStream; - if (limit < 0) { - throw new IllegalArgumentException("limit cannot be 0"); - } - this.remaining = limit; - } - - @Override - public int read() throws IOException { - if (remaining <= 0) { - return -1; - } - int retval = wrappedStream.read(); - if (retval < 0) { - remaining = 0; - } else { - --remaining; - } - return retval; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } - - if (remaining <= 0) { - return -1; - } - - int bytesToRead = Math.min(remaining, len); - int retval = wrappedStream.read(b, off, bytesToRead); - - if (retval < 0) { - //end of underlying stream was reached, and nothing was read. - remaining = 0; - } else { - remaining -= retval; - } - return retval; - } - - @Override - public int available() throws IOException { - return remaining; - } - - @Override - public void close() throws IOException { - //we will never close the underlying stream - if (remaining <= 0) { - return; - } - while (remaining > 0) { - skip(remaining); - } - } - - @Override - public synchronized void mark(int readlimit) { - wrappedStream.mark(readlimit); - remainingWhenMarked = remaining; - } - - @Override - public synchronized void reset() throws IOException { - wrappedStream.reset(); - remaining = remainingWhenMarked; - } - - @Override - public boolean markSupported() { - return wrappedStream.markSupported(); - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/CollectingMetric.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/CollectingMetric.java deleted file mode 100644 index 1b9a5eb6381..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/CollectingMetric.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.jdisc.Metric; - -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -/** - * @author ollivir - */ -public final class CollectingMetric implements Metric { - private final Context DUMMY_CONTEXT = new Context() {}; - private final Map<String, AtomicLong> values = new ConcurrentHashMap<>(); - - public CollectingMetric() {} - - @Override - public void set(String key, Number val, Context ctx) { - values.computeIfAbsent(key, ignored -> new AtomicLong(0)).set(val.longValue()); - } - - @Override - public void add(String key, Number val, Context ctx) { - values.computeIfAbsent(key, ignored -> new AtomicLong(0)).addAndGet(val.longValue()); - } - - public long get(String key) { - return Optional.ofNullable(values.get(key)).map(AtomicLong::get).orElse(0L); - } - - @Override - public Context createContext(Map<String, ?> properties) { - return DUMMY_CONTEXT; - } -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/DummyMetric.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/DummyMetric.java deleted file mode 100644 index 1cdac87f3df..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/DummyMetric.java +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.jdisc.Metric; - -import java.util.Map; - -/** - * @author Einar M R Rosenvinge - * @since 5.1.20 - */ -class DummyMetric implements Metric { - - @Override - public void set(String key, Number val, Context ctx) { - } - - @Override - public void add(String key, Number val, Context ctx) { - } - - @Override - public Context createContext(Map<String, ?> properties) { - return DummyContext.INSTANCE; - } - - private static class DummyContext implements Context { - private static final DummyContext INSTANCE = new DummyContext(); - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerCompressionTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerCompressionTest.java deleted file mode 100644 index 6f1b5eebcc4..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerCompressionTest.java +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.zip.GZIPOutputStream; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class FeedHandlerCompressionTest { - - public static byte[] compress(final String dataToBrCompressed) throws IOException { - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(dataToBrCompressed.length()); - final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); - gzipOutputStream.write(dataToBrCompressed.getBytes()); - gzipOutputStream.close(); - byte[] compressedBytes = byteArrayOutputStream.toByteArray(); - byteArrayOutputStream.close(); - return compressedBytes; - } - - @Test - public void testUnzipStreamIfNeeded() throws Exception { - final String testData = "foo bar"; - InputStream inputStream = new ByteArrayInputStream(compress(testData)); - HttpRequest httpRequest = mock(HttpRequest.class); - when(httpRequest.getHeader("content-encoding")).thenReturn("gzip"); - InputStream decompressedStream = FeedHandler.unzipStreamIfNeeded(inputStream, httpRequest); - final StringBuilder processedInput = new StringBuilder(); - while (true) { - int readValue = decompressedStream.read(); - if (readValue < 0) { - break; - } - processedInput.append((char)readValue); - } - assertEquals(processedInput.toString(), testData); - } - - /** - * Test by setting encoding, but not compressing data. - * @throws Exception - */ - @Test - public void testUnzipFails() throws Exception { - final String testData = "foo bar"; - InputStream inputStream = new ByteArrayInputStream(testData.getBytes()); - HttpRequest httpRequest = mock(HttpRequest.class); - when(httpRequest.getHeader("Content-Encoding")).thenReturn("gzip"); - InputStream decompressedStream = FeedHandler.unzipStreamIfNeeded(inputStream, httpRequest); - final StringBuilder processedInput = new StringBuilder(); - while (true) { - int readValue = decompressedStream.read(); - if (readValue < 0) { - break; - } - processedInput.append((char)readValue); - } - assertEquals(processedInput.toString(), testData); - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerTest.java deleted file mode 100644 index f3ea8fb5a80..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerTest.java +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; -import com.yahoo.container.handler.threadpool.ContainerThreadPool; -import com.yahoo.container.jdisc.RequestHandlerTestDriver; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.jdisc.handler.OverloadException; -import com.yahoo.metrics.simple.MetricReceiver; -import org.junit.Test; - -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; - -import static com.yahoo.vespa.http.server.FeedHandlerV3Test.createRequest; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * @author bjorncs - */ -public class FeedHandlerTest { - - @Test - public void response_has_status_code_429_when_throttling() { - FeedHandler handler = new FeedHandler( - new RejectingContainerThreadpool(), - new CollectingMetric(), - new DocumentTypeManager(new DocumentmanagerConfig.Builder().enablecompression(true).build()), - null /* session cache */, - MetricReceiver.nullImplementation); - var responseHandler = new RequestHandlerTestDriver.MockResponseHandler(); - try { - handler.handleRequest(createRequest(100).getJDiscRequest(), responseHandler); - fail(); - } catch (OverloadException e) {} - assertEquals(429, responseHandler.getStatus()); - } - - private static class RejectingContainerThreadpool implements ContainerThreadPool { - private final Executor executor = ignored -> { throw new RejectedExecutionException(); }; - - @Override public Executor executor() { return executor; } - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java deleted file mode 100644 index a5a8f4cb5bd..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.google.common.base.Splitter; -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.container.jdisc.messagebus.SessionCache; -import com.yahoo.document.DataType; -import com.yahoo.document.DocumentType; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.documentapi.metrics.DocumentApiMetrics; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.messagebus.Result; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.text.Utf8; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.core.ErrorCode; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.OperationStatus; -import org.junit.Test; -import org.mockito.stubbing.Answer; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class FeedHandlerV3Test { - final CollectingMetric metric = new CollectingMetric(); - private final Executor simpleThreadpool = Executors.newCachedThreadPool(); - - @Test - public void feedOneDocument() throws Exception { - final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(simpleThreadpool); - HttpResponse httpResponse = feedHandlerV3.handle(createRequest(1)); - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - httpResponse.render(outStream); - assertEquals(httpResponse.getContentType(), "text/plain"); - assertEquals(Utf8.toString(outStream.toByteArray()), "1230 OK message trace\n"); - } - - @Test - public void feedOneBrokenDocument() throws Exception { - final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(simpleThreadpool); - HttpResponse httpResponse = feedHandlerV3.handle(createBrokenRequest()); - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - httpResponse.render(outStream); - assertEquals(httpResponse.getContentType(), "text/plain"); - assertTrue(Utf8.toString(outStream.toByteArray()).startsWith("1230 ERROR ")); - assertEquals(1L, metric.get(MetricNames.PARSE_ERROR)); - } - - @Test - public void feedManyDocument() throws Exception { - final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(simpleThreadpool); - HttpResponse httpResponse = feedHandlerV3.handle(createRequest(100)); - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - httpResponse.render(outStream); - assertEquals(httpResponse.getContentType(), "text/plain"); - String result = Utf8.toString(outStream.toByteArray()); - assertEquals(101, Splitter.on("\n").splitToList(result).size()); - } - - private static DocumentTypeManager createDoctypeManager() { - DocumentTypeManager docTypeManager = new DocumentTypeManager(); - DocumentType documentType = new DocumentType("testdocument"); - documentType.addField("title", DataType.STRING); - documentType.addField("body", DataType.STRING); - docTypeManager.registerDocumentType(documentType); - return docTypeManager; - } - - static HttpRequest createRequest(int numberOfDocs) { - StringBuilder wireData = new StringBuilder(); - for (int x = 0; x < numberOfDocs; x++) { - String docData = "[{\"put\": \"id:testdocument:testdocument::c\", \"fields\": { \"title\": \"fooKey\", \"body\": \"value\"}}]"; - String operationId = "123" + x; - wireData.append(operationId + " " + Integer.toHexString(docData.length()) + "\n" + docData); - } - return createRequestWithPayload(wireData.toString()); - } - - private static HttpRequest createBrokenRequest() { - String docData = "[{\"put oops I broke it]"; - String wireData = "1230 " + Integer.toHexString(docData.length()) + "\n" + docData; - return createRequestWithPayload(wireData); - } - - static HttpRequest createRequestWithPayload(String payload) { - InputStream inputStream = new ByteArrayInputStream(payload.getBytes()); - HttpRequest request = HttpRequest.createTestRequest("http://dummyhostname:19020/reserved-for-internal-use/feedapi", - com.yahoo.jdisc.http.HttpRequest.Method.POST, inputStream); - request.getJDiscRequest().headers().add(Headers.VERSION, "3"); - request.getJDiscRequest().headers().add(Headers.DATA_FORMAT, FeedParams.DataFormat.JSON_UTF8.name()); - request.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000"); - request.getJDiscRequest().headers().add(Headers.CLIENT_ID, "client123"); - request.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); - request.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); - request.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - return request; - } - - private FeedHandlerV3 setupFeederHandler(Executor threadPool) { - DocumentTypeManager docMan = new DocumentTypeManager(new DocumentmanagerConfig.Builder().enablecompression(true).build()); - FeedHandlerV3 feedHandlerV3 = new FeedHandlerV3( - threadPool, - metric, - docMan, - null /* session cache */, - new DocumentApiMetrics(MetricReceiver.nullImplementation, "test")) { - @Override - protected ReferencedResource<SharedSourceSession> retainSource( - SessionCache sessionCache, SourceSessionParams sessionParams) { - SharedSourceSession sharedSourceSession = mock(SharedSourceSession.class); - - try { - when(sharedSourceSession.sendMessageBlocking(any())).thenAnswer((Answer<?>) invocation -> { - Object[] args = invocation.getArguments(); - PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0]; - ReplyContext replyContext = (ReplyContext) putDocumentMessage.getContext(); - replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace")); - Result result = mock(Result.class); - when(result.isAccepted()).thenReturn(true); - return result; - }); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - Result result = mock(Result.class); - when(result.isAccepted()).thenReturn(true); - ReferencedResource<SharedSourceSession> refSharedSessopn = - new ReferencedResource<>(sharedSourceSession, () -> {}); - return refSharedSessopn; - } - }; - feedHandlerV3.injectDocumentManangerForTests(createDoctypeManager()); - return feedHandlerV3; - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java deleted file mode 100644 index 6b0bd1c9518..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.text.Utf8; -import com.yahoo.vespa.http.client.config.FeedParams; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class FeedReaderFactoryTestCase { - DocumentTypeManager manager = new DocumentTypeManager(); - - private InputStream createStream(String s) { - return new ByteArrayInputStream(Utf8.toBytes(s)); - } - - @Test - public void testXmlExceptionWithDebug() { - try { - new FeedReaderFactory(true).createReader(createStream("Some malformed xml"), manager, FeedParams.DataFormat.XML_UTF8); - fail(); - } catch (RuntimeException e) { - assertEquals("Could not create VespaXMLFeedReader. First characters are: 'Some malformed xml'", e.getMessage()); - } - } - @Test - public void testXmlException() { - try { - new FeedReaderFactory(false).createReader(createStream("Some malformed xml"), manager, FeedParams.DataFormat.XML_UTF8); - fail(); - } catch (RuntimeException e) { - assertEquals("Could not create VespaXMLFeedReader.", e.getMessage()); - } - } -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MetaStream.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MetaStream.java deleted file mode 100644 index 4dce8cb4e7d..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MetaStream.java +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.text.Utf8; - -import java.io.ByteArrayInputStream; - -/** - * Stream with extra data outside the actual input stream. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -public final class MetaStream extends ByteArrayInputStream { - - private byte[] operations; - int i; - - public MetaStream(byte[] buf) { - super(createPayload(buf)); - this.operations = buf; - i = 0; - } - - private static final byte[] createPayload(byte[] buf) { - StringBuilder bu = new StringBuilder(); - for (int i = 0; i < buf.length; i++) { - bu.append("id:banana:banana::doc1 0\n"); - } - return Utf8.toBytes(bu.toString()); - } - - public byte getNextOperation() { - if (i >= operations.length) { - return 0; - } - return operations[i++]; - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockNetwork.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockNetwork.java deleted file mode 100644 index 7d3c0bb74ca..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockNetwork.java +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import java.util.List; - -import com.yahoo.jrt.slobrok.api.IMirror; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.network.Network; -import com.yahoo.messagebus.network.NetworkOwner; -import com.yahoo.messagebus.routing.RoutingNode; - -/** - * NOP-network. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -class MockNetwork implements Network { - - @Override - public boolean waitUntilReady(double seconds) { - return true; - } - - @Override - public void attach(NetworkOwner owner) { - } - - @Override - public void registerSession(String session) { - } - - @Override - public void unregisterSession(String session) { - - } - - @Override - public boolean allocServiceAddress(RoutingNode recipient) { - return true; - } - - @Override - public void freeServiceAddress(RoutingNode recipient) { - - } - - @Override - public void send(Message msg, List<RoutingNode> recipients) { - } - - @Override - public void sync() { - } - - @Override - public void shutdown() { - } - - @Override - public String getConnectionSpec() { - return null; - } - - @Override - public IMirror getMirror() { - return null; - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockReply.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockReply.java deleted file mode 100644 index 1cb00160bbd..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockReply.java +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.messagebus.Reply; -import com.yahoo.text.Utf8String; - -/** - * Minimal reply simulator. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -class MockReply extends Reply { - - Object context; - - public MockReply(Object context) { - this.context = context; - } - - @Override - public Utf8String getProtocol() { - return null; - } - - @Override - public int getType() { - return 0; - } - - @Override - public Object getContext() { - return context; - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java deleted file mode 100644 index 6858c4bede3..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.collections.Tuple2; -import com.yahoo.container.jdisc.HttpResponse; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -/** - * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> - * @since 5.7.0 - */ -public class VersionsTestCase { - - private static final List<String> EMPTY = Collections.emptyList(); - private static final List<String> ONE_TWO = Arrays.asList("1", "2"); - private static final List<String> ONE_THREE = Arrays.asList("1", "3"); - private static final List<String> TWO_THREE = Arrays.asList("3", "2"); - private static final List<String> ONE_NULL_THREE = Arrays.asList("1", null, "3"); - private static final List<String> ONE_COMMA_THREE = Collections.singletonList("1, 3"); - private static final List<String> ONE_EMPTY_THREE = Arrays.asList("1", "", "3"); - private static final List<String> TOO_LARGE_NUMBER = Collections.singletonList("1000000000"); - private static final List<String> THREE_TOO_LARGE_NUMBER = Arrays.asList("3", "1000000000"); - private static final List<String> THREE_COMMA_TOO_LARGE_NUMBER = Arrays.asList("3,1000000000"); - private static final List<String> GARBAGE = Collections.singletonList("garbage"); - - @Test - public void testEmpty() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(EMPTY); - assertTrue(v.first instanceof ErrorHttpResponse); - assertEquals(Integer.valueOf(-1), v.second); - } - - @Test - public void testOneTwo() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_TWO); - assertTrue(v.first instanceof ErrorHttpResponse); - assertEquals(Integer.valueOf(-1), v.second); - } - - @Test - public void testOneThree() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_THREE); - assertNull(v.first); - assertEquals(Integer.valueOf(3), v.second); - } - - @Test - public void testTwoThree() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(TWO_THREE); - assertNull(v.first); - assertEquals(Integer.valueOf(3), v.second); - } - - @Test - public void testOneNullThree() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_NULL_THREE); - assertNull(v.first); - assertEquals(Integer.valueOf(3), v.second); - } - - @Test - public void testOneCommaThree() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_COMMA_THREE); - assertNull(v.first); - assertEquals(Integer.valueOf(3), v.second); - } - - @Test - public void testOneEmptyThree() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_EMPTY_THREE); - assertNull(v.first); - assertEquals(Integer.valueOf(3), v.second); - } - - @Test - public void testTooLarge() throws Exception { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(TOO_LARGE_NUMBER); - assertTrue(v.first instanceof ErrorHttpResponse); - ByteArrayOutputStream errorMsg = new ByteArrayOutputStream(); - ErrorHttpResponse errorResponse = (ErrorHttpResponse) v.first; - errorResponse.render(errorMsg); - assertEquals(errorMsg.toString(), - "Could not parse X-Yahoo-Feed-Protocol-Versionheader of request (values: [1000000000]). " + - "Server supports protocol versions [3]"); - assertEquals(Integer.valueOf(-1), v.second); - } - - @Test - public void testThreeTooLarge() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(THREE_TOO_LARGE_NUMBER); - assertNull(v.first); - assertEquals(Integer.valueOf(3), v.second); - } - - @Test - public void testTwoCommaTooLarge() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(THREE_COMMA_TOO_LARGE_NUMBER); - assertNull(v.first); - assertEquals(Integer.valueOf(3), v.second); - } - - @Test - public void testGarbage() { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(GARBAGE); - assertTrue(v.first instanceof ErrorHttpResponse); - assertEquals(Integer.valueOf(-1), v.second); - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java deleted file mode 100644 index ed571c6baff..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server.util; - -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> - * @since 5.1.23 - */ -public class ByteLimitedInputStreamTestCase { - - private static ByteLimitedInputStream create(byte[] source, int limit) { - if (limit > source.length) { - throw new IllegalArgumentException("Limit is greater than length of source buffer."); - } - InputStream wrappedStream = new ByteArrayInputStream(source); - return new ByteLimitedInputStream(wrappedStream, limit); - } - - @Test - public void requireThatBasicsWork() throws IOException { - ByteLimitedInputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); - - assertEquals(9, stream.available()); - assertEquals(97, stream.read()); - assertEquals(8, stream.available()); - assertEquals(98, stream.read()); - assertEquals(7, stream.available()); - assertEquals(99, stream.read()); - assertEquals(6, stream.available()); - assertEquals(100, stream.read()); - assertEquals(5, stream.available()); - assertEquals(101, stream.read()); - assertEquals(4, stream.available()); - assertEquals(102, stream.read()); - assertEquals(3, stream.available()); - assertEquals(103, stream.read()); - assertEquals(2, stream.available()); - assertEquals(104, stream.read()); - assertEquals(1, stream.available()); - assertEquals(105, stream.read()); - assertEquals(0, stream.available()); - assertEquals(-1, stream.read()); - assertEquals(0, stream.available()); - assertEquals(-1, stream.read()); - assertEquals(0, stream.available()); - assertEquals(-1, stream.read()); - assertEquals(0, stream.available()); - assertEquals(-1, stream.read()); - assertEquals(0, stream.available()); - assertEquals(-1, stream.read()); - assertEquals(0, stream.available()); - } - - @Test - public void requireThatChunkedReadWorks() throws IOException { - ByteLimitedInputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); - - assertEquals(9, stream.available()); - byte[] toBuf = new byte[4]; - assertEquals(4, stream.read(toBuf)); - assertEquals(97, toBuf[0]); - assertEquals(98, toBuf[1]); - assertEquals(99, toBuf[2]); - assertEquals(100, toBuf[3]); - assertEquals(5, stream.available()); - - assertEquals(4, stream.read(toBuf)); - assertEquals(101, toBuf[0]); - assertEquals(102, toBuf[1]); - assertEquals(103, toBuf[2]); - assertEquals(104, toBuf[3]); - assertEquals(1, stream.available()); - - assertEquals(1, stream.read(toBuf)); - assertEquals(105, toBuf[0]); - assertEquals(0, stream.available()); - - assertEquals(-1, stream.read(toBuf)); - assertEquals(0, stream.available()); - } - - @Test - public void requireMarkWorks() throws IOException { - InputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); - assertEquals(97, stream.read()); - assertTrue(stream.markSupported()); - stream.mark(5); - assertEquals(98, stream.read()); - assertEquals(99, stream.read()); - stream.reset(); - assertEquals(98, stream.read()); - assertEquals(99, stream.read()); - assertEquals(100, stream.read()); - assertEquals(101, stream.read()); - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java deleted file mode 100644 index 513892af213..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespaxmlparser; - -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.server.FeedReaderFactory; - -import java.io.InputStream; - -/** - * For creating MockReader of innput stream. - * @author dybis - */ -public class MockFeedReaderFactory extends FeedReaderFactory { - - public MockFeedReaderFactory() { - super(true); - } - - @Override - public FeedReader createReader( - InputStream inputStream, - DocumentTypeManager docTypeManager, - FeedParams.DataFormat dataFormat) { - try { - return new MockReader(inputStream); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockReader.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockReader.java deleted file mode 100644 index c751849b84e..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockReader.java +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespaxmlparser; - -import com.yahoo.document.Document; -import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentType; -import com.yahoo.document.DocumentUpdate; -import com.yahoo.vespa.http.server.MetaStream; -import com.yahoo.vespa.http.server.util.ByteLimitedInputStream; - -import java.io.InputStream; -import java.lang.reflect.Field; - -/** - * Mock for ExternalFeedTestCase which had to override package private methods. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -public class MockReader implements FeedReader { - - MetaStream stream; - boolean finished = false; - - public MockReader(InputStream stream) throws Exception { - this.stream = getMetaStream(stream); - } - - private static MetaStream getMetaStream(InputStream stream) { - if (stream instanceof MetaStream) { - return (MetaStream) stream; - } - if (!(stream instanceof ByteLimitedInputStream)) { - throw new IllegalStateException("Given unknown stream type."); - } - //Ooooooo this is so ugly - try { - ByteLimitedInputStream byteLimitedInputStream = (ByteLimitedInputStream) stream; - Field f = byteLimitedInputStream.getClass().getDeclaredField("wrappedStream"); //NoSuchFieldException - f.setAccessible(true); - return (MetaStream) f.get(byteLimitedInputStream); - } catch (Exception e) { - throw new IllegalStateException("Implementation of ByteLimitedInputStream has changed.", e); - } - } - - @Override - public FeedOperation read() throws Exception { - if (finished) { - return FeedOperation.INVALID; - } - - byte whatToDo = stream.getNextOperation(); - DocumentId id = new DocumentId("id:banana:banana::doc1"); - DocumentType docType = new DocumentType("banana"); - switch (whatToDo) { - case 0: - return FeedOperation.INVALID; - case 1: - return new DocumentFeedOperation(new Document(docType, id)); - case 2: - return new RemoveFeedOperation(id); - case 3: - return new DocumentUpdateFeedOperation(new DocumentUpdate(docType, id)); - default: - throw new RuntimeException("boom"); - } - } - -} |