|
|
|
@@ -20,19 +20,36 @@ import nu.marginalia.storage.FileStorageService;
|
|
|
|
|
import nu.marginalia.storage.model.FileStorage;
|
|
|
|
|
import nu.marginalia.storage.model.FileStorageType;
|
|
|
|
|
import nu.marginalia.util.SimpleBlockingThreadPool;
|
|
|
|
|
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
|
|
|
|
|
import org.apache.hc.client5.http.classic.HttpClient;
|
|
|
|
|
import org.apache.hc.client5.http.config.ConnectionConfig;
|
|
|
|
|
import org.apache.hc.client5.http.config.RequestConfig;
|
|
|
|
|
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
|
|
|
|
|
import org.apache.hc.client5.http.impl.classic.HttpClients;
|
|
|
|
|
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
|
|
|
|
|
import org.apache.hc.core5.http.Header;
|
|
|
|
|
import org.apache.hc.core5.http.HeaderElement;
|
|
|
|
|
import org.apache.hc.core5.http.HeaderElements;
|
|
|
|
|
import org.apache.hc.core5.http.HttpResponse;
|
|
|
|
|
import org.apache.hc.core5.http.io.SocketConfig;
|
|
|
|
|
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
|
|
|
|
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
|
|
|
|
|
import org.apache.hc.core5.http.message.MessageSupport;
|
|
|
|
|
import org.apache.hc.core5.http.protocol.HttpContext;
|
|
|
|
|
import org.apache.hc.core5.util.TimeValue;
|
|
|
|
|
import org.apache.hc.core5.util.Timeout;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
import javax.annotation.Nullable;
|
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.net.URI;
|
|
|
|
|
import java.net.URISyntaxException;
|
|
|
|
|
import java.net.http.HttpClient;
|
|
|
|
|
import java.net.http.HttpRequest;
|
|
|
|
|
import java.net.http.HttpResponse;
|
|
|
|
|
import java.sql.SQLException;
|
|
|
|
|
import java.time.*;
|
|
|
|
|
import java.time.Instant;
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
|
import java.time.ZoneId;
|
|
|
|
|
import java.time.ZonedDateTime;
|
|
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
@@ -55,6 +72,8 @@ public class FeedFetcherService {
|
|
|
|
|
|
|
|
|
|
private final DomainCoordinator domainCoordinator;
|
|
|
|
|
|
|
|
|
|
private final HttpClient httpClient;
|
|
|
|
|
|
|
|
|
|
private volatile boolean updating;
|
|
|
|
|
|
|
|
|
|
@Inject
|
|
|
|
@@ -71,6 +90,83 @@ public class FeedFetcherService {
|
|
|
|
|
this.serviceHeartbeat = serviceHeartbeat;
|
|
|
|
|
this.executorClient = executorClient;
|
|
|
|
|
this.domainCoordinator = domainCoordinator;
|
|
|
|
|
|
|
|
|
|
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
|
|
|
|
|
.setSocketTimeout(15, TimeUnit.SECONDS)
|
|
|
|
|
.setConnectTimeout(15, TimeUnit.SECONDS)
|
|
|
|
|
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
|
|
|
|
|
.setMaxConnPerRoute(2)
|
|
|
|
|
.setMaxConnTotal(50)
|
|
|
|
|
.setDefaultConnectionConfig(connectionConfig)
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
|
|
|
|
|
.setSoLinger(TimeValue.ofSeconds(-1))
|
|
|
|
|
.setSoTimeout(Timeout.ofSeconds(10))
|
|
|
|
|
.build()
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Thread.ofPlatform().daemon(true).start(() -> {
|
|
|
|
|
try {
|
|
|
|
|
for (;;) {
|
|
|
|
|
TimeUnit.SECONDS.sleep(15);
|
|
|
|
|
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (InterruptedException e) {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
final RequestConfig defaultRequestConfig = RequestConfig.custom()
|
|
|
|
|
.setCookieSpec(StandardCookieSpec.IGNORE)
|
|
|
|
|
.setResponseTimeout(10, TimeUnit.SECONDS)
|
|
|
|
|
.setConnectionRequestTimeout(5, TimeUnit.MINUTES)
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
httpClient = HttpClients.custom()
|
|
|
|
|
.setDefaultRequestConfig(defaultRequestConfig)
|
|
|
|
|
.setConnectionManager(connectionManager)
|
|
|
|
|
.setUserAgent(WmsaHome.getUserAgent().uaIdentifier())
|
|
|
|
|
.setConnectionManager(connectionManager)
|
|
|
|
|
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
|
|
|
|
|
// Default keep-alive duration is 3 minutes, but this is too long for us,
|
|
|
|
|
// as we are either going to re-use it fairly quickly or close it for a long time.
|
|
|
|
|
//
|
|
|
|
|
// So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
|
|
|
|
|
private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
|
|
|
|
|
final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
|
|
|
|
|
|
|
|
|
|
while (it.hasNext()) {
|
|
|
|
|
final HeaderElement he = it.next();
|
|
|
|
|
final String param = he.getName();
|
|
|
|
|
final String value = he.getValue();
|
|
|
|
|
|
|
|
|
|
if (value == null)
|
|
|
|
|
continue;
|
|
|
|
|
if (!"timeout".equalsIgnoreCase(param))
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
long timeout = Long.parseLong(value);
|
|
|
|
|
timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
|
|
|
|
|
return TimeValue.ofSeconds(timeout);
|
|
|
|
|
} catch (final NumberFormatException ignore) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return defaultValue;
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public enum UpdateMode {
|
|
|
|
@@ -86,13 +182,7 @@ public class FeedFetcherService {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try (FeedDbWriter writer = feedDb.createWriter();
|
|
|
|
|
HttpClient client = HttpClient.newBuilder()
|
|
|
|
|
.connectTimeout(Duration.ofSeconds(15))
|
|
|
|
|
.executor(Executors.newCachedThreadPool())
|
|
|
|
|
.followRedirects(HttpClient.Redirect.NORMAL)
|
|
|
|
|
.version(HttpClient.Version.HTTP_2)
|
|
|
|
|
.build();
|
|
|
|
|
ExecutorService fetchExecutor = Executors.newCachedThreadPool();
|
|
|
|
|
ExecutorService fetchExecutor = Executors.newVirtualThreadPerTaskExecutor();
|
|
|
|
|
FeedJournal feedJournal = FeedJournal.create();
|
|
|
|
|
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
|
|
|
|
|
) {
|
|
|
|
@@ -137,7 +227,8 @@ public class FeedFetcherService {
|
|
|
|
|
|
|
|
|
|
FetchResult feedData;
|
|
|
|
|
try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
|
|
|
|
|
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
|
|
|
|
|
feedData = fetchFeedData(feed, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
|
|
|
|
|
TimeUnit.SECONDS.sleep(1); // Sleep before we yield the lock to avoid hammering the server from multiple processes
|
|
|
|
|
} catch (Exception ex) {
|
|
|
|
|
feedData = new FetchResult.TransientError();
|
|
|
|
|
}
|
|
|
|
@@ -216,7 +307,6 @@ public class FeedFetcherService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private FetchResult fetchFeedData(FeedDefinition feed,
|
|
|
|
|
HttpClient client,
|
|
|
|
|
ExecutorService executorService,
|
|
|
|
|
@Nullable String ifModifiedSinceDate,
|
|
|
|
|
@Nullable String ifNoneMatchTag)
|
|
|
|
@@ -224,59 +314,63 @@ public class FeedFetcherService {
|
|
|
|
|
try {
|
|
|
|
|
URI uri = new URI(feed.feedUrl());
|
|
|
|
|
|
|
|
|
|
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
|
|
|
|
|
.GET()
|
|
|
|
|
.uri(uri)
|
|
|
|
|
.header("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
|
|
|
|
|
.header("Accept-Encoding", "gzip")
|
|
|
|
|
.header("Accept", "text/*, */*;q=0.9")
|
|
|
|
|
.timeout(Duration.ofSeconds(15))
|
|
|
|
|
;
|
|
|
|
|
var requestBuilder = ClassicRequestBuilder.get(uri)
|
|
|
|
|
.setHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
|
|
|
|
|
.setHeader("Accept-Encoding", "gzip")
|
|
|
|
|
.setHeader("Accept", "text/*, */*;q=0.9");
|
|
|
|
|
|
|
|
|
|
// Set the If-Modified-Since or If-None-Match headers if we have them
|
|
|
|
|
// though since there are certain idiosyncrasies in server implementations,
|
|
|
|
|
// we avoid setting both at the same time as that may turn a 304 into a 200.
|
|
|
|
|
if (ifNoneMatchTag != null) {
|
|
|
|
|
requestBuilder.header("If-None-Match", ifNoneMatchTag);
|
|
|
|
|
requestBuilder.addHeader("If-None-Match", ifNoneMatchTag);
|
|
|
|
|
} else if (ifModifiedSinceDate != null) {
|
|
|
|
|
requestBuilder.header("If-Modified-Since", ifModifiedSinceDate);
|
|
|
|
|
requestBuilder.addHeader("If-Modified-Since", ifModifiedSinceDate);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return httpClient.execute(requestBuilder.build(), rsp -> {
|
|
|
|
|
try {
|
|
|
|
|
logger.info("Code: {}, URL: {}", rsp.getCode(), uri);
|
|
|
|
|
|
|
|
|
|
HttpRequest getRequest = requestBuilder.build();
|
|
|
|
|
switch (rsp.getCode()) {
|
|
|
|
|
case 200 -> {
|
|
|
|
|
if (rsp.getEntity() == null) {
|
|
|
|
|
return new FetchResult.TransientError(); // No content to read, treat as transient error
|
|
|
|
|
}
|
|
|
|
|
byte[] responseData = EntityUtils.toByteArray(rsp.getEntity());
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 3; i++) {
|
|
|
|
|
// Decode the response body based on the Content-Type header
|
|
|
|
|
Header contentTypeHeader = rsp.getFirstHeader("Content-Type");
|
|
|
|
|
if (contentTypeHeader == null) {
|
|
|
|
|
return new FetchResult.TransientError();
|
|
|
|
|
}
|
|
|
|
|
String contentType = contentTypeHeader.getValue();
|
|
|
|
|
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);
|
|
|
|
|
|
|
|
|
|
/* Note we need to use an executor to time-limit the send() method in HttpClient, as
|
|
|
|
|
* its support for timeouts only applies to the time until response starts to be received,
|
|
|
|
|
* and does not catch the case when the server starts to send data but then hangs.
|
|
|
|
|
*/
|
|
|
|
|
HttpResponse<byte[]> rs = executorService.submit(
|
|
|
|
|
() -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
|
|
|
|
|
.get(15, TimeUnit.SECONDS);
|
|
|
|
|
// Grab the ETag header if it exists
|
|
|
|
|
Header etagHeader = rsp.getFirstHeader("ETag");
|
|
|
|
|
String newEtagValue = etagHeader == null ? null : etagHeader.getValue();
|
|
|
|
|
|
|
|
|
|
if (rs.statusCode() == 429) { // Too Many Requests
|
|
|
|
|
int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
|
|
|
|
|
Thread.sleep(Duration.ofSeconds(Math.clamp(retryAfter, 1, 5)));
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String newEtagValue = rs.headers().firstValue("ETag").orElse("");
|
|
|
|
|
|
|
|
|
|
return switch (rs.statusCode()) {
|
|
|
|
|
case 200 -> {
|
|
|
|
|
byte[] responseData = getResponseData(rs);
|
|
|
|
|
|
|
|
|
|
String contentType = rs.headers().firstValue("Content-Type").orElse("");
|
|
|
|
|
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);
|
|
|
|
|
|
|
|
|
|
yield new FetchResult.Success(bodyText, newEtagValue);
|
|
|
|
|
return new FetchResult.Success(bodyText, newEtagValue);
|
|
|
|
|
}
|
|
|
|
|
case 304 -> {
|
|
|
|
|
return new FetchResult.NotModified(); // via If-Modified-Since semantics
|
|
|
|
|
}
|
|
|
|
|
case 404 -> {
|
|
|
|
|
return new FetchResult.PermanentError(); // never try again
|
|
|
|
|
}
|
|
|
|
|
default -> {
|
|
|
|
|
return new FetchResult.TransientError(); // we try again later
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
case 304 -> new FetchResult.NotModified(); // via If-Modified-Since semantics
|
|
|
|
|
case 404 -> new FetchResult.PermanentError(); // never try again
|
|
|
|
|
default -> new FetchResult.TransientError(); // we try again later
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex) {
|
|
|
|
|
return new FetchResult.PermanentError(); // treat as permanent error
|
|
|
|
|
}
|
|
|
|
|
finally {
|
|
|
|
|
EntityUtils.consumeQuietly(rsp.getEntity());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex) {
|
|
|
|
|
logger.debug("Error fetching feed", ex);
|
|
|
|
@@ -285,19 +379,6 @@ public class FeedFetcherService {
|
|
|
|
|
return new FetchResult.TransientError();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
|
|
|
|
|
String encoding = response.headers().firstValue("Content-Encoding").orElse("");
|
|
|
|
|
|
|
|
|
|
if ("gzip".equals(encoding)) {
|
|
|
|
|
try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
|
|
|
|
|
return stream.readAllBytes();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
return response.body();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public sealed interface FetchResult {
|
|
|
|
|
record Success(String value, String etag) implements FetchResult {}
|
|
|
|
|
record NotModified() implements FetchResult {}
|
|
|
|
|