1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Viktor Lofgren
f4ad7145db (crawler) Disable SO_LINGER 2025-04-18 01:42:02 +02:00
Viktor Lofgren
068b450180 (crawler) Temporarily disable request.abort() 2025-04-18 01:25:56 +02:00
Viktor Lofgren
05b909a21f (crawler) Add logging to get more info about connection leak 2025-04-18 01:06:52 +02:00
7 changed files with 52 additions and 8 deletions

View File

@@ -35,6 +35,7 @@ import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.pool.PoolStats;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.jsoup.Jsoup;
@@ -77,14 +78,20 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
}
private final CloseableHttpClient client;
private PoolingHttpClientConnectionManager connectionManager;
public PoolStats getPoolStats() {
return connectionManager.getTotalStats();
}
private CloseableHttpClient createClient() throws NoSuchAlgorithmException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(10, TimeUnit.SECONDS)
.setConnectTimeout(30, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build();
final PoolingHttpClientConnectionManager connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2)
.setMaxConnTotal(5000)
.setDefaultConnectionConfig(connectionConfig)
@@ -92,11 +99,23 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
.build();
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoLinger(TimeValue.ofSeconds(15))
.setSoLinger(TimeValue.ofSeconds(-1))
.setSoTimeout(Timeout.ofSeconds(10))
.build()
);
Thread.ofPlatform().daemon(true).start(() -> {
try {
for (;;) {
TimeUnit.SECONDS.sleep(15);
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
final RequestConfig defaultRequestConfig = RequestConfig.custom()
.setCookieSpec(StandardCookieSpec.RELAXED)
.setResponseTimeout(10, TimeUnit.SECONDS)

View File

@@ -57,10 +57,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
return new ErrorBuffer();
}
InputStream is = entity.getContent();
long length = entity.getContentLength();
InputStream is = null;
try {
is = entity.getContent();
long length = entity.getContentLength();
if (length > 0 && length < 8192) {
// If the content is small and not compressed, we can just read it into memory
return new MemoryBuffer(response.getHeaders(), request, timeLimit, is, (int) length);
@@ -104,7 +105,9 @@ public abstract class WarcInputBuffer implements AutoCloseable {
// Abort the request if the time limit is exceeded
// so we don't keep the connection open forever or are forced to consume
// the stream to the end
request.abort();
// FIXME: Disable this for now, as it may cause issues with the connection pool
// request.abort();
break;
}

View File

@@ -237,7 +237,6 @@ public class WarcRecorder implements AutoCloseable {
dataStart,
responseDataBuffer.length() - dataStart);
} catch (Exception ex) {
ex.printStackTrace();
flagAsError(new EdgeUrl(requestUri), ex); // write a WARC record to indicate the error
logger.warn("Failed to fetch URL {}: {}", requestUri, ex.getMessage());
return new HttpFetchResult.ResultException(ex);
@@ -250,7 +249,6 @@ public class WarcRecorder implements AutoCloseable {
flagAsTimeout(new EdgeUrl(requestUri)); // write a WARC record to indicate the timeout
return new HttpFetchResult.ResultException(ex);
} catch (IOException ex) {
ex.printStackTrace();
flagAsError(new EdgeUrl(requestUri), ex); // write a WARC record to indicate the error
logger.warn("Failed to fetch URL {}: {}", requestUri, ex.getMessage());
return new HttpFetchResult.ResultException(ex);

View File

@@ -11,6 +11,8 @@ import org.junit.jupiter.api.*;
import java.io.IOException;
import java.net.URISyntaxException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("slow")
class HttpFetcherImplContentTypeProbeTest {
@@ -85,6 +87,10 @@ class HttpFetcherImplContentTypeProbeTest {
@AfterEach
public void tearDown() throws IOException {
var stats = fetcher.getPoolStats();
assertEquals(0, stats.getLeased());
assertEquals(0, stats.getPending());
fetcher.close();
}

View File

@@ -12,6 +12,8 @@ import org.junit.jupiter.api.*;
import java.io.IOException;
import java.net.URISyntaxException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("slow")
class HttpFetcherImplDomainProbeTest {
@@ -47,6 +49,10 @@ class HttpFetcherImplDomainProbeTest {
@AfterEach
public void tearDown() throws IOException {
var stats = fetcher.getPoolStats();
assertEquals(0, stats.getLeased());
assertEquals(0, stats.getPending());
fetcher.close();
}

View File

@@ -139,12 +139,23 @@ class HttpFetcherImplFetchTest {
@AfterEach
public void tearDown() throws IOException {
var stats = fetcher.getPoolStats();
assertEquals(0, stats.getLeased());
assertEquals(0, stats.getPending());
System.out.println(stats);
fetcher.close();
warcRecorder.close();
Files.deleteIfExists(warcFile);
}
@Test
public void testFoo() {
fetcher.fetchSitemapUrls("https://www.marginalia.nu/sitemap.xml", new CrawlDelayTimer(100));
}
@Test
public void testOk_NoProbe() throws IOException {
var result = fetcher.fetchContent(okUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);

View File

@@ -94,6 +94,7 @@ class WarcRecorderFakeServerTest {
@AfterEach
public void tearDown() throws Exception {
client.close();
Files.delete(fileNameWarc);
}