Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,28 @@ public int proxyCount() {
return this.proxies.length;
}

/**
 * Finds the configured proxy that is equivalent to the given one.
 *
 * @param proxy proxy to look up; compared with {@link ProxyUtils#isSameProxy}
 * @return the first entry of {@code this.proxies} that matches, or an empty Optional when none
 *     does
 */
private Optional<SCProxy> getConfiguredProxy(SCProxy proxy) {
    final SCProxy[] configured = this.proxies;
    Optional<SCProxy> match = Optional.empty();
    // stop scanning as soon as the first equivalent entry has been found
    for (int i = 0; i < configured.length && !match.isPresent(); i++) {
        SCProxy candidate = configured[i];
        if (ProxyUtils.isSameProxy(candidate, proxy)) {
            match = Optional.of(candidate);
        }
    }
    return match;
}

@Override
public Optional<SCProxy> getProxy(Metadata metadata) {
if (ProxyMetadata.shouldSkipProxy(metadata)) {
return Optional.empty();
}

Optional<SCProxy> metadataProxy = ProxyMetadata.getProxy(metadata);
if (metadataProxy.isPresent()) {
SCProxy proxy = getConfiguredProxy(metadataProxy.get()).orElse(metadataProxy.get());
proxy.incrementUsage();
return Optional.of(proxy);
}

// create a variable to hold the proxy generated in the following switch statement
SCProxy proxy;

Expand Down
148 changes: 148 additions & 0 deletions core/src/main/java/org/apache/stormcrawler/proxy/ProxyMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.stormcrawler.proxy;

import java.util.Locale;
import java.util.Optional;
import org.apache.stormcrawler.Metadata;

/**
 * Utilities for resolving per-URL proxy settings from metadata.
 *
 * <p>A proxy can be described either by a single connection string under {@link #PROXY} or by the
 * individual {@code http.proxy.*} fields. {@link #PROXY_SKIP} requests that no proxy be used.
 * Malformed values are reported with an {@link IllegalArgumentException}.
 */
final class ProxyMetadata {

    static final String PROXY = "http.proxy";
    static final String PROXY_HOST = "http.proxy.host";
    static final String PROXY_PORT = "http.proxy.port";
    static final String PROXY_TYPE = "http.proxy.type";
    static final String PROXY_USER = "http.proxy.user";
    static final String PROXY_PASS = "http.proxy.pass";
    static final String PROXY_SKIP = "http.proxy.skip";

    private ProxyMetadata() {}

    /**
     * Tells whether the metadata asks for the proxy to be bypassed.
     *
     * @param metadata per-URL metadata, may be {@code null}
     * @return {@code true} when {@link #PROXY_SKIP} is {@code true} (case-insensitive); {@code
     *     false} when the metadata is {@code null}, the key is absent, or its value is {@code
     *     false}
     * @throws IllegalArgumentException if the key is present but blank or holds any other value
     */
    static boolean shouldSkipProxy(Metadata metadata) {
        if (metadata == null || !metadata.containsKey(PROXY_SKIP)) {
            return false;
        }

        String skip = requiredValue(metadata, PROXY_SKIP);
        if ("true".equalsIgnoreCase(skip)) {
            return true;
        }
        if ("false".equalsIgnoreCase(skip)) {
            return false;
        }

        throw new IllegalArgumentException(
                "metadata key `" + PROXY_SKIP + "` must be `true` or `false`, got `" + skip + "`");
    }

    /**
     * Builds the proxy described by the metadata, if any.
     *
     * <p>When {@link #PROXY} is present it takes precedence and the individual fields are not
     * read. Otherwise, if any of the host/port/type/user/pass keys is present, the host is
     * mandatory, the type defaults to {@code HTTP}, the port defaults to {@code 8080}, and user
     * and password must be given together or not at all.
     *
     * @param metadata per-URL metadata, may be {@code null}
     * @return the proxy described by the metadata, or an empty Optional when the metadata is
     *     {@code null} or carries no proxy key
     * @throws IllegalArgumentException if a present key is blank, the connection string is
     *     rejected by {@code SCProxy}, the port is not an integer in the accepted range, or only
     *     one of user/password is supplied
     */
    static Optional<SCProxy> getProxy(Metadata metadata) {
        if (metadata == null) {
            return Optional.empty();
        }

        if (metadata.containsKey(PROXY)) {
            String proxy = requiredValue(metadata, PROXY);
            try {
                return Optional.of(new SCProxy(proxy));
            } catch (IllegalArgumentException e) {
                // keep the original failure as the cause so the parse error is not lost
                throw new IllegalArgumentException(
                        "metadata key `" + PROXY + "` must be a valid proxy connection string", e);
            }
        }

        if (!containsProxyField(metadata)) {
            return Optional.empty();
        }

        String host = requiredValue(metadata, PROXY_HOST);
        String type = valueOrDefault(metadata, PROXY_TYPE, "HTTP");
        String port = valueOrDefault(metadata, PROXY_PORT, "8080");
        String username = value(metadata, PROXY_USER);
        String password = value(metadata, PROXY_PASS);

        validatePort(port);
        // credentials are all-or-nothing: exactly one of the two being set is an error
        if ((username == null) != (password == null)) {
            throw new IllegalArgumentException(
                    "metadata proxy authentication requires both `"
                            + PROXY_USER
                            + "` and `"
                            + PROXY_PASS
                            + "`");
        }

        return Optional.of(
                new SCProxy(
                        type.toLowerCase(Locale.ROOT),
                        host,
                        port,
                        username == null ? "" : username,
                        password == null ? "" : password,
                        "",
                        "",
                        "",
                        ""));
    }

    /** Returns whether at least one of the individual proxy field keys is present. */
    private static boolean containsProxyField(Metadata metadata) {
        return metadata.containsKey(PROXY_HOST)
                || metadata.containsKey(PROXY_PORT)
                || metadata.containsKey(PROXY_TYPE)
                || metadata.containsKey(PROXY_USER)
                || metadata.containsKey(PROXY_PASS);
    }

    /**
     * Returns {@code defaultValue} when the key is absent; a key that is present must hold a
     * non-blank value, otherwise an {@link IllegalArgumentException} is thrown.
     */
    private static String valueOrDefault(Metadata metadata, String key, String defaultValue) {
        if (!metadata.containsKey(key)) {
            return defaultValue;
        }
        return requiredValue(metadata, key);
    }

    /**
     * Returns the trimmed value for the key.
     *
     * @throws IllegalArgumentException if the key is missing or its value is blank
     */
    private static String requiredValue(Metadata metadata, String key) {
        String value = value(metadata, key);
        if (value == null) {
            throw new IllegalArgumentException("metadata key `" + key + "` must not be blank");
        }
        return value;
    }

    /** Returns the trimmed first value for the key, or {@code null} when missing or blank. */
    private static String value(Metadata metadata, String key) {
        if (!metadata.containsKey(key)) {
            return null;
        }

        String value = metadata.getFirstValue(key);
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        return value.trim();
    }

    /**
     * Checks that the port is an integer within the range enforced by {@link
     * ProxyUtils#validatePortRange}.
     *
     * @throws IllegalArgumentException if the port is not an integer or is out of range
     */
    private static void validatePort(String port) {
        int parsedPort;
        try {
            parsedPort = Integer.parseInt(port);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "metadata key `" + PROXY_PORT + "` must be an integer, got `" + port + "`", e);
        }

        ProxyUtils.validatePortRange(parsedPort, "metadata key `" + PROXY_PORT + "`", port);
    }
}
55 changes: 55 additions & 0 deletions core/src/main/java/org/apache/stormcrawler/proxy/ProxyUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.stormcrawler.proxy;

import java.util.Locale;
import java.util.Objects;

/** Shared helpers for validating proxy ports and comparing proxy definitions. */
final class ProxyUtils {

    static final int MIN_PORT = 1;
    static final int MAX_PORT = 65535;

    private ProxyUtils() {}

    /**
     * Rejects ports outside {@link #MIN_PORT}..{@link #MAX_PORT} (both inclusive).
     *
     * @param port the numeric port to check
     * @param source description of where the port came from, used as the start of the message
     * @param value the port as originally supplied, echoed in the message
     * @throws IllegalArgumentException if the port is out of range
     */
    static void validatePortRange(int port, String source, String value) {
        if (MIN_PORT <= port && port <= MAX_PORT) {
            return;
        }
        String message =
                source + " must be between " + MIN_PORT + " and " + MAX_PORT + ", got `" + value
                        + "`";
        throw new IllegalArgumentException(message);
    }

    /**
     * Compares two proxies field by field: protocol and address case-insensitively; port,
     * username and password exactly.
     *
     * @return {@code true} only when all five fields match
     */
    static boolean isSameProxy(SCProxy proxy, SCProxy otherProxy) {
        if (!Objects.equals(
                lowerCaseOrNull(proxy.getProtocol()), lowerCaseOrNull(otherProxy.getProtocol()))) {
            return false;
        }
        if (!Objects.equals(
                lowerCaseOrNull(proxy.getAddress()), lowerCaseOrNull(otherProxy.getAddress()))) {
            return false;
        }
        if (!Objects.equals(proxy.getPort(), otherProxy.getPort())) {
            return false;
        }
        if (!Objects.equals(proxy.getUsername(), otherProxy.getUsername())) {
            return false;
        }
        return Objects.equals(proxy.getPassword(), otherProxy.getPassword());
    }

    /** Lower-cases with {@link Locale#ROOT}; a {@code null} input stays {@code null}. */
    private static String lowerCaseOrNull(String text) {
        if (text == null) {
            return null;
        }
        return text.toLowerCase(Locale.ROOT);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,48 @@ public void configure(Config conf) {

// values for single proxy
String proxyHost = ConfUtils.getString(conf, "http.proxy.host", null);
if (proxyHost == null) {
this.proxy = null;
return;
}

String proxyType = ConfUtils.getString(conf, "http.proxy.type", "HTTP");
int proxyPort = ConfUtils.getInt(conf, "http.proxy.port", 8080);
String proxyUsername = ConfUtils.getString(conf, "http.proxy.user", null);
String proxyPassword = ConfUtils.getString(conf, "http.proxy.pass", null);
ProxyUtils.validatePortRange(
proxyPort, "config key `http.proxy.port`", Integer.toString(proxyPort));

// assemble proxy connection string
String proxyString = proxyType.toLowerCase(Locale.ROOT) + "://";

// conditionally append authentication info
if (proxyUsername != null
&& !proxyUsername.isEmpty()
&& proxyPassword != null
&& !proxyPassword.isEmpty()) {
proxyString += proxyUsername + ":" + proxyPassword + "@";
}
boolean hasAuth =
proxyUsername != null
&& !proxyUsername.isEmpty()
&& proxyPassword != null
&& !proxyPassword.isEmpty();

// complete proxy string and create proxy
this.proxy =
new SCProxy(
proxyString + String.format(Locale.ROOT, "%s:%d", proxyHost, proxyPort));
proxyType.toLowerCase(Locale.ROOT),
proxyHost,
Integer.toString(proxyPort),
hasAuth ? proxyUsername : "",
hasAuth ? proxyPassword : "",
"",
"",
"",
"");
}

/**
 * Resolves the proxy to use for a URL.
 *
 * <p>Resolution order: an explicit skip request in the metadata yields no proxy; a proxy
 * described in the metadata is used next; otherwise the proxy built by {@code configure} is
 * returned.
 *
 * @param metadata per-URL metadata consulted for skip and proxy overrides
 * @return the proxy to use, or an empty Optional when the proxy is skipped or none is configured
 */
@Override
public Optional<SCProxy> getProxy(Metadata metadata) {
    if (ProxyMetadata.shouldSkipProxy(metadata)) {
        return Optional.empty();
    }

    Optional<SCProxy> metadataProxy = ProxyMetadata.getProxy(metadata);
    if (metadataProxy.isPresent()) {
        return metadataProxy;
    }

    // `proxy` is null when no `http.proxy.host` was configured, so Optional.of would throw
    return Optional.ofNullable(proxy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -41,6 +42,7 @@
import org.apache.stormcrawler.TestOutputCollector;
import org.apache.stormcrawler.TestUtil;
import org.apache.stormcrawler.persistence.Status;
import org.apache.stormcrawler.protocol.ProtocolFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -150,4 +152,49 @@ void testThreadTimeout(WireMockRuntimeInfo wmRuntimeInfo) {
// nothing on the default stream — no content was fetched
Assertions.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID).size());
}

/**
 * A tuple whose metadata carries a non-numeric {@code http.proxy.port} must be acked (not
 * failed) and produce exactly one FETCH_ERROR status tuple whose {@code fetch.exception} is
 * {@link IllegalArgumentException}; nothing may be emitted on the default stream.
 */
@Test
void invalidProxyMetadataEmitsFetchError(WireMockRuntimeInfo wmRuntimeInfo)
        throws ReflectiveOperationException {
    stubFor(get(urlMatching("/invalid-proxy")).willReturn(aResponse().withStatus(200)));

    resetProtocolFactory();
    TestOutputCollector collector = new TestOutputCollector();
    Map<String, Object> conf = new HashMap<>();
    conf.put("http.agent.name", "this_is_only_a_test");
    conf.put("http.proxy.manager", "org.apache.stormcrawler.proxy.SingleProxyManager");
    bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(collector));

    // host is valid, port is not: the per-URL proxy settings cannot be resolved
    Metadata proxyMetadata = new Metadata();
    proxyMetadata.setValue("http.proxy.host", "proxy.example.com");
    proxyMetadata.setValue("http.proxy.port", "not-a-port");

    Tuple input = mock(Tuple.class);
    String targetUrl = "http://localhost:" + wmRuntimeInfo.getHttpPort() + "/invalid-proxy";
    when(input.getSourceComponent()).thenReturn("source");
    when(input.contains("metadata")).thenReturn(true);
    when(input.getStringByField("url")).thenReturn(targetUrl);
    when(input.getValueByField("metadata")).thenReturn(proxyMetadata);
    bolt.execute(input);

    await().atMost(8, TimeUnit.SECONDS)
            .until(() -> collector.getAckedTuples().contains(input));

    Assertions.assertFalse(collector.getFailedTuples().contains(input));
    List<List<Object>> emittedStatus = collector.getEmitted(Constants.StatusStreamName);
    Assertions.assertEquals(1, emittedStatus.size());

    // status tuple layout as read here: [0] url, [1] metadata, [2] status
    List<Object> statusTuple = emittedStatus.get(0);
    Assertions.assertEquals(targetUrl, statusTuple.get(0));
    Metadata reportedMetadata = (Metadata) statusTuple.get(1);
    Assertions.assertEquals(Status.FETCH_ERROR, statusTuple.get(2));
    Assertions.assertEquals(
            IllegalArgumentException.class.getName(),
            reportedMetadata.getFirstValue("fetch.exception"));

    Assertions.assertEquals(0, collector.getEmitted(Utils.DEFAULT_STREAM_ID).size());
}

/**
 * Nulls out the {@code single_instance} field of {@link ProtocolFactory} through reflection.
 *
 * <p>NOTE(review): presumably this forces the factory to be rebuilt from the configuration given
 * to the next {@code bolt.prepare} call; confirm against {@code ProtocolFactory}.
 *
 * @throws ReflectiveOperationException if the field is missing or cannot be written
 */
private void resetProtocolFactory() throws ReflectiveOperationException {
    final Field singleton = ProtocolFactory.class.getDeclaredField("single_instance");
    singleton.setAccessible(true);
    // no target object is passed, so the field is expected to be static
    singleton.set(null, null);
}
}
Loading