mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 07:32:38 +02:00
Compare commits
5 Commits
deploy-005
...
deploy-005
Author | SHA1 | Date | |
---|---|---|---|
|
bc2c2061f2 | ||
|
1c7f5a31a5 | ||
|
59a8ea60f7 | ||
|
aa9b1244ea | ||
|
2d17233366 |
@@ -20,7 +20,10 @@ public class DbDomainQueries {
|
||||
private final HikariDataSource dataSource;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DbDomainQueries.class);
|
||||
|
||||
private final Cache<EdgeDomain, Integer> domainIdCache = CacheBuilder.newBuilder().maximumSize(10_000).build();
|
||||
private final Cache<Integer, EdgeDomain> domainNameCache = CacheBuilder.newBuilder().maximumSize(10_000).build();
|
||||
private final Cache<String, List<DomainWithNode>> siblingsCache = CacheBuilder.newBuilder().maximumSize(10_000).build();
|
||||
|
||||
@Inject
|
||||
public DbDomainQueries(HikariDataSource dataSource)
|
||||
@@ -30,16 +33,21 @@ public class DbDomainQueries {
|
||||
|
||||
|
||||
public Integer getDomainId(EdgeDomain domain) throws NoSuchElementException {
|
||||
try (var connection = dataSource.getConnection()) {
|
||||
|
||||
try {
|
||||
return domainIdCache.get(domain, () -> {
|
||||
try (var stmt = connection.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=?")) {
|
||||
try (var connection = dataSource.getConnection();
|
||||
var stmt = connection.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=?")) {
|
||||
|
||||
stmt.setString(1, domain.toString());
|
||||
var rsp = stmt.executeQuery();
|
||||
if (rsp.next()) {
|
||||
return rsp.getInt(1);
|
||||
}
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
throw new NoSuchElementException();
|
||||
});
|
||||
}
|
||||
@@ -49,9 +57,6 @@ public class DbDomainQueries {
|
||||
catch (ExecutionException ex) {
|
||||
throw new RuntimeException(ex.getCause());
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public OptionalInt tryGetDomainId(EdgeDomain domain) {
|
||||
@@ -84,47 +89,55 @@ public class DbDomainQueries {
|
||||
}
|
||||
|
||||
public Optional<EdgeDomain> getDomain(int id) {
|
||||
try (var connection = dataSource.getConnection()) {
|
||||
|
||||
EdgeDomain existing = domainNameCache.getIfPresent(id);
|
||||
if (existing != null) {
|
||||
return Optional.of(existing);
|
||||
}
|
||||
|
||||
try (var connection = dataSource.getConnection()) {
|
||||
try (var stmt = connection.prepareStatement("SELECT DOMAIN_NAME FROM EC_DOMAIN WHERE ID=?")) {
|
||||
stmt.setInt(1, id);
|
||||
var rsp = stmt.executeQuery();
|
||||
if (rsp.next()) {
|
||||
return Optional.of(new EdgeDomain(rsp.getString(1)));
|
||||
var val = new EdgeDomain(rsp.getString(1));
|
||||
domainNameCache.put(id, val);
|
||||
return Optional.of(val);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
catch (UncheckedExecutionException ex) {
|
||||
throw new RuntimeException(ex.getCause());
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public List<DomainWithNode> otherSubdomains(EdgeDomain domain, int cnt) {
|
||||
List<DomainWithNode> ret = new ArrayList<>();
|
||||
public List<DomainWithNode> otherSubdomains(EdgeDomain domain, int cnt) throws ExecutionException {
|
||||
String topDomain = domain.topDomain;
|
||||
|
||||
try (var conn = dataSource.getConnection();
|
||||
var stmt = conn.prepareStatement("SELECT DOMAIN_NAME, NODE_AFFINITY FROM EC_DOMAIN WHERE DOMAIN_TOP = ? LIMIT ?")) {
|
||||
stmt.setString(1, domain.topDomain);
|
||||
stmt.setInt(2, cnt);
|
||||
return siblingsCache.get(topDomain, () -> {
|
||||
List<DomainWithNode> ret = new ArrayList<>();
|
||||
|
||||
var rs = stmt.executeQuery();
|
||||
while (rs.next()) {
|
||||
var sibling = new EdgeDomain(rs.getString(1));
|
||||
try (var conn = dataSource.getConnection();
|
||||
var stmt = conn.prepareStatement("SELECT DOMAIN_NAME, NODE_AFFINITY FROM EC_DOMAIN WHERE DOMAIN_TOP = ? LIMIT ?")) {
|
||||
stmt.setString(1, topDomain);
|
||||
stmt.setInt(2, cnt);
|
||||
|
||||
if (sibling.equals(domain))
|
||||
continue;
|
||||
var rs = stmt.executeQuery();
|
||||
while (rs.next()) {
|
||||
var sibling = new EdgeDomain(rs.getString(1));
|
||||
|
||||
ret.add(new DomainWithNode(sibling, rs.getInt(2)));
|
||||
if (sibling.equals(domain))
|
||||
continue;
|
||||
|
||||
ret.add(new DomainWithNode(sibling, rs.getInt(2)));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.error("Failed to get domain neighbors");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.error("Failed to get domain neighbors");
|
||||
}
|
||||
return ret;
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
public record DomainWithNode (EdgeDomain domain, int nodeAffinity) {
|
||||
|
@@ -1,118 +0,0 @@
|
||||
package nu.marginalia.db;
|
||||
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.OptionalInt;
|
||||
|
||||
/** Class used in exporting data. This is intended to be used for a brief time
|
||||
* and then discarded, not kept around as a service.
|
||||
*/
|
||||
public class DbDomainStatsExportMultitool implements AutoCloseable {
|
||||
private final Connection connection;
|
||||
private final int nodeId;
|
||||
private final PreparedStatement knownUrlsQuery;
|
||||
private final PreparedStatement visitedUrlsQuery;
|
||||
private final PreparedStatement goodUrlsQuery;
|
||||
private final PreparedStatement domainNameToId;
|
||||
|
||||
private final PreparedStatement allDomainsQuery;
|
||||
private final PreparedStatement crawlQueueDomains;
|
||||
private final PreparedStatement indexedDomainsQuery;
|
||||
|
||||
public DbDomainStatsExportMultitool(HikariDataSource dataSource, int nodeId) throws SQLException {
|
||||
this.connection = dataSource.getConnection();
|
||||
this.nodeId = nodeId;
|
||||
|
||||
knownUrlsQuery = connection.prepareStatement("""
|
||||
SELECT KNOWN_URLS
|
||||
FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA
|
||||
ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
|
||||
WHERE DOMAIN_NAME=?
|
||||
""");
|
||||
visitedUrlsQuery = connection.prepareStatement("""
|
||||
SELECT VISITED_URLS
|
||||
FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA
|
||||
ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
|
||||
WHERE DOMAIN_NAME=?
|
||||
""");
|
||||
goodUrlsQuery = connection.prepareStatement("""
|
||||
SELECT GOOD_URLS
|
||||
FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA
|
||||
ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
|
||||
WHERE DOMAIN_NAME=?
|
||||
""");
|
||||
domainNameToId = connection.prepareStatement("""
|
||||
SELECT ID
|
||||
FROM EC_DOMAIN
|
||||
WHERE DOMAIN_NAME=?
|
||||
""");
|
||||
allDomainsQuery = connection.prepareStatement("""
|
||||
SELECT DOMAIN_NAME
|
||||
FROM EC_DOMAIN
|
||||
""");
|
||||
crawlQueueDomains = connection.prepareStatement("""
|
||||
SELECT DOMAIN_NAME
|
||||
FROM CRAWL_QUEUE
|
||||
""");
|
||||
indexedDomainsQuery = connection.prepareStatement("""
|
||||
SELECT DOMAIN_NAME
|
||||
FROM EC_DOMAIN
|
||||
WHERE INDEXED > 0
|
||||
""");
|
||||
}
|
||||
|
||||
public OptionalInt getVisitedUrls(String domainName) throws SQLException {
|
||||
return executeNameToIntQuery(domainName, visitedUrlsQuery);
|
||||
}
|
||||
|
||||
public OptionalInt getDomainId(String domainName) throws SQLException {
|
||||
return executeNameToIntQuery(domainName, domainNameToId);
|
||||
}
|
||||
|
||||
public List<String> getCrawlQueueDomains() throws SQLException {
|
||||
return executeListQuery(crawlQueueDomains, 100);
|
||||
}
|
||||
public List<String> getAllIndexedDomains() throws SQLException {
|
||||
return executeListQuery(indexedDomainsQuery, 100_000);
|
||||
}
|
||||
|
||||
private OptionalInt executeNameToIntQuery(String domainName, PreparedStatement statement)
|
||||
throws SQLException {
|
||||
statement.setString(1, domainName);
|
||||
var rs = statement.executeQuery();
|
||||
|
||||
if (rs.next()) {
|
||||
return OptionalInt.of(rs.getInt(1));
|
||||
}
|
||||
|
||||
return OptionalInt.empty();
|
||||
}
|
||||
|
||||
private List<String> executeListQuery(PreparedStatement statement, int sizeHint) throws SQLException {
|
||||
List<String> ret = new ArrayList<>(sizeHint);
|
||||
|
||||
var rs = statement.executeQuery();
|
||||
|
||||
while (rs.next()) {
|
||||
ret.add(rs.getString(1));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws SQLException {
|
||||
knownUrlsQuery.close();
|
||||
goodUrlsQuery.close();
|
||||
visitedUrlsQuery.close();
|
||||
allDomainsQuery.close();
|
||||
crawlQueueDomains.close();
|
||||
domainNameToId.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
@@ -16,20 +16,18 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static java.lang.Math.clamp;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Singleton
|
||||
public class IndexClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(IndexClient.class);
|
||||
private final GrpcMultiNodeChannelPool<IndexApiGrpc.IndexApiBlockingStub> channelPool;
|
||||
private final DomainBlacklistImpl blacklist;
|
||||
private static final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
private static final ExecutorService executor = Executors.newCachedThreadPool();
|
||||
|
||||
@Inject
|
||||
public IndexClient(GrpcChannelPoolFactory channelPoolFactory, DomainBlacklistImpl blacklist) {
|
||||
@@ -51,40 +49,31 @@ public class IndexClient {
|
||||
|
||||
/** Execute a query on the index partitions and return the combined results. */
|
||||
public AggregateQueryResponse executeQueries(RpcIndexQuery indexRequest, Pagination pagination) {
|
||||
List<CompletableFuture<Iterator<RpcDecoratedResultItem>>> futures =
|
||||
channelPool.call(IndexApiGrpc.IndexApiBlockingStub::query)
|
||||
.async(executor)
|
||||
.runEach(indexRequest);
|
||||
|
||||
final int requestedMaxResults = indexRequest.getQueryLimits().getResultsTotal();
|
||||
final int resultsUpperBound = requestedMaxResults * channelPool.getNumNodes();
|
||||
|
||||
List<RpcDecoratedResultItem> results = new ArrayList<>(resultsUpperBound);
|
||||
AtomicInteger totalNumResults = new AtomicInteger(0);
|
||||
|
||||
for (var future : futures) {
|
||||
try {
|
||||
future.get().forEachRemaining(results::add);
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Downstream exception", e);
|
||||
}
|
||||
}
|
||||
List<RpcDecoratedResultItem> results =
|
||||
channelPool.call(IndexApiGrpc.IndexApiBlockingStub::query)
|
||||
.async(executor)
|
||||
.runEach(indexRequest)
|
||||
.stream()
|
||||
.map(future -> future.thenApply(iterator -> {
|
||||
List<RpcDecoratedResultItem> ret = new ArrayList<>(requestedMaxResults);
|
||||
iterator.forEachRemaining(ret::add);
|
||||
totalNumResults.addAndGet(ret.size());
|
||||
return ret;
|
||||
}))
|
||||
.map(CompletableFuture::join)
|
||||
.flatMap(List::stream)
|
||||
.filter(item -> !isBlacklisted(item))
|
||||
.sorted(comparator)
|
||||
.skip(Math.max(0, (pagination.page - 1) * pagination.pageSize))
|
||||
.limit(pagination.pageSize)
|
||||
.toList();
|
||||
|
||||
// Sort the results by ranking score and remove blacklisted domains
|
||||
results.sort(comparator);
|
||||
results.removeIf(this::isBlacklisted);
|
||||
|
||||
int numReceivedResults = results.size();
|
||||
|
||||
// pagination is typically 1-indexed, so we need to adjust the start and end indices
|
||||
int indexStart = (pagination.page - 1) * pagination.pageSize;
|
||||
int indexEnd = (pagination.page) * pagination.pageSize;
|
||||
|
||||
results = results.subList(
|
||||
clamp(indexStart, 0, Math.max(0, results.size() - 1)), // from is inclusive, so subtract 1 from size()
|
||||
clamp(indexEnd, 0, results.size()));
|
||||
|
||||
return new AggregateQueryResponse(results, pagination.page(), numReceivedResults);
|
||||
return new AggregateQueryResponse(results, pagination.page(), totalNumResults.get());
|
||||
}
|
||||
|
||||
private boolean isBlacklisted(RpcDecoratedResultItem item) {
|
||||
|
@@ -26,10 +26,9 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
@@ -69,9 +68,11 @@ public class SearchSiteInfoService {
|
||||
this.screenshotService = screenshotService;
|
||||
this.dataSource = dataSource;
|
||||
this.searchSiteSubscriptions = searchSiteSubscriptions;
|
||||
|
||||
Thread.ofPlatform().name("Recently Added Domains Model Updater").start(this::modelUpdater);
|
||||
}
|
||||
|
||||
private volatile SiteOverviewModel model = new SiteOverviewModel(List.of(), Instant.EPOCH);
|
||||
private volatile SiteOverviewModel cachedOverviewModel = new SiteOverviewModel(List.of());
|
||||
|
||||
@GET
|
||||
@Path("/site")
|
||||
@@ -81,55 +82,52 @@ public class SearchSiteInfoService {
|
||||
return new MapModelAndView("redirect.jte", Map.of("url", "/site/"+domain));
|
||||
}
|
||||
|
||||
if (model.age().compareTo(Duration.ofMinutes(15)) > 0) {
|
||||
updateModel();
|
||||
}
|
||||
|
||||
return new MapModelAndView("siteinfo/start.jte",
|
||||
Map.of("navbar", NavbarModel.SITEINFO,
|
||||
"model", model));
|
||||
"model", cachedOverviewModel));
|
||||
}
|
||||
|
||||
/** Update the model if it is older than 15 minutes.
|
||||
* This query is expensive and should not be run too often,
|
||||
* and the data doesn't change that often either.
|
||||
* <p></p>
|
||||
* This method is synchronized to avoid multiple threads updating the model at the same time.
|
||||
*/
|
||||
private synchronized void updateModel() {
|
||||
var currentModel = model;
|
||||
if (currentModel.age().compareTo(Duration.ofMinutes(15)) < 0) {
|
||||
return;
|
||||
}
|
||||
private void modelUpdater() {
|
||||
while (!Thread.interrupted()) {
|
||||
List<SiteOverviewModel.DiscoveredDomain> domains = new ArrayList<>();
|
||||
|
||||
List<SiteOverviewModel.DiscoveredDomain> domains = new ArrayList<>();
|
||||
// This query can be quite expensive, so we can't run it on demand
|
||||
// for every request. Instead, we run it every 15 minutes and cache
|
||||
// the result.
|
||||
|
||||
try (var conn = dataSource.getConnection();
|
||||
var stmt = conn.prepareStatement("SELECT DOMAIN_NAME, DISCOVER_DATE FROM EC_DOMAIN WHERE NODE_AFFINITY = 0 ORDER BY ID DESC LIMIT 10")) {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var stmt = conn.prepareStatement("""
|
||||
SELECT DOMAIN_NAME, DISCOVER_DATE
|
||||
FROM EC_DOMAIN
|
||||
WHERE NODE_AFFINITY = 0
|
||||
ORDER BY ID DESC
|
||||
LIMIT 10
|
||||
"""))
|
||||
{
|
||||
var rs = stmt.executeQuery();
|
||||
while (rs.next()) {
|
||||
domains.add(new SiteOverviewModel.DiscoveredDomain(
|
||||
rs.getString("DOMAIN_NAME"),
|
||||
rs.getString("DISCOVER_DATE"))
|
||||
);
|
||||
}
|
||||
} catch (SQLException ex) {
|
||||
logger.warn("Failed to get recently added domains: {}", ex.getMessage());
|
||||
}
|
||||
|
||||
var rs = stmt.executeQuery();
|
||||
while (rs.next()) {
|
||||
domains.add(new SiteOverviewModel.DiscoveredDomain(rs.getString("DOMAIN_NAME"), rs.getString("DISCOVER_DATE")));
|
||||
cachedOverviewModel = new SiteOverviewModel(domains);
|
||||
|
||||
try {
|
||||
TimeUnit.MINUTES.sleep(15);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
|
||||
model = new SiteOverviewModel(domains);
|
||||
}
|
||||
|
||||
public record SiteOverviewModel(List<DiscoveredDomain> domains, Instant captureTime) {
|
||||
|
||||
public SiteOverviewModel(List<DiscoveredDomain> domains) {
|
||||
this(domains, Instant.now());
|
||||
}
|
||||
|
||||
public record SiteOverviewModel(List<DiscoveredDomain> domains) {
|
||||
public record DiscoveredDomain(String name, String timestamp) {}
|
||||
|
||||
public Duration age() {
|
||||
return Duration.between(captureTime, Instant.now());
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@@ -139,7 +137,7 @@ public class SearchSiteInfoService {
|
||||
@PathParam String domainName,
|
||||
@QueryParam String view,
|
||||
@QueryParam Integer page
|
||||
) throws SQLException {
|
||||
) throws SQLException, ExecutionException {
|
||||
|
||||
if (null == domainName || domainName.isBlank()) {
|
||||
return null;
|
||||
@@ -225,7 +223,7 @@ public class SearchSiteInfoService {
|
||||
);
|
||||
}
|
||||
|
||||
private SiteInfoWithContext listInfo(Context context, String domainName) {
|
||||
private SiteInfoWithContext listInfo(Context context, String domainName) throws ExecutionException {
|
||||
|
||||
var domain = new EdgeDomain(domainName);
|
||||
final int domainId = domainQueries.tryGetDomainId(domain).orElse(-1);
|
||||
|
@@ -1,5 +1,4 @@
|
||||
@import nu.marginalia.db.DbDomainQueries
|
||||
@import nu.marginalia.model.EdgeDomain
|
||||
@import nu.marginalia.search.svc.SearchSiteInfoService
|
||||
@import nu.marginalia.search.svc.SearchSiteInfoService.*
|
||||
@import nu.marginalia.search.model.UrlDetails
|
||||
|
Reference in New Issue
Block a user