diff options
author | Jon Bratseth <bratseth@gmail.com> | 2022-06-07 13:08:57 +0200 |
---|---|---|
committer | gjoranv <gv@verizonmedia.com> | 2022-06-08 11:45:31 +0200 |
commit | 81928bd0107babe7dece6bef1840c61cac5120cc (patch) | |
tree | 84053c49e70afaeb0f13f9c6b61b552d07997ee7 /vespa-hadoop | |
parent | 033d6494edc17b554ab841c3f5ea70bc5f8925de (diff) |
Remove vespa-http-client usage part 2
Diffstat (limited to 'vespa-hadoop')
6 files changed, 2 insertions, 287 deletions
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml index 0c724c95839..1dde2d58610 100644 --- a/vespa-hadoop/pom.xml +++ b/vespa-hadoop/pom.xml @@ -136,11 +136,6 @@ <!-- Vespa feeding dependencies --> <dependency> <groupId>com.yahoo.vespa</groupId> - <artifactId>vespa-http-client</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>com.yahoo.vespa</groupId> <artifactId>vespa-feed-client</artifactId> <version>${project.version}</version> </dependency> diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java deleted file mode 100644 index 6900c7dc82f..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; -import com.yahoo.vespa.http.client.config.SessionParams; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import javax.xml.namespace.QName; -import javax.xml.stream.FactoryConfigurationError; -import javax.xml.stream.XMLEventReader; -import 
javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.events.StartElement; -import javax.xml.stream.events.XMLEvent; -import java.io.IOException; -import java.io.StringReader; -import java.time.Duration; -import java.util.List; -import java.util.StringTokenizer; -import java.util.concurrent.ThreadLocalRandom; -import java.util.logging.Logger; - -/** - * {@link LegacyVespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-http-client. - * - * @author lesters - * @deprecated Replaced by {@link VespaRecordWriter} - */ -@Deprecated -public class LegacyVespaRecordWriter extends RecordWriter<Object, Object> { - - private final Logger log = Logger.getLogger(getClass().getCanonicalName()); - - private boolean initialized = false; - private com.yahoo.vespa.http.client.FeedClient feedClient; - private final VespaCounters counters; - private final int progressInterval; - - final VespaConfiguration configuration; - - LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { - this.counters = counters; - this.configuration = configuration; - this.progressInterval = configuration.progressInterval(); - } - - - @Override - public void write(Object key, Object data) throws IOException, InterruptedException { - if (!initialized) { - initialize(); - } - - String doc = data.toString().trim(); - - // Parse data to find document id - if none found, skip this write - String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? 
findDocId(doc) - : findDocIdFromXml(doc); - if (docId != null && docId.length() >= 0) { - feedClient.stream(docId, doc); - counters.incrementDocumentsSent(1); - } else { - counters.incrementDocumentsSkipped(1); - } - - if (counters.getDocumentsSent() % progressInterval == 0) { - String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", - counters.getDocumentsSent(), - counters.getDocumentsOk(), - counters.getDocumentsFailed(), - counters.getDocumentsSkipped()); - log.info(progress); - } - - } - - - @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - if (feedClient != null) { - feedClient.close(); - } - } - - protected ConnectionParams.Builder configureConnectionParams() { - ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); - connParamsBuilder.setDryRun(configuration.dryrun()); - connParamsBuilder.setUseCompression(configuration.useCompression()); - connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); - connParamsBuilder.setMaxRetries(configuration.numRetries()); - if (configuration.proxyHost() != null) { - connParamsBuilder.setProxyHost(configuration.proxyHost()); - } - if (configuration.proxyPort() >= 0) { - connParamsBuilder.setProxyPort(configuration.proxyPort()); - } - return connParamsBuilder; - } - - protected FeedParams.Builder configureFeedParams() { - FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); - feedParamsBuilder.setDataFormat(configuration.dataFormat()); - feedParamsBuilder.setRoute(configuration.route()); - feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); - feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); - feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis()); - return feedParamsBuilder; - } - - protected SessionParams.Builder configureSessionParams() { - SessionParams.Builder sessionParamsBuilder = 
new SessionParams.Builder(); - sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize()); - sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2); - return sessionParamsBuilder; - } - - private void initialize() { - if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) { - int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs()); - log.info("VespaStorage: Delaying startup by " + delay + " ms"); - try { - Thread.sleep(delay); - } catch (Exception e) {} - } - - ConnectionParams.Builder connParamsBuilder = configureConnectionParams(); - FeedParams.Builder feedParamsBuilder = configureFeedParams(); - SessionParams.Builder sessionParams = configureSessionParams(); - - sessionParams.setConnectionParams(connParamsBuilder.build()); - sessionParams.setFeedParams(feedParamsBuilder.build()); - - String endpoints = configuration.endpoint(); - StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); - while (tokenizer.hasMoreTokens()) { - String endpoint = tokenizer.nextToken().trim(); - sessionParams.addCluster(new Cluster.Builder().addEndpoint( - Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL().orElse(false)) - ).build()); - } - - ResultCallback resultCallback = new ResultCallback(counters); - feedClient = com.yahoo.vespa.http.client.FeedClientFactory.create(sessionParams.build(), resultCallback); - - initialized = true; - log.info("VespaStorage configuration:\n" + configuration.toString()); - log.info(feedClient.getStatsAsJson()); - } - - private String findDocIdFromXml(String xml) { - try { - XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); - while (eventReader.hasNext()) { - XMLEvent event = eventReader.nextEvent(); - if (event.getEventType() == XMLEvent.START_ELEMENT) { - StartElement element = event.asStartElement(); - String elementName = element.getName().getLocalPart(); - if 
(VespaDocumentOperation.Operation.valid(elementName)) { - return element.getAttributeByName(QName.valueOf("documentid")).getValue(); - } - } - } - } catch (XMLStreamException | FactoryConfigurationError e) { - // as json dude does - return null; - } - return null; - } - - private String findDocId(String json) throws IOException { - JsonFactory factory = new JsonFactory(); - try(JsonParser parser = factory.createParser(json)) { - if (parser.nextToken() != JsonToken.START_OBJECT) { - return null; - } - while (parser.nextToken() != JsonToken.END_OBJECT) { - String fieldName = parser.getCurrentName(); - parser.nextToken(); - if (VespaDocumentOperation.Operation.valid(fieldName)) { - String docId = parser.getText(); - return docId; - } else { - parser.skipChildren(); - } - } - } catch (JsonParseException ex) { - return null; - } - return null; - } - - - class ResultCallback implements com.yahoo.vespa.http.client.FeedClient.ResultCallback { - final VespaCounters counters; - - public ResultCallback(VespaCounters counters) { - this.counters = counters; - } - - @Override - public void onCompletion(String docId, com.yahoo.vespa.http.client.Result documentResult) { - if (!documentResult.isSuccess()) { - counters.incrementDocumentsFailed(1); - StringBuilder sb = new StringBuilder(); - sb.append("Problems with docid "); - sb.append(docId); - sb.append(": "); - List<com.yahoo.vespa.http.client.Result.Detail> details = documentResult.getDetails(); - for (com.yahoo.vespa.http.client.Result.Detail detail : details) { - sb.append(detail.toString()); - sb.append(" "); - } - log.warning(sb.toString()); - return; - } - counters.incrementDocumentsOk(1); - } - - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java index 66ab94574d9..e49a5e17970 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java +++ 
b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java @@ -10,15 +10,12 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; -import java.util.Objects; import java.util.Properties; import java.util.logging.Logger; -import static com.yahoo.vespa.http.client.config.FeedParams.DataFormat.XML_UTF8; - /** * An output specification for writing to Vespa instances in a Map-Reduce job. - * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the + * Mainly returns an instance of a {@link VespaRecordWriter} that does the * actual feeding to Vespa. * * @author lesters @@ -46,14 +43,7 @@ public class VespaOutputFormat extends OutputFormat { public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { VespaCounters counters = VespaCounters.get(context); VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride); - Boolean useLegacyClient = configuration.useLegacyClient().orElse(null); - if (Objects.equals(useLegacyClient, Boolean.TRUE) || configuration.dataFormat() == XML_UTF8) { - log.warning("Feeding with legacy client or XML will no longer be supported on Vespa 8. 
" + - "See https://docs.vespa.ai/en/vespa8-release-notes.html"); - return new LegacyVespaRecordWriter(configuration, counters); - } else { - return new VespaRecordWriter(configuration, counters); - } + return new VespaRecordWriter(configuration, counters); } diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 6d6c3789835..c450d7cdeef 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -7,7 +7,6 @@ import ai.vespa.feed.client.JsonFeeder; import ai.vespa.feed.client.OperationParseException; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.http.client.config.FeedParams; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -85,18 +84,11 @@ public class VespaRecordWriter extends RecordWriter<Object, Object> { private void initializeOnFirstWrite() { if (initialized) return; - validateConfig(); useRandomizedStartupDelayIfEnabled(); feeder = createJsonStreamFeeder(); initialized = true; } - private void validateConfig() { - if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) { - throw new IllegalArgumentException("Only JSON is support by this feed client implementation"); - } - } - private void useRandomizedStartupDelayIfEnabled() { if (!config.dryrun() && config.randomStartupSleepMs() > 0) { int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java index 715546fe6fe..ae0b6a58155 100644 --- 
a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce.util; -import com.yahoo.vespa.http.client.config.FeedParams; import org.apache.hadoop.conf.Configuration; import java.io.IOException; @@ -19,7 +18,6 @@ public class VespaConfiguration { public static final String PROXY_SCHEME = "vespa.feed.proxy.scheme"; public static final String DRYRUN = "vespa.feed.dryrun"; public static final String USE_COMPRESSION = "vespa.feed.usecompression"; - public static final String DATA_FORMAT = "vespa.feed.data.format"; public static final String PROGRESS_REPORT = "vespa.feed.progress.interval"; public static final String CONNECTIONS = "vespa.feed.connections"; public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size"; @@ -29,7 +27,6 @@ public class VespaConfiguration { public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; public static final String NUM_RETRIES = "vespa.feed.num.retries"; - public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient"; private final Configuration conf; private final Properties override; @@ -129,25 +126,10 @@ public class VespaConfiguration { } - public FeedParams.DataFormat dataFormat() { - String format = getString(DATA_FORMAT); - if ("xml".equalsIgnoreCase(format)) { - return FeedParams.DataFormat.XML_UTF8; - } - return FeedParams.DataFormat.JSON_UTF8; - } - - public int progressInterval() { return getInt(PROGRESS_REPORT, 1000); } - public Optional<Boolean> useLegacyClient() { - String raw = getString(USE_LEGACY_CLIENT); - if (raw == null || raw.trim().isEmpty()) return Optional.empty(); - return 
Optional.of(Boolean.parseBoolean(raw)); - } - public String getString(String name) { if (override != null && override.containsKey(name)) { return override.getProperty(name); @@ -197,7 +179,6 @@ public class VespaConfiguration { sb.append(PROXY_SCHEME + ": " + proxyScheme() + "\n"); sb.append(DRYRUN + ": " + dryrun() +"\n"); sb.append(USE_COMPRESSION + ": " + useCompression() +"\n"); - sb.append(DATA_FORMAT + ": " + dataFormat() +"\n"); sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n"); sb.append(CONNECTIONS + ": " + numConnections() +"\n"); sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n"); @@ -207,7 +188,6 @@ public class VespaConfiguration { sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n"); sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n"); sb.append(NUM_RETRIES + ": " + numRetries() +"\n"); - sb.append(USE_LEGACY_CLIENT + ": " + useLegacyClient().map(Object::toString).orElse("<empty>") +"\n"); return sb.toString(); } diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java index f690e767194..3183c770bc7 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -24,14 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; public class VespaStorageTest { @Test - public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set(VespaConfiguration.DATA_FORMAT, "xml"); - assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf); - } - - - @Test public void requireThatPremadeOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_operations.pig"); } @@ -52,7 +44,6 @@ public class VespaStorageTest { public void 
requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception { Configuration conf = new HdfsConfiguration(); conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString()); - conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString()); assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf); } |