From 44f22b27c56b715756bad810378f6c1e8c388a9c Mon Sep 17 00:00:00 2001 From: Michael Barry Date: Wed, 1 Nov 2023 19:30:28 -0400 Subject: [PATCH] Rewrite downloader using virtual threads (#702) --- .../com/onthegomap/planetiler/Planetiler.java | 4 +- .../planetiler/util/Downloader.java | 300 +++++++----------- .../onthegomap/planetiler/util/FileUtils.java | 21 +- .../planetiler/util/TopOsmTiles.java | 2 +- .../planetiler/worker/RunnableThatThrows.java | 4 + .../planetiler/util/DownloaderTest.java | 54 ++-- .../planetiler/util/FileUtilsTest.java | 7 + 7 files changed, 175 insertions(+), 217 deletions(-) diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java index 5fe00af2..e23b1926 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java @@ -906,7 +906,7 @@ public class Planetiler { private void download() { var timer = stats.startStage("download"); - Downloader downloader = Downloader.create(config(), stats()); + Downloader downloader = Downloader.create(config()); for (ToDownload toDownload : toDownload) { if (profile.caresAboutSource(toDownload.id)) { downloader.add(toDownload.id, toDownload.url, toDownload.path); @@ -919,7 +919,7 @@ public class Planetiler { private void ensureInputFilesExist() { for (InputPath inputPath : inputPaths) { if (profile.caresAboutSource(inputPath.id) && !Files.exists(inputPath.path)) { - throw new IllegalArgumentException(inputPath.path + " does not exist"); + throw new IllegalArgumentException(inputPath.path + " does not exist. Run with --download to fetch it"); } } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java index 5b5dde17..43e10961 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java @@ -5,12 +5,12 @@ import static java.nio.file.StandardOpenOption.WRITE; import com.google.common.util.concurrent.RateLimiter; import com.onthegomap.planetiler.config.PlanetilerConfig; +import com.onthegomap.planetiler.stats.Counter; import com.onthegomap.planetiler.stats.ProgressLoggers; -import com.onthegomap.planetiler.stats.Stats; -import com.onthegomap.planetiler.worker.WorkerPipeline; +import com.onthegomap.planetiler.worker.RunnableThatThrows; +import com.onthegomap.planetiler.worker.Worker; import java.io.IOException; import java.io.InputStream; -import java.io.UncheckedIOException; import java.net.URI; import java.net.URLConnection; import java.net.http.HttpClient; @@ -18,9 +18,7 @@ import java.net.http.HttpHeaders; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.nio.channels.FileChannel; -import java.nio.channels.ReadableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -30,9 +28,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,32 +64,26 @@ public class Downloader { private static final Logger LOGGER = LoggerFactory.getLogger(Downloader.class); private final PlanetilerConfig config; private final List toDownloadList = new ArrayList<>(); - private final HttpClient client = HttpClient.newBuilder() - // explicitly follow redirects to capture final redirect url - .followRedirects(HttpClient.Redirect.NEVER).build(); + private final HttpClient client; private final ExecutorService executor; - private final Stats stats; private final long chunkSizeBytes; private final ResourceUsage diskSpaceCheck = new ResourceUsage("download"); private final RateLimiter rateLimiter; - Downloader(PlanetilerConfig config, Stats stats, long chunkSizeBytes) { + Downloader(PlanetilerConfig config, long chunkSizeBytes) { this.rateLimiter = config.downloadMaxBandwidth() == 0 ? null : RateLimiter.create(config.downloadMaxBandwidth()); this.chunkSizeBytes = chunkSizeBytes; this.config = config; - this.stats = stats; - this.executor = Executors.newSingleThreadExecutor(runnable -> { - Thread thread = new Thread(() -> { - LogUtil.setStage("download"); - runnable.run(); - }); - thread.setDaemon(true); - return thread; - }); + this.executor = Executors.newVirtualThreadPerTaskExecutor(); + this.client = HttpClient.newBuilder() + // explicitly follow redirects to capture final redirect url + .followRedirects(HttpClient.Redirect.NEVER) + .executor(executor) + .build(); } - public static Downloader create(PlanetilerConfig config, Stats stats) { - return new Downloader(config, stats, config.downloadChunkSizeMB() * 1_000_000L); + public static Downloader create(PlanetilerConfig config) { + return new Downloader(config, config.downloadChunkSizeMB() * 1_000_000L); } private static URLConnection getUrlConnection(String urlString, PlanetilerConfig config) throws IOException { @@ -188,145 +180,117 @@ public class Downloader { } CompletableFuture downloadIfNecessary(ResourceToDownload resourceToDownload) { - long existingSize = FileUtils.size(resourceToDownload.output); - - return httpHeadFollowRedirects(resourceToDownload.url, 0) - .whenComplete((metadata, err) -> { - if (metadata != null) { - resourceToDownload.metadata.complete(metadata); - } else { - resourceToDownload.metadata.completeExceptionally(err); - } - }) - .thenComposeAsync(metadata -> { - if (metadata.size == existingSize) { - LOGGER.info("Skipping {}: {} already up-to-date", resourceToDownload.id, resourceToDownload.output); - return CompletableFuture.completedFuture(null); - } else { - String redirectInfo = metadata.canonicalUrl.equals(resourceToDownload.url) ? "" : - " (redirected to " + metadata.canonicalUrl + ")"; - LOGGER.info("Downloading {}{} to {}", resourceToDownload.url, redirectInfo, resourceToDownload.output); - FileUtils.delete(resourceToDownload.output); - FileUtils.createParentDirectories(resourceToDownload.output); - Path tmpPath = resourceToDownload.tmpPath(); - FileUtils.delete(tmpPath); - FileUtils.deleteOnExit(tmpPath); - diskSpaceCheck.addDisk(tmpPath, metadata.size, resourceToDownload.id); - diskSpaceCheck.checkAgainstLimits(config.force(), false); - return httpDownload(resourceToDownload, tmpPath) - .thenCompose(result -> { - try { - Files.move(tmpPath, resourceToDownload.output); - return CompletableFuture.completedFuture(null); - } catch (IOException e) { - return CompletableFuture.failedFuture(e); - } - }) - .whenCompleteAsync((result, error) -> { - if (error != null) { - LOGGER.error("Error downloading {} to {}", resourceToDownload.url, resourceToDownload.output, error); - } else { - LOGGER.info("Finished downloading {} to {}", resourceToDownload.url, resourceToDownload.output); - } - FileUtils.delete(tmpPath); - }, executor); - } - }, executor); + return CompletableFuture.runAsync(RunnableThatThrows.wrap(() -> { + LogUtil.setStage("download", resourceToDownload.id); + long existingSize = FileUtils.size(resourceToDownload.output); + var metadata = httpHeadFollowRedirects(resourceToDownload.url, 0); + Path tmpPath = resourceToDownload.tmpPath(); + resourceToDownload.metadata.complete(metadata); + if (metadata.size == existingSize) { + LOGGER.info("Skipping {}: {} already up-to-date", resourceToDownload.id, resourceToDownload.output); + return; + } + try { + String redirectInfo = metadata.canonicalUrl.equals(resourceToDownload.url) ? "" : + " (redirected to " + metadata.canonicalUrl + ")"; + LOGGER.info("Downloading {}{} to {}", resourceToDownload.url, redirectInfo, resourceToDownload.output); + FileUtils.delete(resourceToDownload.output); + FileUtils.createParentDirectories(resourceToDownload.output); + FileUtils.delete(tmpPath); + FileUtils.deleteOnExit(tmpPath); + diskSpaceCheck.addDisk(tmpPath, metadata.size, resourceToDownload.id); + diskSpaceCheck.checkAgainstLimits(config.force(), false); + httpDownload(resourceToDownload, tmpPath); + Files.move(tmpPath, resourceToDownload.output); + LOGGER.info("Finished downloading {} to {}", resourceToDownload.url, resourceToDownload.output); + } catch (Exception e) { // NOSONAR + LOGGER.error("Error downloading {} to {}", resourceToDownload.url, resourceToDownload.output, e); + throw e; + } finally { + FileUtils.delete(tmpPath); + } + }), executor); } - private CompletableFuture httpHeadFollowRedirects(String url, int redirects) { + private ResourceMetadata httpHeadFollowRedirects(String url, int redirects) throws IOException, InterruptedException { if (redirects > MAX_REDIRECTS) { throw new IllegalStateException("Exceeded " + redirects + " redirects for " + url); } - return httpHead(url).thenComposeAsync(response -> response.redirect.isPresent() ? - httpHeadFollowRedirects(response.redirect.get(), redirects + 1) : CompletableFuture.completedFuture(response)); + var response = httpHead(url); + return response.redirect.isPresent() ? httpHeadFollowRedirects(response.redirect.get(), redirects + 1) : response; } - CompletableFuture httpHead(String url) { - return client - .sendAsync(newHttpRequest(url).HEAD().build(), - responseInfo -> { - int status = responseInfo.statusCode(); - Optional location = Optional.empty(); - long contentLength = 0; - HttpHeaders headers = responseInfo.headers(); - if (status >= 300 && status < 400) { - location = responseInfo.headers().firstValue(LOCATION); - if (location.isEmpty()) { - throw new IllegalStateException("Received " + status + " but no location header from " + url); - } - } else if (responseInfo.statusCode() != 200) { - throw new IllegalStateException("Bad response: " + responseInfo.statusCode()); - } else { - contentLength = headers.firstValueAsLong(CONTENT_LENGTH).orElseThrow(); + ResourceMetadata httpHead(String url) throws IOException, InterruptedException { + return client.send(newHttpRequest(url).HEAD().build(), + responseInfo -> { + int status = responseInfo.statusCode(); + Optional location = Optional.empty(); + long contentLength = 0; + HttpHeaders headers = responseInfo.headers(); + if (status >= 300 && status < 400) { + location = responseInfo.headers().firstValue(LOCATION); + if (location.isEmpty()) { + throw new IllegalStateException("Received " + status + " but no location header from " + url); } - boolean supportsRangeRequest = headers.allValues(ACCEPT_RANGES).contains("bytes"); - ResourceMetadata metadata = new ResourceMetadata(location, url, contentLength, supportsRangeRequest); - return HttpResponse.BodyHandlers.replacing(metadata).apply(responseInfo); - }) - .thenApply(HttpResponse::body); - } - - private CompletableFuture httpDownload(ResourceToDownload resource, Path tmpPath) { - /* - * Alternative using async HTTP client: - * - * return client.sendAsync(newHttpRequest(url).GET().build(), responseInfo -> { - * assertOK(responseInfo); - * return HttpResponse.BodyHandlers.ofFile(path).apply(responseInfo); - * - * But it is slower on large files - */ - return resource.metadata.thenCompose(metadata -> { - String canonicalUrl = metadata.canonicalUrl; - record Range(long start, long end) { - - long size() { - return end - start; + } else if (responseInfo.statusCode() != 200) { + throw new IllegalStateException("Bad response: " + responseInfo.statusCode()); + } else { + contentLength = headers.firstValueAsLong(CONTENT_LENGTH).orElseThrow(); } - } - List chunks = new ArrayList<>(); - boolean ranges = metadata.acceptRange && config.downloadThreads() > 1; - long chunkSize = ranges ? chunkSizeBytes : metadata.size; - for (long start = 0; start < metadata.size; start += chunkSize) { - long end = Math.min(start + chunkSize, metadata.size); - chunks.add(new Range(start, end)); - } - // create an empty file - try { - Files.createFile(tmpPath); - } catch (IOException e) { - return CompletableFuture.failedFuture(new IOException("Failed to create " + resource.output, e)); - } - return WorkerPipeline.start("download-" + resource.id, stats) - .readFromTiny("chunks", chunks) - .sinkToConsumer("chunk-downloader", Math.min(config.downloadThreads(), chunks.size()), range -> { - try (var fileChannel = FileChannel.open(tmpPath, WRITE)) { - while (range.size() > 0) { - try ( - var inputStream = (ranges || range.start > 0) ? openStreamRange(canonicalUrl, range.start, range.end) : - openStream(canonicalUrl); - var input = new ProgressChannel(Channels.newChannel(inputStream), resource.progress, rateLimiter) - ) { - // ensure this file has been allocated up to the start of this block - fileChannel.write(ByteBuffer.allocate(1), range.start); - fileChannel.position(range.start); - long transferred = fileChannel.transferFrom(input, range.start, range.size()); - if (transferred == 0) { - throw new IOException("Transferred 0 bytes but " + range.size() + " expected: " + canonicalUrl); - } else if (transferred != range.size() && !metadata.acceptRange) { - throw new IOException( - "Transferred " + transferred + " bytes but " + range.size() + " expected: " + canonicalUrl + - " and server does not support range requests"); - } - range = new Range(range.start + transferred, range.end); - } - } - } catch (IOException e) { - throw new UncheckedIOException(e); + boolean supportsRangeRequest = headers.allValues(ACCEPT_RANGES).contains("bytes"); + ResourceMetadata metadata = new ResourceMetadata(location, url, contentLength, supportsRangeRequest); + return HttpResponse.BodyHandlers.replacing(metadata).apply(responseInfo); + }).body(); + } + + private void httpDownload(ResourceToDownload resource, Path tmpPath) + throws ExecutionException, InterruptedException { + var metadata = resource.metadata().get(); + String canonicalUrl = metadata.canonicalUrl(); + record Range(long start, long end) {} + List chunks = new ArrayList<>(); + boolean ranges = metadata.acceptRange && config.downloadThreads() > 1; + long chunkSize = ranges ? chunkSizeBytes : metadata.size; + for (long start = 0; start < metadata.size; start += chunkSize) { + long end = Math.min(start + chunkSize, metadata.size); + chunks.add(new Range(start, end)); + } + FileUtils.setLength(tmpPath, metadata.size); + Semaphore perFileLimiter = new Semaphore(config.downloadThreads()); + Worker.joinFutures(chunks.stream().map(range -> CompletableFuture.runAsync(RunnableThatThrows.wrap(() -> { + LogUtil.setStage("download", resource.id); + perFileLimiter.acquire(); + var counter = resource.progress.counterForThread(); + try ( + var fc = FileChannel.open(tmpPath, WRITE); + var inputStream = (ranges || range.start > 0) ? + openStreamRange(canonicalUrl, range.start, range.end) : + openStream(canonicalUrl); + ) { + long offset = range.start; + byte[] buffer = new byte[16384]; + int read; + while (offset < range.end && (read = inputStream.read(buffer, 0, 16384)) >= 0) { + counter.incBy(read); + if (rateLimiter != null) { + rateLimiter.acquire(read); } - }).done(); - }); + int position = 0; + int remaining = read; + while (remaining > 0) { + int written = fc.write(ByteBuffer.wrap(buffer, position, remaining), offset); + if (written <= 0) { + throw new IOException("Failed to write to " + tmpPath); + } + position += written; + remaining -= written; + offset += written; + } + } + } finally { + perFileLimiter.release(); + } + }), executor)).toArray(CompletableFuture[]::new)).get(); } private HttpRequest.Builder newHttpRequest(String url) { @@ -338,11 +302,12 @@ public class Downloader { record ResourceMetadata(Optional redirect, String canonicalUrl, long size, boolean acceptRange) {} record ResourceToDownload( - String id, String url, Path output, CompletableFuture metadata, AtomicLong progress + String id, String url, Path output, CompletableFuture metadata, + Counter.MultiThreadCounter progress ) { ResourceToDownload(String id, String url, Path output) { - this(id, url, output, new CompletableFuture<>(), new AtomicLong(0)); + this(id, url, output, new CompletableFuture<>(), Counter.newMultiThreadCounter()); } public Path tmpPath() { @@ -353,33 +318,4 @@ public class Downloader { return progress.get(); } } - - /** - * Wrapper for a {@link ReadableByteChannel} that captures progress information. - */ - private record ProgressChannel(ReadableByteChannel inner, AtomicLong progress, RateLimiter rateLimiter) - implements ReadableByteChannel { - - @Override - public int read(ByteBuffer dst) throws IOException { - int n = inner.read(dst); - if (n > 0) { - if (rateLimiter != null) { - rateLimiter.acquire(n); - } - progress.addAndGet(n); - } - return n; - } - - @Override - public boolean isOpen() { - return inner.isOpen(); - } - - @Override - public void close() throws IOException { - inner.close(); - } - } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java index cbd60373..1cbbe138 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java @@ -1,8 +1,13 @@ package com.onthegomap.planetiler.util; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.WRITE; + import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.ClosedFileSystemException; import java.nio.file.FileStore; import java.nio.file.FileSystem; @@ -263,7 +268,7 @@ public class FileUtils { * @throws UncheckedIOException if an IO exception occurs */ public static void safeCopy(InputStream inputStream, Path destPath) { - try (var outputStream = Files.newOutputStream(destPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + try (var outputStream = Files.newOutputStream(destPath, StandardOpenOption.CREATE, WRITE)) { int totalSize = 0; int nBytes; @@ -310,7 +315,7 @@ public class FileUtils { try ( var out = Files.newOutputStream(destination, StandardOpenOption.CREATE_NEW, - StandardOpenOption.WRITE) + WRITE) ) { totalEntryArchive++; while ((nBytes = zip.read(buffer)) > 0) { @@ -366,4 +371,16 @@ public class FileUtils { return true; } } + + /** Expands the file at {@code path} to {@code size} bytes. */ + public static void setLength(Path path, long size) { + try (var fc = FileChannel.open(path, CREATE, WRITE)) { + int written = fc.write(ByteBuffer.allocate(1), size - 1); + if (written != 1) { + throw new IOException("Unable to expand " + path + " to " + size); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/TopOsmTiles.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/TopOsmTiles.java index e66e8cbc..9f893efd 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/TopOsmTiles.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/TopOsmTiles.java @@ -62,7 +62,7 @@ public class TopOsmTiles { TopOsmTiles(PlanetilerConfig config, Stats stats) { this.config = config; this.stats = stats; - downloader = Downloader.create(config, stats); + downloader = Downloader.create(config); } Reader fetch(LocalDate date) throws IOException { diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/RunnableThatThrows.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/RunnableThatThrows.java index d2c0ea73..a3ed4ae9 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/RunnableThatThrows.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/RunnableThatThrows.java @@ -18,4 +18,8 @@ public interface RunnableThatThrows { throwFatalException(e); } } + + static Runnable wrap(RunnableThatThrows thrower) { + return thrower::runAndWrapException; + } } diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java index f7143977..f5b285b2 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java @@ -3,18 +3,16 @@ package com.onthegomap.planetiler.util; import static org.junit.jupiter.api.Assertions.*; import com.onthegomap.planetiler.config.PlanetilerConfig; -import com.onthegomap.planetiler.stats.Stats; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -25,26 +23,25 @@ class DownloaderTest { @TempDir Path path; private final PlanetilerConfig config = PlanetilerConfig.defaults(); - private final Stats stats = Stats.inMemory(); - private long downloads = 0; + private AtomicLong downloads = new AtomicLong(0); - private Downloader mockDownloader(Map resources, boolean supportsRange, int maxLength) { - return new Downloader(config, stats, 2L) { + private Downloader mockDownloader(Map resources, boolean supportsRange) { + return new Downloader(config, 2L) { @Override InputStream openStream(String url) { - downloads++; + downloads.incrementAndGet(); assertTrue(resources.containsKey(url), "no resource for " + url); byte[] bytes = resources.get(url); - return new ByteArrayInputStream(maxLength < bytes.length ? Arrays.copyOf(bytes, maxLength) : bytes); + return new ByteArrayInputStream(bytes); } @Override InputStream openStreamRange(String url, long start, long end) { assertTrue(supportsRange, "does not support range"); - downloads++; + downloads.incrementAndGet(); assertTrue(resources.containsKey(url), "no resource for " + url); - byte[] result = new byte[Math.min(maxLength, (int) (end - start))]; + byte[] result = new byte[(int) (end - start)]; byte[] bytes = resources.get(url); for (int i = (int) start; i < start + result.length; i++) { result[(int) (i - start)] = bytes[i]; @@ -53,31 +50,28 @@ class DownloaderTest { } @Override - CompletableFuture httpHead(String url) { + ResourceMetadata httpHead(String url) { String[] parts = url.split("#"); if (parts.length > 1) { int redirectNum = Integer.parseInt(parts[1]); String next = redirectNum <= 1 ? parts[0] : (parts[0] + "#" + (redirectNum - 1)); - return CompletableFuture.supplyAsync( - () -> new ResourceMetadata(Optional.of(next), url, 0, supportsRange)); + return new ResourceMetadata(Optional.of(next), url, 0, supportsRange); } byte[] bytes = resources.get(url); - return CompletableFuture.supplyAsync( - () -> new ResourceMetadata(Optional.empty(), url, bytes.length, supportsRange)); + return new ResourceMetadata(Optional.empty(), url, bytes.length, supportsRange); } }; } @ParameterizedTest @CsvSource({ - "false,100,0", - "true,100,0", - "true,2,0", - "false,100,1", - "false,100,2", - "true,2,4", + "false,0", + "true,0", + "false,1", + "false,2", + "true,4", }) - void testDownload(boolean range, int maxLength, int redirects) throws Exception { + void testDownload(boolean range, int redirects) throws Exception { Path dest = path.resolve("out"); String string = "0123456789"; String url = "http://url"; @@ -85,7 +79,7 @@ class DownloaderTest { Map resources = new ConcurrentHashMap<>(); byte[] bytes = string.getBytes(StandardCharsets.UTF_8); - Downloader downloader = mockDownloader(resources, range, maxLength); + Downloader downloader = mockDownloader(resources, range); // fails if no data var resource1 = new Downloader.ResourceToDownload("resource", initialUrl, dest); @@ -102,10 +96,10 @@ class DownloaderTest { assertEquals(10, resource2.bytesDownloaded()); // does not re-request if size is the same - downloads = 0; + downloads.set(0); var resource3 = new Downloader.ResourceToDownload("resource", initialUrl, dest); downloader.downloadIfNecessary(resource3).get(); - assertEquals(0, downloads); + assertEquals(0, downloads.get()); assertEquals(string, Files.readString(dest)); assertEquals(FileUtils.size(path), FileUtils.size(dest)); assertEquals(0, resource3.bytesDownloaded()); @@ -115,7 +109,7 @@ class DownloaderTest { String newContent = "54321"; resources.put(url, newContent.getBytes(StandardCharsets.UTF_8)); downloader.downloadIfNecessary(resource4).get(); - assertTrue(downloads > 0, "downloads were " + downloads); + assertTrue(downloads.get() > 0, "downloads were " + downloads); assertEquals(newContent, Files.readString(dest)); assertEquals(FileUtils.size(path), FileUtils.size(dest)); assertEquals(5, resource4.bytesDownloaded()); @@ -123,7 +117,7 @@ class DownloaderTest { @Test void testDownloadFailsIfTooBig() { - var downloader = new Downloader(config, stats, 2L) { + var downloader = new Downloader(config, 2L) { @Override InputStream openStream(String url) { @@ -136,8 +130,8 @@ class DownloaderTest { } @Override - CompletableFuture httpHead(String url) { - return CompletableFuture.completedFuture(new ResourceMetadata(Optional.empty(), url, Long.MAX_VALUE, true)); + ResourceMetadata httpHead(String url) { + return new ResourceMetadata(Optional.empty(), url, Long.MAX_VALUE, true); } }; diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java index 0dbb5b71..2f480cbd 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java @@ -152,4 +152,11 @@ class FileUtilsTest { List.of("/shapefile/stations.shp", "/shapefile/stations.shx"), matchingPaths.stream().map(Path::toString).sorted().toList()); } + + @Test + void testExpandFile() throws IOException { + Path path = tmpDir.resolve("toExpand"); + FileUtils.setLength(path, 1000); + assertEquals(1000, Files.size(path)); + } }