mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 17:32:39 +02:00
Compare commits
13 Commits
deploy-000
...
deploy-001
Author | SHA1 | Date | |
---|---|---|---|
|
4bb71b8439 | ||
|
e4a41f7dd1 | ||
|
69ad6287b1 | ||
|
41a59dcf45 | ||
|
94d4d2edb7 | ||
|
7ae19a92ba | ||
|
56d14e56d7 | ||
|
a557c7ae7f | ||
|
b66879ccb1 | ||
|
f1b7157ca2 | ||
|
7622335e84 | ||
|
0da2047eae | ||
|
5ee4321110 |
10
ROADMAP.md
10
ROADMAP.md
@@ -21,7 +21,7 @@ word n-grams known beforehand. This limits the ability to interpret longer quer
|
|||||||
The positions mask should be supplemented or replaced with a more accurate (e.g.) gamma coded positions
|
The positions mask should be supplemented or replaced with a more accurate (e.g.) gamma coded positions
|
||||||
list, as is the civilized way of doing this.
|
list, as is the civilized way of doing this.
|
||||||
|
|
||||||
Completed with PR https://github.com/MarginaliaSearch/MarginaliaSearch/pull/99
|
Completed with PR [#99](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/99)
|
||||||
|
|
||||||
## Hybridize crawler w/ Common Crawl data
|
## Hybridize crawler w/ Common Crawl data
|
||||||
|
|
||||||
@@ -41,6 +41,12 @@ The search engine has a bit of a problem showing spicy content mixed in with the
|
|||||||
to have a way to filter this out. It's likely something like a URL blacklist (e.g. [UT1](https://dsi.ut-capitole.fr/blacklists/index_en.php) )
|
to have a way to filter this out. It's likely something like a URL blacklist (e.g. [UT1](https://dsi.ut-capitole.fr/blacklists/index_en.php) )
|
||||||
combined with naive bayesian filter would go a long way, or something more sophisticated...?
|
combined with naive bayesian filter would go a long way, or something more sophisticated...?
|
||||||
|
|
||||||
|
## Web Design Overhaul
|
||||||
|
|
||||||
|
The design is kinda clunky and hard to maintain, and needlessly outdated-looking.
|
||||||
|
|
||||||
|
In progress: PR [#127](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/127) -- demo available at https://test.marginalia.nu/
|
||||||
|
|
||||||
## Additional Language Support
|
## Additional Language Support
|
||||||
|
|
||||||
It would be desirable if the search engine supported more languages than English. This is partially about
|
It would be desirable if the search engine supported more languages than English. This is partially about
|
||||||
@@ -56,7 +62,7 @@ it should be extended to all domains. It would also be interesting to offer sea
|
|||||||
RSS data itself, or use the RSS set to feed a special live index that updates faster than the
|
RSS data itself, or use the RSS set to feed a special live index that updates faster than the
|
||||||
main dataset.
|
main dataset.
|
||||||
|
|
||||||
Completed with PR [#122](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/122)
|
Completed with PR [#122](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/122) and PR [#125](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/125)
|
||||||
|
|
||||||
## Support for binary formats like PDF
|
## Support for binary formats like PDF
|
||||||
|
|
||||||
|
@@ -50,12 +50,18 @@ public class LiveCrawlActor extends RecordActorPrototype {
|
|||||||
yield new Monitor("-");
|
yield new Monitor("-");
|
||||||
}
|
}
|
||||||
case Monitor(String feedsHash) -> {
|
case Monitor(String feedsHash) -> {
|
||||||
|
// Sleep initially in case this is during start-up
|
||||||
for (;;) {
|
for (;;) {
|
||||||
String currentHash = feedsClient.getFeedDataHash();
|
try {
|
||||||
if (!Objects.equals(currentHash, feedsHash)) {
|
Thread.sleep(Duration.ofMinutes(15));
|
||||||
yield new LiveCrawl(currentHash);
|
String currentHash = feedsClient.getFeedDataHash();
|
||||||
|
if (!Objects.equals(currentHash, feedsHash)) {
|
||||||
|
yield new LiveCrawl(currentHash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (RuntimeException ex) {
|
||||||
|
logger.error("Failed to fetch feed data hash");
|
||||||
}
|
}
|
||||||
Thread.sleep(Duration.ofMinutes(15));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> {
|
case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> {
|
||||||
|
@@ -59,12 +59,6 @@ public class FeedsClient {
|
|||||||
.forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
|
.forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
|
||||||
}
|
}
|
||||||
|
|
||||||
public record UpdatedDomain(String domain, List<String> urls) {
|
|
||||||
public UpdatedDomain(RpcUpdatedLinksResponse rsp) {
|
|
||||||
this(rsp.getDomain(), new ArrayList<>(rsp.getUrlList()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Get the hash of the feed data, for identifying when the data has been updated */
|
/** Get the hash of the feed data, for identifying when the data has been updated */
|
||||||
public String getFeedDataHash() {
|
public String getFeedDataHash() {
|
||||||
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
|
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
|
||||||
|
@@ -46,6 +46,7 @@ message RpcFeed {
|
|||||||
string feedUrl = 3;
|
string feedUrl = 3;
|
||||||
string updated = 4;
|
string updated = 4;
|
||||||
repeated RpcFeedItem items = 5;
|
repeated RpcFeedItem items = 5;
|
||||||
|
int64 fetchTimestamp = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RpcFeedItem {
|
message RpcFeedItem {
|
||||||
|
@@ -12,9 +12,11 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardCopyOption;
|
import java.nio.file.StandardCopyOption;
|
||||||
|
import java.nio.file.attribute.PosixFileAttributes;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
@@ -209,4 +211,20 @@ public class FeedDb {
|
|||||||
|
|
||||||
reader.getLinksUpdatedSince(since, consumer);
|
reader.getLinksUpdatedSince(since, consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Instant getFetchTime() {
|
||||||
|
if (!Files.exists(readerDbPath)) {
|
||||||
|
return Instant.ofEpochMilli(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return Files.readAttributes(readerDbPath, PosixFileAttributes.class)
|
||||||
|
.creationTime()
|
||||||
|
.toInstant();
|
||||||
|
}
|
||||||
|
catch (IOException ex) {
|
||||||
|
logger.error("Failed to read the creatiom time of {}", readerDbPath);
|
||||||
|
return Instant.ofEpochMilli(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -38,7 +38,6 @@ import java.time.ZonedDateTime;
|
|||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
@@ -74,6 +73,17 @@ public class FeedFetcherService {
|
|||||||
this.nodeConfigurationService = nodeConfigurationService;
|
this.nodeConfigurationService = nodeConfigurationService;
|
||||||
this.serviceHeartbeat = serviceHeartbeat;
|
this.serviceHeartbeat = serviceHeartbeat;
|
||||||
this.executorClient = executorClient;
|
this.executorClient = executorClient;
|
||||||
|
|
||||||
|
|
||||||
|
// Add support for some alternate date tags for atom
|
||||||
|
rssReader.addItemExtension("issued", this::setDateFallback);
|
||||||
|
rssReader.addItemExtension("created", this::setDateFallback);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setDateFallback(Item item, String value) {
|
||||||
|
if (item.getPubDate().isEmpty()) {
|
||||||
|
item.setPubDate(value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public enum UpdateMode {
|
public enum UpdateMode {
|
||||||
@@ -124,51 +134,57 @@ public class FeedFetcherService {
|
|||||||
|
|
||||||
for (var feed : definitions) {
|
for (var feed : definitions) {
|
||||||
executor.submitQuietly(() -> {
|
executor.submitQuietly(() -> {
|
||||||
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
|
try {
|
||||||
|
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
|
||||||
|
|
||||||
// If we have existing data, we might skip updating it with a probability that increases with time,
|
// If we have existing data, we might skip updating it with a probability that increases with time,
|
||||||
// this is to avoid hammering the feeds that are updated very rarely and save some time and resources
|
// this is to avoid hammering the feeds that are updated very rarely and save some time and resources
|
||||||
// on our end
|
// on our end
|
||||||
|
|
||||||
if (!oldData.isEmpty()) {
|
/* Disable for now:
|
||||||
Duration duration = feed.durationSinceUpdated();
|
|
||||||
long daysSinceUpdate = duration.toDays();
|
if (!oldData.isEmpty()) {
|
||||||
|
Duration duration = feed.durationSinceUpdated();
|
||||||
|
long daysSinceUpdate = duration.toDays();
|
||||||
|
|
||||||
|
|
||||||
if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
|
if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
|
||||||
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1))
|
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) {
|
||||||
{
|
// Skip updating this feed, just write the old data back instead
|
||||||
// Skip updating this feed, just write the old data back instead
|
|
||||||
writer.saveFeed(oldData);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
FetchResult feedData;
|
|
||||||
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
|
|
||||||
feedData = fetchFeedData(feed, client);
|
|
||||||
}
|
|
||||||
catch (Exception ex) {
|
|
||||||
feedData = new FetchResult.TransientError();
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (feedData) {
|
|
||||||
case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
|
|
||||||
case FetchResult.TransientError() -> {
|
|
||||||
int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
|
|
||||||
writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
|
|
||||||
|
|
||||||
if (errorCount < 5) {
|
|
||||||
// Permit the server a few days worth of retries before we drop the feed entirely
|
|
||||||
writer.saveFeed(oldData);
|
writer.saveFeed(oldData);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case FetchResult.PermanentError() -> {} // let the definition be forgotten about
|
*/
|
||||||
}
|
|
||||||
|
|
||||||
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
|
FetchResult feedData;
|
||||||
// Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
|
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
|
||||||
heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
|
feedData = fetchFeedData(feed, client);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
feedData = new FetchResult.TransientError();
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (feedData) {
|
||||||
|
case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
|
||||||
|
case FetchResult.TransientError() -> {
|
||||||
|
int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
|
||||||
|
writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
|
||||||
|
|
||||||
|
if (errorCount < 5) {
|
||||||
|
// Permit the server a few days worth of retries before we drop the feed entirely
|
||||||
|
writer.saveFeed(oldData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case FetchResult.PermanentError() -> {
|
||||||
|
} // let the definition be forgotten about
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
|
||||||
|
// Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
|
||||||
|
heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -300,6 +316,8 @@ public class FeedFetcherService {
|
|||||||
|
|
||||||
public FeedItems parseFeed(String feedData, FeedDefinition definition) {
|
public FeedItems parseFeed(String feedData, FeedDefinition definition) {
|
||||||
try {
|
try {
|
||||||
|
feedData = sanitizeEntities(feedData);
|
||||||
|
|
||||||
List<Item> rawItems = rssReader.read(
|
List<Item> rawItems = rssReader.read(
|
||||||
// Massage the data to maximize the possibility of the flaky XML parser consuming it
|
// Massage the data to maximize the possibility of the flaky XML parser consuming it
|
||||||
new BOMInputStream(new ByteArrayInputStream(feedData.trim().getBytes(StandardCharsets.UTF_8)), false)
|
new BOMInputStream(new ByteArrayInputStream(feedData.trim().getBytes(StandardCharsets.UTF_8)), false)
|
||||||
@@ -326,6 +344,32 @@ public class FeedFetcherService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final Map<String, String> HTML_ENTITIES = Map.of(
|
||||||
|
"»", "»",
|
||||||
|
"«", "«",
|
||||||
|
"—", "--",
|
||||||
|
"–", "-",
|
||||||
|
"’", "'",
|
||||||
|
"‘", "'",
|
||||||
|
" ", ""
|
||||||
|
);
|
||||||
|
|
||||||
|
/** The XML parser will blow up if you insert HTML entities in the feed XML,
|
||||||
|
* which is unfortunately relatively common. Replace them as far as is possible
|
||||||
|
* with their corresponding characters
|
||||||
|
*/
|
||||||
|
static String sanitizeEntities(String feedData) {
|
||||||
|
String result = feedData;
|
||||||
|
for (Map.Entry<String, String> entry : HTML_ENTITIES.entrySet()) {
|
||||||
|
result = result.replace(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle lone ampersands not part of a recognized XML entity
|
||||||
|
result = result.replaceAll("&(?!(amp|lt|gt|apos|quot);)", "&");
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/** Decide whether to keep URI fragments in the feed items.
|
/** Decide whether to keep URI fragments in the feed items.
|
||||||
* <p></p>
|
* <p></p>
|
||||||
* We keep fragments if there are multiple different fragments in the items.
|
* We keep fragments if there are multiple different fragments in the items.
|
||||||
@@ -361,7 +405,7 @@ public class FeedFetcherService {
|
|||||||
return seenFragments.size() > 1;
|
return seenFragments.size() > 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class IsFeedItemDateValid implements Predicate<FeedItem> {
|
static class IsFeedItemDateValid implements Predicate<FeedItem> {
|
||||||
private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
|
private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
|
||||||
|
|
||||||
public boolean test(FeedItem item) {
|
public boolean test(FeedItem item) {
|
||||||
|
@@ -107,8 +107,7 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void getFeed(RpcDomainId request,
|
public void getFeed(RpcDomainId request,
|
||||||
StreamObserver<RpcFeed> responseObserver)
|
StreamObserver<RpcFeed> responseObserver) {
|
||||||
{
|
|
||||||
if (!feedDb.isEnabled()) {
|
if (!feedDb.isEnabled()) {
|
||||||
responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
|
responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
|
||||||
return;
|
return;
|
||||||
@@ -126,7 +125,8 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
|
|||||||
.setDomainId(request.getDomainId())
|
.setDomainId(request.getDomainId())
|
||||||
.setDomain(domainName.get().toString())
|
.setDomain(domainName.get().toString())
|
||||||
.setFeedUrl(feedItems.feedUrl())
|
.setFeedUrl(feedItems.feedUrl())
|
||||||
.setUpdated(feedItems.updated());
|
.setUpdated(feedItems.updated())
|
||||||
|
.setFetchTimestamp(feedDb.getFetchTime().toEpochMilli());
|
||||||
|
|
||||||
for (var item : feedItems.items()) {
|
for (var item : feedItems.items()) {
|
||||||
retB.addItemsBuilder()
|
retB.addItemsBuilder()
|
||||||
|
@@ -99,7 +99,9 @@ class FeedFetcherServiceTest extends AbstractModule {
|
|||||||
feedFetcherService.setDeterministic();
|
feedFetcherService.setDeterministic();
|
||||||
feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH);
|
feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH);
|
||||||
|
|
||||||
Assertions.assertFalse(feedDb.getFeed(new EdgeDomain("www.marginalia.nu")).isEmpty());
|
var result = feedDb.getFeed(new EdgeDomain("www.marginalia.nu"));
|
||||||
|
System.out.println(result);
|
||||||
|
Assertions.assertFalse(result.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Tag("flaky")
|
@Tag("flaky")
|
||||||
|
@@ -0,0 +1,26 @@
|
|||||||
|
package nu.marginalia.rss.svc;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
public class TestXmlSanitization {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreservedEntities() {
|
||||||
|
Assertions.assertEquals("&", FeedFetcherService.sanitizeEntities("&"));
|
||||||
|
Assertions.assertEquals("<", FeedFetcherService.sanitizeEntities("<"));
|
||||||
|
Assertions.assertEquals(">", FeedFetcherService.sanitizeEntities(">"));
|
||||||
|
Assertions.assertEquals(""", FeedFetcherService.sanitizeEntities("""));
|
||||||
|
Assertions.assertEquals("'", FeedFetcherService.sanitizeEntities("'"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStrayAmpersand() {
|
||||||
|
Assertions.assertEquals("Bed & Breakfast", FeedFetcherService.sanitizeEntities("Bed & Breakfast"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTranslatedHtmlEntity() {
|
||||||
|
Assertions.assertEquals("Foo -- Bar", FeedFetcherService.sanitizeEntities("Foo — Bar"));
|
||||||
|
}
|
||||||
|
}
|
@@ -139,7 +139,7 @@ public class HttpFetcherImpl implements HttpFetcher {
|
|||||||
public ContentTypeProbeResult probeContentType(EdgeUrl url,
|
public ContentTypeProbeResult probeContentType(EdgeUrl url,
|
||||||
WarcRecorder warcRecorder,
|
WarcRecorder warcRecorder,
|
||||||
ContentTags tags) throws RateLimitException {
|
ContentTags tags) throws RateLimitException {
|
||||||
if (tags.isEmpty()) {
|
if (tags.isEmpty() && contentTypeLogic.isUrlLikeBinary(url)) {
|
||||||
var headBuilder = new Request.Builder().head()
|
var headBuilder = new Request.Builder().head()
|
||||||
.addHeader("User-agent", userAgentString)
|
.addHeader("User-agent", userAgentString)
|
||||||
.addHeader("Accept-Encoding", "gzip")
|
.addHeader("Accept-Encoding", "gzip")
|
||||||
|
@@ -42,24 +42,24 @@ class ContentTypeProberTest {
|
|||||||
port = r.nextInt(10000) + 8000;
|
port = r.nextInt(10000) + 8000;
|
||||||
server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 10);
|
server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 10);
|
||||||
|
|
||||||
server.createContext("/html", exchange -> {
|
server.createContext("/html.gz", exchange -> {
|
||||||
exchange.getResponseHeaders().add("Content-Type", "text/html");
|
exchange.getResponseHeaders().add("Content-Type", "text/html");
|
||||||
exchange.sendResponseHeaders(200, -1);
|
exchange.sendResponseHeaders(200, -1);
|
||||||
exchange.close();
|
exchange.close();
|
||||||
});
|
});
|
||||||
server.createContext("/redir", exchange -> {
|
server.createContext("/redir.gz", exchange -> {
|
||||||
exchange.getResponseHeaders().add("Location", "/html");
|
exchange.getResponseHeaders().add("Location", "/html.gz");
|
||||||
exchange.sendResponseHeaders(301, -1);
|
exchange.sendResponseHeaders(301, -1);
|
||||||
exchange.close();
|
exchange.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
server.createContext("/bin", exchange -> {
|
server.createContext("/bin.gz", exchange -> {
|
||||||
exchange.getResponseHeaders().add("Content-Type", "application/binary");
|
exchange.getResponseHeaders().add("Content-Type", "application/binary");
|
||||||
exchange.sendResponseHeaders(200, -1);
|
exchange.sendResponseHeaders(200, -1);
|
||||||
exchange.close();
|
exchange.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
server.createContext("/timeout", exchange -> {
|
server.createContext("/timeout.gz", exchange -> {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(15_000);
|
Thread.sleep(15_000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
@@ -73,10 +73,10 @@ class ContentTypeProberTest {
|
|||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
htmlEndpoint = EdgeUrl.parse("http://localhost:" + port + "/html").get();
|
htmlEndpoint = EdgeUrl.parse("http://localhost:" + port + "/html.gz").get();
|
||||||
binaryEndpoint = EdgeUrl.parse("http://localhost:" + port + "/bin").get();
|
binaryEndpoint = EdgeUrl.parse("http://localhost:" + port + "/bin.gz").get();
|
||||||
timeoutEndpoint = EdgeUrl.parse("http://localhost:" + port + "/timeout").get();
|
timeoutEndpoint = EdgeUrl.parse("http://localhost:" + port + "/timeout.gz").get();
|
||||||
htmlRedirEndpoint = EdgeUrl.parse("http://localhost:" + port + "/redir").get();
|
htmlRedirEndpoint = EdgeUrl.parse("http://localhost:" + port + "/redir.gz").get();
|
||||||
|
|
||||||
fetcher = new HttpFetcherImpl("test");
|
fetcher = new HttpFetcherImpl("test");
|
||||||
recorder = new WarcRecorder(warcFile);
|
recorder = new WarcRecorder(warcFile);
|
||||||
|
@@ -4,6 +4,7 @@ import crawlercommons.robots.SimpleRobotRules;
|
|||||||
import crawlercommons.robots.SimpleRobotRulesParser;
|
import crawlercommons.robots.SimpleRobotRulesParser;
|
||||||
import nu.marginalia.WmsaHome;
|
import nu.marginalia.WmsaHome;
|
||||||
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
|
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
|
||||||
|
import nu.marginalia.crawl.logic.DomainLocks;
|
||||||
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
|
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
|
||||||
import nu.marginalia.db.DbDomainQueries;
|
import nu.marginalia.db.DbDomainQueries;
|
||||||
import nu.marginalia.db.DomainBlacklist;
|
import nu.marginalia.db.DomainBlacklist;
|
||||||
@@ -40,6 +41,7 @@ public class SimpleLinkScraper implements AutoCloseable {
|
|||||||
private final DomainBlacklist domainBlacklist;
|
private final DomainBlacklist domainBlacklist;
|
||||||
private final Duration connectTimeout = Duration.ofSeconds(10);
|
private final Duration connectTimeout = Duration.ofSeconds(10);
|
||||||
private final Duration readTimeout = Duration.ofSeconds(10);
|
private final Duration readTimeout = Duration.ofSeconds(10);
|
||||||
|
private final DomainLocks domainLocks = new DomainLocks();
|
||||||
|
|
||||||
public SimpleLinkScraper(LiveCrawlDataSet dataSet,
|
public SimpleLinkScraper(LiveCrawlDataSet dataSet,
|
||||||
DbDomainQueries domainQueries,
|
DbDomainQueries domainQueries,
|
||||||
@@ -65,7 +67,9 @@ public class SimpleLinkScraper implements AutoCloseable {
|
|||||||
.connectTimeout(connectTimeout)
|
.connectTimeout(connectTimeout)
|
||||||
.followRedirects(HttpClient.Redirect.NEVER)
|
.followRedirects(HttpClient.Redirect.NEVER)
|
||||||
.version(HttpClient.Version.HTTP_2)
|
.version(HttpClient.Version.HTTP_2)
|
||||||
.build()) {
|
.build();
|
||||||
|
DomainLocks.DomainLock lock = domainLocks.lockDomain(domain) // throttle concurrent access per domain; do not remove
|
||||||
|
) {
|
||||||
|
|
||||||
EdgeUrl rootUrl = domain.toRootUrlHttps();
|
EdgeUrl rootUrl = domain.toRootUrlHttps();
|
||||||
|
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
import subprocess, os
|
import subprocess, os
|
||||||
from typing import List, Set, Dict, Optional
|
from typing import List, Set, Dict, Optional
|
||||||
|
import argparse
|
||||||
|
|
||||||
build_dir = "/app/search.marginalia.nu/build"
|
build_dir = "/app/search.marginalia.nu/build"
|
||||||
docker_dir = "/app/search.marginalia.nu/docker"
|
docker_dir = "/app/search.marginalia.nu/docker"
|
||||||
@@ -12,11 +13,12 @@ class ServiceConfig:
|
|||||||
docker_name: str
|
docker_name: str
|
||||||
instances: int | None
|
instances: int | None
|
||||||
deploy_tier: int
|
deploy_tier: int
|
||||||
|
groups: Set[str]
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class DeploymentPlan:
|
class DeploymentPlan:
|
||||||
services_to_build: List[str]
|
services_to_build: List[str]
|
||||||
instances_to_hold: Set[str]
|
instances_to_deploy: Set[str]
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class DockerContainer:
|
class DockerContainer:
|
||||||
@@ -72,24 +74,49 @@ def parse_deployment_tags(
|
|||||||
instances_to_hold = set()
|
instances_to_hold = set()
|
||||||
|
|
||||||
available_services = set(service_config.keys())
|
available_services = set(service_config.keys())
|
||||||
|
available_groups = set()
|
||||||
|
|
||||||
|
partitions = set()
|
||||||
|
|
||||||
|
for service in service_config.values():
|
||||||
|
available_groups = available_groups | service.groups
|
||||||
|
|
||||||
for tag in [tag.strip() for tag in tag_messages]:
|
for tag in [tag.strip() for tag in tag_messages]:
|
||||||
|
if tag.startswith('partition:'):
|
||||||
|
for p in tag[10:].strip().split(','):
|
||||||
|
partitions.add(int(p))
|
||||||
if tag.startswith('deploy:'):
|
if tag.startswith('deploy:'):
|
||||||
parts = tag[7:].strip().split(',')
|
parts = tag[7:].strip().split(',')
|
||||||
|
|
||||||
for part in parts:
|
for part in parts:
|
||||||
part = part.strip()
|
part = part.strip()
|
||||||
if part == 'all':
|
|
||||||
services_to_build.update(available_services)
|
if part.startswith('-'):
|
||||||
elif part.startswith('-'):
|
service = part[1:]
|
||||||
services_to_exclude.add(part[1:])
|
if not service in available_services:
|
||||||
|
raise ValueError(f"Unknown service {service}")
|
||||||
|
|
||||||
|
services_to_exclude.add(service)
|
||||||
elif part.startswith('+'):
|
elif part.startswith('+'):
|
||||||
services_to_build.add(part[1:])
|
service = part[1:]
|
||||||
|
if not service in available_services:
|
||||||
|
raise ValueError(f"Unknown service {service}")
|
||||||
|
|
||||||
|
services_to_build.add(service)
|
||||||
|
else:
|
||||||
|
group = part
|
||||||
|
if not group in available_groups:
|
||||||
|
raise ValueError(f"Unknown service group {group}")
|
||||||
|
for name, service in service_config.items():
|
||||||
|
if group in service.groups:
|
||||||
|
services_to_build.add(name)
|
||||||
|
|
||||||
elif tag.startswith('hold:'):
|
elif tag.startswith('hold:'):
|
||||||
instances = tag[5:].strip().split(',')
|
instances = tag[5:].strip().split(',')
|
||||||
instances_to_hold.update(i.strip() for i in instances if i.strip())
|
instances_to_hold.update(i.strip() for i in instances if i.strip())
|
||||||
|
|
||||||
|
print(partitions)
|
||||||
|
|
||||||
# Remove any explicitly excluded services
|
# Remove any explicitly excluded services
|
||||||
services_to_build = services_to_build - services_to_exclude
|
services_to_build = services_to_build - services_to_exclude
|
||||||
|
|
||||||
@@ -98,9 +125,32 @@ def parse_deployment_tags(
|
|||||||
if invalid_services:
|
if invalid_services:
|
||||||
raise ValueError(f"Unknown services specified: {invalid_services}")
|
raise ValueError(f"Unknown services specified: {invalid_services}")
|
||||||
|
|
||||||
|
to_deploy = list()
|
||||||
|
for service in services_to_build:
|
||||||
|
config = service_config[service]
|
||||||
|
|
||||||
|
if config.instances == None:
|
||||||
|
if config.docker_name in instances_to_hold:
|
||||||
|
continue
|
||||||
|
container = DockerContainer(config.docker_name, 0, config)
|
||||||
|
|
||||||
|
if len(partitions) == 0 or 0 in partitions:
|
||||||
|
to_deploy.append(container)
|
||||||
|
else:
|
||||||
|
for instance in range(1,config.instances + 1):
|
||||||
|
if config.docker_name in instances_to_hold:
|
||||||
|
continue
|
||||||
|
|
||||||
|
container_name = f"{config.docker_name}-{instance}"
|
||||||
|
if container_name in instances_to_hold:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if len(partitions) == 0 or instance in partitions:
|
||||||
|
to_deploy.append(DockerContainer(container_name, instance, config))
|
||||||
|
|
||||||
return DeploymentPlan(
|
return DeploymentPlan(
|
||||||
services_to_build=sorted(list(services_to_build)),
|
services_to_build=sorted(list(services_to_build)),
|
||||||
instances_to_hold=instances_to_hold
|
instances_to_deploy=sorted(to_deploy, key = lambda c : c.deploy_key())
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -118,6 +168,7 @@ def deploy_container(container: DockerContainer) -> None:
|
|||||||
text=True
|
text=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# Stream output in real-time
|
# Stream output in real-time
|
||||||
while True:
|
while True:
|
||||||
output = process.stdout.readline()
|
output = process.stdout.readline()
|
||||||
@@ -131,49 +182,27 @@ def deploy_container(container: DockerContainer) -> None:
|
|||||||
raise BuildError(container, return_code)
|
raise BuildError(container, return_code)
|
||||||
|
|
||||||
def deploy_services(containers: List[str]) -> None:
|
def deploy_services(containers: List[str]) -> None:
|
||||||
cwd = os.getcwd()
|
print(f"Deploying {containers}")
|
||||||
os.chdir(docker_dir)
|
os.chdir(docker_dir)
|
||||||
|
|
||||||
for container in containers:
|
for container in containers:
|
||||||
deploy_container(container)
|
deploy_container(container)
|
||||||
|
|
||||||
def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConfig]):
|
def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConfig]):
|
||||||
"""Execute the deployment plan"""
|
"""Execute the deployment plan"""
|
||||||
for service in plan.services_to_build:
|
run_gradle_build([service_config[service].gradle_target for service in plan.services_to_build])
|
||||||
config = service_config[service]
|
|
||||||
print(f"Building {service}:")
|
|
||||||
run_gradle_build(service, config.gradle_target)
|
|
||||||
|
|
||||||
to_deploy = list()
|
deploy_services(plan.instances_to_deploy)
|
||||||
for service in plan.services_to_build:
|
|
||||||
config = service_config[service]
|
|
||||||
|
|
||||||
if config.instances == None:
|
|
||||||
if config.docker_name in plan.instances_to_hold:
|
|
||||||
continue
|
|
||||||
container = DockerContainer(config.docker_name, 0, config)
|
|
||||||
|
|
||||||
to_deploy.append(container)
|
|
||||||
else:
|
|
||||||
for instance in range(1,config.instances + 1):
|
|
||||||
container_name = f"{config.docker_name}-{instance}"
|
|
||||||
if container_name in plan.instances_to_hold:
|
|
||||||
continue
|
|
||||||
to_deploy.append(DockerContainer(container_name, instance, config))
|
|
||||||
to_deploy = sorted(to_deploy, key = lambda c : c.deploy_key())
|
|
||||||
|
|
||||||
deploy_services(to_deploy)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def run_gradle_build(service: str, target: str) -> None:
|
def run_gradle_build(targets: str) -> None:
|
||||||
"""
|
"""
|
||||||
Run a Gradle build for the specified service and target.
|
Run a Gradle build for the specified target.
|
||||||
Raises BuildError if the build fails.
|
Raises BuildError if the build fails.
|
||||||
"""
|
"""
|
||||||
print(f"\nBuilding {service} with target {target}")
|
print(f"\nBuilding targets {targets}")
|
||||||
process = subprocess.Popen(
|
process = subprocess.Popen(
|
||||||
['./gradlew', '-q', target],
|
['./gradlew', '-q'] + targets,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
text=True
|
text=True
|
||||||
@@ -199,73 +228,98 @@ if __name__ == '__main__':
|
|||||||
gradle_target=':code:services-application:search-service:docker',
|
gradle_target=':code:services-application:search-service:docker',
|
||||||
docker_name='search-service',
|
docker_name='search-service',
|
||||||
instances=2,
|
instances=2,
|
||||||
deploy_tier=2
|
deploy_tier=2,
|
||||||
|
groups={"all", "frontend", "core"}
|
||||||
),
|
),
|
||||||
'api': ServiceConfig(
|
'api': ServiceConfig(
|
||||||
gradle_target=':code:services-application:api-service:docker',
|
gradle_target=':code:services-application:api-service:docker',
|
||||||
docker_name='api-service',
|
docker_name='api-service',
|
||||||
instances=2,
|
instances=2,
|
||||||
deploy_tier=1
|
deploy_tier=1,
|
||||||
|
groups={"all", "core"}
|
||||||
),
|
),
|
||||||
'api': ServiceConfig(
|
'assistant': ServiceConfig(
|
||||||
gradle_target=':code:services-core:assistant-service:docker',
|
gradle_target=':code:services-core:assistant-service:docker',
|
||||||
docker_name='assistant-service',
|
docker_name='assistant-service',
|
||||||
instances=2,
|
instances=2,
|
||||||
deploy_tier=2
|
deploy_tier=2,
|
||||||
|
groups={"all", "core"}
|
||||||
),
|
),
|
||||||
'explorer': ServiceConfig(
|
'explorer': ServiceConfig(
|
||||||
gradle_target=':code:services-application:explorer-service:docker',
|
gradle_target=':code:services-application:explorer-service:docker',
|
||||||
docker_name='explorer-service',
|
docker_name='explorer-service',
|
||||||
instances=1,
|
instances=None,
|
||||||
deploy_tier=1
|
deploy_tier=1,
|
||||||
|
groups={"all", "extra"}
|
||||||
),
|
),
|
||||||
'dating': ServiceConfig(
|
'dating': ServiceConfig(
|
||||||
gradle_target=':code:services-application:dating-service:docker',
|
gradle_target=':code:services-application:dating-service:docker',
|
||||||
docker_name='dating-service',
|
docker_name='dating-service',
|
||||||
instances=1,
|
instances=None,
|
||||||
deploy_tier=1
|
deploy_tier=1,
|
||||||
|
groups={"all", "extra"}
|
||||||
),
|
),
|
||||||
'index': ServiceConfig(
|
'index': ServiceConfig(
|
||||||
gradle_target=':code:services-core:index-service:docker',
|
gradle_target=':code:services-core:index-service:docker',
|
||||||
docker_name='index-service',
|
docker_name='index-service',
|
||||||
instances=10,
|
instances=10,
|
||||||
deploy_tier=3
|
deploy_tier=3,
|
||||||
|
groups={"all", "index"}
|
||||||
),
|
),
|
||||||
'executor': ServiceConfig(
|
'executor': ServiceConfig(
|
||||||
gradle_target=':code:services-core:executor-service:docker',
|
gradle_target=':code:services-core:executor-service:docker',
|
||||||
docker_name='executor-service',
|
docker_name='executor-service',
|
||||||
instances=10,
|
instances=10,
|
||||||
deploy_tier=3
|
deploy_tier=3,
|
||||||
|
groups={"all", "executor"}
|
||||||
),
|
),
|
||||||
'control': ServiceConfig(
|
'control': ServiceConfig(
|
||||||
gradle_target=':code:services-core:control-service:docker',
|
gradle_target=':code:services-core:control-service:docker',
|
||||||
docker_name='control-service',
|
docker_name='control-service',
|
||||||
instances=None,
|
instances=None,
|
||||||
deploy_tier=0
|
deploy_tier=0,
|
||||||
|
groups={"all", "core"}
|
||||||
),
|
),
|
||||||
'query': ServiceConfig(
|
'query': ServiceConfig(
|
||||||
gradle_target=':code:services-core:query-service:docker',
|
gradle_target=':code:services-core:query-service:docker',
|
||||||
docker_name='query-service',
|
docker_name='query-service',
|
||||||
instances=2,
|
instances=2,
|
||||||
deploy_tier=2
|
deploy_tier=2,
|
||||||
|
groups={"all", "query"}
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tags = get_deployment_tag()
|
parser = argparse.ArgumentParser(
|
||||||
if tags == None:
|
prog='deployment.py',
|
||||||
exit
|
description='Continuous Deployment helper')
|
||||||
|
parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true')
|
||||||
|
parser.add_argument('-t', '--tag', help='Use the specified tag value instead of the head git tag starting with deploy-')
|
||||||
|
|
||||||
print(tags)
|
args = parser.parse_args()
|
||||||
|
tags = args.tag
|
||||||
|
if tags is None:
|
||||||
|
tags = get_deployment_tag()
|
||||||
|
else:
|
||||||
|
tags = tags.split(' ')
|
||||||
|
|
||||||
plan = parse_deployment_tags(tags, SERVICE_CONFIG)
|
|
||||||
print("\nDeployment Plan:")
|
|
||||||
print("Services to build:", plan.services_to_build)
|
|
||||||
print("Instances to hold:", plan.instances_to_hold)
|
|
||||||
|
|
||||||
print("\nExecution Plan:")
|
|
||||||
|
|
||||||
build_and_deploy(plan, SERVICE_CONFIG)
|
if tags != None:
|
||||||
|
print("Found deployment tags:", tags)
|
||||||
|
|
||||||
|
plan = parse_deployment_tags(tags, SERVICE_CONFIG)
|
||||||
|
|
||||||
|
print("\nDeployment Plan:")
|
||||||
|
print("Services to build:", plan.services_to_build)
|
||||||
|
print("Instances to deploy:", [container.name for container in plan.instances_to_deploy])
|
||||||
|
|
||||||
|
if not args.verify:
|
||||||
|
print("\nExecution Plan:")
|
||||||
|
|
||||||
|
build_and_deploy(plan, SERVICE_CONFIG)
|
||||||
|
else:
|
||||||
|
print("No tags found")
|
||||||
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
print(f"Error: {e}")
|
print(f"Error: {e}")
|
||||||
|
Reference in New Issue
Block a user