diff --git a/code/execution/java/nu/marginalia/actor/task/DownloadSampleActor.java b/code/execution/java/nu/marginalia/actor/task/DownloadSampleActor.java index d4490fd2f..0c989cd98 100644 --- a/code/execution/java/nu/marginalia/actor/task/DownloadSampleActor.java +++ b/code/execution/java/nu/marginalia/actor/task/DownloadSampleActor.java @@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.Resume; import nu.marginalia.service.control.ServiceEventLog; +import nu.marginalia.service.control.ServiceHeartbeat; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; @@ -19,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; +import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; @@ -32,6 +34,7 @@ public class DownloadSampleActor extends RecordActorPrototype { private final FileStorageService storageService; private final ServiceEventLog eventLog; + private final ServiceHeartbeat heartbeat; private final Logger logger = LoggerFactory.getLogger(getClass()); @Resume(behavior = ActorResumeBehavior.ERROR) @@ -66,15 +69,39 @@ public class DownloadSampleActor extends RecordActorPrototype { Files.deleteIfExists(Path.of(tarFileName)); - try (var is = new BufferedInputStream(new URI(downloadURI).toURL().openStream()); - var os = new BufferedOutputStream(Files.newOutputStream(Path.of(tarFileName), StandardOpenOption.CREATE))) { - is.transferTo(os); + HttpURLConnection urlConnection = (HttpURLConnection) new URI(downloadURI).toURL().openConnection(); + + try (var hb = heartbeat.createServiceAdHocTaskHeartbeat("Downloading sample")) { + long size = urlConnection.getContentLengthLong(); + byte[] buffer = new byte[8192]; + + try (var is = new BufferedInputStream(urlConnection.getInputStream()); + var os = new BufferedOutputStream(Files.newOutputStream(Path.of(tarFileName), StandardOpenOption.CREATE))) { + long copiedSize = 0; + + while (copiedSize < size) { + int read = is.read(buffer); + + if (read < 0) // We've been promised a file of length 'size' + throw new IOException("Unexpected end of stream"); + + os.write(buffer, 0, read); + copiedSize += read; + + // Update progress bar + hb.progress(String.format("%d MB", copiedSize / 1024 / 1024), (int) (copiedSize / 1024), (int) (size / 1024)); + } + } + } catch (Exception ex) { eventLog.logEvent(DownloadSampleActor.class, "Error downloading sample"); logger.error("Error downloading sample", ex); yield new Error(); } + finally { + urlConnection.disconnect(); + } eventLog.logEvent(DownloadSampleActor.class, "Download complete"); yield new Extract(fileStorageId, tarFileName); @@ -170,11 +197,12 @@ public class DownloadSampleActor extends RecordActorPrototype { @Inject public DownloadSampleActor(Gson gson, FileStorageService storageService, - ServiceEventLog eventLog) + ServiceEventLog eventLog, ServiceHeartbeat heartbeat) { super(gson); this.storageService = storageService; this.eventLog = eventLog; + this.heartbeat = heartbeat; } }