From 033d6494edc17b554ab841c3f5ea70bc5f8925de Mon Sep 17 00:00:00 2001 From: Jon Bratseth Date: Tue, 7 Jun 2022 11:59:03 +0200 Subject: Revert "Remove http client use" This reverts commit a7fd13540d34de50ed3526576c62eebc476a1e1c. --- .../hadoop/mapreduce/LegacyVespaRecordWriter.java | 233 +++++++++++++++++++++ .../vespa/hadoop/mapreduce/VespaOutputFormat.java | 18 +- .../vespa/hadoop/mapreduce/VespaRecordWriter.java | 8 + .../hadoop/mapreduce/util/VespaConfiguration.java | 38 ++++ .../yahoo/vespa/hadoop/pig/VespaStorageTest.java | 18 +- 5 files changed, 313 insertions(+), 2 deletions(-) create mode 100644 vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java (limited to 'vespa-hadoop/src') diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java new file mode 100644 index 00000000000..6900c7dc82f --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java @@ -0,0 +1,233 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hadoop.mapreduce; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.FeedParams; +import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; +import com.yahoo.vespa.http.client.config.SessionParams; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import javax.xml.namespace.QName; +import javax.xml.stream.FactoryConfigurationError; +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; +import java.io.IOException; +import java.io.StringReader; +import java.time.Duration; +import java.util.List; +import java.util.StringTokenizer; +import java.util.concurrent.ThreadLocalRandom; +import java.util.logging.Logger; + +/** + * {@link LegacyVespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-http-client. + * + * @author lesters + * @deprecated Replaced by {@link VespaRecordWriter} + */ +@Deprecated +public class LegacyVespaRecordWriter extends RecordWriter { + + private final Logger log = Logger.getLogger(getClass().getCanonicalName()); + + private boolean initialized = false; + private com.yahoo.vespa.http.client.FeedClient feedClient; + private final VespaCounters counters; + private final int progressInterval; + + final VespaConfiguration configuration; + + LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { + this.counters = counters; + this.configuration = configuration; + this.progressInterval = configuration.progressInterval(); + } + + + @Override + public void write(Object key, Object data) throws IOException, InterruptedException { + if (!initialized) { + initialize(); + } + + String doc = data.toString().trim(); + + // Parse data to find document id - if none found, skip this write + String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) + : findDocIdFromXml(doc); + if (docId != null && docId.length() >= 0) { + feedClient.stream(docId, doc); + counters.incrementDocumentsSent(1); + } else { + counters.incrementDocumentsSkipped(1); + } + + if (counters.getDocumentsSent() % progressInterval == 0) { + String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", + counters.getDocumentsSent(), + counters.getDocumentsOk(), + counters.getDocumentsFailed(), + counters.getDocumentsSkipped()); + log.info(progress); + } + + } + + + @Override + public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + if (feedClient != null) { + feedClient.close(); + } + } + + protected ConnectionParams.Builder configureConnectionParams() { + ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); + connParamsBuilder.setDryRun(configuration.dryrun()); + connParamsBuilder.setUseCompression(configuration.useCompression()); + connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); + connParamsBuilder.setMaxRetries(configuration.numRetries()); + if (configuration.proxyHost() != null) { + connParamsBuilder.setProxyHost(configuration.proxyHost()); + } + if (configuration.proxyPort() >= 0) { + connParamsBuilder.setProxyPort(configuration.proxyPort()); + } + return connParamsBuilder; + } + + protected FeedParams.Builder configureFeedParams() { + FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); + feedParamsBuilder.setDataFormat(configuration.dataFormat()); + feedParamsBuilder.setRoute(configuration.route()); + feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); + feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); + feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis()); + return feedParamsBuilder; + } + + protected SessionParams.Builder configureSessionParams() { + SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder(); + sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize()); + sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2); + return sessionParamsBuilder; + } + + private void initialize() { + if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) { + int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs()); + log.info("VespaStorage: Delaying startup by " + delay + " ms"); + try { + Thread.sleep(delay); + } catch (Exception e) {} + } + + ConnectionParams.Builder connParamsBuilder = configureConnectionParams(); + FeedParams.Builder feedParamsBuilder = configureFeedParams(); + SessionParams.Builder sessionParams = configureSessionParams(); + + sessionParams.setConnectionParams(connParamsBuilder.build()); + sessionParams.setFeedParams(feedParamsBuilder.build()); + + String endpoints = configuration.endpoint(); + StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); + while (tokenizer.hasMoreTokens()) { + String endpoint = tokenizer.nextToken().trim(); + sessionParams.addCluster(new Cluster.Builder().addEndpoint( + Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL().orElse(false)) + ).build()); + } + + ResultCallback resultCallback = new ResultCallback(counters); + feedClient = com.yahoo.vespa.http.client.FeedClientFactory.create(sessionParams.build(), resultCallback); + + initialized = true; + log.info("VespaStorage configuration:\n" + configuration.toString()); + log.info(feedClient.getStatsAsJson()); + } + + private String findDocIdFromXml(String xml) { + try { + XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); + while (eventReader.hasNext()) { + XMLEvent event = eventReader.nextEvent(); + if (event.getEventType() == XMLEvent.START_ELEMENT) { + StartElement element = event.asStartElement(); + String elementName = element.getName().getLocalPart(); + if (VespaDocumentOperation.Operation.valid(elementName)) { + return element.getAttributeByName(QName.valueOf("documentid")).getValue(); + } + } + } + } catch (XMLStreamException | FactoryConfigurationError e) { + // as json dude does + return null; + } + return null; + } + + private String findDocId(String json) throws IOException { + JsonFactory factory = new JsonFactory(); + try(JsonParser parser = factory.createParser(json)) { + if (parser.nextToken() != JsonToken.START_OBJECT) { + return null; + } + while (parser.nextToken() != JsonToken.END_OBJECT) { + String fieldName = parser.getCurrentName(); + parser.nextToken(); + if (VespaDocumentOperation.Operation.valid(fieldName)) { + String docId = parser.getText(); + return docId; + } else { + parser.skipChildren(); + } + } + } catch (JsonParseException ex) { + return null; + } + return null; + } + + + class ResultCallback implements com.yahoo.vespa.http.client.FeedClient.ResultCallback { + final VespaCounters counters; + + public ResultCallback(VespaCounters counters) { + this.counters = counters; + } + + @Override + public void onCompletion(String docId, com.yahoo.vespa.http.client.Result documentResult) { + if (!documentResult.isSuccess()) { + counters.incrementDocumentsFailed(1); + StringBuilder sb = new StringBuilder(); + sb.append("Problems with docid "); + sb.append(docId); + sb.append(": "); + List details = documentResult.getDetails(); + for (com.yahoo.vespa.http.client.Result.Detail detail : details) { + sb.append(detail.toString()); + sb.append(" "); + } + log.warning(sb.toString()); + return; + } + counters.incrementDocumentsOk(1); + } + + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java index 42eb6293eee..66ab94574d9 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java @@ -10,16 +10,24 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; +import java.util.Objects; import java.util.Properties; +import java.util.logging.Logger; + +import static com.yahoo.vespa.http.client.config.FeedParams.DataFormat.XML_UTF8; /** * An output specification for writing to Vespa instances in a Map-Reduce job. + * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the + * actual feeding to Vespa. * * @author lesters */ @SuppressWarnings("rawtypes") public class VespaOutputFormat extends OutputFormat { + private static final Logger log = Logger.getLogger(VespaOutputFormat.class.getName()); + final Properties configOverride; public VespaOutputFormat() { @@ -34,10 +42,18 @@ public class VespaOutputFormat extends OutputFormat { @Override + @SuppressWarnings("deprecation") public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { VespaCounters counters = VespaCounters.get(context); VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride); - return new VespaRecordWriter(configuration, counters); + Boolean useLegacyClient = configuration.useLegacyClient().orElse(null); + if (Objects.equals(useLegacyClient, Boolean.TRUE) || configuration.dataFormat() == XML_UTF8) { + log.warning("Feeding with legacy client or XML will no longer be supported on Vespa 8. " + + "See https://docs.vespa.ai/en/vespa8-release-notes.html"); + return new LegacyVespaRecordWriter(configuration, counters); + } else { + return new VespaRecordWriter(configuration, counters); + } } diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index c450d7cdeef..6d6c3789835 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -7,6 +7,7 @@ import ai.vespa.feed.client.JsonFeeder; import ai.vespa.feed.client.OperationParseException; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import com.yahoo.vespa.http.client.config.FeedParams; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -84,11 +85,18 @@ public class VespaRecordWriter extends RecordWriter { private void initializeOnFirstWrite() { if (initialized) return; + validateConfig(); useRandomizedStartupDelayIfEnabled(); feeder = createJsonStreamFeeder(); initialized = true; } + private void validateConfig() { + if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) { + throw new IllegalArgumentException("Only JSON is support by this feed client implementation"); + } + } + private void useRandomizedStartupDelayIfEnabled() { if (!config.dryrun() && config.randomStartupSleepMs() > 0) { int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java index 54be261fbe7..715546fe6fe 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce.util; +import com.yahoo.vespa.http.client.config.FeedParams; import org.apache.hadoop.conf.Configuration; import java.io.IOException; @@ -18,6 +19,7 @@ public class VespaConfiguration { public static final String PROXY_SCHEME = "vespa.feed.proxy.scheme"; public static final String DRYRUN = "vespa.feed.dryrun"; public static final String USE_COMPRESSION = "vespa.feed.usecompression"; + public static final String DATA_FORMAT = "vespa.feed.data.format"; public static final String PROGRESS_REPORT = "vespa.feed.progress.interval"; public static final String CONNECTIONS = "vespa.feed.connections"; public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size"; @@ -27,6 +29,7 @@ public class VespaConfiguration { public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; public static final String NUM_RETRIES = "vespa.feed.num.retries"; + public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient"; private final Configuration conf; private final Properties override; @@ -36,82 +39,115 @@ public class VespaConfiguration { this.override = override; } + public static VespaConfiguration get(Configuration conf, Properties override) { return new VespaConfiguration(conf, override); } + public String endpoint() { return getString(ENDPOINT); } + public int defaultPort() { return getInt(DEFAULT_PORT, 4080); } + public Optional useSSL() { String raw = getString(USE_SSL); if (raw == null || raw.trim().isEmpty()) return Optional.empty(); return Optional.of(Boolean.parseBoolean(raw)); } + public String proxyHost() { return getString(PROXY_HOST); } + public int proxyPort() { return getInt(PROXY_PORT, 4080); } + public String proxyScheme() { String raw = getString(PROXY_SCHEME); if (raw == null) return "http"; return raw; } + public boolean dryrun() { return getBoolean(DRYRUN, false); } + public boolean useCompression() { return getBoolean(USE_COMPRESSION, true); } + public int numConnections() { return getInt(CONNECTIONS, 1); } + public int throttlerMinSize() { return getInt(THROTTLER_MIN_SIZE, 0); } + public int queryConnectionTimeout() { return getInt(QUERY_CONNECTION_TIMEOUT, 10000); } + public String route() { return getString(ROUTE); } + public int maxSleepTimeMs() { return getInt(MAX_SLEEP_TIME_MS, 10000); } + public int maxInFlightRequests() { return getInt(MAX_IN_FLIGHT_REQUESTS, 500); } + public int randomStartupSleepMs() { return getInt(RANDOM_STARTUP_SLEEP, 30000); } + public int numRetries() { return getInt(NUM_RETRIES, 100); } + + public FeedParams.DataFormat dataFormat() { + String format = getString(DATA_FORMAT); + if ("xml".equalsIgnoreCase(format)) { + return FeedParams.DataFormat.XML_UTF8; + } + return FeedParams.DataFormat.JSON_UTF8; + } + + public int progressInterval() { return getInt(PROGRESS_REPORT, 1000); } + public Optional useLegacyClient() { + String raw = getString(USE_LEGACY_CLIENT); + if (raw == null || raw.trim().isEmpty()) return Optional.empty(); + return Optional.of(Boolean.parseBoolean(raw)); + } + public String getString(String name) { if (override != null && override.containsKey(name)) { return override.getProperty(name); @@ -161,6 +197,7 @@ public class VespaConfiguration { sb.append(PROXY_SCHEME + ": " + proxyScheme() + "\n"); sb.append(DRYRUN + ": " + dryrun() +"\n"); sb.append(USE_COMPRESSION + ": " + useCompression() +"\n"); + sb.append(DATA_FORMAT + ": " + dataFormat() +"\n"); sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n"); sb.append(CONNECTIONS + ": " + numConnections() +"\n"); sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n"); @@ -170,6 +207,7 @@ public class VespaConfiguration { sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n"); sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n"); sb.append(NUM_RETRIES + ": " + numRetries() +"\n"); + sb.append(USE_LEGACY_CLIENT + ": " + useLegacyClient().map(Object::toString).orElse("") +"\n"); return sb.toString(); } diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java index 39b24799002..f690e767194 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -23,25 +23,36 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; public class VespaStorageTest { + @Test + public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set(VespaConfiguration.DATA_FORMAT, "xml"); + assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf); + } + + @Test public void requireThatPremadeOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_operations.pig"); } + @Test public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig"); } + @Test public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig"); } @Test - public void requireThatPremadeOperationsWithJsonLoaderFeedWithSllSucceeds() throws Exception { + public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception { Configuration conf = new HdfsConfiguration(); conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString()); + conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString()); assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf); } @@ -50,16 +61,19 @@ public class VespaStorageTest { assertAllDocumentsOk("src/test/pig/feed_create_operations.pig"); } + @Test public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig"); } + @Test public void requireThatFeedVisitDataSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_visit_data.pig"); } + private PigServer setup(String script, Configuration conf) throws Exception { if (conf == null) { conf = new HdfsConfiguration(); @@ -78,10 +92,12 @@ public class VespaStorageTest { return ps; } + private void assertAllDocumentsOk(String script) throws Exception { assertAllDocumentsOk(script, null); } + private void assertAllDocumentsOk(String script, Configuration conf) throws Exception { PigServer ps = setup(script, conf); List jobs = ps.executeBatch(); -- cgit v1.2.3